Skip to content

Commit

Permalink
add more features
Browse files Browse the repository at this point in the history
  • Loading branch information
edeNFed committed Nov 10, 2024
1 parent 9f5d04e commit cf86ff2
Show file tree
Hide file tree
Showing 5 changed files with 327 additions and 50 deletions.
55 changes: 44 additions & 11 deletions cli/cmd/instrument.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,12 @@ package cmd
import (
"bufio"
"context"
"errors"
"fmt"
"os"
"os/signal"
"syscall"
"time"

corev1 "k8s.io/api/core/v1"

Expand All @@ -25,10 +29,11 @@ import (
)

const (
excludeNamespacesFileFlag = "exclude-namespaces-file"
excludeAppsFileFlag = "exclude-apps-file"
skipPreflightCheckFlag = "skip-preflight-checks"
dryRunFlag = "dry-run"
excludeNamespacesFileFlag = "exclude-namespaces-file"
excludeAppsFileFlag = "exclude-apps-file"
skipPreflightCheckFlag = "skip-preflight-checks"
dryRunFlag = "dry-run"
instrumentationCollOffFlag = "instrumentation-cool-off"
)

// instrumentCmd represents the instrument command
Expand All @@ -46,6 +51,16 @@ var clusterCmd = &cobra.Command{
Long: `Instrument entire cluster with Odigos. This command will instrument the entire Kubernetes cluster with
Odigos CLI and monitor the instrumentation status.`,
Run: func(cmd *cobra.Command, args []string) {
ctx, cancel := context.WithCancel(cmd.Context())
ch := make(chan os.Signal, 1)
signal.Notify(ch, os.Interrupt, syscall.SIGTERM)
defer signal.Stop(ch)

go func() {
<-ch
cancel()
}()

excludedNs, err := readFileLines(cmd.Flag(excludeNamespacesFileFlag).Value.String())
if err != nil {
fmt.Printf("\033[31mERROR\033[0m Cannot read exclude-namespaces-file: %s\n", err)
Expand All @@ -58,6 +73,12 @@ Odigos CLI and monitor the instrumentation status.`,
os.Exit(1)
}
dryRun := cmd.Flag(dryRunFlag).Changed && cmd.Flag(dryRunFlag).Value.String() == "true"
coolOffStr := cmd.Flag(instrumentationCollOffFlag).Value.String()
coolOff, err := time.ParseDuration(coolOffStr)
if err != nil {
fmt.Printf("\033[31mERROR\033[0m Invalid duration for instrumentation-cool-off: %s\n", err)
os.Exit(1)
}

fmt.Printf("About to instrument an entire cluster with Odigos\n")
if dryRun {
Expand All @@ -75,14 +96,14 @@ Odigos CLI and monitor the instrumentation status.`,
fmt.Printf("\u001B[32mPASS\u001B[0m\n\n")
}

runPreflightChecks(cmd.Context(), cmd, client)
runPreflightChecks(ctx, cmd, client)

fmt.Printf("Starting instrumentation ...\n")
instrumentCluster(cmd.Context(), client, excludedNs, excludedApps, dryRun)
instrumentCluster(ctx, client, excludedNs, excludedApps, dryRun, coolOff)
},
}

func instrumentCluster(ctx context.Context, client *kube.Client, excludedNs, excludedApps map[string]struct{}, dryRun bool) {
func instrumentCluster(ctx context.Context, client *kube.Client, excludedNs, excludedApps map[string]struct{}, dryRun bool, coolOff time.Duration) {
systemNs := sliceToMap(consts.SystemNamespaces)
nsList, err := client.CoreV1().Namespaces().List(ctx, metav1.ListOptions{})
if err != nil {
Expand All @@ -105,15 +126,18 @@ func instrumentCluster(ctx context.Context, client *kube.Client, excludedNs, exc
continue
}

instrumentNamespace(ctx, client, ns.Name, excludedApps, orchestrator, dryRun)
err = instrumentNamespace(ctx, client, ns.Name, excludedApps, orchestrator, dryRun, coolOff)
if errors.Is(err, context.Canceled) {
return
}
}
}

func instrumentNamespace(ctx context.Context, client *kube.Client, ns string, excludedApps map[string]struct{}, orchestrator *lifecycle.Orchestrator, dryRun bool) {
func instrumentNamespace(ctx context.Context, client *kube.Client, ns string, excludedApps map[string]struct{}, orchestrator *lifecycle.Orchestrator, dryRun bool, coolOff time.Duration) error {
deps, err := client.AppsV1().Deployments(ns).List(ctx, metav1.ListOptions{})
if err != nil {
fmt.Printf(" - \033[31mERROR\033[0m Cannot list deployments: %s\n", err)
return
return nil
}

for _, dep := range deps.Items {
Expand All @@ -129,14 +153,22 @@ func instrumentNamespace(ctx context.Context, client *kube.Client, ns string, ex
continue
}

orchestrator.Apply(ctx, &dep, func(ctx context.Context, name string, namespace string) (*corev1.PodTemplateSpec, error) {
err = orchestrator.Apply(ctx, &dep, func(ctx context.Context, name string, namespace string) (*corev1.PodTemplateSpec, error) {
dep, err := client.AppsV1().Deployments(namespace).Get(ctx, name, metav1.GetOptions{})
if err != nil {
return nil, err
}
return &dep.Spec.Template, nil
})

if errors.Is(err, context.Canceled) {
return err
}

time.Sleep(coolOff)
}

return nil
}

func runPreflightChecks(ctx context.Context, cmd *cobra.Command, client *kube.Client) {
Expand Down Expand Up @@ -191,6 +223,7 @@ func init() {
clusterCmd.Flags().String(excludeAppsFileFlag, "", "File containing applications to exclude from instrumentation. Each application should be on a new line.")
clusterCmd.Flags().Bool(skipPreflightCheckFlag, false, "Skip preflight checks")
clusterCmd.Flags().Bool(dryRunFlag, false, "Dry run mode")
clusterCmd.Flags().Duration(instrumentationCollOffFlag, 0, "Cool-off period for instrumentation")
}

func sliceToMap(slice []string) map[string]struct{} {
Expand Down
135 changes: 135 additions & 0 deletions cli/pkg/lifecycle/rollback.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
package lifecycle

import (
"context"
"fmt"
"time"

"k8s.io/apimachinery/pkg/types"

"github.com/odigos-io/odigos/common/consts"

"github.com/odigos-io/odigos/k8sutils/pkg/utils"
"k8s.io/apimachinery/pkg/util/wait"

appsv1 "k8s.io/api/apps/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"k8s.io/client-go/kubernetes"

"github.com/odigos-io/odigos/k8sutils/pkg/workload"

v1 "k8s.io/api/core/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
)

// rollBack undoes the changes Odigos made to obj and waits for the workload to
// converge back to an uninstrumented state.
//
// It fetches the current pod template through templateSpecFetcher, and if
// isObjectModifiedByOdigos reports no modification it logs and returns nil.
// Otherwise it removes the Odigos instrumentation label via patchOdigosLabel
// and polls every 5 seconds, for up to 30 minutes, until no container in the
// pod template is instrumented and utils.VerifyAllPodsAreNOTInstrumented
// reports true.
//
// It returns an error when the template cannot be fetched, the label patch
// fails, or the wait fails or times out.
func (o *Orchestrator) rollBack(obj client.Object, templateSpecFetcher PodTemplateSpecFetcher) error {
	// A fresh background context is used deliberately: rollback is invoked
	// after the caller's context may already be cancelled, and it must not
	// inherit that cancellation.
	ctx := context.Background()

	o.log("Rolling back changes to deployment")
	templateSpec, err := templateSpecFetcher(ctx, obj.GetName(), obj.GetNamespace())
	if err != nil {
		o.log("Error fetching template spec")
		return err
	}

	if !isObjectModifiedByOdigos(obj, templateSpec) {
		o.log("No changes made by Odigos, skipping rollback")
		return nil
	}

	if err := patchOdigosLabel(ctx, o.Client, obj); err != nil {
		return err
	}

	err = wait.PollUntilContextTimeout(ctx, 5*time.Second, 30*time.Minute, true, func(ctx context.Context) (bool, error) {
		templateSpec, err := templateSpecFetcher(ctx, obj.GetName(), obj.GetNamespace())
		if err != nil {
			o.log("Error fetching template spec")
			return false, err
		}

		// The pod template itself must be clean before checking the pods.
		for _, container := range templateSpec.Spec.Containers {
			if workload.IsContainerInstrumented(&container) {
				return false, nil
			}
		}

		rolloutCompleted, err := utils.VerifyAllPodsAreNOTInstrumented(ctx, o.Client, obj)
		if err != nil {
			o.log("Error verifying all pods are not instrumented")
			return false, err
		}

		if rolloutCompleted {
			o.log("Rollout completed, all running pods does not contains instrumentation")
		}

		return rolloutCompleted, nil
	})
	if err != nil {
		// Surface wait failures (including the 30 minute timeout) instead of
		// reporting a rollback that never converged as successful.
		return fmt.Errorf("waiting for rollback of %s/%s: %w", obj.GetNamespace(), obj.GetName(), err)
	}

	return nil
}

// patchOdigosLabel removes the consts.OdigosInstrumentationLabel label from obj
// by sending a JSON merge patch that sets the label to null.
//
// When obj does not carry the label (including when it has no labels at all)
// no request is sent and nil is returned. Deployments, StatefulSets and
// DaemonSets are patched through the matching AppsV1 client; any other object
// kind is left untouched and nil is returned.
func patchOdigosLabel(ctx context.Context, client kubernetes.Interface, obj client.Object) error {
	// Indexing a nil map is safe in Go, so one lookup covers both "no labels"
	// and "labels without the Odigos key".
	if _, ok := obj.GetLabels()[consts.OdigosInstrumentationLabel]; !ok {
		return nil
	}

	patch := []byte(fmt.Sprintf(`{"metadata":{"labels":{"%s":null}}}`, consts.OdigosInstrumentationLabel))
	namespace, name := obj.GetNamespace(), obj.GetName()

	var err error
	switch obj.(type) {
	case *appsv1.Deployment:
		_, err = client.AppsV1().Deployments(namespace).Patch(ctx, name, types.MergePatchType, patch, metav1.PatchOptions{})
	case *appsv1.StatefulSet:
		_, err = client.AppsV1().StatefulSets(namespace).Patch(ctx, name, types.MergePatchType, patch, metav1.PatchOptions{})
	case *appsv1.DaemonSet:
		_, err = client.AppsV1().DaemonSets(namespace).Patch(ctx, name, types.MergePatchType, patch, metav1.PatchOptions{})
	}
	return err
}

// isObjectModifiedByOdigos reports whether obj shows signs of Odigos
// instrumentation: either workload.IsObjectLabeledForInstrumentation is true
// for obj, or workload.IsContainerInstrumented is true for at least one
// container in templateSpec. The pod template is only inspected when the
// label check is false.
func isObjectModifiedByOdigos(obj client.Object, templateSpec *v1.PodTemplateSpec) bool {
	anyContainerInstrumented := func() bool {
		for idx := range templateSpec.Spec.Containers {
			// Hand the predicate a copy of the element.
			candidate := templateSpec.Spec.Containers[idx]
			if workload.IsContainerInstrumented(&candidate) {
				return true
			}
		}
		return false
	}

	// Short-circuit keeps the label check first; containers are scanned only
	// when the object is not labeled.
	return workload.IsObjectLabeledForInstrumentation(obj) || anyContainerInstrumented()
}
103 changes: 83 additions & 20 deletions cli/pkg/lifecycle/states.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,37 +59,100 @@ func NewOrchestrator(client *kube.Client, ctx context.Context) (*Orchestrator, e
}, nil
}

func (o *Orchestrator) Apply(ctx context.Context, obj client.Object, templateSpecFetcher PodTemplateSpecFetcher) {
templateSpec, err := templateSpecFetcher(ctx, obj.GetName(), obj.GetNamespace())
if err != nil {
o.log(fmt.Sprintf("Error fetching pod template spec: %s", err))
return
}
func (o *Orchestrator) Apply(ctx context.Context, obj client.Object, templateSpecFetcher PodTemplateSpecFetcher) error {
// Create a channel to handle cancellation
done := make(chan struct{})
var finalErr error

go func() {
defer close(done)

state := o.getCurrentState(ctx, obj, templateSpec)
o.log(fmt.Sprintf("Current state: %s", state))
nextTransition := o.TransitionsMap[string(state)]
for nextTransition != nil {
templateSpec, err := templateSpecFetcher(ctx, obj.GetName(), obj.GetNamespace())
if err != nil {
o.log(fmt.Sprintf("Error fetching pod template spec: %s", err))
finalErr = fmt.Errorf("failed to fetch template spec: %w", err)
return
}

err = nextTransition.Execute(ctx, obj, templateSpec)
if err != nil {
o.log(fmt.Sprintf("Error executing transition: %s", err))
state := o.getCurrentState(ctx, obj, templateSpec)
o.log(fmt.Sprintf("Current state: %s", state))

if state == UnknownState {
if err := o.rollBack(obj, templateSpecFetcher); err != nil {
o.log(fmt.Sprintf("Error rolling back: %s", err))
finalErr = fmt.Errorf("failed to rollback from unknown state: %w", err)
return
}
return
}

// Special case: PreflightCheck change state manualy
if nextTransition.To() == PreflightChecksPassed {
state = PreflightChecksPassed
} else {
state = o.getCurrentState(ctx, obj, templateSpec)
nextTransition := o.TransitionsMap[string(state)]
for nextTransition != nil {
select {
case <-ctx.Done():
// Context was cancelled, perform rollback
o.log("Context cancelled, rolling back current object")
if err := o.rollBack(obj, templateSpecFetcher); err != nil {
o.log(fmt.Sprintf("Error rolling back after context cancellation: %s", err))
finalErr = fmt.Errorf("failed to rollback after context cancellation: %w", err)
return
}
finalErr = ctx.Err()
return
default:
templateSpec, err := templateSpecFetcher(ctx, obj.GetName(), obj.GetNamespace())
if err != nil {
o.log(fmt.Sprintf("Error fetching pod template spec: %s", err))
finalErr = fmt.Errorf("failed to fetch template spec during transition: %w", err)
return
}

if err := nextTransition.Execute(ctx, obj, templateSpec); err != nil {
o.log(fmt.Sprintf("Error executing transition: %s", err))
// Attempt rollback on execution error
if rbErr := o.rollBack(obj, templateSpecFetcher); rbErr != nil {
o.log(fmt.Sprintf("Error rolling back after failed execution: %s", rbErr))
finalErr = fmt.Errorf("failed to rollback after execution error: %w", rbErr)
return
}
finalErr = fmt.Errorf("failed to execute transition: %w", err)
return
}

// Special case: PreflightCheck change state manually
if nextTransition.To() == PreflightChecksPassed {
state = PreflightChecksPassed
} else {
state = o.getCurrentState(ctx, obj, templateSpec)
}

o.log(fmt.Sprintf("Current state: %s", state))

if state == UnknownState {
if err := o.rollBack(obj, templateSpecFetcher); err != nil {
o.log(fmt.Sprintf("Error rolling back: %s", err))
finalErr = fmt.Errorf("failed to rollback from unknown state during transition: %w", err)
return
}
return
}

nextTransition = o.TransitionsMap[string(state)]
}
}
o.log(fmt.Sprintf("Current state: %s", state))
nextTransition = o.TransitionsMap[string(state)]
}()

// Wait for either completion or context cancellation
select {
case <-ctx.Done():
// Wait for the goroutine to finish rollback
<-done
if finalErr == nil {
return ctx.Err()
}
return finalErr
case <-done:
return finalErr
}
}

Expand Down
Loading

0 comments on commit cf86ff2

Please sign in to comment.