Flink Sql Redis Connector 新版本来袭

ops/2024/10/10 13:42:05/

1.新版本功能和性能介绍

1.1 流批一体

新版本使用了Flink最新的Source接口和SinkWriter接口,可以使用一套代码完成流式读取数据和批量读取数据

1.2 吞吐量大

新版本使用jedispipline和jedisClusterPipeline对数据进行写入和读取,每分钟可以达到千万级别的数据写入或者读取,且对机器要求较低

1.3 兼容所有版本的Flink

新版本使用新的接口重写不但可以适用旧版本的Flink,也兼容新版本的Flink

2.使用方式

使用方式还是和之前版本一样,但是新增了一些连接参数

1.使用案例和讲解
1.读取数据案例
CREATE TABLE orders (`order_id` STRING,`price` STRING,`order_time` STRING,PRIMARY KEY(order_id) NOT ENFORCED
) WITH ('connector' = 'redis','mode' = 'single','single.host' = '192.168.10.101','single.port' = '6379','password' = 'xxxxxx','command' = 'hgetall','key' = 'orders'
);select * from orders#集群模式
create table redis_sink (
site_id STRING,
inverter_id STRING,
start_time STRING,
PRIMARY KEY(site_id) NOT ENFORCED
) WITH (
'connector' = 'redis',
'mode' = 'cluster',
'cluster.nodes' = 'test3:7001,test3:7002,test3:7003,test3:8001,test3:8002,test3:8003',
'password' = '123123',
'command' = 'hgetall',
'key' = 'site_inverter'
)cluster.nodes用来定义集群ip和host,例如:host1:p1,host2:p2,host3:p3注:redis表必须定义主键,可以是单个主键,也可以是联合主键以下为sql读取结果,直接将redis数据解析成我们需要的表格形式2.写入数据案例
1. generate source data
CREATE TABLE order_source (`order_number` BIGINT,`price` DECIMAL(32,2),`order_time` TIMESTAMP(3),PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
'connector' = 'datagen',
'number-of-rows' = '5',
'fields.order_number.min' = '1',
'fields.order_number.max' = '20',
'fields.price.min' = '1001',
'fields.price.max' = '1100'
);2. define redis sink table CREATE TABLE orders (`order_number` STRING,`price` STRING,`order_time` STRING,PRIMARY KEY(order_id) NOT ENFORCED
) WITH ('connector' = 'redis','mode' = 'single','single.host' = '192.168.10.101','single.port' = '6379','password' = 'xxxxxx','command' = 'hmset','key' = 'orders'
);3. insert data to redis sink table (cast data type to string)insert into redis_sinkselectcast(order_number as STRING) order_number,cast(price as STRING) price,cast(order_time as STRING) order_timefrom orders

3.新增的连接参数

OptionRequiredDefaultTypeDescription
connectorrequirednoStringconnector name
moderequirednoStringredis cluster mode (single or cluster)
single.hostoptionalnoStringredis single mode machine host
single.portoptionalnointredis single mode running port
passwordoptionalnoStringredis database password
commandrequirednoStringredis write data or read data command
keyrequirednoStringredis key
expireoptionalnoIntset key ttl
fieldoptionalnoStringget a value with field when using hget command
cursoroptionalnoIntusing hscan command(e.g:1,2)
startoptional0Intread data when using lrange command
endoptional10Intread data when using lrange command
connection.max.wait-millsoptionalnoIntredis connection parameter
connection.timeout-msoptionalnoIntredis connection parameter
connection.max-totaloptionalnoIntredis connection parameter
connection.max-idleoptionalnoIntredis connection parameter
connection.test-on-borrowoptionalnoBooleanredis connection parameter
connection.test-on-returnoptionalnoBooleanredis connection parameter
connection.test-while-idleoptionalnoBooleanredis connection parameter
so.timeout-msoptionalnoIntredis connection parameter
max.attemptsoptionalnoIntredis connection parameter
sink.parallelismoptional1Intsink data parallelism
sink.delivery-guaranteeoptionalAT_LEAST_ONCEEnumAT_LEAST_ONCE or EXACTLY_ONCE
sink.buffer-flush.max-rowsoptional1000Intsink data row size
sink.buffer-flush.intervaloptional1sdurationSpecifies the batch flush interval

