SpringBoot整合RabbitMQ,生产者
(1)创建maven项目
(2)引入依赖
<!-- Maven POM fragment for the producer project. Dependency versions are
     inherited from the Spring Boot parent, so none are declared below. -->
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.2.2.RELEASE</version>
</parent>

<dependencies>
    <!-- Spring application context -->
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-context</artifactId>
    </dependency>

    <!-- Spring AMQP / RabbitMQ integration -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>

    <!-- Unit testing; test scope keeps it off the runtime classpath,
         consistent with spring-test below -->
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>
(3)创建 rabbitmq.properties 配置文件
# Connection settings for the RabbitMQ broker. spring-rabbitmq-producer.xml
# loads this file through <context:property-placeholder> and passes each value
# to <rabbit:connection-factory> as ${rabbitmq.*}.
# Broker host
rabbitmq.host=127.0.0.1
# Broker port
rabbitmq.port=5672
# Login user name
rabbitmq.username=guest
# Login password
rabbitmq.password=guest
# Virtual host to connect to
rabbitmq.virtual-host=/
(4)创建 spring-rabbitmq-producer.xml 配置文件
<?xml version="1.0" encoding="UTF-8"?>
<!-- Producer-side Spring configuration: connection factory, admin, the queues
     and exchanges used by the examples, and the RabbitTemplate used to send. -->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/context
       https://www.springframework.org/schema/context/spring-context.xsd
       http://www.springframework.org/schema/rabbit
       http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">

    <!-- Load the broker connection settings -->
    <context:property-placeholder location="classpath:rabbitmq.properties"/>

    <!-- RabbitMQ connection factory -->
    <rabbit:connection-factory id="connectionFactory"
                               host="${rabbitmq.host}"
                               port="${rabbitmq.port}"
                               username="${rabbitmq.username}"
                               password="${rabbitmq.password}"
                               virtual-host="${rabbitmq.virtual-host}"/>

    <!-- Admin that manages (declares) the exchanges and queues defined below -->
    <rabbit:admin connection-factory="connectionFactory"/>

    <!-- Durable queue, created automatically if it does not exist. A queue that
         is not bound to an exchange explicitly is bound to the default exchange:
         type direct, name "" (empty string), routing key = queue name. -->
    <rabbit:queue id="spring_queue" name="spring_queue" auto-declare="true"/>

    <!-- Fanout (broadcast): every bound queue receives each message -->
    <!-- Durable queues for the fanout exchange, created automatically if missing -->
    <rabbit:queue id="spring_fanout_queue_1" name="spring_fanout_queue_1" auto-declare="true"/>
    <rabbit:queue id="spring_fanout_queue_2" name="spring_fanout_queue_2" auto-declare="true"/>

    <!-- Fanout exchange with the two queues above bound to it -->
    <rabbit:fanout-exchange id="spring_fanout_exchange" name="spring_fanout_exchange" auto-declare="true">
        <rabbit:bindings>
            <rabbit:binding queue="spring_fanout_queue_1"/>
            <rabbit:binding queue="spring_fanout_queue_2"/>
        </rabbit:bindings>
    </rabbit:fanout-exchange>

    <!-- Topic (wildcard) routing: * matches exactly one word, # matches zero or more words -->
    <!-- Durable queues for the topic exchange, created automatically if missing -->
    <rabbit:queue id="spring_topic_queue_start" name="spring_topic_queue_start" auto-declare="true"/>
    <rabbit:queue id="spring_topic_queue_swell" name="spring_topic_queue_swell" auto-declare="true"/>
    <rabbit:queue id="spring_topic_queue_well2" name="spring_topic_queue_well2" auto-declare="true"/>

    <!-- Topic exchange; each binding routes by its pattern -->
    <rabbit:topic-exchange id="spring_topic_exchange" name="spring_topic_exchange" auto-declare="true">
        <rabbit:bindings>
            <rabbit:binding pattern="juexing.*" queue="spring_topic_queue_start"/>
            <rabbit:binding pattern="juexing.#" queue="spring_topic_queue_swell"/>
            <rabbit:binding pattern="test.#" queue="spring_topic_queue_well2"/>
        </rabbit:bindings>
    </rabbit:topic-exchange>

    <!-- RabbitTemplate bean so application code can send messages conveniently -->
    <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>
</beans>
(5)编写测试代码,发送消息
package com.juexing;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

/**
 * Sends sample messages through the {@link RabbitTemplate} configured in
 * {@code spring-rabbitmq-producer.xml}: one to a plain queue, one to the fanout
 * exchange and one to the topic exchange.
 *
 * <p>These tests need a reachable RabbitMQ broker matching
 * {@code rabbitmq.properties}.
 */
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq-producer.xml")
public class ProducerTest {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Sends to {@code spring_queue} by passing the queue name as the routing key;
     * no exchange is named, so the template's default exchange is used.
     */
    @Test
    public void testHelloWorld() {
        rabbitTemplate.convertAndSend("spring_queue", "hello world spring...");
    }

    /**
     * Publishes to {@code spring_fanout_exchange} with an empty routing key;
     * the XML config binds two queues to this exchange.
     */
    @Test
    public void testFanout() {
        rabbitTemplate.convertAndSend("spring_fanout_exchange", "", "spring_fanout...");
    }

