diff --git a/agent/metrics_test.go b/agent/metrics_test.go index 6f75a4d233b3..e474281ee552 100644 --- a/agent/metrics_test.go +++ b/agent/metrics_test.go @@ -9,32 +9,44 @@ import ( "os" "path/filepath" "strings" + "sync/atomic" "testing" "time" "github.com/stretchr/testify/require" "github.com/hashicorp/consul/agent/rpc/middleware" - "github.com/hashicorp/consul/lib/retry" "github.com/hashicorp/consul/sdk/testutil" + "github.com/hashicorp/consul/sdk/testutil/retry" + testretry "github.com/hashicorp/consul/sdk/testutil/retry" "github.com/hashicorp/consul/testrpc" "github.com/hashicorp/consul/tlsutil" ) +var metricsPrefixCounter atomic.Uint64 + +// getUniqueMetricsPrefix generates a unique ID for each test to use as a metrics prefix. +// This is needed because go-metrics is effectively a global variable. +func getUniqueMetricsPrefix() string { + return fmt.Sprint("metrics_", metricsPrefixCounter.Add(1)) +} + func skipIfShortTesting(t *testing.T) { if testing.Short() { t.Skip("too slow for testing.Short") } } -func recordPromMetrics(t *testing.T, a *TestAgent, respRec *httptest.ResponseRecorder) { - t.Helper() +func recordPromMetrics(t require.TestingT, a *TestAgent, respRec *httptest.ResponseRecorder) { + if tt, ok := t.(*testing.T); ok { + tt.Helper() + } + req, err := http.NewRequest("GET", "/v1/agent/metrics?format=prometheus", nil) require.NoError(t, err, "Failed to generate new http request.") - _, err = a.srv.AgentMetrics(respRec, req) - require.NoError(t, err, "Failed to serve agent metrics") - + a.srv.h.ServeHTTP(respRec, req) + require.Equalf(t, 200, respRec.Code, "expected 200, got %d, body: %s", respRec.Code, respRec.Body.String()) } func assertMetricExists(t *testing.T, respRec *httptest.ResponseRecorder, metric string) { @@ -175,7 +187,7 @@ func TestAgent_OneTwelveRPCMetrics(t *testing.T) { // This test cannot use t.Parallel() since we modify global state, ie the global metrics instance t.Run("Check that 1.12 rpc metrics are not emitted by default.", func(t *testing.T) { - metricsPrefix := "new_rpc_metrics" + metricsPrefix := getUniqueMetricsPrefix() hcl := fmt.Sprintf(` telemetry = { prometheus_retention_time = "5s" @@ -198,7 +210,7 @@ func TestAgent_OneTwelveRPCMetrics(t *testing.T) { }) t.Run("Check that 1.12 rpc metrics are emitted when specified by operator.", func(t *testing.T) { - metricsPrefix := "new_rpc_metrics_2" + metricsPrefix := getUniqueMetricsPrefix() allowRPCMetricRule := metricsPrefix + "." + strings.Join(middleware.OneTwelveRPCSummary[0].Name, ".") hcl := fmt.Sprintf(` telemetry = { @@ -237,44 +249,63 @@ func TestHTTPHandlers_AgentMetrics_LeaderShipMetrics(t *testing.T) { // This test cannot use t.Parallel() since we modify global state, ie the global metrics instance t.Run("check that metric isLeader is set properly on server", func(t *testing.T) { - hcl := ` + metricsPrefix1 := getUniqueMetricsPrefix() + metricsPrefix2 := getUniqueMetricsPrefix() + + hcl1 := fmt.Sprintf(` + server = true telemetry = { - prometheus_retention_time = "5s", - metrics_prefix = "agent_is_leader" + prometheus_retention_time = "25s", + disable_hostname = true + metrics_prefix = "%s" } - ` + `, metricsPrefix1) - a := StartTestAgent(t, TestAgent{HCL: hcl}) - defer a.Shutdown() + hcl2 := fmt.Sprintf(` + server = true + telemetry = { + prometheus_retention_time = "25s", + disable_hostname = true + metrics_prefix = "%s" + } + `, metricsPrefix2) - retryWithBackoff := func(expectedStr string) error { - waiter := &retry.Waiter{ - MaxWait: 1 * time.Minute, - } - ctx := context.Background() - for { - if waiter.Failures() > 7 { - return fmt.Errorf("reach max failure: %d", waiter.Failures()) - } - respRec := httptest.NewRecorder() - recordPromMetrics(t, a, respRec) + overrides := ` + bootstrap = false + bootstrap_expect = 2 + ` + + s1 := StartTestAgent(t, TestAgent{Name: "s1", HCL: hcl1, Overrides: overrides}) + s2 := StartTestAgent(t, TestAgent{Name: "s2", HCL: hcl2, Overrides: overrides}) + defer s1.Shutdown() + defer s2.Shutdown() - out := respRec.Body.String() - if strings.Contains(out, expectedStr) { - return nil - } - waiter.Wait(ctx) - } - } // agent hasn't become a leader - err := retryWithBackoff("isLeader 0") - require.NoError(t, err, "non-leader server should have isLeader 0") - testrpc.WaitForLeader(t, a.RPC, "dc1") + retry.RunWith(&retry.Timer{Timeout: 30 * time.Second, Wait: time.Second}, t, func(r *testretry.R) { + respRec := httptest.NewRecorder() + recordPromMetrics(r, s1, respRec) + found := strings.Contains(respRec.Body.String(), metricsPrefix1+"_server_isLeader 0") + require.True(r, found, "non-leader server should have isLeader 0") + }) + + _, err := s2.JoinLAN([]string{s1.Config.SerfBindAddrLAN.String()}, nil) + require.NoError(t, err) + testrpc.WaitForLeader(t, s1.RPC, "dc1") + testrpc.WaitForLeader(t, s2.RPC, "dc1") // Verify agent's isLeader metrics is 1 - err = retryWithBackoff("isLeader 1") - require.NoError(t, err, "leader should have isLeader 1") + retry.RunWith(&retry.Timer{Timeout: 30 * time.Second, Wait: time.Second}, t, func(r *testretry.R) { + respRec1 := httptest.NewRecorder() + recordPromMetrics(r, s1, respRec1) + found1 := strings.Contains(respRec1.Body.String(), metricsPrefix1+"_server_isLeader 1") + + respRec2 := httptest.NewRecorder() + recordPromMetrics(r, s2, respRec2) + found2 := strings.Contains(respRec2.Body.String(), metricsPrefix2+"_server_isLeader 1") + + require.True(r, found1 || found2, "leader server should have isLeader 1") + }) }) } @@ -285,15 +316,16 @@ func TestHTTPHandlers_AgentMetrics_ConsulAutopilot_Prometheus(t *testing.T) { // This test cannot use t.Parallel() since we modify global state, ie the global metrics instance t.Run("Check consul_autopilot_* are not emitted metrics on clients", func(t *testing.T) { - hcl := ` + metricsPrefix := getUniqueMetricsPrefix() + hcl := fmt.Sprintf(` telemetry = { prometheus_retention_time = "5s" disable_hostname = true - metrics_prefix = "agent_1" + metrics_prefix = "%s" } bootstrap = false server = false - ` + `, metricsPrefix) a := StartTestAgent(t, TestAgent{HCL: hcl}) defer a.Shutdown() @@ -301,21 +333,22 @@ func TestHTTPHandlers_AgentMetrics_ConsulAutopilot_Prometheus(t *testing.T) { respRec := httptest.NewRecorder() recordPromMetrics(t, a, respRec) - assertMetricNotExists(t, respRec, "agent_1_autopilot_healthy") - assertMetricNotExists(t, respRec, "agent_1_autopilot_failure_tolerance") + assertMetricNotExists(t, respRec, metricsPrefix+"_autopilot_healthy") + assertMetricNotExists(t, respRec, metricsPrefix+"_autopilot_failure_tolerance") }) t.Run("Check consul_autopilot_healthy metric value on startup", func(t *testing.T) { + metricsPrefix := getUniqueMetricsPrefix() + hcl := fmt.Sprintf(` // don't bootstrap agent so as not to // become a leader - hcl := ` telemetry = { prometheus_retention_time = "5s", disable_hostname = true - metrics_prefix = "agent_2" + metrics_prefix = "%s" } bootstrap = false - ` + `, metricsPrefix) a := StartTestAgent(t, TestAgent{HCL: hcl}) defer a.Shutdown() @@ -323,8 +356,8 @@ func TestHTTPHandlers_AgentMetrics_ConsulAutopilot_Prometheus(t *testing.T) { respRec := httptest.NewRecorder() recordPromMetrics(t, a, respRec) - assertMetricExistsWithValue(t, respRec, "agent_2_autopilot_healthy", "1") - assertMetricExistsWithValue(t, respRec, "agent_2_autopilot_failure_tolerance", "0") + assertMetricExistsWithValue(t, respRec, metricsPrefix+"_autopilot_healthy", "1") + assertMetricExistsWithValue(t, respRec, metricsPrefix+"_autopilot_failure_tolerance", "0") }) } @@ -360,16 +393,17 @@ func TestHTTPHandlers_AgentMetrics_TLSCertExpiry_Prometheus(t *testing.T) { err = os.WriteFile(keyPath, []byte(key), 0600) require.NoError(t, err) + metricsPrefix := getUniqueMetricsPrefix() hcl := fmt.Sprintf(` telemetry = { prometheus_retention_time = "5s", disable_hostname = true - metrics_prefix = "agent_3" + metrics_prefix = "%s" } ca_file = "%s" cert_file = "%s" key_file = "%s" - `, caPath, certPath, keyPath) + `, metricsPrefix, caPath, certPath, keyPath) a := StartTestAgent(t, TestAgent{HCL: hcl}) defer a.Shutdown() @@ -377,7 +411,7 @@ func TestHTTPHandlers_AgentMetrics_TLSCertExpiry_Prometheus(t *testing.T) { respRec := httptest.NewRecorder() recordPromMetrics(t, a, respRec) - require.Contains(t, respRec.Body.String(), "agent_3_agent_tls_cert_expiry 1.7") + require.Contains(t, respRec.Body.String(), metricsPrefix+"_agent_tls_cert_expiry 1.7") } func TestHTTPHandlers_AgentMetrics_CACertExpiry_Prometheus(t *testing.T) { @@ -385,17 +419,18 @@ func TestHTTPHandlers_AgentMetrics_CACertExpiry_Prometheus(t *testing.T) { // This test cannot use t.Parallel() since we modify global state, ie the global metrics instance t.Run("non-leader emits NaN", func(t *testing.T) { - hcl := ` + metricsPrefix := getUniqueMetricsPrefix() + hcl := fmt.Sprintf(` telemetry = { prometheus_retention_time = "5s", disable_hostname = true - metrics_prefix = "agent_4" + metrics_prefix = "%s" } connect { enabled = true } bootstrap = false - ` + `, metricsPrefix) a := StartTestAgent(t, TestAgent{HCL: hcl}) defer a.Shutdown() @@ -403,21 +438,22 @@ func TestHTTPHandlers_AgentMetrics_CACertExpiry_Prometheus(t *testing.T) { respRec := httptest.NewRecorder() recordPromMetrics(t, a, respRec) - require.Contains(t, respRec.Body.String(), "agent_4_mesh_active_root_ca_expiry NaN") - require.Contains(t, respRec.Body.String(), "agent_4_mesh_active_signing_ca_expiry NaN") + require.Contains(t, respRec.Body.String(), metricsPrefix+"_mesh_active_root_ca_expiry NaN") + require.Contains(t, respRec.Body.String(), metricsPrefix+"_mesh_active_signing_ca_expiry NaN") }) t.Run("leader emits a value", func(t *testing.T) { - hcl := ` + metricsPrefix := getUniqueMetricsPrefix() + hcl := fmt.Sprintf(` telemetry = { prometheus_retention_time = "5s", disable_hostname = true - metrics_prefix = "agent_5" + metrics_prefix = "%s" } connect { enabled = true } - ` + `, metricsPrefix) a := StartTestAgent(t, TestAgent{HCL: hcl}) defer a.Shutdown() @@ -427,8 +463,8 @@ func TestHTTPHandlers_AgentMetrics_CACertExpiry_Prometheus(t *testing.T) { recordPromMetrics(t, a, respRec) out := respRec.Body.String() - require.Contains(t, out, "agent_5_mesh_active_root_ca_expiry 3.15") - require.Contains(t, out, "agent_5_mesh_active_signing_ca_expiry 3.15") + require.Contains(t, out, metricsPrefix+"_mesh_active_root_ca_expiry 3.15") + require.Contains(t, out, metricsPrefix+"_mesh_active_signing_ca_expiry 3.15") }) } @@ -438,18 +474,19 @@ func TestHTTPHandlers_AgentMetrics_WAL_Prometheus(t *testing.T) { // This test cannot use t.Parallel() since we modify global state, ie the global metrics instance t.Run("client agent emits nothing", func(t *testing.T) { - hcl := ` + metricsPrefix := getUniqueMetricsPrefix() + hcl := fmt.Sprintf(` server = false telemetry = { prometheus_retention_time = "5s", disable_hostname = true - metrics_prefix = "agent_4" + metrics_prefix = "%s" } raft_logstore { backend = "wal" } bootstrap = false - ` + `, metricsPrefix) a := StartTestAgent(t, TestAgent{HCL: hcl}) defer a.Shutdown() @@ -457,17 +494,18 @@ func TestHTTPHandlers_AgentMetrics_WAL_Prometheus(t *testing.T) { respRec := httptest.NewRecorder() recordPromMetrics(t, a, respRec) - require.NotContains(t, respRec.Body.String(), "agent_4_raft_wal") + require.NotContains(t, respRec.Body.String(), metricsPrefix+"_raft_wal") }) t.Run("server with WAL enabled emits WAL metrics", func(t *testing.T) { - hcl := ` + metricsPrefix := getUniqueMetricsPrefix() + hcl := fmt.Sprintf(` server = true bootstrap = true telemetry = { prometheus_retention_time = "5s", disable_hostname = true - metrics_prefix = "agent_5" + metrics_prefix = "%s" } connect { enabled = true @@ -475,37 +513,41 @@ func TestHTTPHandlers_AgentMetrics_WAL_Prometheus(t *testing.T) { raft_logstore { backend = "wal" } - ` + `, metricsPrefix) a := StartTestAgent(t, TestAgent{HCL: hcl}) defer a.Shutdown() testrpc.WaitForLeader(t, a.RPC, "dc1") - respRec := httptest.NewRecorder() - recordPromMetrics(t, a, respRec) + testretry.Run(t, func(r *testretry.R) { + respRec := httptest.NewRecorder() + recordPromMetrics(r, a, respRec) + + out := respRec.Body.String() + require.Contains(r, out, metricsPrefix+"_raft_wal_head_truncations") + require.Contains(r, out, metricsPrefix+"_raft_wal_last_segment_age_seconds") + require.Contains(r, out, metricsPrefix+"_raft_wal_log_appends") + require.Contains(r, out, metricsPrefix+"_raft_wal_log_entries_read") + require.Contains(r, out, metricsPrefix+"_raft_wal_log_entries_written") + require.Contains(r, out, metricsPrefix+"_raft_wal_log_entry_bytes_read") + require.Contains(r, out, metricsPrefix+"_raft_wal_log_entry_bytes_written") + require.Contains(r, out, metricsPrefix+"_raft_wal_segment_rotations") + require.Contains(r, out, metricsPrefix+"_raft_wal_stable_gets") + require.Contains(r, out, metricsPrefix+"_raft_wal_stable_sets") + require.Contains(r, out, metricsPrefix+"_raft_wal_tail_truncations") + }) - out := respRec.Body.String() - require.Contains(t, out, "agent_5_raft_wal_head_truncations") - require.Contains(t, out, "agent_5_raft_wal_last_segment_age_seconds") - require.Contains(t, out, "agent_5_raft_wal_log_appends") - require.Contains(t, out, "agent_5_raft_wal_log_entries_read") - require.Contains(t, out, "agent_5_raft_wal_log_entries_written") - require.Contains(t, out, "agent_5_raft_wal_log_entry_bytes_read") - require.Contains(t, out, "agent_5_raft_wal_log_entry_bytes_written") - require.Contains(t, out, "agent_5_raft_wal_segment_rotations") - require.Contains(t, out, "agent_5_raft_wal_stable_gets") - require.Contains(t, out, "agent_5_raft_wal_stable_sets") - require.Contains(t, out, "agent_5_raft_wal_tail_truncations") }) t.Run("server without WAL enabled emits no WAL metrics", func(t *testing.T) { - hcl := ` + metricsPrefix := getUniqueMetricsPrefix() + hcl := fmt.Sprintf(` server = true bootstrap = true telemetry = { prometheus_retention_time = "5s", disable_hostname = true - metrics_prefix = "agent_6" + metrics_prefix = "%s" } connect { enabled = true @@ -513,7 +555,7 @@ func TestHTTPHandlers_AgentMetrics_WAL_Prometheus(t *testing.T) { raft_logstore { backend = "boltdb" } - ` + `, metricsPrefix) a := StartTestAgent(t, TestAgent{HCL: hcl}) defer a.Shutdown() @@ -522,7 +564,7 @@ func TestHTTPHandlers_AgentMetrics_WAL_Prometheus(t *testing.T) { respRec := httptest.NewRecorder() recordPromMetrics(t, a, respRec) - require.NotContains(t, respRec.Body.String(), "agent_6_raft_wal") + require.NotContains(t, respRec.Body.String(), metricsPrefix+"_raft_wal") }) } @@ -532,12 +574,13 @@ func TestHTTPHandlers_AgentMetrics_LogVerifier_Prometheus(t *testing.T) { // This test cannot use t.Parallel() since we modify global state, ie the global metrics instance t.Run("client agent emits nothing", func(t *testing.T) { - hcl := ` + metricsPrefix := getUniqueMetricsPrefix() + hcl := fmt.Sprintf(` server = false telemetry = { prometheus_retention_time = "5s", disable_hostname = true - metrics_prefix = "agent_4" + metrics_prefix = "%s" } raft_logstore { verification { @@ -546,7 +589,7 @@ func TestHTTPHandlers_AgentMetrics_LogVerifier_Prometheus(t *testing.T) { } } bootstrap = false - ` + `, metricsPrefix) a := StartTestAgent(t, TestAgent{HCL: hcl}) defer a.Shutdown() @@ -554,17 +597,18 @@ func TestHTTPHandlers_AgentMetrics_LogVerifier_Prometheus(t *testing.T) { respRec := httptest.NewRecorder() recordPromMetrics(t, a, respRec) - require.NotContains(t, respRec.Body.String(), "agent_4_raft_logstore_verifier") + require.NotContains(t, respRec.Body.String(), metricsPrefix+"_raft_logstore_verifier") }) t.Run("server with verifier enabled emits all metrics", func(t *testing.T) { - hcl := ` + metricsPrefix := getUniqueMetricsPrefix() + hcl := fmt.Sprintf(` server = true bootstrap = true telemetry = { prometheus_retention_time = "5s", disable_hostname = true - metrics_prefix = "agent_5" + metrics_prefix = "%s" } connect { enabled = true @@ -575,31 +619,34 @@ func TestHTTPHandlers_AgentMetrics_LogVerifier_Prometheus(t *testing.T) { interval = "1s" } } - ` + `, metricsPrefix) a := StartTestAgent(t, TestAgent{HCL: hcl}) defer a.Shutdown() testrpc.WaitForLeader(t, a.RPC, "dc1") - respRec := httptest.NewRecorder() - recordPromMetrics(t, a, respRec) - - out := respRec.Body.String() - require.Contains(t, out, "agent_5_raft_logstore_verifier_checkpoints_written") - require.Contains(t, out, "agent_5_raft_logstore_verifier_dropped_reports") - require.Contains(t, out, "agent_5_raft_logstore_verifier_ranges_verified") - require.Contains(t, out, "agent_5_raft_logstore_verifier_read_checksum_failures") - require.Contains(t, out, "agent_5_raft_logstore_verifier_write_checksum_failures") + testretry.Run(t, func(r *testretry.R) { + respRec := httptest.NewRecorder() + recordPromMetrics(r, a, respRec) + + out := respRec.Body.String() + require.Contains(r, out, metricsPrefix+"_raft_logstore_verifier_checkpoints_written") + require.Contains(r, out, metricsPrefix+"_raft_logstore_verifier_dropped_reports") + require.Contains(r, out, metricsPrefix+"_raft_logstore_verifier_ranges_verified") + require.Contains(r, out, metricsPrefix+"_raft_logstore_verifier_read_checksum_failures") + require.Contains(r, out, metricsPrefix+"_raft_logstore_verifier_write_checksum_failures") + }) }) t.Run("server with verifier disabled emits no extra metrics", func(t *testing.T) { - hcl := ` + metricsPrefix := getUniqueMetricsPrefix() + hcl := fmt.Sprintf(` server = true bootstrap = true telemetry = { prometheus_retention_time = "5s", disable_hostname = true - metrics_prefix = "agent_6" + metrics_prefix = "%s" } connect { enabled = true @@ -609,7 +656,7 @@ func TestHTTPHandlers_AgentMetrics_LogVerifier_Prometheus(t *testing.T) { enabled = false } } - ` + `, metricsPrefix) a := StartTestAgent(t, TestAgent{HCL: hcl}) defer a.Shutdown() @@ -618,7 +665,7 @@ func TestHTTPHandlers_AgentMetrics_LogVerifier_Prometheus(t *testing.T) { respRec := httptest.NewRecorder() recordPromMetrics(t, a, respRec) - require.NotContains(t, respRec.Body.String(), "agent_6_raft_logstore_verifier") + require.NotContains(t, respRec.Body.String(), metricsPrefix+"_raft_logstore_verifier") }) } diff --git a/agent/snapshot_endpoint_test.go b/agent/snapshot_endpoint_test.go index 968aa8fe74cd..a08b393e3378 100644 --- a/agent/snapshot_endpoint_test.go +++ b/agent/snapshot_endpoint_test.go @@ -71,6 +71,7 @@ func TestSnapshot_Options(t *testing.T) { t.Run(method, func(t *testing.T) { a := NewTestAgent(t, TestACLConfig()) defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") body := bytes.NewBuffer(nil) req, _ := http.NewRequest(method, "/v1/snapshot", body) @@ -85,6 +86,7 @@ func TestSnapshot_Options(t *testing.T) { t.Run(method, func(t *testing.T) { a := NewTestAgent(t, TestACLConfig()) defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") body := bytes.NewBuffer(nil) req, _ := http.NewRequest(method, "/v1/snapshot?dc=nope", body) @@ -98,6 +100,7 @@ func TestSnapshot_Options(t *testing.T) { t.Run(method, func(t *testing.T) { a := NewTestAgent(t, TestACLConfig()) defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") body := bytes.NewBuffer(nil) req, _ := http.NewRequest(method, "/v1/snapshot?stale", body)