Skip to content

Commit

Permalink
feat(hydrator): enable controller
Browse files Browse the repository at this point in the history
Co-authored-by: Alexandre Gaudreault <alexandre_gaudreault@intuit.com>
Co-authored-by: Omer Azmon <omer_azmon@intuit.com>
Co-authored-by: daengdaengLee <gunho1020@gmail.com>
Co-authored-by: Juwon Hwang (Kevin) <juwon8891@gmail.com>
Co-authored-by: thisishwan2 <feel000617@gmail.com>
Co-authored-by: mirageoasis <kimhw0820@naver.com>
Co-authored-by: Robin Lieb <robin.j.lieb@gmail.com>
Co-authored-by: miiiinju1 <gms07073@ynu.ac.kr>
Signed-off-by: Michael Crenshaw <350466+crenshaw-dev@users.noreply.github.com>
  • Loading branch information
9 people committed Sep 19, 2024
1 parent 5f33bd8 commit e67e3ef
Show file tree
Hide file tree
Showing 30 changed files with 1,069 additions and 67 deletions.
2 changes: 1 addition & 1 deletion Procfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
controller: [ "$BIN_MODE" = 'true' ] && COMMAND=./dist/argocd || COMMAND='go run ./cmd/main.go' && sh -c "GOCOVERDIR=${ARGOCD_COVERAGE_DIR:-/tmp/coverage/app-controller} HOSTNAME=testappcontroller-1 FORCE_LOG_COLORS=1 ARGOCD_FAKE_IN_CLUSTER=true ARGOCD_TLS_DATA_PATH=${ARGOCD_TLS_DATA_PATH:-/tmp/argocd-local/tls} ARGOCD_SSH_DATA_PATH=${ARGOCD_SSH_DATA_PATH:-/tmp/argocd-local/ssh} ARGOCD_BINARY_NAME=argocd-application-controller $COMMAND --loglevel debug --redis localhost:${ARGOCD_E2E_REDIS_PORT:-6379} --repo-server localhost:${ARGOCD_E2E_REPOSERVER_PORT:-8081} --otlp-address=${ARGOCD_OTLP_ADDRESS} --application-namespaces=${ARGOCD_APPLICATION_NAMESPACES:-''} --server-side-diff-enabled=${ARGOCD_APPLICATION_CONTROLLER_SERVER_SIDE_DIFF:-'false'}"
controller: [ "$BIN_MODE" = 'true' ] && COMMAND=./dist/argocd || COMMAND='go run ./cmd/main.go' && sh -c "GOCOVERDIR=${ARGOCD_COVERAGE_DIR:-/tmp/coverage/app-controller} HOSTNAME=testappcontroller-1 FORCE_LOG_COLORS=1 ARGOCD_FAKE_IN_CLUSTER=true ARGOCD_TLS_DATA_PATH=${ARGOCD_TLS_DATA_PATH:-/tmp/argocd-local/tls} ARGOCD_SSH_DATA_PATH=${ARGOCD_SSH_DATA_PATH:-/tmp/argocd-local/ssh} ARGOCD_BINARY_NAME=argocd-application-controller $COMMAND --loglevel debug --redis localhost:${ARGOCD_E2E_REDIS_PORT:-6379} --repo-server localhost:${ARGOCD_E2E_REPOSERVER_PORT:-8081} --commit-server localhost:${ARGOCD_E2E_COMMITSERVER_PORT:-8086} --otlp-address=${ARGOCD_OTLP_ADDRESS} --application-namespaces=${ARGOCD_APPLICATION_NAMESPACES:-''} --server-side-diff-enabled=${ARGOCD_APPLICATION_CONTROLLER_SERVER_SIDE_DIFF:-'false'}"
api-server: [ "$BIN_MODE" = 'true' ] && COMMAND=./dist/argocd || COMMAND='go run ./cmd/main.go' && sh -c "GOCOVERDIR=${ARGOCD_COVERAGE_DIR:-/tmp/coverage/api-server} FORCE_LOG_COLORS=1 ARGOCD_FAKE_IN_CLUSTER=true ARGOCD_TLS_DATA_PATH=${ARGOCD_TLS_DATA_PATH:-/tmp/argocd-local/tls} ARGOCD_SSH_DATA_PATH=${ARGOCD_SSH_DATA_PATH:-/tmp/argocd-local/ssh} ARGOCD_BINARY_NAME=argocd-server $COMMAND --loglevel debug --redis localhost:${ARGOCD_E2E_REDIS_PORT:-6379} --disable-auth=${ARGOCD_E2E_DISABLE_AUTH:-'true'} --insecure --dex-server http://localhost:${ARGOCD_E2E_DEX_PORT:-5556} --repo-server localhost:${ARGOCD_E2E_REPOSERVER_PORT:-8081} --port ${ARGOCD_E2E_APISERVER_PORT:-8080} --otlp-address=${ARGOCD_OTLP_ADDRESS} --application-namespaces=${ARGOCD_APPLICATION_NAMESPACES:-''}"
dex: sh -c "ARGOCD_BINARY_NAME=argocd-dex go run github.com/argoproj/argo-cd/v2/cmd gendexcfg -o `pwd`/dist/dex.yaml && (test -f dist/dex.yaml || { echo 'Failed to generate dex configuration'; exit 1; }) && docker run --rm -p ${ARGOCD_E2E_DEX_PORT:-5556}:${ARGOCD_E2E_DEX_PORT:-5556} -v `pwd`/dist/dex.yaml:/dex.yaml ghcr.io/dexidp/dex:$(grep "image: ghcr.io/dexidp/dex" manifests/base/dex/argocd-dex-server-deployment.yaml | cut -d':' -f3) dex serve /dex.yaml"
redis: hack/start-redis-with-password.sh
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"k8s.io/client-go/tools/clientcmd"

