# 从浅入深 学习 SpringCloud 微服务架构(十六)

ops/2024/9/22 16:14:02/

从浅入深 学习 SpringCloud 微服务架构(十六)

Stream_2">一、SpringCloudStream:自定义消息通道

1、在子工程 stream_product (子模块)中,创建 自定义的消息通道类 MyProcessor.java

/***   spring_cloud_demo\stream_product\src\main\java\djh\it\stream\channel\MyProcessor.java**   2024-5-11 创建 自定义的消息通道类 MyProcessor.java*/package djh.it.stream.channel;import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;public interface MyProcessor {//消息生产者的配置String MYOUTPUT = "myoutput";@Output("myoutput")MessageChannel myoutput();//消息消费者的配置String MYINPUT = "myinput";@Input("myinput")SubscribableChannel myinput();
}

2、在子工程 stream_product (子模块)中,修改 消息发送的工具类 MessageSender.java 使用自定义消息通道。

/***  spring_cloud_demo\stream_product\src\main\java\djh\it\stream\producer\MessageSender.java**  2024-5-10  抽取一个消息发送的工具类 MessageSender.java*/package djh.it.stream.producer;import djh.it.stream.channel.MyProcessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.cloud.stream.annotation.EnableBinding;
//import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;@Component
//@EnableBinding(Source.class)
@EnableBinding(MyProcessor.class)
public class MessageSender {
//    @Autowired
//    private MessageChannel output;
//
//    //发送消息
//    public void send(Object obj){
//        output.send(MessageBuilder.withPayload((obj)).build());
//    }@Autowired@Qualifier(value = "myoutput")private MessageChannel myoutput;//发送消息public void send(Object obj){myoutput.send(MessageBuilder.withPayload((obj)).build());}
}

3、在子工程 stream_product (子模块)中,修改 application.yml 配置文件, 添加自定义消息配置。

##  spring_cloud_demo\stream_product\src\main\resources\application.ymlserver:port: 7001  #服务端口
spring:application:nmae: stream_product  #指定服务名rabbitmq:addresses: 127.0.0.1username: guestpassword: guestcloud:stream:bindings:output:  #管道交互destination: djh-default  #指定消息发送的目的地,在 rabbitmq 中,发送到一个 djh-default 的交换机 exchange。myoutput:   # 自定义消息通道destination: djh-custom-outputbinders:  #配置绑定器defaultRabbit:type: rabbit

4、在子工程 stream_consumer (子模块)中,创建 自定义的消息通道类 MyProcessor.java

/***   spring_cloud_demo\stream_consumer\src\main\java\djh\it\stream\channel\MyProcessor.java**   2024-5-11 创建 自定义的消息通道类 MyProcessor.java*/package djh.it.stream.channel;import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;public interface MyProcessor {//消息生产者的配置String MYOUTPUT = "myoutput";@Output("myoutput")MessageChannel myoutput();//消息消费者的配置String MYINPUT = "myinput";@Input("myinput")SubscribableChannel myinput();
}

5、在子工程 stream_consumer (子模块)中,修改 获取消息工具类 MessageListener.java 使用自定义消息通道。

/***   spring_cloud_demo\stream_consumer\src\main\java\djh\it\stream\consumer\MessageListener.java**   2024-5-10 创建一个获取消息工具类 MessageListener.java*/package djh.it.stream.consumer;import djh.it.stream.channel.MyProcessor;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
//import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.stereotype.Component;@Component
//@EnableBinding(Sink.class)
@EnableBinding(MyProcessor.class)
public class MessageListener {//    //监听 binding 中的消息
//    @StreamListener(Sink.INPUT)
//    public void input(String message) {
//        System.out.println("获取到的消息: " + message);
//    }//监听 binding 中的消息@StreamListener(MyProcessor.MYINPUT)public void input(String message) {System.out.println("获取到的消息: " + message);}
}

6、在子工程 stream_consumer (子模块)中,修改 application.yml 配置文件, 添加自定义消息配置。

