27.7 开启一致性哈希环变更监听处理和consul-watch服务

server/2024/10/24 12:15:09/

本节重点介绍 :

  • 开启一致性哈希环变更监听处理
    • 这个服务的节点变更了(节点宕机、扩容)就对哈希环进行重置
  • consul中watch 服务中节点变化
    • 遍历所有的service和变更chan的map,开启watch

开启一致性哈希环变更监听处理

  • 位置 service/shard_service.go
  • 当这个服务的节点变更了(节点宕机、扩容)
  • 通过consul的watch操作会通知到这里,也就是 this.NodeUpdateChan会有数据
  • 这时需要从 哈希环中获取节点信息oldNodes := this.ring.Members(),然后两边对对比
  • 如果节点不同则,更新哈希环this.ReShardRing(nodes)
func (this *ShardService) RunReshardHashRing() {level.Info(this.logger).Log("msg", "RunRefreshServiceNode start....")for {select {case nodes := <-this.NodeUpdateChan:oldNodes := this.ring.Members()sort.Strings(nodes)sort.Strings(oldNodes)isEq := StringSliceEqualBCE(nodes, oldNodes)if isEq == false {level.Info(this.logger).Log("msg", "RunReshardHashRing_node_update_reshard", "old_num", len(oldNodes), "new_num", len(nodes), "oldnodes", strings.Join(oldNodes, ","), "newnodes", strings.Join(nodes, ","))this.ReShardRing(nodes)} else {level.Info(this.logger).Log("msg", "RunReshardHashRing_node_same", "nodes", strings.Join(nodes, ","))}case <-this.ctx.Done():level.Info(this.logger).Log("msg", "RunReshardHashRingQuit")return}}
}

两个string切片比较 的函数

func StringSliceEqualBCE(a, b []string) bool {if len(a) != len(b) {return false}if (a == nil) != (b == nil) {return false}b = b[:len(a)]for i, v := range a {if v != b[i] {return false}}return true
}

reshard函数

func (ss *ShardService) ReShardRing(nodes []string) {ss.Lock()defer ss.Unlock()newRing := consistent.NewConsistent(common.Replicas)for _, node := range nodes {newRing.Add(node)}ss.ring = newRing}

在初始化完 ShardService后就开启上面的协程

  • service/shard_service.go NewShardService函数中
	s.SetNodes(cg.Nodes)// 开启一致性哈希环变更监听go s.RunReshardHashRing()return s

consul中watch 服务中节点变化

  • 位置 watch/consul.go WatchService方法
  • 调用consul api的watch功能 ,对指定的srvName进行watch
  • 并将变化的结果 塞入到nodeUpdateChan srvName对应的chan中
