RabbitMQ SDK 支持发布、消费,连接恢复,死信队列,多种使用场景

news/2024/12/21 21:06:37/

本文首发在这里
基于Example封装便于使用的SDK,支持发布消费连接恢复死信队列,以及官方入门中的多种使用场景

参数解释(测试代码在下面)

  • 直接使用amq.topic,仅是出于逻辑简单代码少,若想数据隔离,可声明自定义主题类型交换(ExchangeDeclare)并使用
  • 建议生产消费者尽量对称提供queueNamekeys,确保发布消费前完成声明绑定逻辑。queueName用来声明名称是queueNamequeueName_dead_letter(死信)的队列,绑定queueNameamq.topic,并使用所有的keys充当路由键
  • 若队列已声明且路由键已绑定,未来消费者启动仅需提供queueName表明从此队列消费,未来生产者启动仅需提供key用做发布路由键,但queueName会因为空而使用默认值default_queue_name来触发总是执行的声明队列(QueueDeclare)逻辑,避免意外声明RabbitMQ随机命名的队列
  • 使用手动消息确认,队列和消息标记为持久,并不使用临时独占队列
  • 消费者可合理调大Qos.prefetchCount来提高吞吐率

RabbitMQ_10">启动RabbitMQ

docker run -d -p 15672:15672 -p 5672:5672 --hostname rabbitmq --name rabbitmq -e RABBITMQ_DEFAULT_USER=guest -e RABBITMQ_DEFAULT_PASS=guest rabbitmq:3-management

消费

package mainimport ("context""flag""log/slog""os""os/signal""strings""sync""time""github.com/panshiqu/golang/rabbitmq"amqp "github.com/rabbitmq/amqp091-go"
)var addr = flag.String("addr", "amqp://guest:guest@localhost:5672/", "address")
var queueName = flag.String("queueName", "", "queue name")
var keys = flag.String("keys", "", "routing keys")func onDelivery(delivery *amqp.Delivery) error {slog.Info("onDelivery", slog.String("body", string(delivery.Body)))// if string(delivery.Body) == "hi2" {// 	return errors.New("dead letter")// }time.Sleep(time.Second)slog.Info("onDelivery done")return nil
}func main() {flag.Parse()client := rabbitmq.New(*queueName, *addr, strings.Split(*keys, ","))ctx, cancel := context.WithCancel(context.Background())wg := &sync.WaitGroup{}go client.ConsumeFunc(ctx, wg, onDelivery)c := make(chan os.Signal, 1)signal.Notify(c, os.Interrupt)sig := <-cslog.Info("notify", slog.Any("signal", sig))cancel()wg.Wait()
}

生产者

package mainimport ("flag""fmt""log/slog""os""os/signal""strings""sync""time""github.com/panshiqu/golang/rabbitmq"
)var addr = flag.String("addr", "amqp://guest:guest@localhost:5672/", "address")
var queueName = flag.String("queueName", "", "queue name")
var keys = flag.String("keys", "", "routing keys")
var key = flag.String("key", "", "routing key")func publish(client *rabbitmq.Client, wg *sync.WaitGroup) {wg.Add(1)defer wg.Done()for i := 1; ; i++ {time.Sleep(time.Second)data := fmt.Sprintf("hi%d", i)if err := client.Push(*key, []byte(data)); err != nil {slog.Error("push", slog.Any("err", err))return}slog.Info("push", slog.String("data", data))}
}func main() {flag.Parse()client := rabbitmq.New(*queueName, *addr, strings.Split(*keys, ","))wg := &sync.WaitGroup{}go publish(client, wg)c := make(chan os.Signal, 1)signal.Notify(c, os.Interrupt)sig := <-cslog.Info("notify", slog.Any("signal", sig))if err := client.Close(); err != nil {slog.Error("close", slog.Any("err", err))}wg.Wait()
}

多种使用场景

Hello World
go run consume/main.go -queueName one_queue -keys one_key
go run publish/main.go -queueName one_queue -keys one_key -key one_keygo run consume/main.go -queueName one_queue -keys one_key_new

注:后续启动修改keys,并不会解绑原路由键,而是纯粹绑定新路由键,如此以来发布key不管是one_key还是one_key_new都将路由到one_queue

# rabbitmqctl list_bindings
Listing bindings for vhost /...
source_name     source_kind     destination_name     destination_kind     routing_key     arguments
amq.topic       exchange        one_queue            queue                one_key         []
amq.topic       exchange        one_queue            queue                one_key_new     []
Work Queues
go run consume/main.go -queueName two_queue -keys two_key
go run consume/main.go -queueName two_queue -keys two_key
go run publish/main.go -queueName two_queue -keys two_key -key two_key
Publish/Subscribe
go run consume/main.go -queueName three_queue_new -keys three_key
go run publish/main.go -queueName three_queue -keys three_key -key three_key
go run consume/main.go -queueName three_queue -keys three_key

注:上面的生产者仅会声明queueNamethree_queue的队列,并不会声明three_queue_new,所以声明three_queue_new消费者必须在生产者发布消息前启动,但three_queue没有这个限制

Routing
go run consume/main.go -queueName four_queue -keys error
go run consume/main.go -queueName four_queue_new -keys error,info
go run publish/main.go -queueName four_queue -keys error -key error
go run publish/main.go -queueName four_queue -keys error -key info
Topics
go run consume/main.go -queueName five_queue -keys "*.error"
go run consume/main.go -queueName five_queue_new -keys "kern.#"
go run publish/main.go -queueName five_queue -keys "*.error" -key cron.error
go run publish/main.go -queueName five_queue -keys "*.error" -key kern.info
go run publish/main.go -queueName five_queue -keys "*.error" -key kern.error
死信队列

