基于模板方法模式-消息队列发送

devtools/2025/1/24 7:03:31/

基于模板方法模式-消息队列发送

消息队列广泛应用于现代分布式系统中,作为解耦、异步处理和流量控制的重要工具。在消息队列的使用中,发送消息是常见的操作。不同的消息队列可能有不同的实现方式,例如,RabbitMQ、Kafka、RocketMQ、Redis等。为了统一操作流程和复用代码,可以使用设计模式来简化开发工作。

模板方法模式(Template Method Pattern)是一种行为型设计模式,旨在通过定义一个操作的骨架,而将一些步骤延迟到子类中实现。在消息队列发送场景中,模板方法模式可以帮助我们定义消息发送的基本流程,同时将具体的发送操作抽象为子类来实现,从而避免重复的代码。

1. 使用 RocketMQ 发送普通消息

java">public class YourServiceCode {@Autowiredprivate RocketMQTemplate rocketMQTemplate;@Autowiredprivate ConfigurableEnvironment configurableEnvironment;@Transactional(rollbackFor = Exception.class)public void yourMethod(YourRequestDTO requestParam) {// 业务逻辑:// ...... 省略业务处理部分// 如果是立即发送任务,直接调用消息队列进行发送if (Objects.equals(requestParam.getSendType(), YourSendTypeEnum.IMMEDIATE.getType())) {// 定义消息队列 Topic(根据需要调整实际的 Topic)String topic = "your-topic-name-${unique-name}";// 通过 Spring 上下文解析占位符,把模板中的 unique-name 替换为实际值topic = configurableEnvironment.resolvePlaceholders(topic);// 构建消息体String messageKeys = UUID.randomUUID().toString();Message<Long> message = MessageBuilder.withPayload(yourDO.getId())  // 使用具体的 DO 对象.setHeader(MessageConst.PROPERTY_KEYS, messageKeys)  // 设置消息的 Key.build();// 执行消息队列发送及异常处理逻辑SendResult sendResult;try {sendResult = rocketMQTemplate.syncSend(topic, message, 2000L);  // 发送消息,并设置超时时间log.info("[生产者] 执行任务 - 发送结果:{},消息ID:{},消息Keys:{}", sendResult.getSendStatus(), sendResult.getMsgId(), messageKeys);} catch (Exception ex) {log.error("[生产者] 执行任务 - 消息发送失败,消息体:{}", yourDO.getId(), ex);  // 异常处理}}}
}

每次发送消息时,总是充斥着大量相同的冗余代码,这些逻辑散落在业务代码中,不利于对核心业务的理解和维护。

2.什么是模板方法模式

模板方法模式(Template Method Pattern)是一种行为型设计模式,旨在通过在父类中定义一个算法的框架(即模板方法),而将某些步骤延迟到子类中实现。模板方法模式让子类可以在不改变算法结构的情况下,重新定义算法中的某些特定步骤。

模板方法模式中:

  • 父类提供了一个骨架方法(即模板方法),并规定了算法的步骤。
  • 子类负责实现某些具体的步骤,这些步骤通常是一些可变的行为,父类并不关心。
  • 父类的模板方法调用这些可变的步骤,完成一个完整的算法流程。

通过这种方式,可以确保算法的结构一致,但又允许各个子类根据需要修改特定的步骤。

3. 使用模板方法模式的好处

使用模板方法模式后,我们可以将发送消息的流程固定,并把具体的步骤通过子类来实现。这样带来了以下好处:

  • 代码复用:通过模板方法模式,可以将发送消息的共同逻辑提取到父类中,避免了重复代码。例如,连接消息队列和错误处理等通用逻辑只需在父类中实现。
  • 清晰的流程控制模板方法模式能够清晰地定义消息发送的流程,所有子类遵循同一流程,便于理解和调试。
  • 易于扩展:如果需要支持不同的消息队列,只需要创建不同的子类实现具体的发送逻辑,父类的通用流程不变,符合开闭原则,便于扩展。
  • 高内聚低耦合:每个消息队列的具体实现仅依赖于模板方法模式中的抽象方法,具有很高的内聚性,同时也保持了与其他模块的低耦合。

4.模板方法设计模式重构消息发送

