OJ在线评测系统 微服务 用分布式消息队列 RabbitMQ 解耦判题服务和题目服务 手搓交换机和队列 实现项目异步化

news/2024/10/7 23:59:03/

消息队列解耦 项目异步化

分布式消息队列

分布式消息队列是一种用于异步通信的系统,它允许不同的应用程序或服务之间传递消息。消息队列的核心理念是将消息存储在一个队列中,发送方可以将消息发送到队列,而接收方则可以在适当的时候从队列中读取消息。这种机制有助于解耦应用程序,提高系统的可扩展性和可靠性。

主要特点:

  1. 异步通信:发送方和接收方可以在不同的时间工作,不必直接交互。

  2. 负载均衡:通过将消息分发到多个消费者,可以有效利用系统资源。

  3. 消息持久化:许多消息队列系统支持将消息存储在磁盘上,以防数据丢失。

  4. 顺序处理:某些队列支持按顺序处理消息,确保消息的处理顺序。

  5. 容错性分布式架构增强了系统的容错能力,可以在部分组件故障时继续工作。

常见的分布式消息队列系统:

  • Apache Kafka:高吞吐量、可扩展的消息队列,常用于大数据处理。

  • RabbitMQ:支持多种消息协议,易于使用,适合复杂的路由场景。

  • ActiveMQ:功能丰富,支持多种编程语言和消息协议。

分布式消息队列在微服务架构、事件驱动架构等场景中广泛应用,可以有效提高系统的灵活性和可维护性。

我们就用RabbitMQ去改造项目 解耦判题服务 题目服务

题目服务只需要向消息队列中发信息

判题服务从消息队列中取消息去执行判题

然后异步更新数据库即可

我们的题目服务和判题服务需要引入rabbitMQ

先引入消息队列的Java客户端

先引入依赖 amqp的客户端

        <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

配置一下

  rabbitmq:host: localhostport: 5672password: guestusername: guest

我们要创建交换机和队列

先启动生产者 的消息队列

