Skip to content

Commit

Permalink
fix: retry manifest updates in upgrade-k8s
Browse files Browse the repository at this point in the history
Manifest update failures have been showing up frequently in recent
integration-provision test runs (possibly related to the Kubernetes upgrade);
in any case, such errors should be retried.

Refactored the function to extract the retryable part.

Signed-off-by: Andrey Smirnov <andrey.smirnov@talos-systems.com>
  • Loading branch information
smira committed Apr 1, 2022
1 parent eeb7561 commit 2ca5279
Showing 1 changed file with 77 additions and 44 deletions.
121 changes: 77 additions & 44 deletions pkg/cluster/kubernetes/talos_managed.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ type UpgradeProvider interface {
}

var deprecations = map[string][]string{
// https://kubernetes.io/blog/2021/07/14/upcoming-changes-in-kubernetes-1-22/#api-changes
// https://kubernetes.io/docs/reference/using-api/deprecation-guide/
"1.21->1.22": {
"validatingwebhookconfigurations.v1beta1.admissionregistration.k8s.io",
"mutatingwebhookconfigurations.v1beta1.admissionregistration.k8s.io",
Expand Down Expand Up @@ -394,7 +394,61 @@ func getManifests(ctx context.Context, cluster UpgradeProvider) ([]*unstructured
}
}

// updateManifest applies a single manifest object to the cluster: it creates
// the resource when the diff lookup reports "not found", updates it when the
// diff is non-empty, and does nothing otherwise.
//
// Results:
//   - resp: the object returned by the Create/Update call (nil when no call was made).
//   - diff: the diff reported by getResourceDiff, or a fixed message when the
//     resource does not exist yet.
//   - skipped: true when the resource exists and the diff is empty.
//   - err: mapping, diff, create or update failure.
//
// With dryRun set, no Create/Update call is issued.
//
//nolint:gocyclo,cyclop
func updateManifest(
	ctx context.Context,
	mapper *restmapper.DeferredDiscoveryRESTMapper,
	k8sClient dynamic.Interface,
	obj *unstructured.Unstructured,
	dryRun bool,
) (
	resp *unstructured.Unstructured,
	diff string,
	skipped bool,
	err error,
) {
	gvk := obj.GroupVersionKind()

	mapping, mapErr := mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
	if mapErr != nil {
		return nil, "", false, fmt.Errorf("error creating mapping for object %s: %w", obj.GetName(), mapErr)
	}

	// Start with the cluster-wide client and narrow it down to the object's
	// namespace only when the mapping says the resource is namespaced.
	resourceClient := k8sClient.Resource(mapping.Resource)

	var dr dynamic.ResourceInterface = resourceClient

	if mapping.Scope.Name() == meta.RESTScopeNameNamespace {
		dr = resourceClient.Namespace(obj.GetNamespace())
	}

	diff, err = getResourceDiff(ctx, dr, obj)

	exists := err == nil
	if !exists {
		// Anything other than "not found" is a real failure; "not found" just
		// means the resource has to be created.
		if !apierrors.IsNotFound(err) {
			return nil, "", false, err
		}

		diff = "resource is going to be created"
	}

	if dryRun {
		// NOTE(review): the third result carries `exists` here rather than a
		// "skipped" flag — confirm whether callers rely on it in dry-run mode.
		return nil, diff, exists, nil
	}

	if !exists {
		resp, err = dr.Create(ctx, obj, metav1.CreateOptions{})

		return resp, diff, false, err
	}

	if diff == "" {
		// Resource is already up to date: nothing to apply.
		return nil, "", true, nil
	}

	resp, err = dr.Update(ctx, obj, metav1.UpdateOptions{})

	return resp, diff, false, err
}

//nolint:gocyclo
func syncManifests(ctx context.Context, objects []*unstructured.Unstructured, cluster UpgradeProvider, options UpgradeOptions) error {
config, err := cluster.K8sRestConfig(ctx)
if err != nil {
Expand All @@ -418,70 +472,49 @@ func syncManifests(ctx context.Context, objects []*unstructured.Unstructured, cl

mapper := restmapper.NewDeferredDiscoveryRESTMapper(memory.NewMemCacheClient(dc))

var (
resp *unstructured.Unstructured
mapping *meta.RESTMapping
// list of deployments to wait for to become ready after update
deployments []*unstructured.Unstructured
)
// list of deployments to wait for to become ready after update
var deployments []*unstructured.Unstructured

options.Log("updating manifests")

for _, obj := range objects {
mapping, err = mapper.RESTMapping(obj.GroupVersionKind().GroupKind(), obj.GroupVersionKind().Version)
if err != nil {
return fmt.Errorf("error creating mapping for object %s: %w", obj.GetName(), err)
}

var dr dynamic.ResourceInterface
if mapping.Scope.Name() == meta.RESTScopeNameNamespace {
// namespaced resources should specify the namespace
dr = k8sClient.Resource(mapping.Resource).Namespace(obj.GetNamespace())
} else {
// for cluster-wide resources
dr = k8sClient.Resource(mapping.Resource)
}

var diff string
options.Log(" > processing manifest %s %s", obj.GetKind(), obj.GetName())

var (
resp *unstructured.Unstructured
diff string
skipped bool
)

err = retry.Constant(3*time.Minute, retry.WithUnits(10*time.Second), retry.WithErrorLogging(true)).RetryWithContext(ctx, func(ctx context.Context) error {
resp, diff, skipped, err = updateManifest(ctx, mapper, k8sClient, obj, options.DryRun)
if kubernetes.IsRetryableError(err) {
return retry.ExpectedError(err)
}

exists := true
return err
})

diff, err = getResourceDiff(ctx, dr, obj)
if err != nil {
if !apierrors.IsNotFound(err) {
return err
}

exists = false
diff = "resource is going to be created"
return err
}

options.Log(" > apply manifest %s %s", obj.GetKind(), obj.GetName())

switch {
case options.DryRun:
var diffInfo string
if diff != "" {
diffInfo = fmt.Sprintf(", diff:\n%s", diff)
}

options.Log(" > apply skipped in dry run%s", diffInfo)
options.Log(" < apply skipped in dry run%s", diffInfo)

continue
case !exists:
resp, err = dr.Create(ctx, obj, metav1.CreateOptions{})
case diff != "":
resp, err = dr.Update(ctx, obj, metav1.UpdateOptions{})
default:
options.Log(" > apply skipped: nothing to update")
case skipped:
options.Log(" < apply skipped: nothing to update")

continue
}

if err != nil {
return err
}

if resp.GetKind() == "Deployment" {
deployments = append(deployments, resp)
}
Expand Down

0 comments on commit 2ca5279

Please sign in to comment.