From 227866eee45d3704b7c25f133e35c48513ef863b Mon Sep 17 00:00:00 2001 From: sakai Date: Wed, 5 Jul 2023 19:29:24 +0900 Subject: [PATCH 1/6] fix(controller): Enable dummy metrics server on non-leader workflow controller Signed-off-by: sakai --- cmd/workflow-controller/main.go | 9 ++++++ workflow/controller/controller.go | 7 +++-- workflow/metrics/server.go | 33 ++++++++++++++------ workflow/metrics/server_test.go | 51 +++++++++++++++++++++++++++++++ 4 files changed, 87 insertions(+), 13 deletions(-) create mode 100644 workflow/metrics/server_test.go diff --git a/cmd/workflow-controller/main.go b/cmd/workflow-controller/main.go index c45e3dbabd95..f9abfea00330 100644 --- a/cmd/workflow-controller/main.go +++ b/cmd/workflow-controller/main.go @@ -117,6 +117,7 @@ func NewRootCommand() *cobra.Command { log.Info("Leader election is turned off. Running in single-instance mode") log.WithField("id", "single-instance").Info("starting leading") go wfController.Run(ctx, workflowWorkers, workflowTTLWorkers, podCleanupWorkers) + go wfController.RunMetricsServer(ctx, false) } else { nodeID, ok := os.LookupEnv("LEADER_ELECTION_IDENTITY") if !ok { @@ -128,6 +129,11 @@ func NewRootCommand() *cobra.Command { leaderName = fmt.Sprintf("%s-%s", leaderName, wfController.Config.InstanceID) } + // for controlling the dummy metrics server + dummyCtx, dummyCancel := context.WithCancel(context.Background()) + defer dummyCancel() + go wfController.RunMetricsServer(dummyCtx, true) + go leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{ Lock: &resourcelock.LeaseLock{ LeaseMeta: metav1.ObjectMeta{Name: leaderName, Namespace: namespace}, Client: kubeclientset.CoordinationV1(), @@ -139,11 +145,14 @@ func NewRootCommand() *cobra.Command { RetryPeriod: env.LookupEnvDurationOr("LEADER_ELECTION_RETRY_PERIOD", 5*time.Second), Callbacks: leaderelection.LeaderCallbacks{ OnStartedLeading: func(ctx context.Context) { + dummyCancel() go wfController.Run(ctx, workflowWorkers, workflowTTLWorkers, podCleanupWorkers) + go wfController.RunMetricsServer(ctx, false) }, OnStoppedLeading: func() { log.WithField("id", nodeID).Info("stopped leading") cancel() + go wfController.RunMetricsServer(dummyCtx, true) }, OnNewLeader: func(identity string) { log.WithField("leader", identity).Info("new leader") diff --git a/workflow/controller/controller.go b/workflow/controller/controller.go index 9810e367ed04..28e276a685bd 100644 --- a/workflow/controller/controller.go +++ b/workflow/controller/controller.go @@ -298,9 +298,6 @@ func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, workflowTTLWo log.Fatal("Timed out waiting for caches to sync") } - // Start the metrics server - go wfc.metrics.RunServer(ctx) - for i := 0; i < podCleanupWorkers; i++ { go wait.UntilWithContext(ctx, wfc.runPodCleanup, time.Second) } @@ -323,6 +320,10 @@ func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, workflowTTLWo <-ctx.Done() } +func (wfc *WorkflowController) RunMetricsServer(ctx context.Context, isDummy bool) { + go wfc.metrics.RunServer(ctx, isDummy) +} + // Create and the Synchronization Manager func (wfc *WorkflowController) createSynchronizationManager(ctx context.Context) { getSyncLimit := func(lockKey string) (int, error) { diff --git a/workflow/metrics/server.go b/workflow/metrics/server.go index 46bcdff8ee08..cde6ac4bf3b8 100644 --- a/workflow/metrics/server.go +++ b/workflow/metrics/server.go @@ -18,7 +18,8 @@ import ( ) // RunServer starts a metrics server -func (m *Metrics) RunServer(ctx context.Context) { +// If 'isDummy' is set to true, the dummy metrics server will be started. If it's false, the prometheus metrics server will be started +func (m *Metrics) RunServer(ctx context.Context, isDummy bool) { defer runtimeutil.HandleCrash(runtimeutil.PanicHandlers...) if !m.metricsConfig.Enabled { @@ -36,23 +37,33 @@ func (m *Metrics) RunServer(ctx context.Context) { // If the telemetry server is different -- and it's enabled -- run each on its own instance telemetryRegistry := prometheus.NewRegistry() telemetryRegistry.MustRegister(collectors.NewGoCollector()) - go runServer(m.telemetryConfig, telemetryRegistry, ctx) + go runServer(m.telemetryConfig, telemetryRegistry, ctx, isDummy) } // Run the metrics server - go runServer(m.metricsConfig, metricsRegistry, ctx) + go runServer(m.metricsConfig, metricsRegistry, ctx, isDummy) go m.garbageCollector(ctx) } -func runServer(config ServerConfig, registry *prometheus.Registry, ctx context.Context) { +func runServer(config ServerConfig, registry *prometheus.Registry, ctx context.Context, isDummy bool) { var handlerOpts promhttp.HandlerOpts if config.IgnoreErrors { handlerOpts.ErrorHandling = promhttp.ContinueOnError } + name := "" mux := http.NewServeMux() - mux.Handle(config.Path, promhttp.HandlerFor(registry, handlerOpts)) + if isDummy { + // dummy metrics server responds to all requests with a 200 status, but without providing any metrics data + name = "dummy metrics server" + mux.HandleFunc(config.Path, func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + }) + } else { + name = "prometheus metrics server" + mux.Handle(config.Path, promhttp.HandlerFor(registry, handlerOpts)) + } srv := &http.Server{Addr: fmt.Sprintf(":%v", config.Port), Handler: mux} if config.Secure { @@ -67,15 +78,15 @@ func runServer(config ServerConfig, registry *prometheus.Registry, ctx context.C } srv.TLSConfig = tlsConfig go func() { - log.Infof("Starting prometheus metrics server at localhost:%v%s", config.Port, config.Path) - if err := srv.ListenAndServeTLS("", ""); err != nil { + log.Infof("Starting %s at localhost:%v%s", name, config.Port, config.Path) + if err := srv.ListenAndServeTLS("", ""); err != http.ErrServerClosed { panic(err) } }() } else { go func() { - log.Infof("Starting prometheus metrics server at localhost:%v%s", config.Port, config.Path) - if err := srv.ListenAndServe(); err != nil { + log.Infof("Starting %s at localhost:%v%s", name, config.Port, config.Path) + if err := srv.ListenAndServe(); err != http.ErrServerClosed { panic(err) } }() @@ -88,7 +99,9 @@ func runServer(config ServerConfig, registry *prometheus.Registry, ctx context.C ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) defer cancel() if err := srv.Shutdown(ctx); err != nil { - log.Infof("Unable to shutdown metrics server at localhost:%v%s", config.Port, config.Path) + log.Infof("Unable to shutdown %s at localhost:%v%s", name, config.Port, config.Path) + } else { + log.Infof("Successfully shutdown %s at localhost:%v%s", name, config.Port, config.Path) } } diff --git a/workflow/metrics/server_test.go b/workflow/metrics/server_test.go new file mode 100644 index 000000000000..da1745402995 --- /dev/null +++ b/workflow/metrics/server_test.go @@ -0,0 +1,51 @@ +package metrics + +import ( + "context" + "fmt" + "io" + "net/http" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestRunServer(t *testing.T) { + config := ServerConfig{ + Enabled: true, + Path: DefaultMetricsServerPath, + Port: DefaultMetricsServerPort, + } + m := New(config, config) + + server := func(isDummy bool, shouldBodyBeEmpty bool) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + go m.RunServer(ctx, isDummy) + + resp, err := http.Get(fmt.Sprintf("http://localhost:%d%s", DefaultMetricsServerPort, DefaultMetricsServerPath)) + assert.NoError(t, err) + defer resp.Body.Close() + + bodyBytes, err := io.ReadAll(resp.Body) + assert.NoError(t, err) + + bodyString := string(bodyBytes) + + assert.Equal(t, http.StatusOK, resp.StatusCode) + if shouldBodyBeEmpty { + assert.Empty(t, bodyString) + } else { + assert.NotEmpty(t, bodyString) + } + } + + t.Run("dummy metrics server", func(t *testing.T) { + server(true, true) // dummy metrics server does not provide any metrics responses + }) + + t.Run("prometheus metrics server", func(t *testing.T) { + server(false, false) // prometheus metrics server provides responses for any metrics + }) +} From 02ce31940e48988391088832a8fb018a507391c7 Mon Sep 17 00:00:00 2001 From: sakai Date: Fri, 7 Jul 2023 10:45:39 +0900 Subject: [PATCH 2/6] chore: refactor tests and add tests as much as I can Signed-off-by: sakai --- workflow/metrics/server_test.go | 111 +++++++++++++++++++++++++------- 1 file changed, 88 insertions(+), 23 deletions(-) diff --git a/workflow/metrics/server_test.go b/workflow/metrics/server_test.go index da1745402995..5d075677ca24 100644 --- a/workflow/metrics/server_test.go +++ b/workflow/metrics/server_test.go @@ -10,7 +10,23 @@ import ( "github.com/stretchr/testify/assert" ) -func TestRunServer(t *testing.T) { +func TestDisableMetricsServer(t *testing.T) { + config := ServerConfig{ + Enabled: false, + Path: DefaultMetricsServerPath, + Port: DefaultMetricsServerPort, + } + m := New(config, config) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + go m.RunServer(ctx, false) + _, err := http.Get(fmt.Sprintf("http://localhost:%d%s", DefaultMetricsServerPort, DefaultMetricsServerPath)) + assert.Contains(t, err.Error(), "connection refused") // expect that the metrics server not to start +} + +func TestSameMetricsServer(t *testing.T) { config := ServerConfig{ Enabled: true, Path: DefaultMetricsServerPath, @@ -18,34 +34,83 @@ func TestRunServer(t *testing.T) { } m := New(config, config) - server := func(isDummy bool, shouldBodyBeEmpty bool) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + go m.RunServer(ctx, false) + resp, err := http.Get(fmt.Sprintf("http://localhost:%d%s", DefaultMetricsServerPort, DefaultMetricsServerPath)) + assert.NoError(t, err) + assert.Equal(t, http.StatusOK, resp.StatusCode) + + defer resp.Body.Close() + + bodyBytes, err := io.ReadAll(resp.Body) + assert.NoError(t, err) + + bodyString := string(bodyBytes) + assert.NotEmpty(t, bodyString) +} + +func TestOwnMetricsServer(t *testing.T) { + metricsConfig := ServerConfig{ + Enabled: true, + Path: DefaultMetricsServerPath, + Port: DefaultMetricsServerPort, + } + telemetryConfig := ServerConfig{ + Enabled: true, + Path: DefaultMetricsServerPath, + Port: 9091, + } + m := New(metricsConfig, telemetryConfig) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + go m.RunServer(ctx, false) + mresp, merr := http.Get(fmt.Sprintf("http://localhost:%d%s", DefaultMetricsServerPort, DefaultMetricsServerPath)) + tresp, terr := http.Get(fmt.Sprintf("http://localhost:%d%s", 9091, DefaultMetricsServerPath)) - go m.RunServer(ctx, isDummy) + assert.NoError(t, merr) + assert.NoError(t, terr) + assert.Equal(t, http.StatusOK, mresp.StatusCode) + assert.Equal(t, http.StatusOK, tresp.StatusCode) - resp, err := http.Get(fmt.Sprintf("http://localhost:%d%s", DefaultMetricsServerPort, DefaultMetricsServerPath)) - assert.NoError(t, err) - defer resp.Body.Close() + defer mresp.Body.Close() + defer tresp.Body.Close() - bodyBytes, err := io.ReadAll(resp.Body) - assert.NoError(t, err) + mbodyBytes, err := io.ReadAll(mresp.Body) + tbodyBytes, err := io.ReadAll(tresp.Body) + assert.NoError(t, err) - bodyString := string(bodyBytes) + mbodyString := string(mbodyBytes) + tbodyString := string(tbodyBytes) + assert.NotEmpty(t, mbodyString) + assert.NotEmpty(t, tbodyString) +} - assert.Equal(t, http.StatusOK, resp.StatusCode) - if shouldBodyBeEmpty { - assert.Empty(t, bodyString) - } else { - assert.NotEmpty(t, bodyString) - } +func TestDummyMetricsServer(t *testing.T) { + config := ServerConfig{ + Enabled: true, + Path: DefaultMetricsServerPath, + Port: DefaultMetricsServerPort, } + m := New(config, config) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + go m.RunServer(ctx, true) + resp, err := http.Get(fmt.Sprintf("http://localhost:%d%s", DefaultMetricsServerPort, DefaultMetricsServerPath)) + assert.NoError(t, err) + assert.Equal(t, http.StatusOK, resp.StatusCode) + + defer resp.Body.Close() + + bodyBytes, err := io.ReadAll(resp.Body) + assert.NoError(t, err) - t.Run("dummy metrics server", func(t *testing.T) { - server(true, true) // dummy metrics server does not provide any metrics responses - }) + bodyString := string(bodyBytes) - t.Run("prometheus metrics server", func(t *testing.T) { - server(false, false) // prometheus metrics server provides responses for any metrics - }) + assert.Empty(t, bodyString) // expect the dummy metrics server to provide no metrics responses } From 3a83baedec7103b5b2c6e17fbde4495e62e56a4e Mon Sep 17 00:00:00 2001 From: sakai Date: Fri, 7 Jul 2023 11:04:52 +0900 Subject: [PATCH 3/6] fix: Change processing from async to sync Signed-off-by: sakai --- workflow/metrics/server_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/workflow/metrics/server_test.go b/workflow/metrics/server_test.go index 5d075677ca24..e27a0689135d 100644 --- a/workflow/metrics/server_test.go +++ b/workflow/metrics/server_test.go @@ -21,7 +21,7 @@ func TestDisableMetricsServer(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - go m.RunServer(ctx, false) + m.RunServer(ctx, false) _, err := http.Get(fmt.Sprintf("http://localhost:%d%s", DefaultMetricsServerPort, DefaultMetricsServerPath)) assert.Contains(t, err.Error(), "connection refused") // expect that the metrics server not to start } @@ -37,7 +37,7 @@ func TestSameMetricsServer(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - go m.RunServer(ctx, false) + m.RunServer(ctx, false) resp, err := http.Get(fmt.Sprintf("http://localhost:%d%s", DefaultMetricsServerPort, DefaultMetricsServerPath)) assert.NoError(t, err) assert.Equal(t, http.StatusOK, resp.StatusCode) @@ -67,7 +67,7 @@ func TestOwnMetricsServer(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - go m.RunServer(ctx, false) + m.RunServer(ctx, false) mresp, merr := http.Get(fmt.Sprintf("http://localhost:%d%s", DefaultMetricsServerPort, DefaultMetricsServerPath)) tresp, terr := http.Get(fmt.Sprintf("http://localhost:%d%s", 9091, DefaultMetricsServerPath)) @@ -100,7 +100,7 @@ func TestDummyMetricsServer(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - go m.RunServer(ctx, true) + m.RunServer(ctx, true) resp, err := http.Get(fmt.Sprintf("http://localhost:%d%s", DefaultMetricsServerPort, DefaultMetricsServerPath)) assert.NoError(t, err) assert.Equal(t, http.StatusOK, resp.StatusCode) From 458bc2f2d9e05fb8a8df9b7583774e24ded0cf64 Mon Sep 17 00:00:00 2001 From: sakai Date: Fri, 7 Jul 2023 11:14:45 +0900 Subject: [PATCH 4/6] fix: Remove own metrics server test as it fails in the GitHub Actions environment Signed-off-by: sakai --- workflow/metrics/server_test.go | 40 +-------------------------------- 1 file changed, 1 insertion(+), 39 deletions(-) diff --git a/workflow/metrics/server_test.go b/workflow/metrics/server_test.go index e27a0689135d..c9804a28522b 100644 --- a/workflow/metrics/server_test.go +++ b/workflow/metrics/server_test.go @@ -26,7 +26,7 @@ func TestDisableMetricsServer(t *testing.T) { assert.Contains(t, err.Error(), "connection refused") // expect that the metrics server not to start } -func TestSameMetricsServer(t *testing.T) { +func TestMetricsServer(t *testing.T) { config := ServerConfig{ Enabled: true, Path: DefaultMetricsServerPath, @@ -51,44 +51,6 @@ func TestSameMetricsServer(t *testing.T) { assert.NotEmpty(t, bodyString) } -func TestOwnMetricsServer(t *testing.T) { - metricsConfig := ServerConfig{ - Enabled: true, - Path: DefaultMetricsServerPath, - Port: DefaultMetricsServerPort, - } - telemetryConfig := ServerConfig{ - Enabled: true, - Path: DefaultMetricsServerPath, - Port: 9091, - } - m := New(metricsConfig, telemetryConfig) - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - m.RunServer(ctx, false) - mresp, merr := http.Get(fmt.Sprintf("http://localhost:%d%s", DefaultMetricsServerPort, DefaultMetricsServerPath)) - tresp, terr := http.Get(fmt.Sprintf("http://localhost:%d%s", 9091, DefaultMetricsServerPath)) - - assert.NoError(t, merr) - assert.NoError(t, terr) - assert.Equal(t, http.StatusOK, mresp.StatusCode) - assert.Equal(t, http.StatusOK, tresp.StatusCode) - - defer mresp.Body.Close() - defer tresp.Body.Close() - - mbodyBytes, err := io.ReadAll(mresp.Body) - tbodyBytes, err := io.ReadAll(tresp.Body) - assert.NoError(t, err) - - mbodyString := string(mbodyBytes) - tbodyString := string(tbodyBytes) - assert.NotEmpty(t, mbodyString) - assert.NotEmpty(t, tbodyString) -} - func TestDummyMetricsServer(t *testing.T) { config := ServerConfig{ Enabled: true, From f40256e3591a9aa1c2acfe4f0ad28329ffca5cba Mon Sep 17 00:00:00 2001 From: sakai Date: Fri, 7 Jul 2023 12:48:27 +0900 Subject: [PATCH 5/6] fix: fix for response body must be closed Signed-off-by: sakai --- workflow/metrics/server_test.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/workflow/metrics/server_test.go b/workflow/metrics/server_test.go index c9804a28522b..3649b8de529c 100644 --- a/workflow/metrics/server_test.go +++ b/workflow/metrics/server_test.go @@ -22,7 +22,12 @@ func TestDisableMetricsServer(t *testing.T) { defer cancel() m.RunServer(ctx, false) - _, err := http.Get(fmt.Sprintf("http://localhost:%d%s", DefaultMetricsServerPort, DefaultMetricsServerPath)) + resp, err := http.Get(fmt.Sprintf("http://localhost:%d%s", DefaultMetricsServerPort, DefaultMetricsServerPath)) + if resp != nil { + defer resp.Body.Close() + } + + assert.Error(t, err) assert.Contains(t, err.Error(), "connection refused") // expect that the metrics server not to start } From d7f1574974873640b621f2846df4821b98d1d8c1 Mon Sep 17 00:00:00 2001 From: sakai Date: Fri, 7 Jul 2023 13:46:36 +0900 Subject: [PATCH 6/6] fix: To address the issue of the test failing flakily Signed-off-by: sakai --- workflow/metrics/server_test.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/workflow/metrics/server_test.go b/workflow/metrics/server_test.go index 3649b8de529c..a810561d46fe 100644 --- a/workflow/metrics/server_test.go +++ b/workflow/metrics/server_test.go @@ -6,6 +6,7 @@ import ( "io" "net/http" "testing" + "time" "github.com/stretchr/testify/assert" ) @@ -22,6 +23,7 @@ func TestDisableMetricsServer(t *testing.T) { defer cancel() m.RunServer(ctx, false) + time.Sleep(1 * time.Second) // to confirm that the server doesn't start, even if we wait resp, err := http.Get(fmt.Sprintf("http://localhost:%d%s", DefaultMetricsServerPort, DefaultMetricsServerPath)) if resp != nil { defer resp.Body.Close() @@ -43,6 +45,7 @@ func TestMetricsServer(t *testing.T) { defer cancel() m.RunServer(ctx, false) + time.Sleep(1 * time.Second) resp, err := http.Get(fmt.Sprintf("http://localhost:%d%s", DefaultMetricsServerPort, DefaultMetricsServerPath)) assert.NoError(t, err) assert.Equal(t, http.StatusOK, resp.StatusCode) @@ -68,6 +71,7 @@ func TestDummyMetricsServer(t *testing.T) { defer cancel() m.RunServer(ctx, true) + time.Sleep(1 * time.Second) resp, err := http.Get(fmt.Sprintf("http://localhost:%d%s", DefaultMetricsServerPort, DefaultMetricsServerPath)) assert.NoError(t, err) assert.Equal(t, http.StatusOK, resp.StatusCode)