目录:
- 初识MQ
- RabbitMQ快速入门
- SpringAMQP
初始MQ:
- 同步通讯
- 异步通讯
- MQ常见框架
初始MQ–同步通讯的优缺点
同步通讯和异步通讯
同步通讯:比如微信视频,同一时间只能跟一个人视频,其他人想跟你视频的话,得等你这个视频结束之后才可以
异步通信:比如微信发消息,发了一个人后,别人可能还没回你,但你还可以给其他人发
同步调用的问题
微服务间基于Feign的调用就属于同步方法,存在一些问题。
1.耦合度高
- 每次加入新的需求,都要修改原来的代码
2.性能下降
- 调用者需要等待服务提供者响应,如果调用链过长则响应时间等于每次调用的时间之和。
3.资源浪费
- 调用链中的每个服务在等待响应过程中,不能释放请求占用的资源,高并发场景下会极度浪费系统资源
4.级联失败
- 如果服务提供者出现问题,所有调用方都会跟着出问题,如同多米诺排骨一样,迅速导致整个微服务群故障。
总结
同步调用的优点:
- 时效性较强,可以立即得到结果
同步调用的问题:
- 耦合度高
- 性能和吞吐能力下降
- 有额外的资源消耗
- 有级联失败问题
初始MQ–异步通讯的优缺点
异步调用常见实现就是事件驱动模式
事件驱动优势
优势一:服务解耦
优势二:性能提升,吞吐量提高
优势三:服务没有强依赖,不担心级联失败问题
优势四:流量削峰
小结
异步通信的优点
- 耦合度低
- 吞吐量提升
- 故障隔离
- 流量削峰
异步通信的缺点:
- 依赖于Broken的可靠性、安全性、吞吐能力
- 架构复杂了,业务没有明显的流程线,不好追踪管理
初始MQ–mq常见技术介绍
什么是MQ
MQ,中文是消息队列,字面来看就是存放消息的队列。也就是事件驱动架构中的Broker。
RabbitMQ | ActiveMQ | RocketMQ | Kafka | |
---|---|---|---|---|
公司/社区 | Rabiit | Apache | 阿里 | Apache |
开发语言 | Erlang | Java | Java | Scala&Java |
协议支持 | AMQP,XMPP,SMTP,STOMP | OpenWrite,STOMP,REST,XMPP,AMQP | 自定义协议 | 自定义协议 |
可用性 | 高 | 一般 | 高 | 高 |
单机吞吐量 | 一般 | 差 | 高 | 非常高 |
消息延迟区 | 微秒级 | 毫秒级 | 毫秒级 | 毫秒以内 |
消息可靠性 | 高 | 一般 | 高 | 一般 |
kafka适用于海量数据的传输,但对数据安全要求不高的,比如说日志数据
中小型企业推荐使用 RabbitMQ
如果是大型企业,需要做更深度的定制,推荐使用RocketMQ
RabbitMQ快速入门–介绍何安装
RabbitMQ概述
RabbitMQ是基于Erlang语言开发的开源消息通信中间件,官网地址:http://www.rabbitmq.com/
RabbitMQ部署指南
1.单机部署
在Centos7虚拟机中使用Docker来安装
1.1下载镜像
方式一:在线拉取
docker pull rabbitmq:3-management
方式一:从本地加载
从资料中提供的镜相包上传到虚拟机后,使用命令加载镜像即可:
docker load -i mq.tar
1.2安装MQ
执行下面的命令来运行MQ容器
docker run \-e RABBITMQ_DEFAULT_USR=luxifa \ #设置环境变量-e RABBITMQ_DEFAULT_PASSWORD=1234 \ #设置环境变量--name mq \ #起名--hostname mql \ #配置主机名,不配也没关系,如果将来做集群部署的话,就得配--p 15672:15672 \ # 开放了两个端口 15672是rabbitmq 管理平台的端口--p 5672:5672 \ # 做消息通信的端口-d \ # 后台运行rabbitmq:3-management # 镜像的名称
把命令在控制台运行,容器即可创建完成
通过 docker ps 查看一下
在浏览器输入:http://机器IP:15672 即可进入RabblitMQ的管理界面
界面中:
Overview:总览,主要就是节点的一些详细信息,因为现在是单节点运行,没有集群,所以节点中只有一个
Connections:连接,将来,无论是消息的发布者还是消息的消费者,都要跟mq建立连接
Channels:通道,建立连接以后一定要建立通道,生产者或消费者才能基于channels完成消息的发送或接收,可以认为channe是mq中消息发送也好,接收也好等等各种操作的一个对象,将来每一个连上的人都应该创建一个或多个channel
Exchanges:交换机
Queues:队列,用来做消息存储的
Admin:管理,在这个界面中可以管理当前的用户信息
RabbitMQ的结构和概念
发布者会将消息发送到exchange,exchange负责路由,把消息投递到queue,queue负责暂存消息,消费者从queue中获取消息,处理消息
小结
RabbitMQ中的几个概念:
- channel:操作MQ的工具
- exchange:路由消息到队列中
- queue:缓存消息
- virtual host:虚拟主机,是对queue、exchange等资源的逻辑分组
RabbitMQ快速入门–消息模型介绍
常见消息模型
MQ的官方文档中(http://rabbitmq.com/getstarted.html)给出了5个MQ的Demo示例,对应了几种不同的用放大:
- 基本消息队列(BasicQueue)
- 工作消息队列(WorkQueue)
这两种都是基于消息队列来完成的,没有经过交换机
发布订阅(Publish、Subscribe),又根据交换机类型不同分为三种:
- Fanout Exchange:广播
- Direct Exchange:路由
- Topic Exchange:主题
HelloWorld案例
官方的HelloWorld是基于最基础的消息队列模型来实现的,只包括三个角色:
- publicsher:消息发布者,将消息发送到队列queue
- queue:消息队列,负责接受并缓存消息
- consumer:订阅队列,处理队列中的消息
案例:完成官方Demo中的hello world 案例
实现步骤:
- 导入资料中的demo工程
- 运行publisher服务中的测试类PublisherTest中的测试方法testSendMessage()
- 查看RabibitMQ控制台的消息
- 启动consumer服务,查看是否能接收消息
PublisherTest.java
public class PublisherTest {@Testpublic void testSendMessage() throws IOException, TimeoutException {// 1.建立连接ConnectionFactory factory = new ConnectionFactory();// 1.1设置连接参数 分别是:主机名、端口号、vhost、用户名、密码factory.setHost("192.168.150.101");factory.setPort(5672);factory.setVirtualHost("/");factory.setUsername("luxifa");factory.setPassword("123456");// 1.2建立连接Connection connection = factory.newConnection();// 2.创建通道ChannelChannel channel = connection.createChannel();// 3.创建队列String queueName = "simple.queue";channel.queueDeclare(queueName,durable:false,exclusive:false,qutoDelete:false,arguments:null);// 4.发送消息String message = "hello rabbitmq!";channel.basicPublish(exchange:"",queueName,props:null,message.getBytes());System.out.println("发送消息成功:【" + message + "】");//5.关闭通道和连接channel.close();connection.close(); }
}
ConsumerTest.java
public class ConsumerTest{@Testpublic static void main(Strign[] args) throws IOException,TimeoutException {// 1.建立连接ConnectionFactory factory = new ConnectionFactory();// 1.1设置连接参数 分别是:主机名、端口号、vhost、用户名、密码factory.setHost("192.168.150.101");factory.setPort(5672);factory.setVirtualHost("/");factory.setUsername("luxifa");factory.setPassword("123456");// 1.2建立连接Connection connection = factory.newConnection();// 2.创建通道ChannelChannel channel = connection.createChannel();// 3.创建队列String queueName = "simple.queue";channel.queueDeclare(queueName,durable:false,exclusive:false,qutoDelete:false,arguments:null);// 4.订阅消息channel.basicConsume(queueName,autoAck:true,new DefaultConsumer(channel) {@Ovrridepublic void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body) throws IOException {// 5.处理消息String message = new String(body);System.out.println("接收到消息:【" + message + "】");}});System.out.println("等待接收消息......");}
}
小结:
基本消息队列的消息发送流程:
1.建议connection
2.创建channel
3.利用channer声明队列
4.利用channel向队列发送消息
基本消息队列的消息接收流程:
1.建议connection
2.创建channel
3.利用channer声明队列
4.定义consumer的消费行为handleDelevery()
5.利用channel将消费者与队列绑定