K8S之自定义Controller

news/2024/10/31 1:33:27/

简介

在此之前我们先来了解下kubernetes的两个概念"声明式API"和"控制器模式"。
"声明式API"核心原理就是当用户向kubernetes提交了一个API对象的描述后,Kubernetes会负责为你保证整个集群里各项资源的状态,都与你的API对象描述的需求相一致。而对于每个保存在etcd里的API对象,kubernetes都通过启动一种叫做"控制器模式"的无限循环,不断检查,然后调谐,最后确保整个集群的状态与这个API对象的描述一致。在kubernetes中,我们所熟悉的deployment\statefulset都是其自带的一些控制器。

k8s提供了强大了扩展能力来操作它里面的资源,
这些资源可以是内置资源,比如pod、node等,也可以自定义资源CRD。

对于自定义资源,稍微麻烦些,我们需要做如下几步:

  1. 自定义CRD

  2. 通过k8s提供的代码生成器自动生成基础代码

  3. 编写自己的业务逻辑,也就是自定义controller来操作CRD

对于内置资源,我们就只需第三步就可以了。

注:CRD+自定义controller又被称作operator
https://github.com/operator-framework/awesome-operators
" href="https://github.com/operator-framework/awesome-operators%2a%2a%2a">https://github.com/operator-framework/awesome-operators*

自定义CRD

比如我们想模仿pod的yaml那样自定义一个资源:

apiVersion: xxxx/v1beta1
kind: TsTest
metadata:name: tstest-sample
spec:# Add fields heredeploymentName: tstest-sample-deploymentreplicas: 2

如果我们直接kubectl create 这个yaml文件会报错,因为k8s不认识这个资源,
那怎样才能让k8s识别我们的自定义资源呢,需要像下面这样创建一个自定义资源描述文件,标准格式如下:

apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:# 名字必需与下面的 spec 字段匹配,并且格式为 '<名称的复数形式>.<组名>'name: tstests.stable.example.com
spec:# 组名称,用于 REST API: /apis/<组>/<版本>group: stable.example.com# 列举此 CustomResourceDefinition 所支持的版本versions:- name: v1# 每个版本都可以通过 served 标志来独立启用或禁止served: true# 其中一个且只有一个版本必需被标记为存储版本storage: trueschema:openAPIV3Schema:type: objectproperties:spec:type: objectproperties:cronSpec:type: stringimage:type: stringreplicas:type: integer# 可以是 Namespaced 或 Clusterscope: Namespacednames:# 名称的复数形式,用于 URL:/apis/<组>/<版本>/<名称的复数形式>plural: tstests# 名称的单数形式,作为命令行使用时和显示时的别名singular: tstest# kind 通常是单数形式的驼峰编码(CamelCased)形式。你的资源清单会使用这一形式。kind: TsTest# shortNames 允许你在命令行使用较短的字符串来匹配资源shortNames:- ts

上面就定义了一个叫TsTest的crd,然后通过kubectl命令创建它,此时我们再次执行我们最开始定义的那个资源文件,就不会报错了,并且也可以像pod那样CRUD操作了
但是上面的那些都只是让k8s能够识别我们自定义的这个资源,当我们通过yaml文件创建这个资源的时候,k8s也只是将这个资源记录在了etcd中了而已,没有触发任何的业务逻辑。

自定义controller

k8s提供了一个专门的client-go库来简化开发者扩展k8s的代码编写,编写自定义controller也是基于client-go库,因此实现自定义controller大致就是:

  • 第1步是listwatch的初始化,主要是返回一个针对某类资源的ListFunc和WatchFunc。

  • 第2步是informer的初始化,新建Indexer并将上述Listwatcher一同放入informer结构中

  • 第3步是给informer添加AddEventHandler(通常包含add update delete),并根据情况引入workqueue(可是默认的也可以是限速、延时等队列)

  • 第4步是启动informer.Run(),然后等待同步完成

  • 第5步是worker的启动,监听退出信号等待退出。

其实一个controller就是一个生产者和消费者模式,上面的1-4对应的就是生产者操作,5步对应的就是消费者操作。

下面以简单的示例代码来看:

