fix(controller): Enable dummy metrics server on non-leader workflow controller #11295

Merged · 6 commits · merged Jul 7, 2023
Changes from 1 commit
9 changes: 9 additions & 0 deletions cmd/workflow-controller/main.go
@@ -117,6 +117,7 @@ func NewRootCommand() *cobra.Command {
 		log.Info("Leader election is turned off. Running in single-instance mode")
 		log.WithField("id", "single-instance").Info("starting leading")
 		go wfController.Run(ctx, workflowWorkers, workflowTTLWorkers, podCleanupWorkers)
+		go wfController.RunMetricsServer(ctx, false)
 	} else {
 		nodeID, ok := os.LookupEnv("LEADER_ELECTION_IDENTITY")
 		if !ok {
@@ -128,6 +129,11 @@ func NewRootCommand() *cobra.Command {
 			leaderName = fmt.Sprintf("%s-%s", leaderName, wfController.Config.InstanceID)
 		}

+		// for controlling the dummy metrics server
+		dummyCtx, dummyCancel := context.WithCancel(context.Background())
+		defer dummyCancel()
+		go wfController.RunMetricsServer(dummyCtx, true)
+
 		go leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
 			Lock: &resourcelock.LeaseLock{
 				LeaseMeta: metav1.ObjectMeta{Name: leaderName, Namespace: namespace}, Client: kubeclientset.CoordinationV1(),
@@ -139,11 +145,14 @@ func NewRootCommand() *cobra.Command {
 			RetryPeriod: env.LookupEnvDurationOr("LEADER_ELECTION_RETRY_PERIOD", 5*time.Second),
 			Callbacks: leaderelection.LeaderCallbacks{
 				OnStartedLeading: func(ctx context.Context) {
+					dummyCancel()
 					go wfController.Run(ctx, workflowWorkers, workflowTTLWorkers, podCleanupWorkers)
+					go wfController.RunMetricsServer(ctx, false)
 				},
 				OnStoppedLeading: func() {
 					log.WithField("id", nodeID).Info("stopped leading")
 					cancel()
+					go wfController.RunMetricsServer(dummyCtx, true)
 				},
 				OnNewLeader: func(identity string) {
 					log.WithField("leader", identity).Info("new leader")
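In plain terms, the main.go change keeps a dummy metrics server running while the replica is waiting to become leader, and tears it down (via its own cancellable context) the moment the replica starts leading, at which point the real controller and metrics server take over. Below is a rough standalone sketch of that lifecycle, with the Kubernetes leader-election machinery replaced by a plain timer and all names invented for illustration; it is not Argo code.

package main

import (
	"context"
	"fmt"
	"log"
	"net/http"
	"time"
)

// runMetricsServer serves /metrics until ctx is cancelled. When dummy is true
// it answers every request with an empty 200; otherwise it serves a stand-in
// payload in place of the real Prometheus registry handler.
func runMetricsServer(ctx context.Context, addr string, dummy bool) {
	mux := http.NewServeMux()
	if dummy {
		mux.HandleFunc("/metrics", func(w http.ResponseWriter, r *http.Request) {
			w.WriteHeader(http.StatusOK) // 200 with no body: scrapers and probes keep working
		})
	} else {
		mux.HandleFunc("/metrics", func(w http.ResponseWriter, r *http.Request) {
			fmt.Fprintln(w, "example_metric 1") // placeholder for the real metrics handler
		})
	}
	srv := &http.Server{Addr: addr, Handler: mux}
	go func() {
		if err := srv.ListenAndServe(); err != http.ErrServerClosed {
			log.Fatal(err)
		}
	}()
	<-ctx.Done()
	shutdownCtx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	_ = srv.Shutdown(shutdownCtx)
}

func main() {
	// Standby phase: the dummy server runs under its own cancellable context.
	dummyCtx, dummyCancel := context.WithCancel(context.Background())
	go runMetricsServer(dummyCtx, ":9090", true)

	// Stand-in for OnStartedLeading firing after the replica wins the election.
	time.Sleep(2 * time.Second)
	dummyCancel()               // stop the dummy server so the port is free
	time.Sleep(2 * time.Second) // crude wait for the shutdown; the real code relies on it being fast
	go runMetricsServer(context.Background(), ":9090", false)

	select {} // keep serving; Ctrl+C to exit
}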
7 changes: 4 additions & 3 deletions workflow/controller/controller.go
@@ -298,9 +298,6 @@ func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, workflowTTLWorkers, podCleanupWorkers int) {
 		log.Fatal("Timed out waiting for caches to sync")
 	}

-	// Start the metrics server
-	go wfc.metrics.RunServer(ctx)
-
 	for i := 0; i < podCleanupWorkers; i++ {
 		go wait.UntilWithContext(ctx, wfc.runPodCleanup, time.Second)
 	}
@@ -323,6 +320,10 @@ func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, workflowTTLWorkers, podCleanupWorkers int) {
 	<-ctx.Done()
 }

+func (wfc *WorkflowController) RunMetricsServer(ctx context.Context, isDummy bool) {
+	go wfc.metrics.RunServer(ctx, isDummy)
+}
+
 // Create and the Synchronization Manager
 func (wfc *WorkflowController) createSynchronizationManager(ctx context.Context) {
 	getSyncLimit := func(lockKey string) (int, error) {
33 changes: 23 additions & 10 deletions workflow/metrics/server.go
@@ -18,7 +18,8 @@ import (
 )

 // RunServer starts a metrics server
-func (m *Metrics) RunServer(ctx context.Context) {
+// If 'isDummy' is set to true, the dummy metrics server will be started. If it's false, the prometheus metrics server will be started
+func (m *Metrics) RunServer(ctx context.Context, isDummy bool) {
 	defer runtimeutil.HandleCrash(runtimeutil.PanicHandlers...)

 	if !m.metricsConfig.Enabled {
@@ -36,23 +37,33 @@ func (m *Metrics) RunServer(ctx context.Context) {
 		// If the telemetry server is different -- and it's enabled -- run each on its own instance
 		telemetryRegistry := prometheus.NewRegistry()
 		telemetryRegistry.MustRegister(collectors.NewGoCollector())
-		go runServer(m.telemetryConfig, telemetryRegistry, ctx)
+		go runServer(m.telemetryConfig, telemetryRegistry, ctx, isDummy)
 	}

 	// Run the metrics server
-	go runServer(m.metricsConfig, metricsRegistry, ctx)
+	go runServer(m.metricsConfig, metricsRegistry, ctx, isDummy)

 	go m.garbageCollector(ctx)
 }

-func runServer(config ServerConfig, registry *prometheus.Registry, ctx context.Context) {
+func runServer(config ServerConfig, registry *prometheus.Registry, ctx context.Context, isDummy bool) {
 	var handlerOpts promhttp.HandlerOpts
 	if config.IgnoreErrors {
 		handlerOpts.ErrorHandling = promhttp.ContinueOnError
 	}

+	name := ""
 	mux := http.NewServeMux()
-	mux.Handle(config.Path, promhttp.HandlerFor(registry, handlerOpts))
+	if isDummy {
+		// dummy metrics server responds to all requests with a 200 status, but without providing any metrics data
+		name = "dummy metrics server"
+		mux.HandleFunc(config.Path, func(w http.ResponseWriter, r *http.Request) {
+			w.WriteHeader(http.StatusOK)
+		})
+	} else {
+		name = "prometheus metrics server"
+		mux.Handle(config.Path, promhttp.HandlerFor(registry, handlerOpts))
+	}
 	srv := &http.Server{Addr: fmt.Sprintf(":%v", config.Port), Handler: mux}

 	if config.Secure {
@@ -67,15 +78,15 @@ func runServer(config ServerConfig, registry *prometheus.Registry, ctx context.Context) {
 		}
 		srv.TLSConfig = tlsConfig
 		go func() {
-			log.Infof("Starting prometheus metrics server at localhost:%v%s", config.Port, config.Path)
-			if err := srv.ListenAndServeTLS("", ""); err != nil {
+			log.Infof("Starting %s at localhost:%v%s", name, config.Port, config.Path)
+			if err := srv.ListenAndServeTLS("", ""); err != http.ErrServerClosed {
 				panic(err)
 			}
 		}()
 	} else {
 		go func() {
-			log.Infof("Starting prometheus metrics server at localhost:%v%s", config.Port, config.Path)
-			if err := srv.ListenAndServe(); err != nil {
+			log.Infof("Starting %s at localhost:%v%s", name, config.Port, config.Path)
+			if err := srv.ListenAndServe(); err != http.ErrServerClosed {
 				panic(err)
 			}
 		}()
@@ -88,7 +99,9 @@ func runServer(config ServerConfig, registry *prometheus.Registry, ctx context.Context) {
 	ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
 	defer cancel()
 	if err := srv.Shutdown(ctx); err != nil {
-		log.Infof("Unable to shutdown metrics server at localhost:%v%s", config.Port, config.Path)
+		log.Infof("Unable to shutdown %s at localhost:%v%s", name, config.Port, config.Path)
+	} else {
+		log.Infof("Successfully shutdown %s at localhost:%v%s", name, config.Port, config.Path)
 	}
 }

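A side note on the error handling in server.go: in the standard library, ListenAndServe and ListenAndServeTLS always return a non-nil error, and after a graceful Shutdown that error is http.ErrServerClosed, so the old check against err != nil would panic even on a clean stop. The snippet below is a minimal standard-library-only illustration of that contract (not Argo code).

package main

import (
	"context"
	"errors"
	"fmt"
	"net/http"
	"time"
)

func main() {
	srv := &http.Server{Addr: ":8080", Handler: http.NewServeMux()}

	done := make(chan struct{})
	go func() {
		defer close(done)
		// ListenAndServe always returns a non-nil error. After srv.Shutdown it is
		// http.ErrServerClosed, i.e. a clean stop rather than a failure, which is
		// why the diff compares against that sentinel instead of plain err != nil.
		if err := srv.ListenAndServe(); !errors.Is(err, http.ErrServerClosed) {
			fmt.Println("server failed:", err)
			return
		}
		fmt.Println("server stopped cleanly")
	}()

	time.Sleep(100 * time.Millisecond) // give the listener a moment to come up

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	if err := srv.Shutdown(ctx); err != nil {
		fmt.Println("shutdown error:", err)
	}
	<-done // prints "server stopped cleanly"
}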
51 changes: 51 additions & 0 deletions workflow/metrics/server_test.go
@@ -0,0 +1,51 @@
package metrics

import (
	"context"
	"fmt"
	"io"
	"net/http"
	"testing"

	"github.com/stretchr/testify/assert"
)

func TestRunServer(t *testing.T) {
	config := ServerConfig{
		Enabled: true,
		Path:    DefaultMetricsServerPath,
		Port:    DefaultMetricsServerPort,
	}
	m := New(config, config)

	server := func(isDummy bool, shouldBodyBeEmpty bool) {
		ctx, cancel := context.WithCancel(context.Background())
		defer cancel()

		go m.RunServer(ctx, isDummy)

		resp, err := http.Get(fmt.Sprintf("http://localhost:%d%s", DefaultMetricsServerPort, DefaultMetricsServerPath))
		assert.NoError(t, err)
		defer resp.Body.Close()

		bodyBytes, err := io.ReadAll(resp.Body)
		assert.NoError(t, err)

		bodyString := string(bodyBytes)

		assert.Equal(t, http.StatusOK, resp.StatusCode)
		if shouldBodyBeEmpty {
			assert.Empty(t, bodyString)
		} else {
			assert.NotEmpty(t, bodyString)
		}
	}

	t.Run("dummy metrics server", func(t *testing.T) {
		server(true, true) // dummy metrics server does not provide any metrics responses
	})

	t.Run("prometheus metrics server", func(t *testing.T) {
		server(false, false) // prometheus metrics server provides responses for any metrics
	})
}
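For a quick manual check against a running controller, a probe like the one below (an illustrative snippet, assuming the default :9090 metrics port and /metrics path are reachable from wherever it runs) distinguishes the two modes: an empty 200 indicates the dummy server on a standby replica, while a Prometheus text payload indicates the leader.

package main

import (
	"fmt"
	"io"
	"net/http"
	"os"
)

func main() {
	// Assumed defaults: adjust the address if the controller is configured differently.
	resp, err := http.Get("http://localhost:9090/metrics")
	if err != nil {
		fmt.Fprintln(os.Stderr, "request failed:", err)
		os.Exit(1)
	}
	defer resp.Body.Close()

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		fmt.Fprintln(os.Stderr, "read failed:", err)
		os.Exit(1)
	}

	switch {
	case resp.StatusCode != http.StatusOK:
		fmt.Println("unexpected status:", resp.Status)
	case len(body) == 0:
		fmt.Println("empty 200 response: this replica is serving the dummy metrics server (standby)")
	default:
		fmt.Println("metrics payload received: this replica is serving the prometheus metrics server (leader)")
	}
}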