golang rabbitmq客户端连接及重连

ops/2024/12/30 1:35:18/

目录

  • 1、连接、发送、发送异常、重连
  • 2、调用示例

1、连接、发送、发送异常、重连

package rabbitmqimport ("encoding/json""fmt""time""github.com/sirupsen/logrus""github.com/streadway/amqp"
)type RabbitMQ struct {conn            *amqp.Connectionchannel         *amqp.Channelconfigs         RabbitMqConfigconnErrorChan   chan *amqp.ErrorreturnErrorChan chan amqp.ReturnactivateChan    chan interface{}
}func NewRabbitMQ() *RabbitMQ {return &RabbitMQ{}
}// Init 初始化队列服务
func (r *RabbitMQ) Init(cfg RabbitMqConfig) error {// 建立连接conn, err := amqp.Dial(fmt.Sprintf("amqp://%s:%s@%s%s",cfg.User,cfg.PassWord,cfg.Addr,cfg.VHost))if err != nil {return err}r.conn = connr.configs = cfg// 创建管道if err := r.initChannel(); err != nil {return err}// 交换if err := r.exchangeDeclare(); err != nil {return err}// 队列if err := r.queueDeclare(); err != nil {return err}// 队列绑定交换if err := r.queueBind(); err != nil {return err}// 保持连接活动状态r.connErrorChan = make(chan *amqp.Error, 1)r.conn.NotifyClose(r.connErrorChan)go r.reopen()// 消息没有成功路由到队列监控r.returnErrorChan = make(chan amqp.Return, 1)r.channel.NotifyReturn(r.returnErrorChan)go r.messagePushError()r.activateChan = make(chan interface{}, 1)return nil
}// reopen 重试
func (r *RabbitMQ) reopen() {for {select {case err := <-r.connErrorChan:logrus.WithError(err).Error("RabbitMq server exception retry")if r.conn != nil {r.conn = nil        // 清除连接r.activateChan <- 1 // 通知监控协程结束}// r.isActivate = falsetime.Sleep(time.Second * time.Duration(r.configs.Interval))if err := r.Init(r.configs); err != nil {logrus.WithError(err).Error("reopen queue rabbitmq")continue}logrus.Info("reopen rabbitmq success ")return}}
}// messagePushError 消息发送到队列时异常监控
func (r *RabbitMQ) messagePushError() {for {select {case v, ok := <-r.returnErrorChan:if !ok {continue}logrus.WithFields(map[string]interface{}{"code":    v.ReplyCode,"message": v.ReplyText,"content": string(v.Body),}).Error("send to rabbitmq failed")case <-r.activateChan:logrus.Info("The current connection has been interrupted")return}}
}// initChannel 初始化管道
func (r *RabbitMQ) initChannel() error {channel, err := r.conn.Channel()if err != nil {return err}if err := channel.Qos(1,     // prefetch count0,     // prefetch sizefalse, // global); err != nil {return err}r.channel = channelreturn nil
}// exchangeDeclare 创建交换器
func (r *RabbitMQ) exchangeDeclare() error {exchange := r.configs.RabbitmqExchangereturn r.channel.ExchangeDeclare(exchange.Name,exchange.Kind,r.configs.Durable,r.configs.AutoDelete,r.configs.Internal,r.configs.NoWait, nil)
}// queueDeclare 创建队列
func (r *RabbitMQ) queueDeclare() error {_, err := r.channel.QueueDeclare(r.configs.RabbitmqQueue.Name,r.configs.Durable,r.configs.AutoDelete,r.configs.Internal,r.configs.NoWait, nil)return err
}// queueBind 队列与交换器的绑定
func (r *RabbitMQ) queueBind() error {return r.channel.QueueBind(r.configs.RabbitmqQueue.Name,r.configs.RabbitmqQueue.Name,r.configs.RabbitmqExchange.Name,r.configs.NoWait, nil)
}// Send 消息发送
func (r *RabbitMQ) Send(message interface{}) error {messageByte, err := json.Marshal(message)if err != nil {return err}err = r.channel.Publish("",                           // 交换机r.configs.RabbitmqQueue.Name, // 路由队列的Keytrue,                         // 发送到队列失败是否进行通知false,                        // 目标队列没有消费者时是否进行通知,官方不建议开启amqp.Publishing{Headers:         amqp.Table{},ContentType:     "text/plain",ContentEncoding: "",DeliveryMode:    amqp.Persistent, // 消息持久化Body:            messageByte,},)if err != nil {return err}return nil
}// Close 关闭服务
func (r *RabbitMQ) Close() error {if err := r.conn.Close(); err != nil {return err}return nil
}// 队列配置项结构
type RabbitMqConfig struct {Addr             string           `mapstructure:"addr"`        // mq地址VHost            string           `mapstructure:"vhost"`       // mq的vhostUser             string           `mapstructure:"user"`        // mq用户名PassWord         string           `mapstructure:"password"`    // mq密码Durable          bool             `mapstructure:"durable"`     //持久化标识, true: 持久化, false: 否AutoDelete       bool             `mapstructure:"auto_delete"` //是否自动删除, true: 是, false: 否Internal         bool             `mapstructure:"internal"`    //是否是内部的NoWait           bool             `mapstructure:"nowait"`      //是否等待服务器确认, true: 不等待, false: 等待Interval         int              `mapstructure:"interval"`    // 重连时间间隔RabbitmqExchange RabbitmqExchange `mapstructure:"exchange"`RabbitmqQueue    RabbitmqQueue    `mapstructure:"queue"`
}type RabbitmqExchange struct {Name string `mapstructure:"name"` // 交换机名称Kind string `mapstructure:"kind"` // 交换机类型, direct: 默认值, 使用路由, fanout: 不使用路由,topic: 订阅,
}
type RabbitmqQueue struct {Name string `mapstructure:"name"` //队列名称
}

 

2、调用示例

package mainimport ("standard/rabbitmq/rmq"    //自行替换为上面的包的位置"github.com/sirupsen/logrus"
)func main() {cfg := rabbitmq.RabbitMqConfig{Addr:       "127.0.0.1:5672",VHost:      "/",User:       "guest",PassWord:   "guest",Durable:    true,AutoDelete: false,Internal:   false,NoWait:     false,RabbitmqExchange: rabbitmq.RabbitmqExchange{Name: "exchange.test",Kind: "direct",},RabbitmqQueue: rabbitmq.RabbitmqQueue{Name: "queue.test",},Interval: 2,}err := rabbitmq.NewRabbitMQ().Init(cfg)if err != nil {logrus.WithError(err).Error("init rabbit")return}logrus.Info("init rabbitmq success")/**err = rabbitmq.NewRabbitMQ().Send(nil)  // 发送数据至队列if err != nil {logrus.WithError(err).Error("init rabbit")return}logrus.Info("send to rabbitmq success")**/select {}
}

http://www.ppmy.cn/ops/9013.html

相关文章

stack queue Leetcode 栈和队列算法题

232.用栈实现队列 Queue 是 Collection 接口下的&#xff0c;她的一个实现类是ArrayDeque. 不推荐使用 Vector 实现的 Stack&#xff0c;因为为了保证线程安全使得 Stack 的效率很低&#xff0c;而且由于继承的 Vector 导致没有屏蔽一些栈不应该有的操作 stack 下使用入栈出…

5.Eureka原理分析

消费者如何获取服务提供者具体信息&#xff1f; 1.服务提供者启动时向Eureka注册自己的信息。 2.Eureka保存这些信息。 3.消费者根据服务名称向Eureka拉取提供者信息。 如果有多个服务的提供者&#xff0c;消费者该如何选择&#xff1f; 1.服务消费者利用负载均衡算法&…

Nacos介绍和docker安装

1. Nacos是什么&#xff1f; Nacos官网 | Nacos 官方社区 | Nacos 下载 | Nacos 官方&#xff1a;一个更易于构建云原生应用的动态服务发现(Nacos Discovery )、服务配置(Nacos Config)和服务管理平台。 集 注册中心配置中心服务管理 平台 Nacos 的关键特性包括: 服务发现和…

Oracle 可传输表空间(Transportable Tablespace)

在数据归档、备份、测试等场景&#xff0c;我们经常需要将数据从一个系统移动到另一个系统&#xff0c;一个较常用的方案是数据的导出/导入&#xff08;export/import&#xff09;&#xff0c;但是在数据量较大的场景&#xff0c;此方案可能比较耗时。而可传输表空间是一种以文…

量子时代加密安全与区块链应用的未来

量子时代加密安全与区块链应用的未来 现代密码学仍然是一门相对年轻的学科&#xff0c;但其历史却显示了一种重要的模式。大多数的发展都是基于几年甚至几十年前的研究。而这种缓慢的发展速度也是有原因的&#xff0c;就像药物和疫苗在进入市场之前需要经过多年的严格测试一样&…

学习微服务nacos遇到的问题

在学习微服务注册到nacos的时候&#xff0c;所有过程都正确了&#xff0c;注册也成功了&#xff0c;但是访问不了调用的地址报错出现问题。 一、引入依赖 在cloud-demo父工程的pom文件中的<dependencyManagement>中引入SpringCloudAlibaba的依赖 1、springboot <pa…

Linux默认shell简介、查看和更改

在Linux环境中&#xff0c;用户的“默认shell”是指他们登录系统后自动启动的命令行解释器或交互式shell。这个shell用于处理用户在命令行界面输入的命令和操作。每个用户都拥有一个特定的默认shell。 Linux系统中常见的默认shell包括以下几种&#xff1a; Bash&#xff08;B…

每日写题(洛谷第三章:循环结构 1.找最小值)

#include<bits/stdc.h> using namespace std; int main() {int n;cin>>n;int a[n];for(int i0;i<n;i)cin>>a[i];int mina[0];int temp;for(int i0;i<n;i){if(a[i]<min){tempa[i];a[i]min;mintemp;}}cout<<min;return 0; }