spring boot3 kafka集群搭建到使用

devtools/2025/3/19 23:08:42/

首先自行安装docker,通过docker容器安装kafka
CentOS 系统 docker安装地址

 1.pom.xml和application.properties或者application.yml文件配置

<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency>
spring:kafka:bootstrap-servers: [fafka地址1,fafka地址2,....]
#    producer序列化设置producer:#key序列化设置,设置成json对象
#      key-serializer: org.springframework.kafka.support.serializer.JsonSerializer
#    val序列化设置,设置成json对象value-serializer: org.springframework.kafka.support.serializer.JsonSerializer

2.博主安装了kafka ui插件,就直接创建主题了

当前一个集群,因为博主只搭建了一台服务器,也可以称为一个节点

创建主题

没有安装kafka ui,就再main那里启动项目时创建

package com.atguigu.boot3_08_kafka;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.TopicBuilder;@EnableKafka //扫描kafka注解,开启基于注解的模式
@SpringBootApplication
public class Boot308KafkaApplication {public static void main(String[] args) {SpringApplication.run(Boot308KafkaApplication.class, args);TopicBuilder.name("my-new-topic")//主题.partitions(3)//分区.replicas(2)//副本.build();}}

副本就是备份,有几节点就可以创建几个副本,副本数量一般采取分区数量-1,只有一个节点就N分区1副本


