手拉手springboot整合kafka发送消息

ops/2024/9/19 4:33:21/ 标签: spring boot, kafka, linq
环境介绍
技术栈springboot+mybatis-plus+mysql+rocketmq
软件版本
mysql8
IDEAIntelliJ IDEA 2022.2.1
JDK17
Spring Boot3.1.7
kafka2.13-3.7.0

创建topic时,若不指定topic的分区(Partition主题分区数)数量使,则默认为1个分区(partition)

springboot加入依赖kafka

<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>

加入spring-kafka依赖后,springboot自动装配好kafkaTemplate的Bean

application.yml配置连接kafka

spring:
kafka:
bootstrap-servers: 192.168.68.133:9092

生产者

发送消息

@Resource
private KafkaTemplate<String,String> kafkaTemplate;@Test
void kafkaSendTest(){
kafkaTemplate.send("kafkamsg01","hello kafka");
}

消费者

接收消息

@Component
public class KafkaConsumer {@KafkaListener(topics = {"kafkamsg01","test"},groupId = "123")
public void consume(String message){
System.out.println("接收到消息:"+message);
}}

若没有配置groupid

Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'; nested exception is java.lang.IllegalStateException: No group.id found in consumer config, container properties, or @KafkaListener annotation; a group.id is required when group management is used.

@Component
public class KafkaConsumer {@KafkaListener(topics = {"kafkamsg01","test"},groupId = "123")
public void consume(String message){
System.out.println("接收到消息:"+message);
}}

想从第一条消息开始读取(若同组的消费者已经消费过该主题,并且kafka已经保存了该消费者组的偏移量,则设置auto.offset.reset设置为earliest不生效,需要手动修改偏移量或使用新的消费者组)

application.yml需要将auto.offset.reset设置为earliest

spring:
kafka:
bootstrap-servers: 192.168.68.133:9092
consumer:
auto-offset-reset: earliest

Earliest:将偏移量重置为最早的偏移量

Latest: 将偏移量重置为最新的偏移量

None: 没有为消费者组找到以前的偏移量,向消费者抛出异常

Exception: 向消费者抛出异常

重置消费者组偏移量

./kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group 123 --topic kafkamsg01 --reset-offsets --to-earliest –execute

重置完成

Spring-kafka生产者发送消息

.send与sendDefault()方法都返回CompletableFuture<String<k,v>>;

CompletableFuture类用于异步编程,表示异步计算结果。该特征使得调用者不必等待操作完成就可以继续执行其他任务,从而提高引用的响应速度和吞吐量

@Resource
private KafkaTemplate<String,String> kafkaTemplate;@Test
void kafkaSendTest(){
kafkaTemplate.send("kafkamsg01","hello kafka");
}

发送Message

@Test
void kafkaSendMessageTest1(){
//通过构建器模式创建Message
Message<String> message = MessageBuilder.withPayload("hello kafka send message")
.setHeader(KafkaHeaders.TOPIC,"kafkamsg01")
.build();
kafkaTemplate.send(message);
}

SendProducerRecord

String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers

@Test
void kafkaSendProducerRecordTest1() {
//参数 String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers
Headers headers = new RecordHeaders();
headers.add("msg","123".getBytes(StandardCharsets.UTF_8));
ProducerRecord<String,String> record = new ProducerRecord(
"kafkaTopic01",
0,
System.currentTimeMillis(),
"key",
"hello kafka send message");
kafkaTemplate.send(record);
}

默认主题发送消息

yml配置默认主题

template:
default-topic: default-topic

@Test
void kafkaSendDefaultTest01(){
kafkaTemplate.sendDefault(0,System.currentTimeMillis(),"key01","hello ");
}

发送Object消息

序列化默认为String

@Resource
private KafkaTemplate<String,Object> kafkaTemplate1;
@Test
void kafkaSendObject(){
MessageM messageM =MessageM.builder().userID(123).sn("xo1111").desc("测试").build();
//分区是null,kafka自行决定消息发送到哪个分区
kafkaTemplate1.sendDefault(null,System.currentTimeMillis(),"key01",messageM);
}

Replica副本

Replica副本:为实现备份公共,保证集群中的某个节点发生故障时,确保节点上的partition数据不丢失,且kafka仍然能够正常运行。
Replica副本分为leader Replica和Follower Replica
leader:每个分区多个副本中的主副本,生产者发送数据以及消费者消费数据都来说leader副本。
Follower:每个分区多个副本中的从副本,实时从leader副本中同步数据,保持和leader副本数据同步,当leader副本发送故障,节点中的某个Follower副本会变成新的leader副本。

指定topic分区及副本

  • 通过脚本命令创建topic时指定分区和副本

--replication-factor需要小于等于节点个数,不能为0,默认为1

