消息驱动Stream---基于SpringCloud

server/2024/11/15 6:57:28/

概要:实际开发中,服务与服务之间的通信经常会使用到消息中间件,而以往使用的一些消息中间件,比如RabbitMQ,该中间件和系统的耦合性非常高,如果我们要将RabbitMQ替换为Kafka,那么系统将会有较大的变动。此时,我们可以使用Spring Cloud Stream整合消息中间件降低系统和中间件的耦合性。

Spring Cloud Stream简介

什么是Spring Cloud Stream

概述Spring Cloud Stream是一个构建消息驱动微服务的框架

作用它是Spring Cloud对于消息中间件的进一步封装,通过使用Spring Cloud Stream,可忽略消息中间件之间的差异,有效降低开发人员对消息中间件的使用复杂度。 目前Spring Cloud Stream支持的消息中间件仅有RabbitMQKafka

Spring Cloud Stream 与消息中间件的交互

使用Stream构建的应用程序与消息中间件之间是通过绑定器Binder相关联的

Binder

特点Binder对于应用程序而言起到了隔离作用,它使得不同消息中间件的实现细节对Stream应用程序来说是透明的

用途对于每一个Stream的应用程序来说,Binder无需知晓消息中间件的通信细节,而是通过向应用程序暴露统一的通道(Channel)来进行通信。Binder是作为输入通道(inputs输出通道(outputs消息中间件之间的桥梁进行消息通信。

构建Stream工程

快速构建Stream工程

stream-hello%E9%A1%B9%E7%9B%AE" style="margin-left:0in;text-align:left;">1.创建stream-hello项目

加入依赖:

java">    <dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId><version>2.1.3.RELEASE</version></dependency></dependencies>

创建rabittmq包,并在rabittmq包中创建SinkReceiver类,用于接收RabbitMQ发送的消息 

java">import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;@EnableBinding(Sink.class)  //开启绑定通道的注解  Sink是Stream组件默认的输入通道接口
public class SinkReceiver {//声明日志private static Logger logger = LoggerFactory.getLogger(SinkReceiver.class);@StreamListener(Sink.INPUT)      //此注解声明此方法为监听方法private void receiver(String payload){logger.info("Receiver:"+payload);}
}
2.编写消息消费者类

@EnableBinding注解用来指定一个或多个定义了@Input@Output注解的接口,以此实现对消息通道(Channel)的绑定。

@StreamListener注解主要是修饰方法,用于将被修饰的方法注册为消息中间件上数据流的事件监听器,注解中的属性值对应了监听的消息通道名

3.项目测试

输入Rabbit MQ的可视化网址,账号密码默认为:guest

        第一个Spring Cloud Stream案例就完成了,但是我们会发现,此案例中并没有在配置文件 application.yml中进行任何属性设置,原因在于Spring Cloud Stream会为消息中间件RabbitMQ提供默认的自动化配置。当然我们也可以在Spring Boot支持的全局配置文件application.propertiesapplication.yml中修改相关配置。

Stream的发布-订阅模式

Stream的分布-订阅

特点:Spring Cloud Stream中的消息通信方式遵循的是发布-订阅模式,当一条消息被投递到消息中间件之后,它会通过共享主题的方式进行广播,消息消费者在订阅的主题中收到它并触发自身的业务逻辑处理。这里所提到的主题是Spring Cloud Stream中的一个抽象概念,用来代表发布共享消息给消费者的地方

Stream框架应用结构图

提供者发送消息到RabbitMQ等消息中间件,消费者通过订阅的方式从消息中间件获取消息。

搭建工程实现Stream的发布-订阅 

1.启动rabbitmq
2.创建提供者

依赖:

java"><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId><version>2.1.3.RELEASE</version></dependency>

application.yml文件:

java">server:port: 8898
spring:application:name: stream-rabbitmq-providerrabbitmq:host: localhostport: 5672username: guestpassword: guestcloud:stream:bindings:output:destination: minestream

 创建StreamProvider

 

java">
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@EnableBinding(Source.class)
@RestController
public class StreamProvider {@Autowired@Output(Source.OUTPUT)private MessageChannel channel;@GetMapping("/send")public void send(){channel.send(MessageBuilder.withPayload("hello world").build());}
}
3.创建消费者

依赖和上面提供者一样(复制即可)

application.yml:

java">server:port: 9898
spring:application:name: stream-rabbitmq-consumerrabbitmq:host: localhostport: 5672username: guestpassword: guestcloud:stream:bindings:input:destination: minestream

创建StreamConsumer 

 

java">import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;@EnableBinding(Sink.class)
public class StreamConsumer {@StreamListener(Sink.INPUT)public void receiver(String payload){System.out.println("接收到了mq中发送过来的消息"+payload);}
}
 4.测试运行

访问http://localhost:8898/send 地址发送消息,在消费者项目stream-rabbitmq-consumer的控制台可以看到打印日志“接收到MQ消息:Hello World!”,说明消息已成功被接收。

Stream的消费组和消息分区

Stream消费组的实现

Spring Cloud Stream应用程序开发中,如果在同一主题上的应用需要启动多个实例时,为防止对消息的重复处理,我们可以通过spring.cloud.stream.bindings.input. group属性为应用指定一个组名,这样一个应用的多个实例在接收到消息时,只会有一个实例真正收到消息并进行处理。

添加消费组

现在因为消费者是个集群,需要再复制多一个消费者,更改端口,并且两个消费者的配置文件加上group: stream

测试运行

依次启动stream-rabbitmq-provider,stream-rabbitmq-consumer,stream-rabbitmq-consumer2,在RabbitMQ控制台的 Queues可以看到两个队列合并为一个:minestream.stream

在浏览器访问http://localhost:8898/send

 

但再次访问http://localhost:8898/send时消费者2收到消息,而消费者1没有收到消息

说明:消息是以轮询的方式进行接收的

Stream的消息分区 

什么是Stream的消息分区

        在消费组中我们可以保证消息不会被重复消费,但是在同组下有多个实例的时候,我们无法确定每次处理消息的是不是被同一消费者消费,这就要使用到Stream消息分区消息分区的作用就是为了确保具有共同特征标识的数据由同一个消费者实例进行处理。

1.改造提供者

修改提供者stream-rabbitmq-provider项目的application.yml配置文件

2.改造消费者

修改消费者stream-rabbitmq-consumer项目的application.yml配置文件

修改消费者stream-rabbitmq-consumer2项目的application.yml配置文件

 

3.启动测试

多次访问http://localhost:8898/send,发现在9899端口的控制台打印了多次日志信息“接收到MQ消息:Hello World!”,而9898端口的控制台没有,说明只有指定的分区可以接收到消息,这就是消费分区的作用。


http://www.ppmy.cn/server/102234.html

相关文章

Django Project | 云笔记练习项目

文章目录 功能整体架构流程搭建平台环境子功能先创建用户表 并同步到数据库1.用户注册密码存储 -- 哈希算法唯一索引引发的重复问题 try登陆状态保持 -- 详细看用户登录状态 2. 用户登录会话状态时间 cookie用户登录状态校验 3. 网站首页4.退出登录5.笔记模块 列表页添加笔记 …

C++ //练习 17.16 如果前一题程序中的regex对象用“[^c]ei“进行初始化,将会发生什么?用此模式测试你的程序,检查你的答案是否正确。

C Primer&#xff08;第5版&#xff09; 练习 17.16 练习 17.16 如果前一题程序中的regex对象用"[^c]ei"进行初始化&#xff0c;将会发生什么&#xff1f;用此模式测试你的程序&#xff0c;检查你的答案是否正确。 环境&#xff1a;Linux Ubuntu&#xff08;云服务…

EmguCV学习笔记 VB.Net 2.5 Mat类、Matrix类和Image类的相互转换

版权声明&#xff1a;本文为博主原创文章&#xff0c;转载请在显著位置标明本文出处以及作者网名&#xff0c;未经作者允许不得用于商业目的。 EmguCV学习笔记目录 Vb.net EmguCV学习笔记目录 C# 笔者的博客网址&#xff1a;VB.Net-CSDN博客 教程相关说明以及如何获得pdf教…

uni-app 开发华为鸿蒙HarmonyOS NEXT初体验

引言 随着华为鸿蒙系统的不断发展&#xff0c;越来越多的开发者开始尝试使用uni-app来开发跨平台应用&#xff0c;特别是针对华为最新的HarmonyOS NEXT版本。本文将分享我在使用uni-app开发HarmonyOS NEXT应用的一些经验和心得&#xff0c;并提供具体的代码示例。 1. 开发环境…

数据结构详细教程绪论

&#x1f30f;个人博客主页&#xff1a;意疏-CSDN博客 希望文章能够给到初学的你一些启发&#xff5e; 如果觉得文章对你有帮助的话&#xff0c;点赞 关注 收藏支持一下笔者吧&#xff5e; 阅读指南&#xff1a; 开篇说明一、数据结构绪论 开篇说明 数据结构是计算机科学中的…

1.Java基础概念-注释和关键字

欢迎来到我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里可以感受到一份轻松愉快的氛围&#xff0c;不仅可以获得有趣的内容和知识&#xff0c;也可以畅所欲言、分享您的想法和见解。 Facts speak louder than words&#xff01; 注释是在程序的指定位置…

Browserless 网页抓取:在 Selenium 中使用 NodeJs

Selenium 是否有效&#xff1f; Selenium 是一个流行的开源网页自动化框架&#xff0c;主要用于浏览器测试自动化。此外&#xff0c;它也可以用来解决动态网页抓取问题。 Selenium 有三个主要组件&#xff1a; Selenium IDE&#xff1a;一个浏览器插件&#xff0c;提供了一种…

XML外部实体注入

1.DTD实体及引用 DTD(文档类型定义)是一种用于定义XML文档结构和元素约束的方法。它可以描述一个XML文档的元素、属性、实体、注释等&#xff0c;从而规定了文档的结构和语法规则。DTD 通常是一个单独的文件&#xff0c;可以被多个XML文档所共享。 而在DTD中&#xff0c;实体…