🌸 spring 官方推荐使用 streamBridge
生产消费 streamMQ
示例 🌸
官方demo:https://github.com/spring-cloud/spring-cloud-stream-samples/blob/main/streambridge-samples/stream-bridge-avro/src/main/java/com/example/stream/bridge/avro/StreamBridgeAvroApplication.java
🌸 一、yaml配置
- 生产者配置
spring:cloud:stream:function:# 重点配置 与 binding 名与消费者对应,多个可以使用;definition: cusnotification;projectmappingbindings:projectmapping-out-0:content-type: application/jsondestination: projectmapping-test-topicgroup: test-groupbinder: rocketmqrocketmq:binder:# RocketMQ 服务器地址name-server: 192.168.1.102:9876bindings:projectmapping-out-0:producer:# 必须得写group: defaultcusnotification-out-0:producer:# 必须得写group: cusnotification-group
- 消费者
spring:cloud:stream:function:# 重点配置 与 binding 名与消费者对应,多个可以使用;definition: projectmapping;cusnotificationbindings:projectmapping-in-0:content-type: application/jsondestination: projectmapping-test-topicgroup: test-groupbinder: rocketmq# Spring Cloud Stream RocketMQ 配置项rocketmq:# RocketMQ Binder 配置项,对应 RocketMQBinderConfigurationProperties 类106.15.139.83binder:name-server: 192.168.1.102:9876 # RocketMQ Namesrv 地址# RocketMQ 自定义 Binding 配置项,对应 RocketMQBindingProperties Mapbindings:projectmapping-in-0:# RocketMQ Consumer 配置项,对应 RocketMQConsumerProperties 类consumer:enabled: true # 是否开启消费,默认为 truebroadcasting: false # 是否使用广播消费,默认为 false 使用集群消费orderly: true # 是否顺序消费,默认为 false 并发消费。
🌸 二、生产消费
- 消息配置
- 注意规范配置,不推荐直接写,容易出错。
- 注意规范配置,不推荐直接写,容易出错。
- 注意规范配置,不推荐直接写,容易出错。
public class MessageConstant {/*** 添加项目映射消息*/public static final String PROJECT_MAPPING = "projectmapping" + SplitStrConst.LINE;/*** 生产者标识*/public static final String OUTPUT = "out" + SplitStrConst.LINE + SplitStrConst.ZERO;/*** 项目映射消息生产者*/public static final String PROJECT_MAPPING_OUTPUT = PROJECT_MAPPING + OUTPUT;
}
- 消息载体
- 可以按要求更换,推荐带上消息唯一ID。
@Data
@Accessors(chain = true)
public class CommonMessaging {/*** 消息id*/private String msgId;/*** 消息内容*/private String msgText;
}
- 生产者生产消息
- 消息载体自动构建,不要再使用json互转,可直接消费,推荐带上消息唯一ID。
private final StreamBridge streamBridge;public void streamTestMsg(String msg) {// 构建消息对象CommonMessaging testMessaging = new CommonMessaging().setMsgId(UUID.randomUUID().toString()).setMsgText(msg);streamBridge.send(MessageConstant.PROJECT_MAPPING_OUTPUT, MessageBuilder.withPayload(testMessaging).build());}
- 消费者消费
- 注意一个消费者只使用一个类
- 注意方法名称即队列名称,不可随意改动
- lambda消费,可直接获取消息题,不要再使用json互转
- 注意log打印
@Component
@Slf4j
public class ProjectMappingConsumer {@BeanConsumer<CommonMessaging> projectmapping() {log.info("projectmapping 初始化订阅成功。");return msg -> {log.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), msg.toString());System.out.println(msg.getMsgText());};}
}