RabbitMQ 官方教程地址:https://www.rabbitmq.com/tutorials
下面介绍 RabbitMQ 官方教程中的七种使用方式在 Spring Boot 中如何实现。
下面是基于 Spring Boot 使用 RabbitMQ 实现这七种模式的示例代码。假设已经引入了以下依赖:
Maven 依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
1. Hello World! 模式
生产者直接向一个队列发送消息,消费者从该队列接收消息。
配置类
/**
 * Configuration for the "Hello World" example: declares the single queue
 * that {@code HelloProducer} writes to and {@code HelloConsumer} reads from.
 */
@Configuration
public class RabbitConfig {

    /** Name of the queue used by the "Hello World" example. */
    public static final String QUEUE_NAME = "hello";

    /**
     * Registers the example queue as a Spring bean.
     *
     * @return a queue named {@link #QUEUE_NAME}
     */
    @Bean
    public Queue helloQueue() {
        return new Queue(QUEUE_NAME);
    }
}
生产者
/**
 * REST endpoint that publishes a fixed greeting to the "Hello World" queue.
 */
@RestController
@RequestMapping("/hello")
public class HelloProducer {

    /** Template used to publish messages; injected by Spring. */
    @Autowired
    private AmqpTemplate rabbitTemplate;

    /**
     * Sends the text {@code "Hello RabbitMQ!"} using the queue name as routing key.
     *
     * @return a fixed confirmation string for the HTTP caller
     */
    @GetMapping("/send")
    public String send() {
        rabbitTemplate.convertAndSend(RabbitConfig.QUEUE_NAME, "Hello RabbitMQ!");
        return "Message sent!";
    }
}
消费者
/**
 * Listener for the "Hello World" queue; prints every message it receives.
 */
@Component
public class HelloConsumer {

    /**
     * Handles one message taken from {@code RabbitConfig.QUEUE_NAME}.
     *
     * @param message the message payload as text
     */
    @RabbitListener(queues = RabbitConfig.QUEUE_NAME)
    public void receive(String message) {
        String line = "Received: " + message;
        System.out.println(line);
    }
}
2. Work Queues(工作队列)
多个消费者监听同一个队列,队列中的任务在这些消费者之间分发,每条消息只会被其中一个消费者处理。
配置类
/**
 * Configuration for the "Work Queues" example: declares the shared task queue.
 */
@Configuration
public class WorkQueueConfig {

    /** Name of the queue that carries work items. */
    public static final String WORK_QUEUE = "work_queue";

    /**
     * Registers the task queue as a Spring bean.
     *
     * @return a queue named {@link #WORK_QUEUE}
     */
    @Bean
    public Queue workQueue() {
        return new Queue(WORK_QUEUE);
    }
}
生产者
/**
 * REST endpoint that publishes caller-supplied text to the work queue.
 */
@RestController
@RequestMapping("/work")
public class WorkQueueProducer {

    /** Template used to publish messages; injected by Spring. */
    @Autowired
    private AmqpTemplate rabbitTemplate;

    /**
     * Publishes the path variable as a message, using the work queue name as routing key.
     *
     * @param msg text taken from the request path
     * @return a fixed confirmation string for the HTTP caller
     */
    @GetMapping("/send/{msg}")
    public String send(@PathVariable String msg) {
        rabbitTemplate.convertAndSend(WorkQueueConfig.WORK_QUEUE, msg);
        return "Work message sent!";
    }
}
消费者(可启动多个应用实例,或声明多个监听同一队列的 @RabbitListener 方法,即可得到多个消费者)
@Component
public class WorkConsumer {@RabbitListener(queues = WorkQueueConfig.WORK_QUEUE)public void receive(String message) throws InterruptedException {System.out.println("Worker received: " + message);Thread.sleep(1000); // 模拟任务耗时}
}
3. Publish/Subscribe(发布/订阅)
使用 Fanout Exchange 实现广播。
配置类
/**
 * Configuration for the "Publish/Subscribe" example: one fanout exchange
 * with two queues bound to it.
 */
@Configuration
public class FanoutConfig {

    /** Name of the fanout exchange that producers publish to. */
    public static final String FANOUT_EXCHANGE = "fanout_exchange";

    /**
     * Registers the fanout exchange.
     *
     * @return an exchange named {@link #FANOUT_EXCHANGE}
     */
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange(FANOUT_EXCHANGE);
    }

    /**
     * Registers the first subscriber queue.
     *
     * @return a queue named {@code fanout.queue1}
     */
    @Bean
    public Queue queue1() {
        return new Queue("fanout.queue1");
    }