//Create the endpoints watcher,endpointsQueue,endpointController
endpointsListWatcher := cache.NewListWatchFromClient(clientset.CoreV1().RESTClient(),"endpoints", v1.NamespaceAll, fields.Everything())
endpointsQueue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
endpointsIndexer, endpointsInformer := cache.NewIndexerInformer(endpointsListWatcher, &amp;v1.Endpoints{}, ReSyncPeriod,cache.ResourceEventHandlerFuncs{AddFunc:    c.addEndpoints,DeleteFunc: c.deleteEndpoints,UpdateFunc: c.updateEndpoints,}, cache.Indexers{})//Create the configmap watcher,endpointsQueue,endpointController
configmapListWatcher := cache.NewListWatchFromClient(clientset.CoreV1().RESTClient(),"configmaps", v1.NamespaceAll, fields.Everything())
configmapQueue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
configmapIndexer, configmapInformer := cache.NewIndexerInformer(configmapListWatcher, &amp;v1.ConfigMap{}, ReSyncPeriod,cache.ResourceEventHandlerFuncs{AddFunc:   c.addConfigMap,DeleteFunc: c.deleteConfigMap,UpdateFunc: c.updateConfigMap,}, cache.Indexers{})c.endpointsQueue = endpointsQueue
c.endpointsIndexer = endpointsIndexer
c.endpointsInformer = endpointsInformerc.configmapQueue = configmapQueue
c.configmapIndexer = configmapIndexer
c.configmapInformer = configmapInformer

上面的代码就是对应1-2步

go c.endpointsInformer.Run(stopCh)
go c.configmapInformer.Run(stopCh)
go c.informerFactory.Start(stopCh)
go c.stsplusInformerFactory.Start(stopCh)//Wait for all involved caches to be synced, before processing items from the endpointsQueue is started
//if !cache.WaitForCacheSync(stopCh, c.endpointsInformer.HasSynced, c.configmapInformer.HasSynced) {
if !cache.WaitForCacheSync(stopCh, c.endpointsInformer.HasSynced, c.configmapInformer.HasSynced, c.podListerSynced, c.stsplusListerSynced) {runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))return
}

上面这段代码对应的就是3-4步。

for i := 0; i < threadiness; i++ {go wait.Until(c.runEndPointsProcessWorker, time.Second, stopCh)go wait.Until(c.runConfigmapProcessWorker, time.Second, stopCh)
}<-stopChfunc (c *Controller) processEndPointsEvents() bool {seelog.Infof("[processEndPointsEvents]: start to processEndPointsEvents")endpointsKey, endpointsQuit := c.endpointsQueue.Get()//Wait until there is a new item in the working endpointsQueuefor ;!endpointsQuit; endpointsKey, endpointsQuit = c.endpointsQueue.Get(){defer c.endpointsQueue.Done(endpointsKey)//Handle the error if something went wrong during the execution of the updateRoute methodgo c.handleRoute(endpointsKey.(string))//defer c.handleErr(endpointsErr, endpointsKey)//c.endpointsQueue.Done(endpointsKey)}return true
}

上面这段代码对应的就是5步。

到目前为止,一个自定义controller的模式化代码就完成了,剩下的就是具体的业务代码了,
因此可以看出,依靠k8s提供的这个client-go库,真正的做到了开发者只需关注业务开发,基础相关的代码都已经被封装了。

源码分析

代码太多,后面有时间再分析吧,先上个总结:

  1. Reflector使用ListAndWatch方法,先从apiserver中list某类资源的所有实例,拿到对象的最新版本,

  2. 然后用watch方法监听该resourceversion之后的所有变化,若中途出现异常,reflector则会从断开的resourceversion处重新监听所有变化

  3. 一旦有Add、Del、Update动作,Reflector会收到更新通知,该事件及它所对应的API对象这个组合,被称为增量Delta,它会被放进DeltaFIFO中

  4. Informer会不断从这个DeltaFIFO中读取增量,每拿出一个对象,Informer就会判断这个增量的事件类型,然后创建或更新本地的缓存。

  5. DeltaFIFO再pop这个事件到controller中,controller会调用事先注册到ResourceEventHandler回调函数进行处理。

【Kubebuilder】

