redis_stream__0">1.redis stream 特点
1.支持消息持久化
2.消费者组模式
3.消息确认机制
4. 消息重试机制
5. 死信队列
2. 消息生产者服务
2.1 如下代码
java">@Service
@Slf4j
public class StreamMessageProducer {@Autowiredprivate StringRedisTemplate redisTemplate;private static final String STREAM_KEY = "message:stream";public String sendMessage(String topic, Object message) {try {StringRecord record = StreamRecords.string(Collections.singletonMap("message", JSON.toJSONString(message))).withStreamKey(STREAM_KEY + ":" + topic);RecordId recordId = redisTemplate.opsForStream().add(record);log.info("消息发送成功: topic={}, messageId={}", topic, recordId);return recordId.getValue();} catch (Exception e) {log.error("消息发送失败: topic={}, message={}", topic, message, e);throw new RuntimeException("消息发送失败", e);}}public List<String> sendMessages(String topic, List<Object> messages) {try {List<MapRecord<String, String, String>> records = messages.stream().map(msg -> StreamRecords.string(Collections.singletonMap("message", JSON.toJSONString(msg))).withStreamKey(STREAM_KEY + ":" + topic)).collect(Collectors.toList());List<String> messageIds = new ArrayList<>();for (MapRecord<String, String, String> record : records) {RecordId recordId = redisTemplate.opsForStream().add(record);messageIds.add(recordId.getValue());}log.info("批量消息发送成功: topic={}, count={}", topic, messageIds.size());return messageIds;} catch (Exception e) {log.error("批量消息发送失败: topic={}", topic, e);throw new RuntimeException("批量消息发送失败", e);}}
}
3.消息消费者服务(多线程消费)
3.1 代码如下
java">@Service
@Slf4j
public class StreamMessageConsumer {@Autowiredprivate StringRedisTemplate redisTemplate;private static final String STREAM_KEY = "message:stream";private final Map<String, StreamMessageHandler<?>> handlers = new ConcurrentHashMap<>();public <T> void registerHandler(String topic, Class<T> messageType, Consumer<T> handler) {handlers.put(topic, new StreamMessageHandler<>(messageType, handler));}@PostConstructpublic void startConsuming() {for (String topic : handlers.keySet()) {String streamKey = STREAM_KEY + ":" + topic;String consumerGroup = "group:" + topic;String consumerName = "consumer:" + UUID.randomUUID().toString();try {createConsumerGroupIfNotExists(streamKey, consumerGroup);Thread consumerThread = new Thread(() -> consumeMessages(streamKey, consumerGroup, consumerName, topic));consumerThread.setName("stream-consumer-" + topic);consumerThread.start();} catch (Exception e) {log.error("启动消费者失败: topic={}", topic, e);}}}private void createConsumerGroupIfNotExists(String streamKey, String groupName) {try {redisTemplate.opsForStream().createGroup(streamKey, groupName);} catch (Exception e) {log.debug("Consumer group already exists: {}", groupName);}}private void consumeMessages(String streamKey, String group, String consumer, String topic) {StreamMessageHandler<?> handler = handlers.get(topic);while (!Thread.currentThread().isInterrupted()) {try {List<MapRecord<String, String, String>> records = redisTemplate.opsForStream().read(Consumer.from(group, consumer),StreamReadOptions.empty().count(10).block(Duration.ofSeconds(1)),StreamOffset.create(streamKey, ReadOffset.lastConsumed()));if (records != null && !records.isEmpty()) {for (MapRecord<String, String, String> record : records) {try {processMessage(record, handler);redisTemplate.opsForStream().acknowledge(streamKey, group, record.getId());} catch (Exception e) {log.error("消息处理失败: messageId={}", record.getId(), e);}}}} catch (Exception e) {log.error("消息消费异常: topic={}", topic, e);try {Thread.sleep(1000);} catch (InterruptedException ie) {Thread.currentThread().interrupt();break;}}}}private <T> void processMessage(MapRecord<String, String, String> record, StreamMessageHandler<T> handler) {try {String messageJson = record.getValue().get("message");T message = JSON.parseObject(messageJson, handler.getMessageType());handler.getHandler().accept(message);} catch (Exception e) {log.error("消息处理失败: {}", record, e);throw e;}}
}@Data
@AllArgsConstructor
class StreamMessageHandler<T> {private Class<T> messageType;private Consumer<T> handler;
}
4.消息重试服务
4.1 代码如下
java">@Service
@Slf4j
public class StreamMessageRetryService {@Autowiredprivate StringRedisTemplate redisTemplate;private static final String STREAM_KEY = "message:stream";private static final int MAX_RETRY_COUNT = 3;@Scheduled(fixedDelay = 60000) public void processPendingMessages() {for (String topic : getTopics()) {String streamKey = STREAM_KEY + ":" + topic;String groupName = "group:" + topic;try {PendingMessages pending = redisTemplate.opsForStream().pending(streamKey, groupName, Range.unbounded(), 100);if (pending != null) {for (PendingMessage message : pending.getPendingMessages()) {processRetry(streamKey, groupName, message);}}} catch (Exception e) {log.error("处理待处理消息失败: topic={}", topic, e);}}}private void processRetry(String streamKey, String groupName, PendingMessage message) {try {if (message.getTotalDeliveryCount() > MAX_RETRY_COUNT) {moveToDeadLetter(streamKey, groupName, message.getIdAsString());} else {redisTemplate.opsForStream().claim(streamKey, groupName, "retry-consumer", Duration.ofMinutes(1), message.getIdAsString());}} catch (Exception e) {log.error("处理重试消息失败: messageId={}", message.getIdAsString(), e);}}private void moveToDeadLetter(String streamKey, String groupName, String messageId) {try {List<MapRecord<String, String, String>> messages = redisTemplate.opsForStream().range(streamKey, Range.closed(messageId, messageId));if (messages != null && !messages.isEmpty()) {MapRecord<String, String, String> message = messages.get(0);redisTemplate.opsForStream().add(streamKey + ":dead", message.getValue());redisTemplate.opsForStream().acknowledge(streamKey, groupName, messageId);}} catch (Exception e) {log.error("移动消息到死信队列失败: messageId={}", messageId, e);}}
}
5.使用示例
5.1代码如下
java">@Service
@Slf4j
public class MessageService {@Autowiredprivate StreamMessageProducer producer;@Autowiredprivate StreamMessageConsumer consumer;@PostConstructpublic void init() {consumer.registerHandler("order", OrderMessage.class, this::processOrderMessage);consumer.registerHandler("payment", PaymentMessage.class, this::processPaymentMessage);}public String sendOrderMessage(OrderMessage message) {return producer.sendMessage("order", message);}private void processOrderMessage(OrderMessage message) {try {log.info("处理订单消息: {}", message);} catch (Exception e) {log.error("订单消息处理失败", e);throw e;}}private void processPaymentMessage(PaymentMessage message) {try {log.info("处理支付消息: {}", message);} catch (Exception e) {log.error("支付消息处理失败", e);throw e;}}
}@Data
@AllArgsConstructor
class OrderMessage {private String orderId;private String status;private BigDecimal amount;
}@Data
@AllArgsConstructor
class PaymentMessage {private String paymentId;private String orderId;private BigDecimal amount;private String status;
}
6.监控和管理服务
6.1 代码如下
java">@Service
@Slf4j
public class StreamMonitorService {@Autowiredprivate StringRedisTemplate redisTemplate;public StreamInfo getStreamInfo(String topic) {try {String streamKey = STREAM_KEY + ":" + topic;StreamInfo.StreamInfoBuilder builder = StreamInfo.builder();Long length = redisTemplate.opsForStream().size(streamKey);builder.messageCount(length != null ? length : 0);StreamInfo.GroupInfo groupInfo = getGroupInfo(streamKey);builder.groupInfo(groupInfo);builder.lastMessageId(getLastMessageId(streamKey));return builder.build();} catch (Exception e) {log.error("获取Stream信息失败: topic={}", topic, e);throw new RuntimeException("获取Stream信息失败", e);}}@Scheduled(cron = "0 0 * * * *") public void cleanupOldMessages() {try {for (String topic : getTopics()) {String streamKey = STREAM_KEY + ":" + topic;long maxLength = 1000000; redisTemplate.opsForStream().trim(streamKey, maxLength, true);}} catch (Exception e) {log.error("清理过期消息失败", e);}}
}@Data
@Builder
class StreamInfo {private long messageCount;private String lastMessageId;private GroupInfo groupInfo;@Data@Builderstatic class GroupInfo {private String name;private int consumerCount;private long pendingMessageCount;}
}