交换机
Exchanges 概念
RabbitMQ 消息传递模型的核心思想是: 生产者生产的消息从不会直接发送到队列。实际上,通常生产者甚至都不知道这些消息传递传递到了哪些队列中。相反,生产者只能将消息发送到交换机(exchange),交换机工作的内容非常简单,一方面它接收来自生产者的消息,另一方面将它们推入队列。交换机必须确切知道如何处理收到的消息。是应该把这些消息放到特定队列还是说把他们到许多队列中还是说应该丢弃它们。这就的由交换机的类型来决定。
Exchanges 的类型
- 直接:direct 路由模式
- 主题:topic
- 标题:headers(不常用)
- 扇出:fanout 广播模式,发布订阅模式
无名exchange
第一个参数是交换机的名称,空字符串表示默认或无名称交换机:消息能路由发送到队列中其实是由 routingKey(bindingkey)绑定 key 指定的,如果它存在的话
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
绑定bindings
binding 其实是 exchange 和 queue 之间的桥梁,它告诉我们 exchange 和那个队列进行了绑定关系。
Fanout模式介绍
Fanout 这种类型非常简单。正如从名称中猜到的那样,它是将接收到的所有消息广播到它知道的所有队列中。
代码示例
package com.vmware.rabbit.demo5;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.vmware.rabbit.utils.RabbitUtil;import java.util.Scanner;public class Producer {private static final String EXCHANGE_NAME = "log";public static void main(String[] args) throws Exception{Connection connection = RabbitUtil.getConnection();Channel channel = connection.createChannel();//声明扇出类型交换机channel.exchangeDeclare(EXCHANGE_NAME,"fanout");//向交换机发送消息Scanner scanner = new Scanner(System.in);while (scanner.hasNext()){String message = scanner.next();channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes());System.out.printf("消息:%s发送成功!",message);}}
}
package com.vmware.rabbit.demo5;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import com.vmware.rabbit.utils.RabbitUtil;public class Consumer1 {private static final String EXCHANGE_NAME = "log";public static void main(String[] args) throws Exception{Connection connection = RabbitUtil.getConnection();Channel channel = connection.createChannel();String queueName = channel.queueDeclare().getQueue();channel.queueBind(queueName,EXCHANGE_NAME,"");DeliverCallback deliverCallback = (consumerTag, message) -> {String msg = new String(message.getBody());System.out.println(msg);};channel.basicConsume(queueName,true,deliverCallback,(arg1,arg2)->{});}
}
package com.vmware.rabbit.demo5;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import com.vmware.rabbit.utils.RabbitUtil;public class Consumer2 {private static final String EXCHANGE_NAME = "log";public static void main(String[] args) throws Exception{Connection connection = RabbitUtil.getConnection();Channel channel = connection.createChannel();String queueName = channel.queueDeclare().getQueue();channel.queueBind(queueName,EXCHANGE_NAME,"");DeliverCallback deliverCallback = (consumerTag, message) -> {String msg = new String(message.getBody());System.out.println(msg);};channel.basicConsume(queueName,true,deliverCallback,(arg1,arg2)->{});}
}
Direct模式介绍
Fanout 这种交换类型并不能给我们带来很大的灵活性-它只能进行无意识的广播,在这里我们将使用 direct(直接) 这种类型来进行替换,这种类型的工作方式是,消息只去到它绑定的routingKey 队列中去
- 在上面这张图中,我们可以看到 X 绑定了两个队列,绑定类型是 direct。队列Q1 绑定键为 orange,队列 Q2 绑定键有两个:一个绑定键为 black,另一个绑定键为 green
- 在这种绑定情况下,生产者发布消息到 exchange 上,绑定键为 orange 的消息会被发布到队列Q1。绑定键为 black/green 和的消息会被发布到队列 Q2,其他消息类型的消息将被丢弃
多重绑定
如果 exchange 的绑定类型是direct,但是它绑定的多个队列的 key 如果都相同,在这种情况下虽然绑定类型是 direct 但是它表现的就和 fanout 有点类似了,就跟广播差不多
代码实战
package com.vmware.rabbit.demo6;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.util.Scanner;
import java.util.UUID;public class Producer {private static final String EXCHANGE_NAME = "log";public static void main(String[] args) throws Exception{ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.31.232");factory.setUsername("admin");factory.setPassword("admin");Connection connection = factory.newConnection();Channel channel = connection.createChannel();//声明路由模式交换机channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);//向交换机发送消息Scanner scanner=new Scanner(System.in);while (scanner.hasNext()){String message = UUID.randomUUID().toString();String routingKey = scanner.next();channel.basicPublish(EXCHANGE_NAME,routingKey,null,message.getBytes());System.out.printf("发送消息:%s成功!RoutingKey:%s\n",message,routingKey);}}
}
package com.vmware.rabbit.demo6;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.vmware.rabbit.utils.RabbitUtil;public class Consumer1 {private static final String EXCHANGE_NAME = "log";public static void main(String[] args) throws Exception{Connection connection = RabbitUtil.getConnection();Channel channel = connection.createChannel();//声明队列String queueName = channel.queueDeclare().getQueue();channel.queueBind(queueName,EXCHANGE_NAME,"error");channel.basicConsume(queueName,(tag,msg)-> System.out.println(new String(msg.getBody())),(tag,msg)->{});}
}
package com.vmware.rabbit.demo6;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.vmware.rabbit.utils.RabbitUtil;public class Consumer2 {private static final String EXCHANGE_NAME = "log";public static void main(String[] args) throws Exception{Connection connection = RabbitUtil.getConnection();Channel channel = connection.createChannel();//声明队列String queueName = channel.queueDeclare().getQueue();channel.queueBind(queueName,EXCHANGE_NAME,"info");channel.queueBind(queueName,EXCHANGE_NAME,"warn");channel.basicConsume(queueName,(tag,msg)-> System.out.println(new String(msg.getBody())),(tag,msg)->{});}
}
Topics主题模式介绍
存在的问题:尽管使用direct 交换机改进了我们的系统,但是它仍然存在局限性-比方说我们想接收的日志类型有info.base 和 info.advantage,某个队列只想 info.base 的消息,那这个时候direct 就办不到了。这个时候就只能使用 topic 类型
Topic 的要求
发送到类型是 topic 交换机的消息的 routing_key 不能随意写,必须满足一定的要求,它必须是一个单词列表,以点号分隔开。这些单词可以是任意单词,比如说:“stock.usd.nyse”, “nyse.vmw”,“quick.orange.rabbit”.这种类型的。当然这个单词列表最多不能超过 255 个字节
-
*星号可以代替一个单词
-
#井号可以替代零个或多个单词
-
当一个队列绑定键是#那么这个队列将接收所有数据,就有点像fanout了
-
如果队列绑定键当中没有#和*出现,那么该队列绑定类型就是direct了
代码实现
package com.vmware.rabbit.demo7;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.vmware.rabbit.utils.RabbitUtil;import java.util.HashMap;
import java.util.Map;public class Producer {private static final String EXCHANGE_NAME = "topic_logs";public static void main(String[] args) throws Exception{Connection connection = RabbitUtil.getConnection();System.out.println("连接RabbitMQ服务器成功!");Channel channel = connection.createChannel();//声明主题模式交换机channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);System.out.println("交换机创建成功!");Thread.sleep(15*1000);//发布消息HashMap<String,String> msgMap = new HashMap<>();msgMap.put("quick.orange.rabbit","被队列 Q1Q2 接收到");msgMap.put("lazy.orange.elephant","被队列 Q1Q2 接收到");msgMap.put("quick.orange.fox","被队列 Q1 接收到");msgMap.put("lazy.brown.fox","被队列 Q2 接收到");msgMap.put("lazy.pink.rabbit","虽然满足两个绑定但只被队列 Q2 接收一次");msgMap.put("quick.brown.fox","不匹配任何绑定不会被任何队列接收到会被丢弃");msgMap.put("quick.orange.male.rabbit","是四个单词不匹配任何绑定会被丢弃");msgMap.put("lazy.orange.male.rabbit","是四个单词但匹配 Q2");for (Map.Entry<String, String> entry : msgMap.entrySet()) {channel.basicPublish(EXCHANGE_NAME,entry.getKey(),null,entry.getValue().getBytes("UTF-8"));}}
}
package com.vmware.rabbit.demo7;import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import com.vmware.rabbit.utils.RabbitUtil;public class Consumer1 {private static final String EXCHANGE_NAME = "topic_logs";private static final String QUEUE_NAME = "Q1";public static void main(String[] args) throws Exception {Connection connection = RabbitUtil.getConnection();Channel channel = connection.createChannel();//声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);//绑定交换机和队列channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"*.orange.*");DeliverCallback deliverCallback = (tag,msg)->{String message = new String(msg.getBody());System.out.println(message+"\tRouting Key:"+msg.getEnvelope().getRoutingKey());};CancelCallback cancelCallback = (tag)->{};channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);}
}
package com.vmware.rabbit.demo7;import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import com.vmware.rabbit.utils.RabbitUtil;public class Consumer2 {private static final String EXCHANGE_NAME = "topic_logs";private static final String QUEUE_NAME = "Q2";public static void main(String[] args) throws Exception {Connection connection = RabbitUtil.getConnection();Channel channel = connection.createChannel();//声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);//绑定交换机和队列channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.*.rabbit");channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "lazy.#");DeliverCallback deliverCallback = (tag,msg)->{String message = new String(msg.getBody());System.out.println(message+"Routing Key:"+msg.getEnvelope().getRoutingKey());};CancelCallback cancelCallback=(tag)->{};channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);}
}