# 从浅入深 学习 SpringCloud 微服务架构(十六)

server/2024/9/22 16:14:02/

从浅入深 学习 SpringCloud 微服务架构(十六)

Stream_2">一、SpringCloudStream:自定义消息通道

1、在子工程 stream_product (子模块)中,创建 自定义的消息通道类 MyProcessor.java

/***   spring_cloud_demo\stream_product\src\main\java\djh\it\stream\channel\MyProcessor.java**   2024-5-11 创建 自定义的消息通道类 MyProcessor.java*/package djh.it.stream.channel;import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;public interface MyProcessor {//消息生产者的配置String MYOUTPUT = "myoutput";@Output("myoutput")MessageChannel myoutput();//消息消费者的配置String MYINPUT = "myinput";@Input("myinput")SubscribableChannel myinput();
}

2、在子工程 stream_product (子模块)中,修改 消息发送的工具类 MessageSender.java 使用自定义消息通道。

/***  spring_cloud_demo\stream_product\src\main\java\djh\it\stream\producer\MessageSender.java**  2024-5-10  抽取一个消息发送的工具类 MessageSender.java*/package djh.it.stream.producer;import djh.it.stream.channel.MyProcessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.cloud.stream.annotation.EnableBinding;
//import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;@Component
//@EnableBinding(Source.class)
@EnableBinding(MyProcessor.class)
public class MessageSender {
//    @Autowired
//    private MessageChannel output;
//
//    //发送消息
//    public void send(Object obj){
//        output.send(MessageBuilder.withPayload((obj)).build());
//    }@Autowired@Qualifier(value = "myoutput")private MessageChannel myoutput;//发送消息public void send(Object obj){myoutput.send(MessageBuilder.withPayload((obj)).build());}
}

3、在子工程 stream_product (子模块)中,修改 application.yml 配置文件, 添加自定义消息配置。

##  spring_cloud_demo\stream_product\src\main\resources\application.ymlserver:port: 7001  #服务端口
spring:application:nmae: stream_product  #指定服务名rabbitmq:addresses: 127.0.0.1username: guestpassword: guestcloud:stream:bindings:output:  #管道交互destination: djh-default  #指定消息发送的目的地,在 rabbitmq 中,发送到一个 djh-default 的交换机 exchange。myoutput:   # 自定义消息通道destination: djh-custom-outputbinders:  #配置绑定器defaultRabbit:type: rabbit

4、在子工程 stream_consumer (子模块)中,创建 自定义的消息通道类 MyProcessor.java

/***   spring_cloud_demo\stream_consumer\src\main\java\djh\it\stream\channel\MyProcessor.java**   2024-5-11 创建 自定义的消息通道类 MyProcessor.java*/package djh.it.stream.channel;import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;public interface MyProcessor {//消息生产者的配置String MYOUTPUT = "myoutput";@Output("myoutput")MessageChannel myoutput();//消息消费者的配置String MYINPUT = "myinput";@Input("myinput")SubscribableChannel myinput();
}

5、在子工程 stream_consumer (子模块)中,修改 获取消息工具类 MessageListener.java 使用自定义消息通道。

/***   spring_cloud_demo\stream_consumer\src\main\java\djh\it\stream\consumer\MessageListener.java**   2024-5-10 创建一个获取消息工具类 MessageListener.java*/package djh.it.stream.consumer;import djh.it.stream.channel.MyProcessor;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
//import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.stereotype.Component;@Component
//@EnableBinding(Sink.class)
@EnableBinding(MyProcessor.class)
public class MessageListener {//    //监听 binding 中的消息
//    @StreamListener(Sink.INPUT)
//    public void input(String message) {
//        System.out.println("获取到的消息: " + message);
//    }//监听 binding 中的消息@StreamListener(MyProcessor.MYINPUT)public void input(String message) {System.out.println("获取到的消息: " + message);}
}

6、在子工程 stream_consumer (子模块)中,修改 application.yml 配置文件, 添加自定义消息配置。

