消费端:
修改服务端口:
server:port: 8991
创建监听器:
@Component
public class MessageListener {/*** 交换机名*/public static final String EXCHANGE_NAME = "exchange.direct.order";/*** 路由键*/public static final String ROUTING_KEY = "order";/*** 队列名*/public static final String QUEUE_NAME = "queue.order";/**** @param data* @param msg* @param channel*/@RabbitListener(bindings = @QueueBinding(value = @Queue(value = QUEUE_NAME, durable = "true"),exchange = @Exchange(value = EXCHANGE_NAME),key = {ROUTING_KEY}))public void processMessage(String data, Message msg, Channel channel) {System.out.println("接收到消息:" + data);}
}
上面处理消息的方法,按上述所示添加注解,当消费端运行起来后,如果RabbitMQ中没有相应的交换机和队列,会自动创建,同时会自动创建绑定关系。如果RabbitMQ中已经手动配置了交换机和队列,则可以采用如下所示的简单配置方式(只是监听):
@RabbitListener(queues={QUEUE_NAME}))
生产端:
指定端口:
server:port: 8981
创建消息生产者
@RestController
@RequestMapping("/rabbitmq")
public class RabbitMQController {/*** 交换机名*/public static final String EXCHANGE_NAME = "exchange.direct.order";/*** 路由键*/public static final String ROUTING_KEY = "order";@Resourceprivate RabbitTemplate rabbitTemplate;@GetMapping("/send")public String send() {rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, "hello rabbitmq"+ System.currentTimeMillis());return "消息发送成功!!!";}
}
测试
依次启动消息端、生产端。网页中请求生产端生成消息,在消费端控制台可以看到输出的消息。
可靠性保障
故障情况1:消息没有发送到消息队列
- 解决思路A:在生产者端进行确认,具体操作中我们会分别针对交换机和队列来确认如果没有成功发送到消息队列服务器上,那就可以尝试重新发送
- 解决思路B:为目标交换机指定备份交换机,当目标交换机投递失败时,把消息投递至备份交换机
故障情况2:消息队列服务器宕机导致内存中消息丢失
- 解决思路:消息持久化到硬盘上,哪怕服务器重启也不会导致消息丢失
解决思路故障情况3:消费端宕机或抛异常导致消息没有成功被消费
- 消费端消费消息成功,给服务器返回ACK信息,然后消息队列删除该消息
- 消费端消费消息失败,给服务器端返回NACK信息,同时把消息恢复为待消费的状态,这样就可以再次取回消息,重试一次(当然,这就需要消费端接口支持幂等性)