RabbitMQ代码实战2

server/2024/11/27 18:03:16/

RabbitMQ代码实战2

RPC远程过程调用模式队列(RPC)

模型

img

package cn.yanghuisen.rpc.server;import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** RPC模式队列-服务端*/
public class RPCServer {// 队列名称private static final String RPC_QUEUE_NAME = "rpc_queue";/*** 计算斐波那契数列** @param n* @return*/private static int fib(int n) {if (n == 0) return 0;if (n == 1) return 1;return fib(n - 1) + fib(n - 2);}public static void main(String[] args) {// 创建连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.10.100");factory.setPort(5672);factory.setUsername("shop");factory.setPassword("shop");factory.setVirtualHost("/shop");try {// 通过工厂创建连接final Connection connection = factory.newConnection();// 获取通道final Channel channel = connection.createChannel();// 声明队列channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);channel.queuePurge(RPC_QUEUE_NAME);/*限制RabbitMQ只发不超过1条的消息给同一个消费者。当消息处理完毕后,有了反馈,才会进行第二次发送。*/int prefetchCount = 1;channel.basicQos(prefetchCount);System.out.println(" [x] Awaiting RPC requests");Object monitor = new Object();// 获取消息DeliverCallback deliverCallback = (consumerTag, delivery) -> {// 获取replyTo队列和correlationId请求标识AMQP.BasicProperties replyProps = new AMQP.BasicProperties.Builder().correlationId(delivery.getProperties().getCorrelationId()).build();String response = "";try {// 接收客户端消息String message = new String(delivery.getBody(), "UTF-8");int n = Integer.parseInt(message);System.out.println(" [.] fib(" + message + ")");// 服务端根据业务需求处理response += fib(n);} catch (RuntimeException e) {System.out.println(" [.] " + e.toString());} finally {// 将处理结果发送至replyTo队列同时携带correlationId属性channel.basicPublish("", delivery.getProperties().getReplyTo(), replyProps,response.getBytes("UTF-8"));// 手动回执消息channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);// RabbitMq consumer worker thread notifies the RPC server owner thread// RabbitMq消费者工作线程通知RPC服务器其他所有线程运行synchronized (monitor) {monitor.notify();}}};// 监听队列/*autoAck = true代表自动确认消息autoAck = false代表手动确认消息*/boolean autoAck = false;channel.basicConsume(RPC_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> {});// Wait and be prepared to consume the message from RPC client.// 线程等待并准备接收来自RPC客户端的消息while (true) {synchronized (monitor) {try {monitor.wait();} catch (InterruptedException e) {e.printStackTrace();}}}} catch (IOException e) {e.printStackTrace();} catch (TimeoutException e) {e.printStackTrace();}}
}
package cn.yanghuisen.rpc.client;import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeoutException;/*** RPC模式队列-客户端*/
public class RPCClient implements AutoCloseable {private Connection connection;private Channel channel;// 队列名称private String requestQueueName = "rpc_queue";// 初始化连接public RPCClient() throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.10.100");factory.setPort(5672);factory.setUsername("shop");factory.setPassword("shop");factory.setVirtualHost("/shop");connection = factory.newConnection();channel = connection.createChannel();}public static void main(String[] args) {try (RPCClient fibonacciRpc = new RPCClient()) {for (int i = 0; i < 10; i++) {String i_str = Integer.toString(i);System.out.println(" [x] Requesting fib(" + i_str + ")");// 请求服务端String response = fibonacciRpc.call(i_str);System.out.println(" [.] Got '" + response + "'");}} catch (IOException | TimeoutException | InterruptedException e) {e.printStackTrace();}}// 请求服务端public String call(String message) throws IOException, InterruptedException {// correlationId请求标识IDfinal String corrId = UUID.randomUUID().toString();// 获取队列名称String replyQueueName = channel.queueDeclare().getQueue();// 设置replyTo队列和correlationId请求标识AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().correlationId(corrId).replyTo(replyQueueName).build();// 发送消息至队列channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));// 设置线程等待,每次只接收一个响应结果final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);// 接受服务器返回结果String ctag = channel.basicConsume(replyQueueName, true, (consumerTag, delivery) -> {if (delivery.getProperties().getCorrelationId().equals(corrId)) {// 将给定的元素在给定的时间内设置到线程队列中,如果设置成功返回true, 否则返回falseresponse.offer(new String(delivery.getBody(), "UTF-8"));}}, consumerTag -> {});// 从线程队列中获取值,如果线程队列中没有值,线程会一直阻塞,直到线程队列中有值,并且取得该值String result = response.take();// 从消息队列中丢弃该值channel.basicCancel(ctag);return result;}// 关闭连接public void close() throws IOException {connection.close();}
}

