Spring Cloud Stream 3.x+kafka 3.8整合

devtools/2024/10/19 4:58:44/

Spring Cloud Stream 3.x+kafka 3.8整合,文末有完整项目链接

  • 前言
  • 一、如何看官方文档(有深入了解需求的人)
  • 二、kafka的安装
    • tar包安装
    • docker安装
  • 三、代码中集成
    • 创建一个测试topic:test
    • producer代码
    • producer配置(配置的格式,上篇文章我有详细解释,大家可以回看)
    • Consumer代码
    • Consumer 配置
    • Consumer 2的代码和配置
  • 四、测试
  • 五、结语

前言

上一篇文章,我们用Spring Cloud Stream整合了RocketMQ:SpringCloud Alibaba五大组件之——RocketMQ,趁着此机会,继续学习了解一下Spring Cloud Stream,本文就以kafka为例。本文项目用到的所有Maven依赖和版本,都是和前面几篇文章一样。

由于整合kafka 不需要用到Cloud Alibaba一系列的技术,所以下载到源码运行不起来的,请删除mysql,nacos,dubbo,redis等一系列相关的依赖和代码。本文写下的时候,kafka最新版本为3.8版本,所以就以3.8版本举例说明。

官方中文文档:https://kafka1x.apachecn.org/documentation.html
官网文档:https://kafka.apache.org/documentation/
中文文档的版本比较老,建议大家对照着英文文档3.8版本的,相互结合起来看。

一、如何看官方文档(有深入了解需求的人)

1.基础操作:建议大家看operation一栏,后面我会简单贴出基本安装使用流程
在这里插入图片描述
2.配置建议看中文版本
在这里插入图片描述

kafka_15">二、kafka的安装

tar包安装

  1. 下载链接:kafka_2.13-3.8.0.tgz

  2. 选择一个合适的位置解压

    tar -zxvf kafka_2.12-3.8.0.tgz
    
  3. 启动自带的zookeeper(后台启动)

    nohup bin/zookeeper-server-start.sh config/zookeeper.properties &
    
  4. 修改kafka server的配置文件,便于外网能够访问
    找到bin\config目录下的server.properties文件
    修改以下两行listeners照着我这样写,advertised.listeners修改为你服务器的ip,端口默认9092

    listeners=PLAINTEXT://0.0.0.0:9092
    advertised.listeners=PLAINTEXT://172.16.72.133:9092
    
  5. 启动kafka server(后台启动)

    nohup bin/kafka-server-start.sh config/server.properties &
    
  6. 稍微扩展一下,集群的搭建,比如我们要扩展为三个集群代理:
    首先,为每个代理创建一个配置文件 (在Windows上使用copy 命令来代替):

    cp config/server.properties config/server-1.properties
    cp config/server.properties config/server-2.properties
    

    编辑这些新文件并设置如下属性:

    config/server-1.properties:broker.id=1listeners=PLAINTEXT://:9093log.dir=/tmp/kafka-logs-1config/server-2.properties:broker.id=2listeners=PLAINTEXT://:9094log.dir=/tmp/kafka-logs-2
    

    broker.id属性是集群中每个节点的名称,这一名称是唯一且永久的,必须重写端口和日志目录
    然后启动就好了:低一个启动的为leader,如果杀死leader,会重新推荐一个leader出来

    bin/kafka-server-start.sh config/server-1.properties &
    bin/kafka-server-start.sh config/server-2.properties &
    

    但是这样扩展的唯一不好的一点就是,会没有以前的数据,新的topic不影响,具体操作大家可以看文档。

docker安装

  1. 拉取镜像

    docker pull apache/kafka:3.8.0
    
  2. 启动

    docker run -p -d 9092:9092 apache/kafka:3.8.0
    

三、代码中集成

创建一个测试topic:test

bin/kafka-topics.sh --create --topic test --bootstrap-server localhost:9092

所有的topic的操作,都可以用kafka-topics.sh来操作,具体的可以看文档。老版本的启动是加–zookeeper的,会报错not found,新版本要用–bootstrap-server。

producer代码

@RestController
@RequestMapping("/mqtest")
public class KafkaTestController {private static final Logger logger = LoggerFactory.getLogger(KafkaTestController.class);@Autowiredprivate StreamBridge streamBridge;@RequestMapping("/test1")public void testOne() {Message<SimpleMsg> msg = new GenericMessage<>(new SimpleMsg("我是 broadcastMessage", new Date().toString()));streamBridge.send("broadcastMessage-out-0", msg);}
}

自定义消息体SimpleMsg,此类不需要序列化

@AllArgsConstructor
@Data
@NoArgsConstructor
public class SimpleMsg{private String msg;private String time;
}

producer配置(配置的格式,上篇文章我有详细解释,大家可以回看)

key:serializer 重中之重,发送对象消息的时候,解决转换错误,SpringCloudStream默认的是ByteArraySerializer,但是kafkamore默认的是String

