目录
- 1、连接、发送、发送异常、重连
- 2、调用示例
1、连接、发送、发送异常、重连
// Package rabbitmq wraps a single AMQP connection/channel pair and keeps it
// alive: it declares an exchange and a queue, publishes JSON messages, logs
// messages the broker returns as unroutable, and reconnects in the background
// when the connection is lost.
package rabbitmq

import (
	"encoding/json"
	"errors"
	"fmt"
	"sync"
	"time"

	"github.com/sirupsen/logrus"
	"github.com/streadway/amqp"
)

// ErrNotConnected is returned by Send while there is no live AMQP channel:
// before Init has succeeded, after Close, or while a reconnect is in progress.
var ErrNotConnected = errors.New("rabbitmq: not connected")

// errClosed is returned by connect when Close was called while a new
// connection was being established.
var errClosed = errors.New("rabbitmq: client closed")

// RabbitMQ is a self-reconnecting RabbitMQ publisher.
type RabbitMQ struct {
	mu      sync.RWMutex // guards every field below
	conn    *amqp.Connection
	channel *amqp.Channel
	closed  bool // set by Close; stops background reconnection
	configs RabbitMqConfig

	// Notification channels of the current connection. The background
	// goroutines receive their own copies as arguments, so replacing these
	// on reconnect never changes what a running goroutine is waiting on.
	connErrorChan   chan *amqp.Error
	returnErrorChan chan amqp.Return
	activateChan    chan interface{}
}

// NewRabbitMQ returns an unconnected client; call Init before Send.
func NewRabbitMQ() *RabbitMQ {
	return &RabbitMQ{}
}

// Init connects to the broker described by cfg, declares the exchange and
// queue, binds them, and starts the background goroutines that watch for
// connection loss and returned messages. It may be called again after Close.
func (r *RabbitMQ) Init(cfg RabbitMqConfig) error {
	r.mu.Lock()
	r.closed = false
	r.mu.Unlock()
	return r.connect(cfg)
}

// connect dials the broker, prepares the channel and installs both on r.
// On any failure the freshly dialed connection is closed so nothing leaks.
func (r *RabbitMQ) connect(cfg RabbitMqConfig) error {
	conn, err := amqp.Dial(fmt.Sprintf("amqp://%s:%s@%s%s", cfg.User, cfg.PassWord, cfg.Addr, cfg.VHost))
	if err != nil {
		return err
	}
	channel, err := r.prepare(conn, cfg)
	if err != nil {
		_ = conn.Close() // best effort; the setup error is the one worth reporting
		return err
	}

	// Create every notification channel before any goroutine starts, so the
	// goroutines never observe a nil or half-initialized channel.
	connErrors := conn.NotifyClose(make(chan *amqp.Error, 1))
	returns := channel.NotifyReturn(make(chan amqp.Return, 1))
	stop := make(chan interface{})

	r.mu.Lock()
	if r.closed {
		// Close was called while we were dialing: do not resurrect the client.
		r.mu.Unlock()
		_ = conn.Close()
		return errClosed
	}
	previous := r.conn
	r.conn = conn
	r.channel = channel
	r.configs = cfg
	r.connErrorChan = connErrors
	r.returnErrorChan = returns
	r.activateChan = stop
	r.mu.Unlock()

	go r.reopen(conn, connErrors, stop)
	go r.messagePushError(returns, stop)

	if previous != nil {
		// Init was called on an already connected client: release the old
		// connection instead of leaking it.
		_ = previous.Close()
	}
	return nil
}

// prepare opens a channel on conn and declares the exchange, the queue and
// the binding between them.
func (r *RabbitMQ) prepare(conn *amqp.Connection, cfg RabbitMqConfig) (*amqp.Channel, error) {
	channel, err := r.initChannel(conn)
	if err != nil {
		return nil, err
	}
	if err := r.exchangeDeclare(channel, cfg); err != nil {
		return nil, err
	}
	if err := r.queueDeclare(channel, cfg); err != nil {
		return nil, err
	}
	if err := r.queueBind(channel, cfg); err != nil {
		return nil, err
	}
	return channel, nil
}

