使用微服务Spring Cloud集成Kafka实现异步通信

devtools/2024/10/19 0:22:04/

微服务架构中,使用Spring Cloud集成Apache Kafka来实现异步通信是一种常见且高效的做法。Kafka作为一个分布式流处理平台,能够处理高吞吐量的数据,非常适合用于微服务之间的消息传递。

微服务之间的通信方式包括同步通信和异步通信。

1)同步通信:通常通过HTTP RESTful API或RPC(远程过程调用)实现。服务消费者通过发送HTTP请求到服务提供者,服务提供者处理请求后返回响应。这种方式简单直接,但可能会受到网络延迟和并发量的影响。

同步通信的实现代码参见博文:微服务3:微服务间接口远程调用(同步通信方式)-CSDN博客

2)异步通信:通过消息队列(如RabbitMQ、Kafka等)实现。服务消费者将消息发送到队列中,服务提供者从队列中拉取消息并进行处理。这种方式实现了服务之间的解耦,提高了系统的可扩展性和容错性。但也需要考虑消息的顺序性、一致性和可靠性等问题。

1、本文目标

本文的目标是使用微服务Spring Cloud集成Kafka实现异步通信。本文实现了一个简单的Kafka Producer微服务,连接至部署再Ubuntu系统上的Kafka Server,同时在Ubuntu通过命令行终端启动一个监听的消费者,当通过浏览器测试接口想Kafka Producer微服务发送一条消息,Kafka Producer微服务即刻将该消息发送至Ubuntu系统上的Kafka Server,同时在Kafka consumer终端上可收到并显示出该消息。具体系统架构如下图所示。

部署Kafka Server和Kafka consumer,参见博文:Ubuntu下Kafka安装及使用-CSDN博客

Eureka注册中心的实现,参见博文:

微服务1:搭建微服务注册中心(命令行简易版,不使用IDE)-CSDN博客

2、创建Kafka Producer

mvn archetype:generate -DgroupId=com.test -DartifactId=microservice-kafka -DarchetypeArtifactId=maven-archetype-quickstart

完整代码的目录如下:

编辑pom.xml,添加依赖包:

    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-stream-binder-kafka</artifactId>
    </dependency>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.test</groupId><artifactId>microservice-kafka</artifactId><packaging>jar</packaging><version>1.0-SNAPSHOT</version><name>microservice-kafka</name><url>http://maven.apache.org</url><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.3.0.RELEASE</version><relativePath/> </parent><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-netflix-eureka-client</artifactId></dependency>         <dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>3.8.1</version><scope>test</scope></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-stream-binder-kafka</artifactId></dependency></dependencies><dependencyManagement><dependencies><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-dependencies</artifactId><version>Hoxton.SR4</version><type>pom</type><scope>import</scope></dependency>               </dependencies></dependencyManagement><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>

编辑application.yml,配置kafka

bootstrap-servers: 192.168.23.131:9092其中192.168.23.131是Kafka Server的IP地址。

server:port: 8020
spring:application:name: microservice-kafkakafka:bootstrap-servers: 192.168.23.131:9092producer:retries: 0batch-size: 16384buffer-memory: 33554432key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializeracks: alleureka:client:serviceUrl:defaultZone: http://localhost:8080/eureka/instance:prefer-ip-address: true            

App.java的完整代码如下:

package com.test;import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;@SpringBootApplication
@EnableDiscoveryClient
public class App 
{public static void main( String[] args ){System.out.println( "Hello World!" );SpringApplication.run(App.class, args);}
}

KafkaController.java的完整代码如下:

package com.test;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.kafka.core.*;@RequestMapping("/kafka")
@RestController
public class KafkaController {@Autowiredprivate KafkaTemplate<String,String> kafkaTemplate;@GetMapping("sendMsg")public String helloProducer(String msg){kafkaTemplate.send("mydemo1",msg);return "ok";}}

启动Kafka Producer 和Eureka

mvn spring-boot:run

3、启动Kafka Server及Consumer

