使用微服务Spring Cloud集成Kafka实现异步通信(消费者)

news/2024/10/7 16:37:13/

1、本文架构

本文目标是使用微服务Spring Cloud集成Kafka实现异步通信。其中Kafka Server部署在Ubuntu虚拟机上,微服务部署在Windows 11系统上,Kafka Producer微服务和Kafka Consumer微服务分别注册到Eureka注册中心。Kafka Producer和Kafka Consumer之间通过Kafka Server实现异步通信。

出于便于测试的目的,我通过浏览器触发Kafka Producer发送消息,观察Kafka Consumer的后台是否打印出接收到的消息内容。

Ubuntu 上部署Kafka Server,详见博文:Ubuntu下Kafka安装及使用-CSDN博客

Eureka注册中心的搭建过程和完整代码,详见博文:微服务1:搭建微服务注册中心(命令行简易版,不使用IDE)-CSDN博客

Kafka Producer微服务的完整代码,详见博文:使用微服务Spring Cloud集成Kafka实现异步通信-CSDN博客

本文的重点是实现下图中的深蓝色部分:Kafka Consumer微服务

2、创建Spring boot项目(Kafka Consumer微服务项目):

mvn archetype:generate -DgroupId=com.test -DartifactId=microservice-kafka-consumer -DarchetypeArtifactId=maven-archetype-quickstart

项目代码的完整目录如下图所示:

编辑pom.xml,添加依赖包:

    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-stream-binder-kafka</artifactId>
    </dependency>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.test</groupId><artifactId>microservice-kafka-consumer</artifactId><packaging>jar</packaging><version>1.0-SNAPSHOT</version><name>microservice-kafka-consumer</name><url>http://maven.apache.org</url><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.3.0.RELEASE</version><relativePath/> </parent><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-netflix-eureka-client</artifactId></dependency>         <dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>3.8.1</version><scope>test</scope></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-stream-binder-kafka</artifactId></dependency></dependencies><dependencyManagement><dependencies><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-dependencies</artifactId><version>Hoxton.SR4</version><type>pom</type><scope>import</scope></dependency>               </dependencies></dependencyManagement><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>

编辑application.yml,配置kafka消费者:

consumer:
      #消费的主题
      topic: test-topic
      #消费者组id
      group-id: test-group
      #是否自动提交偏移量
      enable-auto-commit: true
      #提交偏移量的间隔-毫秒
      auto-commit-ms: 1000
      #客户端消费的会话超时时间-毫秒
      session-timeout-ms: 10000
      #实现DeSerializer接口的反序列化类键
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      #实现DeSerializer接口的反序列化类值
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

server:port: 8030
spring:application:name: microservice-kafka-consumerkafka:bootstrap-servers: 192.168.23.131:9092consumer:group-id: test-groupenable-auto-commit: trueauto-commit-ms: 1000session-timeout-ms: 10000key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializereureka:client:serviceUrl:defaultZone: http://localhost:8080/eureka/instance:prefer-ip-address: true            

App.java的完整代码如下:

package com.test;import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.kafka.annotation.KafkaListener;@SpringBootApplication
@EnableDiscoveryClient
public class App 
{@KafkaListener(topics = "mydemo1")public void listen(String msg) throws Exception {System.out.println( "-----> Recv a msg: " + msg );}public static void main( String[] args ){System.out.println( "Hello World!" );SpringApplication.run(App.class, args);}
}

3、测试

在浏览器输入,触发Kafka Producer向Kafka Server发送消息:

http://localhost:8020/kafka/sendMsg?msg=测试消息testmsg

在Kafka Consumer的后台打印出收到的消息:


http://www.ppmy.cn/news/1535776.html

相关文章

私家车开车回家过节会发生什么事情

自驾旅行或者是自驾车回家过节路程太远。长途奔袭的私家车损耗很大。新能源汽车开始涉足电力系统和燃电混动的能源供应过渡方式。汽车在路途中出现零件故障。计划的出发日程天气原因。台风是否会提醒和注意。汽车的油站供应链和电力充电桩的漫长充电过程。高速公路的收费站和不…

Docker版MKVtoolnix的安装及中文显示

本文是应网友 kkkhi 要求折腾的&#xff0c;只研究了 MKVtoolnix 的安装及中文显示&#xff0c;未涉及到软件的使用&#xff1b; 什么是 MKVtoolnix &#xff1f; MKVToolnix 是一款功能强大的多媒体处理工具&#xff0c;用于在 Linux、其他 Unix 系统和 Windows 上创建、修改和…

AI学习记录 - L2正则化详细解释(权重衰减)

大白话&#xff1a; 通过让反向传播的损失值变得比原来更大&#xff0c;并且加入的损失值关联到神经网络全部权重的大小&#xff0c;当出现权重的平方变大的时候&#xff0c;也就是网络权重往更加负或者更加正的方向走的时候&#xff0c;损失就越大&#xff0c;从而控制极大正…

论文阅读:InternVL v1.5| How Far Are We to GPT-4V? 通过开源模型缩小与商业多模式模型的差距

论文地址&#xff1a;https://arxiv.org/abs/2404.16821 Demo&#xff1a; https://internvl.opengvlab.com Model&#xff1a;https://huggingface.co/OpenGVLab/InternVL-Chat-V1-5 公开时间&#xff1a;2024年4月29日 InternVL1.5&#xff0c;是一个开源的多模态大型语言模…

【Text2SQL】当前在BIRD基准测试集上取得SOTA的论文

论文《The Death of Schema Linking? Text-to-SQL in the Age of Well-Reasoned Language Models》探讨了在大型语言模型&#xff08;LLMs&#xff09;时代&#xff0c;文本到SQL&#xff08;Text-to-SQL&#xff09;转换中模式链接&#xff08;Schema Linking&#xff09;的作…

WPF 设计属性 设计页面时实时显示 页面涉及集合时不显示处理 设计页面时显示集合样式 显示ItemSource TabControl等集合样式

WPF 设计属性 设计页面时实时显示 页面涉及集合时不显示处理 设计页面时显示集合样式 显示ItemSource TabControl等集合样式 1、设计显示属性 1、设计时显示属性依赖以下属性 xmlns:d"http://schemas.microsoft.com/expression/blend/2008"2、在运行时不显示设计属性…

第 21 章 一条记录的多幅面孔——事务的隔离级别与 MVCC

21.1 事前准备 CREATE TABLE hero ( number INT, NAME VARCHAR ( 100 ), country VARCHAR ( 100 ), PRIMARY KEY ( number ) ) ENGINE INNODB CHARSET utf8;INSERT INTO hero VALUES ( 1, 刘备, 蜀 );21.2 事务隔离级别 在保证事务隔离性的前提下&#xff0c;使用不同的隔…

使用Qt实现实时数据动态绘制的折线图示例

基于Qt的 QChartView 和定时器来动态绘制折线图。它通过动画的方式逐步将数据点添加到图表上&#xff0c;并动态更新坐标轴的范围&#xff0c;提供了一个可以实时更新数据的折线图应用。以下是对代码的详细介绍及其功能解析&#xff1a; 代码概述 该程序使用Qt的 QChartView…