本节重点介绍 :
开启一致性哈希环变更监听处理 这个服务的节点变更了(节点宕机、扩容)就对哈希环进行重置 consul中watch 服务中节点变化 遍历所有的service和变更chan的map,开启watch
开启一致性哈希环变更监听处理
位置 service/shard_service.go 当这个服务的节点变更了(节点宕机、扩容) 通过consul的watch操作会通知到这里,也就是 this.NodeUpdateChan会有数据 这时需要从 哈希环中获取节点信息oldNodes := this.ring.Members()
,然后两边对对比 如果节点不同则,更新哈希环this.ReShardRing(nodes)
func ( this * ShardService) RunReshardHashRing ( ) { level. Info ( this. logger) . Log ( "msg" , "RunRefreshServiceNode start...." ) for { select { case nodes := <- this. NodeUpdateChan: oldNodes := this. ring. Members ( ) sort. Strings ( nodes) sort. Strings ( oldNodes) isEq := StringSliceEqualBCE ( nodes, oldNodes) if isEq == false { level. Info ( this. logger) . Log ( "msg" , "RunReshardHashRing_node_update_reshard" , "old_num" , len ( oldNodes) , "new_num" , len ( nodes) , "oldnodes" , strings. Join ( oldNodes, "," ) , "newnodes" , strings. Join ( nodes, "," ) ) this. ReShardRing ( nodes) } else { level. Info ( this. logger) . Log ( "msg" , "RunReshardHashRing_node_same" , "nodes" , strings. Join ( nodes, "," ) ) } case <- this. ctx. Done ( ) : level. Info ( this. logger) . Log ( "msg" , "RunReshardHashRingQuit" ) return } }
}
两个string切片比较 的函数
func StringSliceEqualBCE ( a, b [ ] string ) bool { if len ( a) != len ( b) { return false } if ( a == nil ) != ( b == nil ) { return false } b = b[ : len ( a) ] for i, v := range a { if v != b[ i] { return false } } return true
}
reshard函数
func ( ss * ShardService) ReShardRing ( nodes [ ] string ) { ss. Lock ( ) defer ss. Unlock ( ) newRing := consistent. NewConsistent ( common. Replicas) for _ , node := range nodes { newRing. Add ( node) } ss. ring = newRing}
在初始化完 ShardService后就开启上面的协程
service/shard_service.go NewShardService函数中
s. SetNodes ( cg. Nodes) go s. RunReshardHashRing ( ) return s
consul中watch 服务中节点变化
位置 watch/consul.go WatchService方法 调用consul api的watch功能 ,对指定的srvName进行watch 并将变化的结果 塞入到nodeUpdateChan srvName对应的chan中
func ( c * client) WatchService ( srvName string , nodeUpdateChan chan <- [ ] string ) error { watchConfig := make ( map [ string ] interface { } ) watchConfig[ "type" ] = "service" watchConfig[ "service" ] = srvNamewatchConfig[ "handler_type" ] = "script" watchConfig[ "passingonly" ] = true watchPlan, err := watch. Parse ( watchConfig) if err != nil { level. Error ( c. logger) . Log ( "msg" , "create_Watch_by_watch_config_error" , "srv_name" , srvName, "error" , err) return err} watchPlan. Handler = func ( lastIndex uint64 , result interface { } ) { if entries, ok := result. ( [ ] * consul. ServiceEntry) ; ok { var hs [ ] string for _ , a := range entries { hs = append ( hs, a. Service. Address) } if len ( hs) > 0 { level. Info ( c. logger) . Log ( "msg" , "service_node_change_by_healthy_check" , "srv_name" , srvName, "num" , len ( hs) , "detail" , strings. Join ( hs, " " ) ) nodeUpdateChan <- hs} } } if err := watchPlan. Run ( c. consulServerAddr) ; err != nil { level. Error ( c. logger) . Log ( "msg" , "watchPlan_run_error" , "srv_name" , srvName, "error" , err) return err} return nil }
遍历所有的service和变更chan的map,开启watch
func ( c * client) RunRefreshServiceNode ( ctx context. Context, srvNameChanMap map [ string ] chan <- [ ] string ) error { level. Info ( c. logger) . Log ( "msg" , "RunRefreshServiceNode start...." ) for srvName, upChan := range srvNameChanMap { srvName := srvNameupChan := upChango func ( ) { c. WatchService ( srvName, upChan) } ( ) } select { case <- ctx. Done ( ) : level. Info ( c. logger) . Log ( "msg" , "RunRefreshServiceNode_receive_quit_signal_and_quit" ) return nil }
}
main中 使用 编排开启这个任务
{ g. Add ( func ( ) error { err := client. RunRefreshServiceNode ( ctxAll, srvNameChanMap) if err != nil { level. Error ( logger) . Log ( "msg" , "watchService_error" , "error" , err) } return err} , func ( err error ) { cancelAll ( ) } ) }
同时 定义处理 信号的任务
var g run. Group{ term := make ( chan os. Signal, 1 ) signal. Notify ( term, os. Interrupt, syscall. SIGTERM) cancel := make ( chan struct { } ) g. Add ( func ( ) error { select { case <- term: level. Warn ( logger) . Log ( "msg" , "Received SIGTERM, exiting gracefully..." ) cancelAll ( ) return nil case <- cancel: level. Warn ( logger) . Log ( "msg" , "server finally exit..." ) return nil } } , func ( err error ) { close ( cancel) } , ) }
运行结果 3.201是后面启动的
level = info ts = 2021 -08-29T15:22:47.400+08:00 caller = main.go:83 msg = "NewConsulClient successfully" addr = 192.168 .3.200:8500
ts = 2021 -08-29T15:22:47.457+08:00 caller = log.go:168 level = info msg = "RunRefreshServiceNode start...."
level = info ts = 2021 -08-29T15:22:47.457+08:00 caller = consul.go:124 msg = "RunRefreshServiceNode start...."
level = info ts = 2021 -08-29T15:22:47.459+08:00 caller = consul.go:108 msg = service_node_change_by_healthy_check srv_name = scrape_prometheus _node_exporter num = 1 detai
l = 192.168 .3.200
ts = 2021 -08-29T15:22:47.459+08:00 caller = log.go:168 level = info msg = RunReshardHashRing_node_same nodes = 192.168 .3.200
level = info ts = 2021 -08-29T15:24:19.122+08:00 caller = consul.go:108 msg = service_node_change_by_healthy_check srv_name = scrape_prometheus _node_exporter num = 2 detai
l = "192.168.3.200 192.168.3.201"
ts = 2021 -08-29T15:24:19.122+08:00 caller = log.go:168 level = info msg = RunReshardHashRing_node_update_reshard old_num = 1 new_num = 2 oldnodes = 192.168 .3.200 newnodes = 1
92.168 .3.200,192.168.3.201
本节重点总结 :
开启一致性哈希环变更监听处理 这个服务的节点变更了(节点宕机、扩容)就对哈希环进行重置 consul中watch 服务中节点变化 遍历所有的service和变更chan的map,开启watch