RabbitMQ深度探索:简单实现 MQ

embedded/2025/2/5 20:02:31/

基于多线程队列实现 MQ :

  1. 实现类:
    java">public class ThreadMQ {private static LinkedBlockingDeque<JSONObject> broker = new LinkedBlockingDeque<JSONObject>();public static void main(String[] args) {//创建生产者线程Thread producer = new Thread(new Runnable() {@Overridepublic void run() {while (true){try {Thread.sleep(1000);JSONObject data = new JSONObject();data.put("phone","11111111");broker.offer(data);}catch (Exception e){}}}},"生产者");producer.start();Thread consumer = new Thread(new Runnable() {@Overridepublic void run() {while (true){try {JSONObject data = broker.poll();if(data != null){System.out.println(Thread.currentThread().getName() + data.toJSONString());}}catch (Exception e){}}}},"消费者");consumer.start();}
    }

基于 netty 实现 MQ:

  1. 执行过程:
    1. 消费者 netty 客户端与 nettyServer 端 MQ 服务器保持长连接,MQ 服务器端保存消费者连接
    2. 生产者 netty 客户端发送请求给 nettyServer 端 MQ 服务器,MQ 服务器端再将消息内容发送给消费者
  2. 执行流程:
    1. 导入 Maven 依赖:
      <dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.62</version>
      </dependency>
      <dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.0.23.Final</version>
      </dependency>
      <dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.62</version>
      </dependency>
      <dependency><groupId>org.apache.commons</groupId><artifactId>commons-lang3</artifactId><version>3.11</version>
      </dependency>
      <dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>3.6.5</version>
      </dependency>
    2. 服务端:
      java">package com.qcby.springboot.MQ;import com.alibaba.fastjson.JSONObject;
      import io.netty.bootstrap.ServerBootstrap;
      import io.netty.buffer.ByteBuf;
      import io.netty.buffer.Unpooled;
      import io.netty.channel.*;
      import io.netty.channel.nio.NioEventLoopGroup;
      import io.netty.channel.socket.SocketChannel;
      import io.netty.channel.socket.nio.NioServerSocketChannel;
      import org.apache.commons.lang3.StringUtils;import java.io.UnsupportedEncodingException;
      import java.util.ArrayList;
      import java.util.concurrent.LinkedBlockingDeque;/*** @ClassName BoyatopMQServer2021* @Author* @Version V1.0**/
      public class BoyatopNettyMQServer {public void bind(int port) throws Exception {/*** Netty 抽象出两组线程池BossGroup和WorkerGroup* BossGroup专门负责接收客户端的连接, WorkerGroup专门负责网络的读写。*/EventLoopGroup bossGroup = new NioEventLoopGroup();EventLoopGroup workerGroup = new NioEventLoopGroup();ServerBootstrap bootstrap = new ServerBootstrap();try {bootstrap.group(bossGroup, workerGroup)// 设定NioServerSocketChannel 为服务器端.channel(NioServerSocketChannel.class)//BACKLOG用于构造服务端套接字ServerSocket对象,标识当服务器请求处理线程全满时,//用于临时存放已完成三次握手的请求的队列的最大长度。如果未设置或所设置的值小于1,Java将使用默认值50。.option(ChannelOption.SO_BACKLOG, 100)// 服务器端监听数据回调Handler.childHandler(new BoyatopNettyMQServer.ChildChannelHandler());//绑定端口, 同步等待成功;ChannelFuture future = bootstrap.bind(port).sync();System.out.println("当前服务器端启动成功...");//等待服务端监听端口关闭future.channel().closeFuture().sync();} catch (Exception e) {e.printStackTrace();} finally {//优雅关闭 线程组bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {// 设置异步回调监听ch.pipeline().addLast(new BoyatopNettyMQServer.MayiktServerHandler());}}public static void main(String[] args) throws Exception {int port = 9008;new BoyatopNettyMQServer().bind(port);}private static final String type_consumer = "consumer";private static final String type_producer = "producer";private static LinkedBlockingDeque<String> msgs = new LinkedBlockingDeque<>();private static ArrayList<ChannelHandlerContext> ctxs = new ArrayList<>();// 生产者投递消息的:topicNamepublic class MayiktServerHandler extends SimpleChannelInboundHandler<Object> {/*** 服务器接收客户端请求** @param ctx* @param data* @throws Exception*/@Overrideprotected void channelRead0(ChannelHandlerContext ctx, Object data)throws Exception {//ByteBuf buf=(ByteBuf)data;//byte[] req = new byte[buf.readableBytes()];//buf.readBytes(req);//String body = new String(req, "UTF-8");//System.out.println("body:"+body);JSONObject clientMsg = getData(data);String type = clientMsg.getString("type");switch (type) {case type_producer:producer(clientMsg);break;case type_consumer:consumer(ctx);break;}}private void consumer(ChannelHandlerContext ctx) {// 保存消费者连接ctxs.add(ctx);// 主动拉取mq服务器端缓存中没有被消费的消息String data = msgs.poll();if (StringUtils.isEmpty(data)) {return;}// 将该消息发送给消费者byte[] req = data.getBytes();ByteBuf firstMSG = Unpooled.buffer(req.length);firstMSG.writeBytes(req);ctx.writeAndFlush(firstMSG);}private void producer(JSONObject clientMsg) {// 缓存生产者投递 消息String msg = clientMsg.getString("msg");msgs.offer(msg); //保证消息不丢失还可以缓存硬盘//需要将该消息推送消费者ctxs.forEach((ctx) -> {// 将该消息发送给消费者String data = msgs.poll();if (data == null) {return;}byte[] req = data.getBytes();ByteBuf firstMSG = Unpooled.buffer(req.length);firstMSG.writeBytes(req);ctx.writeAndFlush(firstMSG);});}private JSONObject getData(Object data) throws UnsupportedEncodingException {ByteBuf buf = (ByteBuf) data;byte[] req = new byte[buf.readableBytes()];buf.readBytes(req);String body = new String(req, "UTF-8");return JSONObject.parseObject(body);}@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {ctx.flush();}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)throws Exception {ctx.close();}}
      }
    3. 生产端:
      java">package com.qcby.springboot.MQ;import com.alibaba.fastjson.JSONObject;
      import io.netty.bootstrap.Bootstrap;
      import io.netty.buffer.ByteBuf;
      import io.netty.buffer.Unpooled;
      import io.netty.channel.*;
      import io.netty.channel.nio.NioEventLoopGroup;
      import io.netty.channel.socket.SocketChannel;
      import io.netty.channel.socket.nio.NioSocketChannel;/*** @ClassName BoyatopNettyMQProducer* @Author* @Version V1.0**/
      public class BoyatopNettyMQProducer {public void connect(int port, String host) throws Exception {//配置客户端NIO 线程组EventLoopGroup group = new NioEventLoopGroup();Bootstrap client = new Bootstrap();try {client.group(group)// 设置为Netty客户端.channel(NioSocketChannel.class)/*** ChannelOption.TCP_NODELAY参数对应于套接字选项中的TCP_NODELAY,该参数的使用与Nagle算法有关。* Nagle算法是将小的数据包组装为更大的帧然后进行发送,而不是输入一次发送一次,因此在数据包不足的时候会等待其他数据的到来,组装成大的数据包进行发送,虽然该算法有效提高了网络的有效负载,但是却造成了延时。* 而该参数的作用就是禁止使用Nagle算法,使用于小数据即时传输。和TCP_NODELAY相对应的是TCP_CORK,该选项是需要等到发送的数据量最大的时候,一次性发送数据,适用于文件传输。*/.option(ChannelOption.TCP_NODELAY, true).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new BoyatopNettyMQProducer.NettyClientHandler());1. 演示LineBasedFrameDecoder编码器
      //                            ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
      //                            ch.pipeline().addLast(new StringDecoder());}});//绑定端口, 异步连接操作ChannelFuture future = client.connect(host, port).sync();//等待客户端连接端口关闭future.channel().closeFuture().sync();} finally {//优雅关闭 线程组group.shutdownGracefully();}}public static void main(String[] args) {int port = 9008;BoyatopNettyMQProducer client = new BoyatopNettyMQProducer();try {client.connect(port, "127.0.0.1");} catch (Exception e) {e.printStackTrace();}}public class NettyClientHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {JSONObject data = new JSONObject();data.put("type", "producer");JSONObject msg = new JSONObject();msg.put("userId", "123456");msg.put("age", "23");data.put("msg", msg);// 生产发送数据byte[] req = data.toJSONString().getBytes();ByteBuf firstMSG = Unpooled.buffer(req.length);firstMSG.writeBytes(req);ctx.writeAndFlush(firstMSG);}/*** 客户端读取到服务器端数据** @param ctx* @param msg* @throws Exception*/@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf buf = (ByteBuf) msg;byte[] req = new byte[buf.readableBytes()];buf.readBytes(req);String body = new String(req, "UTF-8");System.out.println("客户端接收到服务器端请求:" + body);}// tcp属于双向传输@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {ctx.close();}}
      }
    4. 客户端:
      java">package com.qcby.springboot.MQ;import com.alibaba.fastjson.JSONObject;
      import io.netty.bootstrap.Bootstrap;
      import io.netty.buffer.ByteBuf;
      import io.netty.buffer.Unpooled;
      import io.netty.channel.*;
      import io.netty.channel.nio.NioEventLoopGroup;
      import io.netty.channel.socket.SocketChannel;
      import io.netty.channel.socket.nio.NioSocketChannel;/*** @ClassName BoyatopNettyMQProducer* @Author* @Version V1.0**/
      public class NettyMQConsumer {public void connect(int port, String host) throws Exception {//配置客户端NIO 线程组EventLoopGroup group = new NioEventLoopGroup();Bootstrap client = new Bootstrap();try {client.group(group)// 设置为Netty客户端.channel(NioSocketChannel.class)/*** ChannelOption.TCP_NODELAY参数对应于套接字选项中的TCP_NODELAY,该参数的使用与Nagle算法有关。* Nagle算法是将小的数据包组装为更大的帧然后进行发送,而不是输入一次发送一次,因此在数据包不足的时候会等待其他数据的到来,组装成大的数据包进行发送,虽然该算法有效提高了网络的有效负载,但是却造成了延时。* 而该参数的作用就是禁止使用Nagle算法,使用于小数据即时传输。和TCP_NODELAY相对应的是TCP_CORK,该选项是需要等到发送的数据量最大的时候,一次性发送数据,适用于文件传输。*/.option(ChannelOption.TCP_NODELAY, true).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new NettyMQConsumer.NettyClientHandler());1. 演示LineBasedFrameDecoder编码器
      //                            ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
      //                            ch.pipeline().addLast(new StringDecoder());}});//绑定端口, 异步连接操作ChannelFuture future = client.connect(host, port).sync();//等待客户端连接端口关闭future.channel().closeFuture().sync();} finally {//优雅关闭 线程组group.shutdownGracefully();}}public static void main(String[] args) {int port = 9008;NettyMQConsumer client = new NettyMQConsumer();try {client.connect(port, "127.0.0.1");} catch (Exception e) {e.printStackTrace();}}public class NettyClientHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {JSONObject data = new JSONObject();data.put("type", "consumer");// 生产发送数据byte[] req = data.toJSONString().getBytes();ByteBuf firstMSG = Unpooled.buffer(req.length);firstMSG.writeBytes(req);ctx.writeAndFlush(firstMSG);}/*** 客户端读取到服务器端数据** @param ctx* @param msg* @throws Exception*/@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf buf = (ByteBuf) msg;byte[] req = new byte[buf.readableBytes()];buf.readBytes(req);String body = new String(req, "UTF-8");System.out.println("客户端接收到服务器端请求:" + body);}// tcp属于双向传输@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {ctx.close();}}
      }
  3. 持久化机制:
    1. 如果 MQ 接收到生产者投递信息,如果消费者不存在的情况下,消息是否会丢失?
    2. 答:不会丢失,消息确认机制必须要消费者消费成功之后,在通知给 MQ 服务器端,删除该消息
  4. MQ 服务器将该消息推送给消费者:
    1. 消费者已经和 MQ 服务器保持长连接
    2. 消费者在第一次启动的时候会主动拉取信息
  5. MQ 如何实现高并发思想:
    1. MQ 消费者根据自身能力情况,拉取 MQ 服务器端消费消息
    2. 默认的情况下取出一条消息
  6. 缺点:
    1. 存在延迟问题
  7. 需要考虑 MQ 消费者提高速率的问题:
    1. 如何提高消费者速率:消费者实现集群、消费者批量获取消息即可

http://www.ppmy.cn/embedded/159834.html

相关文章

python学opencv|读取图像(五十六)使用cv2.GaussianBlur()函数实现图像像素高斯滤波处理

【1】引言 前序学习了均值滤波和中值滤波&#xff0c;对图像的滤波处理有了基础认知&#xff0c;相关文章链接为&#xff1a; python学opencv|读取图像&#xff08;五十四&#xff09;使用cv2.blur()函数实现图像像素均值处理-CSDN博客 python学opencv|读取图像&#xff08;…

PyTorch快速入门

Anaconda Anaconda 是一款面向科学计算的开源 Python 发行版本&#xff0c;它集成了众多科学计算所需的库、工具和环境管理系统&#xff0c;旨在简化包管理和部署&#xff0c;提升开发与研究效率。 核心组件&#xff1a; Conda&#xff1a;这是 Anaconda 自带的包和环境管理…

Block Blaster Online:免费解谜游戏的乐趣

Block Blaster Online 是一款免费的在线解谜游戏&#xff0c;它将挑战你的思维和反应能力&#xff01;在这里&#xff0c;你可以匹配五彩缤纷的方块&#xff0c;创造出令人惊叹的组合&#xff0c;享受无尽的解谜乐趣。无需安装&#xff0c;点击即可开始&#xff0c;加入全球数百…

数据结构——并查集

一、并查集原理 再一些应用问题中&#xff0c;需要将n个不同的元素划分成一些不相交的集合。开始时&#xff0c;每个元素自成一个单元素集合&#xff0c;然后按一定的规律将归于同一组元素的集合合并。在此过程中要反复用到查询某一个元素归属于哪个集合。适合这种问题的抽象…

Day 28 卡玛笔记

这是基于代码随想录的每日打卡 77. 组合 给定两个整数 n 和 k&#xff0c;返回范围 [1, n] 中所有可能的 k 个数的组合。 你可以按 任何顺序 返回答案。 示例 1&#xff1a; 输入&#xff1a;n 4, k 2 输出&#xff1a; [[2,4],[3,4],[2,3],[1,2],[1,3],[1,4], ]示例 2…

Kafka常见问题之 java.io.IOException: Disk error when trying to write to log

文章目录 Kafka常见问题之 java.io.IOException: Disk error when trying to write to log1. 问题概述2. 问题排查方向&#xff08;1&#xff09;磁盘空间不足&#xff08;2&#xff09;磁盘 I/O 故障&#xff08;3&#xff09;Kafka 日志文件损坏&#xff08;4&#xff09;Kaf…

Docker 安装详细教程(适用于CentOS 7 系统)

目录 步骤如下&#xff1a; 1. 卸载旧版 Docker 2. 配置 Docker 的 YUM 仓库 3. 安装 Docker 4. 启动 Docker 并验证安装 5. 配置 Docker 镜像加速 总结 前言 Docker 分为 CE 和 EE 两大版本。CE即社区版&#xff08;免费&#xff0c;支持周期7个月&#xff09;&#xf…

为什么“记住密码”适合持久化?

✅ 特性 1&#xff1a;应用重启后仍需生效 记住密码的本质是长期存储用户的登录凭证&#xff08;如用户名、密码、JWT Token&#xff09;&#xff0c;即使用户关闭应用、重启设备&#xff0c;仍然可以自动登录。持久化存储方案&#xff1a; React Native 推荐使用 AsyncStorag…