func (c *client) WatchService(srvName string, nodeUpdateChan chan<- []string) error {watchConfig := make(map[string]interface{})watchConfig["type"] = "service"watchConfig["service"] = srvNamewatchConfig["handler_type"] = "script"watchConfig["passingonly"] = truewatchPlan, err := watch.Parse(watchConfig)if err != nil {level.Error(c.logger).Log("msg", "create_Watch_by_watch_config_error", "srv_name", srvName, "error", err)return err}watchPlan.Handler = func(lastIndex uint64, result interface{}) {if entries, ok := result.([]*consul.ServiceEntry); ok {var hs []stringfor _, a := range entries {//hs = append(hs, fmt.Sprintf("%s:%d", a.Service.Address, a.Service.Port))hs = append(hs, a.Service.Address)}if len(hs) > 0 {level.Info(c.logger).Log("msg", "service_node_change_by_healthy_check", "srv_name", srvName, "num", len(hs), "detail", strings.Join(hs, " "))nodeUpdateChan <- hs}}}if err := watchPlan.Run(c.consulServerAddr); err != nil {level.Error(c.logger).Log("msg", "watchPlan_run_error", "srv_name", srvName, "error", err)return err}return nil}

遍历所有的service和变更chan的map,开启watch

  • 位置 watch/consul.go
func (c *client) RunRefreshServiceNode(ctx context.Context, srvNameChanMap map[string]chan<- []string) error {level.Info(c.logger).Log("msg", "RunRefreshServiceNode start....")for srvName, upChan := range srvNameChanMap {srvName := srvNameupChan := upChango func() {c.WatchService(srvName, upChan)}()}select {case <-ctx.Done():level.Info(c.logger).Log("msg", "RunRefreshServiceNode_receive_quit_signal_and_quit")return nil}
}

main中 使用 编排开启这个任务

  • main.go中
	{// WatchService   manager.g.Add(func() error {err := client.RunRefreshServiceNode(ctxAll, srvNameChanMap)if err != nil {level.Error(logger).Log("msg", "watchService_error", "error", err)}return err}, func(err error) {cancelAll()})}

同时 定义处理 信号的任务

	var g run.Group{// Termination handler.term := make(chan os.Signal, 1)signal.Notify(term, os.Interrupt, syscall.SIGTERM)cancel := make(chan struct{})g.Add(func() error {select {case <-term:level.Warn(logger).Log("msg", "Received SIGTERM, exiting gracefully...")cancelAll()return nil//TODO clean work herecase <-cancel:level.Warn(logger).Log("msg", "server finally exit...")return nil}},func(err error) {close(cancel)},)}

运行结果 3.201是后面启动的

level=info ts=2021-08-29T15:22:47.400+08:00 caller=main.go:83 msg="NewConsulClient successfully" addr=192.168.3.200:8500
ts=2021-08-29T15:22:47.457+08:00 caller=log.go:168 level=info msg="RunRefreshServiceNode start...."
level=info ts=2021-08-29T15:22:47.457+08:00 caller=consul.go:124 msg="RunRefreshServiceNode start...."
level=info ts=2021-08-29T15:22:47.459+08:00 caller=consul.go:108 msg=service_node_change_by_healthy_check srv_name=scrape_prometheus_node_exporter num=1 detai
l=192.168.3.200
ts=2021-08-29T15:22:47.459+08:00 caller=log.go:168 level=info msg=RunReshardHashRing_node_same nodes=192.168.3.200
level=info ts=2021-08-29T15:24:19.122+08:00 caller=consul.go:108 msg=service_node_change_by_healthy_check srv_name=scrape_prometheus_node_exporter num=2 detai
l="192.168.3.200 192.168.3.201"
ts=2021-08-29T15:24:19.122+08:00 caller=log.go:168 level=info msg=RunReshardHashRing_node_update_reshard old_num=1 new_num=2 oldnodes=192.168.3.200 newnodes=1
92.168.3.200,192.168.3.201

本节重点总结 :

  • 开启一致性哈希环变更监听处理
    • 这个服务的节点变更了(节点宕机、扩容)就对哈希环进行重置
  • consul中watch 服务中节点变化
    • 遍历所有的service和变更chan的map,开启watch

http://www.ppmy.cn/server/134443.html

相关文章

python作业02.

进制转换 #编写代码&#xff0c;实现十进制数据转换二进制、二进制转换十进制、十进制转换十六进制、十六进制转换十进制的代码实现 #十进制转二进制 #定一个转换函数 def decimal_to_binary(decimal):if decimal 0:return "0"binary ""while decimal &…

10_ Linux软件安装指南:RPM、YUM、源码安装

系列文章导航&#xff1a;01_Linux基础操作CentOS7学习笔记-CSDN博客 文章目录 1. RPM包安装2. YUM包管理器3. 源码安装 在Linux系统中&#xff0c;软件安装是日常管理中的一项基本任务。本文将详细介绍三种常见的软件安装方法&#xff1a;RPM包安装、YUM包管理器安装和源码编…

牛客周赛63

https://ac.nowcoder.com/acm/contest/91592 好数 简单的判断两位数&#xff0c;且十位等于个位 #include <bits/stdc.h> #define IOS ios::sync_with_stdio(0);cin.tie(0);cout.tie(0); #define int long long using namespace std; using ll long long; using pii …

SIP 业务举例之 Call Forwarding - No Answer(无应答呼叫转移)

目录 1. Call Forwarding - No Answer 简介 2. RFC5359 的 Call Forwarding - No Answer 信令流程 呼转开始 呼转完成 3. Call Forwording - No Answer 过程总结 博主wx:yuanlai45_csdn 博主qq:2777137742 想要 深入学习 5GC IMS 等通信知识(加入 51学通信),或者想要 …

原型模式和建造模式的区别

原型模式&#xff08;Prototype Pattern&#xff09;和建造者模式&#xff08;Builder Pattern&#xff09;虽然都是创建型设计模式&#xff0c;但它们的应用场景和实现方式有着显著的区别。以下是二者的详细对比&#xff1a; 1. 意图和应用场景 原型模式&#xff1a; 意图&a…

【有啥问啥】智能座舱中的ADDW认证是什么?

智能座舱中的ADDW认证是什么&#xff1f; 随着汽车行业的智能化转型&#xff0c;智能座舱作为现代汽车的核心组成部分&#xff0c;正以前所未有的速度改变着我们的驾驶与乘坐体验。它不仅集成了先进的娱乐、导航和信息交互功能&#xff0c;更重要的是&#xff0c;通过一系列安…

STM32G474之“运放OPAMP和ADC”以及“ADC和DMA”问题

在使用STM32G474之“运放OPAMP和ADC”&#xff0c;或“ADC和DMA”时&#xff0c;要注意一下几个问题。如果有有标准库&#xff0c;就不会用HAL库了。不是没有吗&#xff1f;凑合用吧。 问题1、将“DAC3通道1”通过内部连接到"运放OPAMP1"&#xff0c;运放输出到引脚…

使用pyqt创建一个移动的矩形

使用pyqt创建一个移动的矩形 程序功能概述效果详细代码 程序功能概述 程序的主要功能是在一个窗口内绘制一个矩形框&#xff0c;并使这个矩形框能够以固定的速度向右移动。当矩形框移动出窗口右侧边界时&#xff0c;它会重新出现在窗口的左侧。 效果 详细代码 import sys fr…