3.7、确认模式队列(confirm)

如何确定消息队列收到了生产者发送的消息?如果在发送消息前程序崩了怎么办?

3.7.1、事务机制控制
  • txSelect():开启事务
  • txCommit():提交事务
  • txRollback():回滚事务
package cn.yanghuisen.tx.send;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.nio.charset.StandardCharsets;/*** 事务-发送消息*/
public class Send {// 队列名称private final static String QUEUE_NAME = "tx";public static void main(String[] argv) throws Exception {// 创建连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.10.100");factory.setPort(5672);factory.setUsername("shop");factory.setPassword("shop");factory.setVirtualHost("/shop");Connection connection = null;Channel channel = null;// 通过工厂创建连接try{connection = factory.newConnection();// 获取通道channel = connection.createChannel();// 开启事务channel.txSelect();/*声明队列1、队列名称2、是否持久化3、排他队列,如果一个度列被声明名排他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除1. 排他队列是基于连接可见的,同一连接的不同通道是可以同时访问同一个连接创建的排他队列的。2. "首次",如果一个连接已经声明了一个排他队列,其他连接是不允许建立同名的排他队列的,这个与普通队列不同。3. 即使该队列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会被自动删除的。这种队列适用于只限于一个客户端发送读取消息的应用场景。4、自动删除,如果该队列没有任何订阅的消费者的话,该队列会被自动删除。这种队列适用于临时队列*/channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 消息内容String message = "Hello World!";// 将消息放入队列并发送channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));int i = 1/0;System.out.println(" [x] Sent '" + message + "'");channel.txCommit();}catch (Exception e){e.printStackTrace();// 回滚channel.txRollback();channel.close();connection.close();}}
}
package cn.yanghuisen.tx.recv;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;/*** 事务-接收消息*/
public class Recv {// 队列名称private final static String QUEUE_NAME = "tx";public static void main(String[] argv) throws Exception {// 创建连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.10.100");factory.setPort(5672);factory.setUsername("shop");factory.setPassword("shop");factory.setVirtualHost("/shop");// 创建连接Connection connection = factory.newConnection();// 获取信息Channel channel = connection.createChannel();// 申明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);System.out.println(" [*] Waiting for messages. To exit press CTRL+C");// 接收消息DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + message + "'");};// 监听队列channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });}
}

缺点:降低了RabbitMQ的消息吞吐量

解决:使用confirm模式

总结:使用事务,可以在发送请求但是没有提交事务前回滚事务,撤回发送的消息。

3.7.2、确认模式(confirm)
生产者设置为确认模式,发送消息时所有的消息都会被指派一个唯一的ID,一旦消息被投递套指定的队列之后,就会返回一个确认结果给生产者(包含消息的唯一ID),这样生产者就知道了消息已经正确到达了目的地。如果消息和队列时可以持久化的,那么确认消息会将消息写入磁盘后发出。

confirm模式大的好处在于他是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回 确认的同时继续发送下一条消息,当消息终得到确认之后,生产者应用便可以通过回调方法来处理该 确认消息,如果RabbitMQ因为自身内部错误导致消息丢失,就会发送一条nack消息,生产者应用程序 同样可以在回调方法中处理该nack消息。

实现Confirm确认机制有三种方式

1、普通Confirm模式

每发送一条消息后,调用waitForConfirms()方法,等待服务器端confirm。

package cn.yanghuisen.confirm.sync.send;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.nio.charset.StandardCharsets;/*** 确认模式-同步-单条-发送消息*/
public class Send {// 队列名称private final static String QUEUE_NAME = "confirm_sync";public static void main(String[] argv) throws Exception {// 创建连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.10.100");factory.setPort(5672);factory.setUsername("shop");factory.setPassword("shop");factory.setVirtualHost("/shop");Connection connection = null;Channel channel = null;// 通过工厂创建连接try{connection = factory.newConnection();// 获取通道channel = connection.createChannel();// 开启确认模式channel.confirmSelect();/*声明队列1、队列名称2、是否持久化3、排他队列,如果一个度列被声明名排他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除1. 排他队列是基于连接可见的,同一连接的不同通道是可以同时访问同一个连接创建的排他队列的。2. "首次",如果一个连接已经声明了一个排他队列,其他连接是不允许建立同名的排他队列的,这个与普通队列不同。3. 即使该队列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会被自动删除的。这种队列适用于只限于一个客户端发送读取消息的应用场景。4、自动删除,如果该队列没有任何订阅的消费者的话,该队列会被自动删除。这种队列适用于临时队列*/channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 消息内容String message = "Hello World!";// 将消息放入队列并发送channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));// 确认消息是否发送成功if (channel.waitForConfirms()){System.out.println("消息发送成功");}else {System.out.println("消息发送失败");}System.out.println(" [x] Sent '" + message + "'");}catch (Exception e){e.printStackTrace();}}
}
package cn.yanghuisen.confirm.sync.recv;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;/*** 确认模式-同步-单条-接收消息*/
public class Recv {// 队列名称private final static String QUEUE_NAME = "confirm_sync";public static void main(String[] argv) throws Exception {// 创建连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.10.100");factory.setPort(5672);factory.setUsername("shop");factory.setPassword("shop");factory.setVirtualHost("/shop");// 创建连接Connection connection = factory.newConnection();// 获取信息Channel channel = connection.createChannel();// 申明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);System.out.println(" [*] Waiting for messages. To exit press CTRL+C");// 接收消息DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + message + "'");};// 监听队列channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });}
}