业务逻辑如下返回错误,重新投递仍失败后将进入死信队列,保证消息不丢失,还可反复消费来排查问题

	if string(delivery.Body) == "hi2" {return errors.New("dead letter")}
go run consume/main.go -queueName six_queue -keys six_key
go run publish/main.go -queueName six_queue -keys six_key -key six_key
go run consume/main.go -queueName six_queue_dead_letter

特殊情况说明

  • 生产者正常退出若是等待发布接口返回消息将不丢失
  • 消费者正常退出是会等待业务逻辑处理且发送确认完成的
  • 消费者崩溃,RabbitMQ将会重新投递没有收到确认的消息,那怕仅是消费者确认没发出去
  • 消费者处理消息成功,RabbitMQ关闭,消费者发送确认失败,服务开启后仍会重新投递此消息
  • RabbitMQ关闭,发布接口将阻塞至服务启动,消息基本不会丢失
  • 理论上消费者和生产者均可先于RabbitMQ启动

建议业务逻辑对消息的消费支持幂等

关于发布确认

目前并未支持一对一的确认,意思就是发布三条消息,某次的发布因未收到确认而阻塞,但是无从知晓是三条中的哪条消息?当前是可以换用PublishWithDeferredConfirm发布并记录本端递增生成的DeliveryTag,虽然相同通道的消息是加锁顺序发布的,但是想要和对端RabbitMQ递增生成的DeliveryTag对应起来,在依赖可靠网络传输的基础上,收到消息时应立即递增生成,如此以来对应关系才能得到保证,目前还没有看RabbitMQ实现源码来明确这点

heartbeat

两端协调心跳时间会取较小值,但想禁用心跳双端须同时置0,如此便于调试避免触发断线重连

amqp://guest:guest@localhost:5672/?heartbeat=1800rabbitmqctl eval 'application:get_env(rabbit, heartbeat).'
rabbitmqctl eval 'application:set_env(rabbit, heartbeat, 1800).'

有用的命令

rabbitmqctl list_queues
rabbitmqctl list_queues name messages_ready messages_unacknowledged
rabbitmqctl list_exchanges
rabbitmqctl list_bindings

http://www.ppmy.cn/news/1527174.html

相关文章

RabbitMq中交换机(Exchange)、队列(Queue)和路由键(Routing Key)

RabbitMQ 是一个消息代理系统&#xff0c;使用交换机&#xff08;Exchange&#xff09;、队列&#xff08;Queue&#xff09;和路由键&#xff08;Routing Key&#xff09;来管理消息的传递。它们分别起到不同的作用&#xff0c;构成了消息从生产者到消费者的传递路径。 以下是…

Matlab如何配置小波工具(Wavelet Toolbox)

1、发现问题 因为实验要使用小波工具函数&#xff0c;运行时报错如下&#xff1a; 查看对应文件夹发现没有小波工具&#xff08;也可在控制台输入ver&#xff09;&#xff0c;检查是否有该工具&#xff0c;输入后回车返回如下&#xff1a; 2、下载工具包 没有这个工具就要去下…

深入剖析:C++类对象的内存布局与优化

深入剖析&#xff1a;C类对象的内存布局与优化 引言 在C编程中&#xff0c;理解类对象的内存布局对于优化内存使用和提高程序性能至关重要。本文将详细介绍C类对象的内存布局&#xff0c;包括数据成员、虚函数表指针以及静态变量和静态方法在内存中的位置。通过这些知识&…

【加密社】Solidity 中的事件机制及其应用

加密社 引言 在Solidity合约开发过程中&#xff0c;事件&#xff08;Events&#xff09;是一种非常重要的机制。它们不仅能够让开发者记录智能合约的重要状态变更&#xff0c;还能够让外部系统&#xff08;如前端应用&#xff09;监听这些状态的变化。 本文将详细介绍Solidity中…

OpenFeign接口调用日志

一、介绍 在开发或测试环境中&#xff0c;需要更多的调试信息&#xff1b;在通过 Spring Cloud OpenFeign 调用远程服务的接口时&#xff0c;可能需要记录接口调用的日志详情&#xff0c;比如&#xff1a;请求头、请求参数、响应等。 Spring Cloud OpenFeign 打印 FeignClien…

达芬奇竖屏导出有黑屏解决方案

文章目录 项目设置导出设置 初学达芬奇&#xff0c;导出的时候&#xff0c;总是有黑边。 经过研究&#xff0c;才发现导出的时候的分辨率和项目分辨率 2个地方都要设置&#xff0c;否则导出就会导致有黑边。 项目设置 点击 文件 选择项目设置 选择竖屏分辨率 导出设置

C++ MFC SnowWorld

目录 效果 项目 代码 下载 效果 SnowWorld 项目 代码 // ChildView.cpp : implementation of the CChildView class // #include "stdafx.h" #include "SnowWorld.h" #include "ChildView.h" #ifdef _DEBUG #define new DEBUG_NEW #und…

无人机之防风性能篇

无人机的防风性能是评价其在不同风力条件下稳定性和安全性的重要指标。以下是关于无人机防风性能的几个关键点&#xff1a; 一、防风性能的影响因素 机身设计与结构&#xff1a;无人机的机身设计、材料选择以及结构强度直接影响其防风性能。例如&#xff0c;采用坚固耐用的材…