第02讲:SpringCloudStream

news/2024/11/29 10:34:57/

一、什么是SpringCloudStream

SpringCloudStream是SpringCloud的一个子项目,他提供了一套更加通用的操作MQ的解决方案

在这里插入图片描述

  1. Destination Binder(目标绑定器) :微服务与消息中间件通信的组件
  2. Destination Bindings(目标绑定):Binding是连接应用程序跟消息中间件的桥梁,用于消息的消费和生产,由binder创建
  3. Message:消息

二、为什么选择SpringCloudStream

​ 现在的mq产品主流有4中:rabbitmq,rocketmq,activemq,kafka;有时候很意外的是:学的其中一个,公司用的又是另外一个,导致学习成本提高。有或者是 业务服务使用rabbitmq,而数据库后台使用kafka,整个项目使用了2种mq,可能会导致切换困难,维护成本高等因素。

我们希望能够像学习 hibernate时那样,不管底层是oracle还是mysql或其他数据库,只要给我一组统一的API操作即可;而springcloud-stream就相当于mq的统一接口。

在这里插入图片描述

Tip:input表示微服务接收消息,output表示微服务发送消息

二、SpringBoot整合SpringCloudStream

2.1、创建项目

父工程:stream-mq-demo

子工程(消息生产者):producer

子工程(消息消费者):consumer

2.1、pom.xml

配置父工程的pom.xml

  • springboot版本
  • 各组件的版本号
  • 配置SpringCloud和SpringCloudAlibaba
  • 添加依赖(RocketMQ、SpringCloudStream等)
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.1.5.RELEASE</version></parent><groupId>org.example</groupId><artifactId>stream-mq-demo</artifactId><packaging>pom</packaging><version>1.0-SNAPSHOT</version><modules><module>producer</module><module>consumer</module></modules><properties><spring-cloud.version>Greenwich.SR1</spring-cloud.version><spring-cloud-alibaba.version>0.9.0.RELEASE</spring-cloud-alibaba.version><java.version>1.8</java.version><lombok.version>1.18.8</lombok.version><rocketmq.version>2.0.3</rocketmq.version></properties><dependencies><!-- RocketMQ坐标 --><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>${rocketmq.version}</version></dependency><!-- SpringCloudStream坐标 --><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rocketmq</artifactId></dependency><!-- SpringWeb坐标 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- lombok坐标 --><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>${lombok.version}</version></dependency></dependencies><dependencyManagement><dependencies><!--整合spring cloud--><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-dependencies</artifactId><version>${spring-cloud.version}</version><type>pom</type><scope>import</scope></dependency><!--整合spring cloud alibaba--><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-alibaba-dependencies</artifactId><version>${spring-cloud-alibaba.version}</version><type>pom</type><scope>import</scope></dependency></dependencies></dependencyManagement>
</project>

2.2、开发消息生产者

在子工程producer中进行开发

2.2.1、application.yml

spring:cloud:stream:rocketmq:binder:name-server: 127.0.0.1:9876bindings:#消费者output:#用来指定topicdestination: stream-test-topic
server:port: 8081

2.2.2、启动类

在启动类中配置生产者

package demo;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;@SpringBootApplication
@EnableBinding(Source.class)
public class ProducerApplication {public static void main(String[] args) {SpringApplication.run(ProducerApplication.class, args);}
}

2.2.3、在controller中测试

package demo.controller;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
public class TestController {@Autowiredprivate Source source;@GetMapping("/send")public String testSend(){source.output().send(MessageBuilder.withPayload("这是一条测试的消息").build());return "消息发送完成,请到MQ控制台查看";}
}

2.2.4、测试

使用postman发送请求 http://localhost:8081/send

在MQ的控制台可以看到有新的消息

在这里插入图片描述

2.3、开发消费者

在子工程consumer中进行开发

2.3.1、application.yml

spring:cloud:stream:rocketmq:binder:name-server: 127.0.0.1:9876bindings:#消息消费者input:#用来指定topic,要和消息生产者的的topic匹配destination: stream-test-topic#一定要设置,必填项,如果用其他MQ,该属性可以不设置group: test
server:port: 8082

2.3.2、启动类

在启动类中配置消费者

