使用redis 的stream 做消息中间件 多线程消费消息

ops/2024/12/13 10:08:57/

redis_stream__0">1.redis stream 特点

1.支持消息持久化
2.消费者组模式
3.消息确认机制
4. 消息重试机制
5. 死信队列

2. 消息生产者服务

2.1 如下代码
java">@Service
@Slf4j
public class StreamMessageProducer {@Autowiredprivate StringRedisTemplate redisTemplate;private static final String STREAM_KEY = "message:stream";/*** 发送消息*/public String sendMessage(String topic, Object message) {try {StringRecord record = StreamRecords.string(Collections.singletonMap("message", JSON.toJSONString(message))).withStreamKey(STREAM_KEY + ":" + topic);RecordId recordId = redisTemplate.opsForStream().add(record);log.info("消息发送成功: topic={}, messageId={}", topic, recordId);return recordId.getValue();} catch (Exception e) {log.error("消息发送失败: topic={}, message={}", topic, message, e);throw new RuntimeException("消息发送失败", e);}}/*** 批量发送消息*/public List<String> sendMessages(String topic, List<Object> messages) {try {List<MapRecord<String, String, String>> records = messages.stream().map(msg -> StreamRecords.string(Collections.singletonMap("message", JSON.toJSONString(msg))).withStreamKey(STREAM_KEY + ":" + topic)).collect(Collectors.toList());List<String> messageIds = new ArrayList<>();for (MapRecord<String, String, String> record : records) {RecordId recordId = redisTemplate.opsForStream().add(record);messageIds.add(recordId.getValue());}log.info("批量消息发送成功: topic={}, count={}", topic, messageIds.size());return messageIds;} catch (Exception e) {log.error("批量消息发送失败: topic={}", topic, e);throw new RuntimeException("批量消息发送失败", e);}}
}

3.消息消费者服务(多线程消费)