##  spring_cloud_demo\stream_consumer\src\main\resources\application.ymlserver:port: 7002  #服务端口
spring:application:nmae: stream_consumer  #指定服务名rabbitmq:addresses: 127.0.0.1username: guestpassword: guestcloud:stream:bindings:input:  #管道交互,内置的获取消息的通道,从 djh-default 中获取消息。destination: djh-default  #指定消息发送的目的地,在 rabbitmq 中,发送到一个 djh-default 的交换机 exchange。myinput:   #自定义消息通道destination: djh-custom-outputbinders:  #配置绑定器defaultRabbit:type: rabbit

7、在子工程 stream_product (子模块)中,运行 启动类 ProducerApplication.java 进行测试

/***   spring_cloud_demo\stream_product\src\main\java\djh\it\stream\ProducerApplication.java**   2024-5-9 SpringCloudStream 入门案例:启动类 ProducerApplication.java*      1)引入依赖。*      2)配置 application.yml 配置文件。*      3)发送消息的话,定义一个通道接口,通过接口中内置的 messagechannel,(sprngcloudtream 中内置接口 Source)*      4)@EnableBinding 注解 :绑定对应通道。*      5)发送消息的话,通过 MessageChannel 发送消息,如果需要 MessageChannel --> 通过绑定内置接口获取。*/package djh.it.stream;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class ProducerApplication {public static void main(String[] args) {SpringApplication.run(ProducerApplication.class);}
}

8、在子工程 stream_consumer (子模块)中,运行 启动类 ConsumerApplication.java 进行测试。

/***    spring_cloud_demo\stream_consumer\src\main\java\djh\it\stream\ConsumerApplication.java**   2024-5-9 SpringCloudStream 入门案例:启动类 ConsumerApplication.java*      1)引入依赖。*      2)配置 application.yml 配置文件。*      3)定义一个通道接口,通过内置获取消息的接口:Sink*      4)绑定对应通道。*      5)配置一个监听方法 :当程序从中间件获取数据之后,执行的业务逻辑方法,需要在监听方法上配置 @StreamListener 注解。*/package djh.it.stream;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class ConsumerApplication {public static void main(String[] args) {SpringApplication.run(ConsumerApplication.class);}
}

9、在子工程 stream_product (子模块)中,运行 一个测试类 ProducterTest.java 进行测试。

/***  spring_cloud_demo\stream_product\src\test\java\djh\it\stream\ProducterTest.java**  2024-5-10 创建一个测试类 ProducterTest.java*/package djh.it.stream;import djh.it.stream.producer.MessageSender;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;@SpringBootTest
@RunWith(SpringJUnit4ClassRunner.class)
public class ProducterTest {@Autowiredprivate MessageSender messageSender;@Testpublic void testSend(){messageSender.send("hello 测试 工具类");}
}

10、启动 rabbitmqctl-server.bat 服务,并运行 测试类 ProducterTest 和 ConsumerApplication 启动类,在 idea Run Dashboard 控制面板,

同样会输出 “获取到的消息: hello 测试 工具类”

在这里插入图片描述

Stream_301">二、SpringCloudStream:消息分组

Stream_303">1、SpringCloudStream:消息分组

  • 通常在生产环境,我们的每个服务都不会以单节点的方式运行在生产环境,当同一个服务启动多个实例的时候,这些实例都会绑定到同一个消息通道的目标主题(Topic)上。默认情况下,当生产者发出一条消息到绑定通道上,这条消息会产生多个副本被每个消费者实例接收和处理,但是有些业务场景之下,我们希望生产者产生的消息只被其中一个实例消费,这个时候我们需要为这些消费者设置消费组来实现这样的功能。

  • 实现的方式非常简单,我们只需要在服务消费者端设置 spring.c1oud.stream.bindings.input.group 属性即可。

2、在子工程 stream_consumer (子模块),复制一个更名为:在子工程 stream_consumer_2 (子模块),并把 application.yml 配置文件中的端口号改为:7003

1)子工程 stream_consumer_2 (子模块)中的 pom.xml 文件。

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><parent><artifactId>spring_cloud_demo</artifactId><groupId>djh.it</groupId><version>1.0-SNAPSHOT</version></parent><modelVersion>4.0.0</modelVersion><artifactId>stream_consumer_2</artifactId><dependencies><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-stream</artifactId></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-stream-binder-rabbit</artifactId></dependency></dependencies>
</project>
<!-- spring_cloud_demo\stream_consumer_2\pom.xml -->

