milvus querynode启动源码分析

ops/2024/10/18 19:28:51/

querynode启动源码分析

结构体

// QueryNode implements QueryNode grpc server
// cmd\components\query_node.go
type QueryNode struct {ctx context.Contextsvr *grpcquerynode.Server
}// Server is the grpc server of QueryNode.
type Server struct {querynode   types.QueryNodeComponentwg          sync.WaitGroupctx         context.Contextcancel      context.CancelFuncgrpcErrChan chan errorserverID atomic.Int64grpcServer *grpc.ServeretcdCli *clientv3.Client
}

querynode是一个接口,实现querynode api功能。

func (mr *MilvusRoles) runQueryNode(ctx context.Context, localMsg bool, wg *sync.WaitGroup) component {wg.Add(1)// clear local storagerootPath := paramtable.Get().LocalStorageCfg.Path.GetValue()queryDataLocalPath := filepath.Join(rootPath, typeutil.QueryNodeRole)cleanLocalDir(queryDataLocalPath)// clear mmap dirmmapDir := paramtable.Get().QueryNodeCfg.MmapDirPath.GetValue()if len(mmapDir) > 0 {cleanLocalDir(mmapDir)}return runComponent(ctx, localMsg, wg, components.NewQueryNode, metrics.RegisterQueryNode)
}// creator用NewQueryNode替换
role, err = creator(ctx, factory)

components.NewQueryNode是一个函数。

NewQueryNode()用来创建QueryNode结构体。

// NewQueryNode creates a new QueryNode
func NewQueryNode(ctx context.Context, factory dependency.Factory) (*QueryNode, error) {svr, err := grpcquerynode.NewServer(ctx, factory)if err != nil {return nil, err}return &QueryNode{ctx: ctx,svr: svr,}, nil
}

grpcquerynode.NewServer()产生的是本结构体Server。

// NewServer create a new QueryNode grpc server.
func NewServer(ctx context.Context, factory dependency.Factory) (*Server, error) {ctx1, cancel := context.WithCancel(ctx)s := &Server{ctx:         ctx1,cancel:      cancel,querynode:   qn.NewQueryNode(ctx, factory),grpcErrChan: make(chan error),}return s, nil
}

qn.NewQueryNode()返回一个结构体,是 types.QueryNodeComponen接口的一个实现

执行Run()

Server结构体创建后,调用结构体的Run()方法。