2、批量confirm模式:每发送一批消息后,调用waitForConfirmsOrDie()方法,等待服务器端 confirm。

package cn.yanghuisen.confirm.sync.send;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.nio.charset.StandardCharsets;/*** 确认模式-同步-批量-发送消息*/
public class Send {// 队列名称private final static String QUEUE_NAME = "confirm_sync";public static void main(String[] argv) throws Exception {// 创建连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.10.100");factory.setPort(5672);factory.setUsername("shop");factory.setPassword("shop");factory.setVirtualHost("/shop");Connection connection = null;Channel channel = null;// 通过工厂创建连接try{connection = factory.newConnection();// 获取通道channel = connection.createChannel();// 开启确认模式channel.confirmSelect();/*声明队列1、队列名称2、是否持久化3、排他队列,如果一个度列被声明名排他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除1. 排他队列是基于连接可见的,同一连接的不同通道是可以同时访问同一个连接创建的排他队列的。2. "首次",如果一个连接已经声明了一个排他队列,其他连接是不允许建立同名的排他队列的,这个与普通队列不同。3. 即使该队列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会被自动删除的。这种队列适用于只限于一个客户端发送读取消息的应用场景。4、自动删除,如果该队列没有任何订阅的消费者的话,该队列会被自动删除。这种队列适用于临时队列*/channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 消息内容String message = "Hello World!";// 将消息放入队列并发送channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));// 确认消息是否发送成功-多条// 如果有一条没被确认,就会抛IO异常channel.waitForConfirmsOrDie();//            if (channel.waitForConfirms()){
//                System.out.println("消息发送成功");
//            }else {
//                System.out.println("消息发送失败");
//            }System.out.println(" [x] Sent '" + message + "'");}catch (Exception e){e.printStackTrace();}}
}
package cn.yanghuisen.confirm.sync.recv;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;/*** 确认模式-同步-批量-接收消息*/
public class Recv {// 队列名称private final static String QUEUE_NAME = "confirm_sync";public static void main(String[] argv) throws Exception {// 创建连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.10.100");factory.setPort(5672);factory.setUsername("shop");factory.setPassword("shop");factory.setVirtualHost("/shop");// 创建连接Connection connection = factory.newConnection();// 获取信息Channel channel = connection.createChannel();// 申明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);System.out.println(" [*] Waiting for messages. To exit press CTRL+C");// 接收消息DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + message + "'");};// 监听队列channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });}
}

3、异步confirm模式:提供一个回调方法,服务端confirm了一条或者多条消息后Client端会回调这个 方法。

package cn.yanghuisen.confirm.async.send;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.SortedSet;
import java.util.TreeSet;/*** 确认模式-异步-发送消息*/
public class Send {// 队列名称private final static String QUEUE_NAME = "confirm_async";public static void main(String[] argv) throws Exception {// 创建连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.10.100");factory.setPort(5672);factory.setUsername("shop");factory.setPassword("shop");factory.setVirtualHost("/shop");Connection connection = null;Channel channel = null;// 通过工厂创建连接try{// 维护信息发送回执deliveryTagfinal SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<>());// 创建连接connection = factory.newConnection();// 获取通道channel = connection.createChannel();// 开启确认模式channel.confirmSelect();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 添加channel 监听channel.addConfirmListener(new ConfirmListener() {/*** 已确认* @param l 唯一标识* @param b 确认多条还是单条,true多条* @throws IOException*/@Overridepublic void handleAck(long l, boolean b) throws IOException {// 判断确认的是多条还是单条if (b){System.out.println("handleAck--success-->multiple" + l);// 清除前 l 标识IDconfirmSet.headSet(l+1).clear();}else {System.out.println("handleAck--success-->single" + l);confirmSet.remove(l);}}/*** 未确认* @param l* @param b* @throws IOException*/@Overridepublic void handleNack(long l, boolean b) throws IOException {if (b){System.out.println("handleNack--failed-->multiple-->" + l);// 清除前 deliveryTag 项标识idconfirmSet.headSet(l + 1L).clear();}else {System.out.println("handleNack--failed-->single" + l);confirmSet.remove(l);}}});// 循环发送消息while (true){// 消息内容String message = "Hello World!";// 获取unconfirm的消息序号Long seqNo = channel.getNextPublishSeqNo();// 将消息放入队列并发送channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));// 将消息序号添加到SortedSetconfirmSet.add(seqNo);}}catch (Exception e){e.printStackTrace();channel.close();connection.close();}}
}
package cn.yanghuisen.confirm.async.recv;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;/*** 确认模式-异步-接收消息*/
public class Recv {// 队列名称private final static String QUEUE_NAME = "confirm_async";public static void main(String[] argv) throws Exception {// 创建连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.10.100");factory.setPort(5672);factory.setUsername("shop");factory.setPassword("shop");factory.setVirtualHost("/shop");// 创建连接Connection connection = factory.newConnection();// 获取信息Channel channel = connection.createChannel();// 申明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);System.out.println(" [*] Waiting for messages. To exit press CTRL+C");// 接收消息DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + message + "'");};// 监听队列channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });}
}