    /**
     * Publishes to {@code spring_topic_exchange} with routing key
     * {@code juexing.erci}, which matches the {@code juexing.*} and
     * {@code juexing.#} binding patterns but not {@code test.#}.
     */
    @Test
    public void testTopic() {
        rabbitTemplate.convertAndSend("spring_topic_exchange", "juexing.erci", "spring_testTopic...");
    }
}
(6)项目结构图展示
(7)运行测试代码,查看效果
- 两个交换机
spring_fanout_exchange交换机,绑定了spring_fanout_queue_1、spring_fanout_queue_2 两个消息队列
spring_topic_exchange交换机,绑定了spring_topic_queue_start、spring_topic_queue_swell、spring_topic_queue_well2
- 6个消息队列,与未消费的消息。
SpringBoot整合RabbitMQ,消费者
(1)创建maven项目
(2)引入依赖
<!-- Maven POM fragment for the consumer project. Dependency versions are
     inherited from the Spring Boot parent, so none are declared below. -->
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.2.2.RELEASE</version>
</parent>

<dependencies>
    <!-- Spring application context -->
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-context</artifactId>
    </dependency>

    <!-- Spring AMQP / RabbitMQ integration -->
    <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-rabbit</artifactId>
    </dependency>

    <!-- Unit testing; test scope keeps it off the runtime classpath,
         consistent with spring-test below -->
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>
(3)创建 rabbitmq.properties 配置文件
# Connection settings for the RabbitMQ broker. spring-rabbitmq-consumer.xml
# loads this file through <context:property-placeholder> and passes each value
# to <rabbit:connection-factory> as ${rabbitmq.*}.
# Broker host
rabbitmq.host=127.0.0.1
# Broker port
rabbitmq.port=5672
# Login user name
rabbitmq.username=guest
# Login password
rabbitmq.password=guest
# Virtual host to connect to
rabbitmq.virtual-host=/
(4)创建 spring-rabbitmq-consumer.xml 配置文件
<?xml version="1.0" encoding="UTF-8"?>
<!-- Consumer-side Spring configuration: connection factory, listener beans and
     the listener container that attaches each listener to its queue. -->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/context
       https://www.springframework.org/schema/context/spring-context.xsd
       http://www.springframework.org/schema/rabbit
       http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">

    <!-- Load the broker connection settings -->
    <context:property-placeholder location="classpath:rabbitmq.properties"/>

    <!-- RabbitMQ connection factory -->
    <rabbit:connection-factory id="connectionFactory"
                               host="${rabbitmq.host}"
                               port="${rabbitmq.port}"
                               username="${rabbitmq.username}"
                               password="${rabbitmq.password}"
                               virtual-host="${rabbitmq.virtual-host}"/>

    <!-- Listener for spring_queue -->
    <bean id="springQueueListener" class="com.juexing.listener.SpringQueueListener"/>
    <!-- Placeholders for further listeners. Before enabling one, replace the
         class attribute with the fully qualified listener class name. -->
    <!-- <bean id="fanoutListener1" class="com.juexing.listener"/>-->
    <!-- <bean id="fanoutListener2" class="com.juexing.listener"/>-->
    <!-- <bean id="topicListenerStart" class="com.juexing.listener"/>-->
    <!-- <bean id="topicListenerSwell" class="com.juexing.listener"/>-->
    <!-- <bean id="topicListenerWell2" class="com.juexing.listener"/>-->

    <!-- Listener container: maps each listener bean to the queue(s) it consumes -->
    <rabbit:listener-container connection-factory="connectionFactory" auto-declare="true">
        <rabbit:listener ref="springQueueListener" queue-names="spring_queue"/>
        <!-- <rabbit:listener ref="fanoutListener1" queue-names="spring_fanout_queue_1"/>-->
        <!-- <rabbit:listener ref="fanoutListener2" queue-names="spring_fanout_queue_2"/>-->
        <!-- <rabbit:listener ref="topicListenerStart" queue-names="spring_topic_queue_start"/>-->
        <!-- <rabbit:listener ref="topicListenerSwell" queue-names="spring_topic_queue_swell"/>-->
        <!-- <rabbit:listener ref="topicListenerWell2" queue-names="spring_topic_queue_well2"/>-->
    </rabbit:listener-container>
</beans>
(5)编写监听类,消费消息
package com.juexing.listener;

import java.nio.charset.StandardCharsets;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;

/**
 * Listener registered for {@code spring_queue} in
 * {@code spring-rabbitmq-consumer.xml}; prints each received message body.
 */
public class SpringQueueListener implements MessageListener {

    /**
     * Prints the message body to standard output.
     *
     * @param message the message delivered by the listener container
     */
    @Override
    public void onMessage(Message message) {
        // Decode explicitly as UTF-8 rather than relying on the platform default
        // charset, so non-ASCII payloads print the same on every machine.
        // NOTE(review): assumes the producer encodes text payloads as UTF-8 — confirm.
        System.out.println(new String(message.getBody(), StandardCharsets.UTF_8));
    }
}
(6)编写测试类
package com.juexing;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

/**
 * Loads {@code spring-rabbitmq-consumer.xml}, which defines the listener
 * container, and keeps the test alive long enough for the listener to
 * receive messages.
 */
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq-consumer.xml")
public class ConsumerTest {

    /** How long the test waits for messages to be consumed, in milliseconds. */
    private static final long LISTEN_WINDOW_MILLIS = 5_000L;

    /**
     * Waits for a bounded time so the listener declared in the XML config can
     * consume pending messages. An empty test body would return immediately and
     * the run could finish before anything is printed.
     *
     * @throws InterruptedException if the waiting thread is interrupted
     */
    @Test
    public void testSpringQueue() throws InterruptedException {
        Thread.sleep(LISTEN_WINDOW_MILLIS);
    }
}
(7)运行测试类,打印消息