kubernetest中wait.Until()方法的源码解读

news/2024/9/23 7:26:39/

概述

摘要:本文从源码层面解读了kubernetes源码中常用的wait.Until()方法的源码实现,并且本文也举例说明了wait.Until()方法的在kubernete源码中的典型使用场景。

wait.Until()源码解读

在Kubernetes源码中, 我们经常会读到wait.Until()函数,它的作用是在一个goroutine中执行一个函数,直到接收到停止信号。这个函数通常用于执行一些需要定期执行的任务。wait.Until源码位于k8s.io/apimachinery项目下,该项目是一个关于Kubernetes API资源的工具集。

Until()

Until()方法作用是每间隔 period 时间而周期性的执行f()函数,直到收到stopCh信号

f 是一个无参数的函数,表示要执行的任务。
period 是一个 time.Duration 类型的值,表示执行任务的周期。
stopCh 是一个 <-chan struct{} 类型的通道,用于接收停止信号

go">// Until loops until stop channel is closed, running f every period.
//
// Until is syntactic sugar on top of JitterUntil with zero jitter factor and
// with sliding = true (which means the timer for period starts after the f
// completes).
func Until(f func(), period time.Duration, stopCh <-chan struct{}) {// 内部调用了JitterUntil函数,并且将参数jitterfactor设置为0,sliding为trueJitterUntil(f, period, 0.0, true, stopCh)
}

JitterUntil()

Utill调用JitterUntil()并传入参数jitterFactor=0,sliding=true

jitterFactor=0的作用是,每隔 period 时间段,就执行一个f(). 假如jitterFactor不等于0,间隔时间就是“period加一个随机时间”。

sliding=true的作用是在f()执行后,再等待一个 period 定时周期。假如sliding=false,先等一个 period 定时周期,再执行f()

源码解读如下,请留意注释

go">
// JitterUntil loops until stop channel is closed, running f every period.
//
// If jitterFactor is positive, the period is jittered before every run of f.
// If jitterFactor is not positive, the period is unchanged and not jittered.
//
// If sliding is true, the period is computed after f runs. If it is false then
// period includes the runtime for f.
//
// Close stopCh to stop. f may not be invoked if stop channel is already
// closed. Pass NeverStop to if you don't want it stop.
func JitterUntil(f func(), period time.Duration, jitterFactor float64, sliding bool, stopCh <-chan struct{}) {// 定义一个定时器var t *time.Timer// 用于控制resetOrReuseTimer()函数,让resetOrReuseTimer()每次都返回以 jitteredPeriod 为周期的定时器t.Reset(jitteredPeriod)var sawTimeout boolfor {// 如果收到stop信号,就退出JitterUntil函数select {case <-stopCh:returndefault:}// period和jitteredPeriod,一起控制“抖动”时间,jitteredPeriod := period// 抖动的时间jitteredPeriod的范围是从period 到 period*(1+maxFactor)之间的随机时间;// 如果jitterFactor<=0时,就jitteredPeriod就不变。也就是时间不抖动(或者说波动)if jitterFactor > 0.0 {jitteredPeriod = Jitter(period, jitterFactor)}// 如果sliding为true,则跳过这步。if !sliding {// 用jitteredPeriod为时间周期,重置定时器t = resetOrReuseTimer(t, jitteredPeriod, sawTimeout)}func() {defer runtime.HandleCrash()f()}()// 如果sliding为true,则这里会执行if sliding {// 用jitteredPeriod为时间周期,重置定时器t = resetOrReuseTimer(t, jitteredPeriod, sawTimeout)}// NOTE: b/c there is no priority selection in golang// it is possible for this to race, meaning we could// trigger t.C and stopCh, and t.C select falls through.// In order to mitigate we re-check stopCh at the beginning// of every loop to prevent extra executions of f().select {case <-stopCh:returncase <-t.C:sawTimeout = true}}
}

Jitter()作用是返回一个波动时间,波动时间的范围是x附近的一个值。值的范围是:min=x, max=x*(1+maxFactor)

如果maxFactor小于0,wait的取值范围是: x<wait<2x 备注:(x的单位是duration)

