SpringBoot集成kafka接收消息

server/2024/12/23 4:30:25/

SpringBoot集成kafka接收消息

  • 1、SpringBoot集成kafka接收消息
  • 2、@Payload注解接收消息体内容
  • 3、@Header注解接收消息头内容
  • 4、接收消息所有内容

在这里插入图片描述

kafka_6">1、SpringBoot集成kafka接收消息

生产者

package com.power.producer;import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;import javax.annotation.Resource;@Component
public class EventProducer {@Resourceprivate KafkaTemplate<String,String> kafkaTemplate;public void sendEvent(){kafkaTemplate.send("helloTopic","hello kafka");}}

application.yml配置文件

spring:application:#应用名称name: spring-boot-02-kafka-base#kafka连接地址(ip+port)kafka:bootstrap-servers: 47.116.35.15:9092#配置生产者(24个配置)
#    producer:#配置消费者(24个配置)
#    consumer:

测试类

package com.power;import com.power.producer.EventProducer;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;import javax.annotation.Resource;@SpringBootTest
public class SpringBoot02KafkaBaseApplication {@Resourceprivate EventProducer eventProducer;@Testvoid sendInterceptor(){eventProducer.sendEvent();}}

2、@Payload注解接收消息体内容

消费者:

package com.power.consumer;import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;@Component
public class EventConsumer {//采用监听的方式接收事件(消息,数据)@KafkaListener(topics = {"helloTopic"},groupId="helloGroup")public void onEvent(@Payload String event){System.out.println("读取/消费到的事件:"+event);}
}

测试结果:
在这里插入图片描述

3、@Header注解接收消息头内容

注意,不太版本kafak使用@Header注解读取partition时不一样:

  • kafka3.0以下版本使用KafkaHeaders.RECEIVED_PARTITION_ID获取分区
  • kafka3.0以上版本使用KafkaHeaders.RECEIVED_PARTITION获取分区

消费者:

package com.power.consumer;import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;@Component
public class EventConsumer {//采用监听的方式接收事件(消息,数据)@KafkaListener(topics = {"helloTopic"},groupId="helloGroup")public void onEvent(@Payload String event,@Header(value=KafkaHeaders.RECEIVED_TOPIC) String topic,@Header(value=KafkaHeaders.RECEIVED_PARTITION_ID) String partition){System.out.println("读取/消费到的事件:"+event+",topic:"+topic+",partition:"+partition);}
}

测试结果:
在这里插入图片描述

4、接收消息所有内容

在这里插入图片描述
在这里插入图片描述

消费者:

package com.power.consumer;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;import java.util.function.Consumer;@Component
public class EventConsumer {//采用监听的方式接收事件(消息,数据)@KafkaListener(topics = {"helloTopic"},groupId="helloGroup")public void onEvent(@Payload String event,@Header(value=KafkaHeaders.RECEIVED_TOPIC) String topic,@Header(value=KafkaHeaders.RECEIVED_PARTITION_ID) String partition,ConsumerRecord<String,String> record){System.out.println("读取/消费到的事件:"+event+",topic:"+topic+",partition:"+partition);System.out.println("读取/消费到的事件:"+record.toString());}
}

测试打印所有消息:
在这里插入图片描述


http://www.ppmy.cn/server/105285.html

相关文章

计算机毕业设计选题推荐-岗位招聘数据可视化分析-Python爬虫

✨作者主页&#xff1a;IT毕设梦工厂✨ 个人简介&#xff1a;曾从事计算机专业培训教学&#xff0c;擅长Java、Python、微信小程序、Golang、安卓Android等项目实战。接项目定制开发、代码讲解、答辩教学、文档编写、降重等。 ☑文末获取源码☑ 精彩专栏推荐⬇⬇⬇ Java项目 Py…

CentOS7发送邮件如何配置SMTP服务器发信?

CentOS7发送邮件安全设置&#xff1f;CentOS7发信性能优化方法&#xff1f; 对于使用CentOS7操作系统的用户而言&#xff0c;配置SMTP服务器以发送邮件是一个关键步骤。AokSend将详细介绍如何在CentOS7中配置SMTP服务器发信的方法和注意事项。 CentOS7发送邮件&#xff1a;准…

18. 分割dataframe:让数据分析更高效

哈喽&#xff0c;大家好&#xff0c;我是木头左&#xff01; 如何分割dataframe&#xff1f; 在pandas中&#xff0c;可以使用groupby函数来分割dataframe。groupby函数可以将dataframe中的行按照指定的列进行分组&#xff0c;然后可以对每个组进行各种操作。 下面是一个简单…

设计模式六大原则(一)--单一职责原则

摘要 单一职责原则是设计模式六大原则之一&#xff0c;强调一个类应该仅有一个引起它变化的原因&#xff0c;即每个类应仅负责一项职责。本文通过详细探讨单一职责原则的定义、实现方式、优缺点及其适用场景&#xff0c;揭示了其在软件设计中的核心地位。通过类的拆分、接口设…

flutter 键盘弹出 都会重新Build

原因是调用MediaQuery.of(context)后&#xff0c;点击TextField组件时会导致调用build方法。 解决方法&#xff1a;在Scaffold组件的body嵌套Builder组件&#xff0c;然后设置一个BuildContext变量&#xff0c;将Builder组件中的context传递给BuildContext变量&#xff0c;然后…

C#/WinForm实现炸弹人游戏

游戏类设计 代码地址&#xff08;gitee&#xff09;&#xff1a;炸弹人游戏: WinForm实现炸弹人游戏

线程池详解

概念 线程池&#xff08;Thread Pool&#xff09;是一种基于池化技术的多线程处理形式&#xff0c;用于管理线程的创建和生命周期&#xff0c;以及提供一个用于并行执行任务的线程队列。线程池的主要目的是减少在创建和销毁线程时所花费的开销和资源&#xff0c;提高程序性能&a…

在Excel中“直接引用”字符串地址

indirect是Excel唯一可以拥有直接解析字符串引用地址参数能力的函数&#xff0c;是绝无仅有的宝贝疙瘩。 (笔记模板由python脚本于2024年08月21日 12:45:49创建&#xff0c;本篇笔记适合喜欢用Excel处理数据的coder翻阅) 【学习的细节是欢悦的历程】 Python 官网&#xff1a;ht…