使用 RabbitMQ 和 Go 构建异步订单处理系统

embedded/2024/10/22 13:39:44/

使用 RabbitMQ 和 Go 构建异步订单处理系统

我们可以通过构建一个订单处理系统来演示如何使用消息队列(MQ)实现异步任务处理。这个项目将使用 RabbitMQ 作为消息队列,并使用 Go 语言来实现。以下是项目的详细教程和相关环境配置。

项目描述

功能:模拟一个简单的电商订单处理系统,包括下单、库存扣减、邮件通知、以及发货通知。每个任务通过消息队列异步处理。

环境配置

1. 安装 Go 环境

确保已经安装 Go 语言开发环境,可以通过以下命令确认:

go version

如果没有安装,可以从 Go 官方网站 下载并安装。
2. 安装 RabbitMQ

RabbitMQ 可以通过 Docker 轻松安装:

docker run -d --hostname rabbitmq --name some-rabbit -p 5672:5672 -p 15672:15672 rabbitmq:3-management
RabbitMQ 管理控制台可以通过 http://localhost:15672 访问。
默认用户名和密码都是 guest。

3. 创建 Go 项目

mkdir order-processing
cd order-processing
go mod init order-processing

4. 安装 RabbitMQ Go 客户端库

go get github.com/streadway/amqp

项目结构

项目将包含以下文件和目录:

order-processing/
│
├── main.go                 // 入口文件,初始化 MQ 连接
├── producer.go             // 生产者代码,模拟下单请求
├── consumer_inventory.go   // 消费者代码,处理库存扣减
├── consumer_email.go       // 消费者代码,处理邮件发送
└── consumer_shipping.go    // 消费者代码,处理发货通知

代码实现

1. main.go - 初始化 MQ 连接

package mainimport ("log""github.com/streadway/amqp"
)func failOnError(err error, msg string) {if err != nil {log.Fatalf("%s: %s", msg, err)}
}func main() {conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")failOnError(err, "Failed to connect to RabbitMQ")defer conn.Close()ch, err := conn.Channel()failOnError(err, "Failed to open a channel")defer ch.Close()// 声明队列q, err := ch.QueueDeclare("order_queue", // 队列名称true,          // 持久化false,         // 自动删除false,         // 独占false,         // 无需等待nil,           // 额外属性)failOnError(err, "Failed to declare a queue")log.Printf(" [*] Waiting for messages. To exit press CTRL+C")// 创建消费者msgs, err := ch.Consume(q.Name, // 队列名称"",     // 消费者标识符true,   // 自动应答false,  // 独占false,  // 无需等待false,  // no localnil,    // 额外属性)failOnError(err, "Failed to register a consumer")forever := make(chan bool)go func() {for d := range msgs {log.Printf("Received a message: %s", d.Body)}}()<-forever
}

2. producer.go - 生产者代码

package mainimport ("log""github.com/streadway/amqp"
)func failOnError(err error, msg string) {if err != nil {log.Fatalf("%s: %s", msg, err)}
}func main() {conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")failOnError(err, "Failed to connect to RabbitMQ")defer conn.Close()ch, err := conn.Channel()failOnError(err, "Failed to open a channel")defer ch.Close()q, err := ch.QueueDeclare("order_queue",true,false,false,false,nil,)failOnError(err, "Failed to declare a queue")body := "New Order"err = ch.Publish("",q.Name,false,false,amqp.Publishing{ContentType: "text/plain",Body:        []byte(body),})failOnError(err, "Failed to publish a message")log.Printf(" [x] Sent %s", body)
}

3. consumer_inventory.go - 处理库存扣减

package mainimport ("log""github.com/streadway/amqp"
)func failOnError(err error, msg string) {if err != nil {log.Fatalf("%s: %s", msg, err)}
}func main() {conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")failOnError(err, "Failed to connect to RabbitMQ")defer conn.Close()ch, err := conn.Channel()failOnError(err, "Failed to open a channel")defer ch.Close()q, err := ch.QueueDeclare("order_queue",true,false,false,false,nil,)failOnError(err, "Failed to declare a queue")msgs, err := ch.Consume(q.Name,"",true,false,false,false,nil,)failOnError(err, "Failed to register a consumer")forever := make(chan bool)go func() {for d := range msgs {log.Printf("Processing inventory for order: %s", d.Body)// 处理库存扣减逻辑}}()<-forever
}

