kafka+spring cloud stream 发送接收消息

devtools/2025/2/27 3:29:43/

方案 1:使用旧版 @StreamListener(适用于 Spring Cloud Stream <= 2.x)

1. 添加依赖(pom.xml

<!-- Spring Cloud Stream + Kafka Binder -->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>

2. 定义消息通道接口

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

public interface MyChannels {
    String INPUT = "myInput";

    @Input(INPUT)
    SubscribableChannel input();
}

3. 使用 @StreamListener 监听

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.stereotype.Service;

@Service
@EnableBinding(MyChannels.class) // 绑定消息通道
public class KafkaStreamListener {

    @StreamListener(MyChannels.INPUT)
    public void handleMessage(String message) {
        System.out.println("Received via @StreamListener: " + message);
    }
}

4. 配置 application.properties

# Kafka 服务器地址
spring.kafka.bootstrap-servers=localhost:9092

# 绑定输入通道到 Kafka Topic
spring.cloud.stream.bindings.myInput.destination=my-topic
spring.cloud.stream.bindings.myInput.group=my-group
spring.cloud.stream.bindings.myInput.content-type=text/plain

方案 2:新版函数式编程模型(推荐,Spring Cloud Stream >= 3.x)

import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import java.util.function.Consumer;

@Component
public class KafkaStreamListener {

    @Bean
    public Consumer<String> myInput() {
        return message -> {
            System.out.println("Received via Function: " + message);
        };
    }
}

# Kafka 服务器地址
spring.kafka.bootstrap-servers=localhost:9092

# 绑定函数到 Kafka Topic
spring.cloud.stream.bindings.myInput-in-0.destination=my-topic
spring.cloud.stream.bindings.myInput-in-0.group=my-group
spring.cloud.stream.bindings.myInput-in-0.content-type=text/plain

生产者代码示例(发送消息)

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.stereotype.Service;

@Service
public class KafkaProducer {

    @Autowired
    private StreamBridge streamBridge;

    public void sendMessage(String topic, String message) {
        streamBridge.send(topic, message);
    }
}

测试步骤

  1. 启动 Kafka:确保 Kafka 和 Zookeeper 服务运行。

  2. 创建 Topic

    kafka-topics --create --topic my-topic --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1

  3. 发送消息

    kafkaProducer.sendMessage("my-topic", "Hello Kafka Stream!");

  4. 查看消费者日志

    Received via @StreamListener: Hello Kafka Stream! // 或 Received via Function: Hello Kafka Stream!


常见问题

  1. 版本兼容性

    • Spring Cloud Stream 3.x 后需使用函数式编程。

    • 检查 Spring Boot 版本与 Spring Cloud Stream 的匹配关系(如 Spring Boot 2.6.x + Spring Cloud 2021.x)。

  2. 绑定配置

    • 函数式模型中,绑定名称格式为 <functionName>-in-<index>(如 myInput-in-0)。

  3. 序列化配置

    • 若传递 JSON 对象,需配置 content-type=application/json 并添加 Jackson 依赖。


总结

  • 旧版:使用 @StreamListener + 通道接口(适合遗留代码升级)。

  • 新版:推荐函数式编程模型(更简洁,符合现代 Spring 设计)。

  • 根据实际 Spring Cloud Stream 版本选择方案!


http://www.ppmy.cn/devtools/162950.html

相关文章

数据库(MySQL)二

MySQL 六、MySQL索引视图6.1 索引底层原理6.1.1 索引hash算法6.1.2 索引二叉树算法6.1.3 索引平衡二叉树算法6.1.4 索引BTREE树算法6.1.5 普通SQL全表扫描过程 6.2 索引分类6.2.1 按数据结构层次分类6.2.2 按字段数量层次分类6.2.3 按功能逻辑层次分类&#xff08;面试题&#…

【Elasticsearch】同一台服务器部署集群

【Elasticsearch】同一台服务器部署集群 1. 同一台服务器搭建ES集群2. 配置不同的node节点3. ES集群中安装IK分词器4. 启动es集群5. Kibana访问集群6. es-head7. 集群中创建索引7.1 什么是分片以及分片的好处7.2 副本&#xff08;Replication&#xff09;7.3 通过es-head创建索…

深入讲解微信小程序 <canvas> 标签的 type=“2d“属性

在微信小程序开发中&#xff0c;<canvas> 组件是一个非常强大的工具&#xff0c;允许开发者创建动态图形和动画。然而&#xff0c;正确设置 <canvas> 的 type 属性是确保其正常工作的关键之一。本文将深入探讨 type"2d" 属性的重要性、使用场景及其在实际…

高效日志管理与可视化:Loki与Grafana结合优化高频日志处理

网罗开发 &#xff08;小红书、快手、视频号同名&#xff09; 大家好&#xff0c;我是 展菲&#xff0c;目前在上市企业从事人工智能项目研发管理工作&#xff0c;平时热衷于分享各种编程领域的软硬技能知识以及前沿技术&#xff0c;包括iOS、前端、Harmony OS、Java、Python等…

卷积这个词在卷积神经网络中应该怎么理解

卷积的定义 数学概念&#xff1a; 在数学上&#xff0c;卷积是一种操作&#xff0c;通常用于两个函数之间的运算。对于图像处理而言&#xff0c;这些函数通常是输入图像和一个称为“卷积核”或“滤波器”的小矩阵。 在CNN中的应用&#xff1a; 卷积操作是通过滑动窗口&#xf…

汽车智能制造企业数字化转型SAP解决方案总结

一、项目实施概述 项目阶段划分&#xff1a; 蓝图设计阶段主数据管理方案各模块蓝图设计方案下一阶段工作计划 关键里程碑&#xff1a; 2022年6月6日&#xff1a;项目启动会2022年12月1日&#xff1a;系统上线 二、总体目标 通过SAP实施&#xff0c;构建研产供销协同、业财一…

django model.object.filter 不等于多个值

关于Django中QuerySet.filter()的使用问题。首先&#xff0c;我会分别针对“不等于多个值”的代码开发问题和可能遇到的报错问题给出解答。 代码开发问题&#xff1a;QuerySet.filter()不等于多个值 在Django中&#xff0c;如果你想在查询中排除多个值&#xff0c;可以使用__i…

客户端进程突然结束,服务端read是什么行为?

read函数的行为 read返回0表示正常结束条件&#xff0c;而非错误&#xff0c;具体场景包括&#xff1a; 文件读取结束&#xff0c;到达文件末尾&#xff08;EOF&#xff09;连接关闭&#xff1a;在socket通信中&#xff0c;对端正常关闭连接&#xff08;如执行close()或进程终…