4.代码地址

Github:  https://github.com/niuhu3/flink_sql_redis_connector/tree/0.1.0

目前该connector已提交给flink,详见:[FLINK-35588] flink sql redis connector - ASF JIRA (apache.org)

希望大家可以帮忙点个fork和stars,后面会持续更新这个连接器,欢迎大家试用,试用的时候遇到什么问题也可以给我反馈,或者在社区反馈,有什么好的想法也可以联系我哦。

后面会给大家更新写这个连接器的思路,也会试着去更新新的连接器。


http://www.ppmy.cn/ops/90165.html

相关文章

代码随想录算法训练营day36|动态规划part04

第一题&#xff1a;1049. Last Stone Weight II 一维数组版本 class Solution {public int lastStoneWeightII(int[] stones) {int sum 0;for (int i : stones) {sum i;}int target sum >> 1;//初始化dp数组int[] dp new int[target 1];for (int i 0; i < sto…

【深度学习】【语音】TTS,StyleTTS 2,论文

StyleTTS 2 是一款创新的文本转语音(TTS)模型,通过使用样式扩散和大规模语音语言模型(SLM)的对抗训练,实现了接近人类水平的TTS合成。以下是StyleTTS 2在技术上的几个关键点和其在性能上的突出表现: 技术重点 样式扩散(Style Diffusion): StyleTTS 2 将语音样式建模…

中建海龙科技模块化集成建筑(MiC建筑):高效省时,建筑新选择

在当今快速发展的建筑行业中&#xff0c;时间成本往往成为制约项目进度的关键因素。中建海龙科技凭借其原创的模块化集成建筑&#xff08;MiC建筑&#xff09;技术&#xff0c;不仅实现了建筑的高质量、高效率&#xff0c;更在节省时间方面展现出了显著优势。 模块化集成建筑&…

WriterSide 文档、接口自动编译并部署到GitPage

WriterSide 自动编译并部署到GitPage 1. GitHub 创建空仓库2. 配置GitHub 仓库的编译部署方式3. WriteSide 创建项目4. 创建自动、编译部署配置文件5. 自动编译、部署1. GitHub 创建空仓库 在 GitHub 创建一个空的仓库 仓库创建成功后, 记录仓库的远程地址 仓库地址需要修改…

html+css前端作业和平精英2个页面(无js)

htmlcss前端作业和平精英2个页面&#xff08;无js&#xff09;有视频播放器等功能效果 网页作品代码简单&#xff0c;可使用任意HTML编辑软件&#xff08;如&#xff1a;Dreamweaver、HBuilder、Vscode 、Sublime 、Webstorm、Text 、Notepad 等任意html编辑软件进行运行及修改…

架构师软考-每日两道单选题6

第11题 单选题 在软件系统工具中&#xff0c;版本控制工具属于&#xff08; &#xff09;&#xff0c;软件评价工具属于&#xff08;/&#xff09;。 A 软件开发工具 B 软件维护工具 C 编码与排错工具 D 软件管理和软件支持工具 解析 在软件系统工具中&#xff0c;版本控制工…

【C++入门(下)】—— 我与C++的不解之缘(二)

前言 接上篇&#xff0c;继续来学习C&#xff0c;本篇内容大概有 引用&#xff0c;inline 和 nullptr。 六、引用&#xff1a; 6.1、引用的定义 引用不是新定义一个变量&#xff0c;而是给已存在的变量取了一个别名&#xff0c;编译器不会为引用变量开辟内存空间&#xff0c;它…

数模——灰色关联分析算法

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 目录 文章目录 前言 一、基本概念了解 1.什么是灰色系统&#xff1f; 2.什么是关联分析&#xff1f; 二、模型原理 三、建模过程 1.找母序列&#xff08;参考序列&am…