--replication-factor 1表示只有本身

--replication-factor 2 表示本身+副本

./kafka-topics.sh --create --topic testTopic --partitions 3 --replication-factor 2 --bootstrap-server 127.0.0.1:9092
  • 代码指定分区及副本

配置bean

@Configuration
public class kafkaConfig {@Beanpublic NewTopic newTopic(){return new NewTopic("topic1", 3, (short) 2);}
}

分区策略

Kafak根据不同策略将数据分配到不同的分区

  • 默认分配策略:BuiltlnPartitioner
  • 轮询分配策略:RoundRobinPartitioner 接口:Partitioner

@Configuration
public class kafkaConfig {//读取application.yml   bootstrap-servers@Value("${spring.kafka.bootstrap-servers}")private String bootstrapServers;//读取application.yml   bootstrap-servers@Value("${spring.kafka.producer.value-serializer}")private String valueSerializer;@Value("${spring.kafka.producer.key-serializer}")private String keySerializer;@Beanpublic KafkaTemplate<String,?> kafkaTemplate(){return new KafkaTemplate<>(producerFactory());}public Map<String,Object> producerConfigs(){Map<String,Object> props = new HashMap<>();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapServers);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,valueSerializer);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,keySerializer);props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, RoundRobinPartitioner.class);return props;}/*** 生产者工厂* @return*/public ProducerFactory<String,?> producerFactory(){return new DefaultKafkaProducerFactory<>(producerConfigs());}}

  • 手动指定