4. consumer_email.go - 处理邮件发送

package mainimport ("log""github.com/streadway/amqp"
)func failOnError(err error, msg string) {if err != nil {log.Fatalf("%s: %s", msg, err)}
}func main() {conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")failOnError(err, "Failed to connect to RabbitMQ")defer conn.Close()ch, err := conn.Channel()failOnError(err, "Failed to open a channel")defer ch.Close()q, err := ch.QueueDeclare("order_queue",true,false,false,false,nil,)failOnError(err, "Failed to declare a queue")msgs, err := ch.Consume(q.Name,"",true,false,false,false,nil,)failOnError(err, "Failed to register a consumer")forever := make(chan bool)go func() {for d := range msgs {log.Printf("Sending email for order: %s", d.Body)// 处理邮件发送逻辑}}()<-forever
}

5. consumer_shipping.go - 处理发货通知

package mainimport ("log""github.com/streadway/amqp"
)func failOnError(err error, msg string) {if err != nil {log.Fatalf("%s: %s", msg, err)}
}func main() {conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")failOnError(err, "Failed to connect to RabbitMQ")defer conn.Close()ch, err := conn.Channel()failOnError(err, "Failed to open a channel")defer ch.Close()q, err := ch.QueueDeclare("order_queue",true,false,false,false,nil,)failOnError(err, "Failed to declare a queue")msgs, err := ch.Consume(q.Name,"",true,false,false,false,nil,)failOnError(err, "Failed to register a consumer")forever := make(chan bool)go func() {for d := range msgs {log.Printf("Notifying shipping for order: %s", d.Body)// 处理发货通知逻辑}}()<-forever
}

运行步骤

1.启动 RabbitMQ(如果使用 Docker 已启动,则跳过这步)。

docker run -d --hostname rabbitmq --name some-rabbit -p 5672:5672 -p 15672:15672 rabbitmq:3-management

2.启动消费者:分别运行 consumer_inventory.goconsumer_email.goconsumer_shipping.go文件。

3.发送订单消息:运行 producer.go 文件,模拟用户下单。
在这里插入图片描述

运行后的验证

1.消费者处理消息的日志输出:

每个消费者程序启动后,会开始监听 RabbitMQ 中相应的队列。当生产者发送消息时,消费者会从队列中读取消息,并输出处理的日志信息。
消费者处理消息的流程如下:consumer_inventory.go 处理库存更新的消息。consumer_email.go 处理订单确认邮件的发送。consumer_shipping.go 处理发货通知的消息。

你可以在每个运行的消费者窗口中看到相应的日志输出,例如:
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
2.RabbitMQ 管理界面监控: 通过访问 RabbitMQ 管理界面,你可以检查 RabbitMQ 队列的状态:

查看哪些队列正在运行。
检查队列中是否有消息积压。
查看消息的流入和流出情况,确保消费者正在从队列中消费消息。

在这里插入图片描述

项目中的关键点解释

RabbitMQ 消息模型:生产者会将消息发布到 RabbitMQ 的一个交换机(Exchange)上,交换机会将消息路由到相应的队列(Queue)。消费者监听队列并从中获取消息进行处理。每个消费者可以处理一个特定类型的消息。AMQP协议:Go 代码中的 amqp 库基于 AMQP 协议(Advanced Message Queuing Protocol),这是 RabbitMQ 使用的消息协议。它定义了如何在生产者和消费者之间传递消息。队列持久化与自动应答:RabbitMQ 队列可以配置为持久化消息,确保在系统重启时消息不会丢失。你可以在消费者中配置是否自动应答(acknowledgment),以确认消息处理成功后再从队列中移除。当前例子中是使用自动应答的模式。

问题排查

如果在运行过程中遇到问题,可以按以下步骤排查:

1.检查 RabbitMQ 是否正常运行:确保 Docker 容器正常运行,并且端口没有冲突。

docker ps

