SpringBoot集成kafka-生产者发送消息

server/2024/10/25 18:24:47/

springboot集成kafka发送消息

  • 1、kafkaTemplate.send()方法
    • 1.1、springboot集成kafka发送消息Message对象消息
    • 1.2、springboot集成kafka发送ProducerRecord对象消息
    • 1.3、springboot集成kafka发送指定分区消息
  • 2、kafkaTemplate.sendDefault()方法
  • 3、kafkaTemplate.send(...)和kafkaTemplate.sendDefault(...)的区别

在这里插入图片描述在这里插入图片描述

kafkaTemplatesend_3">1、kafkaTemplate.send()方法

kafkaMessage_5">1.1、springboot集成kafka发送消息Message对象消息

生产者代码

package com.power.producer;import org.springframework.kafka.annotation.KafkaHandler;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;import javax.annotation.Resource;@Component
public class EventProducer {@Resourceprivate KafkaTemplate<String,String> kafkaTemplate;public void sendMessage(){//通过构建器模式创建Message对象Message message = MessageBuilder.withPayload("hello-message").setHeader(KafkaHeaders.TOPIC,"test-topic-02")//在header中放置topic的名字.build();kafkaTemplate.send(message);}
}

2、测试类

package com.power;import com.power.producer.EventProducer;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;import javax.annotation.Resource;@SpringBootTest
public class SpringBoot01KafkaBaseApplication {@Resourceprivate EventProducer eventProducer;@Testvoid testMessage(){eventProducer.sendMessage();}@Testvoid testMessage(){eventProducer.sendMessage();}
}

kafkaProducerRecord_68">1.2、springboot集成kafka发送ProducerRecord对象消息

生产者

package com.power.producer;import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;@Component
public class EventProducer {@Resourceprivate KafkaTemplate<String,String> kafkaTemplate;public void sendMessage(){//通过构建器模式创建Message对象Message message = MessageBuilder.withPayload("hello-message").setHeader(KafkaHeaders.TOPIC,"test-topic-02")//在header中放置topic的名字.build();kafkaTemplate.send(message);}public void sendProducerRecord(){//Headers里面放一些信息(信息是key-value键值对),到时候消费组接收到消息后,可以拿到这个Headers里面放的信息Headers headers = new RecordHeaders();headers.add("phone","17676767676".getBytes(StandardCharsets.UTF_8));headers.add("orderId","OD1456467576467".getBytes(StandardCharsets.UTF_8));//String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headersProducerRecord<String,String> record =new ProducerRecord("test-topic-02", 0, System.currentTimeMillis(), "key1", "value", headers);kafkaTemplate.send(record);}
}

测试类:

package com.power;import com.power.producer.EventProducer;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;import javax.annotation.Resource;@SpringBootTest
public class SpringBoot01KafkaBaseApplication {@Resourceprivate EventProducer eventProducer;@Testvoid testProducerRecord(){eventProducer.sendProducerRecord();}}

kafka_139">1.3、springboot集成kafka发送指定分区消息

生产者:(方法sendEvent4)

package com.power.producer;import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;@Component
public class EventProducer {@Resourceprivate KafkaTemplate<String,String> kafkaTemplate;public void sendEvent4(){//String topic, Integer partition, Long timestamp, K key, V datakafkaTemplate.send("test-topic-02",0,System.currentTimeMillis(),"k2","hello-kafka");}
}

测试类:

package com.power;import com.power.producer.EventProducer;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;import javax.annotation.Resource;@SpringBootTest
public class SpringBoot01KafkaBaseApplication {@Resourceprivate EventProducer eventProducer;@Testvoid sendEvent4(){eventProducer.sendEvent4();}
}

kafkaTemplatesendDefault_194">2、kafkaTemplate.sendDefault()方法

生产者:

package com.power.producer;import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;@Component
public class EventProducer {@Resourceprivate KafkaTemplate<String,String> kafkaTemplate;public void sendDefault(){//Integer partition, Long timestamp, K key, V datakafkaTemplate.sendDefault(0,System.currentTimeMillis(),"k3","hello-kafka");}
}

测试类:

package com.power;import com.power.producer.EventProducer;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;import javax.annotation.Resource;@SpringBootTest
public class SpringBoot01KafkaBaseApplication {@Resourceprivate EventProducer eventProducer;@Testvoid sendDefault(){eventProducer.sendDefault();}
}

配置文件:设置默认topic
不设置topic运行生产者会报找不到topic的错。
在这里插入图片描述

kafkaTemplatesendkafkaTemplatesendDefault_254">3、kafkaTemplate.send(…)和kafkaTemplate.sendDefault(…)的区别

在这里插入图片描述


http://www.ppmy.cn/server/104496.html

相关文章

未设置辅助手机的谷歌账号停用,申诉回来后登录需要手机验证,验证两次后成功恢复。

谷歌账号被停用后怎么办&#xff1f;果断申诉&#xff0c;申诉方法和模板等见我前面的文章或视频。 通常申诉以后1-2天会反馈结果&#xff0c;而且大部分&#xff08;80%以上&#xff09;会第一次就被审批通过&#xff0c;如下图所示&#xff1a; 当收到上面这样的邮件&#x…

MySQL中处理JSON数据:大数据分析新方向,技术详解与应用场景

随着大数据时代的来临&#xff0c;数据分析和处理成为企业决策和业务优化的关键手段。JSON&#xff08;JavaScript Object Notation&#xff09;作为一种轻量级的数据交换格式&#xff0c;因其易于阅读和编写&#xff0c;以及易于机器解析和生成&#xff0c;被广泛应用于Web应用…

SQL——建表时是否需要设置外键?从哪些方面考虑?

1. 设置外键&不设置外键区别和影响 在数据库设计中&#xff0c;是否设置外键会对数据的完整性、安全性、性能等多个方面产生影响。以下是设置外键与不设置外键的区别和影响&#xff1a; 1. 数据完整性 设置外键: 强制数据完整性: 外键约束确保引用关系中的数据保持一致性…

C++中二叉搜索树的底层原理及实现

小编在学习完二叉搜索树(SearchBinaryTree)之后觉得虽然二叉搜索树不是很难&#xff0c;但是它对于后面学习C中的AVL树和红黑树及map和set的封装都有重要的作用&#xff0c;因此小编今天带给大家二叉搜索树的原理及实现&#xff0c;话不多说&#xff0c;开始学习&#xff01;~~…

PT:如何获取net的Delta delay信息

我正在「拾陆楼」和朋友们讨论有趣的话题,你⼀起来吧? 拾陆楼知识星球入口 来自星球提问: crosstalk的Delta delay可以从三个渠道获取: report_timing

UE5 多个类选择界面生成。解决方案思路。

中控器CC 》用户界面控制器UI_CC 》用户界面UI_Inst 生成 CC使用接口&#xff0c;通知UI_CC开始生成UI_Inst。 蓝图函数库编写判断是否存在和创建UI的蓝图。&#xff08;此处略&#xff09; UI_CC生成时&#xff0c;userwidget使用接口&#xff0c;注册UI_CC的用户控件的控件…

HTML静态网页成品作业(HTML+CSS)——原神介绍设计制作(4个页面)

&#x1f389;不定期分享源码&#xff0c;关注不丢失哦 文章目录 一、作品介绍二、作品演示三、代码目录四、网站代码HTML部分代码 五、源码获取 一、作品介绍 &#x1f3f7;️本套采用HTMLCSS&#xff0c;未使用Javacsript代码&#xff0c;共有4个页面。 二、作品演示 三、代…

LangGPT结构化提示词

LangGPT是Language For GPT-like LLMs的简称&#xff0c;中文名为结构化提示词&#xff0c;LangGPT是一个帮助你编写高质量提示词的工具&#xff0c;理论基础是我们提出的一套模块化、标准化的提斯提编写方法论——结构化提示词。我们希望揭开提示工程的神秘面纱&#xff0c;为…