##  spring_cloud_demo\stream_consumer\src\main\resources\application.ymlserver:port: 7002  #服务端口
spring:application:nmae: stream_consumer  #指定服务名rabbitmq:addresses: 127.0.0.1username: guestpassword: guestcloud:stream:bindings:input:  #管道交互,内置的获取消息的通道,从 djh-default 中获取消息。destination: djh-default  #指定消息发送的目的地,在 rabbitmq 中,发送到一个 djh-default 的交换机 exchange。myinput:   #自定义消息通道destination: djh-custom-outputbinders:  #配置绑定器defaultRabbit:type: rabbit

7、在子工程 stream_product (子模块)中,运行 启动类 ProducerApplication.java 进行测试

/***   spring_cloud_demo\stream_product\src\main\java\djh\it\stream\ProducerApplication.java**   2024-5-9 SpringCloudStream 入门案例:启动类 ProducerApplication.java*      1)引入依赖。*      2)配置 application.yml 配置文件。*      3)发送消息的话,定义一个通道接口,通过接口中内置的 messagechannel,(sprngcloudtream 中内置接口 Source)*      4)@EnableBinding 注解 :绑定对应通道。*      5)发送消息的话,通过 MessageChannel 发送消息,如果需要 MessageChannel --> 通过绑定内置接口获取。*/package djh.it.stream;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class ProducerApplication {public static void main(String[] args) {SpringApplication.run(ProducerApplication.class);}
}

8、在子工程 stream_consumer (子模块)中,运行 启动类 ConsumerApplication.java 进行测试。

/***    spring_cloud_demo\stream_consumer\src\main\java\djh\it\stream\ConsumerApplication.java**   2024-5-9 SpringCloudStream 入门案例:启动类 ConsumerApplication.java*      1)引入依赖。*      2)配置 application.yml 配置文件。*      3)定义一个通道接口,通过内置获取消息的接口:Sink*      4)绑定对应通道。*      5)配置一个监听方法 :当程序从中间件获取数据之后,执行的业务逻辑方法,需要在监听方法上配置 @StreamListener 注解。*/package djh.it.stream;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class ConsumerApplication {public static void main(String[] args) {SpringApplication.run(ConsumerApplication.class);}
}

9、在子工程 stream_product (子模块)中,运行 一个测试类 ProducterTest.java 进行测试。

/***  spring_cloud_demo\stream_product\src\test\java\djh\it\stream\ProducterTest.java**  2024-5-10 创建一个测试类 ProducterTest.java*/package djh.it.stream;import djh.it.stream.producer.MessageSender;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;@SpringBootTest
@RunWith(SpringJUnit4ClassRunner.class)
public class ProducterTest {@Autowiredprivate MessageSender messageSender;@Testpublic void testSend(){messageSender.send("hello 测试 工具类");}
}

10、启动 rabbitmqctl-server.bat 服务,并运行 测试类 ProducterTest 和 ConsumerApplication 启动类,在 idea Run Dashboard 控制面板,

同样会输出 “获取到的消息: hello 测试 工具类”

在这里插入图片描述

Stream_301">二、SpringCloudStream:消息分组

Stream_303">1、SpringCloudStream:消息分组

  • 通常在生产环境,我们的每个服务都不会以单节点的方式运行在生产环境,当同一个服务启动多个实例的时候,这些实例都会绑定到同一个消息通道的目标主题(Topic)上。默认情况下,当生产者发出一条消息到绑定通道上,这条消息会产生多个副本被每个消费者实例接收和处理,但是有些业务场景之下,我们希望生产者产生的消息只被其中一个实例消费,这个时候我们需要为这些消费者设置消费组来实现这样的功能。

  • 实现的方式非常简单,我们只需要在服务消费者端设置 spring.c1oud.stream.bindings.input.group 属性即可。

2、在子工程 stream_consumer (子模块),复制一个更名为:在子工程 stream_consumer_2 (子模块),并把 application.yml 配置文件中的端口号改为:7003

1)子工程 stream_consumer_2 (子模块)中的 pom.xml 文件。

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><parent><artifactId>spring_cloud_demo</artifactId><groupId>djh.it</groupId><version>1.0-SNAPSHOT</version></parent><modelVersion>4.0.0</modelVersion><artifactId>stream_consumer_2</artifactId><dependencies><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-stream</artifactId></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-stream-binder-rabbit</artifactId></dependency></dependencies>
</project>
<!-- spring_cloud_demo\stream_consumer_2\pom.xml -->