// reopen waits for conn to end. If it ended with an error, reopen retries
// connect every cfg.Interval seconds until it succeeds or the client is
// closed. A graceful close (Close was called) ends the goroutine without
// reconnecting.
func (r *RabbitMQ) reopen(conn *amqp.Connection, connErrors <-chan *amqp.Error, stop chan<- interface{}) {
	amqpErr, ok := <-connErrors
	// Closing (rather than sending on) stop can never block and releases the
	// return monitor that belongs to this connection.
	close(stop)
	if !ok {
		// The notification channel was closed without an error being sent:
		// the connection was shut down on purpose.
		return
	}
	logrus.WithError(amqpErr).Error("RabbitMq server exception retry")

	r.mu.Lock()
	if r.conn == conn {
		// Drop the dead connection so Send reports ErrNotConnected meanwhile.
		r.conn, r.channel = nil, nil
	}
	cfg := r.configs
	r.mu.Unlock()

	for {
		time.Sleep(time.Second * time.Duration(cfg.Interval))

		r.mu.RLock()
		done := r.closed || r.conn != nil // closed, or someone else already reconnected
		r.mu.RUnlock()
		if done {
			return
		}

		if err := r.connect(cfg); err != nil {
			if errors.Is(err, errClosed) {
				return
			}
			logrus.WithError(err).Error("reopen queue rabbitmq")
			continue
		}
		logrus.Info("reopen rabbitmq success ")
		return
	}
}

// messagePushError logs every message the broker returns as undeliverable,
// until stop is closed by the reopen goroutine of the same connection.
func (r *RabbitMQ) messagePushError(returns <-chan amqp.Return, stop <-chan interface{}) {
	for {
		select {
		case v, ok := <-returns:
			if !ok {
				// A closed channel is always ready to receive; disable this
				// case instead of spinning, and wait for the stop signal.
				returns = nil
				continue
			}
			logrus.WithFields(map[string]interface{}{
				"code":    v.ReplyCode,
				"message": v.ReplyText,
				"content": string(v.Body),
			}).Error("send to rabbitmq failed")
		case <-stop:
			logrus.Info("The current connection has been interrupted")
			return
		}
	}
}

// initChannel opens a channel on conn with a prefetch count of 1.
func (r *RabbitMQ) initChannel(conn *amqp.Connection) (*amqp.Channel, error) {
	channel, err := conn.Channel()
	if err != nil {
		return nil, err
	}
	if err := channel.Qos(
		1,     // prefetch count
		0,     // prefetch size
		false, // global
	); err != nil {
		return nil, err
	}
	return channel, nil
}

// exchangeDeclare declares the configured exchange.
func (r *RabbitMQ) exchangeDeclare(channel *amqp.Channel, cfg RabbitMqConfig) error {
	exchange := cfg.RabbitmqExchange
	return channel.ExchangeDeclare(
		exchange.Name,
		exchange.Kind,
		cfg.Durable,
		cfg.AutoDelete,
		cfg.Internal,
		cfg.NoWait,
		nil,
	)
}

// queueDeclare declares the configured queue.
func (r *RabbitMQ) queueDeclare(channel *amqp.Channel, cfg RabbitMqConfig) error {
	_, err := channel.QueueDeclare(
		cfg.RabbitmqQueue.Name,
		cfg.Durable,
		cfg.AutoDelete,
		// NOTE(review): this position is QueueDeclare's "exclusive" argument,
		// but the config has no such field and Internal is passed instead.
		// Kept as is to preserve behavior; confirm this is intended.
		cfg.Internal,
		cfg.NoWait,
		nil,
	)
	return err
}

