Springboot Rabbitmq + 线程池技术控制指定数量task执行

server/2025/1/11 11:35:14/

定义DataSyncTaskManager,作为线程池任务控制器

package org.demo.scheduletest.service;import lombok.extern.slf4j.Slf4j;import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;@Slf4j
public class DataSyncTaskManager {// 线程数private static final Integer threadNum = 5;private static DataSyncTaskManager taskManager = null;private static BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();private ThreadPoolExecutor taskExecutorPool;private DataSyncTaskManager() {taskExecutorPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(threadNum);}/*** 构建唯一Manager对象单例** @return*/public static synchronized DataSyncTaskManager getManager() {if (null == taskManager) {taskManager = new DataSyncTaskManager();}return taskManager;}/*** 提交需要运行的任务** @param task*/public void submitTask(DataSyncTask task) {taskQueue.add(task);log.info("[DataSyncTaskManager] submitTask size={}", taskQueue.size());}public void runTaskDaemon() {log.info("[DataSyncTaskManager] runTaskDaemon start.");Thread thread = new Thread(() -> {while (true) {try {Runnable task = taskQueue.take();taskExecutorPool.submit(task);// log.info("[DataSyncTaskManager] runTaskDaemon submit task={}", task);Thread.sleep(3000);} catch (InterruptedException e) {Thread.currentThread().interrupt();log.error("[startTaskRunningDaemon] task run InterruptedException", e);} catch (Exception e) {log.error("[startTaskRunningDaemon] task run Exception", e);}}});thread.setName(this.getClass().getSimpleName());thread.start();}
}

定义DataSyncTask,作为具体任务执行方

package org.demo.scheduletest.service;import com.rabbitmq.client.Channel;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;import java.io.IOException;@Data
@Slf4j
public class DataSyncTask implements Runnable {private String name;private Channel channel;private long deliveryTag;public DataSyncTask(String name, Channel channel, long deliveryTag) {this.name = name;this.channel = channel;this.deliveryTag = deliveryTag;}/*** When an object implementing interface <code>Runnable</code> is used* to create a thread, starting the thread causes the object's* <code>run</code> method to be called in that separately executing* thread.* <p>* The general contract of the method <code>run</code> is that it may* take any action whatsoever.** @see Thread#run()*/@Overridepublic void run() {log.info("[DataSyncTask] run task start, name = {}", name);try {Thread.sleep(30000);} catch (InterruptedException e) {throw new RuntimeException(e);}try {channel.basicAck(deliveryTag, true);log.info("[DataSyncTask] run task end, name = {}", name);} catch (IOException e) {throw new RuntimeException(e);}}}

InitTask,服务启动执行Task管理器

package org.demo.scheduletest.service;import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;/*** @author zhe.xiao* @version 1.0* @since 2025/1/9 上午11:38*/
@Slf4j
@Component
public class InitTask implements ApplicationRunner {/*** Callback used to run the bean.** @param args incoming application arguments* @throws Exception on error*/@Overridepublic void run(ApplicationArguments args) throws Exception {DataSyncTaskManager.getManager().runTaskDaemon();}
}

配置Rabbitmq

package org.demo.scheduletest.rabbitmq;import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringBootConfiguration;
import org.springframework.context.annotation.Bean;/*** @author zhe.xiao* @date 2022-07-06 17:27* @description**/
@SpringBootConfiguration
public class MyRabbitTemplateConfig {@Value("${spring.rabbitmq.host}")private String host;@Value("${spring.rabbitmq.port}")private int port;@Value("${spring.rabbitmq.username}")private String username;@Value("${spring.rabbitmq.password}")private String password;@Value("${spring.rabbitmq.virtual-host:/}")private String virtualhost;/*** 连接工厂* @return*/@Beanpublic ConnectionFactory connectionFactory() {CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host, port);connectionFactory.setUsername(username);connectionFactory.setPassword(password);connectionFactory.setVirtualHost(virtualhost);//connectionFactory.setPublisherConfirms(true);return connectionFactory;}/**** @return RabbitTemplate*/@Beanpublic RabbitTemplate rabbitTemplate() {return new RabbitTemplate(connectionFactory());}
}
package org.demo.scheduletest.rabbitmq;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;/*** @author zhe.xiao* @date 2022-07-05 14:55* @description**/
@Configuration
public class MyRabbitExecutor {//正常队列public static final String QUEUE_1 = "my:queue:1";public static final String EXCHANGE_1 = "my:exchange:1";public static final String ROUTEING_1 = "data:route:1";//死信队列public static final String QUEUE_DEAD_LETTER = "my:queue:deadLetter";public static final String EXCHANGE_DEAD_LETTER = "my:exchange:deadLetter";public static final String ROUTING_DEAD_LETTER = "data:route:deadLetter";// 提供 Queue@BeanQueue myQueue1(){HashMap<String, Object> args = new HashMap<>();//绑定死信队列信息args.put("x-dead-letter-exchange", EXCHANGE_DEAD_LETTER);args.put("x-dead-letter-routing-key", ROUTING_DEAD_LETTER);//        args.put("x-max-length", 5); //队列最大长度,超过了会进入死信队列
//         args.put("x-message-ttl", 5000); //如果5秒没被消费,则进入死信队列return new Queue(QUEUE_1, true, false, false, args);}// 提供 Exchange@BeanDirectExchange myExchange1(){return new DirectExchange(EXCHANGE_1, true, false);}// 创建一个Binding对象,将Exchange和Queue绑定在一起@BeanBinding myBinding1(){return BindingBuilder.bind(myQueue1()).to(myExchange1()).with(ROUTEING_1);// return BindingBuilder.bind(myQueue1()).to(myExchange1());}// 死信队列配置 QUEUE, EXCHANGE, BINDING@BeanQueue myQueueDeadLetter(){return new Queue(QUEUE_DEAD_LETTER, true, false, false);}@BeanDirectExchange myExchangeDeadLetter(){return new DirectExchange(EXCHANGE_DEAD_LETTER, true, false);}@BeanBinding myBindingDeadLetter(){return BindingBuilder.bind(myQueueDeadLetter()).to(myExchangeDeadLetter()).with(ROUTING_DEAD_LETTER);}
}