2)子工程 stream_consumer_2 (子模块)中的 application.yml 文件。

##  spring_cloud_demo\stream_consumer_2\src\main\resources\application.ymlserver:port: 7003  #服务端口
spring:application:nmae: stream_consumer_2  #指定服务名rabbitmq:addresses: 127.0.0.1username: guestpassword: guestcloud:stream:bindings:input:  #管道交互,内置的获取消息的通道,从 djh-default 中获取消息。destination: djh-default  #指定消息发送的目的地,在 rabbitmq 中,发送到一个 djh-default 的交换机 exchange。myinput:   #自定义消息通道destination: djh-custom-outputbinders:  #配置绑定器defaultRabbit:type: rabbit

3)子工程 stream_consumer_2 (子模块)中的 自定义的消息通道类 MyProcessor.java

/***   spring_cloud_demo\stream_consumer_2\src\main\java\djh\it\stream\channel\MyProcessor.java**   2024-5-11 创建 自定义的消息通道类 MyProcessor.java*/package djh.it.stream.channel;import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;public interface MyProcessor {//消息生产者的配置String MYOUTPUT = "myoutput";@Output("myoutput")MessageChannel myoutput();//消息消费者的配置String MYINPUT = "myinput";@Input("myinput")SubscribableChannel myinput();
}

4)子工程 stream_consumer_2 (子模块)中的 获取消息工具类 MessageListener.java

/***   spring_cloud_demo\stream_consumer_2\src\main\java\djh\it\stream\consumer\MessageListener.java**   2024-5-11 创建一个获取消息工具类 MessageListener.java*/package djh.it.stream.consumer;import djh.it.stream.channel.MyProcessor;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
//import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.stereotype.Component;@Component
//@EnableBinding(Sink.class)
@EnableBinding(MyProcessor.class)
public class MessageListener {//    //监听 binding 中的消息
//    @StreamListener(Sink.INPUT)
//    public void input(String message) {
//        System.out.println("获取到的消息: " + message);
//    }//监听 binding 中的消息@StreamListener(MyProcessor.MYINPUT)public void input(String message) {System.out.println("获取到的消息: " + message);}
}

5)子工程 stream_consumer_2 (子模块)中的 启动类 ConsumerApplication_2.java

/***   spring_cloud_demo\stream_consumer_2\src\main\java\djh\it\stream\ConsumerApplication_2.java**   2024-5-11 SpringCloudStream 入门案例:启动类 ConsumerApplication_2.java*      1)引入依赖。*      2)配置 application.yml 配置文件。*      3)定义一个通道接口,通过内置获取消息的接口:Sink*      4)绑定对应通道。*      5)配置一个监听方法 :当程序从中间件获取数据之后,执行的业务逻辑方法,需要在监听方法上配置 @StreamListener 注解。*/package djh.it.stream;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class ConsumerApplication_2 {public static void main(String[] args) {SpringApplication.run(ConsumerApplication_2.class);}
}

3、启动 rabbitmqctl-server.bat 服务,并运行 测试类 ProducterTest 和 ConsumerApplication 启动类 和 ConsumerApplication_2 启动类,

在 idea Run Dashboard 控制面板,两个消费都启动类都会输出 “获取到的消息: hello 测试 工具类”

在这里插入图片描述

4、在子工程 stream_consumer (子模块)的 application.yml 配置文件中,添加 消息分组配置。

##  spring_cloud_demo\stream_consumer\src\main\resources\application.ymlserver:port: 7002  #服务端口
spring:application:nmae: stream_consumer  #指定服务名rabbitmq:addresses: 127.0.0.1username: guestpassword: guestcloud:stream:bindings:input:  #管道交互,内置的获取消息的通道,从 djh-default 中获取消息。destination: djh-default  #指定消息发送的目的地,在 rabbitmq 中,发送到一个 djh-default 的交换机 exchange。myinput:   #自定义消息通道destination: djh-custom-outputgroup: group1  #消息分组(同一组只能有一个消息者获取消息)binders:  #配置绑定器defaultRabbit:type: rabbit

5、在子工程 stream_consumer_2 (子模块)的 application.yml 配置文件中,也添加 消息分组配置。