package demo;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Sink;@SpringBootApplication
@EnableBinding(Sink.class)
public class ConsumerApplication {public static void main(String[] args) {SpringApplication.run(ConsumerApplication.class, args);}
}

2.3.3、监听消息

在消费者中监听生产者发送的消息

package demo.mq;import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.ErrorMessage;
import org.springframework.stereotype.Service;@Service
@Slf4j
public class TestStreamConsumer {@StreamListener(Sink.INPUT)public void receive(String messageBody){log.info("通过stream收到了消息:messageBody={}", messageBody);
//        throw new RuntimeException();}/*** 全局异常处理** @param message 发生异常的消息*/@StreamListener("errorChannel")public void error(Message<?> message) {ErrorMessage errorMessage = (ErrorMessage) message;log.warn("RocketMQ-SpringCloudStream发生异常,errorMessage={}", errorMessage);}
}

2.3.4、测试

使用postman发送请求 http://localhost:8081/send

在消费者控制台可以看到该条消息被消费了

在这里插入图片描述


http://www.ppmy.cn/news/71065.html

相关文章

如何有效的听话听音

学会从TA的的语句中抓住最有价值的信息&#xff1a;即“听话听音”。 我们常常讲言外之意&#xff0c;就是一句话字面之外的意思。 话&#xff1a;我们沟通当中的事件信息。我们沟通中讲的事情 音&#xff1a;音是包含了情感、语气、侧重、倾向&#xff0c;两层&#xff0c;…

C++进阶——mapset的使用

C进阶——map&set的使用 关联式容器 在初阶阶段&#xff0c;我们已经接触过STL中的部分容器&#xff0c;比如&#xff1a;vector、list、deque、forward_list(C11)等&#xff0c;这 些容器统称为序列式容器&#xff0c;因为其底层为线性序列的数据结构&#xff0c;里面存…

如何进行TCP抓包调试?

网络调试工具——Wireshark Wireshark 是世界上应用最广泛的网络协议分析器&#xff0c;它让我们在微观层面上看到整个网络正在发生的事情。 Wireshark 本身是一个开源项目&#xff0c;所以也得到了很多志愿者的支持。同时&#xff0c;Wireshark 具有丰富的功能集&#xff0c;…

科士达为绿色世界 提供多维度低碳新动能

2023年5月11日&#xff0c;“数据中心绿色发展大会”围绕节能降碳绿色赋能主题&#xff0c;在成都如期召开。作为国内数据中心行业的翘楚&#xff0c;科士达应邀参加了大会,与会代表高级售前沈凤文带来了题为《为绿色世界 提供多维度低碳新动能》的精彩演讲。 在“双碳”目标引…

mybatis的一级二级缓存详解及源码解剖

文章目录 什么是一级缓存&#xff1f;什么是二级缓存&#xff1f;一级缓存二级缓存有什么不同&#xff1f;执行流程源码流程解剖一级缓存失效场景分析二级缓存结构及需要解决的问题二级缓存执行流程二级缓存获取和commit源码解剖总结 什么是一级缓存&#xff1f; 一级缓存是指…

行业逻辑变了!智驾玩家要算「经济账」,从夺存量到造增量

按照高工智能汽车研究院的预测&#xff0c;到2025年行泊一体前装标配年交付规模将接近600万辆&#xff1b;在L2及L2等高阶智能驾驶细分市场的占有率将接近50%。 「高性能芯片域控制器软件算法的逐步成熟」&#xff0c;带动低速泊车和高速行车两套过去并行ADAS系统正在加速融合…

华为OD机试之求满足条件的最长子串的长度

求满足条件的最长子串的长度 题目描述 给定一个字符串&#xff0c;只包含字母和数字&#xff0c;按要求找出字符串中的最长&#xff08;连续&#xff09;子串的长度&#xff0c;字符串本身是其最长的子串&#xff0c;子串要求&#xff1a; 1、 只包含1个字母(a-z, A-Z)&#x…

I2C通信协议原理和MPU6050

一、串口通讯 只能在两个设备之间进行 若要三台设备两两通信&#xff0c;则每个设备得需要两组窗口&#xff0c;为3组相互独立的窗口通讯 为解决这个问题&#xff1a;设计了总线通讯&#xff0c;有多种&#xff0c;I2C为其中一种 二、I2C通信 &#xff08;1&#…