SpringBoot集成kafka-监听器注解

devtools/2025/1/16 5:54:27/

SpringBoot集成kafka-监听器注解

  • 1、application.yml
  • 2、生产者
  • 3、消费者
  • 4、测试类
  • 5、测试

在这里插入图片描述

1、application.yml

在这里插入图片描述

#自定义配置
kafka:topic:name: helloTopicconsumer:group: helloGroup

2、生产者

package com.power.producer;import com.power.model.User;
import com.power.util.JSONUtils;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;import javax.annotation.Resource;
import java.util.Date;@Component
public class EventProducer {@Resourceprivate KafkaTemplate<String,Object> kafkaTemplate;public void sendEvent2(){User user = User.builder().id(10001).phone("15676767676").birthday(new Date()).build();String userJson = JSONUtils.toJSON(user);kafkaTemplate.send("helloTopic",userJson);}}

3、消费者

package com.power.consumer;import com.power.model.User;
import com.power.util.JSONUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;import java.util.function.Consumer;@Component
public class EventConsumer {//采用监听的方式接收事件(消息,数据)@KafkaListener(topics = {"${kafka.topic.name}"},groupId="${kafka.consumer.group}")public void onEvent3(String userJson,@Header(value=KafkaHeaders.RECEIVED_TOPIC) String topic,@Header(value=KafkaHeaders.RECEIVED_PARTITION_ID) String partition,ConsumerRecord<String,String> record){User user =JSONUtils.toBean(userJson,User.class);System.out.println("读取/消费到的事件,user:"+user+",topic:"+topic+",partition:"+partition);System.out.println("读取/消费到的事件:"+record.toString());}}

在这里插入图片描述

4、测试类

package com.power;import com.power.producer.EventProducer;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;import javax.annotation.Resource;@SpringBootTest
public class SpringBoot02KafkaBaseApplication {@Resourceprivate EventProducer eventProducer;@Testvoid sendEvent2(){eventProducer.sendEvent2();}}

5、测试

先启动消费者监听
再启动生产者发送消息
消费者依然可以正常监听到消息:
在这里插入图片描述


http://www.ppmy.cn/devtools/100284.html

相关文章

数学建模~~~预测方法--决策树模型

目录 0.直击重点 1.决策树概念 2.节点特征的选择算法 3.基尼系数的计算 4.决策树的分类 5.模型的搭建 6.模型的改进和评价 ROC曲线 参数调优 &#xfeff;GridSearch网格搜索 使用搜索结果重新建模 0.直击重点 这个文章&#xff0c;我们从三个维度进行说明介绍&#…

HTTP 414错误问题

问题描述&#xff1a; 在一次前端编辑报表完成&#xff0c;打开审核人选择弹出框的时候&#xff0c;layer直接报414错误。 问题分析&#xff1a; HTTP 414是HTTP协议中的一个状态码&#xff0c;表示请求的URI&#xff08;Uniform Resource Identifier&#xff09;过长&#…

STM32(六):定时器——输出比较实验

PWM驱动呼吸灯 源码&#xff1a; #include "stm32f10x.h" // Device headervoid PWM_Init(void) {RCC_APB1PeriphClockCmd(RCC_APB1Periph_TIM2,ENABLE);//开启时钟TIM_InternalClockConfig(TIM2);//选择时基单元的时钟TIM_TimeBaseInitTypeDef TI…

【Linux —— 线程互斥】

Linux —— 线程互斥 1. 临界资源与临界区2. 互斥的定义3. 原子性4. 互斥量(Mutex)5. 互斥的实现示例6. 互斥量实现原理探究 1. 临界资源与临界区 临界资源: 指的是多个线程或进程共享的资源&#xff0c;例如全局变量、文件、数据库等。由于这些资源的共享&#xff0c;可能会导…

SQL注入漏洞WAF绕过

目录 如何检测和防范SQL注入攻击中的编码伪装&#xff1f; 检测SQL注入攻击中的编码伪装 防范SQL注入攻击中的编码伪装 WAF在处理SQL注入时为什么有时会对大小写不敏感&#xff1f; SQL注入中的联合查询注入有哪些常见的攻击方式&#xff1f; 在绕过Web应用防火墙&#xf…

Linux文件目录系统

Linux文件目录是Linux操作系统中用于组织和存储文件及子目录的层次结构。这个结构从根目录&#xff08;/&#xff09;开始&#xff0c;向下分支出多个子目录&#xff0c;每个子目录又可以包含更多的文件和子目录&#xff0c;形成一棵倒置的树状结构。以下是Linux文件目录的一些…

动态库和静态库(.so/dll,.a/lib)

目录 一、库的概念 二、库的分类 1.动态库 2.静态库 三、查看所依赖的动静态库 四、制作、发布与使用静态库 1.制作并发布静态库 2.使用静态库 五、制作、发布与使用动态库 1.制作并发布动态库 2.使用动态库 六、总结 在一个大型的项目中&#xff0c;我们可以使用别…

tornado一个请求对应一个实例

在Tornado框架中&#xff0c;关于请求处理与实例创建的行为&#xff0c;主要取决于你是如何组织你的Tornado应用的。通常&#xff0c;Tornado本身并不直接为每个请求新建一个类的实例&#xff0c;而是使用了一种称为“RequestHandler”的机制来处理请求。 基本的RequestHandle…