  • 自定义策略:自定义类实现Partitioner接口;

Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。 这种动作(网页,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。 对于像Hadoop一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消息。


http://www.ppmy.cn/ops/47097.html

相关文章

CSS学习笔记之高级教程(五)

23、CSS 媒体查询 - 实例 /* 如果屏幕尺寸超过 600 像素&#xff0c;把 <div> 的字体大小设置为 80 像素 */ media screen and (min-width: 600px) {div.example {font-size: 80px;} }/* 如果屏幕大小为 600px 或更小&#xff0c;把 <div> 的字体大小设置为 30px …

「C系列」C 变量及常见问题梳理

文章目录 一、C 变量1. 整数变量2. 浮点数变量3. 字符变量4. 字符串变量&#xff08;在C中&#xff0c;通常使用字符数组来表示字符串&#xff09; 二、C 变量-常见问题1. 变量未初始化2. 变量类型不匹配3. 变量作用域问题4. 变量命名冲突5. 变量越界访问6. 变量声明位置7. 变量…

1174. 即时食物配送 II

1174. 即时食物配送 II 题目链接&#xff1a;1174. 即时食物配送 II 代码如下&#xff1a; # Write your MySQL query statement below select round(sum(order_datecustomer_pref_delivery_date)*100/count(*),2)as immediate_percentage from Delivery where (customer_id,…

面试官:什么是Redis持久化—>AOF持久化

&#x1f604;作者简介&#xff1a; 小曾同学.com,一个致力于测试开发的博主⛽️&#xff0c;主要职责&#xff1a;测试开发、CI/CD 如果文章知识点有错误的地方&#xff0c;还请大家指正&#xff0c;让我们一起学习&#xff0c;一起进步。 &#x1f60a; 座右铭&#xff1a;不…

如何手动批准内核扩展 Tuxera NTFS for mac内核扩展需要批准 内核扩展怎么打开

在了解如何手动批准内核扩展之前&#xff0c;我们应该先了解什么叫做内核扩展。内核扩展又被称为KEXT&#xff0c;通过它可以实现macOS系统与软件组件之间的交互&#xff0c;例如磁盘管理、任务管理和内存管理等等。 kext 是内核扩展&#xff08;Kernel Extension&#xff09;…

【Endnote】如何在word界面加载Endnote

如何在word界面加载Endnote 方法1&#xff1a;方法2&#xff1a;从word入手方法3&#xff1a;从CWYW入手参考 已下载EndNote,但Word中没有显示EndNote&#xff0c;应如何加载显示呢&#xff1f; 方法1&#xff1a; 使用EndNote的Configure EndNote.exe 。 具体步骤为&#x…

开放式虚拟化格式1.0和2.0有什么区别

开放式虚拟化格式&#xff08;Open Virtualization Format&#xff0c;简称OVF&#xff09;是一种用于描述、打包、和分发虚拟机的标准格式。OVF 1.0和OVF 2.0是这个标准的两个不同版本。以下是它们之间的一些主要区别&#xff1a; 1. **扩展性**&#xff1a; - OVF 1.0主要…

Java的扩展性

Java的扩展性主要体现在其面向对象编程的特性上&#xff0c;以及通过抽象、继承、接口和多态等机制来实现代码的重用和扩展。以下是关于Java扩展性的详细描述&#xff1a; 一、基本概念 面向对象编程&#xff1a;Java是一种完全面向对象的编程语言&#xff0c;它支持将现实世…

爬虫技术中的滑块验证问题及解决方案

一、引言 随着大数据时代的到来&#xff0c;网络爬虫技术已成为数据获取和分析的重要工具。然而&#xff0c;随着网络安全性的提高&#xff0c;越来越多的网站开始采用滑块验证技术来防止机器人程序的自动化访问。对于爬虫开发者来说&#xff0c;如何绕过或处理滑块验证成为了…

动态sql set标签 , trim标签

set标签 来看例子 set标案解决了逗号问题(当if条件不满足时,逗号无处安放的问题),我认为set标签可以识别这个问题,并自动忽略这个问题 <update id"update">update employee<set><if test"name!null">name#{name},</if><if te…

list(二)和_stack_queue

嗨喽大家好&#xff0c;时隔许久阿鑫又给大家带来了新的博客&#xff0c;list的模拟实现&#xff08;二&#xff09;以及_stack_queue&#xff0c;下面让我们开始今天的学习吧&#xff01; list(二)和_stack_queue 1.list的构造函数 2.设计模式之适配器和迭代器 3.新容器de…

python udp双向通信

import json import socket import threading import loggingthislist [] thisneednum {}class ChatUdpMain:def __init__(self):#其他原有逻辑 begin#其他原有逻辑 end# 1.创建socket套接字 收self.udp_socket_receive socket.socket(socket.AF_INET, socket.SOCK_DGRAM…

Python一般用什么IDE:深入剖析四大主流选择

Python一般用什么IDE&#xff1a;深入剖析四大主流选择 在Python编程的世界里&#xff0c;选择合适的集成开发环境&#xff08;IDE&#xff09;对于提升编程效率和体验至关重要。本文将从四个方面、五个方面、六个方面和七个方面&#xff0c;深入剖析Python开发者常用的四大主…

数据结构--关键路径

事件v1-表示整个工程开始&#xff08;源点&#xff1a;入度为0的顶点&#xff09; 事件v9-表示整个工程结束&#xff08;汇点&#xff1a;出度为0的顶点&#xff09; 关键路径&#xff1a;路径长度最长的路径 求解关键路径问题&#xff08;AOE网&#xff09; 定义四个描述量 …

全自动打包封箱机:解析其在产品质量与安全保障方面的作用

在当今快节奏的生产环境中&#xff0c;全自动打包封箱机以其高效、精准的特点&#xff0c;正逐渐成为生产线上的得力助手。它不仅提升了生产效率&#xff0c;更在产品质量与安全保障方面发挥着举足轻重的作用。星派将详细解析全自动打包封箱机在产品质量与安全保障方面的作用。…

盲盒小程序预售机制的设计与实施

随着盲盒市场的不断发展&#xff0c;预售机制逐渐成为商家吸引用户、提升销售额的重要手段。本文将探讨盲盒小程序预售机制的设计与实施&#xff0c;以帮助商家更好地满足用户需求并优化库存周转率。 一、预售机制设计原则 在设计预售机制时&#xff0c;商家需要遵循以下几个…

react 合成事件

React合成事件-CSDN博客 当然&#xff0c;很高兴为你解释React中的合成事件概念&#xff0c;非常适合React初学者理解。 想象一下&#xff0c;你正在组织一场派对&#xff0c;为了让派对顺利进行&#xff0c;你需要管理各种活动&#xff0c;比如游戏、音乐和食物分配。但是&a…

成功解决“ModuleNotFoundError: No module named ‘xxx.yyy‘”错误的全面指南

成功解决“ModuleNotFoundError: No module named ‘xxx.yyy’”错误的全面指南 在Python编程中&#xff0c;当我们尝试导入一个模块时&#xff0c;有时会遇到“ModuleNotFoundError: No module named ‘xxx.yyy’”这样的错误。这通常表明Python解释器在尝试查找和导入一个包…

python09 字符串切片

字符串切片 字符串切片(字符串截取) 语法&#xff1a; [start: stop: step] 1.start > 开始索引 默认&#xff1a;0 2.stop > 结束索引&#xff0c;不包括stop 默认&#xff1a;到最后 3.step > 步长 默认&#xff1a;1 三个都有默认值&#xff0c;但注意不能一…

Vue3生命周期

文章目录 前言初始化组件挂载组件更新组件卸载示意图 前言 每个vue实例在被创建都要经历一系列初始化的过程 在这个过程中会运行一些叫做生命周期钩子的函数 使用户可以在页面中不同的阶段执行代码 初始化 1.setup() 在组件实例化时调用&#xff0c;用于设置组件的状态和响应…