// addNode generates the peer.

func (c *client) heartbeatStreamLoop() {defer c.wg.Done()for {select {case <-c.ctx.Done():returndefault:}ctx, cancel := context.WithCancel(c.ctx)c.connMu.RLock()stream, err := c.leaderClient().RegionHeartbeat(ctx)c.connMu.RUnlock()if err != nil {cancel()c.schedulerUpdateLeader()time.Sleep(retryInterval)continue}errCh := make(chan error, 1)wg := &sync.WaitGroup{}wg.Add(2)go c.reportRegionHeartbeat(ctx, stream, errCh, wg)go c.receiveRegionHeartbeat(stream, errCh, wg)select {case err := <-errCh:log.Warnf("[%s][scheduler] heartbeat stream get error: %s ", c.tag, err)cancel()c.schedulerUpdateLeader()time.Sleep(retryInterval)wg.Wait()case <-c.ctx.Done():log.Info("cancel heartbeat stream loop")cancel()return}}
}
func (m *MockSchedulerClient) RegionHeartbeat(req *schedulerpb.RegionHeartbeatRequest) error {if err := m.checkBootstrap(); err != nil {return err}m.Lock()defer m.Unlock()regionID := req.Region.GetId()for _, p := range req.Region.GetPeers() {delete(m.pendingPeers, p.GetId())}for _, p := range req.GetPendingPeers() {m.pendingPeers[p.GetId()] = p}m.leaders[regionID] = req.Leaderif err := m.handleHeartbeatVersion(req.Region); err != nil {return err}if err := m.handleHeartbeatConfVersion(req.Region); err != nil {return err}resp := &schedulerpb.RegionHeartbeatResponse{Header: &schedulerpb.ResponseHeader{ClusterId: m.clusterID},RegionId: regionID,RegionEpoch: req.Region.GetRegionEpoch(),TargetPeer: req.Leader,}if op := m.operators[regionID]; op != nil {if m.tryFinished(op, req.Region, req.Leader) {delete(m.operators, regionID)} else {m.makeRegionHeartbeatResponse(op, resp)}log.Debugf("[region %d] schedule %v", regionID, op)}store := m.stores[req.Leader.GetStoreId()]store.heartbeatResponseHandler(resp)return nil
}