1 认识RabbitMq
RabbitMQ是⼀个消息中间件,也是⼀个生产者消费者模型,它负责接收,存储并转发消息。
2.1 Producer和Consumer
Producer:生产者,是RabbitMQServer的客户端,向RabbitMQ发送消息
Consumer:消费者,也是RabbitMQServer的客户端,从RabbitMQ接收消息
Broker:其实就是RabbitMQServer,主要是接收和收发消息
如图:
2.2 Connection和Channel
Connection:连接,是客户端和RabbitMQ服务器之间的⼀个TCP连接。这个连接是建立消息传递的基础,它负责传输客户端和服务器之间的所有数据和控制信息。
Channel:通道,信道。Channel是在Connection之上的⼀个抽象层。在RabbitMQ中,⼀个TCP连接可以有多个Channel,每个Channel都是独⽴的虚拟连接。消息的发送和接收都是基于Channel的,通道的主要作用是将消息的读写操作复用到同⼀个TCP连接上,这样可以减少建立和关闭连接的开销,提高性能。
如图:
2.3 Virtualhost
Virtualhost:虚拟主机,这是⼀个虚拟概念。它为消息队列提供了⼀种逻辑上的隔离机制,对于 RabbitMQ而言,⼀个BrokerServer上可以存在多个VirtualHost。当多个不同的用户使用同一个 RabbitMQ Server提供的服务时,可以虚拟划分出多个vhost,每个用户在自己的vhost创建exchange/queue等
2.4 Queue
Queue:队列,是RabbitMQ的内部对象,用于存储消息。
如图:
2.5 Exchange
Exchange:交换机,message到达broker的第⼀站,它负责接收生产者发送的消息,并根据特定的规则把这些消息路由到⼀个或多个Queue列中
如图:
RabbitMq的使用
在这里我是在maven项目中,举例使用rabbitmq。
在这里我们创建好maven项目之后呢,需要先添加依赖。
如代码:
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.20.0</version></dependency>
生产者代码编写步骤:
1. 建立连接
2. 开启信道
3. 声明交换机
4. 声明队列
5. 发送消息
6. 资源释放
消费者代码步骤:
1. 创建连接
2. 创建Channel
3. 声明一个队列
4. 消费信息
5. 释放资源
代码编写
定义常量类中的代码:
public static final String HOST = "47.108.157.13";public static final int PORT = 5672;public static final String USER_NAME = "study";public static final String PASSWORD = "study";public static final String VIRTUAL_HOST = "bite";
生产者代码:
public static void main(String[] args) throws IOException, TimeoutException {//1. 建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("47.108.157.13");connectionFactory.setPort(5672); //需要提前开放端口号connectionFactory.setUsername("study");//账号connectionFactory.setPassword("study"); //密码connectionFactory.setVirtualHost("bite"); //虚拟主机Connection connection = connectionFactory.newConnection();//2. 开启信道Channel channel = connection.createChannel();//3. 声明交换机 使用内置的交换机//4. 声明队列/*** queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,* Map<String, Object> arguments)* 参数说明:* queue: 队列名称* durable: 可持久化* exclusive: 是否独占* autoDelete: 是否自动删除* arguments: 参数*/channel.queueDeclare("hello", true, false, false, null);//5. 发送消息/*** basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)* 参数说明:* exchange: 交换机名称* routingKey: 内置交换机, routingkey和队列名称保持一致* props: 属性配置* body: 消息*/for (int i = 0; i < 10; i++) {String msg = "hello rabbitmq~"+i;channel.basicPublish("","hello", null, msg.getBytes());}System.out.println("消息发送成功~");//6. 资源释放channel.close();connection.close();}
消费者代码:
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {//1. 创建连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("47.108.157.13");connectionFactory.setPort(5672);connectionFactory.setUsername("study");connectionFactory.setPassword("study");connectionFactory.setVirtualHost("bite");Connection connection = connectionFactory.newConnection();//2. 创建ChannelChannel channel = connection.createChannel();//3. 声明队列(可以省略)channel.queueDeclare("hello",true, false, false, null);//4. 消费消息/*** basicConsume(String queue, boolean autoAck, Consumer callback)* 参数说明:* queue: 队列名称* autoAck: 是否自动确认* callback: 接收到消息后, 执行的逻辑*/DefaultConsumer consumer = new DefaultConsumer(channel){//从队列中收到消息, 就会执行的方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息:"+ new String(body));}};channel.basicConsume("hello", true, consumer);//等待程序执行完成Thread.sleep(2000);//5. 释放资源channel.close();connection.close();}