2)子工程 stream_consumer_2 (子模块)中的 application.yml 文件。

##  spring_cloud_demo\stream_consumer_2\src\main\resources\application.ymlserver:port: 7003  #服务端口
spring:application:nmae: stream_consumer_2  #指定服务名rabbitmq:addresses: 127.0.0.1username: guestpassword: guestcloud:stream:bindings:input:  #管道交互,内置的获取消息的通道,从 djh-default 中获取消息。destination: djh-default  #指定消息发送的目的地,在 rabbitmq 中,发送到一个 djh-default 的交换机 exchange。myinput:   #自定义消息通道destination: djh-custom-outputbinders:  #配置绑定器defaultRabbit:type: rabbit

3)子工程 stream_consumer_2 (子模块)中的 自定义的消息通道类 MyProcessor.java

/***   spring_cloud_demo\stream_consumer_2\src\main\java\djh\it\stream\channel\MyProcessor.java**   2024-5-11 创建 自定义的消息通道类 MyProcessor.java*/package djh.it.stream.channel;import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;public interface MyProcessor {//消息生产者的配置String MYOUTPUT = "myoutput";@Output("myoutput")MessageChannel myoutput();//消息消费者的配置String MYINPUT = "myinput";@Input("myinput")SubscribableChannel myinput();
}

4)子工程 stream_consumer_2 (子模块)中的 获取消息工具类 MessageListener.java

/***   spring_cloud_demo\stream_consumer_2\src\main\java\djh\it\stream\consumer\MessageListener.java**   2024-5-11 创建一个获取消息工具类 MessageListener.java*/package djh.it.stream.consumer;import djh.it.stream.channel.MyProcessor;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
//import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.stereotype.Component;@Component
//@EnableBinding(Sink.class)
@EnableBinding(MyProcessor.class)
public class MessageListener {//    //监听 binding 中的消息
//    @StreamListener(Sink.INPUT)
//    public void input(String message) {
//        System.out.println("获取到的消息: " + message);
//    }//监听 binding 中的消息@StreamListener(MyProcessor.MYINPUT)public void input(String message) {System.out.println("获取到的消息: " + message);}
}

5)子工程 stream_consumer_2 (子模块)中的 启动类 ConsumerApplication_2.java

/***   spring_cloud_demo\stream_consumer_2\src\main\java\djh\it\stream\ConsumerApplication_2.java**   2024-5-11 SpringCloudStream 入门案例:启动类 ConsumerApplication_2.java*      1)引入依赖。*      2)配置 application.yml 配置文件。*      3)定义一个通道接口,通过内置获取消息的接口:Sink*      4)绑定对应通道。*      5)配置一个监听方法 :当程序从中间件获取数据之后,执行的业务逻辑方法,需要在监听方法上配置 @StreamListener 注解。*/package djh.it.stream;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class ConsumerApplication_2 {public static void main(String[] args) {SpringApplication.run(ConsumerApplication_2.class);}
}

3、启动 rabbitmqctl-server.bat 服务,并运行 测试类 ProducterTest 和 ConsumerApplication 启动类 和 ConsumerApplication_2 启动类,

在 idea Run Dashboard 控制面板,两个消费都启动类都会输出 “获取到的消息: hello 测试 工具类”

在这里插入图片描述

4、在子工程 stream_consumer (子模块)的 application.yml 配置文件中,添加 消息分组配置。

##  spring_cloud_demo\stream_consumer\src\main\resources\application.ymlserver:port: 7002  #服务端口
spring:application:nmae: stream_consumer  #指定服务名rabbitmq:addresses: 127.0.0.1username: guestpassword: guestcloud:stream:bindings:input:  #管道交互,内置的获取消息的通道,从 djh-default 中获取消息。destination: djh-default  #指定消息发送的目的地,在 rabbitmq 中,发送到一个 djh-default 的交换机 exchange。myinput:   #自定义消息通道destination: djh-custom-outputgroup: group1  #消息分组(同一组只能有一个消息者获取消息)binders:  #配置绑定器defaultRabbit:type: rabbit

