Springboot Rabbitmq + 线程池技术控制指定数量task执行

news/2025/1/11 18:21:49/

定义DataSyncTaskManager,作为线程池任务控制器

package org.demo.scheduletest.service;import lombok.extern.slf4j.Slf4j;import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;@Slf4j
public class DataSyncTaskManager {// 线程数private static final Integer threadNum = 5;private static DataSyncTaskManager taskManager = null;private static BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();private ThreadPoolExecutor taskExecutorPool;private DataSyncTaskManager() {taskExecutorPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(threadNum);}/*** 构建唯一Manager对象单例** @return*/public static synchronized DataSyncTaskManager getManager() {if (null == taskManager) {taskManager = new DataSyncTaskManager();}return taskManager;}/*** 提交需要运行的任务** @param task*/public void submitTask(DataSyncTask task) {taskQueue.add(task);log.info("[DataSyncTaskManager] submitTask size={}", taskQueue.size());}public void runTaskDaemon() {log.info("[DataSyncTaskManager] runTaskDaemon start.");Thread thread = new Thread(() -> {while (true) {try {Runnable task = taskQueue.take();taskExecutorPool.submit(task);// log.info("[DataSyncTaskManager] runTaskDaemon submit task={}", task);Thread.sleep(3000);} catch (InterruptedException e) {Thread.currentThread().interrupt();log.error("[startTaskRunningDaemon] task run InterruptedException", e);} catch (Exception e) {log.error("[startTaskRunningDaemon] task run Exception", e);}}});thread.setName(this.getClass().getSimpleName());thread.start();}
}

定义DataSyncTask,作为具体任务执行方

package org.demo.scheduletest.service;import com.rabbitmq.client.Channel;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;import java.io.IOException;@Data
@Slf4j
public class DataSyncTask implements Runnable {private String name;private Channel channel;private long deliveryTag;public DataSyncTask(String name, Channel channel, long deliveryTag) {this.name = name;this.channel = channel;this.deliveryTag = deliveryTag;}/*** When an object implementing interface <code>Runnable</code> is used* to create a thread, starting the thread causes the object's* <code>run</code> method to be called in that separately executing* thread.* <p>* The general contract of the method <code>run</code> is that it may* take any action whatsoever.** @see Thread#run()*/@Overridepublic void run() {log.info("[DataSyncTask] run task start, name = {}", name);try {Thread.sleep(30000);} catch (InterruptedException e) {throw new RuntimeException(e);}try {channel.basicAck(deliveryTag, true);log.info("[DataSyncTask] run task end, name = {}", name);} catch (IOException e) {throw new RuntimeException(e);}}}

InitTask,服务启动执行Task管理器

package org.demo.scheduletest.service;import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;/*** @author zhe.xiao* @version 1.0* @since 2025/1/9 上午11:38*/
@Slf4j
@Component
public class InitTask implements ApplicationRunner {/*** Callback used to run the bean.** @param args incoming application arguments* @throws Exception on error*/@Overridepublic void run(ApplicationArguments args) throws Exception {DataSyncTaskManager.getManager().runTaskDaemon();}
}

配置Rabbitmq

package org.demo.scheduletest.rabbitmq;import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringBootConfiguration;
import org.springframework.context.annotation.Bean;/*** @author zhe.xiao* @date 2022-07-06 17:27* @description**/
@SpringBootConfiguration
public class MyRabbitTemplateConfig {@Value("${spring.rabbitmq.host}")private String host;@Value("${spring.rabbitmq.port}")private int port;@Value("${spring.rabbitmq.username}")private String username;@Value("${spring.rabbitmq.password}")private String password;@Value("${spring.rabbitmq.virtual-host:/}")private String virtualhost;/*** 连接工厂* @return*/@Beanpublic ConnectionFactory connectionFactory() {CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host, port);connectionFactory.setUsername(username);connectionFactory.setPassword(password);connectionFactory.setVirtualHost(virtualhost);//connectionFactory.setPublisherConfirms(true);return connectionFactory;}/**** @return RabbitTemplate*/@Beanpublic RabbitTemplate rabbitTemplate() {return new RabbitTemplate(connectionFactory());}
}
package org.demo.scheduletest.rabbitmq;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;/*** @author zhe.xiao* @date 2022-07-05 14:55* @description**/
@Configuration
public class MyRabbitExecutor {//正常队列public static final String QUEUE_1 = "my:queue:1";public static final String EXCHANGE_1 = "my:exchange:1";public static final String ROUTEING_1 = "data:route:1";//死信队列public static final String QUEUE_DEAD_LETTER = "my:queue:deadLetter";public static final String EXCHANGE_DEAD_LETTER = "my:exchange:deadLetter";public static final String ROUTING_DEAD_LETTER = "data:route:deadLetter";// 提供 Queue@BeanQueue myQueue1(){HashMap<String, Object> args = new HashMap<>();//绑定死信队列信息args.put("x-dead-letter-exchange", EXCHANGE_DEAD_LETTER);args.put("x-dead-letter-routing-key", ROUTING_DEAD_LETTER);//        args.put("x-max-length", 5); //队列最大长度,超过了会进入死信队列
//         args.put("x-message-ttl", 5000); //如果5秒没被消费,则进入死信队列return new Queue(QUEUE_1, true, false, false, args);}// 提供 Exchange@BeanDirectExchange myExchange1(){return new DirectExchange(EXCHANGE_1, true, false);}// 创建一个Binding对象,将Exchange和Queue绑定在一起@BeanBinding myBinding1(){return BindingBuilder.bind(myQueue1()).to(myExchange1()).with(ROUTEING_1);// return BindingBuilder.bind(myQueue1()).to(myExchange1());}// 死信队列配置 QUEUE, EXCHANGE, BINDING@BeanQueue myQueueDeadLetter(){return new Queue(QUEUE_DEAD_LETTER, true, false, false);}@BeanDirectExchange myExchangeDeadLetter(){return new DirectExchange(EXCHANGE_DEAD_LETTER, true, false);}@BeanBinding myBindingDeadLetter(){return BindingBuilder.bind(myQueueDeadLetter()).to(myExchangeDeadLetter()).with(ROUTING_DEAD_LETTER);}
}