    /**
     * Registers the second subscriber queue.
     *
     * @return a queue named {@code fanout.queue2}
     */
    @Bean
    public Queue queue2() {
        return new Queue("fanout.queue2");
    }

    /**
     * Binds {@code fanout.queue1} to the fanout exchange; no routing key is specified.
     *
     * @return the binding between the first queue and the exchange
     */
    @Bean
    public Binding binding1() {
        return BindingBuilder.bind(queue1()).to(fanoutExchange());
    }

    /**
     * Binds {@code fanout.queue2} to the fanout exchange; no routing key is specified.
     *
     * @return the binding between the second queue and the exchange
     */
    @Bean
    public Binding binding2() {
        return BindingBuilder.bind(queue2()).to(fanoutExchange());
    }
}
生产者
/**
 * REST endpoint that publishes a fixed message to the fanout exchange.
 */
@RestController
@RequestMapping("/fanout")
public class FanoutProducer {

    /** Template used to publish messages; injected by Spring. */
    @Autowired
    private AmqpTemplate rabbitTemplate;

    /**
     * Publishes {@code "Fanout message!"} to the fanout exchange with an empty routing key.
     *
     * @return a fixed confirmation string for the HTTP caller
     */
    @GetMapping("/send")
    public String send() {
        rabbitTemplate.convertAndSend(FanoutConfig.FANOUT_EXCHANGE, "", "Fanout message!");
        return "Fanout message sent!";
    }
}
4. Routing(路由模式)
使用 Direct Exchange 和路由键实现定向投递。
配置类
@Configuration
public class DirectConfig {public static final String DIRECT_EXCHANGE = "direct_exchange";@Beanpublic DirectExchange directExchange() {return new DirectExchange(DIRECT_EXCHANGE);}@Beanpublic Queue errorQueue() {return new Queue("direct.error");}@Beanpublic Queue infoQueue() {return new Queue("direct.info");}@Beanpublic Binding errorBinding() {return BindingBuilder.bind(errorQueue()).to(directExchange()).with("error");}@Beanpublic Binding infoBinding() {return BindingBuilder.bind(infoQueue()).to(directExchange()).with("info");}
}
5. Topics(主题模式)
使用 Topic Exchange 实现多级路由。
配置类
@Configuration
public class TopicConfig {public static final String TOPIC_EXCHANGE = "topic_exchange";@Beanpublic TopicExchange topicExchange() {return new TopicExchange(TOPIC_EXCHANGE);}@Beanpublic Queue topicQueue1() {return new Queue("topic.queue1");}@Beanpublic Queue topicQueue2() {return new Queue("topic.queue2");}@Beanpublic Binding binding1() {return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with("*.error");}@Beanpublic Binding binding2() {return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("log.#");}
}
6. RPC(远程过程调用)
生产者发送请求,消费者处理后返回响应。
RPC 服务端
@Component
public class RpcServer {@RabbitListener(queues = "rpc_queue")public String process(String message) {return "Processed: " + message;}
}
RPC 客户端
/**
 * RPC client side: REST endpoint that sends a request to {@code rpc_queue}
 * and waits for the reply.
 */
@RestController
@RequestMapping("/rpc")
public class RpcClient {

    /** Template used for the request/reply exchange; injected by Spring. */
    @Autowired
    private AmqpTemplate rabbitTemplate;

    /**
     * Sends the path variable as an RPC request and returns the reply as text.
     *
     * @param msg text taken from the request path
     * @return the reply prefixed with {@code "RPC response: "}, or a timeout
     *         notice when no reply was received
     */
    @GetMapping("/send/{msg}")
    public String send(@PathVariable String msg) {
        Object response = rabbitTemplate.convertSendAndReceive("rpc_queue", msg);
        if (response == null) {
            // A null result means no reply arrived (e.g. no server is listening
            // or the reply timed out); report that instead of printing "null".
            return "RPC request timed out: no response received";
        }
        return "RPC response: " + response;
    }
}
7. Publisher Confirms(发布者确认)
确保消息成功发送到 RabbitMQ 服务器。
配置类
@Configuration
public class ConfirmConfig {@Beanpublic CachingConnectionFactory connectionFactory() {CachingConnectionFactory factory = new CachingConnectionFactory("localhost");factory.setPublisherConfirms(true);return factory;}@Beanpublic RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) {RabbitTemplate template = new RabbitTemplate(connectionFactory);template.setConfirmCallback((correlationData, ack, cause) -> {if (ack) {System.out.println("Message confirmed");} else {System.err.println("Message failed: " + cause);}});return template;}
}