5、在子工程 stream_consumer_2 (子模块)的 application.yml 配置文件中,也添加 消息分组配置。

##  spring_cloud_demo\stream_consumer_2\src\main\resources\application.ymlserver:port: 7003  #服务端口
spring:application:nmae: stream_consumer_2  #指定服务名rabbitmq:addresses: 127.0.0.1username: guestpassword: guestcloud:stream:bindings:input:  #管道交互,内置的获取消息的通道,从 djh-default 中获取消息。destination: djh-default  #指定消息发送的目的地,在 rabbitmq 中,发送到一个 djh-default 的交换机 exchange。myinput:   #自定义消息通道destination: djh-custom-outputgroup: group1  #消息分组(同一组只能有一个消息者获取消息)binders:  #配置绑定器defaultRabbit:type: rabbit

6、重新启动 rabbitmqctl-server.bat 服务,并运行 测试类 ProducterTest 和 ConsumerApplication 启动类 和 ConsumerApplication_2 启动类,

在 idea Run Dashboard 控制面板,发现只有一个消费都启动类都会输出 “获取到的消息: hello 测试 工具类”

在这里插入图片描述

Stream_538">三、SpringCloudStream:消息分区

1、消息分区

有一些场景需要满足,同一个特征的数据被同一个实例消费,比如同一个id的传感器监测数据必须被同-个实例统计计算分析,否则可能无法获取全部的数据。又比如部分异步任务,首次请求启动task,二次请求取消task,此场景就必须保证两次请求至同一实例.

2、在子工程 stream_producer (子模块)的 application.yml 配置文件中,添加 消息分区配置。

##  spring_cloud_demo\stream_product\src\main\resources\application.ymlserver:port: 7001  #服务端口
spring:application:nmae: stream_product  #指定服务名rabbitmq:addresses: 127.0.0.1username: guestpassword: guestcloud:stream:bindings:output:  #管道交互destination: djh-default  #指定消息发送的目的地,在 rabbitmq 中,发送到一个 djh-default 的交换机 exchange。myoutput:   # 自定义消息通道destination: djh-custom-outputproducer:  # 配置分区partition-key-expression: payload  # 分区关键字,对象中的 id 或 对象。partition-count: 2  # 分区大小binders:  #配置绑定器defaultRabbit:type: rabbit

3、在子工程 stream_consumer (子模块)的 application.yml 配置文件中,也添加 消息分区配置。

##  spring_cloud_demo\stream_consumer\src\main\resources\application.ymlserver:port: 7002  #服务端口
spring:application:nmae: stream_consumer  #指定服务名rabbitmq:addresses: 127.0.0.1username: guestpassword: guestcloud:stream:instanceCount: 2  # 消费者总数。instanceIndex: 0  # 当前消费者的索引,从 0 开始。bindings:input:  #管道交互,内置的获取消息的通道,从 djh-default 中获取消息。destination: djh-default  #指定消息发送的目的地,在 rabbitmq 中,发送到一个 djh-default 的交换机 exchange。myinput:   #自定义消息通道destination: djh-custom-outputgroup: group1  #消息分组(同一组只能有一个消息者获取消息)consumer:partitioned: true  # 开启分区支持binders:  #配置绑定器defaultRabbit:type: rabbit

3、在子工程 stream_consumer_2 (子模块)的 application.yml 配置文件中,也添加 消息分区配置。

##  spring_cloud_demo\stream_consumer_2\src\main\resources\application.ymlserver:port: 7003  #服务端口
spring:application:nmae: stream_consumer_2  #指定服务名rabbitmq:addresses: 127.0.0.1username: guestpassword: guestcloud:stream:instanceCount: 2  # 消费者总数。instanceIndex: 1  # 当前消费者的索引,从 0 开始。bindings:input:  #管道交互,内置的获取消息的通道,从 djh-default 中获取消息。destination: djh-default  #指定消息发送的目的地,在 rabbitmq 中,发送到一个 djh-default 的交换机 exchange。myinput:   #自定义消息通道destination: djh-custom-outputgroup: group2  #消息分组(同一组只能有一个消息者获取消息)consumer:partitioned: true  # 开启分区支持binders:  #配置绑定器defaultRabbit:type: rabbit