如果maxFactor大于0,wait的取值范围是: x<x*(1+ (0~1)*maxFactor)<x(1+maxFactor)

源码解读如下,请留意注释

go">// Jitter returns a time.Duration between duration and duration + maxFactor *
// duration.
//
// This allows clients to avoid converging on periodic behavior. If maxFactor
// is 0.0, a suggested default value will be chosen.
func Jitter(duration time.Duration, maxFactor float64) time.Duration {if maxFactor <= 0.0 {maxFactor = 1.0}// wait = duration加一个随机时间// rand.Float64()是返回(0,1)之间的小数,// maxFactor = 1.0时,wait = x + (0~1) * maxFactor * x = x*(1+ (0~1)*1),故wait取值范围 (x<wait<2x)// wait = x + (0~1) * maxFactor * x = x*(1+ (0~1)*maxFactor)wait := duration + time.Duration(rand.Float64()*maxFactor*float64(duration))return wait
}

resetOrReuseTimer用jitteredPeriod为时间周期,重置定时器.注意改函数不是线程安全的

源码解读如下,请留意注释

go">// resetOrReuseTimer avoids allocating a new timer if one is already in use.
// Not safe for multiple threads.
// 改函数不是线程安全的
func resetOrReuseTimer(t *time.Timer, d time.Duration, sawTimeout bool) *time.Timer {// 如果t==nil,就用d为时间周期,初始化一个定时器if t == nil {return time.NewTimer(d)}// sawTimeout为true会跳过 t.Cif !t.Stop() && !sawTimeout {<-t.C}// d为时间周期,重置一个定时器t.Reset(d)return t
}

测试

编写一个简单程序,测试wait.Until()方法的使用

go">package mainimport ("fmt""k8s.io/apimachinery/pkg/util/wait""time"
)func main() {// 创建一个停止通道stopCh := make(chan struct{})var num = 0// 每秒执行一次任务,直到stopCh被关闭go wait.Until(func() {fmt.Printf("do one times job,num = %v\n", num)num++}, 1*time.Second, stopCh)// 等待10秒,确保定时任务执行完毕time.Sleep(10 * time.Second)// 关闭停止通道,停止定时任务close(stopCh)
}
---------------执行与输出如下-----------------
dev ✗ $ go run main.go                                                                                                                                                
do one times job,num = 0
do one times job,num = 1
do one times job,num = 2
do one times job,num = 3
do one times job,num = 4
do one times job,num = 5
do one times job,num = 6
do one times job,num = 7
do one times job,num = 8
do one times job,num = 9

在这个例子中,Until 函数会在一个新的goroutine中执行一个函数,这个函数会打印当前时间。这个函数会每秒执行一次,直到接收到 stopCh 通道的信号。
注意,Until 函数不会返回任何值,它只是在一个新的goroutine中执行一个函数,直到接收到停止信号。

使用场景

kubernetes源码中,wait.Until()经常用于启动需要后台循环执行的任务。比如informer中的reflector的启动,kube-scheduler中的schedulerCache的启动。

示例1 reflector的启动

reflector是informer中的重要组件,用于实现对kube-apiserver的资源变化事件的监听与获取。关于reflector的详解,请见我的另一篇博文 informer中reflector机制的实现分析与源码解读

wait.Until在Informer中的使用场景如下,用于reflector的启动

go">// Run starts a watch and handles watch events. Will restart the watch if it is closed.
// Run will exit when stopCh is closed.
func (r *Reflector) Run(stopCh <-chan struct{}) {// 使用wait.Until方法,周期性的执行r.ListAndWatch()函数wait.Until(func() { 				// 启动List/Watch机制if err := r.ListAndWatch(stopCh); err != nil {	utilruntime.HandleError(err)}}, r.period, stopCh)
}

示例2 schedulerCache的启动

wait.Until在 kube-sheduler 中的使用场景如下,用于newSchedulerCache的启动

