Skip to content

Commit

Permalink
fix(controller): Enable dummy metrics server on non-leader workflow c…
Browse files Browse the repository at this point in the history
…ontroller (#11295)

Signed-off-by: sakai <sakai.at24@gmail.com>
  • Loading branch information
sakai-ast authored Jul 7, 2023
1 parent 441f754 commit 137d5f8
Show file tree
Hide file tree
Showing 4 changed files with 123 additions and 13 deletions.
9 changes: 9 additions & 0 deletions cmd/workflow-controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ func NewRootCommand() *cobra.Command {
log.Info("Leader election is turned off. Running in single-instance mode")
log.WithField("id", "single-instance").Info("starting leading")
go wfController.Run(ctx, workflowWorkers, workflowTTLWorkers, podCleanupWorkers)
go wfController.RunMetricsServer(ctx, false)
} else {
nodeID, ok := os.LookupEnv("LEADER_ELECTION_IDENTITY")
if !ok {
Expand All @@ -128,6 +129,11 @@ func NewRootCommand() *cobra.Command {
leaderName = fmt.Sprintf("%s-%s", leaderName, wfController.Config.InstanceID)
}

// for controlling the dummy metrics server
dummyCtx, dummyCancel := context.WithCancel(context.Background())
defer dummyCancel()
go wfController.RunMetricsServer(dummyCtx, true)

go leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
Lock: &resourcelock.LeaseLock{
LeaseMeta: metav1.ObjectMeta{Name: leaderName, Namespace: namespace}, Client: kubeclientset.CoordinationV1(),
Expand All @@ -139,11 +145,14 @@ func NewRootCommand() *cobra.Command {
RetryPeriod: env.LookupEnvDurationOr("LEADER_ELECTION_RETRY_PERIOD", 5*time.Second),
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: func(ctx context.Context) {
dummyCancel()
go wfController.Run(ctx, workflowWorkers, workflowTTLWorkers, podCleanupWorkers)
go wfController.RunMetricsServer(ctx, false)
},
OnStoppedLeading: func() {
log.WithField("id", nodeID).Info("stopped leading")
cancel()
go wfController.RunMetricsServer(dummyCtx, true)
},
OnNewLeader: func(identity string) {
log.WithField("leader", identity).Info("new leader")
Expand Down
7 changes: 4 additions & 3 deletions workflow/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,9 +298,6 @@ func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, workflowTTLWo
log.Fatal("Timed out waiting for caches to sync")
}

// Start the metrics server
go wfc.metrics.RunServer(ctx)

for i := 0; i < podCleanupWorkers; i++ {
go wait.UntilWithContext(ctx, wfc.runPodCleanup, time.Second)
}
Expand All @@ -323,6 +320,10 @@ func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, workflowTTLWo
<-ctx.Done()
}

func (wfc *WorkflowController) RunMetricsServer(ctx context.Context, isDummy bool) {
go wfc.metrics.RunServer(ctx, isDummy)
}

// Create and the Synchronization Manager
func (wfc *WorkflowController) createSynchronizationManager(ctx context.Context) {
getSyncLimit := func(lockKey string) (int, error) {
Expand Down
33 changes: 23 additions & 10 deletions workflow/metrics/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ import (
)

// RunServer starts a metrics server
func (m *Metrics) RunServer(ctx context.Context) {
// If 'isDummy' is set to true, the dummy metrics server will be started. If it's false, the prometheus metrics server will be started
func (m *Metrics) RunServer(ctx context.Context, isDummy bool) {
defer runtimeutil.HandleCrash(runtimeutil.PanicHandlers...)

if !m.metricsConfig.Enabled {
Expand All @@ -36,23 +37,33 @@ func (m *Metrics) RunServer(ctx context.Context) {
// If the telemetry server is different -- and it's enabled -- run each on its own instance
telemetryRegistry := prometheus.NewRegistry()
telemetryRegistry.MustRegister(collectors.NewGoCollector())
go runServer(m.telemetryConfig, telemetryRegistry, ctx)
go runServer(m.telemetryConfig, telemetryRegistry, ctx, isDummy)
}

// Run the metrics server
go runServer(m.metricsConfig, metricsRegistry, ctx)
go runServer(m.metricsConfig, metricsRegistry, ctx, isDummy)

go m.garbageCollector(ctx)
}

func runServer(config ServerConfig, registry *prometheus.Registry, ctx context.Context) {
func runServer(config ServerConfig, registry *prometheus.Registry, ctx context.Context, isDummy bool) {
var handlerOpts promhttp.HandlerOpts
if config.IgnoreErrors {
handlerOpts.ErrorHandling = promhttp.ContinueOnError
}

name := ""
mux := http.NewServeMux()
mux.Handle(config.Path, promhttp.HandlerFor(registry, handlerOpts))
if isDummy {
// dummy metrics server responds to all requests with a 200 status, but without providing any metrics data
name = "dummy metrics server"
mux.HandleFunc(config.Path, func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
})
} else {
name = "prometheus metrics server"
mux.Handle(config.Path, promhttp.HandlerFor(registry, handlerOpts))
}
srv := &http.Server{Addr: fmt.Sprintf(":%v", config.Port), Handler: mux}

if config.Secure {
Expand All @@ -67,15 +78,15 @@ func runServer(config ServerConfig, registry *prometheus.Registry, ctx context.C
}
srv.TLSConfig = tlsConfig
go func() {
log.Infof("Starting prometheus metrics server at localhost:%v%s", config.Port, config.Path)
if err := srv.ListenAndServeTLS("", ""); err != nil {
log.Infof("Starting %s at localhost:%v%s", name, config.Port, config.Path)
if err := srv.ListenAndServeTLS("", ""); err != http.ErrServerClosed {
panic(err)
}
}()
} else {
go func() {
log.Infof("Starting prometheus metrics server at localhost:%v%s", config.Port, config.Path)
if err := srv.ListenAndServe(); err != nil {
log.Infof("Starting %s at localhost:%v%s", name, config.Port, config.Path)
if err := srv.ListenAndServe(); err != http.ErrServerClosed {
panic(err)
}
}()
Expand All @@ -88,7 +99,9 @@ func runServer(config ServerConfig, registry *prometheus.Registry, ctx context.C
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
if err := srv.Shutdown(ctx); err != nil {
log.Infof("Unable to shutdown metrics server at localhost:%v%s", config.Port, config.Path)
log.Infof("Unable to shutdown %s at localhost:%v%s", name, config.Port, config.Path)
} else {
log.Infof("Successfully shutdown %s at localhost:%v%s", name, config.Port, config.Path)
}
}

Expand Down
87 changes: 87 additions & 0 deletions workflow/metrics/server_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package metrics

import (
"context"
"fmt"
"io"
"net/http"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func TestDisableMetricsServer(t *testing.T) {
config := ServerConfig{
Enabled: false,
Path: DefaultMetricsServerPath,
Port: DefaultMetricsServerPort,
}
m := New(config, config)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

m.RunServer(ctx, false)
time.Sleep(1 * time.Second) // to confirm that the server doesn't start, even if we wait
resp, err := http.Get(fmt.Sprintf("http://localhost:%d%s", DefaultMetricsServerPort, DefaultMetricsServerPath))
if resp != nil {
defer resp.Body.Close()
}

assert.Error(t, err)
assert.Contains(t, err.Error(), "connection refused") // expect that the metrics server not to start
}

func TestMetricsServer(t *testing.T) {
config := ServerConfig{
Enabled: true,
Path: DefaultMetricsServerPath,
Port: DefaultMetricsServerPort,
}
m := New(config, config)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

m.RunServer(ctx, false)
time.Sleep(1 * time.Second)
resp, err := http.Get(fmt.Sprintf("http://localhost:%d%s", DefaultMetricsServerPort, DefaultMetricsServerPath))
assert.NoError(t, err)
assert.Equal(t, http.StatusOK, resp.StatusCode)

defer resp.Body.Close()

bodyBytes, err := io.ReadAll(resp.Body)
assert.NoError(t, err)

bodyString := string(bodyBytes)
assert.NotEmpty(t, bodyString)
}

func TestDummyMetricsServer(t *testing.T) {
config := ServerConfig{
Enabled: true,
Path: DefaultMetricsServerPath,
Port: DefaultMetricsServerPort,
}
m := New(config, config)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

m.RunServer(ctx, true)
time.Sleep(1 * time.Second)
resp, err := http.Get(fmt.Sprintf("http://localhost:%d%s", DefaultMetricsServerPort, DefaultMetricsServerPath))
assert.NoError(t, err)
assert.Equal(t, http.StatusOK, resp.StatusCode)

defer resp.Body.Close()

bodyBytes, err := io.ReadAll(resp.Body)
assert.NoError(t, err)

bodyString := string(bodyBytes)

assert.Empty(t, bodyString) // expect the dummy metrics server to provide no metrics responses
}

0 comments on commit 137d5f8

Please sign in to comment.