1.在配置文件中配置mq的url,端口号,用户名,密码
2.读取配置文件,并获取mq的connection
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public static Connection getConnection() {
if(connection == null) {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(RABBIT_HOST);
connectionFactory.setPort(Integer.parseInt(RABBIT_PORT));
connectionFactory.setUsername(RABBIT_USERNAME);
connectionFactory.setPassword(RABBIT_PASSWORD);
try {
connection = connectionFactory.newConnection();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
return connection;
}
3.通过connection换取channel,并生产和消费信息
Channel channel = null;
try {
//获取连接
Connection connection = MqConnectionUtil.getConnection();
//创建通道
channel = connection.createChannel();
生产者
channel.exchangeDeclare("log",BuiltinExchangeType.DIRECT); 创建交换机,采用路由模式direct channel.queueBind("queue1","log","the1"); channel.queueBind("queue2","log","the2"); 将交换机与队列绑定 channel.basicPublish("log","the1",null,"第一条发送".getBytes(StandardCharsets.UTF_8)); channel.basicPublish("log","the2",null,"第二条发送".getBytes(StandardCharsets.UTF_8));
消费者
channel.queueDeclare("queue1", true, false, false, null); channel.queueDeclare("queue2", true, false, false, null); 由消费者创建队列//4.开启监听Queue DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {try {Thread.sleep(500);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("接收到消息-"+body);} };
5.关闭连接
-
channel.close();
-
connection.close();