1、Indexer 介绍
k8s-Informer之Indexer的解析(4)
Indexer 是 Client-go 中实现的一个本地存储,它可以建立索引并存储 Resource 的对象。Indexer 中的数据始终要是与 ETCD 中数据一致的,当 client-go 需要数据时,可直接通过该本地缓存获取资源对象,不需要每次都从 APIServer中获取,这样就减轻了请求过多造成对 APIServer 、etcd的压力。
从 DeltaFIFO 中 Pop 出来的资源对象,交给了 HandlerDeltas 进行处理,在 HandleDeltas 中将资源对象同步到了 Indexer 中。
// k8s.io/client-go/tools/cache/shared_informer.go
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error { s.blockDeltas.Lock() defer s.blockDeltas.Unlock() if deltas, ok := obj.(Deltas); ok { return processDeltas(s, s.indexer, s.transform, deltas) } return errors.New("object given as Process argument is not Deltas")
}// k8s.io/client-go/tools/cache/controller.go
func processDeltas( handler ResourceEventHandler, clientState Store, transformer TransformFunc, deltas Deltas) error { // from oldest to newest for _, d := range deltas { obj := d.Object if transformer != nil { var err error obj, err = transformer(obj) if err != nil { return err } } switch d.Type { case Sync, Replaced, Added, Updated: if old, exists, err := clientState.Get(obj); err == nil && exists { if err := clientState.Update(obj); err != nil { return err } handler.OnUpdate(old, obj) } else { if err := clientState.Add(obj); err != nil { return err } handler.OnAdd(obj) } case Deleted: if err := clientState.Delete(obj); err != nil { return err } handler.OnDelete(obj) } } return nil
}
2、Indexer 结构体定义
Indexer 接口继承了一个Store接口(实现本地缓存)
type Indexer interface { Store // indexName索引类,obj 是对象 // 通过计算 obj 在 indexName 索引类中的索引键,通过索引键获取所有的对象Index(indexName string, obj interface{}) ([]interface{}, error) // indexKey 是 indexName 索引类中的⼀个索引键,函数返回 indexKey 指定的所有对象键IndexKeys(indexName, indexedValue string) ([]string, error) // 获取 indexName 索引类中的所有索引键 ListIndexFuncValues(indexName string) []string // 返回所有对象 ByIndex(indexName, indexedValue string) ([]interface{}, error) // GetIndexers return the indexers GetIndexers() Indexers // 就是增加更多的索引分类AddIndexers(newIndexers Indexers) error
}// store 接口,staging/src/k8s.io/client-go/tools/cache/store.go
type Store interface {Add(obj interface{}) errorUpdate(obj interface{}) errorDelete(obj interface{}) errorList() []interface{}ListKeys() []stringGet(obj interface{}) (item interface{}, exists bool, err error)GetByKey(key string) (item interface{}, exists bool, err error)Replace([]interface{}, string) errorResync() error
}
2.1、Store 结构体:
cache struct包含一个ThreadSafeStore接口的实现和计算object key的函数KeyFunc。cache struct会根据keyFunc生成某个obj对象对应的一个唯一key, 然后调用ThreadSafeStore接口中的方法来操作本地缓存中的对象。
type cache struct { // cacheStorage ThreadSafeStore keyFunc KeyFunc
}
2.2、ThreadSafeStore
ThreadSafeStore 接口包含了操作本地缓存的增删改查方法以及索引功能的相关方法。
type ThreadSafeStore interface { Add(key string, obj interface{}) Update(key string, obj interface{}) Delete(key string) Get(key string) (item interface{}, exists bool) List() []interface{} ListKeys() []string Replace(map[string]interface{}, string) Index(indexName string, obj interface{}) ([]interface{}, error) IndexKeys(indexName, indexedValue string) ([]string, error) ListIndexFuncValues(name string) []string ByIndex(indexName, indexedValue string) ([]interface{}, error) GetIndexers() Indexers Resync() error
}
2.3、threadSafeMap
threadSafeMap struct 是ThreadSafeStore接口的一个实现
type threadSafeMap struct { lock sync.RWMutex // items 就是存放资源对象,key根据资源对象来算出,value为资源对象本身items map[string]interface{} // indexers maps a name to an IndexFunc indexers Indexers // indices maps a name to an Index indices Indices
}
3、Indexer 索引功能
与索引功能相关定义在 threadSafeMap 结构体中
// staging/src/k8s.io/client-go/tools/cache/thread_safe_store.go
type threadSafeMap struct { lock sync.RWMutex items map[string]interface{} // indexers maps a name to an IndexFunc indexers Indexers // indices maps a name to an Index indices Indices
}type IndexFunc func(obj interface{}) ([]string, error)type Index map[string]sets.String // Indexers maps a name to an IndexFunc
type Indexers map[string]IndexFunc // Indices maps a name to an Index
type Indices map[string]Index
3.1、 MetaNamespaceIndexFunc 函数
MetaNamespaceIndexFunc 函数在 Kubernetes 中使用的比较多的索引函数,是一个默认索引函数,它基于对象的命名空间进行索引。
func MetaNamespaceIndexFunc(obj interface{}) ([]string, error) { meta, err := meta.Accessor(obj) if err != nil { return []string{""}, fmt.Errorf("object has no meta: %v", err) } return []string{meta.GetNamespace()}, nil
}
3.2 、索引的实现 ByIndex() 函数
该方法传入索引器名称 indexName 和索引键名称indexedValue,方法寻找该索引器下,索引键对应的对象键列表,然后根据对象键列表,到Indexer缓存(即threadSafeMap中的items属性)中获取出相应的对象列表。
大致逻辑:
(1)首先根据索引器名称查找指定的索引器函数;
(2)然后根据索引器名称查找相应的缓存器函数;
(3)再根据索引 Key (indexedValue)从缓存中进行数据查询, 并返回查询结果;
func (c *threadSafeMap) ByIndex(indexName, indexedValue string) ([]interface{}, error) { c.lock.RLock() defer c.lock.RUnlock() // 1、首先根据索引器名称查找指定的索引器函数indexFunc := c.indexers[indexName] if indexFunc == nil { return nil, fmt.Errorf("Index with name %s does not exist", indexName) } // 2、然后根据索引器名称查找相应的缓存器函数index := c.indices[indexName] // 3、根据索引 Key (indexedValue)从缓存中进行数据查询, 并返回查询结果set := index[indexedValue] list := make([]interface{}, 0, set.Len()) for key := range set { list = append(list, c.items[key]) } return list, nil
}
使用示例:
pods, err := index.ByIndex("namespace", "default")
if err != nil {panic(err)
}
for _, pod := range pods {fmt.Println(pod.(*v1.Pod).Name)
}fmt.Println("------")pods, err := index.ByIndex("nodename", "node1")
if err != nil {panic(err)
}
for _, pod := range pods {fmt.Println(pod.(*v1.Pod).Name)
}
输出:
pod-1
pod-2
------
pod-1
总结:
Indexer 具有维护本地缓存的功能,还有一个更重要的功能就是索引功能了,这个索引的目的就是实现快速查找。如要查某个 Node 下的所有 Pod,或查找某个命名空间下的所有pod等等。使用索引就能实现快速查找。关于索引功能,则依赖于threadSafeMap结构体中的indexers与indices属性。
Informer