##  spring_cloud_demo\stream_consumer_2\src\main\resources\application.ymlserver:port: 7003  #服务端口
spring:application:nmae: stream_consumer_2  #指定服务名rabbitmq:addresses: 127.0.0.1username: guestpassword: guestcloud:stream:bindings:input:  #管道交互,内置的获取消息的通道,从 djh-default 中获取消息。destination: djh-default  #指定消息发送的目的地,在 rabbitmq 中,发送到一个 djh-default 的交换机 exchange。myinput:   #自定义消息通道destination: djh-custom-outputgroup: group1  #消息分组(同一组只能有一个消息者获取消息)binders:  #配置绑定器defaultRabbit:type: rabbit

6、重新启动 rabbitmqctl-server.bat 服务,并运行 测试类 ProducterTest 和 ConsumerApplication 启动类 和 ConsumerApplication_2 启动类,

在 idea Run Dashboard 控制面板,发现只有一个消费都启动类都会输出 “获取到的消息: hello 测试 工具类”

在这里插入图片描述

Stream_538">三、SpringCloudStream:消息分区

1、消息分区

有一些场景需要满足,同一个特征的数据被同一个实例消费,比如同一个id的传感器监测数据必须被同-个实例统计计算分析,否则可能无法获取全部的数据。又比如部分异步任务,首次请求启动task,二次请求取消task,此场景就必须保证两次请求至同一实例.

2、在子工程 stream_producer (子模块)的 application.yml 配置文件中,添加 消息分区配置。

##  spring_cloud_demo\stream_product\src\main\resources\application.ymlserver:port: 7001  #服务端口
spring:application:nmae: stream_product  #指定服务名rabbitmq:addresses: 127.0.0.1username: guestpassword: guestcloud:stream:bindings:output:  #管道交互destination: djh-default  #指定消息发送的目的地,在 rabbitmq 中,发送到一个 djh-default 的交换机 exchange。myoutput:   # 自定义消息通道destination: djh-custom-outputproducer:  # 配置分区partition-key-expression: payload  # 分区关键字,对象中的 id 或 对象。partition-count: 2  # 分区大小binders:  #配置绑定器defaultRabbit:type: rabbit

3、在子工程 stream_consumer (子模块)的 application.yml 配置文件中,也添加 消息分区配置。

##  spring_cloud_demo\stream_consumer\src\main\resources\application.ymlserver:port: 7002  #服务端口
spring:application:nmae: stream_consumer  #指定服务名rabbitmq:addresses: 127.0.0.1username: guestpassword: guestcloud:stream:instanceCount: 2  # 消费者总数。instanceIndex: 0  # 当前消费者的索引,从 0 开始。bindings:input:  #管道交互,内置的获取消息的通道,从 djh-default 中获取消息。destination: djh-default  #指定消息发送的目的地,在 rabbitmq 中,发送到一个 djh-default 的交换机 exchange。myinput:   #自定义消息通道destination: djh-custom-outputgroup: group1  #消息分组(同一组只能有一个消息者获取消息)consumer:partitioned: true  # 开启分区支持binders:  #配置绑定器defaultRabbit:type: rabbit

3、在子工程 stream_consumer_2 (子模块)的 application.yml 配置文件中,也添加 消息分区配置。

##  spring_cloud_demo\stream_consumer_2\src\main\resources\application.ymlserver:port: 7003  #服务端口
spring:application:nmae: stream_consumer_2  #指定服务名rabbitmq:addresses: 127.0.0.1username: guestpassword: guestcloud:stream:instanceCount: 2  # 消费者总数。instanceIndex: 1  # 当前消费者的索引,从 0 开始。bindings:input:  #管道交互,内置的获取消息的通道,从 djh-default 中获取消息。destination: djh-default  #指定消息发送的目的地,在 rabbitmq 中,发送到一个 djh-default 的交换机 exchange。myinput:   #自定义消息通道destination: djh-custom-outputgroup: group2  #消息分组(同一组只能有一个消息者获取消息)consumer:partitioned: true  # 开启分区支持binders:  #配置绑定器defaultRabbit:type: rabbit

4、修改 子工程 stream_producer (子模块)的 测试类 ProducterTest 进行测试。