cmdutil "github.com/argoproj/argo-cd/v2/cmd/util"
commitclient "github.com/argoproj/argo-cd/v2/commitserver/apiclient"
"github.com/argoproj/argo-cd/v2/common"
"github.com/argoproj/argo-cd/v2/controller"
"github.com/argoproj/argo-cd/v2/controller/sharding"
Expand Down Expand Up @@ -55,6 +56,7 @@ func NewCommand() *cobra.Command {
repoErrorGracePeriod int64
repoServerAddress string
repoServerTimeoutSeconds int
commitServerAddress string
selfHealTimeoutSeconds int
statusProcessors int
operationProcessors int
Expand Down Expand Up @@ -140,6 +142,8 @@ func NewCommand() *cobra.Command {

repoClientset := apiclient.NewRepoServerClientset(repoServerAddress, repoServerTimeoutSeconds, tlsConfig)

commitClientset := commitclient.NewCommitServerClientset(commitServerAddress)

cache, err := cacheSource()
errors.CheckError(err)
cache.Cache.SetClient(cacheutil.NewTwoLevelClient(cache.Cache.GetClient(), 10*time.Minute))
Expand All @@ -158,6 +162,7 @@ func NewCommand() *cobra.Command {
kubeClient,
appClient,
repoClientset,
commitClientset,
cache,
kubectl,
resyncDuration,
Expand Down Expand Up @@ -219,6 +224,7 @@ func NewCommand() *cobra.Command {
command.Flags().Int64Var(&repoErrorGracePeriod, "repo-error-grace-period-seconds", int64(env.ParseDurationFromEnv("ARGOCD_REPO_ERROR_GRACE_PERIOD_SECONDS", defaultAppResyncPeriod*time.Second, 0, math.MaxInt64).Seconds()), "Grace period in seconds for ignoring consecutive errors while communicating with repo server.")
command.Flags().StringVar(&repoServerAddress, "repo-server", env.StringFromEnv("ARGOCD_APPLICATION_CONTROLLER_REPO_SERVER", common.DefaultRepoServerAddr), "Repo server address.")
command.Flags().IntVar(&repoServerTimeoutSeconds, "repo-server-timeout-seconds", env.ParseNumFromEnv("ARGOCD_APPLICATION_CONTROLLER_REPO_SERVER_TIMEOUT_SECONDS", 60, 0, math.MaxInt64), "Repo server RPC call timeout seconds.")
command.Flags().StringVar(&commitServerAddress, "commit-server", env.StringFromEnv("ARGOCD_APPLICATION_CONTROLLER_COMMIT_SERVER", common.DefaultCommitServerAddr), "Commit server address.")
command.Flags().IntVar(&statusProcessors, "status-processors", env.ParseNumFromEnv("ARGOCD_APPLICATION_CONTROLLER_STATUS_PROCESSORS", 20, 0, math.MaxInt32), "Number of application status processors")
command.Flags().IntVar(&operationProcessors, "operation-processors", env.ParseNumFromEnv("ARGOCD_APPLICATION_CONTROLLER_OPERATION_PROCESSORS", 10, 0, math.MaxInt32), "Number of application operation processors")
command.Flags().StringVar(&cmdutil.LogFormat, "logformat", env.StringFromEnv("ARGOCD_APPLICATION_CONTROLLER_LOGFORMAT", "text"), "Set the logging format. One of: text|json")
Expand Down
13 changes: 9 additions & 4 deletions cmd/argocd/commands/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ type watchOpts struct {
suspended bool
degraded bool
delete bool
hydrated bool
}

// NewApplicationCreateCommand returns a new instance of an `argocd app create` command
Expand Down Expand Up @@ -1788,6 +1789,7 @@ func NewApplicationWaitCommand(clientOpts *argocdclient.ClientOptions) *cobra.Co
command.Flags().BoolVar(&watch.suspended, "suspended", false, "Wait for suspended")
command.Flags().BoolVar(&watch.degraded, "degraded", false, "Wait for degraded")
command.Flags().BoolVar(&watch.delete, "delete", false, "Wait for delete")
command.Flags().BoolVar(&watch.hydrated, "hydrated", false, "Wait for hydration operations")
command.Flags().StringVarP(&selector, "selector", "l", "", "Wait for apps by label. Supports '=', '==', '!=', in, notin, exists & not exists. Matching apps must satisfy all of the specified label constraints.")
command.Flags().StringArrayVar(&resources, "resource", []string{}, fmt.Sprintf("Sync only specific resources as GROUP%[1]sKIND%[1]sNAME or %[2]sGROUP%[1]sKIND%[1]sNAME. Fields may be blank and '*' can be used. This option may be specified repeatedly", resourceFieldDelimiter, resourceExcludeIndicator))
command.Flags().BoolVar(&watch.operation, "operation", false, "Wait for pending operations")
Expand Down Expand Up @@ -2324,7 +2326,7 @@ func groupResourceStates(app *argoappv1.Application, selectedResources []*argoap
}

// check if resource health, sync and operation statuses matches watch options
func checkResourceStatus(watch watchOpts, healthStatus string, syncStatus string, operationStatus *argoappv1.Operation) bool {
func checkResourceStatus(watch watchOpts, healthStatus string, syncStatus string, operationStatus *argoappv1.Operation, hydrationFinished bool) bool {
if watch.delete {
return false
}
Expand Down Expand Up @@ -2354,7 +2356,8 @@ func checkResourceStatus(watch watchOpts, healthStatus string, syncStatus string

synced := !watch.sync || syncStatus == string(argoappv1.SyncStatusCodeSynced)
operational := !watch.operation || operationStatus == nil
return synced && healthCheckPassed && operational
hydration := !watch.hydrated || hydrationFinished
return synced && healthCheckPassed && operational && hydration
}

// resourceParentChild gets the latest state of the app and the latest state of the app's resource tree and then
Expand Down Expand Up @@ -2518,21 +2521,23 @@ func waitOnApplicationStatus(ctx context.Context, acdClient argocdclient.Client,
}
}

hydrationFinished := app.Status.SourceHydrator.CurrentOperation != nil && app.Status.SourceHydrator.CurrentOperation.Phase == argoappv1.HydrateOperationPhaseHydrated && app.Status.SourceHydrator.CurrentOperation.SourceHydrator.DeepEquals(app.Status.SourceHydrator.LastSuccessfulOperation.SourceHydrator) && app.Status.SourceHydrator.CurrentOperation.DrySHA == app.Status.SourceHydrator.LastSuccessfulOperation.DrySHA

var selectedResourcesAreReady bool

// If selected resources are included, wait only on those resources, otherwise wait on the application as a whole.
if len(selectedResources) > 0 {
selectedResourcesAreReady = true
for _, state := range getResourceStates(app, selectedResources) {
resourceIsReady := checkResourceStatus(watch, state.Health, state.Status, appEvent.Application.Operation)
resourceIsReady := checkResourceStatus(watch, state.Health, state.Status, appEvent.Application.Operation, hydrationFinished)
if !resourceIsReady {
selectedResourcesAreReady = false
break
}
}
} else {
// Wait on the application as a whole
selectedResourcesAreReady = checkResourceStatus(watch, string(app.Status.Health.Status), string(app.Status.Sync.Status), appEvent.Application.Operation)
selectedResourcesAreReady = checkResourceStatus(watch, string(app.Status.Health.Status), string(app.Status.Sync.Status), appEvent.Application.Operation, hydrationFinished)
}

if selectedResourcesAreReady && (!operationInProgress || !watch.operation) {
Expand Down
24 changes: 12 additions & 12 deletions cmd/argocd/commands/app_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1762,81 +1762,81 @@ func TestCheckResourceStatus(t *testing.T) {
suspended: true,
health: true,
degraded: true,
}, string(health.HealthStatusHealthy), string(v1alpha1.SyncStatusCodeSynced), &v1alpha1.Operation{})
}, string(health.HealthStatusHealthy), string(v1alpha1.SyncStatusCodeSynced), &v1alpha1.Operation{}, true)
assert.True(t, res)
})
t.Run("Degraded, Suspended and health status failed", func(t *testing.T) {
res := checkResourceStatus(watchOpts{
suspended: true,
health: true,
degraded: true,
}, string(health.HealthStatusProgressing), string(v1alpha1.SyncStatusCodeSynced), &v1alpha1.Operation{})
}, string(health.HealthStatusProgressing), string(v1alpha1.SyncStatusCodeSynced), &v1alpha1.Operation{}, true)
assert.False(t, res)
})
t.Run("Suspended and health status passed", func(t *testing.T) {
res := checkResourceStatus(watchOpts{
suspended: true,
health: true,
}, string(health.HealthStatusHealthy), string(v1alpha1.SyncStatusCodeSynced), &v1alpha1.Operation{})
}, string(health.HealthStatusHealthy), string(v1alpha1.SyncStatusCodeSynced), &v1alpha1.Operation{}, true)
assert.True(t, res)
})
t.Run("Suspended and health status failed", func(t *testing.T) {
res := checkResourceStatus(watchOpts{
suspended: true,
health: true,
}, string(health.HealthStatusProgressing), string(v1alpha1.SyncStatusCodeSynced), &v1alpha1.Operation{})
}, string(health.HealthStatusProgressing), string(v1alpha1.SyncStatusCodeSynced), &v1alpha1.Operation{}, true)
assert.False(t, res)
})
t.Run("Suspended passed", func(t *testing.T) {
res := checkResourceStatus(watchOpts{
suspended: true,
health: false,
}, string(health.HealthStatusSuspended), string(v1alpha1.SyncStatusCodeSynced), &v1alpha1.Operation{})
}, string(health.HealthStatusSuspended), string(v1alpha1.SyncStatusCodeSynced), &v1alpha1.Operation{}, true)
assert.True(t, res)
})
t.Run("Suspended failed", func(t *testing.T) {
res := checkResourceStatus(watchOpts{
suspended: true,
health: false,
}, string(health.HealthStatusProgressing), string(v1alpha1.SyncStatusCodeSynced), &v1alpha1.Operation{})
}, string(health.HealthStatusProgressing), string(v1alpha1.SyncStatusCodeSynced), &v1alpha1.Operation{}, true)
assert.False(t, res)
})
t.Run("Health passed", func(t *testing.T) {
res := checkResourceStatus(watchOpts{
suspended: false,
health: true,
}, string(health.HealthStatusHealthy), string(v1alpha1.SyncStatusCodeSynced), &v1alpha1.Operation{})
}, string(health.HealthStatusHealthy), string(v1alpha1.SyncStatusCodeSynced), &v1alpha1.Operation{}, true)
assert.True(t, res)
})
t.Run("Health failed", func(t *testing.T) {
res := checkResourceStatus(watchOpts{
suspended: false,
health: true,
}, string(health.HealthStatusProgressing), string(v1alpha1.SyncStatusCodeSynced), &v1alpha1.Operation{})
}, string(health.HealthStatusProgressing), string(v1alpha1.SyncStatusCodeSynced), &v1alpha1.Operation{}, true)
assert.False(t, res)
})
t.Run("Synced passed", func(t *testing.T) {
res := checkResourceStatus(watchOpts{}, string(health.HealthStatusProgressing), string(v1alpha1.SyncStatusCodeSynced), &v1alpha1.Operation{})
res := checkResourceStatus(watchOpts{}, string(health.HealthStatusProgressing), string(v1alpha1.SyncStatusCodeSynced), &v1alpha1.Operation{}, true)
assert.True(t, res)
})
t.Run("Synced failed", func(t *testing.T) {
res := checkResourceStatus(watchOpts{}, string(health.HealthStatusProgressing), string(v1alpha1.SyncStatusCodeOutOfSync), &v1alpha1.Operation{})
res := checkResourceStatus(watchOpts{}, string(health.HealthStatusProgressing), string(v1alpha1.SyncStatusCodeOutOfSync), &v1alpha1.Operation{}, true)
assert.True(t, res)
})
t.Run("Degraded passed", func(t *testing.T) {
res := checkResourceStatus(watchOpts{
suspended: false,
health: false,
degraded: true,
}, string(health.HealthStatusDegraded), string(v1alpha1.SyncStatusCodeSynced), &v1alpha1.Operation{})
}, string(health.HealthStatusDegraded), string(v1alpha1.SyncStatusCodeSynced), &v1alpha1.Operation{}, true)
assert.True(t, res)
})
t.Run("Degraded failed", func(t *testing.T) {
res := checkResourceStatus(watchOpts{
suspended: false,
health: false,
degraded: true,
}, string(health.HealthStatusProgressing), string(v1alpha1.SyncStatusCodeSynced), &v1alpha1.Operation{})
}, string(health.HealthStatusProgressing), string(v1alpha1.SyncStatusCodeSynced), &v1alpha1.Operation{}, true)
assert.False(t, res)
})
}
Expand Down
89 changes: 73 additions & 16 deletions cmd/util/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,12 @@ type AppOptions struct {
retryBackoffMaxDuration time.Duration
retryBackoffFactor int64
ref string
drySourceRepo string
drySourceRevision string
drySourcePath string
syncSourceBranch string
syncSourcePath string
hydrateToBranch string
}

