SpringBoot集成kafka接收对象消息

embedded/2024/12/22 15:14:02/

SpringBoot集成kafka接收对象消息

  • 1、生产者
  • 2、消费者
  • 3、工具类
  • 4、消息实体对象
  • 5、配置文件
  • 6、启动类
  • 7、测试类
  • 8、测试结果

在这里插入图片描述

1、生产者

package com.power.producer;import com.power.model.User;
import com.power.util.JSONUtils;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;import javax.annotation.Resource;
import java.util.Date;@Component
public class EventProducer {@Resourceprivate KafkaTemplate<String,Object> kafkaTemplate;public void sendEvent2(){User user = User.builder().id(10001).phone("15676767676").birthday(new Date()).build();String userJson = JSONUtils.toJSON(user);kafkaTemplate.send("helloTopic",userJson);}}

2、消费者

package com.power.consumer;import com.power.model.User;
import com.power.util.JSONUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;import java.util.function.Consumer;@Component
public class EventConsumer {//采用监听的方式接收事件(消息,数据)@KafkaListener(topics = {"helloTopic"},groupId="helloGroup")public void onEvent(String userJson,@Header(value=KafkaHeaders.RECEIVED_TOPIC) String topic,@Header(value=KafkaHeaders.RECEIVED_PARTITION_ID) String partition,ConsumerRecord<String,String> record){User user =JSONUtils.toBean(userJson,User.class);System.out.println("读取/消费到的事件,user:"+user+",topic:"+topic+",partition:"+partition);System.out.println("读取/消费到的事件:"+record.toString());}}

3、工具类

package com.power.util;import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;public class JSONUtils {private static final ObjectMapper OBJECTMAPPER = new ObjectMapper();public static String toJSON(Object object){try {return OBJECTMAPPER.writeValueAsString(object);} catch (JsonProcessingException e) {throw new RuntimeException(e);}}public static <T> T toBean(String json,Class<T> clazz){try {return OBJECTMAPPER.readValue(json,clazz);} catch (JsonProcessingException e) {throw new RuntimeException(e);}}
}

4、消息实体对象

package com.power.model;import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;import java.util.Date;@Builder
@AllArgsConstructor
@NoArgsConstructor
@Data
public class User {private Integer id;private String phone;private Date birthday;}

5、配置文件

spring:application:#应用名称name: spring-boot-02-kafka-base#kafka连接地址(ip+port)kafka:bootstrap-servers: <你的kafka服务器IP>:9092

6、启动类

package com.power;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class KafkaApplication {public static void main(String[] args) {SpringApplication.run(KafkaApplication.class, args);System.out.println("启动成功--------------------------");}
}

7、测试类

package com.power;import com.power.producer.EventProducer;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;import javax.annotation.Resource;@SpringBootTest
public class SpringBoot02KafkaBaseApplication {@Resourceprivate EventProducer eventProducer;@Testvoid sendEvent2(){eventProducer.sendEvent2();}}

8、测试结果

先启动消费者
在启动生产者测试类
已接收到消息对象数据:

在这里插入图片描述


http://www.ppmy.cn/embedded/100477.html

相关文章

通过Python绘制不同数据类型适合的可视化图表

在数据可视化中&#xff0c;对于描述数值变量与数值变量之间的关系常见的有散点图和热力图&#xff0c;以及描述数值变量与分类变量之间的关系常见的有条形图&#xff0c;饼图和折线图&#xff0c;可以通过使用Python的matplotlib和seaborn库来绘制图表进行可视化表达&#xff…

【时间序列预测_python_jupyter】使用neuralforecast包在jupyter-lab上预测并绘图

neuralforecast包有很多引入好的时间序列预测算法模型&#xff0c;可以直接通过接口调用。 支持的算法模型有&#xff1a; __all__ [RNN, GRU, LSTM, TCN, DeepAR, DilatedRNN,MLP, NHITS, NBEATS, NBEATSx, DLinear, NLinear,TFT, VanillaTransformer, Informer, Autoforme…

数字文创产业:用科技讲述文化故事的新方式

当今数字化的时代浪潮中&#xff0c;数字文创产业正以一种全新的姿态崛起&#xff0c;成为用科技讲述文化故事的独特方式。 数字文创产业将传统文化与现代科技相融合&#xff0c;赋予了文化新的生命力和表现力。通过虚拟现实、增强现实、大数据、人工智能等前沿技术&#xff0…

[数据集][目标检测]红外场景下车辆和行人检测数据集VOC+YOLO格式19069张4类别

数据集格式&#xff1a;Pascal VOC格式YOLO格式(不包含分割路径的txt文件&#xff0c;仅仅包含jpg图片以及对应的VOC格式xml文件和yolo格式txt文件) 图片数量(jpg文件个数)&#xff1a;19069 标注数量(xml文件个数)&#xff1a;19069 标注数量(txt文件个数)&#xff1a;19069 标…

以简单的例子从头开始建spring boot web多模块项目(三)-better mybatis generate的使用

这个也是因为网上找了太多不太对头的文档&#xff0c;这里是验证过的。 1、idea插件中查找better-mybatis-generator&#xff0c;貌似这个版本很久没更新过了。。到现在已经6年了。。 2、我需要连接mysql8.0.38&#xff0c;右侧Database中添加mysql连接&#xff0c;属性如下&a…

Day98:云上攻防-云原生篇K8s安全Config泄漏Etcd存储Dashboard鉴权Proxy暴露

云原生-K8s安全-etcd(Master-数据库)未授权访问 实战中不会常见&#xff0c;利用条件比较苛刻。 默认通过证书认证&#xff0c;起一个数据库作用。主要存放节点的数据&#xff0c;如一些token和证书。 攻击23791端口 配置映射&#xff1a; /etc/kubernetes/manifests/etcd.y…

自动续期 双token流程

为什么需要自动续期 从状态维护说起 http是一个无状态协议 必须靠一些 特定的技术 实现状态的维护 传统web中 session 过程 浏览器输入用户名密码 后端 获取参数 校验登录成功 存储在内存中 否则 后返回 sessionid 浏览器通过 cookie存储 内存存一个sessionid 用户后续请求 …

ip归属地换地方了会自动更新吗

在这个数字化时代&#xff0c;互联网已成为我们生活、工作和学习中不可或缺的一部分。而每一个连接互联网的设备&#xff0c;都会通过其IP地址与外界进行通信。IP地址&#xff0c;这个看似简单的数字组合&#xff0c;实则承载着设备位置、网络身份等重要信息。随着人们移动性的…