GO Message Bus

embedded/2024/10/20 16:01:16/

文章目录

    • 核心概念
      • 为什么使用 `Message Bus`?
    • Go 中的 `Message Bus` 实现
      • 1. 手动实现 `Message Bus`
        • 示例代码:简易 Message Bus 实现
      • 解释:
      • 运行结果:
      • 2. 使用库 `go-micro` 中的 `Event Bus`
        • 安装 `go-micro`
        • 使用 `go-micro` 实现消息总线
        • 解释:
      • 3. 使用库 `gobot` 实现消息总线
    • 总结

Message Bus(消息总线)是一种设计模式,用于在不同系统组件之间进行异步通信。它使得系统的模块可以松耦合地进行交互,而不需要彼此直接依赖。消息总线通常用于事件驱动的架构中,特别是在分布式系统、微服务以及面向事件的系统设计中。

在 Go 中,Message Bus 通常用于处理异步事件或在组件之间传递消息,允许多个发布者和订阅者之间进行消息传递。以下是对 Message Bus 的学习内容及使用方法:

核心概念

  • Publisher(发布者):发送消息或事件的组件。它将消息发布到消息总线上。
  • Subscriber(订阅者):接收消息或事件的组件。它订阅了特定类型的消息,并在消息发布时收到通知。
  • Topic/Channel(主题或通道):消息可以按照主题分类,订阅者订阅特定主题的消息。

为什么使用 Message Bus

  1. 解耦:发布者和订阅者不需要彼此知道对方的存在,促进了松耦合架构。
  2. 扩展性:通过引入消息总线,系统可以轻松添加或移除新的订阅者,而不会影响到其他模块。
  3. 异步通信:消息总线允许异步的消息传递,适用于事件驱动或需要异步处理的场景。
  4. 分布式架构:在微服务架构中,消息总线用于实现服务之间的通信。

Go 中的 Message Bus 实现

1. 手动实现 Message Bus

在 Go 中,可以手动使用 channelsgoroutines 来实现一个简单的 Message Bus,用于在不同的模块间传递消息。

