在 DDD 中优雅的发送 Kafka 消息

ops/2024/12/18 11:04:25/

前言
1:host 映射
在这里插入图片描述
下载 SwitchHost 配置一个映射地址。点击 + 添加一个本地环境,之后配置你的 IP kafka 这样就能找这个地址了。IP 为你本地的IP,如果是云服务器就是公网IP地址
使用docker-compose.yml进行一键部署安装

version: '3.0'
# docker-compose -f docker-compose.yml up -d
services:zookeeper:image: zookeeper:3.9.0container_name: zookeeperrestart: alwaysports:- "2181:2181"environment:ZOO_MY_ID: 1ZOO_SERVERS: server.1=zookeeper:2888:3888;2181ZOOKEEPER_CLIENT_PORT: 2181ALLOW_ANONYMOUS_LOGIN: yesTZ: Asia/Shanghainetworks:- my-networkkafka:image: bitnami/kafka:3.7.0container_name: kafkavolumes:- /etc/localtime:/etc/localtimeports:- "9092:9092"environment:KAFKA_BROKER_ID: 1KAFKA_CFG_LISTENERS: PLAINTEXT://:9092KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092KAFKA_CFG_ZOOKEEPER_CONNECT: zookeeper:2181KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1ALLOW_PLAINTEXT_LISTENER: yesKAFKA_MESSAGE_MAX_BYTES: "2000000"KAFKA_ENABLE_KRAFT: noJMX_PORT: 9999TZ: Asia/Shanghaidepends_on:- zookeepernetworks:- my-networkkafka-eagle:image: echo21bash/kafka-eagle:3.0.2container_name: kafka-eagleenvironment:KAFKA_EAGLE_ZK_LIST: zookeeper:2181volumes:- ./kafka-eagle/system-config.properties:/opt/kafka-eagle/conf/system-config.propertiesports:- "8048:8048"depends_on:- kafkanetworks:- my-networknetworks:my-network:driver: bridge

脚本在代码中提供了完整的语句
消息流程
在这里插入图片描述
代码结构
在这里插入图片描述
1:domain 是领域层,提供一个个领域服务。如果一个工程有多个领域,则有不同的 a、b、c 领域包,每个包下有一套【event、model、repository、service】。
2:在领域层定义的 event 事件,里面涵盖了事件消息。而这个事件消息可以让 UserRepository 继承实现。最终完成消息发送。
3:最后是 trigger 触发器层,所有的 http、rpc、job、mq 都是一种触发行为。通过触发器的 listener 监听,来接收 mq 消息。
环境配置

spring:kafka:bootstrap-servers: localhost:9092producer:# 发生错误后,消息重发的次数。retries: 1#当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。batch-size: 16384# 设置生产者内存缓冲区的大小。buffer-memory: 33554432# 键的序列化方式key-serializer: org.apache.kafka.common.serialization.StringSerializer# 值的序列化方式value-serializer: org.apache.kafka.common.serialization.StringSerializer# acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。# acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。# acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。acks: 1...# 配置主题
kafka:topic:group: xmg-groupuser: xmg-topic

配置发送事件

@Slf4j
@Component
public class EventPublisher {@Resourceprivate KafkaTemplate<String, String> kafkaTemplate;public void publish(String topic, BaseEvent.EventMessage<?> eventMessage) {try {String messageJson = JSON.toJSONString(eventMessage);kafkaTemplate.send(topic, messageJson);log.info("发送MQ消息 topic:{} message:{}", topic, messageJson);} catch (Exception e) {log.error("发送MQ消息失败 topic:{} message:{}", topic, JSON.toJSONString(eventMessage), e);throw e;}}}

事件消息定义

public class UserMessageEvent extends BaseEvent<UserMessageEvent.UserMessage> {@Value("${kafka.topic.user}")private String topic;@Overridepublic EventMessage<UserMessage> buildEventMessage(UserMessage data) {return EventMessage.<UserMessage>builder().id(RandomStringUtils.randomNumeric(11)).timestamp(new Date()).data(data).build();}@Overridepublic String topic() {return topic;}/*** 要推送的事件消息,聚合到当前类下。*/@Data@Builder@AllArgsConstructor@NoArgsConstructorpublic static class UserMessage {private String userId;private String userName;private String userType;}}

事件消息发送