bin/kafka-server-start.sh config/server.properties&

创建主题

./bin/kafka-topics.sh --create --bootstrap-server demo1:9092 --replication-factor 1 --partitions 1 --topic mydemo1

在命令行终端启动消费者

bin/kafka-console-consumer.sh --bootstrap-server demo1:9092 --topic mydemo1

4、浏览器测试

在浏览器输入:

http://localhost:8020/kafka/sendMsg?msg=测试消息testmsg

此时在Ubuntu的Consumer终端可以看到从浏览器输入的消息。


http://www.ppmy.cn/devtools/121160.html

相关文章

PyGWalker:让你的Pandas数据可视化更简单,快速创建数据可视化网站

1、PyGWalker应用: 在数据分析的过程中,数据的探索和可视化是至关重要的环节,如何高效地将分析结果展示给团队、客户,甚至是公众,是很多数据分析师和开发者面临的挑战,接下来介绍的两大工具组合——PyGWalker与Streamlit,可以帮助用户轻松解决这个问题,即使没有复杂的代…

Oracle架构之物理存储之审计文件

文章目录 1 审计文件&#xff08;audit files&#xff09;1.1 定义1.2 查看审计信息1.3 审计相关参数1.4 审计的类型1.4.1 语句审计1.4.2 权限审计1.4.3 对象审计1.4.4 细粒度的审计 1.5 与审计相关的数据字典视图 1 审计文件&#xff08;audit files&#xff09; 1.1 定义 审…

PHP爬虫:获取商品销量详情API的利器

在电子商务时代&#xff0c;商品的销量数据对于商家来说至关重要。它不仅能够帮助商家了解市场动态&#xff0c;还能够指导库存管理和营销策略。PHP作为一种流行的服务器端脚本语言&#xff0c;结合其强大的HTTP请求处理能力&#xff0c;可以有效地用于编写爬虫程序&#xff0c…

我博客网站又遭受CC攻击了,记录一下

2024.9.29凌晨4点攻击开始&#xff0c;攻击目标是我的图床tc.zeruns.tech和博客blog.zeruns.tech&#xff0c;图床用的cdn是多吉云融合CDN&#xff0c;流量被刷了20GB左右就触发峰值关闭CDN了&#xff0c;HTTPS请求次数被刷了1.1亿次&#xff0c;因为设置了QPS&#xff0c;实际…

C++11中智能指针以及标准模板库 My_string My_stack

My_string.h #ifndef MY_STRING_H #define MY_STRING_H#include <iostream> #include <cstring> #include <stdexcept>using namespace std;template<typename T> class My_string { private:T *ptr; // 指向字符数组的指针int size; /…

不只是前端,后端、产品和测试也需要了解的浏览器知识(一)

目录标题 一、我们为什么要了解浏览器&#xff1f;1. 对于前端开发者2. 对于后端开发者 二、浏览器发展概述1. 宏观发展2. 微观发展 三、浏览器核心部件1. 浏览器界面介绍2. 目前浏览器的使用的渲染引擎和解释器总结3. 浏览器的解释器 四、各家浏览器目前的市场占比五、整体总结…

什么是 HTTP 请求中的 options 请求?

在 Chrome 开发者工具中的 Network 面板看到的 HTTP 方法 OPTIONS&#xff0c;其实是 HTTP 协议的一部分&#xff0c;用于客户端和服务器之间进行“预检”或“协商”。OPTIONS 请求的作用是让客户端能够获取关于服务器支持的 HTTP 方法和其他跨域资源共享 (CORS) 相关的信息&am…

国外电商系统开发-运维系统功能清单开发

一、最终效果图 二、功能清单 功能 描述 自定义日志绘图 根据Nginx、Apache登录日志文件绘图&#xff0c;绘图数据包括&#xff1a;访问量走势&#xff0c;500错误&#xff0c;200正确百分比等 创建服务器 加入服务器 主机状态自动检查 加入主机到系统后&#xff0c;系统…