Rabbitmq消费者通过task控制器提交执行任务

package org.demo.scheduletest.rabbitmq;import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.demo.scheduletest.service.DataSyncTask;
import org.demo.scheduletest.service.DataSyncTaskManager;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;/*** 消费QUEUE** @author zhe.xiao* @date 2022-07-05 14:57* @description**/
@Slf4j
@Component
public class MyReceiver {@RabbitListener(queues = MyRabbitExecutor.QUEUE_1)public void handler1(String data, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {try {log.info("handler1 process: {}", data);DataSyncTask dataSyncTask = new DataSyncTask(data, channel, deliveryTag);DataSyncTaskManager.getManager().submitTask(dataSyncTask);} catch (Exception e) {log.error(e.getMessage());}}
}

http://www.ppmy.cn/server/157450.html

相关文章

【Linux】Linux开发:GDB调试器与Git版本控制工具指南

Linux相关知识点可以通过点击以下链接进行学习一起加油&#xff01;初识指令指令进阶权限管理yum包管理与vim编辑器GCC/G编译器make与Makefile自动化构建 在 Linux 开发中&#xff0c;GDB 调试器和 Git 版本控制工具是开发者必备的利器。GDB 帮助快速定位代码问题&#xff0c;G…

【学习笔记】数据结构(十二)

文件 文章目录 文件12.1 有关文件的基本概念12.2 顺序文件12.3 索引文件12.4 ISAM文件和VSAM文件12.4.1 ISAM文件12.4.2 VSAM文件 12.5 直接存取文件(散列文件)12.6 多关键字文件12.6.1 多重表文件12.6.2 倒排文件 12.1 有关文件的基本概念 文件(file) 是由大量性质相同的记录…

Ubuntu挂载Windows 磁盘,双系统

首先我们需要在终端输入这个命令&#xff0c;来查看磁盘分配情况 lsblk -f 找到需要挂载的磁盘&#xff0c;检查其类型&#xff08; 我的/dev/nvme2n1p1类型是ntfs&#xff0c;名字叫3500winData&#xff09; 然后新建一个挂载磁盘的目录&#xff0c;我的是/media/zeqi/3500wi…

了解SQL

目录 1、数据库基础 &#xff08;1&#xff09;数据库&#xff08;database&#xff09; &#xff08;2&#xff09;表&#xff08;table&#xff09;与模式&#xff08;schema&#xff09; &#xff08;3&#xff09;列和数据类型 &#xff08;4&#xff09;行 &#xff…

Win10微调大语言模型ChatGLM2-6B

在《Win10本地部署大语言模型ChatGLM2-6B-CSDN博客》基础上进行&#xff0c;官方文档在这里&#xff0c;参考了这篇文章 首先确保ChatGLM2-6B下的有ptuning AdvertiseGen下载地址1&#xff0c;地址2&#xff0c;文件中数据留几行 模型文件下载地址 &#xff08;注意&#xff1…

计算机网络期末复习(知识点)

概念题 在实际复习之前&#xff0c;可以看一下这个视频将网络知识串一下&#xff0c;以便更好地复习&#xff1a;【你管这破玩意叫网络&#xff1f;】 网络规模的分类 PAN&#xff08;个人区域网络&#xff09;&#xff1a;用于个人设备间的连接&#xff0c;如手机与蓝牙耳机…

redis的监控

1)查看key >keys * #查看所有的key,数据量大的时候容易阻塞 >scan 0 #每次获取11个key&#xff0c;直到获取所有的key,编号从0开始&#xff0c;下一次获取时&#xff0c;按照提示的编号输入scan后的值 当编号再次为0时&#xff0c;获取完成所有…

moviepy 将mp4视频文件提取音频mp3 - python 实现

DataBall 助力快速掌握数据集的信息和使用方式&#xff0c;会员享有 百种数据集&#xff0c;持续增加中。 需要更多数据资源和技术解决方案&#xff0c;知识星球&#xff1a; “DataBall - X 数据球(free)” -------------------------------------------------------------…