Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(controller): handle race when starting metrics server. Fixes #10807 #13731

Merged
merged 6 commits into from
Oct 10, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions cmd/workflow-controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func NewRootCommand() *cobra.Command {
log.WithField("id", "single-instance").Info("starting leading")

go wfController.Run(ctx, workflowWorkers, workflowTTLWorkers, podCleanupWorkers, cronWorkflowWorkers, workflowArchiveWorkers)
go wfController.RunPrometheusServer(ctx, false)
wfController.RunPrometheusServer(ctx, false)
} else {
nodeID, ok := os.LookupEnv("LEADER_ELECTION_IDENTITY")
if !ok {
Expand All @@ -136,7 +136,7 @@ func NewRootCommand() *cobra.Command {
// for controlling the dummy metrics server
dummyCtx, dummyCancel := context.WithCancel(context.Background())
defer dummyCancel()
go wfController.RunPrometheusServer(dummyCtx, true)
wg := wfController.RunPrometheusServer(dummyCtx, true)
agilgur5 marked this conversation as resolved.
Show resolved Hide resolved

go leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
Lock: &resourcelock.LeaseLock{
Expand All @@ -150,12 +150,14 @@ func NewRootCommand() *cobra.Command {
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: func(ctx context.Context) {
dummyCancel()
wg.Wait()
go wfController.Run(ctx, workflowWorkers, workflowTTLWorkers, podCleanupWorkers, cronWorkflowWorkers, workflowArchiveWorkers)
go wfController.RunPrometheusServer(ctx, false)
},
OnStoppedLeading: func() {
log.WithField("id", nodeID).Info("stopped leading")
cancel()
wg.Wait()
go wfController.RunPrometheusServer(dummyCtx, true)
},
OnNewLeader: func(identity string) {
Expand Down
4 changes: 3 additions & 1 deletion util/telemetry/exporter_prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"crypto/tls"
"fmt"
"net/http"
"sync"
"time"

promgo "github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -53,7 +54,8 @@ func (config *Config) port() int {

// RunPrometheusServer starts a prometheus metrics server
// If 'isDummy' is set to true, the dummy metrics server will be started. If it's false, the prometheus metrics server will be started
func (m *Metrics) RunPrometheusServer(ctx context.Context, isDummy bool) {
func (m *Metrics) RunPrometheusServer(ctx context.Context, isDummy bool, wg *sync.WaitGroup) {
defer wg.Done()
agilgur5 marked this conversation as resolved.
Show resolved Hide resolved
if !m.config.Enabled {
return
}
Expand Down
23 changes: 15 additions & 8 deletions util/telemetry/exporter_prometheus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"io"
"net/http"
"sync"
"testing"
"time"

Expand All @@ -18,6 +19,7 @@ import (
const testScopeName string = "argo-workflows-test"

func TestDisablePrometheusServer(t *testing.T) {
var wg sync.WaitGroup
config := Config{
Enabled: false,
Path: DefaultPrometheusServerPath,
Expand All @@ -27,8 +29,9 @@ func TestDisablePrometheusServer(t *testing.T) {
defer cancel()
m, err := NewMetrics(ctx, testScopeName, testScopeName, &config)
require.NoError(t, err)
go m.RunPrometheusServer(ctx, false)
time.Sleep(1 * time.Second) // to confirm that the server doesn't start, even if we wait
wg.Add(1)
go m.RunPrometheusServer(ctx, false, &wg)
wg.Wait()
resp, err := http.Get(fmt.Sprintf("http://localhost:%d%s", DefaultPrometheusServerPort, DefaultPrometheusServerPath))
if resp != nil {
defer resp.Body.Close()
Expand All @@ -38,6 +41,7 @@ func TestDisablePrometheusServer(t *testing.T) {
}

func TestPrometheusServer(t *testing.T) {
var wg sync.WaitGroup
config := Config{
Enabled: true,
Path: DefaultPrometheusServerPath,
Expand All @@ -47,7 +51,8 @@ func TestPrometheusServer(t *testing.T) {
defer cancel()
m, err := NewMetrics(ctx, testScopeName, testScopeName, &config)
require.NoError(t, err)
go m.RunPrometheusServer(ctx, false)
wg.Add(1)
go m.RunPrometheusServer(ctx, false, &wg)
time.Sleep(1 * time.Second)
resp, err := http.Get(fmt.Sprintf("http://localhost:%d%s", DefaultPrometheusServerPort, DefaultPrometheusServerPath))
require.NoError(t, err)
Expand All @@ -61,11 +66,12 @@ func TestPrometheusServer(t *testing.T) {
bodyString := string(bodyBytes)
assert.NotEmpty(t, bodyString)

cancel() // Explicit cancel as sometimes in github CI port 9090 is still busy
time.Sleep(1 * time.Second) // Wait for prometheus server
cancel() // cancel and wait for server shutdown to prevent port conflicts with subsequent tests
wg.Wait()
}

func TestDummyPrometheusServer(t *testing.T) {
var wg sync.WaitGroup
config := Config{
Enabled: true,
Path: DefaultPrometheusServerPath,
Expand All @@ -76,7 +82,8 @@ func TestDummyPrometheusServer(t *testing.T) {
defer cancel()
m, err := NewMetrics(ctx, testScopeName, testScopeName, &config)
require.NoError(t, err)
go m.RunPrometheusServer(ctx, true)
wg.Add(1)
go m.RunPrometheusServer(ctx, true, &wg)
time.Sleep(1 * time.Second)
resp, err := http.Get(fmt.Sprintf("http://localhost:%d%s", DefaultPrometheusServerPort, DefaultPrometheusServerPath))
require.NoError(t, err)
Expand All @@ -91,6 +98,6 @@ func TestDummyPrometheusServer(t *testing.T) {

assert.Empty(t, bodyString) // expect the dummy metrics server to provide no metrics responses

cancel() // Explicit cancel as sometimes in github CI port 9090 is still busy
time.Sleep(1 * time.Second) // Wait for prometheus server
cancel() // cancel and wait for server shutdown to prevent port conflicts with subsequent tests
wg.Wait()
}
7 changes: 5 additions & 2 deletions workflow/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,8 +379,11 @@ func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, workflowTTLWo
<-ctx.Done()
}

func (wfc *WorkflowController) RunPrometheusServer(ctx context.Context, isDummy bool) {
go wfc.metrics.RunPrometheusServer(ctx, isDummy)
func (wfc *WorkflowController) RunPrometheusServer(ctx context.Context, isDummy bool) *gosync.WaitGroup {
var wg gosync.WaitGroup
wg.Add(1)
go wfc.metrics.RunPrometheusServer(ctx, isDummy, &wg)
agilgur5 marked this conversation as resolved.
Show resolved Hide resolved
return &wg
}

// Create and the Synchronization Manager
Expand Down
Loading