kettle从入门到精通 第五十三课 ETL之kettle MQTT/RabbitMQ producer 实战

ops/2024/9/24 13:21:46/

1、MQTT介绍

MQTT (Message Queuing Telemetry Transport) 是一种轻量级的消息传输协议,设计用于连接低带宽、高延迟或不可靠网络的设备。

MQTT 是基于发布/订阅模式(Publish/Subscribe)的协议,其中设备可以发布消息到一个主题(Topic),其他设备可以订阅这个主题以接收相关消息。这种模式使得设备之间可以进行灵活的通信,而不需要直接连接到彼此。

常见的支持支持MQTT的中间件有RabbitMQ和ActiveMQ。

2、今天我们基于RabbitMQ来进行学习下MQTT 生产者,如下图所示:

3、通过步骤【生成记录】生成10条记录,记录中有一个message字段,类型为String字符串,message的值为我是java小金刚,如下图所示:

4、通过步骤【MQTT producer】将message值推送到RabbitMQ中供其他应用消费。

Step name:自定义步骤名称

Connection:指定此步骤将连接的 MQTT 服务器的地址,如127.0.0.1:1883(注意这里的端口是1883,不是5672)

Client ID:指定 MQTT 客户端的唯一 ID。MQTT 服务器使用此客户端 ID 来识别每个不同的客户端及其当前状态。

Specify topic:选择“指定topic”以输入特定的主题名称,静态指定。

Get data from field:动态指定topic。

Topic name:在主题名称字段中,输入您希望发布流数据(消息)的 MQTT 主题的名称。每个 MQTT 生产者步骤将启动一个单独的线程进行发布。

Quality of Service:是消息传递的保证级别。选择以下选项之一。
至多一次(0),这是默认值
至少一次(1)
恰好一次(2)

Message field:设置消息字段,来源于前置步骤,下拉选择需要的字段。

Username:MQTT服务器的用户名,如admin

Password:MQTT服务器的秘密。

Use secure protocol:选择此选项以定义连接的 SSL 属性,本次不做介绍。

Keep Alive Interval:指定在 PDI 客户端完成传输一个控制数据包和开始传输下一个数据包之间允许经过的最大间隔秒数。

Max Inflight:指定在任何给定时间点处理中的最大消息数量。

Connection Timeout:指定在未收到消息时断开连接的时间,以秒为单位。

Clean Session:指定broker是否会存储或清除会话的消息。请选择以下之一。

True
当设置为 True 时,经纪人不会存储客户端的任何信息。所有来自先前持久会话的信息都将被清除。
False
当设置为 False 时,经纪人将存储客户端的所有订阅。当 QoS(服务质量)参数设置为 1 或 2 时,所有未接收的消息将被存储。

Storage Level:消息是存储在内存中还是在磁盘上的。

默认(留空)是内存。
对于磁盘,请输入有效路径。

Server URIs:指定 MQTT 服务器的统一资源标识符(URI),如mqtt://example.com:1883

MQTT Version:请指定此步骤连接到的 MQTT 协议版本,如MQTT 3.1.1

Automatic Reconnect:客户端在与服务器断开连接时尝试自动重新连接。请选择 True 或 False:

True
是,尝试重新连接到服务器。
False
否,不尝试重新连接到服务器。

 5、RabbitMQ MQTT协议配置,

1)需要先安装然RabbitMQ。

2)启用MQTT插件,通过命令rabbitmq-plugins enable rabbitmq_mqtt进行启动。

3)开启成功后,查看管理控制台,我们可以发现MQTT服务运行在1883端口上了。

 6、spoon中运行转换文件,此时发现RabbitMQ中已经成功写入数据,如下图所示:


http://www.ppmy.cn/ops/13107.html

相关文章

c# 连接数据库、excel数据批量导入到数据库

string str $"select from TBa where ... ";DataSet ds new DataSet();using (SqlConnection conn new SqlConnection("server000.000.0.000;database数据库名;user id登录的用户名;password密码;Poolingtrue")){try{conn.Open();SqlCommand com new Sq…

如何实现redis的高可用?

1.主从模式:就是一个住节点,多个从节点,但是弊端是一个主节点崩了,需要手动的切换从节点,这个挺麻烦的 2.哨兵模式:就是为了解决主从模式的弊端从而在主从模式的基础上加了哨兵,从而可以实现自动…

基于 Flexbox 的纯 CSS 框架:兼容性好、文档丰富 | 开源日报 No.232

jgthms/bulma Stars: 48.3k License: MIT bulma 是基于 Flexbox 的现代 CSS 框架。 基于 Flexbox 技术。提供快速安装方式,支持 NPM、Yarn 和 Bower。仅包含 CSS 文件,没有 JavaScript 部分。兼容性良好,在主流浏览器上运行良好。提供丰富的…

Linux内核驱动开发-字符设备驱动框架

1前置条件 &#xff08;1&#xff09;【linux】内核编译结束 &#xff08;2&#xff09;【linux】目录配置跳转文件&#xff1a;补充&#xff1a;配置的跳转文件只能在【linux】目录下使用&#xff0c;子目录无法使用2驱动框架 2.1编写驱动程序 #include <linux/init.h&g…

JPA Example 默认 join

起因&#xff1a; 由于同事增加了一个对象关联。并且采用了Example查询&#xff0c;导致了一个Null的问题。 主表&#xff1a;BoBookingorder 关联表&#xff1a;BoJobcontainerinfo 一 关联如下&#xff1a; Entity Table(name "bo_bookingorder",catalog &quo…

阿斯达年代记三强争霸账号怎么注册 游戏账号注册教程分享

即将于4月24日隆重推出的《阿斯达时代&#xff1a;三巨头对决》这款大规模多人在线角色扮演游戏巨制&#xff0c;是由Netmarble公司携手STUDIO DRAGON联手创作。游戏中&#xff0c;围绕着阿斯大陆的主权掌控&#xff0c;三个主要阵营——阿斯达、亚高及非法者组织之间展开了扣人…

比赛记录:Codeforces Round 940 (Div. 2) and CodeCraft-23 A~E

传送门:CF [前题提要]:感觉这场题目其实都很经典.遗憾的是赛时C题答案统计看成了不同的下棋的方案数,然后以为刚开始不能放不是一种答案(直接特判输出了0),卡了一场比赛.幸好最后15min险过CD,不然掉大分了 A. Stickogon 难点在于读懂题意. 读懂题意之后不难发现每根木棒都形成…

【机器学习】科学库使用第5篇:Matplotlib,学习目标【附代码文档】

机器学习&#xff08;科学计算库&#xff09;完整教程&#xff08;附代码资料&#xff09;主要内容讲述&#xff1a;机器学习&#xff08;常用科学计算库的使用&#xff09;基础定位、目标&#xff0c;机器学习概述定位,目标,学习目标,学习目标,1 人工智能应用场景,2 人工智能小…