milvus datacoord启动源码分析

embedded/2024/9/25 21:27:49/

datacoord启动源码分析

结构体

// components.DataCoord
// DataCoord implements grpc server of DataCoord server
type DataCoord struct {ctx context.Contextsvr *grpcdatacoordclient.Server
}// grpcdatacoord.Server
// Server is the grpc server of datacoord
type Server structgrpcdatacoord.Contextcancel context.CancelFuncserverID atomic.Int64wg        sync.WaitGroupdataCoord types.DataCoordComponentetcdCli *clientv3.ClienttikvCli *txnkv.ClientgrpcErrChan chan errorgrpcServer  *grpc.Server
}

dataCoord是一个接口,实现dataCoord api功能。

func (mr *MilvusRoles) runDataCoord(ctx context.Context, localMsg bool, wg *sync.WaitGroup) component {wg.Add(1)return runComponent(ctx, localMsg, wg, components.NewDataCoord, metrics.RegisterDataCoord)
}// creator用NewDataCoord替换
role, err = creator(ctx, factory)

components.NewDataCoord是一个函数。

NewDataCoord()用来创建DataCoord结构体。

// NewDataCoord creates a new DataCoord
func NewDataCoord(ctx context.Context, factory dependency.Factory) (*DataCoord, error) {s := grpcdatacoordclient.NewServer(ctx, factory)return &DataCoord{ctx: ctx,svr: s,}, nil
}

grpcdatacoordclient.NewServer()产生的是本结构体Server。

进入NewServer:

// NewServer new data service grpc server
func NewServer(ctx context.Context, factory dependency.Factory, opts ...datacoord.Option) *Server {ctx1, cancel := context.WithCancel(ctx)s := &Server{ctx:         ctx1,cancel:      cancel,grpcErrChan: make(chan error),}s.dataCoord = datacoord.CreateServer(s.ctx, factory, opts...)return s
}

datacoord.CreateServer()返回一个结构体datacoord.Server,是接口types.DataCoordComponent的实现。

执行Run()

Server结构体创建后,调用结构体的Run()方法。

