An Analysis of the Factory Design Pattern for Object Storage and Message Queues in Milvus

2024/10/18 18:26:26


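Requirements

Create the mq and the storage according to the parameter settings:
mq: kafka, pulsar (initMQ() below also handles rocksmq and natsmq)
storage: local, minio, remote

Configuration file

The mq and the storage to initialize are selected from the configuration file: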

mq:
  type: pulsar
common:
  storageType: minio

Here one product is the mq and the other is the storage, so the abstract factory pattern is a better fit than the factory method pattern.
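
To make the distinction concrete, here is a minimal Go sketch of the abstract factory idea: a single factory interface creates both related products (a message queue and a storage), and the configuration values decide which concrete products come out. Every name in it (MQ, Storage, Factory, configFactory and the product types) is hypothetical and is not a Milvus API.

```go
package main

import (
	"errors"
	"fmt"
)

// MQ and Storage are the two hypothetical product interfaces.
type MQ interface{ Name() string }
type Storage interface{ Name() string }

type pulsarMQ struct{}

func (pulsarMQ) Name() string { return "pulsar" }

type kafkaMQ struct{}

func (kafkaMQ) Name() string { return "kafka" }

type minioStorage struct{}

func (minioStorage) Name() string { return "minio" }

type localStorage struct{}

func (localStorage) Name() string { return "local" }

// Factory is the abstract factory: one interface that creates both products.
type Factory interface {
	NewMQ() (MQ, error)
	NewStorage() (Storage, error)
}

// configFactory is a concrete factory driven by two config values
// (mirroring mq.type and common.storageType in the configuration above).
type configFactory struct {
	mqType, storageType string
}

func (f configFactory) NewMQ() (MQ, error) {
	switch f.mqType {
	case "pulsar":
		return pulsarMQ{}, nil
	case "kafka":
		return kafkaMQ{}, nil
	}
	return nil, errors.New("unknown mq type: " + f.mqType)
}

func (f configFactory) NewStorage() (Storage, error) {
	switch f.storageType {
	case "minio":
		return minioStorage{}, nil
	case "local":
		return localStorage{}, nil
	}
	return nil, errors.New("unknown storage type: " + f.storageType)
}

func main() {
	var f Factory = configFactory{mqType: "pulsar", storageType: "minio"}
	mq, _ := f.NewMQ()
	st, _ := f.NewStorage()
	fmt.Println(mq.Name(), st.Name()) // pulsar minio
}
```

Client code only holds a Factory, so switching from Pulsar/MinIO to Kafka/local storage changes the configuration, not the callers.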

Code structure


Factory interface

Code path: internal\util\dependency\factory.go

type Factory interface {
	msgstream.Factory
	// Init() passes the parameters to the factory.
	Init(p *paramtable.ComponentParam)
	NewPersistentStorageChunkManager(ctx context.Context) (storage.ChunkManager, error)
}

// pkg\mq\msgstream\msgstream.go
// the msgstream.Factory code
type Factory interface {
	NewMsgStream(ctx context.Context) (MsgStream, error)
	NewTtMsgStream(ctx context.Context) (MsgStream, error)
	NewMsgStreamDisposer(ctx context.Context) func([]string, string) error
}

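dependency.Factory is a factory interface that embeds the mq factory interface (msgstream.Factory) and adds a method for creating the persistent storage object.

So this one interface creates both message queue objects and persistent storage objects.

Why not write it like this instead: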

type Factory interface {
	Init(p *paramtable.ComponentParam)
	NewMsgStream(ctx context.Context) (MsgStream, error)
	NewPersistentStorageChunkManager(ctx context.Context) (storage.ChunkManager, error)
}
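
One plausible reason, offered as reasoning rather than the Milvus authors' stated intent: embedding msgstream.Factory brings along all of its methods (including NewTtMsgStream and NewMsgStreamDisposer) and keeps every dependency.Factory assignable to msgstream.Factory, so code that only needs message streams can accept the narrower interface. The flat version above lacks those methods and would lose that property. The trimmed-down types below (MsgFactory, DepFactory, fakeFactory, startConsumer) are hypothetical.

```go
package main

import "fmt"

// MsgFactory plays the role of msgstream.Factory (trimmed to two methods).
type MsgFactory interface {
	NewMsgStream() string
	NewTtMsgStream() string
}

// DepFactory plays the role of dependency.Factory: it embeds MsgFactory
// and adds the storage constructor.
type DepFactory interface {
	MsgFactory
	NewPersistentStorageChunkManager() string
}

type fakeFactory struct{}

func (fakeFactory) NewMsgStream() string   { return "msgstream" }
func (fakeFactory) NewTtMsgStream() string { return "tt-msgstream" }
func (fakeFactory) NewPersistentStorageChunkManager() string {
	return "chunk-manager"
}

// startConsumer only needs the mq half, so it asks for the narrower interface.
func startConsumer(f MsgFactory) string { return f.NewTtMsgStream() }

func main() {
	var dep DepFactory = fakeFactory{}
	// A DepFactory can be passed wherever a MsgFactory is expected.
	fmt.Println(startConsumer(dep)) // tt-msgstream
}
```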

DefaultFactory

The DefaultFactory struct is the implementation of dependency.Factory.

// DefaultFactory is a factory that produces instances of storage.ChunkManager and message queue.
// internal\util\dependency\factory.go
type DefaultFactory struct {
	standAlone          bool
	chunkManagerFactory storage.Factory
	msgStreamFactory    msgstream.Factory
}

// storage.Factory
// internal\storage\factory.go
type Factory interface {
	NewPersistentStorageChunkManager(ctx context.Context) (ChunkManager, error)
}

// msgstream.Factory
// pkg\mq\msgstream\msgstream.go
type Factory interface {
	NewMsgStream(ctx context.Context) (MsgStream, error)
	NewTtMsgStream(ctx context.Context) (MsgStream, error)
	NewMsgStreamDisposer(ctx context.Context) func([]string, string) error
}

DefaultFactory implements the Init() method of the dependency.Factory interface.

Init() initializes chunkManagerFactory and msgStreamFactory.

func (f *DefaultFactory) Init(params *paramtable.ComponentParam) {
	// skip if using default factory
	if f.msgStreamFactory != nil {
		return
	}
	// initialize chunkManagerFactory
	f.chunkManagerFactory = storage.NewChunkManagerFactoryWithParam(params)

	// initialize mq client or embedded mq.
	// initialize msgStreamFactory
	if err := f.initMQ(f.standAlone, params); err != nil {
		panic(err)
	}
}
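
The early return at the top of Init() makes the call safe to repeat: once msgStreamFactory is set (by an earlier call, or injected beforehand), the existing factories are kept. The sketch below isolates just that guard using hypothetical types (msgFactory, defaultFactory and the newCount counter); the real Init() additionally builds the chunk manager factory and panics if initMQ() fails.

```go
package main

import "fmt"

type msgFactory struct{ id int }

type defaultFactory struct {
	msgStreamFactory *msgFactory
	newCount         int // how many times a factory was actually built
}

// Init mirrors the guard in DefaultFactory.Init: do nothing if already initialized.
func (f *defaultFactory) Init() {
	if f.msgStreamFactory != nil {
		return
	}
	f.newCount++
	f.msgStreamFactory = &msgFactory{id: f.newCount}
}

func main() {
	f := &defaultFactory{}
	f.Init()
	f.Init() // second call is a no-op
	fmt.Println(f.newCount, f.msgStreamFactory.id) // 1 1

	// A pre-injected factory is left untouched.
	g := &defaultFactory{msgStreamFactory: &msgFactory{id: 42}}
	g.Init()
	fmt.Println(g.newCount, g.msgStreamFactory.id) // 0 42
}
```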

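f.chunkManagerFactory is built by storage.NewChunkManagerFactoryWithParam(), which ends up returning a ChunkManagerFactory that holds the persistent storage type and its config: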

return &ChunkManagerFactory{
	persistentStorage: persistentStorage,
	config:            c,
}

f.msgStreamFactory is created in initMQ(), which selects the mq type and builds the matching factory:

func (f *DefaultFactory) initMQ(standalone bool, params *paramtable.ComponentParam) error {
	mqType := mustSelectMQType(standalone, params.MQCfg.Type.GetValue(), mqEnable{params.RocksmqEnable(), params.NatsmqEnable(), params.PulsarEnable(), params.KafkaEnable()})
	log.Info("try to init mq", zap.Bool("standalone", standalone), zap.String("mqType", mqType))

	switch mqType {
	case mqTypeNatsmq:
		f.msgStreamFactory = msgstream.NewNatsmqFactory()
	case mqTypeRocksmq:
		f.msgStreamFactory = smsgstream.NewRocksmqFactory(params.RocksmqCfg.Path.GetValue(), &params.ServiceParam)
	case mqTypePulsar:
		f.msgStreamFactory = msgstream.NewPmsFactory(&params.ServiceParam)
	case mqTypeKafka:
		f.msgStreamFactory = msgstream.NewKmsFactory(&params.ServiceParam)
	}
	if f.msgStreamFactory == nil {
		return errors.New("failed to create MQ: check the milvus log for initialization failures")
	}
	return nil
}

Persistent storage

storage.Factory is the factory interface for creating persistent storage.

storage.ChunkManagerFactory is the implementation of storage.Factory.

Implementation of NewPersistentStorageChunkManager():

func (f *DefaultFactory) NewPersistentStorageChunkManager(ctx context.Context) (storage.ChunkManager, error) {
	return f.chunkManagerFactory.NewPersistentStorageChunkManager(ctx)
}

func (f *ChunkManagerFactory) NewPersistentStorageChunkManager(ctx context.Context) (ChunkManager, error) {
	return f.newChunkManager(ctx, f.persistentStorage)
}

func (f *ChunkManagerFactory) newChunkManager(ctx context.Context, engine string) (ChunkManager, error) {
	switch engine {
	case "local":
		return NewLocalChunkManager(RootPath(f.config.rootPath)), nil
	case "minio":
		return newMinioChunkManagerWithConfig(ctx, f.config)
	case "remote":
		return NewRemoteChunkManager(ctx, f.config)
	default:
		return nil, errors.New("no chunk manager implemented with engine: " + engine)
	}
}

newChunkManager() creates the persistent storage object that matches the engine passed in:

LocalChunkManager, MinioChunkManager, or RemoteChunkManager.

// LocalChunkManager is responsible for read and write local file.
type LocalChunkManager struct {
	localPath string
}

// MinioChunkManager is responsible for read and write data stored in minio.
type MinioChunkManager struct {
	*minio.Client
	bucketName string
	rootPath   string
}

// RemoteChunkManager is responsible for read and write data stored in minio.
type RemoteChunkManager struct {
	client     ObjectStorage
	bucketName string
	rootPath   string
}
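
The delegation from DefaultFactory to ChunkManagerFactory, and the dispatch on the engine name inside newChunkManager(), can be sketched without any Milvus dependencies. ChunkManager here is a one-method stand-in, and localCM, minioCM, remoteCM, chunkManagerFactory and defaultFactory are hypothetical placeholders rather than the real implementations.

```go
package main

import (
	"errors"
	"fmt"
)

// ChunkManager is a one-method stand-in for storage.ChunkManager.
type ChunkManager interface{ Kind() string }

type localCM struct{ root string }
type minioCM struct{}
type remoteCM struct{}

func (c localCM) Kind() string { return "local:" + c.root }
func (minioCM) Kind() string   { return "minio" }
func (remoteCM) Kind() string  { return "remote" }

// chunkManagerFactory keeps the engine name chosen from configuration.
type chunkManagerFactory struct {
	persistentStorage string
	rootPath          string
}

func (f *chunkManagerFactory) NewPersistentStorageChunkManager() (ChunkManager, error) {
	return f.newChunkManager(f.persistentStorage)
}

// newChunkManager selects the concrete product by engine name.
func (f *chunkManagerFactory) newChunkManager(engine string) (ChunkManager, error) {
	switch engine {
	case "local":
		return localCM{root: f.rootPath}, nil
	case "minio":
		return minioCM{}, nil
	case "remote":
		return remoteCM{}, nil
	default:
		return nil, errors.New("no chunk manager implemented with engine: " + engine)
	}
}

// defaultFactory only delegates, like DefaultFactory does.
type defaultFactory struct{ chunkManagerFactory *chunkManagerFactory }

func (f *defaultFactory) NewPersistentStorageChunkManager() (ChunkManager, error) {
	return f.chunkManagerFactory.NewPersistentStorageChunkManager()
}

func main() {
	f := &defaultFactory{&chunkManagerFactory{persistentStorage: "local", rootPath: "/tmp/milvus"}}
	cm, err := f.NewPersistentStorageChunkManager()
	fmt.Println(cm.Kind(), err) // local:/tmp/milvus <nil>
}
```

Because the engine name is fixed when the factory is constructed, callers of NewPersistentStorageChunkManager() never see which backend they get.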

Message queue

msgstream.Factory is the factory interface for creating the mq.

Factory interface:

// pkg\mq\msgstream\msgstream.go
type Factory interface {
	NewMsgStream(ctx context.Context) (MsgStream, error)
	NewTtMsgStream(ctx context.Context) (MsgStream, error)
	NewMsgStreamDisposer(ctx context.Context) func([]string, string) error
}

Implementations:

CommonFactory, KmsFactory, PmsFactory

// CommonFactory is a Factory for creating message streams with common logic.
// It contains a function field named newer, which is a function that creates
// an mqwrapper.Client when called.
// pkg\mq\msgstream\common_mq_factory.go
type CommonFactory struct {
	Newer             func(context.Context) (mqwrapper.Client, error) // client constructor
	DispatcherFactory ProtoUDFactory
	ReceiveBufSize    int64
	MQBufSize         int64
}

// pkg\mq\msgstream\mq_factory.go
// Kafka factory
type KmsFactory struct {
	dispatcherFactory ProtoUDFactory
	config            *paramtable.KafkaConfig
	ReceiveBufSize    int64
	MQBufSize         int64
}

// PmsFactory is a pulsar msgstream factory that implemented Factory interface(msgstream.go)
// pkg\mq\msgstream\mq_factory.go
// Pulsar factory
type PmsFactory struct {
	dispatcherFactory ProtoUDFactory
	// the following members must be public, so that mapstructure.Decode() can access them
	PulsarAddress    string
	PulsarWebAddress string
	ReceiveBufSize   int64
	MQBufSize        int64
	PulsarAuthPlugin string
	PulsarAuthParams string
	PulsarTenant     string
	PulsarNameSpace  string
	RequestTimeout   time.Duration
	metricRegisterer prometheus.Registerer
}

mq products

The mq product interface is msgstream.MsgStream:

// MsgStream is an interface that can be used to produce and consume message on message queue
type MsgStream interface {
	Close()
	AsProducer(channels []string)
	Produce(*MsgPack) error
	SetRepackFunc(repackFunc RepackFunc)
	GetProduceChannels() []string
	Broadcast(*MsgPack) (map[string][]MessageID, error)
	AsConsumer(ctx context.Context, channels []string, subName string, position mqwrapper.SubscriptionInitialPosition) error
	Chan() <-chan *MsgPack
	Seek(ctx context.Context, offset []*MsgPosition) error
	GetLatestMsgID(channel string) (MessageID, error)
	CheckTopicValid(channel string) error
	EnableProduce(can bool)
}

Concrete product implementations:

msgstream.mqMsgStream, msgstream.MqTtMsgStream

type mqMsgStream struct {
	ctx              context.Context
	client           mqwrapper.Client
	producers        map[string]mqwrapper.Producer
	producerChannels []string
	consumers        map[string]mqwrapper.Consumer
	consumerChannels []string

	repackFunc    RepackFunc
	unmarshal     UnmarshalDispatcher
	receiveBuf    chan *MsgPack
	closeRWMutex  *sync.RWMutex
	streamCancel  func()
	bufSize       int64
	producerLock  *sync.RWMutex
	consumerLock  *sync.Mutex
	closed        int32
	onceChan      sync.Once
	enableProduce atomic.Value
}

// MqTtMsgStream is a msgstream that contains timeticks
type MqTtMsgStream struct {
	*mqMsgStream
	chanMsgBuf         map[mqwrapper.Consumer][]TsMsg
	chanMsgPos         map[mqwrapper.Consumer]*msgpb.MsgPosition
	chanStopChan       map[mqwrapper.Consumer]chan bool
	chanTtMsgTime      map[mqwrapper.Consumer]Timestamp
	chanMsgBufMutex    *sync.Mutex
	chanTtMsgTimeMutex *sync.RWMutex
	chanWaitGroup      *sync.WaitGroup
	lastTimeStamp      Timestamp
	syncConsumer       chan int
}
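
MqTtMsgStream reuses mqMsgStream by embedding a pointer to it, so mqMsgStream's methods are promoted onto MqTtMsgStream and only the timetick-specific parts need new code. A minimal sketch of that reuse, with hypothetical baseStream/ttStream types; the Close override that resets lastTimeStamp is made up purely to show how an outer type can wrap a promoted method.

```go
package main

import "fmt"

type baseStream struct {
	channels []string
	closed   bool
}

func (s *baseStream) AsProducer(channels []string) { s.channels = append(s.channels, channels...) }
func (s *baseStream) Close()                       { s.closed = true }

// ttStream embeds *baseStream, so AsProducer and Close are promoted.
type ttStream struct {
	*baseStream
	lastTimeStamp uint64
}

// Close shadows the promoted method and still calls the embedded one.
func (s *ttStream) Close() {
	s.lastTimeStamp = 0
	s.baseStream.Close()
}

func main() {
	tt := &ttStream{baseStream: &baseStream{}, lastTimeStamp: 100}
	tt.AsProducer([]string{"ch-0"}) // promoted from baseStream
	tt.Close()
	fmt.Println(tt.channels, tt.closed, tt.lastTimeStamp) // [ch-0] true 0
}
```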

Storage products

The storage product interface is storage.ChunkManager:

// ChunkManager is to manager chunks.
// Include Read, Write, Remove chunks.
type ChunkManager interface {
	// RootPath returns current root path.
	RootPath() string
	// Path returns path of @filePath.
	Path(ctx context.Context, filePath string) (string, error)
	// Size returns path of @filePath.
	Size(ctx context.Context, filePath string) (int64, error)
	// Write writes @content to @filePath.
	Write(ctx context.Context, filePath string, content []byte) error
	// MultiWrite writes multi @content to @filePath.
	MultiWrite(ctx context.Context, contents map[string][]byte) error
	// Exist returns true if @filePath exists.
	Exist(ctx context.Context, filePath string) (bool, error)
	// Read reads @filePath and returns content.
	Read(ctx context.Context, filePath string) ([]byte, error)
	// Reader return a reader for @filePath
	Reader(ctx context.Context, filePath string) (FileReader, error)
	// MultiRead reads @filePath and returns content.
	MultiRead(ctx context.Context, filePaths []string) ([][]byte, error)
	ListWithPrefix(ctx context.Context, prefix string, recursive bool) ([]string, []time.Time, error)
	// ReadWithPrefix reads files with same @prefix and returns contents.
	ReadWithPrefix(ctx context.Context, prefix string) ([]string, [][]byte, error)
	Mmap(ctx context.Context, filePath string) (*mmap.ReaderAt, error)
	// ReadAt reads @filePath by offset @off, content stored in @p, return @n as the number of bytes read.
	// if all bytes are read, @err is io.EOF.
	// return other error if read failed.
	ReadAt(ctx context.Context, filePath string, off int64, length int64) (p []byte, err error)
	// Remove delete @filePath.
	Remove(ctx context.Context, filePath string) error
	// MultiRemove delete @filePaths.
	MultiRemove(ctx context.Context, filePaths []string) error
	// RemoveWithPrefix remove files with same @prefix.
	RemoveWithPrefix(ctx context.Context, prefix string) error
}

Concrete product implementations:

LocalChunkManager, MinioChunkManager, RemoteChunkManager

// LocalChunkManager is responsible for read and write local file.
type LocalChunkManager struct {
	localPath string
}

// MinioChunkManager is responsible for read and write data stored in minio.
type MinioChunkManager struct {
	*minio.Client

	//	ctx        context.Context
	bucketName string
	rootPath   string
}

// RemoteChunkManager is responsible for read and write data stored in minio.
type RemoteChunkManager struct {
	client ObjectStorage

	//	ctx        context.Context
	bucketName string
	rootPath   string
}

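Summary

As the code above shows, each mq type gets its own factory (for example KmsFactory for Kafka and PmsFactory for Pulsar), while storage has a single factory, ChunkManagerFactory, that switches on the engine name.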

