使用微服务Spring Cloud集成Kafka实现异步通信

embedded/2024/12/23 8:43:27/

微服务架构中,使用Spring Cloud集成Apache Kafka来实现异步通信是一种常见且高效的做法。Kafka作为一个分布式流处理平台,能够处理高吞吐量的数据,非常适合用于微服务之间的消息传递。

微服务之间的通信方式包括同步通信和异步通信。

1)同步通信:通常通过HTTP RESTful API或RPC(远程过程调用)实现。服务消费者通过发送HTTP请求到服务提供者,服务提供者处理请求后返回响应。这种方式简单直接,但可能会受到网络延迟和并发量的影响。

同步通信的实现代码参见博文:微服务3:微服务间接口远程调用(同步通信方式)-CSDN博客

2)异步通信:通过消息队列(如RabbitMQ、Kafka等)实现。服务消费者将消息发送到队列中,服务提供者从队列中拉取消息并进行处理。这种方式实现了服务之间的解耦,提高了系统的可扩展性和容错性。但也需要考虑消息的顺序性、一致性和可靠性等问题。

1、本文目标

本文的目标是使用微服务Spring Cloud集成Kafka实现异步通信。本文实现了一个简单的Kafka Producer微服务,连接至部署再Ubuntu系统上的Kafka Server,同时在Ubuntu通过命令行终端启动一个监听的消费者,当通过浏览器测试接口想Kafka Producer微服务发送一条消息,Kafka Producer微服务即刻将该消息发送至Ubuntu系统上的Kafka Server,同时在Kafka consumer终端上可收到并显示出该消息。具体系统架构如下图所示。

部署Kafka Server和Kafka consumer,参见博文:Ubuntu下Kafka安装及使用-CSDN博客

Eureka注册中心的实现,参见博文:

微服务1:搭建微服务注册中心(命令行简易版,不使用IDE)-CSDN博客

2、创建Kafka Producer

mvn archetype:generate -DgroupId=com.test -DartifactId=microservice-kafka -DarchetypeArtifactId=maven-archetype-quickstart

完整代码的目录如下:

编辑pom.xml,添加依赖包:

    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-stream-binder-kafka</artifactId>
    </dependency>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.test</groupId><artifactId>microservice-kafka</artifactId><packaging>jar</packaging><version>1.0-SNAPSHOT</version><name>microservice-kafka</name><url>http://maven.apache.org</url><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.3.0.RELEASE</version><relativePath/> </parent><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-netflix-eureka-client</artifactId></dependency>         <dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>3.8.1</version><scope>test</scope></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-stream-binder-kafka</artifactId></dependency></dependencies><dependencyManagement><dependencies><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-dependencies</artifactId><version>Hoxton.SR4</version><type>pom</type><scope>import</scope></dependency>               </dependencies></dependencyManagement><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>

编辑application.yml,配置kafka

bootstrap-servers: 192.168.23.131:9092其中192.168.23.131是Kafka Server的IP地址。

server:port: 8020
spring:application:name: microservice-kafkakafka:bootstrap-servers: 192.168.23.131:9092producer:retries: 0batch-size: 16384buffer-memory: 33554432key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializeracks: alleureka:client:serviceUrl:defaultZone: http://localhost:8080/eureka/instance:prefer-ip-address: true            

App.java的完整代码如下:

package com.test;import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;@SpringBootApplication
@EnableDiscoveryClient
public class App 
{public static void main( String[] args ){System.out.println( "Hello World!" );SpringApplication.run(App.class, args);}
}

KafkaController.java的完整代码如下:

package com.test;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.kafka.core.*;@RequestMapping("/kafka")
@RestController
public class KafkaController {@Autowiredprivate KafkaTemplate<String,String> kafkaTemplate;@GetMapping("sendMsg")public String helloProducer(String msg){kafkaTemplate.send("mydemo1",msg);return "ok";}}

启动Kafka Producer 和Eureka

mvn spring-boot:run

