今日指数项目项目集成RabbitMQ与CaffienCatch

news/2024/10/9 4:34:50/

今日指数项目项目集成RabbitMQ与CaffienCatch

一. 为什么要集成RabbitMQ

首先CaffeineCatch 是作为一个本地缓存工具 使用CaffeineCatch 能够大大较少I/O开销

股票项目 主要分为两大工程 --> job工程(负责数据采集) , backend(负责业务处理)

由于股票的实时性也就是说 , 对于股票来说像大盘数据 , 个股数据等都是每分钟进行更新的

而使用传统的采集以及业务处理方式 , 也就是说 数据采集后将数据保存到数据库中 , 然后客户从数据库中反复获取数据

当用户数量增多 , 数据库的I/O开销也会随之增大 , 会导致时效性的降低

所以这里我采用MQ加CaffeineCatch , 在job工程中采集数据后 写入数据库 , 同时通过MQ发送消息给backend工程 重新加载缓存

将数据库中的数据读取到CaffeineCatch 中

在这里插入图片描述

二. job工程代码实现

1. 导入mq依赖

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

2. 定义配置文件

spring:rabbitmq:host: 114.116.244.165 # rabbitMQ的ip地址port: 5672 # 端口username: jixupassword: 123321virtual-host: /

3. 编写服务端代码

package com.jixu.stock.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class MqConfig {// 定义大盘消息序列化方式@Beanpublic MessageConverter messageConverter(){return new Jackson2JsonMessageConverter();}// 定义主题交换机@Beanpublic TopicExchange topicExchange(){return new TopicExchange("stockExchange",true,false);}// 定义大盘队列@Beanpublic Queue stockMarketQueue(){return new Queue("marketQueue",true);}@Bean// 绑定大盘信息public Binding bindingStockeMarket(){// with( Routingkey 参数 --> 匹配的队列名称 )return BindingBuilder.bind(stockMarketQueue()).to(topicExchange()).with("inner.market");}
}

4. 定义客户端

package com.jixu.stock.config;import lombok.extern.slf4j.Slf4j;
import org.joda.time.DateTime;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.Date;@Configuration
@Slf4j
public class MqConfig {// 定义大盘消息序列化方式@Beanpublic MessageConverter messageConverter(){return new Jackson2JsonMessageConverter();}// 客户端接受信息@RabbitListener(queues = "marketQueue")public void stockMarketListener(Date date){long diffTime= DateTime.now().getMillis()-new DateTime(date).getMillis();//超过一分钟告警if (diffTime>60000) {log.error("采集国内大盘时间点:{},同步超时:{}ms",new DateTime(date).toString("yyyy-MM-dd HH:mm:ss"),diffTime);}}
}

3. 修改业务层代码

在数据插入成功后发送消息给MQ

log.info("当前时间点{} , 数据插入成功", DateTime.now().toString("yyyy-MM-dd HH-mm-ss"));
rabbitTemplate.convertAndSend("stockExchange","inner.market",new Date());

三. backend工程代码实现

首先在实现业务逻辑之前需要导入相关依赖 , 以及配置MQ和CaffineCache

1. 配置MQ配置类

package com.jixu.stock.config;import lombok.extern.slf4j.Slf4j;
import org.joda.time.DateTime;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.Date;@Configuration
@Slf4j
public class MqConfig {// 定义大盘消息序列化方式@Beanpublic MessageConverter messageConverter(){return new Jackson2JsonMessageConverter();}}

2. 配置CaffineCache配置类

