SpringBoot用kafka.listener监听接受Kafka消息

ops/2024/11/13 9:55:53/

kafka_0">1.创建kafka监听配置并进行注册

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;import java.util.HashMap;
import java.util.Map;/*** @author 35* @description kafka listen监听配置* @date 2024年04月24日 13:25*/
@Configuration
@EnableKafka
public class KafkaConfig {// kafka实例@Value("${spring.kafka.bootstrap-servers}")private String bootstrapServers;// kafka AI 服务的Groupprivate String groupId = Constants.KAFKA_AI_SERVER_GROUP;@Beanpublic ConsumerFactory<String, String> consumerFactory() {Map<String, Object> props = new HashMap<>();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);return new DefaultKafkaConsumerFactory<>(props);}@Beanpublic ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());// 设置为可以手动消费factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);return factory;}
}

2.使用示例

  @KafkaListener(topics = Constants.KAFKA_USER_TOPIC, groupId = Constants.KAFKA_SERVER_GROUP)public void syncUserByKafKa(String message, Acknowledgment ack) {try {// 调用具体的执行方法bb(message);// 提交kafka消费位移ack.acknowledge();} catch (Exception e) {log.error("失败:" + e.getMessage() + "消息:" + message);} finally {// 提交kafka消费位移ack.acknowledge();}}

http://www.ppmy.cn/ops/111699.html

相关文章

连续时间,离散频率 傅里叶

时域周期——不是把一个信号周期化&#xff0c;而是周期信号取一个周期是x(t),对其周期化不会发生时域的重叠。故当接收到信号&#xff0c;在DFT时&#xff0c;以整个接收到的时间信号为周期进行延拓 推导公式时思路&#xff1a;时域卷积周期冲击&#xff0c;用傅里叶变换推导出…

react 基础语法

前置知识 类的回顾 通过class关键字定义一个类 类名首字母大写 class类有constructor构造器 new 一个类得到一个实例 类还有方法&#xff0c;该方法也会在其原型上 static静态数据&#xff0c;访问静态属性通过 类名.id getter和setter getter&#xff1a;定义一个属性&…

Android 11(API 级别 30)及以上版本中,将Bitmap保存到设备上

调用 saveBitmapToMediaStore(getContentResolver(),bitmap,“图片名”,mimeType); 参数解析&#xff1a; Bitmap myBitmap ...; // 这里应该是你获取或创建Bitmap的代码 private String mimeType "image/jpeg"; // 或者"image/png"&#xff0c;取决于…

CUDA及GPU学习资源汇总

CUDA C Programming Guide 的中文翻译版GPU中的SM和warp的关系推荐几个不错的CUDA入门教程

30. 使用GPU进行模型的训练(二) to()方法

使用GPU进行模型的训练(二) to()方法 1. 使用GPU要注意哪些问题 如果我们使用GPU训练模型&#xff0c;需要同时对 模型、数据、损失函数 同时指定使用GPU模型&#xff0c;否则就会存在运行问题 2. to()方法使用GPU 在使用to()方法之前&#xff0c;首先要定义使用的设别&…

无人机之悬停精度篇

无人机的悬停精度是指无人机在无GPS信号或其他外部定位辅助下&#xff0c;能够保持在一个固定空间位置时的精度。这一精度受到多种因素的影响&#xff0c;包括但不限于风速、气压、温度、湿度以及无人机自身的姿态稳定性等。以下是对无人机悬停精度的详细分析&#xff1a; 一、…

基于单片机的盲人智能水杯系统(论文+源码)

1 总体方案设计 本次基于单片机的盲人智能水杯设计&#xff0c;采用的是DS18B20实现杯中水温的检测&#xff0c;采用HX711及应力片实现杯中水里的检测&#xff0c;采用DS1302实现时钟计时功能&#xff0c;采用TTS语音模块实现语音播报的功能&#xff0c;并结合STC89C52单片机作…

828华为云征文|华为云Flexus X实例docker部署Jitsi构建属于自己的音视频会议系统

828华为云征文&#xff5c;华为云Flexus X实例docker部署Jitsi构建属于自己的音视频会议系统 华为云最近正在举办828 B2B企业节&#xff0c;Flexus X实例的促销力度非常大&#xff0c;特别适合那些对算力性能有高要求的小伙伴。如果你有自建MySQL、Redis、Nginx等服务的需求&a…