From 137d5f8cce3ced586b1343541712cb0c1ae4ef53 Mon Sep 17 00:00:00 2001
From: sakai-ast <35858151+sakai-ast@users.noreply.github.com>
Date: Fri, 7 Jul 2023 23:36:53 +0900
Subject: [PATCH] fix(controller): Enable dummy metrics server on non-leader
 workflow controller (#11295)

Signed-off-by: sakai
---
 cmd/workflow-controller/main.go   |  9 ++++
 workflow/controller/controller.go |  7 +--
 workflow/metrics/server.go        | 33 ++++++++----
 workflow/metrics/server_test.go   | 87 +++++++++++++++++++++++++++++++
 4 files changed, 123 insertions(+), 13 deletions(-)
 create mode 100644 workflow/metrics/server_test.go

diff --git a/cmd/workflow-controller/main.go b/cmd/workflow-controller/main.go
index c45e3dbabd95..f9abfea00330 100644
--- a/cmd/workflow-controller/main.go
+++ b/cmd/workflow-controller/main.go
@@ -117,6 +117,7 @@ func NewRootCommand() *cobra.Command {
 				log.Info("Leader election is turned off. Running in single-instance mode")
 				log.WithField("id", "single-instance").Info("starting leading")
 				go wfController.Run(ctx, workflowWorkers, workflowTTLWorkers, podCleanupWorkers)
+				go wfController.RunMetricsServer(ctx, false)
 			} else {
 				nodeID, ok := os.LookupEnv("LEADER_ELECTION_IDENTITY")
 				if !ok {
@@ -128,6 +129,11 @@ func NewRootCommand() *cobra.Command {
 					leaderName = fmt.Sprintf("%s-%s", leaderName, wfController.Config.InstanceID)
 				}

+				// for controlling the dummy metrics server
+				dummyCtx, dummyCancel := context.WithCancel(context.Background())
+				defer dummyCancel()
+				go wfController.RunMetricsServer(dummyCtx, true)
+
 				go leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
 					Lock: &resourcelock.LeaseLock{
 						LeaseMeta: metav1.ObjectMeta{Name: leaderName, Namespace: namespace}, Client: kubeclientset.CoordinationV1(),
@@ -139,11 +145,14 @@ func NewRootCommand() *cobra.Command {
 					RetryPeriod: env.LookupEnvDurationOr("LEADER_ELECTION_RETRY_PERIOD", 5*time.Second),
 					Callbacks: leaderelection.LeaderCallbacks{
 						OnStartedLeading: func(ctx context.Context) {
+							dummyCancel()
 							go wfController.Run(ctx, workflowWorkers, workflowTTLWorkers, podCleanupWorkers)
+							go wfController.RunMetricsServer(ctx, false)
 						},
 						OnStoppedLeading: func() {
 							log.WithField("id", nodeID).Info("stopped leading")
 							cancel()
+							go wfController.RunMetricsServer(dummyCtx, true)
 						},
 						OnNewLeader: func(identity string) {
 							log.WithField("leader", identity).Info("new leader")
diff --git a/workflow/controller/controller.go b/workflow/controller/controller.go
index 9810e367ed04..28e276a685bd 100644
--- a/workflow/controller/controller.go
+++ b/workflow/controller/controller.go
@@ -298,9 +298,6 @@ func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, workflowTTLWo
 		log.Fatal("Timed out waiting for caches to sync")
 	}

-	// Start the metrics server
-	go wfc.metrics.RunServer(ctx)
-
 	for i := 0; i < podCleanupWorkers; i++ {
 		go wait.UntilWithContext(ctx, wfc.runPodCleanup, time.Second)
 	}
@@ -323,6 +320,10 @@ func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, workflowTTLWo
 	<-ctx.Done()
 }

+func (wfc *WorkflowController) RunMetricsServer(ctx context.Context, isDummy bool) {
+	go wfc.metrics.RunServer(ctx, isDummy)
+}
+
 // Create and the Synchronization Manager
 func (wfc *WorkflowController) createSynchronizationManager(ctx context.Context) {
 	getSyncLimit := func(lockKey string) (int, error) {
diff --git a/workflow/metrics/server.go b/workflow/metrics/server.go
index 46bcdff8ee08..cde6ac4bf3b8 100644
--- a/workflow/metrics/server.go
+++ b/workflow/metrics/server.go
@@ -18,7 +18,8 @@ import (
 )

 // RunServer starts a metrics server
-func (m *Metrics) RunServer(ctx context.Context) {
+// If 'isDummy' is set to true, the dummy metrics server will be started. If it's false, the prometheus metrics server will be started
+func (m *Metrics) RunServer(ctx context.Context, isDummy bool) {
 	defer runtimeutil.HandleCrash(runtimeutil.PanicHandlers...)

 	if !m.metricsConfig.Enabled {
@@ -36,23 +37,33 @@ func (m *Metrics) RunServer(ctx context.Context) {
 		// If the telemetry server is different -- and it's enabled -- run each on its own instance
 		telemetryRegistry := prometheus.NewRegistry()
 		telemetryRegistry.MustRegister(collectors.NewGoCollector())
-		go runServer(m.telemetryConfig, telemetryRegistry, ctx)
+		go runServer(m.telemetryConfig, telemetryRegistry, ctx, isDummy)
 	}

 	// Run the metrics server
-	go runServer(m.metricsConfig, metricsRegistry, ctx)
+	go runServer(m.metricsConfig, metricsRegistry, ctx, isDummy)
 	go m.garbageCollector(ctx)
 }

-func runServer(config ServerConfig, registry *prometheus.Registry, ctx context.Context) {
+func runServer(config ServerConfig, registry *prometheus.Registry, ctx context.Context, isDummy bool) {
 	var handlerOpts promhttp.HandlerOpts
 	if config.IgnoreErrors {
 		handlerOpts.ErrorHandling = promhttp.ContinueOnError
 	}

+	name := ""
 	mux := http.NewServeMux()
-	mux.Handle(config.Path, promhttp.HandlerFor(registry, handlerOpts))
+	if isDummy {
+		// dummy metrics server responds to all requests with a 200 status, but without providing any metrics data
+		name = "dummy metrics server"
+		mux.HandleFunc(config.Path, func(w http.ResponseWriter, r *http.Request) {
+			w.WriteHeader(http.StatusOK)
+		})
+	} else {
+		name = "prometheus metrics server"
+		mux.Handle(config.Path, promhttp.HandlerFor(registry, handlerOpts))
+	}
 	srv := &http.Server{Addr: fmt.Sprintf(":%v", config.Port), Handler: mux}

 	if config.Secure {
@@ -67,15 +78,15 @@ func runServer(config ServerConfig, registry *prometheus.Registry, ctx context.C
 		}
 		srv.TLSConfig = tlsConfig
 		go func() {
-			log.Infof("Starting prometheus metrics server at localhost:%v%s", config.Port, config.Path)
-			if err := srv.ListenAndServeTLS("", ""); err != nil {
+			log.Infof("Starting %s at localhost:%v%s", name, config.Port, config.Path)
+			if err := srv.ListenAndServeTLS("", ""); err != http.ErrServerClosed {
 				panic(err)
 			}
 		}()
 	} else {
 		go func() {
-			log.Infof("Starting prometheus metrics server at localhost:%v%s", config.Port, config.Path)
-			if err := srv.ListenAndServe(); err != nil {
+			log.Infof("Starting %s at localhost:%v%s", name, config.Port, config.Path)
+			if err := srv.ListenAndServe(); err != http.ErrServerClosed {
 				panic(err)
 			}
 		}()
@@ -88,7 +99,9 @@ func runServer(config ServerConfig, registry *prometheus.Registry, ctx context.C
 	ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
 	defer cancel()
 	if err := srv.Shutdown(ctx); err != nil {
-		log.Infof("Unable to shutdown metrics server at localhost:%v%s", config.Port, config.Path)
+		log.Infof("Unable to shutdown %s at localhost:%v%s", name, config.Port, config.Path)
+	} else {
+		log.Infof("Successfully shutdown %s at localhost:%v%s", name, config.Port, config.Path)
 	}
 }

diff --git a/workflow/metrics/server_test.go b/workflow/metrics/server_test.go
new file mode 100644
index 000000000000..a810561d46fe
--- /dev/null
+++ b/workflow/metrics/server_test.go
@@ -0,0 +1,87 @@
+package metrics
+
+import (
+	"context"
+	"fmt"
+	"io"
+	"net/http"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+)
+
+func TestDisableMetricsServer(t *testing.T) {
+	config := ServerConfig{
+		Enabled: false,
+		Path:    DefaultMetricsServerPath,
+		Port:    DefaultMetricsServerPort,
+	}
+	m := New(config, config)
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	m.RunServer(ctx, false)
+	time.Sleep(1 * time.Second) // to confirm that the server doesn't start, even if we wait
+	resp, err := http.Get(fmt.Sprintf("http://localhost:%d%s", DefaultMetricsServerPort, DefaultMetricsServerPath))
+	if resp != nil {
+		defer resp.Body.Close()
+	}
+
+	assert.Error(t, err)
+	assert.Contains(t, err.Error(), "connection refused") // expect the metrics server not to start
+}
+
+func TestMetricsServer(t *testing.T) {
+	config := ServerConfig{
+		Enabled: true,
+		Path:    DefaultMetricsServerPath,
+		Port:    DefaultMetricsServerPort,
+	}
+	m := New(config, config)
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	m.RunServer(ctx, false)
+	time.Sleep(1 * time.Second)
+	resp, err := http.Get(fmt.Sprintf("http://localhost:%d%s", DefaultMetricsServerPort, DefaultMetricsServerPath))
+	assert.NoError(t, err)
+	assert.Equal(t, http.StatusOK, resp.StatusCode)
+
+	defer resp.Body.Close()
+
+	bodyBytes, err := io.ReadAll(resp.Body)
+	assert.NoError(t, err)
+
+	bodyString := string(bodyBytes)
+	assert.NotEmpty(t, bodyString)
+}
+
+func TestDummyMetricsServer(t *testing.T) {
+	config := ServerConfig{
+		Enabled: true,
+		Path:    DefaultMetricsServerPath,
+		Port:    DefaultMetricsServerPort,
+	}
+	m := New(config, config)
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	m.RunServer(ctx, true)
+	time.Sleep(1 * time.Second)
+	resp, err := http.Get(fmt.Sprintf("http://localhost:%d%s", DefaultMetricsServerPort, DefaultMetricsServerPath))
+	assert.NoError(t, err)
+	assert.Equal(t, http.StatusOK, resp.StatusCode)
+
+	defer resp.Body.Close()
+
+	bodyBytes, err := io.ReadAll(resp.Body)
+	assert.NoError(t, err)
+
+	bodyString := string(bodyBytes)
+
+	assert.Empty(t, bodyString) // expect the dummy metrics server to provide no metrics responses
+}
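Note: the dummy handler registered in runServer above simply acknowledges scrape requests with 200 OK and an empty body, so probes against a non-leader controller succeed without exposing stale metrics. Below is a minimal, self-contained Go sketch of that handler behavior; it is not part of the patch, and the net/http/httptest server plus the hard-coded "/metrics" path are assumptions used purely for illustration in place of the controller's real ServerConfig wiring.

package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
)

func main() {
	// Stand-in for the dummy handler wired up when isDummy is true:
	// every request to the metrics path gets 200 OK and no body.
	mux := http.NewServeMux()
	mux.HandleFunc("/metrics", func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusOK)
	})

	// httptest picks a free port; the controller instead listens on config.Port.
	srv := httptest.NewServer(mux)
	defer srv.Close()

	resp, err := http.Get(srv.URL + "/metrics")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	body, _ := io.ReadAll(resp.Body)
	fmt.Printf("status=%d bodyLen=%d\n", resp.StatusCode, len(body)) // prints: status=200 bodyLen=0
}

When a standby controller wins leader election, the patch cancels dummyCtx via dummyCancel() to stop this placeholder server and calls RunMetricsServer(ctx, false) to start the real Prometheus handler on the same port.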