func AddAppFlags(command *cobra.Command, opts *AppOptions) {
Expand All @@ -94,6 +100,12 @@ func AddAppFlags(command *cobra.Command, opts *AppOptions) {
command.Flags().StringVar(&opts.chart, "helm-chart", "", "Helm Chart name")
command.Flags().StringVar(&opts.env, "env", "", "Application environment to monitor")
command.Flags().StringVar(&opts.revision, "revision", "", "The tracking source branch, tag, commit or Helm chart version the application will sync to")
command.Flags().StringVar(&opts.drySourceRepo, "dry-source-repo", "", "Repository URL of the app dry source")
command.Flags().StringVar(&opts.drySourceRevision, "dry-source-revision", "", "Revision of the app dry source")
command.Flags().StringVar(&opts.drySourcePath, "dry-source-path", "", "Path in repository to the app directory for the dry source")
command.Flags().StringVar(&opts.syncSourceBranch, "sync-source-branch", "", "The branch from which the app will sync")
command.Flags().StringVar(&opts.syncSourcePath, "sync-source-path", "", "The path in the repository from which the app will sync")
command.Flags().StringVar(&opts.hydrateToBranch, "hydrate-to-branch", "", "The branch to hydrate the app to")
command.Flags().IntVar(&opts.revisionHistoryLimit, "revision-history-limit", argoappv1.RevisionHistoryLimit, "How many items to keep in revision history")
command.Flags().StringVar(&opts.destServer, "dest-server", "", "K8s cluster URL (e.g. https://kubernetes.default.svc)")
command.Flags().StringVar(&opts.destName, "dest-name", "", "K8s cluster Name (e.g. minikube)")
Expand Down Expand Up @@ -154,21 +166,27 @@ func SetAppSpecOptions(flags *pflag.FlagSet, spec *argoappv1.ApplicationSpec, ap
if flags == nil {
return visited
}
source := spec.GetSourcePtrByPosition(sourcePosition)
if source == nil {
source = &argoappv1.ApplicationSource{}
}
source, visited = ConstructSource(source, *appOpts, flags)
if spec.HasMultipleSources() {
if sourcePosition == 0 {
spec.Sources[sourcePosition] = *source
} else if sourcePosition > 0 {
spec.Sources[sourcePosition-1] = *source
var h *argoappv1.SourceHydrator
h, hasHydratorFlag := constructSourceHydrator(spec.SourceHydrator, *appOpts, flags)
if hasHydratorFlag {
spec.SourceHydrator = h
} else {
source := spec.GetSourcePtrByPosition(sourcePosition)
if source == nil {
source = &argoappv1.ApplicationSource{}
}
source, visited = ConstructSource(source, *appOpts, flags)
if spec.HasMultipleSources() {
if sourcePosition == 0 {
spec.Sources[sourcePosition] = *source
} else if sourcePosition > 0 {
spec.Sources[sourcePosition-1] = *source
} else {
spec.Sources = append(spec.Sources, *source)
}
} else {
spec.Sources = append(spec.Sources, *source)
spec.Source = source
}
} else {
spec.Source = source
}
flags.Visit(func(f *pflag.Flag) {
visited++
Expand Down Expand Up @@ -563,9 +581,7 @@ func constructAppsBaseOnName(appName string, labels, annotations, args []string,
Name: appName,
Namespace: appNs,
},
Spec: argoappv1.ApplicationSpec{
Source: &argoappv1.ApplicationSource{},
},
Spec: argoappv1.ApplicationSpec{},
}
SetAppSpecOptions(flags, &app.Spec, &appOpts, 0)
SetParameterOverrides(app, appOpts.Parameters, 0)
Expand Down Expand Up @@ -733,6 +749,47 @@ func ConstructSource(source *argoappv1.ApplicationSource, appOpts AppOptions, fl
return source, visited
}

// constructSourceHydrator constructs a source hydrator from the command line flags. It returns the modified source
// hydrator and a boolean indicating if any hydrator flags were set. We return instead of just modifying the source
// hydrator in place because the given hydrator `h` might be nil. In that case, we need to create a new source hydrator
// and return it.
func constructSourceHydrator(h *argoappv1.SourceHydrator, appOpts AppOptions, flags *pflag.FlagSet) (*argoappv1.SourceHydrator, bool) {
hasHydratorFlag := false
ensureNotNil := func(notEmpty bool) {
hasHydratorFlag = true
if notEmpty && h == nil {
h = &argoappv1.SourceHydrator{}
}
}
flags.Visit(func(f *pflag.Flag) {
switch f.Name {
case "dry-source-repo":
ensureNotNil(appOpts.drySourceRepo != "")
h.DrySource.RepoURL = appOpts.drySourceRepo
case "dry-source-path":
ensureNotNil(appOpts.drySourcePath != "")
h.DrySource.Path = appOpts.drySourcePath
case "dry-source-revision":
ensureNotNil(appOpts.drySourceRevision != "")
h.DrySource.TargetRevision = appOpts.drySourceRevision
case "sync-source-branch":
ensureNotNil(appOpts.syncSourceBranch != "")
h.SyncSource.TargetBranch = appOpts.syncSourceBranch
case "sync-source-path":
ensureNotNil(appOpts.syncSourcePath != "")
h.SyncSource.Path = appOpts.syncSourcePath
case "hydrate-to-branch":
ensureNotNil(appOpts.hydrateToBranch != "")
if appOpts.hydrateToBranch == "" {
h.HydrateTo = nil
} else {
h.HydrateTo = &argoappv1.HydrateTo{TargetBranch: appOpts.hydrateToBranch}
}
}
})
return h, hasHydratorFlag
}

func mergeLabels(app *argoappv1.Application, labels []string) {
mapLabels, err := label.Parse(labels)
errors.CheckError(err)
Expand Down
Loading

0 comments on commit e67e3ef

Please sign in to comment.