目录
- 初识MQ
- 一:同步调用
- 二:异步调用
- 三:技术选型
- RabbitMQ
- 一:安装部署
- 二:快速入门
- 三:数据隔离
- java客户端
- 一:快速入门
- 二:workqueues
- 三:Fanout交换机
- 四:Direct交换机
- 五:Topic交换机
- 六:声明队列和交换机
- 1:基于bean声明
- 2:基于注解
- 七:消息转换器
初识MQ
一:同步调用
同步调用,以支付服务为例,支付服务首先就要扣减用户的余额,等待用户余额扣减成功才会更改支付状态,这里必须使用同步,因为只有用户金额扣减完毕才能更改支付状态,支付状态要等待扣减余额成功的消息,所以要同步调用;
而后续的更改订单状态,通知服务,积分服务,不是和支付强相关的,如果同步执行的话因为串行执行所以性能下将,然后增加了代码的耦合性拓展性差,还有就是服务多了,如果有服务出现了问题而又是同步调用很可能出现雪崩,而且就算是使用了服务保护策略如熔断,那就会出现数据的不一致;
二:异步调用
异步调用的三个角色:1:消息发送者:投递消息,也就是调用者
2:消息接收者:接收消息,也就是原来的被调用的服务
3:消息代理:管理暂存转发消息;
异步调用会把消息消息直接发送到消息代理,就不用管了,直接就解除了耦合,拓展性强;
即使消息接收者出现了问题也不会影响到消息发送者,而且即使挂了,消息也会在消息代理存储,直到服务健康,还是会把消息发送出去进行处理,实现了故障隔离。
而且消息发送者只是发布了通知,无需等待回复,所以性能好;
对于高并发的请求,我们可以先将消息缓存到消息代理中,等待请求小的时候慢慢处理,防止对服务器造成巨大的压力;
实现流量的削峰填谷;
异步调用的问题:不能立即获取结果时效性差,不确定下游业务是否执行成功;业务的安全依靠消息代理的可靠性;
如何选择:对时效性要求很高的可以选择使用同步调用,对时效性要求不高并且对性能要求较高的可以使用异步调用;
三:技术选型
RabbitMQ
一:安装部署
使用docker安装:
docker run \-e RABBITMQ_DEFAULT_USER=itheima \-e RABBITMQ_DEFAULT_PASS=123321 \-v mq-plugins:/plugins \--name mq \--hostname mq \-p 15672:15672 \-p 5672:5672 \--network hm-net\-d \rabbitmq:3.8-management
然后访问15672是rabbitMQ的控制台端口,5672是消息接发
然后就可以访问15672端口控制台
rabbitMQ中有几个角色:消息的发送者,消息的接收者,消息队列,消息发送者发送消息不是直接到达消息队列,而是通过exchange,交换机,来将消息路由到队列,再有队列转发给接收者;然后我们一个消息中间件可能要多个项目使用,为了将不同的项目隔离开,就像是数据库的database一样,会单独整出来一个虚拟主机;
二:快速入门
交换机只会路由消息不能存储消息;交换机必须绑定对应的队列,才会发送消息给绑定的队列;
三:数据隔离
这里添加用户就可以了
java_115">java客户端
一:快速入门
publisher是发消息:
如何发消息:
1:引入依赖:
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2:配置yaml文件
spring:rabbitmq:host: 117.72.106.51 # 你的虚拟机IPport: 5672 # 端口virtual-host: /hmall # 虚拟主机username: hmall # 用户名password: 123 # 密码
3:创建测试类(注意测试类要和main在相同包下)
java">@SpringBootTest
class PublisherApplicationTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testvoid rabbitPublish() {String queueName="simple.queue";String msg="hello,springAMQP";rabbitTemplate.convertAndSend(queueName,msg);}
}
4:接收消息:
java">@Slf4j
@Component
public class SpringRabbitListener {@RabbitListener(queues = "simple.queue")public void Lisen(String msg){log.info(msg);}
}
二:workqueues
workqueue任务模型:多个消费者绑定同一个队列;
通过验证我们发现,50条消息是均匀分配的,提前匹配好然后均分好,不论哪个消费者处理的是快还是慢都是均分:
多个消费者的好处:提高消息处理的速度,有点类似于负载均衡;但是均分的话,有的处理的快的消费者的接收能力会被限制
一般我们在项目中只需要定义一个消费者,因为我们部署了集群会有多个实例,每个实例都会一个消费者,从而达到了多消费者处理消息的模式
通过prefetch,每个消费者每次只能获取一条消息,防止提前平均分配浪费性能,这个有点像抢夺消息;
listener:direct:prefetch: 1 #一次只会分配一条消息
三:Fanout交换机
交换机的作用:我们之前直接使用队列进行转发,一个消息只会被一个消费者处理,而我们的服务调用,一个消息可能有多个服务都需要,所以我们引入了交换机,交换机可以将消息转发的多个队列,队列在发送给消费者,相当于一个消息发出复制了多份,而不是只能被一个消费者使用;
交换机有不同的类型如fanout就是广播类型,只要发送消息给fanout交换机他就会吧消息发送给所有的队列
我们之前只用了converandsend直接把消息发到了队列中这里要发送到交换机需要中间加上一个参数null;
四:Direct交换机
direct交换机发送消息的方式是定向的:我们的交换机在绑定队列的时候要指定bingdingkey,然后我们在java中向交换机发送的时候要带上bingdingkey,只发送给对应bindingkey的队列;那如果设置的bindingkey一致就会发送给不同的队列;
五:Topic交换机
topic的bindingkey通过通配符来匹配更编辑,而且拓展性更强
六:声明队列和交换机
1:基于bean声明
java">@Configuration
public class FanoutConfiguration {@Beanpublic Queue queueBuilder1(){return new Queue("fanout.queue1");}@Beanpublic Queue queueBuilder2(){return new Queue("fanout.queue2");}@Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange("hmall.fanout");}@Beanpublic Binding binding1(){return BindingBuilder.bind(queueBuilder1()).to(fanoutExchange());}@Beanpublic Binding binding2(){return BindingBuilder.bind(queueBuilder2()).to(fanoutExchange());}
}
通过bean来声明,队列返回的类型是Queue,交换机要先确定交换机的类型比如fanout交换机FanoutExchange,然后绑定队列与交换机Binding,使用BindingBuilder的build-to 方法;
2:基于注解
java">@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue1"),exchange = @Exchange(name = "hmall.direct",type = ExchangeTypes.DIRECT),key = {"blue","red"}
))
public void Listen3(String msg){log.info("dirct.queue1"+msg);
}
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue2"),exchange = @Exchange(name = "hmall.direct",type = ExchangeTypes.DIRECT),key = {"yellow","red"}
))
public void Listen4(String msg){log.info("dirct.queue2"+msg);
}
七:消息转换器
我们将复杂的对象发送到队列,rabbit默认会使用java自带的序列话器,序列化出来的对象不便于我们查看,我们可以将默认的序列化器改为json序列化,需要引入jackson的依赖,然后通过bean注入到ioc中;