-
需创建复合项目
-
父工程 Maven 依赖:
<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.0.0.RELEASE</version></parent><!-- 父工程要打成 pom 包--><groupId>com.qcby</groupId><artifactId>springboot-rabbitmq</artifactId><packaging>pom</packaging><modules><module>producer</module><module>sms-consumer</module></modules><version>1.0-SNAPSHOT</version><dependencies><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>3.8.1</version><scope>test</scope></dependency><!-- springboot-web组件 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- 添加springboot对amqp的支持 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.apache.commons</groupId><artifactId>commons-lang3</artifactId></dependency><!--fastjson --><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.49</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency></dependencies>
-
生产者工程代码:
- 配置类:
package com.qcby.config;import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component;@Component public class RabbitMQConfig {//定义交换机private String EXCHANGE_SPRINGBOOT_NAME = "boyatop_ex";//定义短信队列private String FANOUT_SMS_QUEUE = "fanout_sms_queue";//定义邮件队列private String FANOUT_EMAIL_QUEUE = "fanout_email_queue";//配置邮件队列@Beanpublic Queue emailQueue(){return new Queue(FANOUT_EMAIL_QUEUE);}//配置消息队列@Beanpublic Queue smsQueue(){return new Queue(FANOUT_SMS_QUEUE);}//配置交换机@Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange(EXCHANGE_SPRINGBOOT_NAME);}//将队列绑定交换机@Beanpublic Binding bindingSmsFanoutExchange(Queue smsQueue,FanoutExchange fanoutExchange){return BindingBuilder.bind(smsQueue).to(fanoutExchange);}@Beanpublic Binding bindingEmailFanoutExchange(Queue emailQueue,FanoutExchange fanoutExchange){return BindingBuilder.bind(emailQueue).to(fanoutExchange);} }
- 消息实体类:
package com.qcby.entity;import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor;import java.io.Serializable;@Data @AllArgsConstructor @NoArgsConstructor public class MsgEntity implements Serializable {private String MsgId;private String UserId;private String phone;private String email; }
- 生产者代码:
package com.qcby.controller;import com.qcby.entity.MsgEntity; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController;import java.util.UUID;@RestController public class FanoutProducer {@Autowiredprivate AmqpTemplate amqpTemplate;/*** 发送消息** @return*/@RequestMapping("/sendMsg")public String sendMsg() {/*** 1.交换机名称* 2.路由key名称* 3.发送内容*/MsgEntity msgEntity = new MsgEntity(UUID.randomUUID().toString(),"22","12345","edddd");amqpTemplate.convertAndSend("boyatop_ex", "", msgEntity);return "success";} }
- yml 文件:
spring:rabbitmq:####连接地址host: 127.0.0.1####端口号port: 5672####账号username: guest####密码password: guest### 地址virtual-host: boyatopVirtualHost
- 配置类:
-
消费者工程代码:
- 实体类:
package com.qcby.entity;import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor;import java.io.Serializable;@Data @AllArgsConstructor @NoArgsConstructor public class MsgEntity implements Serializable {private String MsgId;private String UserId;private String phone;private String email; }
- 监听队列:
package com.qcby.controller;import com.qcby.entity.MsgEntity; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;@Slf4j @Component @RabbitListener(queues = "fanout_sms_queue") public class smsController {@RabbitHandlerpublic void listen(MsgEntity msgEntity){log.info("email:" + msgEntity);System.out.println(msgEntity);} }
- yml 文件:
spring:rabbitmq:####连接地址host: 127.0.0.1####端口号port: 5672####账号username: guest####密码password: guest### 地址virtual-host: boyatopVirtualHost server:port: 8081
- 实体类:
生产者如何获取消费结果:
-
根据业务来定:
- 消费者消费成功结果:能够在数据库中插入一条数据
-
Roketmq 自带全局消息 id,能够根据该全局消息获取消费结果
- 异步返回一个全局 id,前端使用 ajax 定时主动查询
- 在roketmq 中,自带根据消息 id 查询是否消费成功
-
原理:
- 生产者投递消息 MQ 服务器,NQ 服务器端在这时候返回一个全局消息 id ,当消费者消费该消息之后,消费者会给我们 MQ 服务器发送通知,标识该消息消费成功
- 生产者获取到该消息全局 id,每隔 2s 时间调用 MQ 服务器接口查询是否有被消费成功