func runComponent[T component](ctx context.Context,localMsg bool,runWg *sync.WaitGroup,creator func(context.Context, dependency.Factory) (T, error),metricRegister func(*prometheus.Registry),
) component {var role Tsign := make(chan struct{})go func() {factory := dependency.NewFactory(localMsg)var err errorrole, err = creator(ctx, factory)if localMsg {paramtable.SetRole(typeutil.StandaloneRole)} else {paramtable.SetRole(role.GetName())}if err != nil {panic(err)}close(sign)// 在这里调用对应组件结构体的Run()方法,这里是components.DataCoord结构体if err := role.Run(); err != nil {panic(err)}runWg.Done()}()......
}

runComponent是一个包裹函数。

// Run starts service
func (s *DataCoord) Run() error {if err := s.svr.Run(); err != nil {log.Error("DataCoord starts error", zap.Error(err))return err}log.Debug("DataCoord successfully started")return nil
}

Run()方法调用s.svr.Run()方法。srv是datacoord.CreateServer()返回的结构体datacoord.Server。

// grpcdatacoord
// Run starts the Server. Need to call inner init and start method.
func (s *Server) Run() error {if err := s.init(); err != nil {return err}log.Debug("DataCoord init done ...")if err := s.start(); err != nil {return err}log.Debug("DataCoord start done ...")return nil
}

接下来分析s.init()和s.start()方法。

s.init()

func (s *Server) init() error {params := paramtable.Get()etcdConfig := &params.EtcdCfgetcdCli, err := etcd.GetEtcdClient(etcdConfig.UseEmbedEtcd.GetAsBool(),etcdConfig.EtcdUseSSL.GetAsBool(),etcdConfig.Endpoints.GetAsStrings(),etcdConfig.EtcdTLSCert.GetValue(),etcdConfig.EtcdTLSKey.GetValue(),etcdConfig.EtcdTLSCACert.GetValue(),etcdConfig.EtcdTLSMinVersion.GetValue())if err != nil {log.Debug("DataCoord connect to etcd failed", zap.Error(err))return err}s.etcdCli = etcdClis.dataCoord.SetEtcdClient(etcdCli)s.dataCoord.SetAddress(params.DataCoordGrpcServerCfg.GetAddress())if params.MetaStoreCfg.MetaStoreType.GetValue() == util.MetaStoreTypeTiKV {log.Info("Connecting to tikv metadata storage.")tikvCli, err := getTiKVClient(&paramtable.Get().TiKVCfg)if err != nil {log.Warn("DataCoord failed to connect to tikv", zap.Error(err))return err}s.dataCoord.SetTiKVClient(tikvCli)log.Info("Connected to tikv. Using tikv as metadata storage.")}// 启动grpc,默认为13333err = s.startGrpc()if err != nil {log.Debug("DataCoord startGrpc failed", zap.Error(err))return err}// 执行真正的初始化if err := s.dataCoord.Init(); err != nil {log.Error("dataCoord init error", zap.Error(err))return err}return nil
}

这段可以看出来,创建了etcdCli并赋予给了s.etcdCli。

s.startGrpc()启动grpc端口服务。

最终调用s.dataCoord.Init()进行初始化,代码位置:internal\datacoord\server.go

s.queryCoord是接口类型types.DataCoordComponent,DataCoordComponent继承于Component。

type DataCoordComponent interface {DataCoordSetAddress(address string)SetEtcdClient(etcdClient *clientv3.Client)SetTiKVClient(client *txnkv.Client)SetRootCoordClient(rootCoord RootCoordClient)SetDataNodeCreator(func(context.Context, string, int64) (DataNodeClient, error))SetIndexNodeCreator(func(context.Context, string, int64) (IndexNodeClient, error))
}// DataCoord is the interface `datacoord` package implements
type DataCoord interface {Componentdatapb.DataCoordServer
}// Component is the interface all services implement
type Component interface {Init() errorStart() errorStop() errorRegister() error
}

接口套接口:

RootCoordComponent -> RootCoord -> Component
DataCoordComponent -> DataCoord -> Component
QueryCoordComponent -> QueryCoord -> Component
ProxyComponent -> Proxy -> Component
QueryNodeComponent -> QueryNode -> Component
IndexNodeComponent -> IndexNode -> Component
DataNodeComponent -> DataNode -> Component

各组件最终的Init()初始化代码路径:

internal\rootcoord\root_coord.go->Init()
internal\datacoord\server.go->Init()
internal\querycoordv2\server.go->Init()
internal\datanode\data_node.go->Init()
internal\indexnode\indexnode.go->Init()
internal\querynodev2\server.go->Init()
internal\proxy\proxy.go->Init()

回过头来继续datacoord的init。

// Init change server state to Initializing
func (s *Server) Init() error {var err errors.factory.Init(Params)if err = s.initSession(); err != nil {return err}if s.enableActiveStandBy {......}// 执行真正的初始化return s.initDataCoord()
}

继续进入c.initDataCoord():

func (s *Server) initDataCoord() error {s.stateCode.Store(commonpb.StateCode_Initializing)var err errorif err = s.initRootCoordClient(); err != nil {return err}s.broker = NewCoordinatorBroker(s.rootCoordClient)storageCli, err := s.newChunkManagerFactory()if err != nil {return err}if err = s.initMeta(storageCli); err != nil {return err}s.handler = newServerHandler(s)if err = s.initCluster(); err != nil {return err}s.allocator = newRootCoordAllocator(s.rootCoordClient)s.initIndexNodeManager()if err = s.initServiceDiscovery(); err != nil {return err}if Params.DataCoordCfg.EnableCompaction.GetAsBool() {s.createCompactionHandler()s.createCompactionTrigger()}if err = s.initSegmentManager(); err != nil {return err}s.initGarbageCollection(storageCli)s.initIndexBuilder(storageCli)s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.ctx)return nil
}

从代码可以看出初始化是在填充datacoord结构体。

s.start()

启动组件的逻辑。

func (s *Server) start() error {err := s.dataCoord.Register()if err != nil {log.Debug("DataCoord register service failed", zap.Error(err))return err}err = s.dataCoord.Start()if err != nil {log.Error("DataCoord start failed", zap.Error(err))return err}return nil
}

s.dataCoord是一个Component接口,实现了 方法Init()、 Start() 、 Stop() 、 Register() 。