http://www.ppmy.cn/server/145399.html

相关文章

基于Angular+BootStrap+SpringBoot简单的购物网站

目录 一、项目结构图 二、目录结构解析 后端 (Spring Boot) 前端 (Angular) 三、技术栈 四、具体功能实现 五、数据库设计 六、后端实现 1. 设置Spring Boot项目 2. 数据库实体类 3. 创建Repository 4. 创建Service层 5. 创建Controller层 七、前端实现&#xff0…

【es6进阶】如何使用Proxy实现自己的观察者模式

观察者模式&#xff08;Observer mode&#xff09;指的是函数自动观察数据对象&#xff0c;一旦对象有变化&#xff0c;函数就会自动执行。这里&#xff0c;我们是使用es6的proxy及reflect来实现这个效果。 实现效果 业务分析 源数据 const object2 {name: "张三"…

如何使用OCR技术批量识别图片中的文字并重命名文件,OCR 技术批量识别图片中的文字可能出现的错误

字符识别错误 形近字混淆&#xff1a;例如 “已” 和 “己”、“未” 和 “末” 等&#xff0c;由于外形极为相似&#xff0c;OCR 软件在识别时可能出现误判&#xff0c;将原本正确的字识别成与之形近的另一个字。比如在识别一篇手写的文章中&#xff0c;手写体的 “已” 可能就…

软件测试面试之常规问题

1.描述一下测试过程 类似题目:测试的生命周期 思路:这是一个“范围”很大的题目&#xff0c;而且回答时间一般在3分钟之内&#xff0c;不可能非常详细的描述整个过程&#xff0c;因此答题的思路要从整体结构入手&#xff0c;不要过细。为了保证答案的准确性&#xff0c;可以引…

SpringBoot(三十九)SpringBoot集成RabbitMQ实现流量削峰添谷

前边我们有具体的学习过RabbitMQ的安装和基本使用的情况。 但是呢&#xff0c;没有演示具体应用到项目中的实例。 这里使用RabbitMQ来实现流量的削峰添谷。 一&#xff1a;添加pom依赖 <!--rabbitmq-需要的 AMQP 依赖--> <dependency><groupId>org.springfr…

TCP IP协议和网络安全

传输层的两个协议&#xff1a; 可靠传输 TCP 分段传输 建立对话&#xff08;消耗系统资源&#xff09; 丢失重传netstat -n 不可靠传输 UDP 一个数据包就能表达完整的意思或屏幕广播 应用层协议&#xff08;默认端口&#xff09;&#xff1a; httpTCP80 网页 ftpTCP21验证用户身…

在Ubuntu2004中搭建基于ESP-IDF v5.1的ESP32-S3开发环境

在Ubuntu2004中搭建基于ESP-IDF v5.1的ESP32-S3开发环境 目录 1 基本资料 2 注意事项 2.1 子模块检出失败处理 2.2 选择 Espressif 下载服务器 2.3 自定义工具安装路径 2.4 导出环境变量 2.5 测试基础环境 3 创建自己的工程 3.1 创建基础应用工程 3.2 创建组件(…

Linux 虚拟机下安装RedisJSON

文章目录 一、安装 Redis二、安装RedisJSON 一、安装 Redis 安装地址 二、安装RedisJSON RedisJSON github 地址 选择版本&#xff0c;下载压缩包。 RedisJson 是根据 Rust 开发编译的&#xff0c;所以我们要在系统中安装 Rust。官网地址。 国内下载 Rust 下载较慢&#x…