// queueBind binds the queue to the exchange, using the queue name as the
// routing key.
func (r *RabbitMQ) queueBind(channel *amqp.Channel, cfg RabbitMqConfig) error {
	return channel.QueueBind(
		cfg.RabbitmqQueue.Name,
		cfg.RabbitmqQueue.Name,
		cfg.RabbitmqExchange.Name,
		cfg.NoWait,
		nil,
	)
}

// Send JSON-encodes message and publishes it as a persistent message through
// the default exchange, routed by the queue name. It returns ErrNotConnected
// when there is currently no live channel.
func (r *RabbitMQ) Send(message interface{}) error {
	messageByte, err := json.Marshal(message)
	if err != nil {
		return err
	}

	r.mu.RLock()
	channel := r.channel
	routingKey := r.configs.RabbitmqQueue.Name
	r.mu.RUnlock()
	if channel == nil {
		return ErrNotConnected
	}

	return channel.Publish(
		"",         // exchange: the default exchange
		routingKey, // routing key: the queue name
		true,       // mandatory: have unroutable messages returned to us
		false,      // immediate: the original author notes this is not recommended
		amqp.Publishing{
			Headers:         amqp.Table{},
			ContentType:     "text/plain",
			ContentEncoding: "",
			DeliveryMode:    amqp.Persistent, // persist the message
			Body:            messageByte,
		},
	)
}

// Close shuts the connection down and stops background reconnection. Calling
// it on a client that is not connected is a no-op.
func (r *RabbitMQ) Close() error {
	r.mu.Lock()
	conn := r.conn
	r.conn, r.channel = nil, nil
	r.closed = true
	r.mu.Unlock()

	if conn == nil {
		return nil
	}
	return conn.Close()
}

// RabbitMqConfig holds the settings of the queue service.
type RabbitMqConfig struct {
	Addr             string           `mapstructure:"addr"`        // broker address
	VHost            string           `mapstructure:"vhost"`       // broker vhost
	User             string           `mapstructure:"user"`        // broker user name
	PassWord         string           `mapstructure:"password"`    // broker password
	Durable          bool             `mapstructure:"durable"`     // durability flag, true: durable
	AutoDelete       bool             `mapstructure:"auto_delete"` // auto-delete flag, true: delete automatically
	Internal         bool             `mapstructure:"internal"`    // whether the exchange is internal
	NoWait           bool             `mapstructure:"nowait"`      // true: do not wait for the server's confirmation
	Interval         int              `mapstructure:"interval"`    // delay between reconnect attempts, in seconds
	RabbitmqExchange RabbitmqExchange `mapstructure:"exchange"`
	RabbitmqQueue    RabbitmqQueue    `mapstructure:"queue"`
}

// RabbitmqExchange describes the exchange to declare.
type RabbitmqExchange struct {
	Name string `mapstructure:"name"` // exchange name
	Kind string `mapstructure:"kind"` // exchange type: direct (default, routed by key), fanout (no routing), topic (subscription)
}
// RabbitmqQueue describes the queue to declare.
type RabbitmqQueue struct {
	// Name is the queue name.
	Name string `mapstructure:"name"`
}
2、调用示例
package mainimport ("standard/rabbitmq/rmq" //自行替换为上面的包的位置"github.com/sirupsen/logrus"
)func main() {cfg := rabbitmq.RabbitMqConfig{Addr: "127.0.0.1:5672",VHost: "/",User: "guest",PassWord: "guest",Durable: true,AutoDelete: false,Internal: false,NoWait: false,RabbitmqExchange: rabbitmq.RabbitmqExchange{Name: "exchange.test",Kind: "direct",},RabbitmqQueue: rabbitmq.RabbitmqQueue{Name: "queue.test",},Interval: 2,}err := rabbitmq.NewRabbitMQ().Init(cfg)if err != nil {logrus.WithError(err).Error("init rabbit")return}logrus.Info("init rabbitmq success")/**err = rabbitmq.NewRabbitMQ().Send(nil) // 发送数据至队列if err != nil {logrus.WithError(err).Error("init rabbit")return}logrus.Info("send to rabbitmq success")**/select {}
}