一、什么是SpringCloudStream
SpringCloudStream是SpringCloud的一个子项目,他提供了一套更加通用的操作MQ的解决方案
- Destination Binder(目标绑定器) :微服务与消息中间件通信的组件
- Destination Bindings(目标绑定):Binding是连接应用程序跟消息中间件的桥梁,用于消息的消费和生产,由binder创建
- Message:消息
二、为什么选择SpringCloudStream
现在的mq产品主流有4中:rabbitmq,rocketmq,activemq,kafka;有时候很意外的是:学的其中一个,公司用的又是另外一个,导致学习成本提高。有或者是 业务服务使用rabbitmq,而数据库后台使用kafka,整个项目使用了2种mq,可能会导致切换困难,维护成本高等因素。
我们希望能够像学习 hibernate时那样,不管底层是oracle还是mysql或其他数据库,只要给我一组统一的API操作即可;而springcloud-stream就相当于mq的统一接口。
Tip:input表示微服务接收消息,output表示微服务发送消息
二、SpringBoot整合SpringCloudStream
2.1、创建项目
父工程:stream-mq-demo
子工程(消息生产者):producer
子工程(消息消费者):consumer
2.1、pom.xml
配置父工程的pom.xml
- springboot版本
- 各组件的版本号
- 配置SpringCloud和SpringCloudAlibaba
- 添加依赖(RocketMQ、SpringCloudStream等)
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.1.5.RELEASE</version></parent><groupId>org.example</groupId><artifactId>stream-mq-demo</artifactId><packaging>pom</packaging><version>1.0-SNAPSHOT</version><modules><module>producer</module><module>consumer</module></modules><properties><spring-cloud.version>Greenwich.SR1</spring-cloud.version><spring-cloud-alibaba.version>0.9.0.RELEASE</spring-cloud-alibaba.version><java.version>1.8</java.version><lombok.version>1.18.8</lombok.version><rocketmq.version>2.0.3</rocketmq.version></properties><dependencies><!-- RocketMQ坐标 --><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>${rocketmq.version}</version></dependency><!-- SpringCloudStream坐标 --><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rocketmq</artifactId></dependency><!-- SpringWeb坐标 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- lombok坐标 --><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>${lombok.version}</version></dependency></dependencies><dependencyManagement><dependencies><!--整合spring cloud--><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-dependencies</artifactId><version>${spring-cloud.version}</version><type>pom</type><scope>import</scope></dependency><!--整合spring cloud alibaba--><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-alibaba-dependencies</artifactId><version>${spring-cloud-alibaba.version}</version><type>pom</type><scope>import</scope></dependency></dependencies></dependencyManagement>
</project>
2.2、开发消息生产者
在子工程producer中进行开发
2.2.1、application.yml
spring:cloud:stream:rocketmq:binder:name-server: 127.0.0.1:9876bindings:#消费者output:#用来指定topicdestination: stream-test-topic
server:port: 8081
2.2.2、启动类
在启动类中配置生产者
package demo;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;@SpringBootApplication
@EnableBinding(Source.class)
public class ProducerApplication {public static void main(String[] args) {SpringApplication.run(ProducerApplication.class, args);}
}
2.2.3、在controller中测试
package demo.controller;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
public class TestController {@Autowiredprivate Source source;@GetMapping("/send")public String testSend(){source.output().send(MessageBuilder.withPayload("这是一条测试的消息").build());return "消息发送完成,请到MQ控制台查看";}
}
2.2.4、测试
使用postman发送请求 http://localhost:8081/send
在MQ的控制台可以看到有新的消息
2.3、开发消费者
在子工程consumer中进行开发
2.3.1、application.yml
spring:cloud:stream:rocketmq:binder:name-server: 127.0.0.1:9876bindings:#消息消费者input:#用来指定topic,要和消息生产者的的topic匹配destination: stream-test-topic#一定要设置,必填项,如果用其他MQ,该属性可以不设置group: test
server:port: 8082
2.3.2、启动类
在启动类中配置消费者
package demo;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Sink;@SpringBootApplication
@EnableBinding(Sink.class)
public class ConsumerApplication {public static void main(String[] args) {SpringApplication.run(ConsumerApplication.class, args);}
}
2.3.3、监听消息
在消费者中监听生产者发送的消息
package demo.mq;import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.ErrorMessage;
import org.springframework.stereotype.Service;@Service
@Slf4j
public class TestStreamConsumer {@StreamListener(Sink.INPUT)public void receive(String messageBody){log.info("通过stream收到了消息:messageBody={}", messageBody);
// throw new RuntimeException();}/*** 全局异常处理** @param message 发生异常的消息*/@StreamListener("errorChannel")public void error(Message<?> message) {ErrorMessage errorMessage = (ErrorMessage) message;log.warn("RocketMQ-SpringCloudStream发生异常,errorMessage={}", errorMessage);}
}
2.3.4、测试
使用postman发送请求 http://localhost:8081/send
在消费者控制台可以看到该条消息被消费了