示例代码:简易 Message Bus 实现
go">package mainimport ("fmt""sync"
)// 消息总线结构体
type MessageBus struct {subscribers map[string][]chan interface{}lock        sync.RWMutex
}// 创建一个新的消息总线
func NewMessageBus() *MessageBus {return &MessageBus{subscribers: make(map[string][]chan interface{}),}
}// 订阅消息
func (mb *MessageBus) Subscribe(topic string) <-chan interface{} {mb.lock.Lock()defer mb.lock.Unlock()ch := make(chan interface{}, 1)mb.subscribers[topic] = append(mb.subscribers[topic], ch)return ch
}// 发布消息
func (mb *MessageBus) Publish(topic string, msg interface{}) {mb.lock.RLock()defer mb.lock.RUnlock()if subscribers, found := mb.subscribers[topic]; found {for _, ch := range subscribers {// 异步发送消息到通道go func(c chan interface{}) {c <- msg}(ch)}}
}// 取消订阅
func (mb *MessageBus) Unsubscribe(topic string, ch <-chan interface{}) {mb.lock.Lock()defer mb.lock.Unlock()if subscribers, found := mb.subscribers[topic]; found {for i, subscriber := range subscribers {if subscriber == ch {mb.subscribers[topic] = append(subscribers[:i], subscribers[i+1:]...)close(subscriber) // 关闭通道break}}}
}func main() {// 创建消息总线bus := NewMessageBus()// 订阅者1订阅 "topic1"subscriber1 := bus.Subscribe("topic1")// 订阅者2订阅 "topic1"subscriber2 := bus.Subscribe("topic1")// 发布者发布消息到 "topic1"bus.Publish("topic1", "Hello, World!")// 订阅者接收消息fmt.Println("Subscriber 1 received:", <-subscriber1)fmt.Println("Subscriber 2 received:", <-subscriber2)// 发布者发布另一个消息bus.Publish("topic1", "Another message")// 订阅者接收新消息fmt.Println("Subscriber 1 received:", <-subscriber1)fmt.Println("Subscriber 2 received:", <-subscriber2)// 取消订阅bus.Unsubscribe("topic1", subscriber1)// 再次发布消息,只有订阅者2会收到bus.Publish("topic1", "Message after unsubscribe")// 订阅者2接收消息fmt.Println("Subscriber 2 received:", <-subscriber2)
}

解释:

  1. MessageBus 结构体

    • subscribers: 存储订阅者的 map,key 是订阅的主题,value 是该主题的订阅者(一个 chan 列表)。
    • lock: 用于并发安全操作的读写锁。
  2. Subscribe 方法

    • 允许用户订阅某个主题,返回一个接收消息的通道。
  3. Publish 方法

    • 发布消息到指定的主题,消息通过异步的 goroutine 发送到所有订阅者。
  4. Unsubscribe 方法

    • 从某个主题中移除订阅者,关闭其通道。

运行结果:

Subscriber 1 received: Hello, World!
Subscriber 2 received: Hello, World!
Subscriber 1 received: Another message
Subscriber 2 received: Another message
Subscriber 2 received: Message after unsubscribe

gomicro__Event_Bus_148">2. 使用库 go-micro 中的 Event Bus

go-micro 是一个强大的微服务框架,它的 Event Bus 功能实现了分布式消息传递,可以用作消息总线。使用 go-micro 的消息总线非常适合微服务架构中的服务通信。

gomicro_152">安装 go-micro
go get github.com/asim/go-micro/v3
gomicro__158">使用 go-micro 实现消息总线
go">package mainimport ("context""fmt""log""github.com/asim/go-micro/v3""github.com/asim/go-micro/v3/broker""github.com/asim/go-micro/v3/events"
)func main() {// 创建 go-micro 服务service := micro.NewService(micro.Name("message-bus"))service.Init()// 事件发布者eventBus := events.NewEventStore(service.Client())topic := "example.topic"// 发布消息go func() {for i := 0; i < 5; i++ {err := eventBus.Publish(topic, fmt.Sprintf("Message %d", i))if err != nil {log.Fatalf("Error publishing message: %v", err)}fmt.Println("Published message", i)}}()// 订阅消息_, err := eventBus.Subscribe(topic, func(ev *broker.Event) error {fmt.Printf("Received message: %s\n", string(ev.Message.Body))return nil})if err != nil {log.Fatalf("Error subscribing to topic: %v", err)}// 运行服务if err := service.Run(); err != nil {log.Fatal(err)}
}
解释:
  1. Event Busgo-micro 提供的 Event Bus 用于发布和订阅事件。
  2. Publish:使用 Publish 方法发布消息到某个主题。
  3. Subscribe:通过 Subscribe 方法订阅某个主题,当消息发布时,订阅者会接收到事件。

gobot__215">3. 使用库 gobot 实现消息总线

如果你在构建机器人系统或者处理传感器/硬件交互,可以使用 gobot 库,它同样支持消息总线模式。

go get github.com/hybridgroup/gobot

然后可以使用类似的方式来创建事件总线和订阅发布。

总结

  • 消息总线是一种解耦的设计模式,适用于分布式系统和事件驱动架构。
  • Go 中可以通过使用 channelsgoroutines 实现自定义的消息总线,也可以使用 go-micro 等库来实现更复杂的分布式消息总线功能。
  • 消息总线的典型应用包括:微服务通信、事件驱动架构、异步任务执行等。

http://www.ppmy.cn/embedded/119983.html

相关文章

jupyter本地配置

1 jupyter启动 在相关文件夹&#xff0c;启动命令行&#xff0c;在命令行执行 jupyter lab即可 将ipynb文件转换为python文件 jupyter nbconvert --to script your_notebook.ipynb会在同一个文件夹下产生同名的your_notebook.py文件 错误 (1)错误&#xff1a;# jupyter n…

Windows开发工具使用技巧大揭秘:让编码效率翻倍的秘籍!

【ACM出版|厦大主办|EI稳定检索】第五届计算机科学与管理科技国际学术会议&#xff08;ICCSMT 2024&#xff09;_艾思科蓝_学术一站式服务平台 更多学术会议请看&#xff1a;学术会议-学术交流征稿-学术会议在线-艾思科蓝 目录 引言 1. 快捷键大全&#xff1a;加速你的编码…

CSS外边距

元素的外边距&#xff08;margin&#xff09;是围绕在元素边框以外&#xff08;不包括边框&#xff09;的空白区域&#xff0c;这片区域不受 background 属性的影响&#xff0c;始终是透明的。 为元素设置外边距 默认情况下如果不设置外边距属性&#xff0c;HTML 元素就是不会…

关于最小二乘法

最小二乘法的核心思想简单而优雅&#xff1a;我们希望找到一条最佳的曲线&#xff0c;使其尽可能贴近所有的数据点。想象一下&#xff0c;当你在画布上描绘一条线&#xff0c;目标是让这条线与点的距离最小。数学上&#xff0c;这可以表示为&#xff1a; 在这个公式中&#xff…

【C++】set详解

&#x1f4e2;博客主页&#xff1a;https://blog.csdn.net/2301_779549673 &#x1f4e2;欢迎点赞 &#x1f44d; 收藏 ⭐留言 &#x1f4dd; 如有错误敬请指正&#xff01; &#x1f4e2;本文由 JohnKi 原创&#xff0c;首发于 CSDN&#x1f649; &#x1f4e2;未来很长&#…

从kafka和zookeeper中获取生产和消费偏移量

从kafka和zookeeper中获取生产和消费偏移量 特殊说明 该命令是使用python进行编译&#xff0c;需要使用centos7系统上进行使用。 命令详细 [rootmongodb_1 get_offsets_num]# ./get_offsets_num -h usage: get_offsets_num [-h] [-k KAFKA_HOST] [-z ZOOKEEPER_HOST][-m INT…

sql 时间交集

任务&#xff08;取时间交集&#xff09; 前端输入开始时间和结束时间&#xff0c;通过sql筛选出活动开始时间和活动结束时间再开时时间和结束时间有交集的活动 想法&#xff1a; 前后一段时间内遇到了类似取交集的&#xff0c;从网上找到了两种写法&#xff0c;再结合GPT等…

无人化焦炉四大车系统 武汉正向科技 工业机车无人远程控制系统

焦炉四大车无人化系统介绍 采用格雷母线光编码尺双冗余定位技术&#xff0c;炉门视觉定位自学习技术&#xff0c;wifi5G无线通讯技术&#xff0c;激光雷达安全识别技术&#xff0c;焦化智慧调度&#xff0c;手机APP监控功能。 焦炉四大车无人化系统功能 该系统能自动生成生产…