java微服务中消息队列处理中间件基础语法学习,零基础学习

embedded/2025/1/21 16:05:52/

在 Java 微服务中,消息队列处理中间件可以帮助实现服务之间的异步通信、解耦和负载均衡。常用的 Java 消息队列工具包括 RabbitMQ、Apache Kafka 和 ActiveMQ。下面我将详细介绍这些消息队列工具在 Java 中的基础语法和使用方法。

1. RabbitMQ

RabbitMQ 是一个广泛使用的开源消息代理软件,支持多种协议(AMQP、MQTT、STOMP 等)。我们可以使用 Spring AMQP 来简化 RabbitMQ 的集成。

1.1 安装 RabbitMQ
  • 在 Ubuntu 上安装 RabbitMQ

bash

sudo apt-get update
sudo apt-get install rabbitmq-server
  • 启动 RabbitMQ 服务

bash

sudo systemctl start rabbitmq-server
sudo systemctl enable rabbitmq-server
  • 启用管理插件

bash

sudo rabbitmq-plugins enable rabbitmq_management
  • 访问管理界面:打开浏览器,访问 http://<your_server_ip>:15672,默认用户名和密码为 guest

1.2 基本概念
  • Exchange: 消息的路由中心。
  • Queue: 存储消息的地方。
  • Binding: 将 Exchange 和 Queue 关联起来。
  • Routing Key: 路由键,用于匹配 Binding。
  • Producer: 发送消息的应用程序。
  • Consumer: 接收消息的应用程序。
1.3 示例代码
1.3.1 创建 Spring Boot 项目

使用 Spring Initializr 创建一个新的 Spring Boot 项目,并添加以下依赖:

  • Spring Web
  • Spring AMQP
  • Lombok (可选)
1.3.2 配置 RabbitMQ

application.yml 中配置 RabbitMQ 连接信息:

yaml

spring:rabbitmq:host: localhostport: 5672username: guestpassword: guest
1.3.3 生产者

创建一个生产者类来发送消息:

RabbitMQSender.java

java">package com.example.demo.rabbitmq;import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;@Service
public class RabbitMQSender {@Autowiredprivate AmqpTemplate amqpTemplate;public void send(String message) {amqpTemplate.convertAndSend("my_queue", message);System.out.println("Sent message = " + message);}
}
1.3.4 消费者

创建一个消费者类来接收消息:

RabbitMQReceiver.java

java">package com.example.demo.rabbitmq;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Slf4j
@Component
public class RabbitMQReceiver {@RabbitListener(queues = "my_queue")public void receiveMessage(String message) {log.info("Received message = {}", message);}
}
1.3.5 控制器

创建一个控制器来触发消息发送:

RabbitMQController.java

java">package com.example.demo.controller;import com.example.demo.rabbitmq.RabbitMQSender;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;@RestController
public class RabbitMQController {@Autowiredprivate RabbitMQSender rabbitMQSender;@GetMapping("/send-rabbitmq-message")public String sendMessage(@RequestParam String message) {rabbitMQSender.send(message);return "Message sent successfully";}
}
2. Apache Kafka

Apache Kafka 是一个分布式流处理平台,常用于实时数据管道和流应用程序。我们可以使用 Spring Kafka 来简化 Kafka 的集成。

2.1 安装 Kafka
  • 下载并解压 Kafka

bash

wget https://downloads.apache.org/kafka/3.0.0/kafka_2.13-3.0.0.tgz
tar -xzf kafka_2.13-3.0.0.tgz
cd kafka_2.13-3.0.0
  • 启动 Zookeeper:

bash

bin/zookeeper-server-start.sh config/zookeeper.properties
  • 启动 Kafka 服务器

bash

bin/kafka-server-start.sh config/server.properties
2.2 基本概念
  • Broker: Kafka 集群中的节点。
  • Topic: 数据分类的主题。
  • Partition: Topic 的分区。
  • Producer: 发送消息的应用程序。
  • Consumer: 接收消息的应用程序。
  • Consumer Group: 消费者的组。
2.3 示例代码
2.3.1 创建 Spring Boot 项目

使用 Spring Initializr 创建一个新的 Spring Boot 项目,并添加以下依赖:

  • Spring Web
  • Spring for Apache Kafka
  • Lombok (可选)
2.3.2 配置 Kafka

application.yml 中配置 Kafka 连接信息:

yaml

spring:kafka:bootstrap-servers: localhost:9092producer:key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer:group-id: my-groupauto-offset-reset: earliestkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer
2.3.3 创建 Topic

在 Kafka 中创建一个主题 test-topic:

bash

bin/kafka-topics.sh --create --topic test-topic --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1
2.3.4 生产者

创建一个生产者类来发送消息:

KafkaProducer.java

java">package com.example.demo.kafka;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;@Service
public class KafkaProducer {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;public void sendMessage(String message) {kafkaTemplate.send("test-topic", message);System.out.println("Sent message = " + message);}
}
2.3.5 消费者

创建一个消费者类来接收消息:

KafkaConsumer.java

