Kafka怎么发送JAVA对象并在消费者端解析出JAVA对象--示例

ops/2024/12/29 5:00:56/

1、在pom.xml中加入依赖

        <dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-kafka</artifactId><version>3.1.6</version></dependency>

2、配置application.yml

加入Kafka的配置

springkafka:#Kafka地址,可以是一个,也可以是Kafka集群的地址,多个地址用逗号分隔bootstrap-servers: 192.168.57.1xx:9093,192.168.57.1xx:9094,192.168.57.1xx:9095producer:# 消息确认模式:0=不等待确认,1=等待leader确认,all=所有副本确认acks: 1# 发送失败时的重试次数,0表示不重试retries: 0# 批量发送时的批次大小(字节)batch-size: 30720000 # 30MB# 生产者的内存缓冲区大小(字节)buffer-memory: 33554432 # 32MB# Key的序列化器类key-serializer: org.apache.kafka.common.serialization.StringSerializer# Value的序列化器类value-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer:# 消费者所属的组IDgroup-id: test-kafka# 禁用自动提交offset,改为手动提交enable-auto-commit: false# 偏移量重置策略:# earliest:从最早的记录开始消费# latest:从最新的记录开始消费auto-offset-reset: earliest# Key的反序列化器类key-deserializer: org.apache.kafka.common.serialization.StringDeserializer# Value的反序列化器类value-deserializer: org.apache.kafka.common.serialization.StringDeserializer# 每次poll()调用返回的最大消息条数max-poll-records: 2session:# 消费者会话超时时间,超时未发送心跳将被认为失联(毫秒)timeout:ms: 300000 # 5分钟listener:# 如果指定的主题不存在,是否让应用启动失败,false表示不会报错missing-topics-fatal: false# 消费模式:single=单条消息,batch=批量消费type: single# 消费确认模式:# manual_immediate:手动确认消息,立即提交offsetack-mode: manual_immediate

这里的生产者value的序列化器用org.apache.kafka.common.serialization.StringSerializer
 ,消费者value的序列化器用org.apache.kafka.common.serialization.StringDeserializer即可。

(这里不需要自定义序列化器,但在代码需要将JAVA对象转化为JSON字符串发送)

3、config、producer、consumer代码

3.1、User.java

java">import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class User {private int id;private String name;
}

3.2、Task.java

java">import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public
class Task {private int id;private String description;private User assignedUser;
}

模拟嵌套类 

3.3、KafkaConfig.java