因为对于k8s内置的资源来说,client-go已经提供了对应的infomer、cache、listenAndWatch等操作,但是对于自定义资源来说,如果要编写自定义controller,这些都得自己编写这些样板代码,基于此,社区针对k8s的api扩展,提供了一个通用的工具来自动生成脚手架,这个工具就是Kubebuilder

Kubebuilder 是一个基于 CRD 来构建 Kubernetes API 的框架,可以使用 CRD 来构建 API、Controller 和 Admission Webhook。Kubebuilder的工作流程如下:

  1. 创建一个新的工程目录;

  2. 创建一个或多个资源 API CRD 然后将字段添加到资源;

  3. 在控制器中实现协调循环(reconcile loop),watch 额外的资源;

  4. 在集群中运行测试(自动安装 CRD 并自动启动控制器);

  5. 更新引导集成测试测试新字段和业务逻辑;

  6. 使用用户提供的 Dockerfile 构建和发布容器。


http://www.ppmy.cn/news/62063.html

相关文章

产品经理该怎么催进度?

这算是一个项目管理相关的问题&#xff0c;很多公司会把产品经理与项目经理的工作职能划分并没有这么清晰&#xff0c;而且项目是否能够按时上线&#xff0c;在整个项目推进过程中也是至关重要的。如果是公司的自研产品&#xff0c;项目没办法定期交付&#xff0c;挨老板一顿骂…

K8s基础3——应用部署流程、服务编排、集群资源利用率、日志管理

文章目录 一、应用部署流程二、服务编排2.1 YAML文件格式说明2.2 部署应用2.2.1 命令部署2.2.2 yaml文件部署2.2.2.1 编写deployment.yaml文件2.2.2.2 编写service.yaml文件2.2.2.3 两个yaml文件混用2.2.2.4 测试——service和deployment的标签不一致导致访问网页混乱 2.2.3 自…

腹部肿瘤内科专家朱利明:化疗也能“订制”,晚期结直肠癌不再“无药可救”

肠癌是发生在结肠和直肠的癌症&#xff0c;近二三十年来发病率快速上升。就在近期&#xff0c;“日本女大胃王菅原初代患肠癌病逝”的消息登上热搜&#xff0c;一时引发网友关注热议。 “人生有哲学三问&#xff1a;我是谁&#xff1f;我从哪里来&#xff1f;我到哪里去&#x…

初识区块链

初识区块链 01货币的发展 货币从古到今一直存在&#xff0c;并且不断地发展。 从古代的贝壳、铜钱&#xff0c;到现代的纸币、电子支付&#xff0c;货币的演变历程就像是人类文明的一部分。在古代&#xff0c;人们用物物交换来满足自己的需求&#xff0c;但随着社会的发展和…

IPWorks SSH 2022.0.8505 C++ Edition Crack

IPWorks SSH 2022.0.8505 C Edition 轻松将安全外壳 &#xff08;SSH&#xff09; 安全性集成到您的互联网应用程序中。IPWorks SSH 库包括支持 SSH 的客户端、服务器和代理组件&#xff0c;支持强 SSH 2.0 加密和高级加密。 SSH库 SSH 文件传输和通信 借助 IPWorks SSH&#x…

JVM 程序计数器(PC 寄存器)

PC Register 介绍 JVM中的程序计数寄存器( Program Counter Register) 中&#xff0c;Register 的命名源于 CPU 的寄存器, 寄存器存储指令相关的现场信息。 CPU 只有把数据装载到寄存器才能够运行JVM 中的 PC 寄存器是对物理 PC 寄存器的一种抽象模拟PC 寄存器用来存储指向下一…

Java阶段二Day16

Java阶段二Day16 文章目录 Java阶段二Day16SSMSpringBoot简述核心特性创建SpringBoot工程创建工程失败排查 MyBatis框架-注解管理概述MyBatis环境初始化整合MyBatisPojo对象设计Dao接口设计 SSM Spring&#xff1a;Spring是一个轻量级的容器和框架&#xff0c;为开发者提供了一…

antd-vue - - - - - row-selection的使用

1. 正常需求使用 antd-vue table 官方示例代码如下&#xff1a; <template><a-table :row-selection"rowSelection" :columns"columns" :data-source"data"><template #bodyCell"{ column, text }"><template …