Skip to content

Commit

Permalink
fix: retry migration in case of errors (#2349)
Browse files Browse the repository at this point in the history
  • Loading branch information
blumamir authored Jan 30, 2025
1 parent 572eb63 commit ca13f98
Showing 1 changed file with 79 additions and 64 deletions.
143 changes: 79 additions & 64 deletions instrumentor/runtimemigration/runtimemigration.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"strings"
"time"

"github.com/go-logr/logr"
"github.com/odigos-io/odigos/api/odigos/v1alpha1"
Expand All @@ -14,6 +15,7 @@ import (
"github.com/odigos-io/odigos/k8sutils/pkg/workload"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/wait"
"sigs.k8s.io/controller-runtime/pkg/client"
)

Expand All @@ -22,9 +24,9 @@ type MigrationRunnable struct {
Logger logr.Logger
}

// This code ensures that migrationRunnable is categorized as an `Other` Runnable.
func (m *MigrationRunnable) NeedLeaderElection() bool {
return false
// make sure we run it only from one instance of an instrumentor
return true
}

func (m *MigrationRunnable) Start(ctx context.Context) error {
Expand Down Expand Up @@ -100,6 +102,68 @@ func odigosOriginalAnnotationFound(annotations map[string]string) bool {
return annotationFound
}

// handleSingleDeployment performs one migration attempt for a single deployment.
//
// It re-reads the Deployment and its InstrumentationConfig from the API server
// (so the attempt works on current resource versions), runs every container of
// the pod template through handleContainerRuntimeDetailsUpdate, and, when the
// odigos original-env annotation is present, strips the annotation, reverts
// the pod spec via revertOriginalEnvAnnotationInPlace and updates the
// Deployment. Finally it writes the InstrumentationConfig status.
//
// It returns nil on success or when the Deployment no longer exists. Any other
// failure is returned so that a caller running this inside a retry loop can
// try again with freshly fetched objects.
func (m *MigrationRunnable) handleSingleDeployment(ctx context.Context, dep *appsv1.Deployment) error {
	freshDep := appsv1.Deployment{}
	err := m.KubeClient.Get(ctx, client.ObjectKey{
		Namespace: dep.Namespace,
		Name:      dep.Name,
	}, &freshDep)
	if err != nil {
		if client.IgnoreNotFound(err) == nil {
			// The deployment was deleted in the meantime: nothing left to
			// migrate, and not worth an error-level log entry.
			return nil
		}
		m.Logger.Error(err, "Failed to fresh copy of a deployment", "Name", dep.GetName(), "Namespace", dep.GetNamespace())
		return err
	}

	instConfigName := workload.CalculateWorkloadRuntimeObjectName(freshDep.Name, workload.WorkloadKindDeployment)
	freshInstConfig := v1alpha1.InstrumentationConfig{}
	err = m.KubeClient.Get(ctx, client.ObjectKey{
		Namespace: freshDep.Namespace,
		Name:      instConfigName,
	}, &freshInstConfig)
	if err != nil {
		m.Logger.Error(err, "Failed to get fresh InstrumentationConfig", "Name", instConfigName, "Namespace", freshDep.Namespace)
		return err
	}

	originalWorkloadEnvVar, err := envoverwrite.NewOrigWorkloadEnvValues(freshDep.Annotations)
	if err != nil {
		m.Logger.Error(err, "Failed to get original workload environment variables")
		return err
	}
	runtimeDetailsByContainer := freshInstConfig.Status.RuntimeDetailsByContainer

	for _, containerObject := range freshDep.Spec.Template.Spec.Containers {
		// The second return value is not needed here.
		err, _ = handleContainerRuntimeDetailsUpdate(
			containerObject,
			originalWorkloadEnvVar,
			runtimeDetailsByContainer,
		)
		if err != nil {
			// %w keeps the underlying error inspectable with errors.Is / errors.As.
			return fmt.Errorf("failed to process container %s in deployment %s: %w", containerObject.Name, freshDep.Name, err)
		}
	}

	if odigosOriginalAnnotationFound(freshDep.Annotations) {
		deleteOriginalEnvAnnotationInPlace(&freshDep)
		revertOriginalEnvAnnotationInPlace(originalWorkloadEnvVar, &freshDep.Spec.Template.Spec)
		err = m.KubeClient.Update(ctx, &freshDep)
		if err != nil {
			m.Logger.Error(err, "Failed to revert deployment", "Name", freshDep.GetName(), "Namespace", freshDep.GetNamespace())
			// Propagate the failure (typically an update conflict) instead of
			// swallowing it: otherwise the status below would be persisted
			// while the deployment is still unreverted, and the caller would
			// never retry the revert.
			return err
		}
	}

	err = m.KubeClient.Status().Update(
		ctx,
		&freshInstConfig,
	)
	if err != nil {
		m.Logger.Error(err, "Failed to update InstrumentationConfig status", "Name", instConfigName, "Namespace", freshDep.Namespace)
		return err
	}

	return nil
}

func (m *MigrationRunnable) fetchAndProcessDeployments(ctx context.Context, kubeClient client.Client, namespaces map[string]map[string]*v1alpha1.InstrumentationConfig) error {
for namespace, workloadNames := range namespaces {
var deployments appsv1.DeploymentList
Expand All @@ -115,72 +179,23 @@ func (m *MigrationRunnable) fetchAndProcessDeployments(ctx context.Context, kube
continue
}

freshDep := appsv1.Deployment{}
err := kubeClient.Get(ctx, client.ObjectKey{
Namespace: dep.Namespace,
Name: dep.Name,
}, &freshDep)
if err != nil {
m.Logger.Error(err, "Failed to fresh copy of a deployment", "Name", dep.GetName(), "Namespace", dep.GetNamespace())
continue
}

originalWorkloadEnvVar, err := envoverwrite.NewOrigWorkloadEnvValues(freshDep.Annotations)
if err != nil {
m.Logger.Error(err, "Failed to get original workload environment variables")
continue
}

workloadInstrumentationConfigReference := workloadNames[freshDep.Name]
if workloadInstrumentationConfigReference == nil {
m.Logger.Error(err, "Failed to get InstrumentationConfig reference")
continue
}

// Fetching the latest state of the InstrumentationConfig resource from the Kubernetes API.
// This is necessary to ensure we work with the most up-to-date version of the resource, as it may
// have been modified by other processes or controllers in the cluster. Without this step, there is
// a risk of encountering conflicts or using stale data during operations on the InstrumentationConfig object.
err = m.KubeClient.Get(ctx, client.ObjectKey{
Namespace: workloadInstrumentationConfigReference.Namespace,
Name: workloadInstrumentationConfigReference.Name,
}, workloadInstrumentationConfigReference)

if err != nil {
m.Logger.Error(err, "Failed to get InstrumentationConfig", "Name", workloadInstrumentationConfigReference.Name,
"Namespace", workloadInstrumentationConfigReference.Namespace)
continue
}

runtimeDetailsByContainer := workloadInstrumentationConfigReference.Status.RuntimeDetailsByContainer

for _, containerObject := range freshDep.Spec.Template.Spec.Containers {
err, _ = handleContainerRuntimeDetailsUpdate(
containerObject,
originalWorkloadEnvVar,
runtimeDetailsByContainer,
)
// Retry logic for handling conflicts
err := wait.ExponentialBackoff(wait.Backoff{
Duration: 100 * time.Millisecond, // Initial wait time
Factor: 2.0, // Exponential factor
Jitter: 0.1, // Add randomness
Steps: 5, // Max retries
}, func() (bool, error) {
err := m.handleSingleDeployment(ctx, &dep)
if err != nil {
return fmt.Errorf("failed to process container %s in deployment %s: %v", containerObject.Name, freshDep.Name, err)
m.Logger.Info("Error during env migration, retrying", "Deployment", dep.Name)
return false, nil // Retry
}
}
return true, nil // Success, stop retrying
})

if odigosOriginalAnnotationFound(freshDep.Annotations) {
deleteOriginalEnvAnnotationInPlace(&freshDep)
revertOriginalEnvAnnotationInPlace(originalWorkloadEnvVar, &freshDep.Spec.Template.Spec)
err = kubeClient.Update(ctx, &freshDep)
if err != nil {
m.Logger.Error(err, "Failed to revert deployment", "Name", freshDep.GetName(), "Namespace", freshDep.GetNamespace())
}
}

err = kubeClient.Status().Update(
ctx,
workloadInstrumentationConfigReference,
)
if err != nil {
m.Logger.Error(err, "Failed to update InstrumentationConfig status", "Name", freshDep.Name, "Namespace", freshDep.Namespace)
continue
m.Logger.Error(err, "Failed to handle deployment with retries", "Name", dep.GetName(), "Namespace", dep.GetNamespace())
}
}
}
Expand Down

0 comments on commit ca13f98

Please sign in to comment.