/***  spring_cloud_demo\stream_product\src\test\java\djh\it\stream\ProducterTest.java**  2024-5-10 创建一个测试类 ProducterTest.java*/package djh.it.stream;import djh.it.stream.producer.MessageSender;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;@SpringBootTest
@RunWith(SpringJUnit4ClassRunner.class)
public class ProducterTest {@Autowiredprivate MessageSender messageSender;@Testpublic void testSend(){
//        messageSender.send("hello 测试 工具类");for(int i=0;i<5;i++){messageSender.send("0");}}
}

5、重新启动 rabbitmqctl-server.bat 服务,并运行 测试类 ProducterTest 和 ConsumerApplication 启动类 和 ConsumerApplication_2 启动类,

在 idea Run Dashboard 控制面板,发现只有 ConsumerApplication 一个消费者启动类都会输出 “获取到的消息: 0”

在这里插入图片描述

上一节关联链接请点击:
# 从浅入深 学习 SpringCloud 微服务架构(十五)


http://www.ppmy.cn/ops/41660.html

相关文章

大学生须知~~毕业季行李轻松寄,怎么邮寄行李省钱!

毕业季即将到来&#xff0c;告别母校告别这座城市&#xff0c;肯定恋恋不舍&#xff0c;这几年的学生生涯也留下了不少行李。怎么邮寄才便宜呢&#xff1f;&#xff1f; 记得找惠发快递呀&#xff01;因为我们平台是跟快递总部合作的&#xff0c;不管你寄大件还是快递都很便宜…

王道c语言-文件操作

fopen fgetc fputc fwrite fread fgets fputs //main.c #include <stdio.h> #include <string.h>int main() {FILE *fp;int ret;//打开/创建文件fp fopen("test.txt", "wb");if (NULL fp) {perror("fopen fail");//perror aim to…

google test 使用指南

目录 测试项目 calculator.h calculator.cpp test01.cpp 创建新项目 选择Google Test 选择要测试的项目 pch.cpp 加入依赖 设为启动项目 ​编辑 运行 ​编辑 关键点 测试项目 calculator.h #ifndef __CALCULATOR_H__ #define __CALCULATOR_H__#include <i…

深化产教融合,泰迪智能科技助力西南林业大学提质培优

2024年5月7日&#xff0c;泰迪智能科技昆明分公司院校部总监查良红和数据部负责人余雄亮赴西南林业大学理学院就工作室共建事宜进行交流会谈。西南林业大学理学院院长张雁、党委副书记魏轶、副院长谢爽、就业负责人罗丽及学生代表参与本次交流会。 会议伊始&#xff0c;谢副院长…

【Linux】Centos9设置ActiveMq开机自启功能

配置流程&#xff1a; 1. 创建 Systemd 服务文件。这个文件通常存放在/usr/lib/systemd/system/目录下&#xff0c;命名为 activemq.service。 #先创建文件&#xff0c;然后编辑&#xff1a; sudo touch /usr/lib/systemd/system/activemq.service sudo vim /usr/lib/systemd…

异步I/O库-libuv介绍

1.简介 libuv是一个跨平台的支持事件驱动的异步I/O的库&#xff0c;使开发者可以以非阻塞的方式执行文件I/O操作、网络通信、子进程管理等。 libuv的主要特点包括&#xff1a; 事件循环&#xff1a;libuv有一个基于事件循环的模型&#xff0c;它不断地轮询事件&#xff0c;并…

【实战selenium框架下在爱企查爬取企业的历史变更信息】文末附Google浏览器和驱动的下载

代码如下 # 导入包 import random import time from tkinter import filedialog import tkinter as tk import xlrd import os import datetime import csv from selenium import webdriver from selenium.webdriver import Keys from selenium.webdriver.common.by import By…

亚马逊自养号测评策略:提升店铺产品权重的秘诀

对于卖家而言&#xff0c;拥有一款爆款产品无疑是获得流量的关键&#xff0c;同时它也能显著提升店铺的销量。因此&#xff0c;大部分卖家都热衷于学习如何打造爆款产品的策略&#xff0c;特别是对于那些致力于经营自己店铺的卖家来说&#xff0c;掌握这一技巧对于店铺的成功运…