From ac802981ab6a31bd1e9c36d261cd4cfb35da62d7 Mon Sep 17 00:00:00 2001 From: Andrii Korotkov Date: Wed, 27 Nov 2024 22:21:18 -0800 Subject: [PATCH] fix: Graceful shutdown for the API server (#18642) Closes #18642 Implements a graceful shutdown for the API server. Without this, ArgoCD API server will eventually return 502 during rolling update. However, healthcheck would return 503 if the server is terminating. Signed-off-by: Andrii Korotkov Co-authored-by: Leonardo Luz Almeida Co-authored-by: Michael Crenshaw <350466+crenshaw-dev@users.noreply.github.com> --- cmd/argocd-server/commands/argocd_server.go | 3 + server/server.go | 158 ++++++++++++++++---- server/server_test.go | 59 ++++++++ 3 files changed, 188 insertions(+), 32 deletions(-) diff --git a/cmd/argocd-server/commands/argocd_server.go b/cmd/argocd-server/commands/argocd_server.go index 3de5d517e7d233..453d8f0d03dd69 100644 --- a/cmd/argocd-server/commands/argocd_server.go +++ b/cmd/argocd-server/commands/argocd_server.go @@ -273,6 +273,9 @@ func NewCommand() *cobra.Command { if closer != nil { closer() } + if argocd.TerminateRequested() { + break + } } }, Example: templates.Examples(` diff --git a/server/server.go b/server/server.go index 6625461dfab030..1a9494945c74e9 100644 --- a/server/server.go +++ b/server/server.go @@ -13,6 +13,7 @@ import ( "net/url" "os" "os/exec" + "os/signal" "path" "path/filepath" "reflect" @@ -20,6 +21,7 @@ import ( go_runtime "runtime" "strings" gosync "sync" + "syscall" "time" // nolint:staticcheck @@ -187,17 +189,20 @@ type ArgoCDServer struct { db db.ArgoDB // stopCh is the channel which when closed, will shutdown the Argo CD server - stopCh chan struct{} - userStateStorage util_session.UserStateStorage - indexDataInit gosync.Once - indexData []byte - indexDataErr error - staticAssets http.FileSystem - apiFactory api.Factory - secretInformer cache.SharedIndexInformer - configMapInformer cache.SharedIndexInformer - serviceSet *ArgoCDServiceSet - extensionManager 
*extension.Manager + stopCh chan os.Signal + userStateStorage util_session.UserStateStorage + indexDataInit gosync.Once + indexData []byte + indexDataErr error + staticAssets http.FileSystem + apiFactory api.Factory + secretInformer cache.SharedIndexInformer + configMapInformer cache.SharedIndexInformer + serviceSet *ArgoCDServiceSet + extensionManager *extension.Manager + shutdown func() + terminateRequested bool + receivedSignal bool } type ArgoCDServerOpts struct { @@ -329,6 +334,9 @@ func NewServer(ctx context.Context, opts ArgoCDServerOpts, appsetOpts Applicatio pg := extension.NewDefaultProjectGetter(projLister, dbInstance) ug := extension.NewDefaultUserGetter(policyEnf) em := extension.NewManager(logger, opts.Namespace, sg, ag, pg, enf, ug) + noopShutdown := func() { + log.Error("API Server Shutdown function called but server is not started yet.") + } a := &ArgoCDServer{ ArgoCDServerOpts: opts, @@ -352,6 +360,10 @@ func NewServer(ctx context.Context, opts ArgoCDServerOpts, appsetOpts Applicatio secretInformer: secretInformer, configMapInformer: configMapInformer, extensionManager: em, + shutdown: noopShutdown, + stopCh: make(chan os.Signal, 1), + terminateRequested: false, + receivedSignal: false, } err = a.logInClusterWarnings() @@ -369,6 +381,9 @@ const ( ) func (a *ArgoCDServer) healthCheck(r *http.Request) error { + if a.terminateRequested { + return errors.New("API Server is terminating and unable to serve requests") + } if val, ok := r.URL.Query()["full"]; ok && len(val) > 0 && val[0] == "true" { argoDB := db.NewDB(a.Namespace, a.settingsMgr, a.KubeClientset) _, err := argoDB.ListClusters(r.Context()) @@ -601,35 +616,114 @@ func (a *ArgoCDServer) Run(ctx context.Context, listeners *Listeners) { log.Fatal("Timed out waiting for project cache to sync") } - a.stopCh = make(chan struct{}) - <-a.stopCh + shutdownFunc := func() { + log.Info("API Server shutdown initiated. 
Shutting down servers...") + sCtx, cancel := context.WithTimeout(context.Background(), 20*time.Second) + defer cancel() + var wg gosync.WaitGroup + + // Shutdown http server + wg.Add(1) + go func() { + defer wg.Done() + err := httpS.Shutdown(sCtx) + if err != nil { + log.Errorf("Error shutting down http server: %s", err) + } + }() + + if httpsS != nil { + // Shutdown https server + wg.Add(1) + go func() { + defer wg.Done() + err := httpsS.Shutdown(sCtx) + if err != nil { + log.Errorf("Error shutting down https server: %s", err) + } + }() + } + + // Shutdown gRPC server + wg.Add(1) + go func() { + defer wg.Done() + grpcS.GracefulStop() + }() + + // Shutdown metrics server + wg.Add(1) + go func() { + defer wg.Done() + err := metricsServ.Shutdown(sCtx) + if err != nil { + log.Errorf("Error shutting down metrics server: %s", err) + } + }() + + if tlsm != nil { + // Shutdown tls server + wg.Add(1) + go func() { + defer wg.Done() + tlsm.Close() + }() + } + + // Shutdown tcp server + wg.Add(1) + go func() { + defer wg.Done() + tcpm.Close() + }() + + c := make(chan struct{}) + // This goroutine will wait for all servers to conclude the shutdown + // process + go func() { + defer close(c) + wg.Wait() + }() + + select { + case <-c: + log.Info("All servers were gracefully shutdown. Exiting...") + case <-sCtx.Done(): + log.Warn("Graceful shutdown timeout. 
Exiting...") + } + } + a.shutdown = shutdownFunc + + signal.Notify(a.stopCh, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + select { + case signal := <-a.stopCh: + log.Infof("API Server received signal: %s", signal.String()) + a.terminateRequested = true + a.receivedSignal = true + a.shutdown() + case <-ctx.Done(): + log.Infof("API Server: %s", ctx.Err()) + a.terminateRequested = true + a.shutdown() + } } func (a *ArgoCDServer) Initialized() bool { return a.projInformer.HasSynced() && a.appInformer.HasSynced() } +// TerminateRequested returns whether a shutdown was initiated by a signal or context cancel +// as opposed to a watch. +func (a *ArgoCDServer) TerminateRequested() bool { + return a.terminateRequested +} + // checkServeErr checks the error from a .Serve() call to decide if it was a graceful shutdown func (a *ArgoCDServer) checkServeErr(name string, err error) { - if err != nil { - if a.stopCh == nil { - // a nil stopCh indicates a graceful shutdown - log.Infof("graceful shutdown %s: %v", name, err) - } else { - log.Fatalf("%s: %v", name, err) - } + if err != nil && !errors.Is(err, http.ErrServerClosed) { + log.Errorf("Error received from server %s: %v", name, err) } else { - log.Infof("graceful shutdown %s", name) - } -} - -// Shutdown stops the Argo CD server -func (a *ArgoCDServer) Shutdown() { - log.Info("Shut down requested") - stopCh := a.stopCh - a.stopCh = nil - if stopCh != nil { - close(stopCh) + log.Infof("Graceful shutdown of %s initiated", name) } } @@ -734,7 +828,7 @@ func (a *ArgoCDServer) watchSettings() { } } log.Info("shutting down settings watch") - a.Shutdown() + a.shutdown() a.settingsMgr.Unsubscribe(updateCh) close(updateCh) } diff --git a/server/server_test.go b/server/server_test.go index 1f715d00d4e918..2bcdca4c223be0 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -10,6 +10,8 @@ import ( "os" "path/filepath" "strings" + gosync "sync" + "syscall" "testing" "time" @@ -419,6 +421,63 @@ func 
TestCertsAreNotGeneratedInInsecureMode(t *testing.T) { assert.Nil(t, s.settings.Certificate) } +func TestGracefulShutdown(t *testing.T) { + port, err := test.GetFreePort() + require.NoError(t, err) + mockRepoClient := &mocks.Clientset{RepoServerServiceClient: &mocks.RepoServerServiceClient{}} + kubeclientset := fake.NewSimpleClientset(test.NewFakeConfigMap(), test.NewFakeSecret()) + redis, redisCloser := test.NewInMemoryRedis() + defer redisCloser() + s := NewServer( + context.Background(), + ArgoCDServerOpts{ + ListenPort: port, + Namespace: test.FakeArgoCDNamespace, + KubeClientset: kubeclientset, + AppClientset: apps.NewSimpleClientset(), + RepoClientset: mockRepoClient, + RedisClient: redis, + }, + ApplicationSetOpts{}, + ) + + projInformerCancel := test.StartInformer(s.projInformer) + defer projInformerCancel() + appInformerCancel := test.StartInformer(s.appInformer) + defer appInformerCancel() + appsetInformerCancel := test.StartInformer(s.appsetInformer) + defer appsetInformerCancel() + + lns, err := s.Listen() + require.NoError(t, err) + + shutdown := false + runCtx, runCancel := context.WithTimeout(context.Background(), 2*time.Second) + defer runCancel() + + var wg gosync.WaitGroup + wg.Add(1) + go func(shutdown *bool) { + defer wg.Done() + s.Run(runCtx, lns) + *shutdown = true + }(&shutdown) + + err = s.healthCheck(&http.Request{URL: &url.URL{Path: "/healthz", RawQuery: "full=true"}}) + require.NoError(t, err) + + s.stopCh <- syscall.SIGINT + + wg.Wait() + + err = s.healthCheck(&http.Request{URL: &url.URL{Path: "/healthz", RawQuery: "full=true"}}) + require.Error(t, err, "API Server is terminating and unable to serve requests") + + assert.True(t, s.terminateRequested) + assert.True(t, s.receivedSignal) + assert.True(t, shutdown) +} + func TestAuthenticate(t *testing.T) { type testData struct { test string