3.1 代码如下
java">@Service
@Slf4j
public class StreamMessageConsumer {@Autowiredprivate StringRedisTemplate redisTemplate;private static final String STREAM_KEY = "message:stream";private final Map<String, StreamMessageHandler<?>> handlers = new ConcurrentHashMap<>();/*** 注册消息处理器*/public <T> void registerHandler(String topic, Class<T> messageType, Consumer<T> handler) {handlers.put(topic, new StreamMessageHandler<>(messageType, handler));}/*** 启动消费*/@PostConstructpublic void startConsuming() {for (String topic : handlers.keySet()) {String streamKey = STREAM_KEY + ":" + topic;String consumerGroup = "group:" + topic;String consumerName = "consumer:" + UUID.randomUUID().toString();try {// 创建消费者组(如果不存在)createConsumerGroupIfNotExists(streamKey, consumerGroup);// 启动消费线程Thread consumerThread = new Thread(() -> consumeMessages(streamKey, consumerGroup, consumerName, topic));consumerThread.setName("stream-consumer-" + topic);consumerThread.start();} catch (Exception e) {log.error("启动消费者失败: topic={}", topic, e);}}}private void createConsumerGroupIfNotExists(String streamKey, String groupName) {try {redisTemplate.opsForStream().createGroup(streamKey, groupName);} catch (Exception e) {// 组已存在,忽略异常log.debug("Consumer group already exists: {}", groupName);}}private void consumeMessages(String streamKey, String group, String consumer, String topic) {StreamMessageHandler<?> handler = handlers.get(topic);while (!Thread.currentThread().isInterrupted()) {try {// 读取消息List<MapRecord<String, String, String>> records = redisTemplate.opsForStream().read(Consumer.from(group, consumer),StreamReadOptions.empty().count(10).block(Duration.ofSeconds(1)),StreamOffset.create(streamKey, ReadOffset.lastConsumed()));if (records != null && !records.isEmpty()) {for (MapRecord<String, String, String> record : records) {try {// 处理消息processMessage(record, handler);// 确认消息redisTemplate.opsForStream().acknowledge(streamKey, group, record.getId());} catch (Exception e) {log.error("消息处理失败: messageId={}", record.getId(), e);}}}} catch (Exception e) {log.error("消息消费异常: topic={}", topic, e);try {Thread.sleep(1000);} catch (InterruptedException ie) {Thread.currentThread().interrupt();break;}}}}private <T> void processMessage(MapRecord<String, String, String> record, StreamMessageHandler<T> handler) {try {String messageJson = record.getValue().get("message");T message = JSON.parseObject(messageJson, handler.getMessageType());handler.getHandler().accept(message);} catch (Exception e) {log.error("消息处理失败: {}", record, e);throw e;}}
}@Data
@AllArgsConstructor
class StreamMessageHandler<T> {private Class<T> messageType;private Consumer<T> handler;
}

4.消息重试服务

4.1 代码如下
java">@Service
@Slf4j
public class StreamMessageRetryService {@Autowiredprivate StringRedisTemplate redisTemplate;private static final String STREAM_KEY = "message:stream";private static final int MAX_RETRY_COUNT = 3;/*** 处理待处理的消息*/@Scheduled(fixedDelay = 60000) // 每分钟执行一次public void processPendingMessages() {for (String topic : getTopics()) {String streamKey = STREAM_KEY + ":" + topic;String groupName = "group:" + topic;try {// 获取待处理的消息PendingMessages pending = redisTemplate.opsForStream().pending(streamKey, groupName, Range.unbounded(), 100);if (pending != null) {for (PendingMessage message : pending.getPendingMessages()) {processRetry(streamKey, groupName, message);}}} catch (Exception e) {log.error("处理待处理消息失败: topic={}", topic, e);}}}private void processRetry(String streamKey, String groupName, PendingMessage message) {try {if (message.getTotalDeliveryCount() > MAX_RETRY_COUNT) {// 超过重试次数,移动到死信队列moveToDeadLetter(streamKey, groupName, message.getIdAsString());} else {// 重新投递消息redisTemplate.opsForStream().claim(streamKey, groupName, "retry-consumer", Duration.ofMinutes(1), message.getIdAsString());}} catch (Exception e) {log.error("处理重试消息失败: messageId={}", message.getIdAsString(), e);}}private void moveToDeadLetter(String streamKey, String groupName, String messageId) {try {// 读取消息内容List<MapRecord<String, String, String>> messages = redisTemplate.opsForStream().range(streamKey, Range.closed(messageId, messageId));if (messages != null && !messages.isEmpty()) {MapRecord<String, String, String> message = messages.get(0);// 存储到死信队列redisTemplate.opsForStream().add(streamKey + ":dead", message.getValue());// 确认原消息redisTemplate.opsForStream().acknowledge(streamKey, groupName, messageId);}} catch (Exception e) {log.error("移动消息到死信队列失败: messageId={}", messageId, e);}}
}

5.使用示例

5.1代码如下
java">@Service
@Slf4j
public class MessageService {@Autowiredprivate StreamMessageProducer producer;@Autowiredprivate StreamMessageConsumer consumer;@PostConstructpublic void init() {// 注册订单消息处理器consumer.registerHandler("order", OrderMessage.class, this::processOrderMessage);// 注册支付消息处理器consumer.registerHandler("payment", PaymentMessage.class, this::processPaymentMessage);}/*** 发送订单消息*/public String sendOrderMessage(OrderMessage message) {return producer.sendMessage("order", message);}/*** 处理订单消息*/private void processOrderMessage(OrderMessage message) {try {log.info("处理订单消息: {}", message);// 处理订单逻辑} catch (Exception e) {log.error("订单消息处理失败", e);throw e;}}/*** 处理支付消息*/private void processPaymentMessage(PaymentMessage message) {try {log.info("处理支付消息: {}", message);// 处理支付逻辑} catch (Exception e) {log.error("支付消息处理失败", e);throw e;}}
}@Data
@AllArgsConstructor
class OrderMessage {private String orderId;private String status;private BigDecimal amount;
}@Data
@AllArgsConstructor
class PaymentMessage {private String paymentId;private String orderId;private BigDecimal amount;private String status;
}

6.监控和管理服务

6.1 代码如下
java">@Service
@Slf4j
public class StreamMonitorService {@Autowiredprivate StringRedisTemplate redisTemplate;/*** 获取Stream信息*/public StreamInfo getStreamInfo(String topic) {try {String streamKey = STREAM_KEY + ":" + topic;StreamInfo.StreamInfoBuilder builder = StreamInfo.builder();// 获取消息总数Long length = redisTemplate.opsForStream().size(streamKey);builder.messageCount(length != null ? length : 0);// 获取消费者组信息StreamInfo.GroupInfo groupInfo = getGroupInfo(streamKey);builder.groupInfo(groupInfo);// 获取最新消息IDbuilder.lastMessageId(getLastMessageId(streamKey));return builder.build();} catch (Exception e) {log.error("获取Stream信息失败: topic={}", topic, e);throw new RuntimeException("获取Stream信息失败", e);}}/*** 清理过期消息*/@Scheduled(cron = "0 0 * * * *") // 每小时执行public void cleanupOldMessages() {try {for (String topic : getTopics()) {String streamKey = STREAM_KEY + ":" + topic;// 保留最近24小时的消息long maxLength = 1000000; // 最大消息数redisTemplate.opsForStream().trim(streamKey, maxLength, true);}} catch (Exception e) {log.error("清理过期消息失败", e);}}
}@Data
@Builder
class StreamInfo {private long messageCount;private String lastMessageId;private GroupInfo groupInfo;@Data@Builderstatic class GroupInfo {private String name;private int consumerCount;private long pendingMessageCount;}
}

http://www.ppmy.cn/ops/141513.html

相关文章

vue2-代理打包问题;CORS针对AJAX 请求,而不适用于资源请求

打包后请求被转发出现问题&#xff08;如返回 405 Method Not Allowed&#xff09;&#xff0c;通常是由以下原因导致的&#xff1a; 1. 代理配置未生效 原因分析 在开发环境中&#xff0c;Vue CLI 的 devServer.proxy 仅在本地开发服务器&#xff08;npm run serve&#xff…

iPhone怎么一键删除照片:快速清理存储空间

在我们的iPhone中&#xff0c;照片往往是占据大量存储空间的主要内容。随着时间的推移&#xff0c;无数快照、截图和下载的图片不断积累&#xff0c;最终使设备的存储空间告急。幸运的是&#xff0c;iPhone 提供了一些简便的方法来一键删除这些照片&#xff0c;而借助于专业工具…

虚拟机网络部署固化IP

有时我们发现在重启虚拟机后&#xff0c;Linux连接不上了&#xff0c;查看原来是IP变了&#xff0c;这是由于IP没有固化导致&#xff0c;所以要先固化ip。 配置网络环境&#xff1a; 1. 关闭防火墙 &#xff08; 重要 &#xff09; 1:查看防火状态 systemctl status firewa…

Django项目中使用SimpleUI

SimpleUI是一个简洁、美观的Django后台管理界面,它可以让你的Django Admin更加直观和易用。本文将指导你如何安装和配置SimpleUI,并在admin.py中进行自定义配置,以及如何修改APP和模型名称为中文,并关闭首页右侧版本信息和使用分析,最后添加自定义或第三方APP名,并添加自…

【探商宝】OpenAI 发布 Sora:视频生成领域的重大突破

2024 年 12 月 10 日&#xff0c;OpenAI 正式推出了备受瞩目的人工智能视频生成模型 Sora&#xff0c;这一举措在科技界引起了轩然大波&#xff0c;为视频创作领域带来了全新的可能性和变革. 一、Sora 的功能与特性 1. 强大的视频生成能力 Sora 能够根据用户输入的文本描述生…

React的Hooks详解

React Hooks 是 React 16.0 版本引入的一个新特性&#xff0c;它允许你在函数组件中使用状态和其他React特性&#xff0c;而不必使用类组件。以下是一些常用的Hooks及其详解&#xff1a; useState useState 是最基础的 Hook&#xff0c;用于在函数组件中添加状态。 const [st…

基于Kubesphere实现微服务的CI/CD——部署微服务项目(三)

目录 一、kubesphere安装 1、安装本地持久存储 1.1、default-storage-class.yaml 1.2、 openebs-operator.yaml 1.3、安装 Default StorageClass 2、安装kubesphere 2.1、安装Helm 2.2、安装kubesphere 二、配置kubesphere 1、安装插件 2、创建devops项目 3、配置…

jupyter或者python文件导入其他python文件模块中方法路径问题

一 使用绝对导入&#xff08;通用性更好&#xff0c;更推荐在多种场景下使用&#xff09; 无论你的项目目录是否严格符合 Python 包结构&#xff0c;都可以通过把目标模块所在的目录添加到 sys.path 中&#xff0c;然后进行绝对导入。例如&#xff0c;假设 aaas.py 文件在 C:/m…