java">package com.example.demo.kafka;import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;@Slf4j
@Component
public class KafkaConsumer {@KafkaListener(topics = "test-topic", groupId = "my-group")public void consume(String message) {log.info("Consumed message = {}", message);}
}
2.3.6 控制器

创建一个控制器来触发消息发送:

KafkaController.java

java">package com.example.demo.controller;import com.example.demo.kafka.KafkaProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;@RestController
public class KafkaController {@Autowiredprivate KafkaProducer kafkaProducer;@GetMapping("/send-kafka-message")public String sendMessage(@RequestParam String message) {kafkaProducer.sendMessage(message);return "Message sent successfully";}
}

3. ActiveMQ

ActiveMQ 是另一个流行的开源消息代理,支持 JMS 协议。我们可以使用 Spring JMS 来简化 ActiveMQ 的集成。

3.1 安装 ActiveMQ
  • 下载并解压 ActiveMQ

bash

wget https://downloads.apache.org/activemq/5.16.5/apache-activemq-5.16.5-bin.tar.gz
tar -xzf apache-activemq-5.16.5-bin.tar.gz
cd apache-activemq-5.16.5
  • 启动 ActiveMQ 服务:
bin/activemq start
  • 访问管理界面:

     

    打开浏览器,访问 http://<your_server_ip>:8161/admin,默认用户名和密码为 adminadmin

3.2 基本概念
  • Broker: 消息代理。
  • Queue: 存储消息的地方。
  • Topic: 订阅模式的消息通道。
  • Producer: 发送消息的应用程序。
  • Consumer: 接收消息的应用程序。
3.3 示例代码
3.3.1 创建 Spring Boot 项目

使用 Spring Initializr 创建一个新的 Spring Boot 项目,并添加以下依赖:

  • Spring Web
  • Spring for Apache ActiveMQ
  • Lombok (可选)
3.3.2 配置 ActiveMQ

application.yml 中配置 ActiveMQ 连接信息:

yaml

spring:activemq:broker-url: tcp://localhost:61616user: adminpassword: admin
3.3.3 生产者

创建一个生产者类来发送消息:

ActiveMQProducer.java

java">package com.example.demo.activemq;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Service;@Service
public class ActiveMQProducer {@Autowiredprivate JmsTemplate jmsTemplate;public void sendMessage(String message) {jmsTemplate.convertAndSend("my_queue", message);System.out.println("Sent message = " + message);}
}
3.3.4 消费者

创建一个消费者类来接收消息:

ActiveMQConsumer.java

java">package com.example.demo.activemq;import lombok.extern.slf4j.Slf4j;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;@Slf4j
@Component
public class ActiveMQConsumer {@JmsListener(destination = "my_queue")public void receiveMessage(String message) {log.info("Received message = {}", message);}
}
3.3.5 控制器

创建一个控制器来触发消息发送:

ActiveMQController.java

java">package com.example.demo.controller;import com.example.demo.activemq.ActiveMQProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;@RestController
public class ActiveMQController {@Autowiredprivate ActiveMQProducer activeMQProducer;@GetMapping("/send-activemq-message")public String sendMessage(@RequestParam String message) {activeMQProducer.sendMessage(message);return "Message sent successfully";}
}

4.RabbitMQ、Apache Kafka 和 ActiveMQ 在微服务架构中作为消息队列处理工具的优缺点对比。

1. RabbitMQ
优点
  • 成熟稳定: 已经存在很长时间,社区支持强大。
  • 丰富的协议支持: 支持多种协议(AMQP、MQTT、STOMP 等),灵活性高。
  • 可靠的消息传递: 提供持久化和确认机制,确保消息不丢失。
  • 易于管理: 提供强大的管理界面和监控工具。
  • 广泛使用: 社区活跃,文档丰富。
缺点
  • 性能限制: 相比 Kafka,在大规模数据处理方面可能稍逊一筹。
  • 复杂性: 配置和管理相对复杂,尤其是在高级场景下。
  • 许可证: 使用 Erlang VM,可能会增加一些学习成本。
2. Apache Kafka
优点
  • 高性能: 设计用于高吞吐量和低延迟,适合大规模数据处理。
  • 可扩展性强: 容易水平扩展,支持多副本和分区。
  • 持久化能力强: 默认情况下所有消息都会被持久化到磁盘。
  • 容错性好: 内置了复制机制,确保数据安全。
  • 流处理能力: 内置流处理功能,支持实时数据处理。
缺点
  • 配置复杂: 初次配置和调优较为复杂。
  • 资源消耗大: 需要较多的内存和磁盘空间。
  • 依赖 Zookeeper: 需要额外部署和维护 Zookeeper。
  • 学习曲线陡峭: 对于新手来说,理解其内部工作原理需要一定时间。
3. ActiveMQ
优点
  • 成熟的 JMS 实现: 符合 JMS 规范,适用于 Java 应用程序。
  • 支持多种传输协议: 支持 TCP、UDP、HTTP 等多种传输协议。
  • 广泛的集成: 与 Spring 等框架无缝集成。
  • 易于配置: 相对简单,适合中小型项目。
  • 灵活的消息模式: 支持点对点(Queue)和发布/订阅(Topic)两种模式。