 	/*** 配置CaffienCatch*/@Beanpublic Cache<String,Object> caffeineCache(){Cache<String, Object> cache = Caffeine.newBuilder().maximumSize(200)//设置缓存数量上限
//                .expireAfterAccess(1, TimeUnit.SECONDS)//访问1秒后删除
//                .expireAfterWrite(1,TimeUnit.SECONDS)//写入1秒后删除.initialCapacity(100)// 初始的缓存空间大小.recordStats()//开启统计.build();return cache;}

3. 创建客户端类接收信息

package com.jixu.stock.mq;import com.jixu.stock.service.StockService;
import lombok.extern.slf4j.Slf4j;
import org.joda.time.DateTime;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.github.benmanes.caffeine.cache.Cache;
import java.util.Date;/*** @program: stock_parent* @description:* @author: jixu* @create: 2024-10-01 12:45**/
@Component
@Slf4j
public class StockMarketMQ {@Autowiredprivate Cache caffeineCache;@Autowiredprivate StockService service;// 客户端接受信息@RabbitListener(queues = "marketQueue")public void stockMarketListener(Date date){long diffTime= DateTime.now().getMillis()-new DateTime(date).getMillis();//超过一分钟告警if (diffTime>60000) {log.error("采集国内大盘时间点:{},同步超时:{}ms",new DateTime(date).toString("yyyy-MM-dd HH:mm:ss"),diffTime);}}
}

在信息接受之后需要对业务层代码进行修改 --> 实现CaffineCache缓存

这里我们使用CaffineCache.get的方法 , 其中会传入两个参数 , 分别是要从CaffineCache中查询的数据的key ,以及如果key不存在使用的补救方法(从数据库中查询)

4. 完善业务代码

/*** 实现股票大盘数据查询* @return*/@Overridepublic R<ArrayList<InnerMarketDomain>> getInnerMarketDomain() {R<ArrayList<InnerMarketDomain>> msg = (R<ArrayList<InnerMarketDomain>>) caffeineCache.get("stockMarketMsg" , key -> {// 1. 获取最新时间数据Date curTime = DateTimeUtil.getLastDate4Stock(DateTime.now()).toDate();// 创建mock数据curTime = DateTime.parse("2022-01-02 09:32:00", DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")).toDate();// 2. 获取股票代码ArrayList<String> marketInfo = stockInfoConfig.getInner();// 3. dao层查询数据ArrayList<InnerMarketDomain> data = stockMarketIndexInfoMapper.getMarketInfo(curTime , marketInfo);return R.ok(data);});return msg;}

5. 完善StockMarketMQ类刷新数据

// 清除caffeineCache中的缓存
caffeineCache.invalidate("stockMarketMsg");
// 调用service重新获取
service.getInnerMarketDomain();

http://www.ppmy.cn/news/1536462.html

相关文章

Valhalla实现 -Docker部署利用OSM(Mapbox)地图实现路径规划可视化

一. Valhalla基本概念 1. 背景介绍&#xff1a; 官网介绍文档&#xff1a;https://valhalla.github.io/valhalla/ Valhalla是一个开源的路由引擎&#xff0c;能够实现实时路径规划&#xff0c;处理大量请求返回最优路径。 基于 OSM 数据&#xff0c;结合灵活的多模式交通方式…

Spring Boot:打造下一代医院管理系统

3系统分析 3.1可行性分析 通过对本医院管理系统实行的目的初步调查和分析&#xff0c;提出可行性方案并对其一一进行论证。我们在这里主要从技术可行性、经济可行性、操作可行性等方面进行分析。 3.1.1技术可行性 本医院管理系统采用JAVA作为开发语言&#xff0c;Spring Boot框…

Pikachu-url重定向-不安全的url跳转

不安全的url跳转 不安全的url跳转问题可能发生在一切执行了url地址跳转的地方。如果后端采用了前端传进来的(可能是用户传参,或者之前预埋在前端页面的url地址)参数作为了跳转的目的地,而又没有做判断的话就可能发生"跳错对象"的问题。 url跳转比较直接的危害是: …

Apache Flink 和 Apache Kafka

Apache Flink 和 Apache Kafka 都是大数据生态系统中非常重要的工具&#xff0c;但它们的作用和应用场景有所不同。下面将分别介绍两者的主要特性和它们之间的异同点。 Apache Kafka 作用&#xff1a; 消息队列&#xff1a;Kafka 主要作为消息队列使用&#xff0c;用于解耦生…

gin如何具体利用Server-Send-Events(SSE)实时推送技术实现消息推送

目录 业务场景 解决方案 1. 轮询 2. WebSocket 3. SSE(Server-Send-Events) 代码实现 总结 业务场景 在抖音、美团等APP中&#xff0c;我们经常会遇到APP内部的消息推送&#xff0c;如关注的人的动态消息推送、点赞评论互动消息推送以及算法推荐消息推送。这些场景都是…

刷题 二叉树

二叉树的核心思想 - 递归 - 将问题分解为子问题 题型 递归遍历迭代遍历层序遍历 bfs&#xff1a;队列各种递归题目&#xff1a;将问题分解为子问题二叉搜索树 - 中序遍历是递增序列 TreeNode* &prev 指针树形dp 面试经典 150 题 - 二叉树 104. 二叉树的最大深度 广度优…

传感器模块编程实践(二)W5500 SPI转以太网模块简介及驱动源码

文章目录 一.概要二.W5500芯片介绍W5500通讯协议介绍 三.W5500模块介绍四.W5500模块原理图五.W5500以太网模通讯实验六.CubeMX工程源代码下载七.小结 一.概要 我们介绍过单片机的以太网系统一般是由&#xff1a;单片机MACPHYRJ45。有些单片机比如STM32F407VET6芯片内部自带MAC…

十二、血条UI

一、制作血条UI 注&#xff1a;一般不用Slider制作血条&#xff1b;而是用两个Image制作&#xff0c;选择为填充 使用Slider滑动条制作UI 人物血条&#xff1a;背景深绿色&#xff1b;滑条浅绿色 在场景中的画布选择为OverLay 敌人血条&#xff1a; 在预制体里面制作&#x…