本节演示点对点模式的消息发送的 Spring + ActiveMQ 代码。
Spring + ActiveMQ 整合
1、依赖包
spring:4.2.5.RELEASE,activemq-all:5.15.0
<!-- Servlet API: supplied by the container at runtime, hence the "provided" scope -->
<dependency>
    <groupId>javax.servlet</groupId>
    <artifactId>javax.servlet-api</artifactId>
    <version>3.0.1</version>
    <scope>provided</scope>
</dependency>

<!-- Logging -->
<dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-core</artifactId>
    <version>2.9.1</version>
</dependency>
<dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-api</artifactId>
    <version>2.9.1</version>
</dependency>
<!-- NOTE(review): slf4j-log4j12 is the SLF4J binding for Log4j 1.2, while the two artifacts
     above are Log4j 2; confirm this pairing is intended (the Log4j 2 binding is
     log4j-slf4j-impl). -->
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.0</version>
</dependency>

<!-- Spring 4.2.5.RELEASE modules -->
<!-- NOTE(review): JmsTemplate and DefaultMessageListenerContainer belong to the spring-jms
     module, which is not listed here; confirm it reaches the classpath some other way
     or add it explicitly. -->
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-core</artifactId>
    <version>4.2.5.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-context</artifactId>
    <version>4.2.5.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-aop</artifactId>
    <version>4.2.5.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-beans</artifactId>
    <version>4.2.5.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-web</artifactId>
    <version>4.2.5.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-context-support</artifactId>
    <version>4.2.5.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-tx</artifactId>
    <version>4.2.5.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-test</artifactId>
    <version>4.2.5.RELEASE</version>
</dependency>

<!-- ActiveMQ and the pooling library used alongside PooledConnectionFactory -->
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-all</artifactId>
    <version>5.15.0</version>
</dependency>
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-pool2</artifactId>
    <version>2.4.2</version>
</dependency>
2、工程介绍
producer 生产者工程
consumer 消费者工程
3、整合关键类 JmsTemplate
Spring 官方提供了 JmsTemplate 类,专门用来简化 JMS 操作。在该类的 Bean 配置中有两个属性 connectionFactory 和 defaultDestination(通过 ref 引用其他 Bean),正好对应 JMS 中的 ConnectionFactory 和 Destination。
点对点(point to point)消息发送
1、生产者代码
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                           http://www.springframework.org/schema/beans/spring-beans.xsd">

    <!-- Create the ConnectionFactory; a connection pool is used for better performance -->
    <bean id="connectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
          destroy-method="stop">
        <property name="connectionFactory">
            <bean class="org.apache.activemq.ActiveMQConnectionFactory">
                <property name="brokerURL">
                    <value>tcp://localhost:61616</value>
                </property>
            </bean>
        </property>
        <property name="maxConnections" value="50" />
    </bean>

    <!-- Message destination; the constructor-arg is the queue name, here spring-queue -->
    <bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg index="0" value="spring-queue" />
    </bean>

    <!-- JmsTemplate wired to the pooled factory and the default destination above -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="defaultDestination" ref="destination" />
        <property name="messageConverter">
            <bean class="org.springframework.jms.support.converter.SimpleMessageConverter" />
        </property>
    </bean>

</beans>
生产者关键代码:SpringMessageSender
package producer;import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;/*** 点对点(point to point)消息发送,spring整合* * @author JPM*/
public class SpringMessageSender {public static void main(String[] args) {ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("classpath:spring/springContext-activemq.xml");JmsTemplate jmsTemplate = (JmsTemplate) context.getBean("jmsTemplate");jmsTemplate.send(new MessageCreator() {public Message createMessage(Session session) throws JMSException {TextMessage message = session.createTextMessage();message.setText("hello,spring-queue!");return message;}});context.close();}
}
运行 SpringMessageSender 类,查看 ActiveMQ 管理界面
说明消息已经发送到了 spring-queue 中。
2、消费者代码(receive方法获取消息)
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                           http://www.springframework.org/schema/beans/spring-beans.xsd">

    <!-- Pooled ConnectionFactory pointing at the broker on localhost:61616 -->
    <bean id="connectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
          destroy-method="stop">
        <property name="connectionFactory">
            <bean class="org.apache.activemq.ActiveMQConnectionFactory">
                <property name="brokerURL">
                    <value>tcp://localhost:61616</value>
                </property>
            </bean>
        </property>
        <property name="maxConnections" value="50" />
    </bean>

    <!-- Queue to read from; the constructor-arg is the queue name, here spring-queue -->
    <bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg index="0" value="spring-queue" />
    </bean>

    <!-- JmsTemplate used by the consumer to receive from the default destination -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="defaultDestination" ref="destination" />
        <property name="messageConverter">
            <bean class="org.springframework.jms.support.converter.SimpleMessageConverter" />
        </property>
    </bean>