4.1 定义消息发送事件基础实体

java">@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public final class BaseSendExtendDTO {/*** 事件名称*/private String eventName;/*** 主题*/private String topic;/*** 标签*/private String tag;/*** 业务标识*/private String keys;/*** 发送消息超时时间*/private Long sentTimeout;/*** 具体延迟时间*/private Long delayTime;
}

另外,有些和业务无关的属性,我们再抽象一层 Wrapper 类,用于定义消息发送基础内容。

java">@Data
@Builder
@NoArgsConstructor(force = true)
@AllArgsConstructor
@RequiredArgsConstructor
public final class MessageWrapper<T> implements Serializable {private static final long serialVersionUID = 1L;/*** 消息发送 Keys*/@NonNullprivate String keys;/*** 消息体*/@NonNullprivate T message;/*** 消息发送时间*/private Long timestamp = System.currentTimeMillis();
}

4.2 定义抽象消息发送类

将消息发送的逻辑和结果日志的打印进行了抽象,也就是抽象方法模式中的复用性。并且,将消息发送事件的基本参数(如 Topic、Tag、是否延迟消息等)以及 Keys 的个性化属性独立为两个抽象方法。

java">@RequiredArgsConstructor
@Slf4j(topic = "CommonSendProduceTemplate")
public abstract class AbstractCommonSendProduceTemplate<T> {private final RocketMQTemplate rocketMQTemplate;/*** 构建消息发送事件基础扩充属性实体** @param messageSendEvent 消息发送事件* @return 扩充属性实体*/protected abstract BaseSendExtendDTO buildBaseSendExtendParam(T messageSendEvent);/*** 构建消息基本参数,请求头、Keys...** @param messageSendEvent 消息发送事件* @param requestParam     扩充属性实体* @return 消息基本参数*/protected abstract Message<?> buildMessage(T messageSendEvent, BaseSendExtendDTO requestParam);/*** 消息事件通用发送** @param messageSendEvent 消息发送事件* @return 消息发送返回结果*/public SendResult sendMessage(T messageSendEvent) {BaseSendExtendDTO baseSendExtendDTO = buildBaseSendExtendParam(messageSendEvent);SendResult sendResult;try {// 构建 Topic 目标落点 formats: `topicName:tags`StringBuilder destinationBuilder = StrUtil.builder().append(baseSendExtendDTO.getTopic());if (StrUtil.isNotBlank(baseSendExtendDTO.getTag())) {destinationBuilder.append(":").append(baseSendExtendDTO.getTag());}// 延迟时间不为空,发送任意延迟消息,否则发送普通消息if (baseSendExtendDTO.getDelayTime() != null) {sendResult = rocketMQTemplate.syncSendDeliverTimeMills(destinationBuilder.toString(),buildMessage(messageSendEvent, baseSendExtendDTO),baseSendExtendDTO.getDelayTime());} else {sendResult = rocketMQTemplate.syncSend(destinationBuilder.toString(),buildMessage(messageSendEvent, baseSendExtendDTO),baseSendExtendDTO.getSentTimeout());}log.info("[生产者] {} - 发送结果:{},消息ID:{},消息Keys:{}", baseSendExtendDTO.getEventName(), sendResult.getSendStatus(), sendResult.getMsgId(), baseSendExtendDTO.getKeys());} catch (Throwable ex) {log.error("[生产者] {} - 消息发送失败,消息体:{}", baseSendExtendDTO.getEventName(), JSON.toJSONString(messageSendEvent), ex);throw ex;}return sendResult;}
}

4.3. 定义消息发送事件

将消息队列中的数据定义为事件

4.4定义消息队列生产者

消息队列生产者继承了我们的消息发送抽象类,并实现了两个抽象方法,从而体现了模板方法设计模式的扩展性。在业务代码中,只需引入消息发送生产者,即可通过简洁的一行代码完成消息发送流程。

java">@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class CouponTaskExecuteEvent {/*** 推送任务id*/private Long couponTaskId;
}@Slf4j
@Component
public class CouponTaskActualExecuteProducer extends AbstractCommonSendProduceTemplate<CouponTaskExecuteEvent> {private final ConfigurableEnvironment environment;public CouponTaskActualExecuteProducer(@Autowired RocketMQTemplate rocketMQTemplate, @Autowired ConfigurableEnvironment environment) {super(rocketMQTemplate);this.environment = environment;}@Overrideprotected BaseSendExtendDTO buildBaseSendExtendParam(CouponTaskExecuteEvent messageSendEvent) {return BaseSendExtendDTO.builder().eventName("优惠券推送执行").keys(String.valueOf(messageSendEvent.getCouponTaskId())).topic(environment.resolvePlaceholders("你的主题")).sentTimeout(2000L).build();}@Overrideprotected Message<?> buildMessage(CouponTaskExecuteEvent couponTaskExecuteEvent, BaseSendExtendDTO requestParam) {String keys = StrUtil.isEmpty(requestParam.getKeys()) ? UUID.randomUUID().toString() : requestParam.getKeys();return MessageBuilder.withPayload(new MessageWrapper(keys, couponTaskExecuteEvent)).setHeader(MessageConst.PROPERTY_KEYS, keys).setHeader(MessageConst.PROPERTY_TAGS, requestParam.getTag()).build();}
}

http://www.ppmy.cn/devtools/153071.html

相关文章

Web入门

Spring 官网:spring.io Spring发展到今天已经形成了一种开发生态圈&#xff0c;Spring提供了若干个子项目&#xff0c;每个项目用于完成特定的功能 Spring Boot 可以帮助我们非常快速的构建应用程序、简化开发、提高效率 SpringBootWeb入门 ①.创建springboot工程&#xff0…

Solr与Elasticsearch 的对比与选型

在现代应用中&#xff0c;搜索引擎扮演着至关重要的角色。Apache Solr 和 Elasticsearch 是当前最流行的两个开源搜索引擎&#xff0c;它们各有优缺点&#xff0c;适用于不同的使用场景。本文将对这两者进行详细比较&#xff0c;并提供选型建议。 1. 概述 1.1 Apache Solr A…

摄影交流平台项目Uniapp+Springboot已完成

后端项目结构 前端项目结构 前端效果

Spring04 - filter和interceptor

filter和interceptor 文章目录 filter和interceptor一&#xff1a;舶来品和原住民1&#xff1a;Filter&#xff1a;1.1&#xff1a;基本概念1.2&#xff1a;Spring Boot中的使用 2&#xff1a;Interceptor&#xff1a;2.1&#xff1a;基本概念2.2&#xff1a;Spring Boot中的使…

vue3+elementPlus之后台管理系统(从0到1)(day3-管理员管理)

管理员管理 搭建管理员页面 在views中创建一个manager文件夹&#xff0c;并创建ManagerIndexView.vue、MangagerListView.vue、UserList.vue <!-- src/views/manager/ManagerIndexView.vue --> <template><!-- 作为一个占位符&#xff0c;用于渲染与当前 URL…

Scrapy中间件的使用

使用 Scrapy 中间件爬取网易新闻四大板块数据 在爬取动态加载网页数据时&#xff0c;我们经常需要结合 Scrapy 的强大爬虫框架和自动化工具的功能&#xff0c;来获取完整的页面数据。本文将以 网易新闻四大板块&#xff08;国内、国际、军事、航空&#xff09;数据爬取 为例&a…

Web安全:缓存欺骗攻击;基于缓存、CDN的新型Web漏洞

基于缓存、CDN的新型Web漏洞 漏洞原理利用方式解决方法 Web缓存欺骗漏洞&#xff08;Web Cache Deception&#xff09;是一种利用不安全的缓存机制来泄露用户敏感信息的攻击方式。攻击者通过操控请求URL诱导缓存系统将敏感信息缓存并对其他用户公开&#xff0c;可能导致用户数据…

Spring Boot整合WebSocket

目录 ?引言 1.WebSocket 基础知识 ?1.1 什么是 WebSocket&#xff1f; ?1.2 WebSocket 的应用场景 ?2.Spring Boot WebSocket 整合步骤 2.1 创建 Spring Boot 项目 2.2 添加 Maven 依赖 2.3 配置 WebSocket 2.4 创建 WebSocket 控制器 2.5 创建前端页面 引言 在…