基于 Redis 发布订阅实现服务注册与发现

ops/2024/9/23 0:30:24/

写在前面

其实很少有公司会使用 Redis 来实现服务注册与发现,通常是ETCD、NACOS、ZOOKEEPER等等,但是也不妨碍我们了解。本文会先介绍 Redis 的发布/订阅模式,接着基于这个模式实现服务注册与发现。

Redis发布订阅流程图:
在这里插入图片描述

Redis 发布订阅

1. 简介

Redis的发布订阅功能主要由PUBLISHSUBSCRIBEPSUBSCRIBE 等命令组成的。

通过执行 SUBSCRIBE 命令,客户端可以订阅一个或多个频道,从而成为这个频道的订阅者。
在这里插入图片描述

每当有其他客户端向这个被订阅的频道发送消息的时候,频道的所有订阅者都会收到这条消息。

在这里插入图片描述
当然,客户端还可以通过PSUBSCRIBE订阅一个或多个模式,从而成为这些模式的订阅者,也就是模糊匹配

2. 订阅

每当一个客户端执行SUBSCRIBE命令订阅某个或某些频道的时候,这个客户端与被订阅者之间就会建立起一种订阅关系。而Redis会将这种订阅关系保存到pubsub_channels 这个字典中,这个字典的键是某个被订阅的频道,而值是一个链表,这个链表记录了所有订阅这个频道的客户端

在这里插入图片描述
每当有客户端执行了SUBSCRIBE命令订阅某个或某些频道的时候,服务器都会将客户端与被订阅的频道在 pubsub_channels字典中进行关联。

3. 退订

如果进行退订UNSUBSCRIBE,那么服务器会从pubsub_channels中接触客户端与被退订频道之间的关联。当这个key中,已经没有订阅者,那么会将这个key进行删除。例如下面的client7
在这里插入图片描述

4. 发布消息

当一个Redis客户端执行 PUBLISH channel message 命令将消息 message 发送给channel的时候,将消息发送给channel频道的所有订阅者(本文不讨论pattern模式)

服务注册与发现

我们了解完redis发布订阅流程之后,我们来基于这个发布订阅来实现一个服务注册与发现的功能。

Redis服务发现与注册流程图:
在这里插入图片描述

1. 对象定义

redis服务发现与注册的结构体

type RedisRegistryService struct {config *RedisConfig // the config about rediscli *redis.Client // client for redisrwLock *sync.RWMutex // rwLock lock groupList when update service instance// vgroupMapping to store the cluster group// eg: map[cluster_name_key]cluster_namevgroupMapping map[string]string// groupList store all addresses under this cluster// eg: map[cluster_name][]{service_instance1,service_instance2...}groupList map[string][]*ServiceInstancectx context.Context
}

订阅的消息内容,为key 以及 value ,而key就是服务的name,value就是服务的具体地址

type NotifyMessage struct {// key = registry.redis.${cluster}_ip:portKey   string `json:"key"`Value string `json:"value"`
}

2. 对象加载

新建一个redis服务注册与发现对象,并且在创建的这个对象的时候,我们会做两件事情

  1. redis中所已存在的key都load一次,存到本地缓存中。
  2. 开启一些协程进行发布订阅,不断监听上游的注册消息
func newRedisRegisterService(config *ServiceConfig, redisConfig *RedisConfig) RegistryService {if redisConfig == nil {log.Fatalf("redis config is nil")panic("redis config is nil")}cfg := &redis.Options{Addr:     redisConfig.ServerAddr,Username: redisConfig.Username,Password: redisConfig.Password,DB:       redisConfig.DB,}cli := redis.NewClient(cfg)vgroupMapping := config.VgroupMappinggroupList := make(map[string][]*ServiceInstance)redisRegistryService := &RedisRegistryService{config:        redisConfig,cli:           cli,ctx:           context.Background(),rwLock:        &sync.RWMutex{},vgroupMapping: vgroupMapping,groupList:     groupList,}// loading all server at init timeredisRegistryService.load()// subscribe at real timego redisRegistryService.subscribe()return redisRegistryService
}

3. 服务加载

load 函数:将所有 key 都 scan 出来,再遍历所有的key,拿到对应的value,进行一次初始化操作,加载到本地缓存中

func (s *RedisRegistryService) load() {// find all the server list redis register by redisFileKeyPrefixkeys, _, err := s.cli.Scan(s.ctx, 0, fmt.Sprintf("%s*", redisFileKeyPrefix), 0).Result()if err != nil {log.Errorf("RedisRegistryService-Scan-Key-Error:%s", err)return}for _, key := range keys {clusterName := s.getClusterNameByKey(key)val, err := s.cli.Get(s.ctx, key).Result()if err != nil {log.Errorf("RedisRegistryService-Get-Key:%s, Err:%s", key, err)continue}ins, err := s.getServerInstance(val)if err != nil {log.Errorf("RedisRegistryService-getServerInstance-val:%s, Err:%s", val, err)continue}// put server instance list in group lists.rwLock.Lock()if s.groupList[clusterName] == nil {s.groupList[clusterName] = make([]*ServiceInstance, 0)}s.groupList[clusterName] = append(s.groupList[clusterName], ins)s.rwLock.Unlock()}
}