func runComponent[T component](ctx context.Context,localMsg bool,runWg *sync.WaitGroup,creator func(context.Context, dependency.Factory) (T, error),metricRegister func(*prometheus.Registry),
) component {var role Tsign := make(chan struct{})go func() {factory := dependency.NewFactory(localMsg)var err errorrole, err = creator(ctx, factory)if localMsg {paramtable.SetRole(typeutil.StandaloneRole)} else {paramtable.SetRole(role.GetName())}if err != nil {panic(err)}close(sign)// 在这里调用对应组件结构体的Run()方法,这里是QueryNode结构体if err := role.Run(); err != nil {panic(err)}runWg.Done()}()......
}

runComponent是一个包裹函数。

// Run starts service
func (q *QueryNode) Run() error {if err := q.svr.Run(); err != nil {log.Error("QueryNode starts error", zap.Error(err))return err}log.Debug("QueryNode successfully started")return nil
}

Run()方法调用q.svr.Run()方法。srv是grpcquerynode.NewServer()返回的结构体。

进入Run()方法:

// Run initializes and starts QueryNode's grpc service.
func (s *Server) Run() error {if err := s.init(); err != nil {return err}log.Debug("QueryNode init done ...")if err := s.start(); err != nil {return err}log.Debug("QueryNode start done ...")return nil
}

接下来分析s.init()和s.start()方法。

s.init()

// init initializes QueryNode's grpc service.
func (s *Server) init() error {etcdConfig := &paramtable.Get().EtcdCfgParams := &paramtable.Get().QueryNodeGrpcServerCfgif !funcutil.CheckPortAvailable(Params.Port.GetAsInt()) {paramtable.Get().Save(Params.Port.Key, fmt.Sprintf("%d", funcutil.GetAvailablePort()))log.Warn("QueryNode get available port when init", zap.Int("Port", Params.Port.GetAsInt()))}log.Debug("QueryNode", zap.Int("port", Params.Port.GetAsInt()))etcdCli, err := etcd.GetEtcdClient(etcdConfig.UseEmbedEtcd.GetAsBool(),etcdConfig.EtcdUseSSL.GetAsBool(),etcdConfig.Endpoints.GetAsStrings(),etcdConfig.EtcdTLSCert.GetValue(),etcdConfig.EtcdTLSKey.GetValue(),etcdConfig.EtcdTLSCACert.GetValue(),etcdConfig.EtcdTLSMinVersion.GetValue())if err != nil {log.Debug("QueryNode connect to etcd failed", zap.Error(err))return err}s.etcdCli = etcdClis.SetEtcdClient(etcdCli)s.querynode.SetAddress(Params.GetAddress())log.Debug("QueryNode connect to etcd successfully")s.wg.Add(1)// 启动grpc,默认端口为21123go s.startGrpcLoop(Params.Port.GetAsInt())// wait for grpc server loop starterr = <-s.grpcErrChanif err != nil {return err}s.querynode.UpdateStateCode(commonpb.StateCode_Initializing)log.Debug("QueryNode", zap.Any("State", commonpb.StateCode_Initializing))// 调用querynode的初始化方法if err := s.querynode.Init(); err != nil {log.Error("QueryNode init error: ", zap.Error(err))return err}return nil
}

这段可以看出来,创建了etcdCli并赋予给了s.etcdCli。

s.startGrpcLoop()启动grpc端口服务。

最终调用s.querynode.Init()进行初始化,代码位置:internal\querynodev2\server.go

s.querynode是接口类型types.QueryNodeComponent,QueryNodeComponent继承于Component。

type QueryNodeComponent interface {QueryNodeUpdateStateCode(stateCode commonpb.StateCode)SetAddress(address string)GetAddress() stringSetEtcdClient(etcdClient *clientv3.Client)
}// QueryNode is the interface `querynode` package implements
type QueryNode interface {Componentquerypb.QueryNodeServer
}// Component is the interface all services implement
type Component interface {Init() errorStart() errorStop() errorRegister() error
}

接口套接口:

RootCoordComponent -> RootCoord -> Component
DataCoordComponent -> DataCoord -> Component
QueryCoordComponent -> QueryCoord -> Component
ProxyComponent -> Proxy -> Component
QueryNodeComponent -> QueryNode -> Component
IndexNodeComponent -> IndexNode -> Component
DataNodeComponent -> DataNode -> Component

各组件最终的Init()初始化代码路径:

internal\rootcoord\root_coord.go->Init()
internal\datacoord\server.go->Init()
internal\querycoordv2\server.go->Init()
internal\datanode\data_node.go->Init()
internal\indexnode\indexnode.go->Init()
internal\querynodev2\server.go->Init()
internal\proxy\proxy.go->Init()

回过头来继续querynode的init。

// Init function init historical and streaming module to manage segments
func (node *QueryNode) Init() error {var initError errornode.initOnce.Do(func() {// ctx := context.Background()log.Info("QueryNode session info", zap.String("metaPath", paramtable.Get().EtcdCfg.MetaRootPath.GetValue()))err := node.initSession()if err != nil {log.Error("QueryNode init session failed", zap.Error(err))initError = errreturn}err = node.initHook()if err != nil {// auto index cannot work if hook init failedif paramtable.Get().AutoIndexConfig.Enable.GetAsBool() {log.Error("QueryNode init hook failed", zap.Error(err))initError = errreturn}}node.factory.Init(paramtable.Get())localRootPath := paramtable.Get().LocalStorageCfg.Path.GetValue()localChunkManager := storage.NewLocalChunkManager(storage.RootPath(localRootPath))localUsedSize, err := segments.GetLocalUsedSize(localRootPath)if err != nil {log.Warn("get local used size failed", zap.Error(err))initError = errreturn}metrics.QueryNodeDiskUsedSize.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Set(float64(localUsedSize / 1024 / 1024))remoteChunkManager, err := node.factory.NewPersistentStorageChunkManager(node.ctx)if err != nil {log.Warn("failed to init remote chunk manager", zap.Error(err))initError = errreturn}node.cacheChunkManager, err = storage.NewVectorChunkManager(node.ctx,localChunkManager,remoteChunkManager,paramtable.Get().QueryNodeCfg.CacheMemoryLimit.GetAsInt64(),paramtable.Get().QueryNodeCfg.CacheEnabled.GetAsBool(),)if err != nil {log.Error("failed to init cache chunk manager", zap.Error(err))initError = errreturn}node.vectorStorage, err = node.factory.NewPersistentStorageChunkManager(node.ctx)if err != nil {log.Error("QueryNode init vector storage failed", zap.Error(err))initError = errreturn}log.Info("queryNode try to connect etcd success", zap.String("MetaRootPath", paramtable.Get().EtcdCfg.MetaRootPath.GetValue()))schedulePolicy := paramtable.Get().QueryNodeCfg.SchedulePolicyName.GetValue()node.scheduler = tasks.NewScheduler(schedulePolicy,)log.Info("queryNode init scheduler", zap.String("policy", schedulePolicy))node.clusterManager = cluster.NewWorkerManager(func(ctx context.Context, nodeID int64) (cluster.Worker, error) {if nodeID == paramtable.GetNodeID() {return NewLocalWorker(node), nil}sessions, _, err := node.session.GetSessions(typeutil.QueryNodeRole)if err != nil {return nil, err}addr := ""for _, session := range sessions {if session.ServerID == nodeID {addr = session.Addressbreak}}client, err := grpcquerynodeclient.NewClient(ctx, addr, nodeID)if err != nil {return nil, err}return cluster.NewRemoteWorker(client), nil})node.delegators = typeutil.NewConcurrentMap[string, delegator.ShardDelegator]()node.subscribingChannels = typeutil.NewConcurrentSet[string]()node.unsubscribingChannels = typeutil.NewConcurrentSet[string]()node.manager = segments.NewManager()node.loader = segments.NewLoader(node.manager, node.vectorStorage)node.dispClient = msgdispatcher.NewClient(node.factory, typeutil.QueryNodeRole, paramtable.GetNodeID())// init pipeline managernode.pipelineManager = pipeline.NewManager(node.manager, node.tSafeManager, node.dispClient, node.delegators)err = node.InitSegcore()if err != nil {log.Error("QueryNode init segcore failed", zap.Error(err))initError = errreturn}if paramtable.Get().QueryNodeCfg.GCEnabled.GetAsBool() {if paramtable.Get().QueryNodeCfg.GCHelperEnabled.GetAsBool() {action := func(GOGC uint32) {debug.SetGCPercent(int(GOGC))}gc.NewTuner(paramtable.Get().QueryNodeCfg.OverloadedMemoryThresholdPercentage.GetAsFloat(), uint32(paramtable.Get().QueryNodeCfg.MinimumGOGCConfig.GetAsInt()), uint32(paramtable.Get().QueryNodeCfg.MaximumGOGCConfig.GetAsInt()), action)} else {action := func(uint32) {}gc.NewTuner(paramtable.Get().QueryNodeCfg.OverloadedMemoryThresholdPercentage.GetAsFloat(), uint32(paramtable.Get().QueryNodeCfg.MinimumGOGCConfig.GetAsInt()), uint32(paramtable.Get().QueryNodeCfg.MaximumGOGCConfig.GetAsInt()), action)}}log.Info("query node init successfully",zap.Int64("queryNodeID", paramtable.GetNodeID()),zap.String("Address", node.address),)})return initError
}

从代码可以看出初始化是在填充QueryNode结构体。

s.start()

启动组件的逻辑。

// start starts QueryNode's grpc service.
func (s *Server) start() error {if err := s.querynode.Start(); err != nil {log.Error("QueryNode start failed", zap.Error(err))return err}if err := s.querynode.Register(); err != nil {log.Error("QueryNode register service failed", zap.Error(err))return err}return nil
}

s.querynode是一个Component接口,实现了 方法Init()、 Start() 、 Stop() 、 Register() 。

Register():向元数据etcd注册。

Start():用来启动组件。

// Start mainly start QueryNode's query service.
func (node *QueryNode) Start() error {node.startOnce.Do(func() {node.scheduler.Start()paramtable.SetCreateTime(time.Now())paramtable.SetUpdateTime(time.Now())mmapDirPath := paramtable.Get().QueryNodeCfg.MmapDirPath.GetValue()mmapEnabled := len(mmapDirPath) > 0node.UpdateStateCode(commonpb.StateCode_Healthy)registry.GetInMemoryResolver().RegisterQueryNode(paramtable.GetNodeID(), node)log.Info("query node start successfully",zap.Int64("queryNodeID", paramtable.GetNodeID()),zap.String("Address", node.address),zap.Bool("mmapEnabled", mmapEnabled),)})return nil
}

node节点都没有standby,coord节点有standby。


http://www.ppmy.cn/ops/9973.html

相关文章

怎么通过Javascript脚本实现远程控制一路开关

怎么通过Javascript脚本实现远程控制一路开关呢&#xff1f; 本文描述了使用Javascript脚本调用HTTP接口&#xff0c;实现控制一路开关。一路开关可控制一路照明、排风扇等电器。 可选用产品&#xff1a;可根据实际场景需求&#xff0c;选择对应的规格 序号设备名称1智能WiFi…

3D开发工具HOOPS助力CAM软件优化制造流程

在现代制造业中&#xff0c;计算机辅助制造&#xff08;CAM&#xff09;软件的发展已成为提高生产效率和产品质量的关键。为了满足不断增长的需求和日益复杂的制造流程&#xff0c;CAM软件需要具备高效的CAD数据导入、云端协作、移动应用支持以及丰富的文档生成能力。 Tech So…

【Ansible】04

【Ansible】03 任务块 block任务块 使用 block 可以将多个任务合并为一个组可以将整个 block任务组 , 一起控制是否要执行 # 如果webservers组中的主机系统发行版是Rocky&#xff0c;则安装并启动nginx [rootpubserver ansible]# vim block1.yml --- - name: block tasksho…

微软刚开源就删库的WizardLM-2:MT-Bench 榜单评测超越GPT-4,7B追平Qwen1.5-32B

前言 微软最近发布的WizardLM-2大型语言模型因其先进的技术规格和短暂的开源后突然撤回&#xff0c;引起了科技界的广泛关注。WizardLM-2包括三个不同规模的模型&#xff0c;分别是8x22B、70B和7B&#xff0c;均展现了在多语言处理、复杂对话、推理和代理任务上的卓越能力。 H…

深度神经网络(DNN)

通过5个条件判定一件事情是否会发生&#xff0c;5个条件对这件事情是否发生的影响力不同&#xff0c;计算每个条件对这件事情发生的影响力多大&#xff0c;写一个深度神经网络&#xff08;DNN&#xff09;模型程序,最后打印5个条件分别的影响力。 示例 在深度神经网络&#xf…

设计模式之模板方法模式详解(下)

3&#xff09;钩子方法的使用 1.概述 钩子方法的引入使得子类可以控制父类的行为。 2.结构图 3.代码实现 将公共方法和框架代码放在抽象父类中 abstract class DataViewer {//抽象方法&#xff1a;获取数据public abstract void GetData();//具体方法&#xff1a;转换数据…

旅游陪同翻译难吗, 旅游翻译英译中哪家好?

近来&#xff0c;随着中国旅游业的蓬勃发展&#xff0c;旅游陪同翻译的需求也水涨船高&#xff0c;这些专业的翻译服务者为中外游客搭建起友谊的桥梁&#xff0c;引领他们共同探索中国这片古老而神秘的土地 。那么&#xff0c;旅游陪同翻译英译中难吗&#xff1f;我们如何在众多…

Matlab数字图像处理-直方图均衡化

下图存在明显的光照不均&#xff0c;设计算法对其进行校正。 算法原理&#xff1a; 采用直方图均衡化的方法进行校正。 直方图均衡化是一种用于增强图像对比度的技术&#xff0c;它通过重新分配图像灰度级的概率分布来实现。直方图均衡化通过重新分配像素的灰度级&#xff0c…