MQTT实现集群分布式消费

embedded/2024/12/22 13:58:57/

今天被问到启用多个应用消费时,每个消费者都会受到订阅消息的事。很久前用过,这里梳理记录一下:

MQTT协议本身是支持共享订阅功能。

这里这个共享订阅比较特殊,他有点类似kafka的消费组的概念。但是设计和实现上区别比较大。

设计理念

MQTT 共享订阅:

        主要是为了在 MQTT 的发布 / 订阅模式下,实现多个订阅者对同一主题消息的高效共享和负载均衡,确保消息能够被多个订阅者均匀接收和处理,同时避免消息的重复消费。


Kafka 消费组:

        基于 Kafka 的分布式消息队列架构设计,旨在实现多个消费者对多个主题分区消息的并行消费,通过消费组内的消费者协调机制,提高消息处理的吞吐量和可扩展性。


实现方式

MQTT 共享订阅:

        通过在订阅主题前添加 “$share/<group>/<topic>” 的形式来标识共享订阅组,其中<group>为共享订阅组的名称,<topic>为实际的订阅主题。当消息发布到该主题时,MQTT broker 会根据一定的算法将消息分配到不同的订阅者,实现负载均衡。


Kafka 消费组:

        Kafka 将每个主题划分为多个分区,每个分区中的消息是有序的。消费者以消费组的形式进行组织,同一消费组内的消费者共同消费主题的所有分区。每个分区在同一时刻只能被一个消费者消费,不同消费组之间相互独立,互不影响。


消息分配策略

MQTT 共享订阅:

        通常采用轮询或随机等简单的分配策略,将消息依次或随机分配给不同的订阅者。分配过程相对简单直接,主要考虑的是在多个订阅者之间实现基本的负载均衡。


Kafka 消费组:

        采用的是基于分区分配策略,如范围分配、轮询分配等。根据消费者数量和分区数量的关系,将分区合理分配给不同的消费者,确保每个消费者能够均匀地处理消息,同时充分利用分区的并行性。


消费语义

MQTT 共享订阅:

        一般遵循 “最多一次” 或 “至少一次” 的消息传递语义。在 “最多一次” 语义下,消息可能会丢失;在 “至少一次” 语义下,消息可能会被重复接收和处理,需要应用层自行处理去重等问题。


Kafka 消费组:

支持多种消费语义,如 “最多一次”“至少一次”“精确一次” 等。通过配置不同的参数和使用 Kafka 提供的事务机制等,可以实现不同的消费语义,满足不同应用场景的需求。


应用场景

MQTT 共享订阅:

        适用于对实时性要求较高、消息量相对较小且需要多个订阅者共享消息的场景,如物联网设备数据的实时监控和分发,多个客户端需要同时接收并处理设备上报的实时数据。

Kafka 消费组:

        更适合处理大规模的消息流,对消息的吞吐量和可扩展性要求较高的场景,如实时日志收集与分析、大数据流处理等,能够支持多个消费者并行处理大量的消息数据。

以上对比了下mqtt和kafka的实现区别。这里回归mqtt共享订阅的实现。

MQTT共享订阅实现

        首先,MQTT共享订阅需要EMQx版本支持,EMQx5.0以上版本默认支持,一下版本需要部分需要配置,部分无法支持,这个需要的去官网确认。

        

然后就是上面介绍的共享订阅的规则。这边其实是订阅时路径的约定:

“$share/<group>/<topic>”

需要用"$share"标识是共享订阅,后面紧跟的第一个单词是group分组名称,在后面可以和普通topic一样使用。

需要注意的是,订阅的时候采用这个格式,但是发布消息的时候,topic需要正常设置。不需要配置“$share/<group>/”的信息。

共享订阅策略,就是配置是轮训、或者加权轮询、或者随机等负载方案了。

注意,这里推荐配合qos使用,避免消息的丢失。

以上。


http://www.ppmy.cn/embedded/147837.html

相关文章

扩展SpringBoot中的SpringMVC的默认配置

SpringBoot默认已经给我们做了很多SpringMVC的配置&#xff0c;哪些配置&#xff1f; 视图解析器ViewResolver静态资料的目录默认首页index.html图标名字和图标所在目录&#xff0c;favicon.ico类型转换器Converter&#xff0c;格式转换器的Formatter消息转换器HttpMessageCon…

java中随机数的生成

随机数的生成 方法一(使用Math.random方法)方法二(使用Random类) 方法一(使用Math.random方法) Math.random()方法就是专门生成一个0~1之间随机数的方法. 范围:[0,1). //生成[0,10)之间随机数.int num (int) (Math.random() * 10);System.out.println(num);方法二(使用Random…

【ETCD】【实操篇(二)】如何从源码编译并在window上搭建etcd集群?

要在 Windows 上编译 etcd 及 etcdctl 工具&#xff0c;并使用 bat 脚本启动 etcd 集群&#xff0c;首先需要准备好开发环境并确保依赖项正确安装。下面是从 etcd 3.5 源码开始编译和启动 etcd 集群的详细步骤&#xff1a; 目录 1. 安装 Go 环境2. 获取 etcd 源码3. 编译 etcd…

关于electron项目运行时,只编译渲染进程,不编译主进程问题

现象 编译到此处卡住没有报错也没下文 看下命令执行后操作 找到对应命令后操作 这里的await 没有收到完成信号导致 再看vue-cli-service 命令 此处发现是这个方法报错导致 再看这个方法 结论 所以项目不能存在 yarn.lock 而去使用npm&#xff0c;vuecli会优先执行yarn 命令&a…

Mac配置 Node镜像源的时候报错解决办法

在Mac电脑中配置国内镜像源的时候报错,提示权限问题,无法写入配置文件。本文提供解决方法,青测有效。 一、原因分析 遇到的错误是由于 .npm 目录下的文件被 root 用户所拥有,导致当前用户无法写入相关配置文件。 二、解决办法 在终端输入以下命令,输入管理员密码即可。 su…

C++9--前置++和后置++重载,const,日期类的实现(对前几篇知识点的应用)

目录 1.前置和后置重载 2.const成员 3.日期类的实现 1.前置和后置重载 #include<iostream> using namespace std;class Date { public:Date(int year 2024, int month 1, int day 1){_year year;_month month;_day day;}//前置&#xff1a;返回1之后的结果//注意…

sqlite基础

在 SQLite 中&#xff0c;可以使用 CREATE INDEX 语句为表中的字段添加索引&#xff0c;以加速查询操作。 1. 为单个字段添加索引 假设有一个表 users&#xff0c;并且你想为 email 字段创建索引&#xff1a; CREATE INDEX idx_users_email ON users(email);这条语句会为 us…

go-zero(十四)实践:缓存一致性保证、缓存击穿、缓存穿透与缓存雪崩解决方案

go zero 实践&#xff1a;缓存一致性保证、缓存击穿、缓存穿透与缓存雪崩解决方案 缓存 作为一种重要的技术手段&#xff0c;可以有效提高系统的响应速度&#xff0c;降低对数据库的压力。但是缓存的使用伴随一些常见问题&#xff0c;如缓存一致性、缓存击穿、缓存穿透和缓存雪…