一、搭建RabbitMq服务并创建账号
服务采用Docker临时搭建,版本采用3.8,命令如下
-
拉取镜像
docker pull rabbitmq:3.8.34-management
-
创建容器
由于我使用的是 Docker Desktop 可通过可视化界面创建容器,将端口对应好即可,如下图:
-
创建账号
容器启动完毕后,访问本机 127.0.0.1:15672进入管理页面,输入初始化用户名密码 guest/guest
进入页面后选择,创建一个管理员账号,步骤如下图:
二、整合SpringBoot
-
首先引入依赖,版本可根据自身得springboot版本来确定
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId><version>2.5.4</version></dependency>
-
yml中添加配置项
spring:rabbitmq:host: 127.0.0.1port: 5672virtual-host: /username: adminpassword: admin
三、服务初始化创建队列
通过集合配置实现自定交换机与队列的创建,具体代码如下:
@Slf4j
@Configuration
public class RabbitMqConfig {private List<RabbitMq> getConfig(){return Arrays.asList(RabbitMq.builder().type(ExchangeType.TOPIC).exchangeName("Topic.Student.Operate").queueName("Queue.student.Operate").key("student").build(),RabbitMq.builder().type(ExchangeType.FANOUT).exchangeName("Fanout.Notice.Operate").queueName("Queue.Notice.Operate.East").key("notice").build(),RabbitMq.builder().type(ExchangeType.FANOUT).exchangeName("Fanout.Notice.Operate").queueName("Queue.Notice.Operate.West").key("notice").build(),RabbitMq.builder().type(ExchangeType.FANOUT).exchangeName("Fanout.Notice.Operate").queueName("Queue.Notice.Operate.South").key("notice").build(),RabbitMq.builder().type(ExchangeType.FANOUT).exchangeName("Fanout.Notice.Operate").queueName("Queue.Notice.Operate.North").key("notice").build());}@Beanpublic RabbitAdmin rabbitAdmin(ConnectionFactory factory){RabbitAdmin rabbitAdmin = new RabbitAdmin(factory);rabbitAdmin.setAutoStartup(true);for (RabbitMq mq : getConfig()){rabbitAdmin.declareExchange(getExchange(mq.getType(), mq.getExchangeName()));rabbitAdmin.declareQueue(new Queue(mq.getQueueName(),true,false,false));rabbitAdmin.declareBinding(new Binding(mq.getQueueName(),Binding.DestinationType.QUEUE, mq.getExchangeName(), mq.getKey(), null));log.info("【RabbitMQ】create new Queue '"+mq.getQueueName()+"' in '"+mq.getType()+"' Exchange '" +mq.getExchangeName()+"' with '"+mq.getKey()+"'");}return rabbitAdmin;}private Exchange getExchange(ExchangeType type,String name){switch (type){case TOPIC:return new TopicExchange(name);case FANOUT:return new FanoutExchange(name);case DIRECT:return new DirectExchange(name);default:throw new BusinessException("无效的交换机类型");}}}
其中RabbitMq实体代码如下:
@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class RabbitMq {private ExchangeType type;private String exchangeName;private String queueName;private String key;
}
配置完毕后启动服务,查看服务日志,如下所示:
2023-03-17 08:39:33.455 INFO 6612 --- [ main] com.ryan.project.config.RabbitMqConfig : 【RabbitMQ】create new Queue 'Queue.student.Operate' in 'TOPIC' Exchange 'Topic.Student.Operate' with 'student'
2023-03-17 08:39:33.458 INFO 6612 --- [ main] com.ryan.project.config.RabbitMqConfig : 【RabbitMQ】create new Queue 'Queue.Notice.Operate.East' in 'FANOUT' Exchange 'Fanout.Notice.Operate' with 'notice'
2023-03-17 08:39:33.460 INFO 6612 --- [ main] com.ryan.project.config.RabbitMqConfig : 【RabbitMQ】create new Queue 'Queue.Notice.Operate.West' in 'FANOUT' Exchange 'Fanout.Notice.Operate' with 'notice'
2023-03-17 08:39:33.462 INFO 6612 --- [ main] com.ryan.project.config.RabbitMqConfig : 【RabbitMQ】create new Queue 'Queue.Notice.Operate.South' in 'FANOUT' Exchange 'Fanout.Notice.Operate' with 'notice'
2023-03-17 08:39:33.465 INFO 6612 --- [ main] com.ryan.project.config.RabbitMqConfig : 【RabbitMQ】create new Queue 'Queue.Notice.Operate.North' in 'FANOUT' Exchange 'Fanout.Notice.Operate' with 'notice'
此时交换机与队列均已创建完毕,可通过页面查看,如图所示:
四、消息发送
创建完队列后即可发送消息,下面是2个案例,向不同交换机发送消息,代码如下:
-
定义发送消息接口
public interface RabbitMqService {/*** 发送学生操作消息** @param message 消息内容*/public void sendStudentOperateMessage(String message);/*** 发送通知** @param message 通知内容*/public void sendNoticeMessage(String message); }
-
发送消息接口实现类
@Slf4j @Service public class RabbitMqServiceImpl implements RabbitMqService {@Resourceprivate RabbitTemplate rabbitTemplate;@Overridepublic void sendStudentOperateMessage(String message) {String exchange = "Topic.Student.Operate";rabbitTemplate.convertAndSend(exchange, "student", message);log.info("【RabbitMQ】Send message to '" + exchange + "' with '" + message + "'");}@Overridepublic void sendNoticeMessage(String message) {String exchange = "Fanout.Notice.Operate";rabbitTemplate.convertAndSend(exchange, "notice", message);log.info("【RabbitMQ】Send message to '" + exchange + "' with '" + message + "'");} }
-
编写测试Controller
@RestController public class RabbitMqController {@Resourceprivate RabbitMqService rabbitMqService;@GetMapping("/student")public String student() {String message = "张剑注销了自己的饭卡";rabbitMqService.sendStudentOperateMessage(message);return message;}@GetMapping("/notice")public String notice() {String message = "【通知】四个校区今年全体放假364天";rabbitMqService.sendNoticeMessage(message);return message;}}
-
访问接口
2023-03-17 08:55:09.955 INFO 2348 --- [nio-8888-exec-1] c.r.project.service.RabbitMqServiceImpl : 【RabbitMQ】Send message to 'Fanout.Notice.Operate' with '【通知】四个校区今年全体放假364天'2023-03-17 08:56:00.953 INFO 2348 --- [nio-8888-exec-5] c.r.project.service.RabbitMqServiceImpl : 【RabbitMQ】Send message to 'Topic.Student.Operate' with '张剑注销了自己的饭卡'
-
查看消息
发现已经存在消息,当然也可以进入具体队列,查看对应消息,如下:
五、接收消息
接收消息是针对每个具体的队列,实现代码如下:
@Slf4j
@Component
public class RabbitMqReceive {@RabbitListener(queues = "Queue.student.Operate")public void opera(String context) {log.info("【学生操作】" + context);}@RabbitListener(queues = "Queue.Notice.Operate.East")public void east(String context) {log.info("【东部学院】" + context);}@RabbitListener(queues = "Queue.Notice.Operate.West")public void west(String context) {log.info("【西部学院】" + context);}@RabbitListener(queues = "Queue.Notice.Operate.South")public void south(String context) {log.info("【南部学院】" + context);}@RabbitListener(queues = "Queue.Notice.Operate.North")public void north(String context) {log.info("【北部学院】" + context);}}
服务启动查看控制台打印信息:
2023-03-17 09:02:03.985 INFO 3688 --- [ntContainer#0-1] c.ryan.project.service.RabbitMqReceive : 【学生操作】张剑注销了自己的饭卡
2023-03-17 09:02:03.985 INFO 3688 --- [ntContainer#1-1] c.ryan.project.service.RabbitMqReceive : 【南部学院】【通知】四个校区今年全体放假364天
2023-03-17 09:02:03.988 INFO 3688 --- [ntContainer#2-1] c.ryan.project.service.RabbitMqReceive : 【东部学院】【通知】四个校区今年全体放假364天
2023-03-17 09:02:03.991 INFO 3688 --- [ntContainer#3-1] c.ryan.project.service.RabbitMqReceive : 【西部学院】【通知】四个校区今年全体放假364天
2023-03-17 09:02:03.995 INFO 3688 --- [ntContainer#4-1] c.ryan.project.service.RabbitMqReceive : 【北部学院】【通知】四个校区今年全体放假364天
此时查看可视化页面,发现消息数均为0,说明消息均被消费