如果 RabbitMQ 没有启动,可以通过命令 docker start some-rabbit 来启动。

2.确保消费者能够连接到 RabbitMQ:如果消费者无法连接到 RabbitMQ,可能是因为配置错误或 RabbitMQ 服务未启动。检查消费者的日志输出,确认连接是否成功。

3.查看 RabbitMQ 日志:通过 RabbitMQ 管理界面可以查看日志,排查是否有消息未路由到队列或者连接失败的错误。

通过这些步骤,你应该能够顺利运行和验证这个消息队列项目。如果有更多问题或需要其他帮助,随时告诉我!

总结

通过这篇博客,你应当掌握了 RabbitMQ 的基本使用方法,了解了如何将 RabbitMQ 与 Go 应用集成,从而构建可靠的消息传递系统。这些技能将为你在开发异步消息处理和微服务架构方面奠定坚实的基础。


http://www.ppmy.cn/embedded/108749.html

相关文章

小阿轩yx-Kubernertes日志收集

小阿轩yx-Kubernertes日志收集 前言 在 Kubernetes 集群中如何通过不同的技术栈收集容器的日志&#xff0c;包括程序直接输出到控制台日志、自定义文件日志等 有哪些日志需要收集 日志收集与分析很重要&#xff0c;为了更加方便的处理异常 简单总结一些比较重要的需要收集…

(四)webAPI的发布和访问

我们已经创建了一个core webapi项目&#xff0c;基于.net6.0&#xff0c;默认包含WeatherForecastController控制器。&#xff08;可参见前几期的博文&#xff09;。 1.项目发布 使用命令 dotnet publish -o publish来发布项目。&#xff08;也可以右击项目->发布->文件…

pytorch torch.gather函数介绍

torch.gather 是 PyTorch 中的一个用于从给定维度上按索引取值的函数。它根据一个索引张量 index&#xff0c;从源张量 input 中收集值&#xff0c;并返回一个新的张量。torch.gather 常用于需要从张量的特定位置抽取元素的操作。 1. 函数签名 torch.gather(input, dim, inde…

# Windows下配置Redis以服务方式启动

Windows下配置Redis以服务方式启动 Redis以服务方式启动 winR快捷键打开运行窗口&#xff0c;输入cmd进入 DOS窗口。进入redis的安装目录。安装redis服务 &#xff0c; 输入命令 redis-server --service-install redis.windows.conf --loglevel verbose 启动服务&#xff0c…

第十七题:电话号码的字母组合

题目描述 给定一个仅包含数字 2-9 的字符串&#xff0c;返回所有可能的由它组成的字母组合。你可以假设输入字符串至少包含一个数字&#xff0c;并且不超过3位数字。 实现思路 使用哈希表或数组存储每个数字对应的字符&#xff0c;然后通过递归或迭代的方式生成所有可能的组…

快速失败 (fail-fast) 和安全失败 (fail-safe)

1. 定义与工作原理 1.1 快速失败&#xff08;Fail-Fast&#xff09; 定义&#xff1a; 快速失败是一种系统设计原则&#xff0c;当系统遇到异常情况或错误时&#xff0c;立即停止执行并返回错误&#xff0c;而不是试图继续执行或处理潜在的问题。快速失败系统会主动检测系统中…

基于Tomcat的JavaWeb(ASP)项目构建(图解)

目录 配置IDEA的TOMCAT环境 环境设置 导入API(可选) 创建项目 构建项目 ​编辑 运行项目 项目结果 ​编辑 查看配置基础项目 配置IDEA的TOMCAT环境 环境设置 导入API(可选) 创建项目 构建项目 运行项目 项目结果 查看配置基础项目 了解Web Application: Exploded与…

【免费分享】高斯过程回归(Gaussian process regression)原理详解及MATLAB代码实战

MATLAB实战 net fitrgp(p_train, t_train, KernelFunction, ardsquaredexponential, ...Optimizer, lbfgs, KernelParameters, [sigmaL0; sigmaF0], Sigma, sigmaN0);fitrgp 函数来训练一个 高斯过程回归模型 (Gaussian Process Regression, GPR)。具体来说&#xff0c;它在训…