4、修改 子工程 stream_producer (子模块)的 测试类 ProducterTest 进行测试。

/***  spring_cloud_demo\stream_product\src\test\java\djh\it\stream\ProducterTest.java**  2024-5-10 创建一个测试类 ProducterTest.java*/package djh.it.stream;import djh.it.stream.producer.MessageSender;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;@SpringBootTest
@RunWith(SpringJUnit4ClassRunner.class)
public class ProducterTest {@Autowiredprivate MessageSender messageSender;@Testpublic void testSend(){
//        messageSender.send("hello 测试 工具类");for(int i=0;i<5;i++){messageSender.send("0");}}
}

5、重新启动 rabbitmqctl-server.bat 服务,并运行 测试类 ProducterTest 和 ConsumerApplication 启动类 和 ConsumerApplication_2 启动类,

在 idea Run Dashboard 控制面板,发现只有 ConsumerApplication 一个消费者启动类都会输出 “获取到的消息: 0”

在这里插入图片描述

上一节关联链接请点击:
# 从浅入深 学习 SpringCloud 微服务架构(十五)


http://www.ppmy.cn/server/40482.html

相关文章

正点原子Linux学习笔记(六)在 LCD 上显示 jpeg 图像

在 LCD 上显示 jpeg 图像 20.1 JPEG 简介20.2 libjpeg 简介20.3 libjpeg 移植下载源码包编译源码安装目录下的文件夹介绍移植到开发板 20.4 libjpeg 使用说明错误处理创建解码对象设置数据源读取 jpeg 文件的头信息设置解码处理参数开始解码读取数据结束解码释放/销毁解码对象 …

【OceanBase诊断调优】—— checksum error ret=-4103 问题排查

适用版本 OceanBase 数据库所有版本。 什么是 checksum data checksum&#xff1a;一个 SSTable 中所有宏块内存二进制计算出来的 checksum 值。反映了宏块中的数据和数据分布情况。如果宏块中数据一致但是数据分布不一致&#xff0c;计算出来的 checksum 也不相等。 column…

分享5个免费AI写作软件

在数字化时代&#xff0c;人工智能&#xff08;AI&#xff09;正以惊人的速度渗透到我们生活的方方面面&#xff0c;而写作领域也不例外。AI写作工具的出现&#xff0c;不仅改变了传统的写作流程&#xff0c;更在创意表达、文本生成、语言校正等方面展现了其独特的优势。这些工…

py黑帽子学习笔记_环境准备

1 下载os装os 下载一个kali虚机镜像然后用虚机管理软件创虚机&#xff0c;装完如下图&#xff0c;我用的版本是2024.1的版本kali-linux-2024.1-installer-amd64&#xff0c;可以从镜像站下载&#xff0c;官网下的慢还断网Index of /kali-images/kali-2024.1/ | 清华大学开源软…

Vue3实战笔记(16)—pinia基本用法--Getter

文章目录 前言一、pinia的getter简单理解二、访问其他 store 的 getter总结 前言 在 Pinia 中&#xff0c;getter 类似于 Vuex 中的 getter&#xff0c;允许你从 store 中派生出一些状态&#xff0c;而不需要修改原始状态。这使得我们可以创建基于现有状态的计算属性。 一、pi…

[C/C++] -- 观察者模式

观察者模式是一种行为型设计模式&#xff0c;用于定义对象间的一种一对多的依赖关系&#xff0c;使得当一个对象的状态发生改变时&#xff0c;所有依赖于它的对象都会得到通知并自动更新。 观察者模式涉及以下几个角色&#xff1a; 主题&#xff08;Subject&#xff09;&…

【linux软件基础知识】-cdev_alloc

struct cdev *cdev_alloc(void) {struct cdev *p = kzalloc(sizeof(struct cdev), GFP_KERNEL);if <

oracle 行转列及列转行

行转列 使用pivot函数实现 行转列 with temp as( select 四川省 nation ,成都市 city,第一 ranking from dual union all select 四川省 nation ,绵阳市 city,第二 ranking from dual union all select 四川省 nation ,德阳市 city,第三 ranking from dual union all select 四…