spring:cloud:stream:kafka:binder:##kafka的server地址brokers: 172.16.72.133:9092##如果topic不存在则创建auto-create-topics: trueauto-add-partitions: true #自动分区min-partition-count: 1 #最小分区##这个序列化很关键,如果不加这个配置,则发送对象消息时候,会报转换错误configuration:key:serializer: org.apache.kafka.common.serialization.ByteArraySerializerbindings:broadcastMessage-out-0:destination: testcontent-type: application/json

Consumer代码

 @Beanpublic Consumer<Message<SimpleMsg>> broadcastMessage() {return msg -> {log.info(Thread.currentThread().getName() + " Consumer1 Receive New Messages: " + msg.getPayload().getMsg() + msg.getPayload().getTime());};}

Consumer 配置

项目中有更详细的配置,这里为了测试用的简化版

spring:cloud:stream:function:definition: broadcastMessagekafka:binder:brokers: 172.16.72.133:9092auto-create-topics: trueconfiguration:key:serializer: org.apache.kafka.common.serialization.ByteArraySerializerbindings:broadcastMessage-in-0:destination: testgroup: test-topic-accountcontent-type: application/json

Consumer 2的代码和配置

项目中还有一个friend模块,当做第二个消费者,代码和配置和Consumer 1完全一样,唯一不同的就是可以设置group不同,这里就不贴代码了。

四、测试

生产者发送两个消息
在这里插入图片描述
两个消费者实例,分组一样,则轮询消费,分组不同,则单独消费
account模块消费者:
在这里插入图片描述
friend模块消费者:
在这里插入图片描述

五、结语

到这篇文章,这一个系列基本就算结束了,后面可能会补充一下内容,或者去写点其他的东西。或者说,去研究下springboot的集成而不用Spring Cloud Stream,后面再说吧。

本文完整项目代码GitHub地址,请切换到kafka分支
https:https://github.com/wangqing-github/DubboAndNacos.git
ssh:git@github.com:wangqing-github/DubboAndNacos.git


http://www.ppmy.cn/devtools/125034.html

相关文章

java.io.StreamCorruptedException: invalid stream header的原因及解决方法

最近在写一个类似于QQ的网络通讯项目&#xff0c;在信息发送的时候出现了一个问题&#xff0c;客户端的信息服务端可以正常收到并且转出&#xff0c;但是对应的客户端在接收的时候就会抛出这个异常&#xff0c;往往还会伴随着java.io.StreamCorruptedException: invalid type c…

如何彻底删除360软件或安装的应用软件

以彻底删除360安全软件为例&#xff08;或其他360相关软件&#xff09;&#xff0c;可以按照以下步骤进行操作&#xff1a; 方法一&#xff1a;通过控制面板卸载 打开控制面板&#xff1a; 在 Windows 操作系统中&#xff0c;右键点击开始菜单&#xff0c;选择“控制面板”。或…

vscode播放MP4文件时候没声音

问题描述&#xff1a; vscode 播放MP4文件时候没有声音 原因分析&#xff1a; https://github.com/microsoft/vscode-docs/blob/vnext/release-notes/v1_72.md#built-in-preview-for-some-audio-and-video-files 解决方案&#xff1a; 从上面描述可以看出&#xff0c;大概…

vue3 vue2

vue3.0是如何变快的&#xff1f; diff算法优化 vue2的虚拟dom是进行全局的对比。vue3 新增了静态标记&#xff08;patchFlag&#xff09; 在与上次虚拟节点进行比较的时候&#xff0c;只对比带有patch Flag的节点&#xff0c;并且可以通过flag的信息得知当前节点要对比的具体内…

Pandas库详细学习要点

Pandas库是一个基于Python的数据分析库&#xff0c;它提供了丰富的数据结构和数据分析工具&#xff0c;非常适合数据科学和数据分析领域的工作。以下是Pandas库详细学习的一些要点&#xff1a; 1. 数据结构 - Series&#xff1a;一维带标签数组&#xff0c;类似于NumPy中的一…

verdaccio使用管理私自npm

参考文献&#xff1a; 使用verdaccio搭建自己的npm私有库_one of the uplinks is down, refuse to publish-CSDN博客 npm i报错request to https://registry.npmjs.org/xxx failed, reason: connect ETIMEDOUT 104.16.25.34:443-CSDN博客 window上搭建npm私仓&#xff08;verd…

vue双向绑定/小程序双向绑定区别

Vue双向绑定与小程序双向绑定在实现方式、语法差异以及功能特性上均存在显著区别。以下是对这两者的详细比较&#xff1a; 一、实现方式 Vue双向绑定 Vue的双向绑定主要通过其响应式数据系统实现。Vue使用Object.defineProperty()方法&#xff08;或在Vue 3中使用Proxy对象&am…

Perl 子程序(函数)

Perl 子程序&#xff08;函数&#xff09; Perl 是一种高级、解释型、动态编程语言&#xff0c;广泛用于CGI脚本、系统管理、网络编程、 finance, bioinformatics, 以及其他领域。在Perl中&#xff0c;子程序&#xff08;也称为函数&#xff09;是组织代码和重用代码块的重要方…