RabbitMQ代码实战2
RPC远程过程调用模式队列(RPC)
模型
package cn.yanghuisen.rpc.server;import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** RPC模式队列-服务端*/
public class RPCServer {// 队列名称private static final String RPC_QUEUE_NAME = "rpc_queue";/*** 计算斐波那契数列** @param n* @return*/private static int fib(int n) {if (n == 0) return 0;if (n == 1) return 1;return fib(n - 1) + fib(n - 2);}public static void main(String[] args) {// 创建连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.10.100");factory.setPort(5672);factory.setUsername("shop");factory.setPassword("shop");factory.setVirtualHost("/shop");try {// 通过工厂创建连接final Connection connection = factory.newConnection();// 获取通道final Channel channel = connection.createChannel();// 声明队列channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);channel.queuePurge(RPC_QUEUE_NAME);/*限制RabbitMQ只发不超过1条的消息给同一个消费者。当消息处理完毕后,有了反馈,才会进行第二次发送。*/int prefetchCount = 1;channel.basicQos(prefetchCount);System.out.println(" [x] Awaiting RPC requests");Object monitor = new Object();// 获取消息DeliverCallback deliverCallback = (consumerTag, delivery) -> {// 获取replyTo队列和correlationId请求标识AMQP.BasicProperties replyProps = new AMQP.BasicProperties.Builder().correlationId(delivery.getProperties().getCorrelationId()).build();String response = "";try {// 接收客户端消息String message = new String(delivery.getBody(), "UTF-8");int n = Integer.parseInt(message);System.out.println(" [.] fib(" + message + ")");// 服务端根据业务需求处理response += fib(n);} catch (RuntimeException e) {System.out.println(" [.] " + e.toString());} finally {// 将处理结果发送至replyTo队列同时携带correlationId属性channel.basicPublish("", delivery.getProperties().getReplyTo(), replyProps,response.getBytes("UTF-8"));// 手动回执消息channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);// RabbitMq consumer worker thread notifies the RPC server owner thread// RabbitMq消费者工作线程通知RPC服务器其他所有线程运行synchronized (monitor) {monitor.notify();}}};// 监听队列/*autoAck = true代表自动确认消息autoAck = false代表手动确认消息*/boolean autoAck = false;channel.basicConsume(RPC_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> {});// Wait and be prepared to consume the message from RPC client.// 线程等待并准备接收来自RPC客户端的消息while (true) {synchronized (monitor) {try {monitor.wait();} catch (InterruptedException e) {e.printStackTrace();}}}} catch (IOException e) {e.printStackTrace();} catch (TimeoutException e) {e.printStackTrace();}}
}
package cn.yanghuisen.rpc.client;import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeoutException;/*** RPC模式队列-客户端*/
public class RPCClient implements AutoCloseable {private Connection connection;private Channel channel;// 队列名称private String requestQueueName = "rpc_queue";// 初始化连接public RPCClient() throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.10.100");factory.setPort(5672);factory.setUsername("shop");factory.setPassword("shop");factory.setVirtualHost("/shop");connection = factory.newConnection();channel = connection.createChannel();}public static void main(String[] args) {try (RPCClient fibonacciRpc = new RPCClient()) {for (int i = 0; i < 10; i++) {String i_str = Integer.toString(i);System.out.println(" [x] Requesting fib(" + i_str + ")");// 请求服务端String response = fibonacciRpc.call(i_str);System.out.println(" [.] Got '" + response + "'");}} catch (IOException | TimeoutException | InterruptedException e) {e.printStackTrace();}}// 请求服务端public String call(String message) throws IOException, InterruptedException {// correlationId请求标识IDfinal String corrId = UUID.randomUUID().toString();// 获取队列名称String replyQueueName = channel.queueDeclare().getQueue();// 设置replyTo队列和correlationId请求标识AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().correlationId(corrId).replyTo(replyQueueName).build();// 发送消息至队列channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));// 设置线程等待,每次只接收一个响应结果final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);// 接受服务器返回结果String ctag = channel.basicConsume(replyQueueName, true, (consumerTag, delivery) -> {if (delivery.getProperties().getCorrelationId().equals(corrId)) {// 将给定的元素在给定的时间内设置到线程队列中,如果设置成功返回true, 否则返回falseresponse.offer(new String(delivery.getBody(), "UTF-8"));}}, consumerTag -> {});// 从线程队列中获取值,如果线程队列中没有值,线程会一直阻塞,直到线程队列中有值,并且取得该值String result = response.take();// 从消息队列中丢弃该值channel.basicCancel(ctag);return result;}// 关闭连接public void close() throws IOException {connection.close();}
}
3.7、确认模式队列(confirm)
如何确定消息队列收到了生产者发送的消息?如果在发送消息前程序崩了怎么办?
3.7.1、事务机制控制
- txSelect():开启事务
- txCommit():提交事务
- txRollback():回滚事务
package cn.yanghuisen.tx.send;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.nio.charset.StandardCharsets;/*** 事务-发送消息*/
public class Send {// 队列名称private final static String QUEUE_NAME = "tx";public static void main(String[] argv) throws Exception {// 创建连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.10.100");factory.setPort(5672);factory.setUsername("shop");factory.setPassword("shop");factory.setVirtualHost("/shop");Connection connection = null;Channel channel = null;// 通过工厂创建连接try{connection = factory.newConnection();// 获取通道channel = connection.createChannel();// 开启事务channel.txSelect();/*声明队列1、队列名称2、是否持久化3、排他队列,如果一个度列被声明名排他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除1. 排他队列是基于连接可见的,同一连接的不同通道是可以同时访问同一个连接创建的排他队列的。2. "首次",如果一个连接已经声明了一个排他队列,其他连接是不允许建立同名的排他队列的,这个与普通队列不同。3. 即使该队列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会被自动删除的。这种队列适用于只限于一个客户端发送读取消息的应用场景。4、自动删除,如果该队列没有任何订阅的消费者的话,该队列会被自动删除。这种队列适用于临时队列*/channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 消息内容String message = "Hello World!";// 将消息放入队列并发送channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));int i = 1/0;System.out.println(" [x] Sent '" + message + "'");channel.txCommit();}catch (Exception e){e.printStackTrace();// 回滚channel.txRollback();channel.close();connection.close();}}
}
package cn.yanghuisen.tx.recv;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;/*** 事务-接收消息*/
public class Recv {// 队列名称private final static String QUEUE_NAME = "tx";public static void main(String[] argv) throws Exception {// 创建连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.10.100");factory.setPort(5672);factory.setUsername("shop");factory.setPassword("shop");factory.setVirtualHost("/shop");// 创建连接Connection connection = factory.newConnection();// 获取信息Channel channel = connection.createChannel();// 申明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);System.out.println(" [*] Waiting for messages. To exit press CTRL+C");// 接收消息DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + message + "'");};// 监听队列channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });}
}
缺点:降低了RabbitMQ的消息吞吐量
解决:使用confirm模式
总结:使用事务,可以在发送请求但是没有提交事务前回滚事务,撤回发送的消息。
3.7.2、确认模式(confirm)
生产者设置为确认模式,发送消息时所有的消息都会被指派一个唯一的ID,一旦消息被投递套指定的队列之后,就会返回一个确认结果给生产者(包含消息的唯一ID),这样生产者就知道了消息已经正确到达了目的地。如果消息和队列时可以持久化的,那么确认消息会将消息写入磁盘后发出。
confirm模式大的好处在于他是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回 确认的同时继续发送下一条消息,当消息终得到确认之后,生产者应用便可以通过回调方法来处理该 确认消息,如果RabbitMQ因为自身内部错误导致消息丢失,就会发送一条nack消息,生产者应用程序 同样可以在回调方法中处理该nack消息。
实现Confirm确认机制有三种方式
1、普通Confirm模式
每发送一条消息后,调用waitForConfirms()方法,等待服务器端confirm。
package cn.yanghuisen.confirm.sync.send;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.nio.charset.StandardCharsets;/*** 确认模式-同步-单条-发送消息*/
public class Send {// 队列名称private final static String QUEUE_NAME = "confirm_sync";public static void main(String[] argv) throws Exception {// 创建连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.10.100");factory.setPort(5672);factory.setUsername("shop");factory.setPassword("shop");factory.setVirtualHost("/shop");Connection connection = null;Channel channel = null;// 通过工厂创建连接try{connection = factory.newConnection();// 获取通道channel = connection.createChannel();// 开启确认模式channel.confirmSelect();/*声明队列1、队列名称2、是否持久化3、排他队列,如果一个度列被声明名排他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除1. 排他队列是基于连接可见的,同一连接的不同通道是可以同时访问同一个连接创建的排他队列的。2. "首次",如果一个连接已经声明了一个排他队列,其他连接是不允许建立同名的排他队列的,这个与普通队列不同。3. 即使该队列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会被自动删除的。这种队列适用于只限于一个客户端发送读取消息的应用场景。4、自动删除,如果该队列没有任何订阅的消费者的话,该队列会被自动删除。这种队列适用于临时队列*/channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 消息内容String message = "Hello World!";// 将消息放入队列并发送channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));// 确认消息是否发送成功if (channel.waitForConfirms()){System.out.println("消息发送成功");}else {System.out.println("消息发送失败");}System.out.println(" [x] Sent '" + message + "'");}catch (Exception e){e.printStackTrace();}}
}
package cn.yanghuisen.confirm.sync.recv;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;/*** 确认模式-同步-单条-接收消息*/
public class Recv {// 队列名称private final static String QUEUE_NAME = "confirm_sync";public static void main(String[] argv) throws Exception {// 创建连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.10.100");factory.setPort(5672);factory.setUsername("shop");factory.setPassword("shop");factory.setVirtualHost("/shop");// 创建连接Connection connection = factory.newConnection();// 获取信息Channel channel = connection.createChannel();// 申明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);System.out.println(" [*] Waiting for messages. To exit press CTRL+C");// 接收消息DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + message + "'");};// 监听队列channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });}
}
2、批量confirm模式:每发送一批消息后,调用waitForConfirmsOrDie()方法,等待服务器端 confirm。
package cn.yanghuisen.confirm.sync.send;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.nio.charset.StandardCharsets;/*** 确认模式-同步-批量-发送消息*/
public class Send {// 队列名称private final static String QUEUE_NAME = "confirm_sync";public static void main(String[] argv) throws Exception {// 创建连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.10.100");factory.setPort(5672);factory.setUsername("shop");factory.setPassword("shop");factory.setVirtualHost("/shop");Connection connection = null;Channel channel = null;// 通过工厂创建连接try{connection = factory.newConnection();// 获取通道channel = connection.createChannel();// 开启确认模式channel.confirmSelect();/*声明队列1、队列名称2、是否持久化3、排他队列,如果一个度列被声明名排他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除1. 排他队列是基于连接可见的,同一连接的不同通道是可以同时访问同一个连接创建的排他队列的。2. "首次",如果一个连接已经声明了一个排他队列,其他连接是不允许建立同名的排他队列的,这个与普通队列不同。3. 即使该队列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会被自动删除的。这种队列适用于只限于一个客户端发送读取消息的应用场景。4、自动删除,如果该队列没有任何订阅的消费者的话,该队列会被自动删除。这种队列适用于临时队列*/channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 消息内容String message = "Hello World!";// 将消息放入队列并发送channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));// 确认消息是否发送成功-多条// 如果有一条没被确认,就会抛IO异常channel.waitForConfirmsOrDie();// if (channel.waitForConfirms()){
// System.out.println("消息发送成功");
// }else {
// System.out.println("消息发送失败");
// }System.out.println(" [x] Sent '" + message + "'");}catch (Exception e){e.printStackTrace();}}
}
package cn.yanghuisen.confirm.sync.recv;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;/*** 确认模式-同步-批量-接收消息*/
public class Recv {// 队列名称private final static String QUEUE_NAME = "confirm_sync";public static void main(String[] argv) throws Exception {// 创建连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.10.100");factory.setPort(5672);factory.setUsername("shop");factory.setPassword("shop");factory.setVirtualHost("/shop");// 创建连接Connection connection = factory.newConnection();// 获取信息Channel channel = connection.createChannel();// 申明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);System.out.println(" [*] Waiting for messages. To exit press CTRL+C");// 接收消息DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + message + "'");};// 监听队列channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });}
}
3、异步confirm模式:提供一个回调方法,服务端confirm了一条或者多条消息后Client端会回调这个 方法。
package cn.yanghuisen.confirm.async.send;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.SortedSet;
import java.util.TreeSet;/*** 确认模式-异步-发送消息*/
public class Send {// 队列名称private final static String QUEUE_NAME = "confirm_async";public static void main(String[] argv) throws Exception {// 创建连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.10.100");factory.setPort(5672);factory.setUsername("shop");factory.setPassword("shop");factory.setVirtualHost("/shop");Connection connection = null;Channel channel = null;// 通过工厂创建连接try{// 维护信息发送回执deliveryTagfinal SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<>());// 创建连接connection = factory.newConnection();// 获取通道channel = connection.createChannel();// 开启确认模式channel.confirmSelect();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 添加channel 监听channel.addConfirmListener(new ConfirmListener() {/*** 已确认* @param l 唯一标识* @param b 确认多条还是单条,true多条* @throws IOException*/@Overridepublic void handleAck(long l, boolean b) throws IOException {// 判断确认的是多条还是单条if (b){System.out.println("handleAck--success-->multiple" + l);// 清除前 l 标识IDconfirmSet.headSet(l+1).clear();}else {System.out.println("handleAck--success-->single" + l);confirmSet.remove(l);}}/*** 未确认* @param l* @param b* @throws IOException*/@Overridepublic void handleNack(long l, boolean b) throws IOException {if (b){System.out.println("handleNack--failed-->multiple-->" + l);// 清除前 deliveryTag 项标识idconfirmSet.headSet(l + 1L).clear();}else {System.out.println("handleNack--failed-->single" + l);confirmSet.remove(l);}}});// 循环发送消息while (true){// 消息内容String message = "Hello World!";// 获取unconfirm的消息序号Long seqNo = channel.getNextPublishSeqNo();// 将消息放入队列并发送channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));// 将消息序号添加到SortedSetconfirmSet.add(seqNo);}}catch (Exception e){e.printStackTrace();channel.close();connection.close();}}
}
package cn.yanghuisen.confirm.async.recv;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;/*** 确认模式-异步-接收消息*/
public class Recv {// 队列名称private final static String QUEUE_NAME = "confirm_async";public static void main(String[] argv) throws Exception {// 创建连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.10.100");factory.setPort(5672);factory.setUsername("shop");factory.setPassword("shop");factory.setVirtualHost("/shop");// 创建连接Connection connection = factory.newConnection();// 获取信息Channel channel = connection.createChannel();// 申明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);System.out.println(" [*] Waiting for messages. To exit press CTRL+C");// 接收消息DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + message + "'");};// 监听队列channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });}
}