Rabbitmq消费者通过task控制器提交执行任务

package org.demo.scheduletest.rabbitmq;import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.demo.scheduletest.service.DataSyncTask;
import org.demo.scheduletest.service.DataSyncTaskManager;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;/*** 消费QUEUE** @author zhe.xiao* @date 2022-07-05 14:57* @description**/
@Slf4j
@Component
public class MyReceiver {@RabbitListener(queues = MyRabbitExecutor.QUEUE_1)public void handler1(String data, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {try {log.info("handler1 process: {}", data);DataSyncTask dataSyncTask = new DataSyncTask(data, channel, deliveryTag);DataSyncTaskManager.getManager().submitTask(dataSyncTask);} catch (Exception e) {log.error(e.getMessage());}}
}

http://www.ppmy.cn/news/1562301.html

相关文章

《探秘鸿蒙NEXT中的人工智能核心架构》

在当今科技飞速发展的时代&#xff0c;华为HarmonyOS NEXT的发布无疑是操作系统领域的一颗重磅炸弹&#xff0c;其将人工智能与操作系统深度融合&#xff0c;开启了智能新时代。那么&#xff0c;鸿蒙NEXT中人工智能的核心架构究竟是怎样的呢&#xff1f;让我们一同探秘。 基础…

XS5037C一款应用于专业安防摄像机的图像信号处理芯片,支持MIPI和 DVP 接口,内置高性能ISP处理器,支持3D降噪和数字宽动态

XS5037C是一款应用于专业安防摄像机的图像信号处理芯片&#xff0c;支持MIPI和 DVP 接口&#xff0c;最 大支持 5M sensor接入。内置高性能ISP处理器&#xff0c;支持3D降噪和数字宽动态。标清模拟输出支 持960H&#xff0c;高清模拟输出支持HDCCTV 720P/1080P/4M/5M。高度集成…

Filebeat es

es kibana 内网地址 127.0.0.1:9200 https://vpcep-7c16b185-4d03-475c-bf9b-c38cde8d02c0.test.huaweicloud.com:9200 账户 admin 密码 admin #端口 9200 eskibana https://127.0.0.1:5601/app/login?nextUrl%2F 账户 admin 密码 admin docker 构建容器启动 docker syste…

Java聊天小程序

拟设计一个基于 Java 技术的局域网在线聊天系统,实现客户端与服务器之间的实时通信。系统分为客户端和服务器端两类,客户端用于发送和接收消息,服务器端负责接收客户端请求并处理消息。客户端通过图形界面提供用户友好的操作界面,服务器端监听多个客户端的连接并管理消息通…

【蓝桥杯比赛-C++组-经典题目汇总】

1. 最短路 题目描述&#xff1a; 如下图所示&#xff0c;G是一个无向图&#xff0c;其中蓝色边的长度是1、橘色边的长度是2、绿色边的长度是3。 则从 A 到 S 的最短距离是多少&#xff1f; #include <iostream> #include <cstring> using namespace std; const i…

[免费]微信小程序(高校就业)招聘系统(Springboot后端+Vue管理端)【论文+源码+SQL脚本】

大家好&#xff0c;我是java1234_小锋老师&#xff0c;看到一个不错的微信小程序(高校就业)招聘系统(Springboot后端Vue管理端)&#xff0c;分享下哈。 项目视频演示 【免费】微信小程序(高校就业)招聘系统(Springboot后端Vue管理端) Java毕业设计_哔哩哔哩_bilibili 项目介绍…

cJson——序列化格式json和protobuf对比

cJson——序列化格式json和protobuf对比 1. 更小的消息体积2. 更快的序列化与反序列化速度3. 类型安全4. 向后和向前兼容性5. 更低的带宽消耗6. 高效的编码方式7. 易于跨语言支持8. 支持复杂的数据结构9. 更好的支持大型数据交换总结 Protocol Buffers (Protobuf) 和 JSON 都是…

HTML 迷宫游戏

HTML 迷宫游戏 相关资源文件已经打包成压缩文件&#xff0c;可双击index.html直接运行程序&#xff0c;且文章末尾已附上相关源码&#xff0c;以供大家学习交流&#xff0c;博主主页还有更多Python相关程序案例&#xff0c;秉着开源精神的想法&#xff0c;望大家喜欢&#xff0…