@Service
public class UserRepository extends UserMessageEvent implements IUserRepository {@Resourceprivate EventPublisher publisher;@Overridepublic void doSaveUser(UserEntity userEntity) {// 推送消息publisher.publish(this.topic(), this.buildEventMessage(UserMessageEvent.UserMessage.builder().userId(userEntity.getUserId()).userName(userEntity.getUserName()).userType(userEntity.getUserTypeVO().getDesc()).build()));}}

事件消息监听

@Slf4j
@Component
public class KafkaMessageListener {@KafkaListener(topics = "${kafka.topic.user}", groupId = "${kafka.topic.group}", concurrency = "1")public void topic_test(ConsumerRecord<?, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {Optional<?> message = Optional.ofNullable(record.value());if (message.isPresent()) {Object msg = message.get();try {// 逻辑处理// 确认消息消费完成,如果抛异常消息会进入重试ack.acknowledge();log.info("Kafka消费成功! Topic:" + topic + ",Message:" + msg);} catch (Exception e) {e.printStackTrace();log.error("Kafka消费失败!Topic:" + topic + ",Message:" + msg, e);}}}}

测试验证

@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
public class UserServiceTest {@Resourceprivate IUserService userService;@Testpublic void test_register() throws InterruptedException {while (true) {UserEntity userEntity = new UserEntity();userEntity.setUserId("10001");userEntity.setUserName("小明哥");userEntity.setUserTypeVO(UserTypeVO.T8);userService.register(userEntity);Thread.sleep(1500);}}}

好了 至此 在 DDD 中优雅的发送 Kafka 消息 学习结束了 友友们 点点关注不迷路 老铁们!!!!!


http://www.ppmy.cn/ops/142890.html

相关文章

【信息系统项目管理师】【综合知识】【备考知识点】第二十章 高级项目管理

【移动端浏览】☞ 【信息系统项目管理师】第二十章 高级项目管理 第二十章 高级项目管理 项目集管理 &#xff08;项目集管理&#xff09;角色和职责(1)项目集发起人 ①为项目集提供资金&#xff0c;确保项目集目标与战略愿景保持一致&#xff1b; ②使效益实现交付&#xff…

【网络】五种IO模型多路转接select/poll/epollReactor反应堆模式

主页&#xff1a;醋溜马桶圈-CSDN博客 专栏&#xff1a;计算机网络原理_醋溜马桶圈的博客-CSDN博客 gitee&#xff1a;mnxcc (mnxcc) - Gitee.com 目录 1.五种 IO 模型 1.1 阻塞 IO 1.2 非阻塞 IO 1.3 信号驱动 IO 1.4 IO 多路转接 1.5 异步 IO 2.高级 IO 重要概念 2.1 …

elasticsearch 使用enrich processor填充数据

文章目录 使用 POST 请求手动插入用户数据1. 创建 Enrich Policy步骤 1.1: 创建 Enrich Policy步骤 1.2: 执行 Enrich Policy 2. 创建 Ingest Pipeline步骤 2.1: 创建 Ingest Pipeline步骤 2.2: 配置 Enrich Processor 参数 3. 使用 Ingest Pipeline步骤 3.1: 使用 Pipeline 进…

数字经济转型(三):要素市场化

商业的本质是价值创造和价值交换&#xff0c;数据要素的流通同样遵循此原则。数据要素本身是数字世界的数据&#xff0c;同时又映射了现实世界的价值属性&#xff0c;其使用价值具备了价值交换的基础&#xff0c;流通价值代表了具备流通的可能性。 按国家数据局的官方释义&…

spring循环依赖深度源码解析

spring循环依赖深度源码解析 一&#xff0c;什么是循环依赖问题 简单来说循环依赖就是在spring容器中的两个Bean互相调用对方 在这里我们创建了两个对象A,B&#xff0c;在A中调用B&#xff0c;在B中调用A&#xff0c;这样就会产生循环依赖问题 public class A {private B b…

国产Linux系统如何部署ftp文件共享服务器

在Linux系统上部署FTP&#xff08;文件传输协议&#xff09;文件共享服务器通常涉及安装和配置FTP服务器软件。最常用的FTP服务器软件之一是vsftpd&#xff08;Very Secure FTP Daemon&#xff09;。以下是如何在Linux上部署FTP文件共享服务器的步骤&#xff1a; 一、安装vsft…

[C++]运算符重载

一、 什么是运算符重载&#xff1f; 运算符重载是 C 中的一种功能&#xff0c;它允许用户定义的类或数据类型重新定义或扩展运算符的行为&#xff0c;使运算符能够作用于用户定义的对象。 二、 通俗解释 在 C 中&#xff0c;运算符&#xff08;如 , -, *, 等&#xff09;默认…

电气CAD制图软件概述及主要电气CAD软件介绍

一、电气CAD制图软件概述 电气CAD制图软件&#xff0c;即电气计算机辅助设计软件&#xff0c;是一种用于电气系统设计的专业软件。这类软件能够通过计算机帮助电气工程师完成从简单的电路设计到复杂的电气系统设计等各种任务。常用的电气CAD制图软件主要有AutoCAD, EPLAN,SEE E…