写在前面:偶然发现了一段关于retry函数的优雅实现方法的代码,读下来觉得有许多值得学习的地方,特此记录下来。
retry函数简介
retry函数在日常的研发中用到的频率特别高,特别是连接kube-apiserver进行k8s资源的增删改查时,常常会因为网络等各种各样的问题而操作失败,因此我们一般都会在限定时间内进行限定次数的重试操作(比如15s内重试3次,每5s重试一次)。
通常我们的做法是考虑封装一个retry函数,用来执行重试操作。
如下所示:
- 我们定义一个函数类型ExecutionFunc(注意它是函数类型,而不是结构体),用来作为我们retry函数的参数。
- 在retry函数内部,我们使用"k8s.io/apimachinery/pkg/util/wait"包的ConditionFunc类型来包装重试逻辑,官方对该类型的解释是:"ConditionFunc returns true if the condition is satisfied, or an error if the loop should be aborted."我们把退出条件设定为:execFunc执行成功、遇到无需重试的错误(NotFound、AlreadyExists、Conflict),或者重试次数达到3次,这样可以避免一直处于重试状态。
// ExecutionFunc defines the execution function
type ExecutionFunc func() (err error)// ExecuteFuncWithRetry will exec given function with retry
func ExecuteFuncWithRetry(execFunc ExecutionFunc) error {if execFunc == nil {return errors.New("execution func can't be nil")}var lastErr errorretryCount, execFuncName := 0, runtime.FuncForPC(reflect.ValueOf(execFunc).Pointer()).Name()wrapConditionFunc := wait.ConditionFunc(func() (done bool, err error) {if retryCount >= constant.DefaultMaxRetries {return true, fmt.Errorf("failed to execFunc func %s after %d retries", execFuncName, constant.DefaultMaxRetries)}if err := execFunc(); err != nil {log.WithError(err).Error(fmt.Sprintf("failed to execFunc func %s", execFuncName))lastErr = errif apierrors.IsNotFound(err) || apierrors.IsAlreadyExists(err) || apierrors.IsConflict(err) {return true, err}retryCount++return false, nil}return true, nil})if err := wait.PollImmediate(constant.DefaultInterval, constant.DefaultTotalTimeout, wrapConditionFunc); err != nil {log.WithError(err).Error(fmt.Sprintf("failed to execFunc func %s", runtime.FuncForPC(reflect.ValueOf(execFunc).Pointer()).Name()))return lastErr}return nil
}
在封装完重试函数之后,我们可以封装针对k8s资源增删改查的相关函数,如下所示:
Create k8s资源对象
// CreateObjectWithRetry will create given kubernetes resource with retry
func CreateObjectWithRetry(ctx context.Context, kubeClient client.Client, object client.Object, resourceName string) error {err := retry.ExecuteFuncWithRetry(func() (err error) {return kubeClient.Create(ctx, object)})if err != nil {log.WithContext(ctx).WithError(err).Error(fmt.Sprintf("failed to create resource(%s/%s/%s)", resourceName, object.GetNamespace(), object.GetName()))return err}return nil
}
Delete k8s资源对象
// DeleteObjectWithRetry will delete given kubernetes resource with retry
func DeleteObjectWithRetry(ctx context.Context, kubeclient client.Client, object client.Object, resourceName string, opts ...client.DeleteOption) error {err := retry.ExecuteFuncWithRetry(func() (err error) {return kubeclient.Delete(ctx, object, opts...)})if err != nil {log.Error(ctx, fmt.Sprintf("failed to delete resource(%s/%s/%s): %v", resourceName, object.GetNamespace(), object.GetName(), err))return err}return nil
}
Update k8s资源对象
// UpdateObjectWithRetry will update given kubernetes resource with retry
func UpdateObjectWithRetry(ctx context.Context, kubeClient client.Client, object client.Object, resourceName string) error {err := retry.ExecuteFuncWithRetry(func() (err error) {return kubeClient.Update(ctx, object)})if err != nil {log.Error(fmt.Sprintf("failed to update resource(%s/%s/%s): %v", resourceName, object.GetNamespace(), object.GetName(), err))return err}return nil
}
Get k8s资源对象
// GetObjectWithRetry will get given kubernetes resource with retry
func GetObjectWithRetry(ctx context.Context, kubeclient client.Client, key client.ObjectKey, object client.Object, resourceName string) error {err := retry.ExecuteFuncWithRetry(func() (err error) {return kubeclient.Get(ctx, key, object)})if err != nil {log.Error(ctx, fmt.Sprintf("failed to get resource(%s/%s/%s): %v", resourceName, key.Namespace, key.Name, err))return err}return nil
}
List k8s资源对象
// ListObjectsWithRetry will list given object with retry.
func ListObjectsWithRetry(ctx context.Context, kubeclient client.Client, objectList client.ObjectList, resourceName string, opts ...client.ListOption) error {err := retry.ExecuteFuncWithRetry(func() (err error) {return kubeclient.List(ctx, objectList, opts...)})if err != nil {log.Error(ctx, fmt.Sprintf("failed to list resource(%s): %v", resourceName, err))return err}return nil
}
Merge Patch k8s资源对象
// PatchObjectWithRetry will patch given kubernetes resource with retry
func PatchObjectWithRetry(ctx context.Context, kubeClient client.Client, patchObject client.Object, currentObject client.Object, resourceName string) error {err := retry.ExecuteFuncWithRetry(func() (err error) {return kubeClient.Patch(ctx, patchObject, client.MergeFrom(currentObject))})if err != nil {log.Error(fmt.Sprintf("failed to patch resource(%s/%s/%s): %v", resourceName, currentObject.GetNamespace(), currentObject.GetName(), err))return err}return nil
}
Json Patch k8s资源对象
// JsonPatchObjectWithRetry will patch given kubernetes resource with retry
func JsonPatchObjectWithRetry(ctx context.Context, kubeClient client.Client, patchObject client.Object, patchData []byte, resourceName string) error {err := retry.ExecuteFuncWithRetry(func() (err error) {return kubeClient.Patch(ctx, patchObject, client.RawPatch(types.JSONPatchType, patchData))})if err != nil {log.Error(fmt.Sprintf("failed to patch resource(%s/%s/%s): %v", resourceName, patchObject.GetNamespace(), patchObject.GetName(), err))return err}return nil
}