相信自己,终会成功
目录
主流MQ产品
1.kafaka
2.RocketMQ
3.RabbitMQ
在xshell上安装RabbitMQ
RabbitMQ七种工作模式
1.简单模式
编辑
2.工作队列模式
3.发布/订阅模式
4.路由模式
5.通配符模式
6.RPC模式
AMQP.BasicProperties 设置消息属性的类
7.发布确认模式
代码展示(生产者)
常量命名规范
连接工厂 (ConnectionFactory)
Channel(通道)
channel.queueDeclare 声明队列
channel.basicPublish 发送消息
channel.exchangeDeclare 声明创建交换机
channel.queueBind() 将队列绑定到交换机
channel.basicQos设置消费者预取限制
channel.basicAck 手动确认消息
代码展示(消费者)
DefaultConsumer:
handleDelivery 方法:
主流MQ产品
1.kafaka
- 特点:高吞吐量、分布式、持久化、支持分区和副本。
- 适用场景:日志收集、流处理、实时数据分析等大数据场景。
- 优势:高吞吐量和低延迟,适合处理大量数据。
- 缺点:配置复杂,对小型项目可能过于重量级。
2.RocketMQ
- 特点:分布式、高吞吐量、低延迟、支持事务消息。
- 适用场景:电商、金融等需要高可靠性和事务支持的场景。
- 优势:支持事务消息,适合金融等高可靠性要求的场景。
- 缺点:社区相对较小,文档和资源不如Kafka丰富。
3.RabbitMQ
- 特点:轻量级、支持多种消息协议、易于使用和部署。
- 适用场景:中小型项目、需要快速上手的场景。
- 优势:易于使用,支持多种消息协议,社区活跃。
- 缺点:在大规模高并发场景下性能不如Kafka和RocketMQ。
在xshell上安装RabbitMQ
1.更新xshell中最新的软件包列表
sudo apt-get update
2.安装erlang
sudo apt-get install erlang
输入erl,出现下图内容表示安装成功 输入halt().退出即可
3.安装rabbitmq
sudo apt-get install rabbitmq-server
4.确认安装结果
systemctl status rabbitmq-server
显示running即可
5.安装RabbitMQ管理界面
rabbitmq-plugins enable rabbiting_management
6.启动服务
sudo service rabbitmq-server start
在浏览器中输入自己云服务器的端口号+15672即可访问页面
添加用户名和密码
rabbitmqctl add_user 用户名 密码
给用户权限
rabbitmqctl set_user_tags 用户名 权限等级
RabbitMQ七种工作模式
P:生产者 C:消费者
queue:队列 X:交换机
在使用绑定的时候为 BindingKey
在发送消息的时候为RoutingKey
官方网站:RabbitMQ Tutorials | RabbitMQ
建立连接需要的信息:
1.IP 2.端口号 3.账号 4.密码 5. 虚拟主机
消费者代码:
1.创建连接 2.创建Channel 3.声明一个队列Queue 4.消费信息 5.释放资源
1.简单模式
一个生产者,一个消费者,点到点模式
2.工作队列模式
一个生产者,多个消费者
假如有十条队列消息,C1和C2是共同消费这10条消息,消息不会重复消费
3.发布/订阅模式
4.路由模式
订阅模式的变化形式
5.通配符模式
6.RPC模式
AMQP.BasicProperties 设置消息属性的类
属性名 | 类型 | 说明 |
---|---|---|
contentType | String | 消息内容的 MIME 类型(如 text/plain 、application/json )。 |
contentEncoding | String | 消息内容的编码方式(如 UTF-8 )。 |
headers | Map<String, Object> | 自定义消息头,用于传递额外信息。 |
deliveryMode | Integer | 消息的持久化模式:1 (非持久化)或 2 (持久化)。 |
priority | Integer | 消息的优先级(0 到 9,数值越大优先级越高)。 |
correlationId | String | 关联 ID,通常用于 RPC 模式,匹配请求和响应。 |
replyTo | String | 响应队列的名称,通常用于 RPC 模式。 |
expiration | String | 消息的过期时间(以毫秒为单位的字符串)。 |
messageId | String | 消息的唯一标识符。 |
timestamp | Date | 消息的时间戳。 |
type | String | 消息的类型标识符。 |
userId | String | 用户 ID,用于验证消息的发送者。 |
appId | String | 应用程序 ID,用于标识发送消息的应用程序。 |
// AMQP.BasicProperties 是一个不可变类,因此需要通过其内部类 Builder 来创建对象。AMQP.BasicProperties props = new AMQP.BasicProperties().builder().correlationId(correlationID).replyTo(Constants.RPC_RESPONSE_QUEUE).build();
客户端:
生成唯一的
correlationId
。设置
replyTo
为响应队列的名称。发送消息到请求队列(
Constants.RPC_REQUEST_QUEUE
)。监听响应队列(
Constants.RPC_RESPONSE_QUEUE
),等待服务器返回结果。服务器:
监听请求队列(
Constants.RPC_REQUEST_QUEUE
)。处理请求后,将结果发送到客户端指定的响应队列(
replyTo
)。在响应消息中设置与请求相同的
correlationId
。客户端匹配响应:
收到响应后,根据
correlationId
匹配对应的请求。
7.发布确认模式
是RabbitMQ消息可靠性保证的关键
代码展示(生产者)
下图的代码Constants是自己写的 Java 常量
常量命名规范
1.常量名使用 全大写字母,并用下划线
_
分隔单词(如VIRTUALHOST
和WORK_QUEUE
)。2.这是 Java 中的命名约定,便于区分常量和变量。
//1. 建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST); //云服务器的IP地址connectionFactory.setPort(Constants.PORT); //需要提前开放端口号connectionFactory.setUsername(Constants.USER_NAME);//账号connectionFactory.setPassword(Constants.PASSWORD); //密码connectionFactory.setVirtualHost(Constants.VIRTUALHOST); //虚拟主机//2. 开启信道Channel channel = connection.createChannel();//3. 声明交换机 使用内置的交换机
连接工厂 (ConnectionFactory
)
是一个设计模式中的“工厂类”,它的目的是隐藏创建连接的复杂细节(比如网络配置、认证信息等)。你可以通过这个工厂对象预先设置连接参数(如服务器地址、端口、用户名、密码等),然后通过它来生成具体的连接对象
Channel(通道)
通道 是建立在连接(Connection
)之上的一个轻量级逻辑连接。一个连接(Connection
)可以创建多个通道,每个通道可以独立地发送和接收消息。通道的设计是为了复用连接,避免频繁创建和销毁连接的开销。创建通道后,通常会用它来声明队列、发送消息或消费消息
channel.queueDeclare 声明队列
//4.声明队列channel.queueDeclare(Constants.WORK_QUEUE,true,false,false,null);/*** queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,* Map<String, Object> arguments)* 参数说明:* queue: 队列名称* durable: 可持久化* exclusive: 是否独占* autoDelete: 是否自动删除* arguments: 参数*/
channel.basicPublish 发送消息
//5. 发送消息/*** basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)* 参数说明:* exchange: 交换机名称* routingKey: 内置交换机, routingkey和队列名称保持一致* props: 属性配置* body: 消息*/for (int i = 0; i < 10; i++) {String msg = "hello rabbitmq~"+i;channel.basicPublish("","hello", null, msg.getBytes());}
//6. 资源释放channel.close();connection.close();
channel.exchangeDeclare 声明创建交换机
Exchange.DeclareOk exchangeDeclare(String exchange, // 交换机名称String type, // 交换机类型(direct、fanout、topic、headers)boolean durable, // 是否持久化boolean autoDelete, // 是否自动删除boolean internal, // 是否内部交换机Map<String, Object> arguments // 额外参数
) throws IOException;
channel.queueBind()
将队列绑定到交换机
-
queue
:队列名称(如Constants.PUBLISH_QUEUE1
)。 -
exchange
:交换机名称(如Constants.PUBLISH_EXCHANGE
)。 -
routingKey
:路由键(如""
或"key1"
)
channel.queueBind(Constants.PUBLISH_QUEUE1, Constants.PUBLISH_EXCHANGE, "");
channel.queueBind(Constants.PUBLISH_QUEUE2, Constants.PUBLISH_EXCHANGE, "");
//作用:将 Constants.PUBLISH_QUEUE1 和 Constants.PUBLISH_QUEUE2 绑定到 //Constants.PUBLISH_EXCHANGE。
//路由键为空字符串(""),表示所有消息都会被路由到这两个队列(前提是交换机类型支持)。
channel.basicQos设置消费者预取限制
参数名 | 类型 | 说明 |
---|---|---|
prefetchSize | int | 预取消息的总大小(以字节为单位),通常设置为 0 表示不限制大小。 |
prefetchCount | int | 预取消息的数量限制(即未确认消息的最大数量)。 |
global | boolean | 是否全局生效:true 表示对整个 Channel 生效,false 表示对每个消费者生效。 |
// 设置每个消费者最多预取 10 条未确认消息
channel.basicQos(10);// 设置整个 Channel 最多预取 100 条未确认消息
channel.basicQos(100, true);// 设置预取消息的总大小不超过 1MB,数量不超过 10 条
channel.basicQos(1024 * 1024, 10, false);
channel.basicAck 手动确认消息
参数名 | 类型 | 说明 |
---|---|---|
deliveryTag | long | 消息的唯一标识符,由 RabbitMQ 分配。 |
multiple | boolean | 是否批量确认:true 表示确认所有比 deliveryTag 小的消息;false 表示仅确认当前消息。 |
使用场景
手动确认模式:当消费者从队列中拉取消息时,如果启用了手动确认模式(
autoAck=false
),则必须调用basicAck
来确认消息。确保消息可靠性:通过手动确认,可以确保消息在处理成功后才会从队列中删除,避免消息丢失。
批量确认:通过设置
multiple=true
,可以一次性确认多条消息,提高效率。
代码展示(消费者)
public static void main(String[] args) throws IOException, TimeoutException {//1. 建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT); //需要提前开放端口号connectionFactory.setUsername(Constants.USER_NAME);//账号connectionFactory.setPassword(Constants.PASSWORD); //密码connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机Connection connection = connectionFactory.newConnection();//2. 开启信道Channel channel = connection.createChannel();//3. 声明队列 使用内置的交换机//如果队列不存在, 则创建, 如果队列存在, 则不创建channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);//4. 消费消息DefaultConsumer consumer = new DefaultConsumer(channel){//从队列中收到消息, 就会执行的方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息:"+ new String(body));}};channel.basicConsume(Constants.WORK_QUEUE, true, consumer);6. 资源释放channel.close();connection.close();}
DefaultConsumer
:
RabbitMQ 提供的默认消费者类。
channel
:消费者绑定的通道(Channel)。
匿名内部类:通过 new DefaultConsumer(channel) { ... }
创建一个匿名内部类,并写 handleDelivery
方法。
handleDelivery
方法:
当队列中有消息时,RabbitMQ 会调用此方法将消息传递给消费者
RabbitMQ 支持两种消息确认模式:
-
自动确认:
-
在
basicConsume
方法中将第二个参数设置为true
。 -
消费者接收到消息后,RabbitMQ 会自动将消息标记为已处理。
-
示例:
channel.basicConsume(QUEUE_NAME, true, consumer);
-
-
手动确认:
-
在
basicConsume
方法中将第二个参数设置为false
。 -
需要在
handleDelivery
方法中手动调用channel.basicAck()
确认消息。 -
示例:
channel.basicAck(envelope.getDeliveryTag(), false);
-