spring boot3 kafka集群搭建到使用

news/2025/3/19 22:38:46/

首先自行安装docker,通过docker容器安装kafka
CentOS 系统 docker安装地址

 1.pom.xml和application.properties或者application.yml文件配置

<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency>
spring:kafka:bootstrap-servers: [fafka地址1,fafka地址2,....]
#    producer序列化设置producer:#key序列化设置,设置成json对象
#      key-serializer: org.springframework.kafka.support.serializer.JsonSerializer
#    val序列化设置,设置成json对象value-serializer: org.springframework.kafka.support.serializer.JsonSerializer

2.博主安装了kafka ui插件,就直接创建主题了

当前一个集群,因为博主只搭建了一台服务器,也可以称为一个节点

创建主题

没有安装kafka ui,就再main那里启动项目时创建

package com.atguigu.boot3_08_kafka;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.TopicBuilder;@EnableKafka //扫描kafka注解,开启基于注解的模式
@SpringBootApplication
public class Boot308KafkaApplication {public static void main(String[] args) {SpringApplication.run(Boot308KafkaApplication.class, args);TopicBuilder.name("my-new-topic")//主题.partitions(3)//分区.replicas(2)//副本.build();}}

副本就是备份,有几节点就可以创建几个副本,副本数量一般采取分区数量-1,只有一个节点就N分区1副本


 3.在main 加上这个注解@EnableKafka

package com.atguigu.boot3_08_kafka;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.EnableKafka;@EnableKafka //扫描kafka注解,开启基于注解的模式
@SpringBootApplication
public class Boot308KafkaApplication {public static void main(String[] args) {SpringApplication.run(Boot308KafkaApplication.class, args);}}

4.生产者发送消息

package com.atguigu.boot3_08_kafka.controller;import com.atguigu.boot3_08_kafka.entity.Person;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
public class KafkaController {@Autowiredprivate KafkaTemplate kafkaTemplate;@GetMapping("/jjj") public String hello() {kafkaTemplate.send("tach", 0,"hello","急急急132");//send("主题", 分区号,"key","val")return "ok";}@GetMapping("/odj")public String odj() {kafkaTemplate.send("tach", 0,"hello",new Person(1L,"odj",19));//对象json需要序列化,可用配置文件配置,也可以在对象中序列化对象return "OK";}
}

5.消费者监听消息

package com.atguigu.boot3_08_kafka.listener;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.PartitionOffset;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.stereotype.Component;@Component
public class MykafkaListener {/*** 默认的监听是从最后一个消息开始拿,也就是只会拿新消息,不会拿历史的* @KafkaListener(topics = "主题",groupId = "用户组")* ConsumerRecord 消费者从 Kafka 获取消息的各种元数据和实际的消息* @param record*/@KafkaListener(topics = "tach",groupId = "teach")public void listen(ConsumerRecord<?, ?> record) {Object key = record.key();Object val = record.value();System.out.println("收到值key:"+key+"收到值val:"+val);}/***  想要到历史的消息或者全部消息,只能设置偏移量*  @KafkaListener(groupId = "用户组" ,topicPartitions = {设置分区,设置偏移量})*  @TopicPartition(topic = "主题" ,partitionOffsets 设置偏移量)*  @PartitionOffset(partition = "哪个分区", initialOffset = "从第几个偏移量开始")** @param record*/@KafkaListener(groupId = "teach" ,topicPartitions = {@TopicPartition(topic = "tach" ,partitionOffsets = {@PartitionOffset(partition = "0", initialOffset = "0")})})public void listens(ConsumerRecord<?, ?> record) {Object key = record.key();Object val = record.value();System.out.println("收到值key:"+key+"收到值val:"+val);}
}

最后查看结果


最后补充一个小知识

groupId = "用户组"

组里的成员是竞争模式

用户组和用户组之间是发布/订阅模式

由zookeeper分配管理

好了可以和面试官吹牛逼了


课外话

如果是传对象json需要序列化,创建对象时序列化,不推荐太原始重要是很占资源

因为开始我们都配置好了,有对象就会自动序列化

package com.atguigu.boot3_08_kafka.entity;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;import java.io.Serializable;@AllArgsConstructor
@NoArgsConstructor
@Data
public class Person implements Serializable {//不推荐implements Serializable private Long id;private String name;private Integer age;
}


http://www.ppmy.cn/news/1580437.html

相关文章

DeepSeek + Kimi 自动生成 PPT

可以先用deepseek生成ppt大纲&#xff0c;再把这个大纲复制到Kimi的ppt助手里&#xff1a; https://kimi.moonshot.cn/kimiplus/conpg18t7lagbbsfqksg 选择ppt模板&#xff1a; 点击生成ppt就制作好了。

django self.get_queryset() 如何筛选

在Django中&#xff0c;self.get_queryset()是一个在模型管理器的自定义方法中常用的方式&#xff0c;用于返回一个查询集&#xff08;QuerySet&#xff09;。如果你想在get_queryset()方法中添加筛选条件&#xff0c;可以通过以下几种方式来实现&#xff1a; 使用filter() 你…

基于springboot的无人智慧超市管理系统

一、系统架构 前端&#xff1a;vue | element-ui | html | jquery | css | ajax 后端&#xff1a;springboot | mybatis 环境&#xff1a;jdk1.8 | mysql | maven | nodejs | idea 二、代码及数据 三、功能介绍 01. web端-注册 02. web端-登录 03. web…

STM32 DAC详解:从原理到实战输出正弦波

目录 一、DAC基础原理1.1 DAC的作用与特性1.2 DAC功能框图解析 二、DAC配置步骤2.1 硬件配置2.2 初始化结构体详解 三、DAC数据输出与波形生成3.1 数据格式与电压计算3.2 正弦波生成实战3.2.1 生成正弦波数组3.2.2 配置DMA传输3.2.3 定时器触发配置 四、常见问题与优化建议4.1 …

LabVIEW压比调节器动态试验台

本案介绍了一种基于LabVIEW的压比调节器动态试验台的设计&#xff0c;通过实用的LabVIEW图形化编程语言&#xff0c;优化了数据采集与处理的整个流程。案例通过实际应用展示了设计的专业性与高效性&#xff0c;以及如何通过系统化的方法实现精确的动态测试和结果分析。 ​ 项目…

内网环境安装dlv,本地远程调试go

背景&#xff1a;内网环境(服务器)下安装dlv,本地通过dlv调试编译后的go代码。 可以配合观看: 【dlv远程调试-哔哩哔哩】 https://b23.tv/NqPZ5q9 内网安装dlv步骤 1、dlv安装: &#xff08;我额服务器和内网的go都是1.21以上&#xff09; # 先在有网络的环境下&#xff08…

【Go语言圣经2.3】

目标 了解Go 的变量声明有以下几个重要特点&#xff1a; 声明方式多样 使用 var 关键字可以显式声明变量、指定类型或初始化值。使用简短声明符号 : 可以在函数内部快速声明并初始化变量&#xff0c;类型由编译器自动推导。 零值初始化 如果在声明时没有提供初始化表达式&…

【深度学习与大模型基础】第6章-对角矩阵,对称矩阵,正交矩阵

一、对角矩阵 对角矩阵&#xff08;Diagonal Matrix&#xff09;是一种特殊的方阵&#xff0c;其非对角线上的元素均为零&#xff0c;只有对角线上的元素可能非零。具体来说&#xff0c;对于一个 nn的矩阵 A[]&#xff0c;如果满足 则 AA 称为对角矩阵。对角矩阵通常表示为&am…