Spring Cloud — 消息驱动 Stream

news/2025/3/4 16:34:40/

Spring Cloud Stream 是让微服务更容易在应用中实现消息的发布和订阅处理的框架。Stream 支持与多种消息中间件整合,如Kafka、RibbitMQ等。

本文使用的是Kafka消息中间件,依赖文件为:

<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-kafka</artifactId><version>1.2.1.RELEASE</version>
</dependency>

1 Stream 应用模型

图 Spring Cloud Stream 的应用模式

通道接口与监听接口:用于定于消息的输入和输出通道的接口,通过注解(@Input或@Output)标记方法。该注解的参数为通道名称。而通道名称与消息中间件的主题一对一绑定。

消息通道:对消息队列的一种抽象,用来存放消息发布者发布的消息或消费者所要消费的信息及信息的发送与接收。

消息绑定器:实现应用程序与消息中间件之间交互的隔离。封装了交互细节。

1.1 消息发送和监听步骤

负责发送消息的微服务称为生产者,负责接收消息的微服务称为消费者。

1.1.1 生产者的消息发送

1)添加@EnableBinding注解。例如,@EnableBinding(Source.class)

该注解会触发Spring Cloud Stream进行基本配置,将应用升级为一个Spring Cloud Stream 应用。其可以声明一个或多个(发送或监听)通道接口参数。

2)访问消息通道及发送消息。

发送消息通道用于发送消息(MessageChannel的send方法)。通过上面声明的发送通道接口的方法获取通道。

1.1.2 消费者的消息监听

1)添加@EnableBinding注解。例如,@EnableBinding(Sink.class)

2)@StreamListener 注解方法来实现消息的监听。

@StreamListener(Sink.INPUT)
public void onUserMsgSink(UserMsg userMsg) {System.out.println("收到消息:" + userMsg);
}

1.1.3 相关配置

还需要配置Kafka的连接信息,及消息通道对应的主题。

spring.cloud.stream.kafka.binder.brekers=Kafka的Host

spring.cloud.stream.kafka.binder.zkNodes=zookeeper的Host

生产者的配置:

spring.cloud.bindings.[通道名称].destination= 发送通道对应的主题

消费者的配置:

spring.cloud.bindings.[通道名称].destination= 监听通道对应的主题

spring.cloud.bindings.[通道名称].group= 消费者组名

1.2 通道接口 Channel Interfaces

public interface Source {String OUTPUT = "output";@Output("output")MessageChannel output();
}public interface Sink {String INPUT = "input";@Input("input")SubscribableChannel input();
}

Source 及Sink 为Spring Cloud Stream 框架定义的发送消息通道接口及监听通道接口。@EnableBinding注解的参数就是通道接口的class。@Output来标注获取发送消息通道的方法。其中返回值要是MessageChannel及其子类。@Input来标注获取监听消息通道的方法,返回值要是SubscribableChannel及其子类。

1.2.1 通道接口的注入及使用

@Component
public class UserMsgSender {private final Source source;public UserMsgSender(Source source) {this.source = source;}
}

获取通道,调用Source接口的output()方法。也可以直接在Bean中注入消息通道。

@Component
public class UserMsgSender {private final MessageChannel channel;public UserMsgSender(@Qualifier("output") MessageChannel channel) {this.channel = channel;}
}

如果在服务中定义了多个消息通道,可以通过@Qualifier(“通道名称”)来明确具体的通道。

1.2.2 自定义消息通道

Source定义的发送通道名称为output,Sink定义的监听通道名称为input。我们可以自定义通道。步骤如下:

1)定义通道接口。

public interface ConsumerStreamChannel {@Input("consumerUserMsg")SubscribableChannel userMsg();
}

2)@EnableBinding中声明这个接口。

@EnableBinding(ConsumerStreamChannel.class)

3)在配置文件中为该通道配置相应的主题(或消费组名)。

spring.cloud.bindings.consumerUserMsg.destination= 监听通道对应的主题

spring.cloud.bindings.consumerUserMsg.group= 消费者组名

注意,在使用@StreamListener注解来标注监听方法时,该注解的参数为通道名称。例如,@StreamListener("consumerUserMsg")


http://www.ppmy.cn/news/1576584.html

相关文章

Powershell和BTEQ工具实现带多组参数和标签的Teradata数据库批量数据导出程序

设计一个基于多个带标签SQL模板作为配置文件和多组参数的Powershell代码程序和BTEQ工具&#xff0c;实现根据不同的输入参数&#xff0c;自动批量地将Teradata数据库的数据导出为CSV文件到指定目录上&#xff0c;标签和多个参数&#xff08;以“_”分割&#xff09;为组成导出数…

摄像头应用编程(四):ARM Linux LCD实时预览UVC摄像头画面

文章目录 1、前言2、环境介绍3、步骤4、应用程序编写4.1、lcd初始化4.2、摄像头初始化4.3、jpeg解码4.4、开启摄像头4.5、完整的程序如下 5、测试5.1、编译应用程序5.2、运行应用程序 6、总结 1、前言 本次应用程序主要针对支持MJPEG格式输出的UVC摄像头。 2、环境介绍 rk35…

GPT-4.5震撼登场,AI世界再掀波澜!(3)

GPT-4.5震撼登场&#xff0c;AI世界再掀波澜! GPT-4.5震撼登场&#xff0c;AI世界再掀波澜!(2) &#xff08;一&#xff09;伦理困境&#xff1a;如何抉择 GPT-4.5 的强大功能在为我们带来诸多便利的同时&#xff0c;也引发了一系列深刻的伦理问题&#xff0c;这些问题犹如高…

【pytest框架源码分析四】pluggy源码分析之hook执行

pluggy的主要执行方法在_callers.py中&#xff0c;这里简单介绍下。 def _multicall(hook_name: str,hook_impls: Sequence[HookImpl],caller_kwargs: Mapping[str, object],firstresult: bool, ) -> object | list[object]:"""Execute a call into multipl…

Unity 内置渲染管线各个Shader的用途和性能分析,以及如何修改Shader(build in shader 源码下载)

文章目录 所有Shader分析路径&#xff1a;Standard路径&#xff1a;Nature/路径&#xff1a;UI/路径&#xff1a;Particles/Particles/Standard SurfaceParticles/Standard Unlit 路径&#xff1a;Unlit/Unlit/TextureUnlit/ColorUnlit/TransparentUnlit/Transparent CutoutUnl…

android12 屏幕亮度控制修改为线性变化

由于高版本的亮度调节不是线性变化了,有客户反馈在Android11或者12上使用代码获取亮度不对,比如我们在设置中查看屏幕亮度是80%,读出来的亮度值是100,客户认为亮度值是39%。 获取屏幕亮度adb shell settings get system screen_brightness 或者 adb shell cat /sys/class…

STM32G431RBT6——(2)浅析Cortex-M4内核

本篇博客是一个对Cortex-M4内核了解性的简介&#xff0c;不会涉及到深奥的理论&#xff0c;请大家放心食用。 我们所学习的STM32G431RBT6单片机是基于ARM的Cotex-M4内核&#xff0c;因此我们有必要对此内核做一个大概了解。其实M4内核和M3内核有很大的相似之处&#xff0c;很多…

广州4399游戏25届春招游戏策划管培生内推

【热招岗位】 游戏策划管培生、产品培训生、游戏文案策划、游戏数值策划、游戏系统策划、游戏产品运营、游戏战斗策划、游戏关卡策划 【其他岗位】产品类&#xff08;产品培训生、产品运营等&#xff09;、技术类&#xff08;开发、测试、算法、运维等&#xff09;、运营市场类…