RocketMQ延时消息

news/2024/10/11 5:50:04/

RocketMQ消息发送基本示例(推送消费者)-CSDN博客

RocketMQ消费者主动拉取消息示例-CSDN博客

RocketMQ顺序消息-CSDN博客

RocketMQ广播消息-CSDN博客

延时消息:

延时消息实现的效果就是产者调用 producer.send 方法后,消息会立即发送到 Broker,并被存储在指定的队列中。RocketMQ 使用内部的延迟队列机制来实现延时消息。消息在到达 Broker 后,会根据设定的延迟级别放入相应的延迟队列。每个延迟级别对应一个特定的延迟时间(如 1 分钟、5 分钟等)。消息在延迟队列中等待,直到延迟时间过去。到达指定时间后,RocketMQ 会将消息移动到实际的消费队列中,这时消息才会对消费者可见。

预定日常定时发送:messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h                    18个级别

可以修改broker.conf文件     messageDelayLevel=3s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

message.setDelayTimeLevel(3) 就是选定第3个等级

5.0版本以上支持 指定时间定时发送

message.setDelayTimeMs(10L) 指定时间定时发送.默认支持最大延迟时间3天.

在broker.conf种可以修改 timerMaxDelaySec=2592000   默认最大3天  72 小时

package com.example.rocketmqdemo.scheldule;import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;import java.nio.charset.StandardCharsets;
import java.time.LocalDate;
import java.time.LocalTime;/*** 预定日程生产者* @author hrui* @date 2024/8/1 11:48*/
public class schelduleProducer {public static void main(String[] args) {//创建一个DefaultMQProducer实例,指定生产者组名为"group1"DefaultMQProducer producer = new DefaultMQProducer("schelduleProducer");//生产者组和消费者组是不同概念  不需要相同//设置NameServer地址,RocketMQ客户端通过NameServer获取Broker的路由信息producer.setNamesrvAddr("xxx.xxx.xxx:9876");try {//启动生产者实例producer.start();//发送10条消息for (int i = 0; i < 2; i++) {//创建消息实例,指定主题为"Topic1",标签为"Tag1",消息内容为"Hello World"加上编号Message message = new Message("scheldule", "Tag1", ("schelduleProducer" + i).getBytes(StandardCharsets.UTF_8));//在发送之前设置定时发送等级//messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h//从1开始18个等级//message.setDelayTimeLevel(5);//交给Broker 1分钟后延迟发送给消费者//自定义时间发送  最大72小时message.setDelayTimeMs(30000L);//30秒producer.send(message);System.out.println("消息定时发送成功,已交给Broker:"+ LocalTime.now());}} catch (Exception e) {//捕获并打印异常信息e.printStackTrace();} finally {//关闭生产者实例,释放资源producer.shutdown();}}
}

package com.example.rocketmqdemo.scheldule;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;import java.time.LocalDate;
import java.time.LocalTime;
import java.util.List;/*** @author hrui* @date 2024/8/1 11:55*/
public class ScheduleConsumer {public static void main(String[] args) {//创建一个DefaultMQPushConsumer实例,指定消费者组名为"group1"//采用长轮询机制,模拟推送效果,但本质上是主动拉取。适合低延迟、高实时性的场景。DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");//设置NameServer地址,RocketMQ客户端通过NameServer获取Broker的路由信息consumer.setNamesrvAddr("xxx.xxx.xxx:9876");try {//订阅主题"Topic1",过滤标签为"*",表示接收所有消息consumer.subscribe("scheldule", "*");//设置消息监听器,处理接收到的消息//可以传入两种类型的监听器://1. MessageListenerOrderly(顺序消费):保证消息按顺序处理//2. MessageListenerConcurrently(并发消费):消息并发处理,不保证顺序consumer.setMessageListener(new MessageListenerConcurrently() {//consumeMessage方法用于处理接收到的消息列表@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {System.out.println(Thread.currentThread().getName());for (int i=0;i<list.size();i++){System.out.println(i+"_消息消费成功_"+new String(list.get(i).getBody()));broker是将两条消息分别发送的System.out.println(LocalTime.now());}//返回消费状态,CONSUME_SUCCESS表示消息消费成功return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//启动消费者实例,开始接收消息consumer.start();} catch (Exception e) {//捕获并打印异常信息e.printStackTrace();}}
}


http://www.ppmy.cn/news/1503529.html

相关文章

IPython 日常使用小技巧

IPython 是一个强大的交互式 Python shell&#xff0c;可以提升你的编程效率和体验。以下是一些常用的 IPython 使用技巧&#xff1a; 一、基本使用 1.启动 IPython 在终端中输入以下命令启动 IPython&#xff1a; ipython2.自动补全 在 IPython 中&#xff0c;可以通过按…

基于单片机控制的家电产品硬件故障诊断

摘要&#xff1a; 在现阶段家用电子产品生产制造的过程中&#xff0c;需要应用到非常多的单片机以及单片机控制技术&#xff0c;单片机凭借着自身体积小、反应快、功耗低的优势迅速抢占了家电产品的市场。并且在单片机实际的应用过程中&#xff0c;通过对单片机控制技术的掌握可…

【视觉SLAM】 G2O库编写步骤介绍

介绍G2O&#xff0c;并阐述基本使用方法。 G2O以稀疏优化器&#xff08;SparseOptimizer&#xff09;为核心&#xff0c;分为图的构建与求解器构建两部分&#xff0c;分别对应该图的上下两部分。 G2O编程步骤共分七步&#xff0c;如图所示&#xff1a; 构建求解器 1、创建一…

零基础入门转录组数据分析——机器学习算法之SVM-RFE(筛选特征基因)

零基础入门转录组数据分析——机器学习算法之SVM-RFE&#xff08;筛选特征基因&#xff09; 目录 零基础入门转录组数据分析——机器学习算法之SVM-RFE&#xff08;筛选特征基因&#xff09;1. SVM-RFE基础知识2. SVM-RFE&#xff08;Rstudio&#xff09;——代码实操2. 1 数据…

721. 账户合并

721. 账户合并 题目链接&#xff1a;721. 账户合并 代码如下&#xff1a; //参考链接:https://leetcode.cn/problems/accounts-merge/solutions/564305/zhang-hu-he-bing-by-leetcode-solution-3dyq class UnionFind { public:vector<int> parent;UnionFind(int n){par…

协程的八种创建方式

协程简介 在深入了解创建方式之前&#xff0c;我们先简要回顾一下协程是什么。协程是轻量级的线程。它们在协作式多任务处理中运行&#xff0c;允许在不阻塞线程的情况下挂起和恢复。这使得协程非常适合进行异步编程和高性能的并发任务。&#x1f310; Kotlin中创建协程的方式…

VulnHub-Tomato靶机渗透教程 简单易懂 报错链接

Tomato靶机是一个用于渗透测试和漏洞研究的虚拟机。 环境准备 攻击机&#xff08;Kali Linux&#xff09;IP&#xff1a;192.168.252.134 目标机 IP&#xff1a;192.168.252.133 这里我两台虚拟机都是NAT模式 渗透步骤 1.端口扫描 这里我没用kali自带的 我用的物理机上…

你了解渗透测试吗?渗透测试在网络安全行业属于食物链什么层级?

可能大家对于行业存在食物链这一说法&#xff0c;印象最深的往往是编制行业&#xff0c;所谓的等级主义在那里确实更为明显。但实际上&#xff0c;各行各业都或多或少存在这种层级结构&#xff0c;网络安全领域也不例外&#xff0c;毕竟都是一些所谓的“人情世故”。那么就网络…