目录
一、第一章
1、pom依赖
二、第二章
1、消息属性对象(Delivery delivery)
2、信道对象 (发送消息根据路由发送,接收消息根据队列接收)
3、工作队列模式
4、消息应答 (消费者)
5、消息自动重新入队 (消费者)
6、RabbitMQ持久化 (生产者)
7、不公平分发(能者多劳)
8、发布确认消息(生产者)
9、交换机
10、Exchanges交换机的类型
11、临时队列
12、交换机和队列的关系
13、交换机模式
(1)Fanout模式(发布/订阅)
(2)Direct 直接交换机模式(路由模式)
(3)Topic 主题模式
14、死信队列(队列消息自动发送到配置的路由中)
15、延迟队列(ttl)
一、第一章
1、pom依赖
<dependencies><!--rabbitmq 依赖客户端--><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.8.0</version></dependency><!--操作文件流的一个依赖--><dependency><groupId>commons-io</groupId><artifactId>commons-io</artifactId><version>2.6</version></dependency></dependencies><!--指定 jdk 编译版本--><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><configuration><source>8</source><target>8</target></configuration></plugin></plugins></build>
二、第二章
1、消息属性对象(Delivery delivery)
Delivery 类
api
delivery.getBody() 消息的内容(byte类型) 装箱: new String(delivery.getBody())delivery.getProperties() 当前消息的详细属性(很多get方法)delivery.getEnvelope() 消息的属性delivery.getEnvelope().getDeliveryTag() 当前消息在当前消费者中的位置 (1标识当前消费者处理的第一条消息)delivery.getEnvelope().getRoutingKey() 消息的路由Keydelivery.getEnvelope().getExchange(); 消息的交换机delivery.getEnvelope().isDedeliver(); 是否是重新发送消息
2、信道对象 (发送消息根据路由发送,接收消息根据队列接收)
//创建连接
Connection connection=factory.newConnection();
//发消息是通过信道发送消息,所以要创建信道
//获取信道
Channel channel= connection.createChannel();
消费者channel.basicAck(delivery,true) (用于肯定确认应答) true表示批量应答信道中的消息都会被应答 (未被处理的也应答),false表示不批量应答,只应答当前消息channel.basicNack (用于否定确认应答)channel.basicReject (用于否定确认应答)channel.basicQos(1) (消息暂存区大小,0无限大,n表示当前有消息未处理完时候可以容纳的其他消息条数)//队列 //自动应答 //成功回调 // 消息被取消的回调channel.basicConsume(QUEUE_NAME,true, deliverCallback1,cancelCallback); //(消息是异步的) 不会阻塞当前线程的运行
生产者channel.confirmSelect(); //开启发布确认,返回值为bool//设置队列名 持久化 单个消费者 自动删除 其他参数channel.queueDeclare("cons" ,false ,false, false, null); //创建队列 创建在生产者或消费者都可String queueName=channel.queueDedare.getQueue(); //,创建一个临时队列,返回值队列的名称//交换机 路由id 其他参数 消息内容channel.basicPublish("",QUEUE_NAME,null,mssage.getBytes()); //发送消息boolean flag=channel.waitForConfirms(); // 确认前面全部发布的消息 (可以每发布调用一次,也可以批量发布后再确认)//确认消息回调 未确认消息消息回调channel.addConfirmListener(confirmCallback1,confirmCallback2); // 异步应答监听 (需要创建再发布消息之前)
其他channel.getNextPublishSeqNo() //下一次发送的消息的标号 (标号从1开始)//声名一个交换机 //交换机名字 交换机类型channel.exchangeDeclare(“logs”, BuiltinExchangeType.FANOUT ); //定义在消费者或者生产者都行 创建在生产者或消费者都可//绑定交换机和队列 队列名 交换机名 routingKey 绑定序号(路由序号)channel.queueBind(queueName, "logs", ""); //定义在消费者或者生产者都行
示例代码
连接工厂
//连接工厂创建信道的工具类
public class RabbitMqUtis {public static Channel getChannel() throws Exception{//创建连接工厂ConnectionFactory factory=new ConnectionFactory();//工厂ip 连接 RabbitMQ的队列factory.setHost("192.168.1.5");//用户名factory.setUsername("usermq");//密码factory.setPassword("111111");//创建连接Connection connection=factory.newConnection();// MessageProperties.PERSISTENT_TEXT_PLAIN//发消息是通过信道发送消息,所以要创建信道//获取信道return connection.createChannel();}
}
生产者
//生产者
//发送大量的消息
//发消息
public class Task_one {//队列名称public static final String QUEUE_NAME="hello";//发送大量消息public static void main(String[] args) throws Exception {Channel channel=RabbitMqUtis.getChannel();//声明一个队列//参数//1、队列名称//2、队列里的消息是否持久化(磁盘) 默认情况消息存储在内存中//3、该队列是否只提供一个消费者进行消费 是否进行消息共享,true可以多个消费者消费//4、是否自动删除 最后一个消费者端开连接以后 该队一句是否删除 true自动删除 false不自动删除//5、其他参数channel.queueDeclare(QUEUE_NAME,false,false,false,null);//从控制台接收消息Scanner scanner=new Scanner(System.in);while (scanner.hasNext()){String mssage=scanner.next();//1、发送到哪个交换机//2、路由的key值是哪个,本次是队列的名称//3、消息持久化(MessageProperties.PERSISTENT_TEXT_PLAIN)//4、发送消息的消息体channel.basicPublish("",QUEUE_NAME,null,mssage.getBytes());System.out.println("发送消息完成:"+mssage);}}
}
消费者
//消费者工作线程
// 这是一个线程
public class Worker_one {//队列的名称public static final String QUEUE_NAME="hello";//接收消息public static void main(String[] args) throws Exception {//获得信道Channel channel= RabbitMqUtis.getChannel();//消费消息//消费者接收消息//1、消费哪个队列//2、消费成功之后是否要自动应答 (true代表自动应答,false代表手动应答)//3、消费者消费成功的回调方法 (拉姆达表达式)//4、消费者消费消费失败的回调 (拉姆达表达式)//成功回调DeliverCallback deliverCallback1=new DeliverCallback() {@Overridepublic void handle(String s, Delivery delivery) throws IOException {// s:当前会话 (同一个消费者会话是相对的)// delivery 消息属性// new String( delivery.getBody() 消息的valueSystem.out.println("s:"+s);System.out.println("delivery.getBody():"+ new String( delivery.getBody()));System.out.println("delivery.getEnvelope().getDeliveryTag():"+delivery.getEnvelope().getDeliveryTag());System.out.println("delivery.getExchange().getDeliveryTag():"+delivery.getEnvelope().getExchange());System.out.println("delivery.getExchange().)getRoutingKey():"+delivery.getEnvelope().getRoutingKey());}};//消息被取消CancelCallback cancelCallback=new CancelCallback() {@Overridepublic void handle(String s) throws IOException {System.out.println("当前会话:"+s);}};System.out.println("C2等待接收消息....");//队列 //自动应答 //成功回调 // 消息被取消的回调channel.basicConsume(QUEUE_NAME,true, deliverCallback1,cancelCallback);}
}
模式
p为生产者 c为消费者 在中间的队列 rabbitmqp生产者(java程序)创建连接工厂--通过工厂创建连接--通过连接创建信道--设置信道队列--发送消息代码..//声明一个队列//参数//1、队列名称//2、队列里的消息是否持久化(磁盘) 默认情况消息存储在内存中//3、该队列是否只提供一个消费者进行消费 是否进行消息共享,true可以多个消费者消费//4、是否自动删除 最后一个消费者端开连接以后 该队列是否删除 true自动删除 false不自动删除//5、其他参数//队列 持久化 单个消费者 自动删除 其他配置(map)channel.queueDeclare(QUEUE_NAME ,false ,false, false, null);//1、发送到哪个交换机//2、路由的key值是哪个,本次是队列的名称//3、其他参数信息//4、发送消息的消息体//交换机 路由的key 其他配置(AMQP) 消息内容channel.basicPublish("", QUEUE_NAME, null, mssage.getBytes());
c为消费者(java程序)创建连接工厂--通过工厂创建连接--通过连接创建信道--通过信道接收消息执行消费者语句是异步的,它有自己的独立线程代码..//消费消息//消费者接收消息//1、消费哪个队列//2、消费成功之后是否要自动应答 (true代表自动应答,false代表手动应答)//3、消费者消费成功的回调方法 (拉姆达表达式)//4、消费者消费消费失败的回调 (拉姆达表达式)//成功回调DeliverCallback deliverCallback1=new DeliverCallback() {@Overridepublic void handle(String s, Delivery delivery) throws IOException {// s:当前会话 (同一个消费者会话是相同的)// delivery 消息属性// new String( delivery.getBody() 消息的value//delivery.getEnvelope() 消息的属性//delivery.getEnvelope().getDeliveryTag() 消息的标记// delivery.getBody() 消息的内容(比特数据)System.out.println("s:"+s);System.out.println("delivery.getBody():"+ new String( delivery.getBody(),"UTF-8"));}};//消息被取消CancelCallback cancelCallback=new CancelCallback() {@Overridepublic void handle(String s) throws IOException {System.out.println("当前会话:"+s);}};//队列 //自动应答 //成功回调 // 消息被取消的回调channel.basicConsume(QUEUE_NAME,true, deliverCallback1,cancelCallback); //(消息是异步的) 不会阻塞当前线程的运行
3、工作队列模式
多线程处理消息:一条消息只能被处理一次
生产者发送消息----> 队列---> 接收消息 大量工作线程
轮训接收消息
消费者工作线程轮流接收消息(工作线程是竞争关系)
4、消息应答 (消费者)
为了保证消息在发送过程中不丢失,rabbitmq引入消息应答机制:
如果未被应答会的时候消费者程序down掉(用于否定确认应答也是应答), 消息会重新返回队列发放给其他消费者
(消费者 basic.reject或 basic.nack)并且requeue=false收到消息并且处理该消息之后,告诉rabbitmq它已经处理了,rabbitmq可以把该消息删除了自动应答:消费者收到消息就是成功手动应答:调用应答方法告诉RabbitMQ 该消息已经被处理了,如果未被应答会,消息会重新返回队列发放给其他消费者channel.basicAck (用于肯定确认应答) channel.basicNack (用于否定确认应答)channel.basicReject (用于否定确认应答)与channel.basicNack相比少一个参数不处理该消息直接拒绝,可以将其丢弃了手动应答的好处是可以批量应答且减少网络拥堵,也可以重新放回队列不丢失数据批量应答channel.basicAck(message.getEnvelope().getDeliveryTag(), true)message.getEnvelope().getDeliveryTag() ;当前消费者处理的消息的标识(从1开始)true表示批量应答信道中的消息都会被应答 (未被处理的也应答)false表示不批量应答,只应答当前消息channel.basicNack(message.getEnvelope().getDeliveryTag(), true, true)message.getEnvelope().getDeliveryTag() ;当前消费者处理的消息的标识(从1开始)第一个true :表示当前消息可以重新入队第二个true: 表示批量处理未确认的消息channel.basicReject(message.getEnvelope().getDeliveryTag(), true); message.getEnvelope().getDeliveryTag():当前消费者处理的消息的标识(从1开始)true 表示否定确认的消息允许返回队列false 表示否定确认的消息不允许返回队列1、 //手动应答(正确应答) //1、消息的标记 tag (相当于当前消息在信道的位置)//2、是否批量应答 (批量应答会)//message.getEnvelope() 消息的属性//message.getEnvelope().getDeliveryTag() 消息的标识// message.getBody() 消息的内容channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
5、消息自动重新入队 (消费者)
未确认的消息默认会重新入队
当消费者消息未确认(消费者确认包括成功确认和否定确认)程序就down掉时候 消息会重新进入队列中发送给其他消费者
6、RabbitMQ持久化 (生产者)
1、 队列持久化(防止RabbitMQ down掉后队列消失)设置队列持久化(在生产者中设置)第二个参数表示是否持久化(只能在创建的时候定义,创建后不能进行修改)channel.queueDeclare(task_queue_name,true,false,false,null); 2、消息持久化(即使队列未持久化了,如果消息未持久化那么队列消息已经不会被保留)将消息标记为持久化并不能完全保证不会丢失消息设置消息持久化(在生产者中设置)第三个参数设置成 (MessageProperties.PERSISTENT_TEXT_PLAIN)channel.basicPublish("",task_queue_name, MessageProperties.PERSISTENT_TEXT_PLAIN,massage.getBytes(StandardCharsets.UTF_8));
7、不公平分发(能者多劳)
预取值设置分发给消费者的条数当前当前消费者有消息未提交确认 那么允许再接收的消息数量( 0无限大,1就是1,2就是2)RabbitMQ分发消息默认是轮训分发(按次序发送消息给消费者)设置不公平分发(在消费者的channel中设置) //设置某个消费者中暂存区的消息容量//当前当前消费者有消息未提交确认 那么允许再接收的消息数量( 0无限大,1就是1,2就是2)channel.basicQos(1) //默认是0 (表示消息暂存在消费者中无限大), 3 (表示当一条消息未处理完成时候允许暂存在消费者中暂存的消息数量为3) (当前消息未处理完且暂存区满 则不会接收消费其他消息,提交确认才表示处理完)
8、发布确认消息(生产者)
生产者--发消息-->队列
消息不丢失条件:
1、设置要求队列持久化 (生产者)
2、设置队列中的消息必须持久化 (生产者)
3、发布确认
发布确认步骤:信道开启发布确认方法(默认是关闭的)Channel channel =connection.createChannel();channel.confirmSelect();//开启发布确认channel.confirmSelect(); //返回值未bool
单个确认发布消息未确认不会再发送下一条消息,速速度慢,但能确认每一天消息是否发布成功//开启发布消息确认channel.confirmSelect();//生产者发布消息后//确认消息//每执行一次调用一次改方法 (单个发布确认)boolean flag=channel.waitForConfirms(); //发布耗时间:513ms 总共条数:1000条
批量确认发布消息批量发布确认,消息出错无法确认哪个有问题,速度块// 76ms 总共条数:1000条//发布很多消息后再执行改方法,(批量发布确认)boolean flag=channel.waitForConfirms(); //发布耗时间:513ms 总共条数:1000条
异步确认消息性价比高,但是编程逻辑复制些,通过函数回调来保证是否投递成功//channel.getNextPublishSeqNo() 获取下次发送消息的标识 (标识从1开始)//开启发布确认channel.confirmSelect();//消息的监听器 ,监听哪些消息成功了,哪些消息失败//消息确认成功 回调函数ConfirmCallback AckconfirmCallback=new ConfirmCallback() {@Overridepublic void handle(long l, boolean b) throws IOException {//(l 消息的标识(第几条消息),b 是否批量确认消息)System.out.println("确认的消息:"+l);}};//消息确认失败 回调函数//参数:(消息编号,是否为批量确认)ConfirmCallback NacconfirmCallback=new ConfirmCallback() {@Overridepublic void handle(long l, boolean b) throws IOException {System.out.println("未确认的消息:"+l);}};// 异步的监听(定义在发布消息前面,确认消息)//确认消息回调 未确认消息消息回调channel.addConfirmListener(confirmCallback1,confirmCallback2); //
处理未确认的消息使用并发编程相关的集合统计 ConcurrentLinkeQueue (可以进行主线程(发布消息那个线程)和异步确认线程交互)这个队列在confirm callbacks(发布确认回调:成功确认,未成功确认) 与发布线程之间进行消息的传递ConcurrentSkipListMap<Long,String> outstandingConfirms=new ConcurrentSkipListMap<>();//消息的监听器 ,监听哪些消息成功了,哪些消息失败//消息确认成功 回调函数ConfirmCallback AckconfirmCallback=new ConfirmCallback() {@Overridepublic void handle(long l, boolean b) throws IOException {//(l 消息的标识(第几条消息),b 是否批量确认消息)//确认成功不用处理outstandingConfirms.remove(l);//处理确认消息:2、在全部消息中删除掉已经确认的消息System.out.println("确认的消息:"+l);}};//消息确认失败 回调函数//参数:(消息编号,是否为批量确认)ConfirmCallback NacconfirmCallback=new ConfirmCallback() {@Overridepublic void handle(long l, boolean b) throws IOException {//处理确认消息:3、打印一些未确认的消息System.out.println("未确认的消息:"+outstandingConfirms.get(l));}};// 异步的监听(定义在发布消息前面,确认消息)channel.addConfirmListener(AckconfirmCallback,NacconfirmCallback); //for (int i=0;i<COUNT;i++){String message="消息"+i;channel.basicPublish("",queueName,null,message.getBytes(StandardCharsets.UTF_8));//信道 channel.getNextPublishSeqNo() :下一次发布消息的序号 (序号从1开始)Long sd2=channel.getNextPublishSeqNo();outstandingConfirms.put(sd2-1, message);//发布确认//处理确认消息:1、记录下全部发布的消息}
9、交换机
空串为默认交换机
生产者-->交换机-->多个队列-->队列对应的消费者
每个任务都恰好交付给一个消费者(工作进程)
我们将做一些完全不同的事件传递给多个消费者,这种模式称为发布/订阅模式
//声名一个交换机 //交换机名字 交换机类型
channel.exchangeDeclare(“logs”, "fanout");
10、Exchanges交换机的类型
直接(direct路由类型,默认类型,""空串)、
主题(topic)
标题(headers头类型)、
扇出(fanout发布/订阅),
对应枚举
BuiltinExchangeType.FANOUT 等
//声名一个交换机 //交换机名字 交换机类型channel.exchangeDeclare(“logs”, "fanout");//绑定交换机和队列 队列名 交换机名 routingKey 绑定序号(路由序号)channel.queueBind(queueName, "logs","");
11、临时队列
web控件显示:AD、ExdRabbitMQ随机创建的随机名称的队列,一旦我们断开了消费者的连接,队列将被自动删除String queueName=channel.queueDedare.getQueue(); //,创建一个临时队列,返回值队列的名称
12、交换机和队列的关系
路由根据RoutingKey绑定队列RoutingKey 为路由序号(交换机绑定队列的序号)//绑定交换机和队列 队列名 交换机名 routingKey 绑定序号(路由序号)channel.queueBind(queueName, EXCHANGE_NAME, "");
13、交换机模式
(1)Fanout模式(发布/订阅)
Five包对应枚举BuiltinExchangeType.FANOUT 等路由把消息 全部一样发给它绑定的队列中(与routingKey路由id无关)它是将接收到的所有消息广播到它知道的所以队列中。系统中默认有exchange交换机类型//声名一个交换机 //交换机名字 交换机类型channel.exchangeDeclare(“logs”, "fanout");
(2)Direct 直接交换机模式(路由模式)
six对应枚举BuiltinExchangeType.DIRECT 等是交换机默认的模式通过指定routingKey路由id 来发送消息给对应的队列(routingKey可以相同)一个队列的routingKey 可以有多个(可以绑定多个路由,或者绑定一个路由多个routingKey)生产者发送更加路由id发送消息给队列,队列轮询发部消息给消费者
(3)Topic 主题模式
最强大
Receive包把路由id设置成层次结构(x.x.x) ,交换机会发送满足路由i条件的数据给它绑定的队列枚举 BuiltinExchangeType.TOPICroutingKey设置成 层次结构 (wer.we或 wer.re 或 *.er或者wer.*.tr 类似文件结构命名)lazy.# 表示后面可以是任意多个.x.x一个队列可以多个路由id ,也能绑定多个路由路由发送消息根据routingKey去匹配注意 当一个队列绑定的是#,那么这个队列就会接收这个路由发送的所有消息
14、死信队列(队列消息自动发送到配置的路由中)
Eites 包队列中有消息无法被消费,成为了死信,自然就形成了死信队列例如:用户下单不付款死信的来源:消息TTL(存活时间)过期、队列达到最大长度(队列满了,无法再添加数据到mq中),消息被拒(消费者否定应答 channel.basicNack 或 channel.basicReject )并且 requeue=false (消息不重新入队)
队列中消息成为死信-->死信消息传输到死信交换机-->死信消息发布到死信队列-->消费者处理消息
接收消息队列绑定死信交换机(通过其他配置map)1、消息过期时间情况(可以普通队列map设置,也可以生产者发送消息设置)//死信消息 设置过期时间msAMQP.BasicProperties properties=new AMQP.BasicProperties().builder().expiration("10000").build();// 交换机 队列名字 其他配置 消息内容channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",properties, i.toString().getBytes()); 2、 消息被拒情况channel.basicReject(delivery.getEnvelope().getDeliveryTag(),false); //否定应答 且不会重新发送回到队列,该队列如果设置了死信路由,消息会被发送到死信路由中3、队列消息超出通过map配置队列中正常消息,超出部分将会被自动转发到死信路由中通过map改变对队列配置,如果队列已经存在,那么会报错//设置正常消息的允许在队列中堆积数为6, 超出部分将会成为死信消息,自动转发到死信路由中map.put("x-max-length",6);普通队列设置死信路由 通过其他参数的map(普通队列中的消息 过期、被拒绝、队列超出 都自动转发到死信路由中)//其他参数Map<String,Object> mao=new HashMap<>();//过期时间 生产者发送消息再设置也行//map.put("x-message-ttl",10000);//配置项 //死信路由名字map.put("x-dead-letter-exchange",DEAD_EXCHANGE); //设置当前队列的死信路由//配置项 //普通队列中的死信路由和死信队列之间的路由idmap.put("x-dead-letter-routing-key","lisi"); //配置当前队列被拒的消息或者过期消息 转发到死信路由中的哪个队列(这个路由id为死信路由和死信队列绑定的id,普通队列配置了死信路由它们之间不需要路由id)//普通队列 队列名字 持久化 单独消费者 自动删除 其他配置(map)channel.queueDeclare(NORMAL_QUEUE,false,false,false,map);
15、延迟队列(ttl)
ttl过期消息(即死信消息)传输到消费者延迟队列应用场景订单在十分钟之内 未支付则自动取消新创建的店铺,如果十天内都没有上传过商品,则自动发送消息提醒用户注册成功后,如果三天没有登录则进行短信提醒用户发起退款,如果三天内没有得到处理则通知相关运营人员预定会议之后,需要在预定时间的时间点前十分钟通知各个与会议人员参加