 3.在main 加上这个注解@EnableKafka

package com.atguigu.boot3_08_kafka;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.EnableKafka;@EnableKafka //扫描kafka注解,开启基于注解的模式
@SpringBootApplication
public class Boot308KafkaApplication {public static void main(String[] args) {SpringApplication.run(Boot308KafkaApplication.class, args);}}

4.生产者发送消息

package com.atguigu.boot3_08_kafka.controller;import com.atguigu.boot3_08_kafka.entity.Person;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
public class KafkaController {@Autowiredprivate KafkaTemplate kafkaTemplate;@GetMapping("/jjj") public String hello() {kafkaTemplate.send("tach", 0,"hello","急急急132");//send("主题", 分区号,"key","val")return "ok";}@GetMapping("/odj")public String odj() {kafkaTemplate.send("tach", 0,"hello",new Person(1L,"odj",19));//对象json需要序列化,可用配置文件配置,也可以在对象中序列化对象return "OK";}
}

5.消费者监听消息

package com.atguigu.boot3_08_kafka.listener;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.PartitionOffset;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.stereotype.Component;@Component
public class MykafkaListener {/*** 默认的监听是从最后一个消息开始拿,也就是只会拿新消息,不会拿历史的* @KafkaListener(topics = "主题",groupId = "用户组")* ConsumerRecord 消费者从 Kafka 获取消息的各种元数据和实际的消息* @param record*/@KafkaListener(topics = "tach",groupId = "teach")public void listen(ConsumerRecord<?, ?> record) {Object key = record.key();Object val = record.value();System.out.println("收到值key:"+key+"收到值val:"+val);}/***  想要到历史的消息或者全部消息,只能设置偏移量*  @KafkaListener(groupId = "用户组" ,topicPartitions = {设置分区,设置偏移量})*  @TopicPartition(topic = "主题" ,partitionOffsets 设置偏移量)*  @PartitionOffset(partition = "哪个分区", initialOffset = "从第几个偏移量开始")** @param record*/@KafkaListener(groupId = "teach" ,topicPartitions = {@TopicPartition(topic = "tach" ,partitionOffsets = {@PartitionOffset(partition = "0", initialOffset = "0")})})public void listens(ConsumerRecord<?, ?> record) {Object key = record.key();Object val = record.value();System.out.println("收到值key:"+key+"收到值val:"+val);}
}

最后查看结果


最后补充一个小知识

groupId = "用户组"

组里的成员是竞争模式

用户组和用户组之间是发布/订阅模式

由zookeeper分配管理

好了可以和面试官吹牛逼了


课外话

如果是传对象json需要序列化,创建对象时序列化,不推荐太原始重要是很占资源

因为开始我们都配置好了,有对象就会自动序列化

package com.atguigu.boot3_08_kafka.entity;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;import java.io.Serializable;@AllArgsConstructor
@NoArgsConstructor
@Data
public class Person implements Serializable {//不推荐implements Serializable private Long id;private String name;private Integer age;
}


http://www.ppmy.cn/devtools/168469.html

相关文章

DataWhale大语言模型-大模型技术基础

DataWhale大语言模型-大模型技术基础 什么是大语言模型预训练和后训练之间的对比大模型预训练(Pre-training)大语言模型后训练(Post-Training)指令微调(Instruction Tuning)人类对齐(Human Alignment) 扩展定律KM扩展定律Chinchilla扩展定律深入讨论 涌现能力代表性能力指令遵循…

SpringBoot第三站(4):配置嵌入式服务器使用外置的Servlet容器

目录 1. 配置嵌入式服务器 1.1 如何定制和修改Servlet容器的相关配置 1.server.port8080 2. server.context-path/tx 3. server.tomcat.uri-encodingUTF-8 1.2 注册Servlet三大组件【Servlet&#xff0c;Filter&#xff0c;Listener】 1. servlet 2. filter 3. 监听器…

使用GitHub Actions实现Git推送自动部署到服务器

将网站一键部署到服务器的方案很多&#xff0c;比如纯Shell脚本结合SSH、Jenkins等工具。本文将介绍如何利用GitHub Actions这一免费且轻量的CI/CD工具&#xff0c;实现代码推送后自动部署到云服务器。 之前一直在使用github的工作流&#xff0c;确实是一个比较好用的工具。 我…

STM32配套程序接线图

1 工程模板 2 LED闪烁 3LED流水灯 4蜂鸣器 5按键控制LED 6光敏传感器控制蜂鸣器 7OLED显示屏 8对射式红外传感器计次 9旋转编码器计次 10 定时器定时中断 11定时器外部时钟 12PWM驱动LED呼吸灯 13 PWM驱动舵机 14 PWM驱动直流电机 15输入捕获模式测频率 16PWMI模式测频率占空…

《保险科技》

自己在保险行业工作很多年&#xff0c;只是接触了一些数据的内容&#xff0c;对于保险业务的知识了解的很少&#xff0c;想通过这本书补充一下&#xff0c;但是发现这本书就是一些知识的拼接。 先将保险的历史&#xff0c;后讲保险的定义&#xff0c;然后就是吹嘘保险行业和互联…

深入理解Spring Boot:快速构建现代化的Java应用

大家好&#xff01;今天我们来聊聊Java开发中最流行的框架之一——Spring Boot。Spring Boot是Spring生态系统中的一个重要模块&#xff0c;它旨在简化Spring应用的开发和部署。通过Spring Boot&#xff0c;开发者可以快速构建独立、生产级的应用程序&#xff0c;而无需繁琐的配…

DeepSeek + 药物研发:解决药物研发周期长、成本高-降低80%、失败率高-减少40%

DeepSeek 药物研发&#xff1a;解决药物研发周期长、成本高-降低80%、失败率高-减少40% 论文大纲1. WHY —— 研究背景与现实问题1.1 研究要解决的现实问题与提出背景1.2 研究所要解决的问题类别1.3 正反例对比关联&#xff1a;和前人的工作有什么关系&#xff1f; 3. &#x…

洛谷 P3986 斐波那契数列

P3986 斐波那契数列 题目描述 定义一个数列&#xff1a; f ( 0 ) a , f ( 1 ) b , f ( n ) f ( n − 1 ) f ( n − 2 ) f(0) a, f(1) b, f(n) f(n - 1) f(n - 2) f(0)a,f(1)b,f(n)f(n−1)f(n−2) 其中 a, b 均为正整数&#xff0c;n ≥ 2。 问有多少种 (a, b)&…