go">// New returns a Cache implementation.
// It automatically starts a go routine that manages expiration of assumed pods.
// "ttl" is how long the assumed pod will get expired.
// "stop" is the channel that would close the background goroutine.
func New(ttl time.Duration, stop <-chan struct{}) Cache {cache := newSchedulerCache(ttl, cleanAssumedPeriod, stop)// 启动cache的运行cache.run()return cache
}func (cache *schedulerCache) run() {// 利用go wait.Until()周期的运行cache.cleanupExpiredAssumedPods函数go wait.Until(cache.cleanupExpiredAssumedPods, cache.period, cache.stop)
}

结论

wait.Until广泛的适用于kubernetes源码,通过对其源码的解读,我们了解到了其如何实现与使用场景。我们可以在平时日常开发中,如果有需要周期性的执行某项任务,除了可以go + for + select来自己实现外,不妨多多尝试wait.Until方法。

参考资料

Kubernete-v1.12源码


http://www.ppmy.cn/news/1514779.html

相关文章

视频达人的秘密武器:全能型剪辑软件深度剖析

剪辑视频&#xff0c;作为视频创作过程中的关键环节&#xff0c;其重要性不言而喻。无论是专业影视制作团队&#xff0c;还是热衷于Vlog创作的个人&#xff0c;都离不开一款强大且易用的视频剪辑工具。今天&#xff0c;就让我们一起踏上一场探索之旅&#xff0c;对市面上的视频…

鸿蒙OS promptAction的使用

效果如下&#xff1a; import { promptAction } from kit.ArkUIlet customDialogId: number 0Builder function customDialogBuilder() {Column() {Blank().height(30)Text(确定要删除吗&#xff1f;).fontSize(15)Blank().height(40)Row() {Button("取消").onClick…

MySQL集群+Keepalived实现高可用部署

Mysql高可用集群-双主双活-myqlkeeplived 一、特殊情况 常见案例&#xff1a;当生产环境中&#xff0c;当应用服务使用了mysql-1连接信息&#xff0c;在升级打包过程中或者有高频的数据持续写入【对数据一致性要求比较高的场景】&#xff0c;这种情况下&#xff0c;数据库连接…

单片机驱动彩屏最简方案:单片机_RA8889最小开发板驱动控制TFT彩屏介绍(二)硬件电路设计

本文介绍使用单片机RA8889来驱动和控制彩屏的最小方案。文章从RA8889的架构功能、硬件电路设计及软件设计三个方面来说明。 小编已发布多篇文章介绍了单片机RA8889来驱动控制彩屏&#xff0c;但是仍有不少单片机玩家可能对驱动彩屏还不算熟悉&#xff0c;在此加推一个短篇介绍…

C++设计模式1:单例模式(懒汉模式和饿汉模式,以及多线程问题处理)

饿汉单例模式 程序还没有主动获取实例对象&#xff0c;该对象就产生了&#xff0c;也就是程序刚开始运行&#xff0c;这个对象就已经初始化了。 class Singleton { public:~Singleton(){std::cout << "~Singleton()" << std::endl;}static Singleton* …

设计模式 - 行为型模式(第六章)

目录 6、行为型模式 6.1 模板方法模式 6.1.1 概述 6.1.2 结构 6.1.3 案例实现 6.1.3 优缺点 6.1.4 适用场景 6.1.5 JDK源码解析 6.2 策略模式 6.2.1 概述 6.2.2 结构 6.2.3 案例实现 6.2.4 优缺点 6.2.5 使用场景 6.2.6 JDK源码解析 6.3 命令模式 6.3.1 概述 …

非关系型数据库MongoDB(文档型数据库)介绍与使用实例

MongoDB介绍 MongoDB是一种开源的文档型数据库管理系统&#xff0c;它使用类似于JSON的BSON格式&#xff08;Binary JSON&#xff09;来存储数据。与传统关系型数据库不同&#xff0c;MongoDB不使用表和行的结构&#xff0c;而是采用集合&#xff08;Collection&#xff09;(My…

鸿蒙内核源码分析(特殊进程篇)

三个进程 鸿蒙有三个特殊的进程&#xff0c;创建顺序如下: 2号进程&#xff0c;KProcess&#xff0c;为内核态根进程.启动过程中创建.0号进程&#xff0c;KIdle为内核态第二个进程&#xff0c;它是通过KProcess fork 而来的.这有点难理解.1号进程&#xff0c;init&#xff0c…