缺点
  • 性能较低: 相比 RabbitMQ 和 Kafka,在高吞吐量场景下性能较差。
  • 缺乏现代特性: 不如 Kafka 新颖,缺少一些现代流处理特性。
  • 社区活跃度下降: 相比其他两个工具,社区活跃度有所下降。
  • 复杂性: 在大型系统中配置和管理相对复杂。
总结
特征RabbitMQApache KafkaActiveMQ
成熟稳定性
协议支持多种协议 (AMQP, MQTT, STOMP)主要支持 Kafka 协议多种协议 (TCP, UDP, HTTP)
可靠性极强
易于管理是,提供强大的管理界面较复杂,需要配置 Zookeeper是,易于配置
性能中等中等
扩展性中等非常好,支持水平扩展中等
持久化极强
容错性极强
流处理能力基本支持内置流处理功能基本支持
学习曲线中等陡峭中等
资源消耗中等中等

选择哪种消息队列工具取决于你的具体需求,例如性能要求、可靠性需求、现有技术栈以及团队的技术背景。

以上内容涵盖了如何在 Java 微服务中使用 RabbitMQ、Apache Kafka 和 ActiveMQ 进行消息队列处理。每个部分都包含了安装步骤、基本概念和示例代码。你可以根据实际需求选择合适的工具,并进一步扩展和完善这些示例。


http://www.ppmy.cn/embedded/155810.html

相关文章

TryHackMe - Linux - Mountaineer

Mountaineer 6w$的事情出现了反转&#xff0c;目前还没有最新消息&#xff0c;后面差不多了再出后续&#xff0c;不管怎样我们都是罐菌&#xff0c;恭喜张云彬拿下2024 QQ飞车年度总决赛冠军&#x1f3c6; 最近换了MacBook Pro&#xff0c;玩几台靶机找找手感&#xff0c;mac…

【k8s面试题2025】2、练气初期

在练气初期&#xff0c;灵气还比较稀薄&#xff0c;只能勉强在体内运转几个周天。 文章目录 简述k8s静态pod为 Kubernetes 集群移除新节点&#xff1a;为 K8s 集群添加新节点Kubernetes 中 Pod 的调度流程 简述k8s静态pod 定义 静态Pod是一种特殊类型的Pod&#xff0c;它是由ku…

一文大白话讲清楚webpack基本使用——4——vue-loader的配置和使用

一文大白话讲清楚webpack基本使用——4——vue-loader的配置和使用 1. 建议按文章顺序从头看是看 第一篇&#xff1a;一文大白话讲清楚啥是个webpack第二篇&#xff1a;一文大白话讲清楚webpack基本使用——1——完成webpack的初步构建第三篇一文大白话讲清楚webpack基本使用…

小型分布式发电项目优化设计方案

一、项目背景与目标 在能源转型的大趋势下&#xff0c;小型分布式发电项目凭借其高效、灵活等优势&#xff0c;成为满足特定区域用电需求的重要方式。本项目选址于[具体地点]&#xff0c;此地年均日照时长可观&#xff0c;具备良好的太阳能资源开发潜力。项目旨在构建一个稳定…

Python并发编程 07 事件驱动模型、进程切换、进程阻塞、文件描述符、缓存I/O、selectors模块

文章目录 一、事件驱动模型二、进程切换三、进程阻塞四、文件描述符五、缓存I/O1、缓存I/O概述2、IO模型&#xff08;1&#xff09;阻塞(blocking) IO&#xff08;2&#xff09;非阻塞(nonblocking) IO&#xff08;3&#xff09;IO多路复用&#xff08;I/O multiplexing&#x…

复健第二天之[MoeCTF 2022]baby_file

打开题目在线环境可以看到&#xff1a; 感觉要用伪协议去求&#xff0c;但是我们并不知道flag的位置&#xff0c;这里我选择用dirsearch去扫一下&#xff1a; 最像的应该就是flag.php了 于是就构建payload&#xff1a; **?filephp://filter/convert.base64-encode/resource…

Linux高并发服务器开发 第十五天(fork函数)

目录 1.fork 函数 1.1创建子进程 1.2getpid 函数 1.3getppid 函数 1.4getgid函数 1.5循环创建 n 个子进程 1.6fork后父子进程异同 1.6.1读时共享&#xff0c;写时复制 1.6.2fork后父子进程共享 1.6.3gdb调试父子进程 1.fork 函数 pid_t fork(void); 成功&#xff1a;…

eBay账号安全攻略:巧妙应对风险

在跨境电商的浪潮中&#xff0c;eBay宛如一座璀璨的灯塔&#xff0c;照亮了无数买卖双方的交易之路。但别忘了&#xff0c;网络安全的阴霾也在悄然蔓延&#xff0c;让eBay账号时刻处于黑客攻击、数据泄露、钓鱼诈骗等风险的阴影之下。别担心&#xff0c;今天就来为你支支招&…