diff --git a/pkg/ccl/sqlproxyccl/server.go b/pkg/ccl/sqlproxyccl/server.go index 0ce49691d128..a20adc5c9557 100644 --- a/pkg/ccl/sqlproxyccl/server.go +++ b/pkg/ccl/sqlproxyccl/server.go @@ -108,9 +108,10 @@ func NewServer(ctx context.Context, stopper *stop.Stopper, options ProxyOptions) } s.mu.errorLogLimiter = cache.NewUnorderedCache(cacheConfig) - // /_status/{healthz,vars} matches CRDB's healthcheck and metrics + // /metrics and /_status/{healthz,vars} matches CRDB's healthcheck and metrics // endpoints. - mux.HandleFunc("/_status/vars/", s.handleVars) + mux.HandleFunc("/metrics", s.handleMetricsWithLabels) + mux.HandleFunc("/_status/vars/", s.handleMetricsWithoutLabels) mux.HandleFunc("/_status/healthz/", s.handleHealth) mux.HandleFunc("/_status/cancel/", s.handleCancel) @@ -145,16 +146,25 @@ func (s *Server) handleHealth(w http.ResponseWriter, r *http.Request) { _, _ = w.Write([]byte("OK")) } -func (s *Server) handleVars(w http.ResponseWriter, r *http.Request) { +func (s *Server) handleMetricsWithLabels(w http.ResponseWriter, r *http.Request) { + s.handleMetricsHelper(w, r, true) +} + +func (s *Server) handleMetricsWithoutLabels(w http.ResponseWriter, r *http.Request) { + s.handleMetricsHelper(w, r, false) +} + +func (s *Server) handleMetricsHelper(w http.ResponseWriter, r *http.Request, useStaticLabels bool) { contentType := expfmt.Negotiate(r.Header) w.Header().Set(httputil.ContentTypeHeader, string(contentType)) scrape := func(pm *metric.PrometheusExporter) { - pm.ScrapeRegistry(s.metricsRegistry, metric.WithIncludeChildMetrics(true), metric.WithIncludeAggregateMetrics(true)) + pm.ScrapeRegistry(s.metricsRegistry, metric.WithIncludeChildMetrics(true), metric.WithIncludeAggregateMetrics(true), metric.WithUseStaticLabels(useStaticLabels)) } if err := s.prometheusExporter.ScrapeAndPrintAsText(w, contentType, scrape); err != nil { log.Errorf(r.Context(), "%v", err) http.Error(w, err.Error(), http.StatusInternalServerError) } + } // handleCancel processes a 
cancel request that has been forwarded from another @@ -200,7 +210,7 @@ func (s *Server) handleCancel(w http.ResponseWriter, r *http.Request) { } // ServeHTTP starts the proxy's HTTP server on the given listener. -// The server provides Prometheus metrics at /_status/vars, +// The server provides Prometheus metrics at /metrics and /_status/vars, // a health check endpoint at /_status/healthz, and pprof debug // endpoints at /debug/pprof. func (s *Server) ServeHTTP(ctx context.Context, ln net.Listener) error { diff --git a/pkg/server/apiconstants/constants.go b/pkg/server/apiconstants/constants.go index 5e38e31eacb3..9a53cc3839a7 100644 --- a/pkg/server/apiconstants/constants.go +++ b/pkg/server/apiconstants/constants.go @@ -23,6 +23,9 @@ const ( // StatusVars exposes Prometheus metrics for monitoring consumption. StatusVars = StatusPrefix + "vars" + // MetricsPath exposes Prometheus metrics with static labels for monitoring consumption. + MetricsPath = "/metrics" + // LoadStatusVars exposes prometheus metrics for instant monitoring of CPU load. 
LoadStatusVars = StatusPrefix + "load" diff --git a/pkg/server/application_api/metrics_test.go b/pkg/server/application_api/metrics_test.go index d8c510e24b6f..5f252f7d76cd 100644 --- a/pkg/server/application_api/metrics_test.go +++ b/pkg/server/application_api/metrics_test.go @@ -151,8 +151,13 @@ func TestStatusVars(t *testing.T) { if body, err := srvtestutils.GetText(s, s.AdminURL().WithPath(apiconstants.StatusPrefix+"vars").String()); err != nil { t.Fatal(err) - } else if !bytes.Contains(body, []byte("# TYPE sql_bytesout counter\nsql_bytesout")) { - t.Errorf("expected sql_bytesout, got: %s", body) + } else { + if !bytes.Contains(body, []byte("# TYPE sql_bytesout counter\nsql_bytesout")) { + t.Errorf("expected sql_bytesout, got: %s", body) + } + if !bytes.Contains(body, []byte(`# TYPE sql_insert_count counter`)) { + t.Errorf("expected sql_insert_count, got: %s", body) + } } if body, err := srvtestutils.GetText(s, s.AdminURL().WithPath(apiconstants.StatusPrefix+"load").String()); err != nil { t.Fatal(err) @@ -161,6 +166,26 @@ func TestStatusVars(t *testing.T) { } } +func TestMetricsEndpoint(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + srv := serverutils.StartServerOnly(t, base.TestServerArgs{}) + defer srv.Stopper().Stop(context.Background()) + + s := srv.ApplicationLayer() + + if body, err := srvtestutils.GetText(s, s.AdminURL().WithPath("/metrics").String()); err != nil { + t.Fatal(err) + } else { + if !bytes.Contains(body, []byte(`# TYPE sql_bytesout counter`)) { + t.Errorf("expected sql_bytesout, got: %s", body) + } + if !bytes.Contains(body, []byte(`# TYPE sql_count counter`)) { + t.Errorf("expected sql_count, got: %s", body) + } + } +} + // TestStatusVarsTxnMetrics verifies that the metrics from the /_status/vars // endpoint for txns and the special cockroach_restart savepoint are correct. 
func TestStatusVarsTxnMetrics(t *testing.T) { diff --git a/pkg/server/server_http.go b/pkg/server/server_http.go index 8689315d02c1..c58cc0f8d95e 100644 --- a/pkg/server/server_http.go +++ b/pkg/server/server_http.go @@ -213,8 +213,9 @@ func (s *httpServer) setupRoutes( // Exempt the 2nd health check endpoint from authentication. // (This simply mirrors /health and exists for backward compatibility.) s.mux.Handle(apiconstants.AdminHealth, handleRequestsUnauthenticated) - // The /_status/vars endpoint is not authenticated either. Useful for monitoring. - s.mux.Handle(apiconstants.StatusVars, http.HandlerFunc(varsHandler{metricSource, s.cfg.Settings}.handleVars)) + // The /_status/vars and /metrics endpoints are not authenticated either. Useful for monitoring. + s.mux.Handle(apiconstants.StatusVars, http.HandlerFunc(varsHandler{metricSource, s.cfg.Settings, false /* useStaticLabels */}.handleVars)) + s.mux.Handle(apiconstants.MetricsPath, http.HandlerFunc(varsHandler{metricSource, s.cfg.Settings, true /* useStaticLabels */}.handleVars)) // Same for /_status/load. 
le, err := newLoadEndpoint(runtimeStatSampler, metricSource) if err != nil { diff --git a/pkg/server/status.go b/pkg/server/status.go index cb9400edadc5..7343c7db71a2 100644 --- a/pkg/server/status.go +++ b/pkg/server/status.go @@ -122,7 +122,7 @@ const ( type metricMarshaler interface { json.Marshaler - PrintAsText(io.Writer, expfmt.Format) error + PrintAsText(io.Writer, expfmt.Format, bool) error ScrapeIntoPrometheus(pm *metric.PrometheusExporter) } @@ -2388,8 +2388,9 @@ func (s *systemStatusServer) RaftDebug( } type varsHandler struct { - metricSource metricMarshaler - st *cluster.Settings + metricSource metricMarshaler + st *cluster.Settings + useStaticLabels bool } func (h varsHandler) handleVars(w http.ResponseWriter, r *http.Request) { @@ -2397,7 +2398,7 @@ func (h varsHandler) handleVars(w http.ResponseWriter, r *http.Request) { contentType := expfmt.Negotiate(r.Header) w.Header().Set(httputil.ContentTypeHeader, string(contentType)) - err := h.metricSource.PrintAsText(w, contentType) + err := h.metricSource.PrintAsText(w, contentType, h.useStaticLabels) if err != nil { log.Errorf(ctx, "%v", err) http.Error(w, err.Error(), http.StatusInternalServerError) diff --git a/pkg/server/status/recorder.go b/pkg/server/status/recorder.go index 01bf765108d2..1a7aa7bbf58d 100644 --- a/pkg/server/status/recorder.go +++ b/pkg/server/status/recorder.go @@ -336,34 +336,50 @@ func (mr *MetricsRecorder) MarshalJSON() ([]byte, error) { // ScrapeIntoPrometheus updates the passed-in prometheusExporter's metrics // snapshot. func (mr *MetricsRecorder) ScrapeIntoPrometheus(pm *metric.PrometheusExporter) { - mr.mu.RLock() - defer mr.mu.RUnlock() - if mr.mu.nodeRegistry == nil { - // We haven't yet processed initialization information; output nothing. 
- if log.V(1) { - log.Warning(context.TODO(), "MetricsRecorder asked to scrape metrics before NodeID allocation") + mr.ScrapeIntoPrometheusWithStaticLabels(false)(pm) +} + +func (mr *MetricsRecorder) ScrapeIntoPrometheusWithStaticLabels( + useStaticLabels bool, +) func(pm *metric.PrometheusExporter) { + return func(pm *metric.PrometheusExporter) { + mr.mu.RLock() + defer mr.mu.RUnlock() + + includeChildMetrics := ChildMetricsEnabled.Get(&mr.settings.SV) + includeAggregateMetrics := includeAggregateMetricsEnabled.Get(&mr.settings.SV) + scrapeOptions := []metric.ScrapeOption{ + metric.WithIncludeChildMetrics(includeChildMetrics), + metric.WithIncludeAggregateMetrics(includeAggregateMetrics), + metric.WithUseStaticLabels(useStaticLabels), + } + if mr.mu.nodeRegistry == nil { + // We haven't yet processed initialization information; output nothing. + if log.V(1) { + log.Warning(context.TODO(), "MetricsRecorder asked to scrape metrics before NodeID allocation") + } + } + pm.ScrapeRegistry(mr.mu.nodeRegistry, scrapeOptions...) + pm.ScrapeRegistry(mr.mu.appRegistry, scrapeOptions...) + pm.ScrapeRegistry(mr.mu.logRegistry, scrapeOptions...) + pm.ScrapeRegistry(mr.mu.sysRegistry, scrapeOptions...) + for _, reg := range mr.mu.storeRegistries { + pm.ScrapeRegistry(reg, scrapeOptions...) + } + for _, tenantRegistry := range mr.mu.tenantRegistries { + pm.ScrapeRegistry(tenantRegistry, scrapeOptions...) 
} - } - includeChildMetrics := ChildMetricsEnabled.Get(&mr.settings.SV) - includeAggregateMetrics := includeAggregateMetricsEnabled.Get(&mr.settings.SV) - pm.ScrapeRegistry(mr.mu.nodeRegistry, metric.WithIncludeChildMetrics(includeChildMetrics), metric.WithIncludeAggregateMetrics(includeAggregateMetrics)) - pm.ScrapeRegistry(mr.mu.appRegistry, metric.WithIncludeChildMetrics(includeChildMetrics), metric.WithIncludeAggregateMetrics(includeAggregateMetrics)) - pm.ScrapeRegistry(mr.mu.logRegistry, metric.WithIncludeChildMetrics(includeChildMetrics), metric.WithIncludeAggregateMetrics(includeAggregateMetrics)) - pm.ScrapeRegistry(mr.mu.sysRegistry, metric.WithIncludeChildMetrics(includeChildMetrics), metric.WithIncludeAggregateMetrics(includeAggregateMetrics)) - for _, reg := range mr.mu.storeRegistries { - pm.ScrapeRegistry(reg, metric.WithIncludeChildMetrics(includeChildMetrics), metric.WithIncludeAggregateMetrics(includeAggregateMetrics)) - } - for _, tenantRegistry := range mr.mu.tenantRegistries { - pm.ScrapeRegistry(tenantRegistry, metric.WithIncludeChildMetrics(includeChildMetrics), metric.WithIncludeAggregateMetrics(includeAggregateMetrics)) } } // PrintAsText writes the current metrics values as plain-text to the writer. // We write metrics to a temporary buffer which is then copied to the writer. // This is to avoid hanging requests from holding the lock. 
-func (mr *MetricsRecorder) PrintAsText(w io.Writer, contentType expfmt.Format) error { +func (mr *MetricsRecorder) PrintAsText( + w io.Writer, contentType expfmt.Format, useStaticLabels bool, +) error { var buf bytes.Buffer - if err := mr.prometheusExporter.ScrapeAndPrintAsText(&buf, contentType, mr.ScrapeIntoPrometheus); err != nil { + if err := mr.prometheusExporter.ScrapeAndPrintAsText(&buf, contentType, mr.ScrapeIntoPrometheusWithStaticLabels(useStaticLabels)); err != nil { return err } _, err := buf.WriteTo(w) diff --git a/pkg/server/status/recorder_test.go b/pkg/server/status/recorder_test.go index fb6bcf440e87..9fa205fb6269 100644 --- a/pkg/server/status/recorder_test.go +++ b/pkg/server/status/recorder_test.go @@ -160,14 +160,14 @@ func TestMetricsRecorderLabels(t *testing.T) { recorder.AddTenantRegistry(tenantID, regTenant) buf := bytes.NewBuffer([]byte{}) - err = recorder.PrintAsText(buf, expfmt.FmtText) + err = recorder.PrintAsText(buf, expfmt.FmtText, false) require.NoError(t, err) require.Contains(t, buf.String(), `some_metric{node_id="7",tenant="system"} 123`) require.Contains(t, buf.String(), `some_metric{node_id="7",tenant="application"} 456`) bufTenant := bytes.NewBuffer([]byte{}) - err = recorderTenant.PrintAsText(bufTenant, expfmt.FmtText) + err = recorderTenant.PrintAsText(bufTenant, expfmt.FmtText, false) require.NoError(t, err) require.NotContains(t, bufTenant.String(), `some_metric{node_id="7",tenant="system"} 123`) @@ -178,14 +178,14 @@ func TestMetricsRecorderLabels(t *testing.T) { appNameContainer.Set("application2") buf = bytes.NewBuffer([]byte{}) - err = recorder.PrintAsText(buf, expfmt.FmtText) + err = recorder.PrintAsText(buf, expfmt.FmtText, false) require.NoError(t, err) require.Contains(t, buf.String(), `some_metric{node_id="7",tenant="system"} 123`) require.Contains(t, buf.String(), `some_metric{node_id="7",tenant="application2"} 456`) bufTenant = bytes.NewBuffer([]byte{}) - err = recorderTenant.PrintAsText(bufTenant, 
expfmt.FmtText) + err = recorderTenant.PrintAsText(bufTenant, expfmt.FmtText, false) require.NoError(t, err) require.NotContains(t, bufTenant.String(), `some_metric{node_id="7",tenant="system"} 123`) @@ -724,7 +724,7 @@ func TestMetricsRecorder(t *testing.T) { if _, err := recorder.MarshalJSON(); err != nil { t.Error(err) } - _ = recorder.PrintAsText(io.Discard, expfmt.FmtText) + _ = recorder.PrintAsText(io.Discard, expfmt.FmtText, false) _ = recorder.GetTimeSeriesData() wg.Done() }() diff --git a/pkg/sql/exec_util.go b/pkg/sql/exec_util.go index 77f78fe3fac0..1e7f9220e3c7 100644 --- a/pkg/sql/exec_util.go +++ b/pkg/sql/exec_util.go @@ -1070,28 +1070,36 @@ var ( Unit: metric.Unit_COUNT, } MetaSelectExecuted = metric.Metadata{ - Name: "sql.select.count", - Help: "Number of SQL SELECT statements successfully executed", - Measurement: "SQL Statements", - Unit: metric.Unit_COUNT, + Name: "sql.select.count", + Help: "Number of SQL SELECT statements successfully executed", + Measurement: "SQL Statements", + Unit: metric.Unit_COUNT, + LabeledName: "sql.count", + StaticLabels: metric.MakeLabelPairs(metric.LabelQueryType, "select"), } MetaUpdateExecuted = metric.Metadata{ - Name: "sql.update.count", - Help: "Number of SQL UPDATE statements successfully executed", - Measurement: "SQL Statements", - Unit: metric.Unit_COUNT, + Name: "sql.update.count", + Help: "Number of SQL UPDATE statements successfully executed", + Measurement: "SQL Statements", + Unit: metric.Unit_COUNT, + LabeledName: "sql.count", + StaticLabels: metric.MakeLabelPairs(metric.LabelQueryType, "update"), } MetaInsertExecuted = metric.Metadata{ - Name: "sql.insert.count", - Help: "Number of SQL INSERT statements successfully executed", - Measurement: "SQL Statements", - Unit: metric.Unit_COUNT, + Name: "sql.insert.count", + Help: "Number of SQL INSERT statements successfully executed", + Measurement: "SQL Statements", + Unit: metric.Unit_COUNT, + LabeledName: "sql.count", + StaticLabels: 
metric.MakeLabelPairs(metric.LabelQueryType, "insert"), } MetaDeleteExecuted = metric.Metadata{ - Name: "sql.delete.count", - Help: "Number of SQL DELETE statements successfully executed", - Measurement: "SQL Statements", - Unit: metric.Unit_COUNT, + Name: "sql.delete.count", + Help: "Number of SQL DELETE statements successfully executed", + Measurement: "SQL Statements", + Unit: metric.Unit_COUNT, + LabeledName: "sql.count", + StaticLabels: metric.MakeLabelPairs(metric.LabelQueryType, "delete"), } MetaCRUDExecuted = metric.Metadata{ Name: "sql.crud_query.count", @@ -1276,6 +1284,9 @@ func getMetricMeta(meta metric.Metadata, internal bool) metric.Metadata { meta.Name += ".internal" meta.Help += " (internal queries)" meta.Measurement = "SQL Internal Statements" + if meta.LabeledName != "" { + meta.StaticLabels = append(meta.StaticLabels, metric.MakeLabelPairs(metric.LabelQueryInternal, "true")...) + } } return meta } diff --git a/pkg/util/metric/metric.go b/pkg/util/metric/metric.go index ac49df718775..813aafc6ce0d 100644 --- a/pkg/util/metric/metric.go +++ b/pkg/util/metric/metric.go @@ -39,6 +39,13 @@ const ( CardinalityLimit = 2000 ) +// Maintaining a list of static label names here to avoid duplication and +// encourage reuse of label names across the codebase. +const ( + LabelQueryType = "query_type" + LabelQueryInternal = "query_internal" +) + // Iterable provides a method for synchronized access to interior objects. type Iterable interface { // GetName returns the fully-qualified name of the metric. 
@@ -1562,3 +1569,17 @@ func (hv *HistogramVec) ToPrometheusMetrics() []*prometheusgo.Metric { return metrics } + +func MakeLabelPairs(labelNamesAndValues ...string) []*LabelPair { + if len(labelNamesAndValues)%2 != 0 { + panic("labelNamesAndValues must be a list with even length of label names and values") + } + labelPairs := make([]*LabelPair, 0, len(labelNamesAndValues)/2) + for i := 0; i < len(labelNamesAndValues); i += 2 { + labelPairs = append(labelPairs, &LabelPair{ + Name: &labelNamesAndValues[i], + Value: &labelNamesAndValues[i+1], + }) + } + return labelPairs +} diff --git a/pkg/util/metric/metric_test.go b/pkg/util/metric/metric_test.go index 5d26b2f379af..982077880fb5 100644 --- a/pkg/util/metric/metric_test.go +++ b/pkg/util/metric/metric_test.go @@ -1068,3 +1068,59 @@ func TestMetadataGetLabels(t *testing.T) { }) } } + +func TestMakeLabelPairs(t *testing.T) { + tests := []struct { + name string + args []string + want []*LabelPair + expectPanic bool + }{ + { + name: "empty args", + args: []string{}, + want: []*LabelPair{}, + }, + { + name: "single arg", + args: []string{"label1"}, + expectPanic: true, + }, + { + name: "odd number of args", + args: []string{"label1", "value1", "label2", "value2", "label3"}, + expectPanic: true, + }, + { + name: "even number of args", + args: []string{"label1", "value1", "label2", "value2"}, + want: []*LabelPair{ + {Name: proto.String("label1"), Value: proto.String("value1")}, + {Name: proto.String("label2"), Value: proto.String("value2")}, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if tt.expectPanic { + require.Panics(t, func() { MakeLabelPairs(tt.args...) }) + return + } + + got := MakeLabelPairs(tt.args...) 
+ if len(got) != len(tt.want) { + t.Errorf("MakeLabelPairs() returned %d pairs, want %d", len(got), len(tt.want)) + return + } + for i := range got { + if *got[i].Name != *tt.want[i].Name { + t.Errorf("pair %d: got name %q, want %q", i, *got[i].Name, *tt.want[i].Name) + } + if *got[i].Value != *tt.want[i].Value { + t.Errorf("pair %d: got value %q, want %q", i, *got[i].Value, *tt.want[i].Value) + } + } + }) + } +}