Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Graceful shutdown for the API server (#18642) #20981

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ func NewCommand() *cobra.Command {
enableK8sEvent,
)
errors.CheckError(err)
cacheutil.CollectMetrics(redisClient, appController.GetMetricsServer())
cacheutil.CollectMetrics(redisClient, appController.GetMetricsServer(), nil)

stats.RegisterStackDumper()
stats.StartStatsTicker(10 * time.Minute)
Expand Down
2 changes: 1 addition & 1 deletion cmd/argocd-repo-server/commands/argocd_repo_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func NewCommand() *cobra.Command {

askPassServer := askpass.NewServer(askpass.SocketPath)
metricsServer := metrics.NewMetricsServer()
cacheutil.CollectMetrics(redisClient, metricsServer)
cacheutil.CollectMetrics(redisClient, metricsServer, nil)
server, err := reposerver.NewServer(metricsServer, cache, tlsConfigCustomizer, repository.RepoServerInitConstants{
ParallelismLimit: parallelismLimit,
PauseGenerationAfterFailedGenerationAttempts: pauseGenerationAfterFailedGenerationAttempts,
Expand Down
15 changes: 9 additions & 6 deletions cmd/argocd-server/commands/argocd_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,22 +258,25 @@ func NewCommand() *cobra.Command {
stats.RegisterHeapDumper("memprofile")
argocd := server.NewServer(ctx, argoCDOpts, appsetOpts)
argocd.Init(ctx)
lns, err := argocd.Listen()
errors.CheckError(err)
for {
var closer func()
ctx, cancel := context.WithCancel(ctx)
serverCtx, cancel := context.WithCancel(ctx)
lns, err := argocd.Listen()
errors.CheckError(err)
if otlpAddress != "" {
closer, err = traceutil.InitTracer(ctx, "argocd-server", otlpAddress, otlpInsecure, otlpHeaders, otlpAttrs)
closer, err = traceutil.InitTracer(serverCtx, "argocd-server", otlpAddress, otlpInsecure, otlpHeaders, otlpAttrs)
if err != nil {
log.Fatalf("failed to initialize tracing: %v", err)
}
}
argocd.Run(ctx, lns)
cancel()
argocd.Run(serverCtx, lns)
if closer != nil {
closer()
}
cancel()
if argocd.TerminateRequested() {
break
}
}
},
Example: templates.Examples(`
Expand Down
176 changes: 143 additions & 33 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,17 @@ import (
"net/url"
"os"
"os/exec"
"os/signal"
"path"
"path/filepath"
"reflect"
"regexp"
go_runtime "runtime"
"runtime/debug"
"strings"
gosync "sync"
"sync/atomic"
"syscall"
"time"

// nolint:staticcheck
Expand Down Expand Up @@ -187,17 +191,20 @@ type ArgoCDServer struct {
db db.ArgoDB

// stopCh is the channel which when closed, will shutdown the Argo CD server
stopCh chan struct{}
userStateStorage util_session.UserStateStorage
indexDataInit gosync.Once
indexData []byte
indexDataErr error
staticAssets http.FileSystem
apiFactory api.Factory
secretInformer cache.SharedIndexInformer
configMapInformer cache.SharedIndexInformer
serviceSet *ArgoCDServiceSet
extensionManager *extension.Manager
stopCh chan os.Signal
userStateStorage util_session.UserStateStorage
indexDataInit gosync.Once
indexData []byte
indexDataErr error
staticAssets http.FileSystem
apiFactory api.Factory
secretInformer cache.SharedIndexInformer
configMapInformer cache.SharedIndexInformer
serviceSet *ArgoCDServiceSet
extensionManager *extension.Manager
shutdown func()
terminateRequested atomic.Bool
available atomic.Bool
}

type ArgoCDServerOpts struct {
Expand Down Expand Up @@ -329,6 +336,9 @@ func NewServer(ctx context.Context, opts ArgoCDServerOpts, appsetOpts Applicatio
pg := extension.NewDefaultProjectGetter(projLister, dbInstance)
ug := extension.NewDefaultUserGetter(policyEnf)
em := extension.NewManager(logger, opts.Namespace, sg, ag, pg, enf, ug)
noopShutdown := func() {
log.Error("API Server Shutdown function called but server is not started yet.")
}

a := &ArgoCDServer{
ArgoCDServerOpts: opts,
Expand All @@ -352,6 +362,8 @@ func NewServer(ctx context.Context, opts ArgoCDServerOpts, appsetOpts Applicatio
secretInformer: secretInformer,
configMapInformer: configMapInformer,
extensionManager: em,
shutdown: noopShutdown,
stopCh: make(chan os.Signal, 1),
}

err = a.logInClusterWarnings()
Expand All @@ -369,6 +381,12 @@ const (
)

func (a *ArgoCDServer) healthCheck(r *http.Request) error {
if a.terminateRequested.Load() {
return errors.New("API Server is terminating and unable to serve requests.")
}
if !a.available.Load() {
return errors.New("API Server is not available. It either hasn't started or is restarting.")
}
if val, ok := r.URL.Query()["full"]; ok && len(val) > 0 && val[0] == "true" {
argoDB := db.NewDB(a.Namespace, a.settingsMgr, a.KubeClientset)
_, err := argoDB.ListClusters(r.Context())
Expand Down Expand Up @@ -515,11 +533,19 @@ func (a *ArgoCDServer) Init(ctx context.Context) {
// k8s.io/ go-to-protobuf uses protoc-gen-gogo, which comes from gogo/protobuf (a fork of
// golang/protobuf).
func (a *ArgoCDServer) Run(ctx context.Context, listeners *Listeners) {
defer func() {
if r := recover(); r != nil {
log.WithField("trace", string(debug.Stack())).Error("Recovered from panic: ", r)
a.terminateRequested.Store(true)
a.shutdown()
}
}()

a.userStateStorage.Init(ctx)

metricsServ := metrics.NewMetricsServer(a.MetricsHost, a.MetricsPort)
if a.RedisClient != nil {
cacheutil.CollectMetrics(a.RedisClient, metricsServ)
cacheutil.CollectMetrics(a.RedisClient, metricsServ, a.userStateStorage.GetLockObject())
}

svcSet := newArgoCDServiceSet(a)
Expand Down Expand Up @@ -601,35 +627,118 @@ func (a *ArgoCDServer) Run(ctx context.Context, listeners *Listeners) {
log.Fatal("Timed out waiting for project cache to sync")
}

a.stopCh = make(chan struct{})
<-a.stopCh
shutdownFunc := func() {
log.Info("API Server shutdown initiated. Shutting down servers...")
a.available.Store(false)
shutdownCtx, cancel := context.WithTimeout(ctx, 20*time.Second)
defer cancel()
var wg gosync.WaitGroup

// Shutdown http server
wg.Add(1)
go func() {
defer wg.Done()
err := httpS.Shutdown(shutdownCtx)
if err != nil {
log.Errorf("Error shutting down http server: %s", err)
}
}()

if a.useTLS() {
// Shutdown https server
wg.Add(1)
go func() {
defer wg.Done()
err := httpsS.Shutdown(shutdownCtx)
if err != nil {
log.Errorf("Error shutting down https server: %s", err)
}
}()
}

// Shutdown gRPC server
wg.Add(1)
go func() {
defer wg.Done()
grpcS.GracefulStop()
}()

// Shutdown metrics server
wg.Add(1)
go func() {
defer wg.Done()
err := metricsServ.Shutdown(shutdownCtx)
if err != nil {
log.Errorf("Error shutting down metrics server: %s", err)
}
}()

if a.useTLS() {
// Shutdown tls server
wg.Add(1)
go func() {
defer wg.Done()
tlsm.Close()
}()
}

// Shutdown tcp server
wg.Add(1)
go func() {
defer wg.Done()
tcpm.Close()
}()

c := make(chan struct{})
// This goroutine will wait for all servers to conclude the shutdown
// process
go func() {
defer close(c)
wg.Wait()
}()

select {
case <-c:
log.Info("All servers were gracefully shutdown. Exiting...")
case <-shutdownCtx.Done():
log.Warn("Graceful shutdown timeout. Exiting...")
}
}
a.shutdown = shutdownFunc
signal.Notify(a.stopCh, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
a.available.Store(true)

select {
case signal := <-a.stopCh:
log.Infof("API Server received signal: %s", signal.String())
// SIGUSR1 is used for triggering a server restart
if signal != syscall.SIGUSR1 {
a.terminateRequested.Store(true)
}
a.shutdown()
case <-ctx.Done():
log.Infof("API Server: %s", ctx.Err())
a.terminateRequested.Store(true)
a.shutdown()
}
}

func (a *ArgoCDServer) Initialized() bool {
return a.projInformer.HasSynced() && a.appInformer.HasSynced()
}

// TerminateRequested returns whether a shutdown was initiated by a signal or context cancel
// as opposed to a watch.
func (a *ArgoCDServer) TerminateRequested() bool {
return a.terminateRequested.Load()
}

// checkServeErr checks the error from a .Serve() call to decide if it was a graceful shutdown
func (a *ArgoCDServer) checkServeErr(name string, err error) {
if err != nil {
if a.stopCh == nil {
// a nil stopCh indicates a graceful shutdown
log.Infof("graceful shutdown %s: %v", name, err)
} else {
log.Fatalf("%s: %v", name, err)
}
if err != nil && !errors.Is(err, http.ErrServerClosed) {
log.Errorf("Error received from server %s: %v", name, err)
} else {
log.Infof("graceful shutdown %s", name)
}
}

// Shutdown stops the Argo CD server
func (a *ArgoCDServer) Shutdown() {
log.Info("Shut down requested")
stopCh := a.stopCh
a.stopCh = nil
if stopCh != nil {
close(stopCh)
log.Infof("Graceful shutdown of %s initiated", name)
}
}

Expand Down Expand Up @@ -734,9 +843,10 @@ func (a *ArgoCDServer) watchSettings() {
}
}
log.Info("shutting down settings watch")
a.Shutdown()
a.settingsMgr.Unsubscribe(updateCh)
close(updateCh)
// Triggers server restart
a.stopCh <- syscall.SIGUSR1
}

func (a *ArgoCDServer) rbacPolicyLoader(ctx context.Context) {
Expand Down
68 changes: 68 additions & 0 deletions server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"os"
"path/filepath"
"strings"
gosync "sync"
"syscall"
"testing"
"time"

Expand Down Expand Up @@ -419,6 +421,72 @@ func TestCertsAreNotGeneratedInInsecureMode(t *testing.T) {
assert.Nil(t, s.settings.Certificate)
}

func TestGracefulShutdown(t *testing.T) {
port, err := test.GetFreePort()
require.NoError(t, err)
mockRepoClient := &mocks.Clientset{RepoServerServiceClient: &mocks.RepoServerServiceClient{}}
kubeclientset := fake.NewSimpleClientset(test.NewFakeConfigMap(), test.NewFakeSecret())
redis, redisCloser := test.NewInMemoryRedis()
defer redisCloser()
s := NewServer(
context.Background(),
ArgoCDServerOpts{
ListenPort: port,
Namespace: test.FakeArgoCDNamespace,
KubeClientset: kubeclientset,
AppClientset: apps.NewSimpleClientset(),
RepoClientset: mockRepoClient,
RedisClient: redis,
},
ApplicationSetOpts{},
)

projInformerCancel := test.StartInformer(s.projInformer)
defer projInformerCancel()
appInformerCancel := test.StartInformer(s.appInformer)
defer appInformerCancel()
appsetInformerCancel := test.StartInformer(s.appsetInformer)
defer appsetInformerCancel()

lns, err := s.Listen()
require.NoError(t, err)

shutdown := false
runCtx, runCancel := context.WithTimeout(context.Background(), 2*time.Second)
defer runCancel()

err = s.healthCheck(&http.Request{URL: &url.URL{Path: "/healthz", RawQuery: "full=true"}})
require.Error(t, err, "API Server is not running. It either hasn't started or is restarting.")

var wg gosync.WaitGroup
wg.Add(1)
go func(shutdown *bool) {
defer wg.Done()
s.Run(runCtx, lns)
*shutdown = true
}(&shutdown)

for {
if s.available.Load() {
err = s.healthCheck(&http.Request{URL: &url.URL{Path: "/healthz", RawQuery: "full=true"}})
require.NoError(t, err)
break
}
time.Sleep(10 * time.Millisecond)
}

s.stopCh <- syscall.SIGINT

wg.Wait()

err = s.healthCheck(&http.Request{URL: &url.URL{Path: "/healthz", RawQuery: "full=true"}})
require.Error(t, err, "API Server is terminating and unable to serve requests.")

assert.True(t, s.terminateRequested.Load())
assert.False(t, s.available.Load())
assert.True(t, shutdown)
}

func TestAuthenticate(t *testing.T) {
type testData struct {
test string
Expand Down
Loading
Loading