kettle从入门到精通 第五十三课 ETL之kettle MQTT/RabbitMQ consumer实战

server/2024/11/15 5:00:01/

1、上一节课我们学习了MQTT producer 生产者步骤,MQTT consumer消费者步骤。该步骤可以从支持MRQTT协议的中间件获取数据,该步骤和kafka consumer 一样可以处理实时数据交互,如下图所示:

 2、双击步骤打开MQTT consumer 配置窗口,如下图所示:

Step name:自定义步骤名称。

Transformation:设置子转换,该子转换的作用是从中间件读取流数据,然后将字段返回给MQTT consumer步骤进行使用。

Connection:指定此步骤将连接的 MQTT 服务器的地址,如127.0.0.1:1883(注意这里的端口是1883,不是5672)

Client ID:指定 MQTT 客户端的唯一 ID。MQTT 服务器使用此客户端 ID 来识别每个不同的客户端及其当前状态。

Topics name:在主题名称字段中,输入您希望订阅流数据(消息)的 MQTT 主题的名称。其实这里的topic是RabbitMQ中的routing key(另外这里的routing key 一定不要绑定队列,否则MQTT consumer步骤无法接收数据)。

Quality of Service:是消息传递的保证级别。选择以下选项之一。
至多一次(0),这是默认值
至少一次(1)
恰好一次(2)

 3、安全验证设置,如下图所示:

Username:MQTT服务器的用户名,如admin

Password:MQTT服务器的密码。

Use secure protocol:选择此选项以定义连接的 SSL 属性,本次不做介绍。

 3、批次设置,使用此选项卡来指定在处理之前要拉取多少消息。您可以指定消息数量和/或特定的时间量。

Duration(ms):
请指定一个以毫秒为单位的时间。此值表示在执行转换之前该步骤将花费多少时间来收集记录。
如果将此选项设置为0,则根据参数Number of records记录数触发消费。要运行转换,持续时间或记录数选项都必须包含一个大于0的值。

Number of records
指定一个数字。在每收集到‘X’条记录之后,指定的转换将被执行,并且这些‘X’条记录将被传递给转换过程。
如果将此选项设置为0,则将参数Duration按持续时间触发消费。为了运行转换,持续时间或记录数选项都必须包含一个大于0的值。

Maximum concurrent batches

指定用于同时收集记录的最大批次数。默认值为1,表示使用单个批次来收集记录。仅当您的消费者步骤无法跟上数据流的速度时,才应使用此选项。
您的计算环境必须具备足够的 CPU 和内存来进行此实现。如果您的环境无法处理指定的最大并发批次数,将会出现错误。


Message prefetch limit
请指定此步骤将排队等待处理的传入消息的限制,即从 kfakfa broker接收到的消息。
设置此值会强制kafka broker处理超过指定限制的消息的背压。默认排队消息的数量是100000条。

 4、字段设置,这里采用默认值就行了,不用调整。

5、子转换结果字段设置,选择子转换返回数据的步骤。 

 6、同上一节课,本次不再介绍。

 7、子转换配置,将Get records from stream步骤拉到画布,然后填写Message、Topic两个字段,类型都是设置为String即可。


http://www.ppmy.cn/server/7834.html

相关文章

深入理解计算机网络:从基本原理到实践应用

前言: 计算机网络是现代信息技术的基石,它连接了全球数以亿计的设备,使得信息传输和资源共享成为可能。本文将从计算机网络的基本原理出发,深入探讨其关键技术,并分享一些实践应用的经验。 一、计算机网络的基本原理 1…

打破国外垄断|暴雨发布纯血国产电脑

要说现在国产手机这边已然进入纯自研模式,但电脑这边却还是仍未打破国外技术垄断。但就在刚刚,暴雨发布自研架构台式机open Station X ,这是纯血鸿蒙系统之后国产又一款纯血产品发布!标志的我们已经彻底打破西方在硬件及软件方面的…

安装IntelliJ IDEA

文章目录 一、前言二、下载IDEA三、安装四、破解 一、前言 工欲善其事必先利其器,学习JAVA的第一步,首先是安装IDE,配置环境; 常用的JAVA IDE是IntelliJ IDEA和eclipse,我选择IntelliJ IDEA 二、下载IDEA 官网下载&…

nginx 导致websocket无法连接的解决办法

答:在config配置文件中 map $http_upgrade $connection_upgrade { default upgrade; close; } server { listen 443 ssl; server_name your_domain.com; ssl_certificate /path/to/ssl_certificate.crt; ssl_certificate_key /path/to/…

Pytorch的下载安装

本文为自己整理的Pytorch下载相关的内容笔记,以便日后查阅 一. 基本命令 1.查看conda版本 conda --version2.创建conda新环境 conda create –n 名称 python版本3.查看已经创建的conda环境 conda info --envs4.进入虚拟环境 conda activate 环境名称 为了避免…

WebSocket 快速入门 - springboo聊天功能

目录 一、概述 1、HTTP(超文本传输协议) 2、轮询和长轮询 3、WebSocket 二、WebSocket快速使用 1、基于Java注解实现WebSocket服务器端 2、JS前端测试 三、WebSocket进阶使用 1、如何获取当前用户信息 2、 后端聊天功能实现 一、概述 HTTP…

Java学习-详述main方法、可变参数、数组的工具类、二维数组

详述main方法 【1】main方法:程序的入口,在同一个类中,如果有多个方法,那么虚拟机就会识别main方法,从这个方法作为程序的入口 【2】main方法格式严格要求: public static void main(String[] args){} p…

【计算机网络】(二)计算机网络的体系结构与参考模型

目录 【计算机网络】(二)计算机网络的体系结构与参考模型 前言 2.1 计算机网络体系结构的形成 2.2 实体、协议、服务、服务访问点 2.2.1 实体 2.2.2 协议 2.2.3 服务 2.2.4 服务访问点 2.3 OSI的体系结构 2.3.1 物理层 2.3.2 数据链路层 2.3.3 网络…