java">import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;@EnableKafka
@Configuration
public class KafkaConfig {// 单条消费监听器工厂,手动提交offset@Beanpublic ConcurrentKafkaListenerContainerFactory<String, String> singleFactory(ConsumerFactory<String, String> consumerFactory) {ConcurrentKafkaListenerContainerFactory<String, String> factory =new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory);factory.getContainerProperties().setAckMode(org.springframework.kafka.listener.ContainerProperties.AckMode.MANUAL_IMMEDIATE);return factory;}}

3.4、KafkaProducer.java

java">import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.KafkaTemplate;@SpringBootApplication
public class KafkaProducer {public static void main(String[] args) {SpringApplication.run(KafkaProducer.class, args);}@BeanCommandLineRunner commandLineRunner(KafkaTemplate<String, String> kafkaTemplate) {return args -> {String topic = "task-topic";ObjectMapper objectMapper = new ObjectMapper();for (int i = 1; i <= 5; i++) {// 定义一个对象实例User user = User.builder().id(1).name("Alice").build();Task task = Task.builder().id(101).description("Complete report").assignedUser(user).build();//JAVA对象转化为JSON字符串String message =  objectMapper.writeValueAsString(task);kafkaTemplate.send(topic, message);System.out.println("Sent: " + message);Thread.sleep(500); // 模拟消息发送间隔}};}
}

序列化:使用 Jackson 的 ObjectMapperTask 对象转化为 JSON 字符串,方法 writeValueAsString() 将 Java 对象转为 JSON 字符串。

3.5、SingleConsumer.java

java">import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;@Service
public class SingleConsumer {@KafkaListener(topics = "task-topic", groupId = "test-group", containerFactory = "singleFactory", autoStartup = "true")public void listen(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) throws JsonProcessingException {String message = record.value();ObjectMapper objectMapper = new ObjectMapper();Task task = objectMapper.readValue(message,Task.class);// 取出System.out.println("User - Received: " + task.getAssignedUser());// 手动提交offsetacknowledgment.acknowledge();}
}

反序列化: 使用 ObjectMapper 将 JSON 字符串 message 转换回 Task 对象,方法 readValue() 可以将 JSON 字符串解析为指定的 Java 对象类型。

4、测试

启动KafkaProducer.java

可以解析出JAVA对象中User

 

成功!


http://www.ppmy.cn/ops/145827.html

相关文章

sql字段值转字段

表alertlabel中记录变字段 如何用alertlabel表得到下面数据 实现的sql语句 select a.AlertID, (select Value from alertlabel where AlertIDa.AlertID and Labelhost) as host, (select Value from alertlabel where AlertIDa.AlertID and Labeljob) as job from (select …

this的指向问题

在JavaScript和Vue.js的上下文中&#xff0c;this的指向是由函数的调用方式决定的。理解this的指向对于编写正确的Vue组件和JavaScript代码至关重要。 ‌全局上下文中的this‌&#xff1a; 在全局执行环境中&#xff08;比如浏览器中的window对象或者Node.js的全局环境&#…

深度学习驱动的油气开发技术与应用

在深度学习与油气开发领域融合的背景下&#xff0c;科研边界持续扩展&#xff0c;创新成果不断涌现。从基本物理模型构建到油气开发问题的复杂模拟&#xff0c;从数据驱动分析到工程问题的智能解决&#xff0c;深度学习正以前所未有的动力推动油气开发领域的革新。以下是深度学…

【Java基础面试题038】栈和队列在Java中的区别是什么?

回答重点 栈&#xff08;Stack&#xff09;&#xff1a;遵循后进先出&#xff08;LIFO&#xff0c;Last In&#xff0c;First Out&#xff09;原则。即&#xff0c;最后插入的元素最先被移除。主要操作包括push&#xff08;入栈&#xff09;和pop&#xff08;出栈&#xff09;…

Spring Boot 整合 RabbitMQ:手动 ACK 与 QoS 配置详解

在分布式系统中&#xff0c;消息队列&#xff08;Message Queue&#xff09;是实现异步通信的重要组件。RabbitMQ 作为一个功能强大的消息代理&#xff0c;提供了多种消息传递模式和丰富的配置选项。在生产环境中&#xff0c;为了确保消息的可靠传递&#xff0c;我们通常需要配…

HTML5 开发工具与调试

HTML5 开发工具与调试 在开发HTML5应用时&#xff0c;使用合适的工具可以大大提高效率和代码质量。以下是一些常用的开发工具和调试技巧。 1. 使用浏览器开发者工具 现代浏览器都提供了强大的开发者工具&#xff0c;帮助开发者调试和优化网页。 主要功能&#xff1a; 元素…

【RAG实战】语言模型基础

语言模型赋予了计算机理解和生成人类语言的能力。它结合了统计学原理和深度神经网络技术&#xff0c;通过对大量的样本数据进行复杂的概率分布分析来学习语言结构的内在模式和相关性。具体地&#xff0c;语言模型可根据上下文中已出现的词序列&#xff0c;使用概率推断来预测接…

table 表格转成 excell 导出

OK&#xff0c;功能非常简单&#xff0c;但是很实用啊&#xff01; 依赖安装 这里我们需要安装两个依赖&#xff1a; xlsx 和 file-saver&#xff0c;就可以帮助我们实现功能了&#xff01; npm i xlsx file-saver代码参考 导出方法 utils/index.js import * as XLSX from …