java 中多线程、 队列使用实例,处理大数据业务

devtools/2025/1/23 19:27:43/

场景: 从redis 订阅数据 调用线程来异步处理数据

直接上代码

定义线程管理类
java">import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;import java.util.concurrent.*;/*** Created with IntelliJ IDEA.* @Description 线程池管理类*/
@Component
public class ThreadPoolManager implements BeanFactoryAware {private static Logger logger = LoggerFactory.getLogger(ThreadPoolManager.class);//用于从IOC里取对象private BeanFactory factory; //如果实现Runnable的类是通过spring的application.xml文件进行注入,可通过 factory.getBean()获取,这里只是提一下// 线程池维护线程的最少数量 (根据环境而定)private final static int CORE_POOL_SIZE = 10;// 线程池维护线程的最大数量 (根据环境而定)private final static int MAX_POOL_SIZE = 50;// 线程池维护线程所允许的空闲时间private final static int KEEP_ALIVE_TIME = 0;// 线程池所使用的缓冲队列大小 (此处队列设置 需要考虑处理数据的效率  内存的大小)private final static int WORK_QUEUE_SIZE = 99999;@Overridepublic void setBeanFactory(BeanFactory beanFactory) throws BeansException {factory = beanFactory;}// 消息队列public LinkedBlockingQueue<String> getMsgQueue() {return msgQueue;}LinkedBlockingQueue<String> msgQueue = new LinkedBlockingQueue<>();/*** 当线程池的容量满了,执行下面代码,将推送数据存入到缓冲队列*/final RejectedExecutionHandler handler = new RejectedExecutionHandler() {@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {String  temp = ((MsgHandleThread) r).getRecord();if (StringUtils.isEmpty(temp)) {msgQueue.offer(temp);}}};/*** 创建线程池*/final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(CORE_POOL_SIZE, MAX_POOL_SIZE, KEEP_ALIVE_TIME, TimeUnit.SECONDS, new ArrayBlockingQueue(WORK_QUEUE_SIZE), this.handler);/*** 将任务加入线程池---执行数据处理*/public void addPushRecord(String  record) {MsgHandleThread subThread=new MsgHandleThread(record);threadPool.execute(subThread);}/*** 线程池的定时任务----> 称为(调度线程池)。此线程池支持 定时以及周期性执行任务的需求。*/final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(5);/*** 检查(调度线程池),每秒执行一次,查看订单的缓冲队列是否有 订单记录,则重新加入到线程池*/final ScheduledFuture scheduledFuture = scheduler.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {//判断缓冲队列是否存在记录if (!msgQueue.isEmpty()) {//当线程池的队列容量少于WORK_QUEUE_SIZE,则开始把缓冲队列的订单 加入到 线程池if (threadPool.getQueue().size() < WORK_QUEUE_SIZE) {String record = msgQueue.poll();MsgHandleThread subThread=new MsgHandleThread(record);threadPool.execute(subThread);}}}}, 0, 1, TimeUnit.SECONDS);/*** 终止订单线程池+调度线程池*/public void shutdown() {//true表示如果定时任务在执行,立即中止,false则等待任务结束后再停止scheduledFuture.cancel(false);scheduler.shutdown();threadPool.shutdown();}
}
任务处理类
java">/*** Created with IntelliJ IDEA.* @Description 订阅数据 处理*/
@Component
@Scope("prototype")//spring 多例
public class MsgHandleThread implements Runnable {private Logger logger = LoggerFactory.getLogger(SubCheckDataThread.class);private IDataHandleService _serviceprivate String record;public SubCheckDataThread(String  _record) {this.record = _record;}public String getRecord() {return record;}@Overridepublic void run() {try {if (StringUtils.isEmpty(this.record)) {return;}// 无法注入是采用此方法if (_service== null) {_service= ApplicationContextProvider.getBean(IDataHandleService .class);}//TODO 具体业务logger.info("消费完成",record);} catch (Exception e) {e.printStackTrace();}}
}
调用
java">import com.yicheng.common.properties.SetProperties;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.concurrent.ExecutorService;/*** <p>* 订阅redis消息* </p>** @Author: zhuYaqiang* @Date: 2024/06/12*/
@Component
public class SubscribeCheckData {@Autowiredprivate ThreadPoolManager threadPoolManager;/**** @Description:  查岗信息订阅---redis* @Param: [message]* @return: void* @Author: zhuYaqiang* @Date: 2024/06/12*/public void receiveMessage(String message) {try {threadPoolManager.addPushRecord(message);} catch (Exception e) {e.printStackTrace();}}}
redis 订阅消息后调用线程池处理数据
java">package com.yicheng.subscribeRedis;import com.yicheng.common.properties.SetProperties;
import com.yicheng.subscribeRedis.alarm.SubscribeAlarmNoticeData;
import com.yicheng.subscribeRedis.check.SubscribeCheckData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;/*** @title RedisSubscribeCHeck* @description* @create 2024/6/12 19:30*/
@Configuration
public class RedisMessageListener {@Autowiredprivate SetProperties setProperties;@BeanRedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,MessageListenerAdapter listenerCheckAdapter, MessageListenerAdapter listenerAlarmNoticeAdapter) {RedisMessageListenerContainer container = new RedisMessageListenerContainer();container.setConnectionFactory(connectionFactory);listenerCheckAdapter.afterPropertiesSet();listenerAlarmNoticeAdapter.afterPropertiesSet();//订阅了的通道// 订阅查岗数据container.addMessageListener(listenerCheckAdapter, new PatternTopic(setProperties.getRedisCheckSub().getSubChannel()));//这个container 可以添加多个 messageListenerreturn container;}/*** 消息监听器适配器,绑定消息处理器,利用反射技术调用消息处理器的业务方法* 监听查岗消息* @param receiver* @return*/@BeanMessageListenerAdapter listenerCheckAdapter(SubscribeCheckData receiver) {return new MessageListenerAdapter(receiver, "receiveMessage");}/*** 消息监听器适配器,绑定消息处理器,利用反射技术调用消息处理器的业务方法*   监听报警通知信息* @param receiver* @return*/@BeanMessageListenerAdapter listenerAlarmNoticeAdapter(SubscribeAlarmNoticeData receiver) {return new MessageListenerAdapter(receiver, "receiveMessage");}}

以上代码已在实际项目中使用,觉得有用的点赞收藏评论


http://www.ppmy.cn/devtools/152951.html

相关文章

ElasticSearch 学习课程入门(一)

引子 前文已经介绍了windows下如何安装ES&#xff0c;接下来的文章我会边学习边记录。OK&#xff0c;那就让我们开始吧。 一、ES基础操作 1、预备知识 &#xff08;1&#xff09;RESTful REST 指的是一组架构约束条件和原则。满足这些约束条件和原则的应用程序或设计就是 …

Android系统开发(二十):字体活起来,安卓自定义字体改造指南

为什么要写这篇文章&#xff1f; 你是否厌倦了千篇一律的安卓默认字体&#xff1f;想让你的设备从“乏味的配角”变成“炫酷的主角”&#xff1f;好消息&#xff01;从Android 12到Android 15&#xff0c;自定义字体变得更简单、更强大。尤其是表情字体的更新&#xff0c;不仅…

蓝桥杯算法|基础笔记(1)

**时间复杂度** 一、概念理解 时间复杂度是用来衡量算法运行时间随输入规模增长而增长的量级。它主要关注的是当输入规模趋向于无穷大时&#xff0c;算法执行基本操作的次数的增长趋势&#xff0c;而不是精确的运行时间。 二、分析代码中的基本操作 确定关键操作 在一段代码…

如何安装linux版本的node.js

在 Linux 系统上安装 Node.js 可以通过多种方式。以下是一些常见的安装方法&#xff1a; 方法 1: 使用包管理器 Ubuntu / Debian 更新包信息&#xff1a; sudo apt update安装 Node.js 和 npm&#xff1a; sudo apt install nodejs npm验证安装&#xff1a; node -v npm -vCe…

小米Vela操作系统开源:AIoT时代的全新引擎

小米近日正式开源了其物联网嵌入式软件平台——Vela操作系统&#xff0c;并将其命名为OpenVela。这一举动在AIoT&#xff08;人工智能物联网&#xff09;领域掀起了不小的波澜&#xff0c;也为开发者们提供了一个强大的AI代码生成器和开发平台。OpenVela项目源代码已托管至GitH…

WinHttp API接口辅助类实现GET POST网络通讯

1、简述 近期需要在MFC基础上开发网络Http通讯,开始使用的WinINet进行通讯,后面发现WinINet对连接超时这块不支持设置,在网上搜索了几种方式效果都不太好,于是决定用WinHttp API接口进行通讯,分别对GET、POST进行了封装。 2、使用到接口 2.1、WinHttpOpen WinHttpOpen 是…

FPGA 开发工作需求明确:关键要点与实践方法

FPGA开发工作需求明确&#xff1a;关键要点与实践方法 一、需求明确的重要性 在FPGA开发领域&#xff0c;明确的需求是项目成功的基石。FPGA开发往往涉及复杂的硬件逻辑设计、高速信号处理以及与其他系统的协同工作。若需求不明确&#xff0c;可能导致开发过程中频繁变更设计…

C++ 学习:深入理解 Linux 系统中的冯诺依曼架构

一、引言 冯诺依曼架构是现代计算机系统的基础&#xff0c;它的提出为计算机的发展奠定了理论基础。在学习 C 和 Linux 系统时&#xff0c;理解冯诺依曼架构有助于我们更好地理解程序是如何在计算机中运行的&#xff0c;包括程序的存储、执行和资源管理。这对于编写高效、可靠的…