MQTT实现集群分布式消费

devtools/2024/12/22 14:06:12/

今天被问到启用多个应用消费时,每个消费者都会受到订阅消息的事。很久前用过,这里梳理记录一下:

MQTT协议本身是支持共享订阅功能。

这里这个共享订阅比较特殊,他有点类似kafka的消费组的概念。但是设计和实现上区别比较大。

设计理念

MQTT 共享订阅:

        主要是为了在 MQTT 的发布 / 订阅模式下,实现多个订阅者对同一主题消息的高效共享和负载均衡,确保消息能够被多个订阅者均匀接收和处理,同时避免消息的重复消费。


Kafka 消费组:

        基于 Kafka 的分布式消息队列架构设计,旨在实现多个消费者对多个主题分区消息的并行消费,通过消费组内的消费者协调机制,提高消息处理的吞吐量和可扩展性。


实现方式

MQTT 共享订阅:

        通过在订阅主题前添加 “$share/<group>/<topic>” 的形式来标识共享订阅组,其中<group>为共享订阅组的名称,<topic>为实际的订阅主题。当消息发布到该主题时,MQTT broker 会根据一定的算法将消息分配到不同的订阅者,实现负载均衡。


Kafka 消费组:

        Kafka 将每个主题划分为多个分区,每个分区中的消息是有序的。消费者以消费组的形式进行组织,同一消费组内的消费者共同消费主题的所有分区。每个分区在同一时刻只能被一个消费者消费,不同消费组之间相互独立,互不影响。


消息分配策略

MQTT 共享订阅:

        通常采用轮询或随机等简单的分配策略,将消息依次或随机分配给不同的订阅者。分配过程相对简单直接,主要考虑的是在多个订阅者之间实现基本的负载均衡。


Kafka 消费组:

        采用的是基于分区分配策略,如范围分配、轮询分配等。根据消费者数量和分区数量的关系,将分区合理分配给不同的消费者,确保每个消费者能够均匀地处理消息,同时充分利用分区的并行性。


消费语义

MQTT 共享订阅:

        一般遵循 “最多一次” 或 “至少一次” 的消息传递语义。在 “最多一次” 语义下,消息可能会丢失;在 “至少一次” 语义下,消息可能会被重复接收和处理,需要应用层自行处理去重等问题。


Kafka 消费组:

支持多种消费语义,如 “最多一次”“至少一次”“精确一次” 等。通过配置不同的参数和使用 Kafka 提供的事务机制等,可以实现不同的消费语义,满足不同应用场景的需求。


应用场景

MQTT 共享订阅:

        适用于对实时性要求较高、消息量相对较小且需要多个订阅者共享消息的场景,如物联网设备数据的实时监控和分发,多个客户端需要同时接收并处理设备上报的实时数据。

Kafka 消费组:

        更适合处理大规模的消息流,对消息的吞吐量和可扩展性要求较高的场景,如实时日志收集与分析、大数据流处理等,能够支持多个消费者并行处理大量的消息数据。

以上对比了下mqtt和kafka的实现区别。这里回归mqtt共享订阅的实现。

MQTT共享订阅实现

        首先,MQTT共享订阅需要EMQx版本支持,EMQx5.0以上版本默认支持,一下版本需要部分需要配置,部分无法支持,这个需要的去官网确认。

        

然后就是上面介绍的共享订阅的规则。这边其实是订阅时路径的约定:

“$share/<group>/<topic>”

需要用"$share"标识是共享订阅,后面紧跟的第一个单词是group分组名称,在后面可以和普通topic一样使用。

需要注意的是,订阅的时候采用这个格式,但是发布消息的时候,topic需要正常设置。不需要配置“$share/<group>/”的信息。

共享订阅策略,就是配置是轮训、或者加权轮询、或者随机等负载方案了。

注意,这里推荐配合qos使用,避免消息的丢失。

以上。


http://www.ppmy.cn/devtools/144393.html

相关文章

算法日记 49 day 图论(A*算法)

这算是算法的最后一篇了&#xff0c;原本A*之前还有一些相关的最短路径算法的&#xff0c;比如dijkstra的堆优化&#xff0c;SPFA等等&#xff0c;但是有些我没看懂&#xff0c;就不写了&#xff0c;用A*做个结尾。 题目&#xff1a;骑士的攻击 127. 骑士的攻击 (kamacoder.co…

35. Three.js案例-创建带阴影的球体与平面

35. Three.js案例-创建带阴影的球体与平面 实现效果 知识点 WebGLRenderer WebGLRenderer 是Three.js中用于渲染场景的主要类之一&#xff0c;它负责将场景中的对象渲染到画布上。 构造器 new THREE.WebGLRenderer(parameters : Object) 参数类型描述parametersObject可选…

CPU性能优化-磁盘空间和解析时间

即使考虑了跟踪文件的压缩格式&#xff0c;编码后的数据仍然会占用很大的磁盘空间。通常&#xff0c;每条指令不超过1字节&#xff0c;但是考虑到CPU执行指令的速度&#xff0c;数据仍然非常多。根据负载&#xff0c;CPU编码以100MB/s的速度处理PT跟踪文件的情况是很常见的&…

怎么给视频加上背景音乐和文字?适合新手

在当今的视频创作领域&#xff0c;给视频添加背景音乐和文字能够极大地提升视频的吸引力与表现力。下面就为大家详细介绍如何借助便捷的剪辑工具&#xff0c;完成视频的背景音乐与文字添加工作。 工具&#xff1a;影忆 1.前期筹备要点 1.1背景音乐的筛选 背景音乐与视频主题及情…

机器学习经典算法(scikit-learn)

安装库&#xff1a;pip install scikit-learn numpy 线性回归 (Linear Regression) import numpy as np import pandas as pd from sklearn.model_selection import train_test_split from sklearn.linear_model import LinearRegression from sklearn.datasets impor…

CLION中运行远程的GUI程序

在CLION中运行远程GUI程序&#xff0c;很有可能会遇到下面错误 Gtk-WARNING **: cannot open display: 这是因为远程的GUI程序不能再本地机器上显示。这个问题一般有两种解决方法 通过SSH的ForwardX11的方法&#xff0c;就是将远程的GUI程序显示到本地机器上&#xff0c;一般在…

前端配置跨域的详细指南

在现代Web开发中&#xff0c;跨域资源共享&#xff08;CORS, Cross-Origin Resource Sharing&#xff09;是一个非常重要的概念。浏览器出于安全考虑&#xff0c;默认情况下不允许跨域请求。如果你需要在前端与不同源的服务器进行通信&#xff0c;就必须配置跨域支持。本文将介…

redis 在 win10中的使用

执行以下命令安装redis服务 redis-server.exe --service-install redis.windows.conf --loglevel verbose