From 4b16b89ab449ac4c208d2aefda730d92e8212bd2 Mon Sep 17 00:00:00 2001 From: Chetan Banavikalmutt Date: Fri, 8 Dec 2023 18:32:18 +0530 Subject: [PATCH 1/4] fix: update the cluster connection status in the clusterinfoupdater Signed-off-by: Chetan Banavikalmutt --- controller/cache/cache.go | 2 ++ controller/clusterinfoupdater.go | 8 +++++--- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/controller/cache/cache.go b/controller/cache/cache.go index d3e8a67cf3dc2..e9db25c88dcda 100644 --- a/controller/cache/cache.go +++ b/controller/cache/cache.go @@ -524,6 +524,8 @@ func (c *liveStateCache) getCluster(server string) (clustercache.ClusterCache, e clusterCache = clustercache.NewClusterCache(clusterCacheConfig, clusterCacheOpts...) + clusterCache.StartClusterConnectionStatusMonitoring(context.Background()) + _ = clusterCache.OnResourceUpdated(func(newRes *clustercache.Resource, oldRes *clustercache.Resource, namespaceResources map[kube.ResourceKey]*clustercache.Resource) { toNotify := make(map[string]bool) var ref v1.ObjectReference diff --git a/controller/clusterinfoupdater.go b/controller/clusterinfoupdater.go index d87cdad6be85d..bf07f8cc709e5 100644 --- a/controller/clusterinfoupdater.go +++ b/controller/clusterinfoupdater.go @@ -143,12 +143,12 @@ func (c *clusterInfoUpdater) getUpdatedClusterInfo(ctx context.Context, apps []* ConnectionState: appv1.ConnectionState{ModifiedAt: &now}, ApplicationsCount: appCount, } - if info != nil { + if info != nil && appCount != 0 { clusterInfo.ServerVersion = info.K8SVersion clusterInfo.APIVersions = argo.APIResourcesToStrings(info.APIResources, true) if info.LastCacheSyncTime == nil { clusterInfo.ConnectionState.Status = appv1.ConnectionStatusUnknown - } else if info.SyncError == nil { + } else if info.SyncError == nil && info.ConnectionStatus == cache.ConnectionStatusSuccessful { clusterInfo.ConnectionState.Status = appv1.ConnectionStatusSuccessful syncTime := metav1.NewTime(*info.LastCacheSyncTime) clusterInfo.CacheInfo.LastCacheSyncTime = &syncTime @@ -156,7 +156,9 @@ func (c *clusterInfoUpdater) getUpdatedClusterInfo(ctx context.Context, apps []* clusterInfo.CacheInfo.ResourcesCount = int64(info.ResourcesCount) } else { clusterInfo.ConnectionState.Status = appv1.ConnectionStatusFailed - clusterInfo.ConnectionState.Message = info.SyncError.Error() + if info.SyncError != nil { + clusterInfo.ConnectionState.Message = info.SyncError.Error() + } } } else { clusterInfo.ConnectionState.Status = appv1.ConnectionStatusUnknown From 6733999b6318fbd4b225cd2fe01844abb382876b Mon Sep 17 00:00:00 2001 From: Chetan Banavikalmutt Date: Thu, 4 Apr 2024 16:35:09 +0530 Subject: [PATCH 2/4] Add a retry to handle transient network errors Signed-off-by: Chetan Banavikalmutt --- controller/cache/cache.go | 50 +++++++++++++++++++++++++++++---------- 1 file changed, 37 insertions(+), 13 deletions(-) diff --git a/controller/cache/cache.go b/controller/cache/cache.go index e9db25c88dcda..c529d5ee5293e 100644 --- a/controller/cache/cache.go +++ b/controller/cache/cache.go @@ -176,15 +176,17 @@ func NewLiveStateCache( resourceTracking argo.ResourceTracking) LiveStateCache { return &liveStateCache{ - appInformer: appInformer, - db: db, - clusters: make(map[string]clustercache.ClusterCache), - onObjectUpdated: onObjectUpdated, - kubectl: kubectl, - settingsMgr: settingsMgr, - metricsServer: metricsServer, - clusterSharding: clusterSharding, - resourceTracking: resourceTracking, + appInformer: appInformer, + db: db, + clusters: make(map[string]clustercache.ClusterCache), + clusterStatusCancel: make(map[string]context.CancelFunc), + onObjectUpdated: onObjectUpdated, + kubectl: kubectl, + settingsMgr: settingsMgr, + metricsServer: metricsServer, + clusterSharding: clusterSharding, + clusterFilter: clusterFilter, + resourceTracking: resourceTracking, } } @@ -210,9 +212,10 @@ type liveStateCache struct { resourceTracking argo.ResourceTracking ignoreNormalizerOpts normalizers.IgnoreNormalizerOpts - clusters map[string]clustercache.ClusterCache - cacheSettings cacheSettings - lock sync.RWMutex + clusterStatusCancel map[string]context.CancelFunc + clusters map[string]clustercache.ClusterCache + cacheSettings cacheSettings + lock sync.RWMutex } func (c *liveStateCache) loadCacheSettings() (*cacheSettings, error) { @@ -520,11 +523,20 @@ func (c *liveStateCache) getCluster(server string) (clustercache.ClusterCache, e clustercache.SetLogr(logutils.NewLogrusLogger(log.WithField("server", cluster.Server))), clustercache.SetRetryOptions(clusterCacheAttemptLimit, clusterCacheRetryUseBackoff, isRetryableError), clustercache.SetRespectRBAC(respectRBAC), + clustercache.SetClusterStatusRetryFunc(isTransientNetworkErr), + clustercache.SetClusterConnectionInterval(10 * time.Second), } clusterCache = clustercache.NewClusterCache(clusterCacheConfig, clusterCacheOpts...) - clusterCache.StartClusterConnectionStatusMonitoring(context.Background()) + // Make sure to check if the monitoring interval is disabled + + ctx, cancel := context.WithCancel(context.Background()) + if c.clusterStatusCancel == nil { + c.clusterStatusCancel = make(map[string]context.CancelFunc) + } + c.clusterStatusCancel[server] = cancel + clusterCache.StartClusterConnectionStatusMonitoring(ctx) _ = clusterCache.OnResourceUpdated(func(newRes *clustercache.Resource, oldRes *clustercache.Resource, namespaceResources map[kube.ResourceKey]*clustercache.Resource) { toNotify := make(map[string]bool) @@ -779,6 +791,12 @@ func (c *liveStateCache) handleModEvent(oldCluster *appv1.Cluster, newCluster *a if !c.canHandleCluster(newCluster) { cluster.Invalidate() c.lock.Lock() + cancel, ok := c.clusterStatusCancel[newCluster.Server] + if ok { + // stop the cluster status monitoring goroutine + cancel() + delete(c.clusterStatusCancel, newCluster.Server) + } delete(c.clusters, newCluster.Server) c.lock.Unlock() return @@ -820,6 +838,12 @@ func (c *liveStateCache) handleDeleteEvent(clusterServer string) { if ok { cluster.Invalidate() c.lock.Lock() + cancel, ok := c.clusterStatusCancel[clusterServer] + if ok { + // stop the cluster status monitoring goroutine + cancel() + delete(c.clusterStatusCancel, clusterServer) + } delete(c.clusters, clusterServer) c.lock.Unlock() } From 8dde21223beaf8d96d717a4afd1fb94b50c606a2 Mon Sep 17 00:00:00 2001 From: Chetan Banavikalmutt Date: Thu, 9 May 2024 17:24:57 +0530 Subject: [PATCH 3/4] Make monitoring interval configurable Signed-off-by: Chetan Banavikalmutt --- controller/cache/cache.go | 24 +++++++++++++++--------- go.mod | 1 + go.sum | 4 ++-- 3 files changed, 18 insertions(+), 11 deletions(-) diff --git a/controller/cache/cache.go b/controller/cache/cache.go index c529d5ee5293e..bf0ad42dcf5f4 100644 --- a/controller/cache/cache.go +++ b/controller/cache/cache.go @@ -69,6 +69,9 @@ const ( // EnvClusterCacheRetryUseBackoff is the env variable to control whether to use a backoff strategy with the retry during cluster cache sync EnvClusterCacheRetryUseBackoff = "ARGOCD_CLUSTER_CACHE_RETRY_USE_BACKOFF" + + // EnvClusterConnectionMonitoringInterval is the env variable to configure the cluster status monitoring interval. + EnvClusterConnectionMonitoringInterval = "ARGOCD_CLUSTER_STATUS_MONITORING_INTERVAL" ) // GitOps engine cluster cache tuning options @@ -100,6 +103,9 @@ var ( // clusterCacheRetryUseBackoff specifies whether to use a backoff strategy on cluster cache sync, if retry is enabled clusterCacheRetryUseBackoff bool = false + + // clusterStatusMonitoringInterval specifies the interval used by Argo CD to monitor the cluster connection status. + clusterStatusMonitoringInterval = 10 * time.Second ) func init() { @@ -111,6 +117,7 @@ func init() { clusterCacheListSemaphoreSize = env.ParseInt64FromEnv(EnvClusterCacheListSemaphore, clusterCacheListSemaphoreSize, 0, math.MaxInt64) clusterCacheAttemptLimit = int32(env.ParseNumFromEnv(EnvClusterCacheAttemptLimit, int(clusterCacheAttemptLimit), 1, math.MaxInt32)) clusterCacheRetryUseBackoff = env.ParseBoolFromEnv(EnvClusterCacheRetryUseBackoff, false) + clusterStatusMonitoringInterval = env.ParseDurationFromEnv(EnvClusterConnectionMonitoringInterval, clusterStatusMonitoringInterval, 0, math.MaxInt64) } type LiveStateCache interface { @@ -185,7 +192,6 @@ func NewLiveStateCache( settingsMgr: settingsMgr, metricsServer: metricsServer, clusterSharding: clusterSharding, - clusterFilter: clusterFilter, resourceTracking: resourceTracking, } } @@ -524,19 +530,19 @@ func (c *liveStateCache) getCluster(server string) (clustercache.ClusterCache, e clustercache.SetRetryOptions(clusterCacheAttemptLimit, clusterCacheRetryUseBackoff, isRetryableError), clustercache.SetRespectRBAC(respectRBAC), clustercache.SetClusterStatusRetryFunc(isTransientNetworkErr), - clustercache.SetClusterConnectionInterval(10 * time.Second), + clustercache.SetClusterConnectionInterval(clusterStatusMonitoringInterval), } clusterCache = clustercache.NewClusterCache(clusterCacheConfig, clusterCacheOpts...) - // Make sure to check if the monitoring interval is disabled - - ctx, cancel := context.WithCancel(context.Background()) - if c.clusterStatusCancel == nil { - c.clusterStatusCancel = make(map[string]context.CancelFunc) + if clusterStatusMonitoringInterval != 0 { + ctx, cancel := context.WithCancel(context.Background()) + if c.clusterStatusCancel == nil { + c.clusterStatusCancel = make(map[string]context.CancelFunc) + } + c.clusterStatusCancel[server] = cancel + clusterCache.StartClusterConnectionStatusMonitoring(ctx) } - c.clusterStatusCancel[server] = cancel - clusterCache.StartClusterConnectionStatusMonitoring(ctx) _ = clusterCache.OnResourceUpdated(func(newRes *clustercache.Resource, oldRes *clustercache.Resource, namespaceResources map[kube.ResourceKey]*clustercache.Resource) { toNotify := make(map[string]bool) diff --git a/go.mod b/go.mod index c6e1bb004bf7c..f4ddffedc9854 100644 --- a/go.mod +++ b/go.mod @@ -295,6 +295,7 @@ require ( ) replace ( + github.com/argoproj/gitops-engine => github.com/chetan-rns/gitops-engine v0.1.3-0.20240509130717-b3e1c67fec67 // https://github.com/golang/go/issues/33546#issuecomment-519656923 github.com/go-check/check => github.com/go-check/check v0.0.0-20180628173108-788fd7840127 diff --git a/go.sum b/go.sum index c9209abedde49..3ef70d58bab20 100644 --- a/go.sum +++ b/go.sum @@ -695,8 +695,6 @@ github.com/apache/thrift v0.12.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb github.com/apache/thrift v0.13.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ= github.com/apache/thrift v0.16.0/go.mod h1:PHK3hniurgQaNMZYaCLEqXKsYK8upmhPbmdP2FXSqgU= github.com/appscode/go v0.0.0-20191119085241-0887d8ec2ecc/go.mod h1:OawnOmAL4ZX3YaPdN+8HTNwBveT1jMsqP74moa9XUbE= -github.com/argoproj/gitops-engine v0.7.1-0.20240514190100-8a3ce6d85caa h1:RcIYoAbkaGA7yzpY1YItaTLgKYABDfkITyQ4jUl3Y6c= -github.com/argoproj/gitops-engine v0.7.1-0.20240514190100-8a3ce6d85caa/go.mod h1:Vet2xN0akQpggQJZGmThA8Lozpn26RLagZFmLXw/oSI= github.com/argoproj/notifications-engine v0.4.1-0.20240403133627-f48567108f01 h1:/V8+HM0VPPTrdjTwUrkIj5a+SjaU//tJwfIXJ1QAOvg= github.com/argoproj/notifications-engine v0.4.1-0.20240403133627-f48567108f01/go.mod h1:N0A4sEws2soZjEpY4hgZpQS8mRIEw6otzwfkgc3g9uQ= github.com/argoproj/pkg v0.13.7-0.20230626144333-d56162821bd1 h1:qsHwwOJ21K2Ao0xPju1sNuqphyMnMYkyB3ZLoLtxWpo= @@ -788,6 +786,8 @@ github.com/chai2010/gettext-go v1.0.2 h1:1Lwwip6Q2QGsAdl/ZKPCwTe9fe0CjlUbqj5bFNS github.com/chai2010/gettext-go v1.0.2/go.mod h1:y+wnP2cHYaVj19NZhYKAwEMH2CI1gNHeQQ+5AjwawxA= github.com/chainguard-dev/git-urls v1.0.2 h1:pSpT7ifrpc5X55n4aTTm7FFUE+ZQHKiqpiwNkJrVcKQ= github.com/chainguard-dev/git-urls v1.0.2/go.mod h1:rbGgj10OS7UgZlbzdUQIQpT0k/D4+An04HJY7Ol+Y/o= +github.com/chetan-rns/gitops-engine v0.1.3-0.20240509130717-b3e1c67fec67 h1:MTqQCeBINPIXxuhynmozQA6Y3Ijv59TemYmImd3UFy8= +github.com/chetan-rns/gitops-engine v0.1.3-0.20240509130717-b3e1c67fec67/go.mod h1:Vet2xN0akQpggQJZGmThA8Lozpn26RLagZFmLXw/oSI= github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI= github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI= github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU= From cd0f05216763768bb63b1e4222afd57f60af9e77 Mon Sep 17 00:00:00 2001 From: Chetan Banavikalmutt Date: Thu, 30 May 2024 16:47:46 +0530 Subject: [PATCH 4/4] Fix unit tests for clusterinfoupdater Signed-off-by: Chetan Banavikalmutt --- controller/clusterinfoupdater.go | 3 ++- controller/clusterinfoupdater_test.go | 23 ++++++++++++++++++++--- 2 files changed, 22 insertions(+), 4 deletions(-) diff --git a/controller/clusterinfoupdater.go b/controller/clusterinfoupdater.go index bf07f8cc709e5..6dced80b14bd9 100644 --- a/controller/clusterinfoupdater.go +++ b/controller/clusterinfoupdater.go @@ -3,9 +3,10 @@ package controller import ( "context" "fmt" - "github.com/argoproj/argo-cd/v2/common" "time" + "github.com/argoproj/argo-cd/v2/common" + "github.com/argoproj/argo-cd/v2/util/env" "github.com/argoproj/gitops-engine/pkg/cache" "github.com/argoproj/gitops-engine/pkg/utils/kube" diff --git a/controller/clusterinfoupdater_test.go b/controller/clusterinfoupdater_test.go index d11d4412bf30c..f67e6fc7775cb 100644 --- a/controller/clusterinfoupdater_test.go +++ b/controller/clusterinfoupdater_test.go @@ -36,11 +36,13 @@ func TestClusterSecretUpdater(t *testing.T) { var tests = []struct { LastCacheSyncTime *time.Time SyncError error + ConnectionStatus clustercache.ConnectionStatus ExpectedStatus v1alpha1.ConnectionStatus }{ - {nil, nil, v1alpha1.ConnectionStatusUnknown}, - {&now, nil, v1alpha1.ConnectionStatusSuccessful}, - {&now, fmt.Errorf("sync failed"), v1alpha1.ConnectionStatusFailed}, + {nil, nil, clustercache.ConnectionStatusUnknown, v1alpha1.ConnectionStatusUnknown}, + {&now, nil, clustercache.ConnectionStatusSuccessful, v1alpha1.ConnectionStatusSuccessful}, + {&now, fmt.Errorf("sync failed"), clustercache.ConnectionStatusSuccessful, v1alpha1.ConnectionStatusFailed}, + {&now, nil, clustercache.ConnectionStatusFailed, v1alpha1.ConnectionStatusFailed}, } emptyArgoCDConfigMap := &v1.ConfigMap{ @@ -78,12 +80,27 @@ func TestClusterSecretUpdater(t *testing.T) { cluster, err := argoDB.CreateCluster(ctx, &v1alpha1.Cluster{Server: "http://minikube"}) assert.NoError(t, err, "Test prepare test data create cluster failed") + fakeApp := &v1alpha1.Application{ + ObjectMeta: metav1.ObjectMeta{ + Name: "fake-app", + Namespace: fakeNamespace, + }, + Spec: v1alpha1.ApplicationSpec{ + Destination: v1alpha1.ApplicationDestination{ + Server: cluster.Server, + }, + }, + } + err = appInformer.GetIndexer().Add(fakeApp) + assert.NoError(t, err) + for _, test := range tests { info := &clustercache.ClusterInfo{ Server: cluster.Server, K8SVersion: updatedK8sVersion, LastCacheSyncTime: test.LastCacheSyncTime, SyncError: test.SyncError, + ConnectionStatus: test.ConnectionStatus, } lister := applisters.NewApplicationLister(appInformer.GetIndexer()).Applications(fakeNamespace)