Register():向元数据etcd注册。

Start():用来启动组件。

进入s.dataCoord.Start():

func (s *Server) Start() error {if !s.enableActiveStandBy {s.startDataCoord()log.Info("DataCoord startup successfully")}return nil
}

真正执行启动逻辑在s.startDataCoord()。

func (s *Server) startDataCoord() {if Params.DataCoordCfg.EnableCompaction.GetAsBool() {s.compactionHandler.start()s.compactionTrigger.start()}s.startServerLoop()s.stateCode.Store(commonpb.StateCode_Healthy)sessionutil.SaveServerInfo(typeutil.DataCoordRole, s.session.ServerID)
}

要详细知道启动querycoord组件做了什么事情,研究这个函数。


http://www.ppmy.cn/embedded/21340.html

相关文章

ubuntu安装源问题

一、 清华大学开源软件镜像站 https://mirrors.tuna.tsinghua.edu.cn/help/ubuntu/ 二、 python镜像源 1、临时配置 pip install -r requirements.txt -i https://pypi.tuna.tsinghua.edu.cn/simplepip install -i https://pypi.tuna.tsinghua.edu.cn/simple pip -U --trusted…

js 下载音频的实现方式

通常下载文件我们会用到 <a> 标签&#xff0c;但是 a 标签在下载音频的时候会跳转到一个新页面进行播放&#xff0c;不会直接下载&#xff0c;这与我们的需求南辕北辙。这里我通过查询资料&#xff0c;找到了两种方式&#xff08;原理想通&#xff0c;也可以理解为一种&a…

智能手机加速度计和陀螺仪进行心律不齐以及心衰的检测

期刊地址&#xff0c;希望那位大佬根据这个期刊进行创业 &#xff0c;拿到NMPA证书&#xff0c;造福中国人&#xff01;太简便了这个方案。https://www.jacc.org/doi/full/10.1016/j.jchf.2024.01.022https://www.jacc.org/doi/full/10.1016/j.jchf.2024.01.022 背景与目的&…

IP地址的地理位置如何确定?

IP地址的地理位置确定是一个复杂且多步骤的过程&#xff0c;它依赖于多种技术和数据源来实现。下面将详细解释IP地址地理位置是如何被确定的。 首先&#xff0c;我们需要了解IP地址的基本结构。IP地址由一串数字组成&#xff0c;用于标识网络中的设备。这些数字实际上代表了设…

VisualGLM部署微调docker环境

一开始直接在本地环境部署,发现cuda版本冲突,所以改用docker,docke部署既不影响显卡性能,又可以避免环境冲突 1.创建docker容器 1.1. 拉取带有cuda11.7cudnn8的镜像 docker pull andersen9419/cuda11.7.1_cudnn8_ubu22_torch2.01.2.运行容器 docker run --gpus all --netho…

Leetcode30-最小展台数量(66)

1、题目 力扣嘉年华将举办一系列展览活动&#xff0c;后勤部将负责为每场展览提供所需要的展台。 已知后勤部得到了一份需求清单&#xff0c;记录了近期展览所需要的展台类型&#xff0c; demand[i][j] 表示第 i 天展览时第 j 个展台的类型。 在满足每一天展台需求的基础上&am…

233 基于matlab的多通道非负矩阵分解(MNMF)算法

基于matlab的多通道非负矩阵分解&#xff08;MNMF&#xff09;算法。其能够寻找到一个非负矩阵W和一个非负矩阵H&#xff0c;满足条件VW*H,从而将一个非负的矩阵分解为左右两个非负矩阵的乘积。使用EM准则对混合信号进行分解。程序已调通&#xff0c;可直接运行。 233 多通道非…

2024深圳杯数学建模竞赛D题(东三省数学建模竞赛D题):建立非均质音板振动模型与参数识别模型

更新完整代码和成品完整论文 《2024深圳杯&东三省数学建模思路代码成品论文》↓↓↓&#xff08;浏览器打开&#xff09; https://www.yuque.com/u42168770/qv6z0d/zx70edxvbv7rheu7?singleDoc# 2024深圳杯数学建模竞赛D题&#xff08;东三省数学建模竞赛D题&#xff0…