4.服务发现

通过 key 从 vgroupMapping 找到对应的 value

func (s *RedisRegistryService) Lookup(key string) (r []*ServiceInstance, err error) {s.rwLock.RLock()defer s.rwLock.RUnlock()cluster := s.vgroupMapping[key]if cluster == "" {err = fmt.Errorf("cluster doesnt exit")return}r = s.groupList[cluster]return
}

5. 服务注册

  1. key 和 value set到 redis
  2. key 和 value 通过 Channel 发布出去
  3. 另外开启一个协程将进行保活
func (s *RedisRegistryService) register(key, value string) (err error) {_, err = s.cli.HSet(s.ctx, key, value).Result()if err != nil {return}msg := &NotifyMessage{Key:   key,Value: value,}s.cli.Publish(s.ctx, redisRegisterChannel, msg)go func() {s.keepAlive(s.ctx, key)}()return
}

6. 服务订阅

订阅 Subscribe Channel 监听上游服务,并对服务的 key 和 value 进行更新操作。 注意这里对map进行读写的时候要加上读写锁,防止线程不安全。

func (s *RedisRegistryService) subscribe() {go func() {msgs := s.cli.Subscribe(s.ctx, redisRegisterChannel).Channel()for msg := range msgs {var data *NotifyMessageerr := json.Unmarshal([]byte(msg.Payload), &data)if err != nil {log.Errorf("RedisRegistryService-subscribe-Subscribe-Err:%+v", err)continue}// get cluster name by keyclusterName := s.getClusterNameByKey(data.Key)ins, err := s.getServerInstance(data.Value)if err != nil {log.Errorf("RedisRegistryService-subscribe-getServerInstance-value:%s, Err:%s", data.Value, err)continue}s.rwLock.Lock()if s.groupList[clusterName] == nil {s.groupList[clusterName] = make([]*ServiceInstance, 0)}s.groupList[clusterName] = append(s.groupList[clusterName], ins)s.rwLock.Unlock()}}()return
}

注意一点:redis发布订阅消息是不存储到日志的,也没有ack确认。 所以如果发生的消息的丢失,就需要业务自己承担了,比如自己实现一个ack,发送的时候进行消息日志的存储。

完整代码:
https://github.com/CocaineCong/incubator-seata-go/blob/discovery/redis/pkg/discovery/redis.go


http://www.ppmy.cn/ops/19599.html

相关文章

密码学系列2-安全模型(CPA,CCA,selective,adaptive)

本章介绍了安全模型中的CPA,selective/adaptive CCA, EUF-CMA 加密的安全性模型定义: 一、选择明文攻击下的不可区分性(IND-CPA) 初始化:挑战者 C \mathcal{C} C运行初始化算法算法来获取系统参数。 阶段1:敌手 A \mathcal{A} A产生明文,加密的

李沐65_注意力分数——自学笔记

Additive Attention 等价于将key和value合并起来后放入到一个隐藏大小为h输出大小为1的单隐藏层 总结 1.注意力分数是query和key的相似度,注意力权重是分数的softmax结果 2.两种常见的分数计算: (1)将query和key合并起来进入一个单输出单…

《设计模式之美》第五章 总结

《设计模式之美》第五章 总结 第五章 重构技巧 5.1 重构四要素:目的、对象、时机和方法 5.1.1 重构的目的:为什么重构 定义:重构是一种对软件内部结构的改善,目的是在不改变软件对外部的可见行为的情况下,使其更容…

FPGA verilog 模板设计示例(持续更新)

重温一下大道至简的至简设计法,正式开发两年多回顾当时的设计方法,又有了更多的体会和感触,希望将模块化运用起来会更有条例。1 FPGA设计代码模板 信号命名规范: 1 clk 表示时钟信号小写 2 rstn 表示高电平复位信号 小写 3 rst_n…

MATLAB矩阵

MATLAB 矩阵 矩阵是数字的二维数组。 在MATLAB中,您可以通过在每行中以逗号或空格分隔的数字输入元素并使用分号标记每行的结尾来创建矩阵。 例如,让我们创建一个45矩阵一- 示例 a [ 1 2 3 4 5; 2 3 4 5 6; 3 4 5 6 7; 4 5 6 7 8] MATLAB将执行上述语…

gitlab关联新仓库

如果你想要将现有的Git仓库提交(或推送)到一个新的远程地址,你可以通过以下步骤来完成: 查看现有的远程仓库: 首先,确认你当前的仓库有哪些远程地址。 git remote -v如果输出中显示了旧的远程地址&#x…

粒子群算法与优化储能策略python实践

粒子群优化算法(Particle Swarm Optimization,简称PSO), 是1995年J. Kennedy博士和R. C. Eberhart博士一起提出的,它是源于对鸟群捕食行为的研究。粒子群优化算法的基本核心是利用群体中的个体对信息的共享从而使得整个群体的运动…

SSH Key生成

天行健,君子以自强不息;地势坤,君子以厚德载物。 每个人都有惰性,但不断学习是好好生活的根本,共勉! 文章均为学习整理笔记,分享记录为主,如有错误请指正,共同学习进步。…