</beans>
消费者关键代码:SpringMessageReceiver
package consumer;import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.jms.core.JmsTemplate;

/**
 * Point-to-point (queue) message consumer using the Spring JMS integration.
 *
 * <p>Loads {@code spring/springContext-activemq.xml} from the classpath, receives one message
 * from the {@code jmsTemplate} bean's default destination and prints its converted payload.
 *
 * @author JPM
 */
public class SpringMessageReceiver {

    /**
     * Receives a single message and prints it to standard output.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ClassPathXmlApplicationContext context =
                new ClassPathXmlApplicationContext("classpath:spring/springContext-activemq.xml");
        try {
            JmsTemplate jmsTemplate = context.getBean("jmsTemplate", JmsTemplate.class);
            // NOTE(review): with JmsTemplate's default receive timeout this call is expected to
            // block until a message is available - confirm against the JmsTemplate docs.
            // The payload is kept as Object: the converter only yields a String for text
            // messages, so an unconditional (String) cast could throw ClassCastException.
            Object message = jmsTemplate.receiveAndConvert();
            System.out.println(message);
        } finally {
            // Always close the context, even when the receive fails, so that bean destroy
            // callbacks (such as a configured destroy-method) still run.
            context.close();
        }
    }
}
运行 SpringMessageReceiver 类,查看控制台和 ActiveMQ 管理界面
说明消费者读取到了消息,并打印到控制台显示。
3、消费者代码(使用消息监听器获取消息)
使用刚才的生产者,再次发送一条消息
spring 关键代码:springContext-activemq.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                           http://www.springframework.org/schema/beans/spring-beans.xsd">

    <!-- Pooled ConnectionFactory pointing at the broker on localhost:61616 -->
    <bean id="connectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
          destroy-method="stop">
        <property name="connectionFactory">
            <bean class="org.apache.activemq.ActiveMQConnectionFactory">
                <property name="brokerURL">
                    <value>tcp://localhost:61616</value>
                </property>
            </bean>
        </property>
        <property name="maxConnections" value="50" />
    </bean>

    <!-- Queue to listen on; the constructor-arg is the queue name, here spring-queue -->
    <bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg index="0" value="spring-queue" />
    </bean>

    <!-- Listener container: delivers messages from the destination to messageListener -->
    <bean id="jmsContainer"
          class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="destination" ref="destination" />
        <property name="messageListener" ref="messageListener" />
    </bean>

    <!-- The MessageListener implementation that handles each received message -->
    <bean id="messageListener" class="consumer.SpringMessageListener" />

</beans>
消费者关键代码:SpringMessageListener
package consumer;

import java.io.IOException;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;import org.springframework.context.support.ClassPathXmlApplicationContext;/*** 点对点(point to point)消息接收,spring整合,使用Listener* * @author JPM*/
public class SpringMessageListener implements MessageListener {public void onMessage(Message message) {String msg = null;try {msg = ((TextMessage) message).getText();} catch (JMSException e) {e.printStackTrace();}System.out.println(msg);}public static void main(String[] args) {ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("classpath:spring/springContext-activemq.xml");try {System.in.read();} catch (IOException e) {e.printStackTrace();}context.close();}}
运行 SpringMessageListener 类,查看控制台和 ActiveMQ 管理界面
说明消费者读取到了消息,并打印到控制台显示。
使用 Listener 和 receive 方法的区别在于:receive 方法是同步调用,每调用一次只接收一条消息;而 Listener 由监听容器驱动,会一直运行,消息到达后回调 onMessage 方法,及时消费。
Listener 验证,再次运行生产者,观察 ActiveMQ 管理界面