package com.yupi.yuojbackendjudgeservice.rabbitmq;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import lombok.extern.slf4j.Slf4j;/*** 用于创建测试程序用到的交换机和队列(只用在程序启动前执行一次)*/
@Slf4j
public class InitRabbitMq {public static void doInit() {try {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();// 创建交换机 用于发 收信息Channel channel = connection.createChannel();String EXCHANGE_NAME = "code_exchange";channel.exchangeDeclare(EXCHANGE_NAME, "direct");// 创建队列 随机分配一个队列名称String queueName = "code_queue";channel.queueDeclare(queueName, true, false, false, null);channel.queueBind(queueName, EXCHANGE_NAME, "my_routingKey");log.info("消息队列启动成功");} catch (Exception e) {log.error("消息队列启动失败");}}public static void main(String[] args) {doInit();}
}

在启动类里面可以看见

package com.yupi.yuojbackendjudgeservice;import com.yupi.yuojbackendjudgeservice.rabbitmq.InitRabbitMq;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.openfeign.EnableFeignClients;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.EnableAspectJAutoProxy;
import org.springframework.scheduling.annotation.EnableScheduling;@SpringBootApplication
@EnableScheduling
@EnableAspectJAutoProxy(proxyTargetClass = true, exposeProxy = true)
@ComponentScan("com.yupi")
@EnableDiscoveryClient
@EnableFeignClients(basePackages = {"com.yupi.yuojbackendserviceclient.service"})
public class YuojBackendJudgeServiceApplication {public static void main(String[] args) {// 初始化消息队列,先注释掉,改用 Bean 的方式初始化消息队列(InitRabbitMqBean.java)
//        InitRabbitMq.doInit();SpringApplication.run(YuojBackendJudgeServiceApplication.class, args);}}

接下来要把生产者的消息扔到交换机里面

package com.yupi.yuojbackendquestionservice.rabbitmq;import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;import javax.annotation.Resource;@Component
public class MyMessageProducer {@Resourceprivate RabbitTemplate rabbitTemplate;/*** 发送消息* @param exchange* @param routingKey* @param message*/public void sendMessage(String exchange, String routingKey, String message) {rabbitTemplate.convertAndSend(exchange, routingKey, message);}}

写一个接收消息的代码

package com.yupi.yuojbackendjudgeservice.rabbitmq;import com.rabbitmq.client.Channel;
import com.yupi.yuojbackendjudgeservice.judge.JudgeService;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;import javax.annotation.Resource;@Component
@Slf4j
public class MyMessageConsumer {@Resourceprivate JudgeService judgeService;// 指定程序监听的消息队列和确认机制@SneakyThrows@RabbitListener(queues = {"code_queue"}, ackMode = "MANUAL")public void receiveMessage(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {log.info("receiveMessage message = {}", message);long questionSubmitId = Long.parseLong(message);try {judgeService.doJudge(questionSubmitId);channel.basicAck(deliveryTag, false);} catch (Exception e) {channel.basicNack(deliveryTag, false, false);}}}

要传递的消息

是什么

传递的数据


http://www.ppmy.cn/news/1535916.html

相关文章

git使用“保姆级”教程4——版本回退及分支讲解

一、版本回退 1、历史回退(版本回退)——命令行git reset --hard 版本编号 注意&#xff1a;当前命令会让工作区的内容发生改变&#xff0c;可以理解成历史区(master分支)直接回到工作区比如&#xff1a;从版本4回到版本3&#xff0c;则工作区只会显示版本3的代码内容 1.1、指…

【Windows】在任务管理器中隐藏进程

在此前的一篇&#xff0c;我们已经介绍过了注入Dll 阻止任务管理器结束进程 -- Win 10/11。本篇利用 hook NtQuerySystemInformation 并进行断链的方法实现进程隐身&#xff0c;实测支持 taskmgr.exe 的任意多进程隐身。 任务管理器 代码&#xff1a; // dllmain.cpp : 定义 …

Hive数仓操作(十四)

一、Hive的DDL语句 在 Hive 中&#xff0c;DDL&#xff08;数据定义语言&#xff09;语句用于数据库和表的创建、修改、删除等操作。以下是一些重要的 DDL 语句&#xff1a; 1. 创建数据库和表 创建数据库 CREATE DATABASE IF NOT EXISTS database_name;创建表 CREATE TABLE …

EPC User Manual Introduction

Overview 您提供的链接是指向srsRAN 4G项目的官方文档&#xff0c;具体是关于srsEPC的介绍部分。以下是该页面的核心内容概要&#xff1a; ### 概述 srsEPC是一个轻量级的完整LTE核心网络&#xff08;EPC&#xff09;实现。srsEPC应用程序作为一个单一的二进制文件运行&#…

胡超:引领中美能源与文化合作的创意先锋

中美能源合作领域迎来了一个重要的历史时刻,2024年中美可持续发展峰会(Sino-American Symposium on Sustainable Development)在全球关注下圆满落幕。这场峰会不仅成为了中美两国绿色能源合作的高端平台,也展示了作为该活动的协办方RES(Reverse Energy Solutions)在清洁能源领域…

什么是汽车中的SDK?

无论是在家里使用预制菜包做一顿大厨级别的晚餐&#xff0c;还是使用IKEA套组装配出时尚的北欧风桌子&#xff0c;我们都熟悉这样一种概念&#xff1a;比起完全从零开始&#xff0c;使用工具包可以帮助我们更快、更高效地完成一件事。 在速度至关重要的商业软件领域&#xff0…

基于 STM32F407 的 SPI Flash下载算法

目录 一、概述二、自制 FLM 文件1、修改使用的芯片2、修改输出算法的名称3、其它设置4、修改配置文件 FlashDev.c5、文件 FlashPrg.c 的实现 三、验证算法 一、概述 本文将介绍如何使用 MDK 创建 STM32F407 的 SPI Flash 下载算法。 其中&#xff0c;SPI Flash 芯片使用的是 W…

【CSS in Depth 2 精译_043】6.5 CSS 中的粘性定位技术 + 本章小结

当前内容所在位置&#xff08;可进入专栏查看其他译好的章节内容&#xff09; 第一章 层叠、优先级与继承&#xff08;已完结&#xff09;第二章 相对单位&#xff08;已完结&#xff09;第三章 文档流与盒模型&#xff08;已完结&#xff09;第四章 Flexbox 布局&#xff08;已…