3、启动Kafka Server及Consumer

bin/kafka-server-start.sh config/server.properties&

创建主题

./bin/kafka-topics.sh --create --bootstrap-server demo1:9092 --replication-factor 1 --partitions 1 --topic mydemo1

在命令行终端启动消费者

bin/kafka-console-consumer.sh --bootstrap-server demo1:9092 --topic mydemo1

4、浏览器测试

在浏览器输入:

http://localhost:8020/kafka/sendMsg?msg=测试消息testmsg

此时在Ubuntu的Consumer终端可以看到从浏览器输入的消息。


http://www.ppmy.cn/embedded/123359.html

相关文章

网络安全学习的详细要点

网络安全学习的详细要点可以归纳为以下几个方面&#xff1a; 1. 基础知识学习 计算机网络基础&#xff1a;了解各种网络协议、防火墙、数据转发原理等。 操作系统基础&#xff1a;熟悉操作系统的基本原理和操作。 编程语言&#xff1a;掌握C、C、Python等编程语言&#xff…

已解决:AttributeError: ‘str‘ object has no attribute ‘decode‘

已解决&#xff1a;AttributeError: ‘str’ object has no attribute ‘decode’ 文章目录 写在前面问题描述报错原因分析 解决思路解决办法1. 确保只对 bytes 对象调用 decode()2. 将 Python 2 的旧代码迁移到 Python 33. 检查数据来源4. 处理编码不一致的问题5. 使用 six 库…

如何使用ssm实现果蔬商品管理系统的设计与实现+vue

TOC ssm777果蔬商品管理系统的设计与实现vue 第1章 绪论 1.1 课题背景 二十一世纪互联网的出现&#xff0c;改变了几千年以来人们的生活&#xff0c;不仅仅是生活物资的丰富&#xff0c;还有精神层次的丰富。时代进步的标志&#xff0c;就是让人们过上更好的生活。在互联网…

机器人跳跃问题

机器人正在玩一个古老的基于 DOS 的游戏。 游戏中有 N1N1 座建筑——从 00 到 NN 编号&#xff0c;从左到右排列。 编号为 00 的建筑高度为 00 个单位&#xff0c;编号为 ii 的建筑高度为 H(i)H(i) 个单位。 起初&#xff0c;机器人在编号为 00 的建筑处。 每一步&#xff…

【计算机视觉】ch1-Introduction

相机模型与成像 1. 世界坐标系 (World Coordinate System) 世界坐标系是指物体在真实世界中的位置和方向的表示方式。在计算机视觉和图像处理领域&#xff0c;世界坐标系通常是一个全局坐标系统&#xff0c;描述了摄像机拍摄到的物体在实际三维空间中的位置。它是所有其他坐标…

Library介绍(三)

环境描述 工作条件 一般lib文件里面包含了芯片的工作条件即operation conditions&#xff0c;其指定了工艺&#xff08;process&#xff09;、温度&#xff08;temperature&#xff09;和电压&#xff08;voltage&#xff09;&#xff0c;见图1。 其中&#xff0c;process代表了…

角色动画——RootMotion全解

1. Unity(2022)的应用 由Animtor组件控制 在Animation Clip下可进行详细设置 ​ 官方文档的介绍(Animation选项卡 - Unity 手册) 上述动画类型在Rag选项卡中设置: Rig 选项卡上的设置定义了 Unity 如何将变形体映射到导入模型中的网格&#xff0c;以便能够将其动画化。 对于人…

SOMEIP_ETS_164: SD_SubscribeEventgroup_with_unallowed_option_ip_2

测试目的&#xff1a; 验证DUT能够拒绝一个在请求中包含错误参数&#xff08;端点选项中包含无效IPv4地址&#xff0c;即111.111.111.111&#xff09;的SubscribeEventgroup消息&#xff0c;并以SubscribeEventgroupNAck作为响应。 描述 本测试用例旨在确保DUT遵循SOME/IP协…