20 changes: 15 additions & 5 deletions pkg/ccl/sqlproxyccl/server.go
@@ -108,9 +108,10 @@ func NewServer(ctx context.Context, stopper *stop.Stopper, options ProxyOptions)
}
s.mu.errorLogLimiter = cache.NewUnorderedCache(cacheConfig)

// /_status/{healthz,vars} matches CRDB's healthcheck and metrics
// /metrics and /_status/{healthz,vars} match CRDB's healthcheck and metrics
// endpoints.
mux.HandleFunc("/_status/vars/", s.handleVars)
mux.HandleFunc("/metrics", s.handleMetricsWithLabels)
mux.HandleFunc("/_status/vars/", s.handleMetricsWithoutLabels)
mux.HandleFunc("/_status/healthz/", s.handleHealth)
mux.HandleFunc("/_status/cancel/", s.handleCancel)

@@ -145,16 +146,25 @@ func (s *Server) handleHealth(w http.ResponseWriter, r *http.Request) {
_, _ = w.Write([]byte("OK"))
}

func (s *Server) handleVars(w http.ResponseWriter, r *http.Request) {
func (s *Server) handleMetricsWithLabels(w http.ResponseWriter, r *http.Request) {
s.handleMetricsHelper(w, r, true)
}

func (s *Server) handleMetricsWithoutLabels(w http.ResponseWriter, r *http.Request) {
s.handleMetricsHelper(w, r, false)
}

func (s *Server) handleMetricsHelper(w http.ResponseWriter, r *http.Request, useStaticLabels bool) {
contentType := expfmt.Negotiate(r.Header)
w.Header().Set(httputil.ContentTypeHeader, string(contentType))
scrape := func(pm *metric.PrometheusExporter) {
pm.ScrapeRegistry(s.metricsRegistry, metric.WithIncludeChildMetrics(true), metric.WithIncludeAggregateMetrics(true))
pm.ScrapeRegistry(s.metricsRegistry, metric.WithIncludeChildMetrics(true), metric.WithIncludeAggregateMetrics(true), metric.WithUseStaticLabels(useStaticLabels))
}
if err := s.prometheusExporter.ScrapeAndPrintAsText(w, contentType, scrape); err != nil {
log.Errorf(r.Context(), "%v", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
}

}

// handleCancel processes a cancel request that has been forwarded from another
@@ -200,7 +210,7 @@ func (s *Server) handleCancel(w http.ResponseWriter, r *http.Request) {
}

// ServeHTTP starts the proxy's HTTP server on the given listener.
// The server provides Prometheus metrics at /_status/vars,
// The server provides Prometheus metrics at /metrics and /_status/vars,
// a health check endpoint at /_status/healthz, and pprof debug
// endpoints at /debug/pprof.
func (s *Server) ServeHTTP(ctx context.Context, ln net.Listener) error {
3 changes: 3 additions & 0 deletions pkg/server/apiconstants/constants.go
@@ -23,6 +23,9 @@ const (
// StatusVars exposes Prometheus metrics for monitoring consumption.
StatusVars = StatusPrefix + "vars"

// MetricsPath exposes Prometheus metrics for monitoring consumption.
MetricsPath = "/metrics"

// LoadStatusVars exposes prometheus metrics for instant monitoring of CPU load.
LoadStatusVars = StatusPrefix + "load"

29 changes: 27 additions & 2 deletions pkg/server/application_api/metrics_test.go
@@ -151,8 +151,13 @@ func TestStatusVars(t *testing.T)

if body, err := srvtestutils.GetText(s, s.AdminURL().WithPath(apiconstants.StatusPrefix+"vars").String()); err != nil {
t.Fatal(err)
} else if !bytes.Contains(body, []byte("# TYPE sql_bytesout counter\nsql_bytesout")) {
t.Errorf("expected sql_bytesout, got: %s", body)
} else {
if !bytes.Contains(body, []byte("# TYPE sql_bytesout counter\nsql_bytesout")) {
t.Errorf("expected sql_bytesout, got: %s", body)
}
if !bytes.Contains(body, []byte(`# TYPE sql_insert_count counter`)) {
t.Errorf("expected sql_insert_count, got: %s", body)
}
}
if body, err := srvtestutils.GetText(s, s.AdminURL().WithPath(apiconstants.StatusPrefix+"load").String()); err != nil {
t.Fatal(err)
@@ -161,6 +166,26 @@
}
}

func TestMetricsEndpoint(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
srv := serverutils.StartServerOnly(t, base.TestServerArgs{})
defer srv.Stopper().Stop(context.Background())

s := srv.ApplicationLayer()

if body, err := srvtestutils.GetText(s, s.AdminURL().WithPath("/metrics").String()); err != nil {
t.Fatal(err)
} else {
if !bytes.Contains(body, []byte(`# TYPE sql_bytesout counter`)) {
t.Errorf("expected sql_bytesout, got: %s", body)
}
if !bytes.Contains(body, []byte(`# TYPE sql_count counter`)) {
t.Errorf("expected sql_count, got: %s", body)
}
}
}

// TestStatusVarsTxnMetrics verifies that the metrics from the /_status/vars
// endpoint for txns and the special cockroach_restart savepoint are correct.
func TestStatusVarsTxnMetrics(t *testing.T) {
5 changes: 3 additions & 2 deletions pkg/server/server_http.go
@@ -213,8 +213,9 @@ func (s *httpServer) setupRoutes(
// Exempt the 2nd health check endpoint from authentication.
// (This simply mirrors /health and exists for backward compatibility.)
s.mux.Handle(apiconstants.AdminHealth, handleRequestsUnauthenticated)
// The /_status/vars endpoint is not authenticated either. Useful for monitoring.
s.mux.Handle(apiconstants.StatusVars, http.HandlerFunc(varsHandler{metricSource, s.cfg.Settings}.handleVars))
// The /_status/vars and /metrics endpoints are not authenticated either. Useful for monitoring.
s.mux.Handle(apiconstants.StatusVars, http.HandlerFunc(varsHandler{metricSource, s.cfg.Settings, false /* useStaticLabels */}.handleVars))
s.mux.Handle(apiconstants.MetricsPath, http.HandlerFunc(varsHandler{metricSource, s.cfg.Settings, true /* useStaticLabels */}.handleVars))
// Same for /_status/load.
le, err := newLoadEndpoint(runtimeStatSampler, metricSource)
if err != nil {
9 changes: 5 additions & 4 deletions pkg/server/status.go
@@ -122,7 +122,7 @@ const (

type metricMarshaler interface {
json.Marshaler
PrintAsText(io.Writer, expfmt.Format) error
PrintAsText(io.Writer, expfmt.Format, bool) error
ScrapeIntoPrometheus(pm *metric.PrometheusExporter)
}

@@ -2388,16 +2388,17 @@ func (s *systemStatusServer) RaftDebug(
}

type varsHandler struct {
metricSource metricMarshaler
st *cluster.Settings
metricSource metricMarshaler
st *cluster.Settings
useStaticLabels bool
}

func (h varsHandler) handleVars(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()

contentType := expfmt.Negotiate(r.Header)
w.Header().Set(httputil.ContentTypeHeader, string(contentType))
err := h.metricSource.PrintAsText(w, contentType)
err := h.metricSource.PrintAsText(w, contentType, h.useStaticLabels)
if err != nil {
log.Errorf(ctx, "%v", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
56 changes: 36 additions & 20 deletions pkg/server/status/recorder.go
@@ -336,34 +336,50 @@ func (mr *MetricsRecorder) MarshalJSON() ([]byte, error) {
// ScrapeIntoPrometheus updates the passed-in prometheusExporter's metrics
// snapshot.
func (mr *MetricsRecorder) ScrapeIntoPrometheus(pm *metric.PrometheusExporter) {
mr.mu.RLock()
defer mr.mu.RUnlock()
if mr.mu.nodeRegistry == nil {
// We haven't yet processed initialization information; output nothing.
if log.V(1) {
log.Warning(context.TODO(), "MetricsRecorder asked to scrape metrics before NodeID allocation")
mr.ScrapeIntoPrometheusWithStaticLabels(false)(pm)
}

func (mr *MetricsRecorder) ScrapeIntoPrometheusWithStaticLabels(
useStaticLabels bool,
) func(pm *metric.PrometheusExporter) {
return func(pm *metric.PrometheusExporter) {
mr.mu.RLock()
defer mr.mu.RUnlock()

includeChildMetrics := ChildMetricsEnabled.Get(&mr.settings.SV)
includeAggregateMetrics := includeAggregateMetricsEnabled.Get(&mr.settings.SV)
scrapeOptions := []metric.ScrapeOption{
metric.WithIncludeChildMetrics(includeChildMetrics),
metric.WithIncludeAggregateMetrics(includeAggregateMetrics),
metric.WithUseStaticLabels(useStaticLabels),
}
if mr.mu.nodeRegistry == nil {
// We haven't yet processed initialization information; output nothing.
if log.V(1) {
log.Warning(context.TODO(), "MetricsRecorder asked to scrape metrics before NodeID allocation")
}
}
pm.ScrapeRegistry(mr.mu.nodeRegistry, scrapeOptions...)
pm.ScrapeRegistry(mr.mu.appRegistry, scrapeOptions...)
pm.ScrapeRegistry(mr.mu.logRegistry, scrapeOptions...)
pm.ScrapeRegistry(mr.mu.sysRegistry, scrapeOptions...)
for _, reg := range mr.mu.storeRegistries {
pm.ScrapeRegistry(reg, scrapeOptions...)
}
for _, tenantRegistry := range mr.mu.tenantRegistries {
pm.ScrapeRegistry(tenantRegistry, scrapeOptions...)
}
}
includeChildMetrics := ChildMetricsEnabled.Get(&mr.settings.SV)
includeAggregateMetrics := includeAggregateMetricsEnabled.Get(&mr.settings.SV)
pm.ScrapeRegistry(mr.mu.nodeRegistry, metric.WithIncludeChildMetrics(includeChildMetrics), metric.WithIncludeAggregateMetrics(includeAggregateMetrics))
pm.ScrapeRegistry(mr.mu.appRegistry, metric.WithIncludeChildMetrics(includeChildMetrics), metric.WithIncludeAggregateMetrics(includeAggregateMetrics))
pm.ScrapeRegistry(mr.mu.logRegistry, metric.WithIncludeChildMetrics(includeChildMetrics), metric.WithIncludeAggregateMetrics(includeAggregateMetrics))
pm.ScrapeRegistry(mr.mu.sysRegistry, metric.WithIncludeChildMetrics(includeChildMetrics), metric.WithIncludeAggregateMetrics(includeAggregateMetrics))
for _, reg := range mr.mu.storeRegistries {
pm.ScrapeRegistry(reg, metric.WithIncludeChildMetrics(includeChildMetrics), metric.WithIncludeAggregateMetrics(includeAggregateMetrics))
}
for _, tenantRegistry := range mr.mu.tenantRegistries {
pm.ScrapeRegistry(tenantRegistry, metric.WithIncludeChildMetrics(includeChildMetrics), metric.WithIncludeAggregateMetrics(includeAggregateMetrics))
}
}

// PrintAsText writes the current metrics values as plain-text to the writer.
// We write metrics to a temporary buffer which is then copied to the writer.
// This is to avoid hanging requests from holding the lock.
func (mr *MetricsRecorder) PrintAsText(w io.Writer, contentType expfmt.Format) error {
func (mr *MetricsRecorder) PrintAsText(
w io.Writer, contentType expfmt.Format, useStaticLabels bool,
) error {
var buf bytes.Buffer
if err := mr.prometheusExporter.ScrapeAndPrintAsText(&buf, contentType, mr.ScrapeIntoPrometheus); err != nil {
if err := mr.prometheusExporter.ScrapeAndPrintAsText(&buf, contentType, mr.ScrapeIntoPrometheusWithStaticLabels(useStaticLabels)); err != nil {
return err
}
_, err := buf.WriteTo(w)
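Note on the recorder change above: the exported scrape entry point keeps its old zero-argument shape by currying the new flag. ScrapeIntoPrometheus simply delegates to ScrapeIntoPrometheusWithStaticLabels(false), so callers such as ScrapeAndPrintAsText keep receiving a plain func(pm *metric.PrometheusExporter). A minimal, self-contained sketch of that pattern with placeholder types (not the real recorder or exporter):

```go
package main

import "fmt"

// exporter stands in for metric.PrometheusExporter in this sketch.
type exporter struct{}

// recorder stands in for MetricsRecorder.
type recorder struct{}

// scrapeWithStaticLabels mirrors ScrapeIntoPrometheusWithStaticLabels: it
// returns a scrape closure with the flag already bound, so APIs that only
// accept func(*exporter) do not need to change.
func (r *recorder) scrapeWithStaticLabels(useStaticLabels bool) func(*exporter) {
	return func(e *exporter) {
		fmt.Println("scraping registries, static labels:", useStaticLabels)
	}
}

// scrape keeps the original signature and the original behavior (no static
// labels), just as ScrapeIntoPrometheus does in the diff.
func (r *recorder) scrape(e *exporter) {
	r.scrapeWithStaticLabels(false)(e)
}

func main() {
	r, e := &recorder{}, &exporter{}
	r.scrape(e)                       // scraping registries, static labels: false
	r.scrapeWithStaticLabels(true)(e) // scraping registries, static labels: true
}
```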
10 changes: 5 additions & 5 deletions pkg/server/status/recorder_test.go
@@ -160,14 +160,14 @@ func TestMetricsRecorderLabels(t *testing.T) {
recorder.AddTenantRegistry(tenantID, regTenant)

buf := bytes.NewBuffer([]byte{})
err = recorder.PrintAsText(buf, expfmt.FmtText)
err = recorder.PrintAsText(buf, expfmt.FmtText, false)
require.NoError(t, err)

require.Contains(t, buf.String(), `some_metric{node_id="7",tenant="system"} 123`)
require.Contains(t, buf.String(), `some_metric{node_id="7",tenant="application"} 456`)

bufTenant := bytes.NewBuffer([]byte{})
err = recorderTenant.PrintAsText(bufTenant, expfmt.FmtText)
err = recorderTenant.PrintAsText(bufTenant, expfmt.FmtText, false)
require.NoError(t, err)

require.NotContains(t, bufTenant.String(), `some_metric{node_id="7",tenant="system"} 123`)
@@ -178,14 +178,14 @@
appNameContainer.Set("application2")

buf = bytes.NewBuffer([]byte{})
err = recorder.PrintAsText(buf, expfmt.FmtText)
err = recorder.PrintAsText(buf, expfmt.FmtText, false)
require.NoError(t, err)

require.Contains(t, buf.String(), `some_metric{node_id="7",tenant="system"} 123`)
require.Contains(t, buf.String(), `some_metric{node_id="7",tenant="application2"} 456`)

bufTenant = bytes.NewBuffer([]byte{})
err = recorderTenant.PrintAsText(bufTenant, expfmt.FmtText)
err = recorderTenant.PrintAsText(bufTenant, expfmt.FmtText, false)
require.NoError(t, err)

require.NotContains(t, bufTenant.String(), `some_metric{node_id="7",tenant="system"} 123`)
@@ -724,7 +724,7 @@ func TestMetricsRecorder(t *testing.T) {
if _, err := recorder.MarshalJSON(); err != nil {
t.Error(err)
}
_ = recorder.PrintAsText(io.Discard, expfmt.FmtText)
_ = recorder.PrintAsText(io.Discard, expfmt.FmtText, false)
_ = recorder.GetTimeSeriesData()
wg.Done()
}()
43 changes: 27 additions & 16 deletions pkg/sql/exec_util.go
@@ -1070,28 +1070,36 @@ var (
Unit: metric.Unit_COUNT,
}
MetaSelectExecuted = metric.Metadata{
Name: "sql.select.count",
Help: "Number of SQL SELECT statements successfully executed",
Measurement: "SQL Statements",
Unit: metric.Unit_COUNT,
Name: "sql.select.count",
Help: "Number of SQL SELECT statements successfully executed",
Measurement: "SQL Statements",
Unit: metric.Unit_COUNT,
LabeledName: "sql.count",
StaticLabels: metric.MakeLabelPairs(metric.LabelQueryType, "select"),
}
MetaUpdateExecuted = metric.Metadata{
Name: "sql.update.count",
Help: "Number of SQL UPDATE statements successfully executed",
Measurement: "SQL Statements",
Unit: metric.Unit_COUNT,
Name: "sql.update.count",
Help: "Number of SQL UPDATE statements successfully executed",
Measurement: "SQL Statements",
Unit: metric.Unit_COUNT,
LabeledName: "sql.count",
StaticLabels: metric.MakeLabelPairs(metric.LabelQueryType, "update"),
}
MetaInsertExecuted = metric.Metadata{
Name: "sql.insert.count",
Help: "Number of SQL INSERT statements successfully executed",
Measurement: "SQL Statements",
Unit: metric.Unit_COUNT,
Name: "sql.insert.count",
Help: "Number of SQL INSERT statements successfully executed",
Measurement: "SQL Statements",
Unit: metric.Unit_COUNT,
LabeledName: "sql.count",
StaticLabels: metric.MakeLabelPairs(metric.LabelQueryType, "insert"),
}
MetaDeleteExecuted = metric.Metadata{
Name: "sql.delete.count",
Help: "Number of SQL DELETE statements successfully executed",
Measurement: "SQL Statements",
Unit: metric.Unit_COUNT,
Name: "sql.delete.count",
Help: "Number of SQL DELETE statements successfully executed",
Measurement: "SQL Statements",
Unit: metric.Unit_COUNT,
LabeledName: "sql.count",
StaticLabels: metric.MakeLabelPairs(metric.LabelQueryType, "delete"),
}
MetaCRUDExecuted = metric.Metadata{
Name: "sql.crud_query.count",
@@ -1276,6 +1284,9 @@ func getMetricMeta(meta metric.Metadata, internal bool) metric.Metadata {
meta.Name += ".internal"
meta.Help += " (internal queries)"
meta.Measurement = "SQL Internal Statements"
if meta.LabeledName != "" {
meta.StaticLabels = append(meta.StaticLabels, metric.MakeLabelPairs(metric.LabelQueryInternal, "true")...)
}
}
return meta
}
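To make the effect of getMetricMeta concrete: a statement counter that now carries a LabeledName keeps the usual ".internal" name suffix for its internal variant and additionally picks up query_internal="true". The sketch below uses trimmed stand-in types rather than the real metric.Metadata and only mirrors the branch shown in the hunk above:

```go
package main

import "fmt"

// labelPair and metadata are stand-ins for metric.LabelPair and
// metric.Metadata, keeping only the fields this sketch needs.
type labelPair struct{ name, value string }

type metadata struct {
	name         string
	labeledName  string
	staticLabels []labelPair
}

// internalVariant mirrors the internal branch of getMetricMeta above.
func internalVariant(meta metadata) metadata {
	meta.name += ".internal"
	if meta.labeledName != "" {
		// Mirrors metric.MakeLabelPairs(metric.LabelQueryInternal, "true").
		meta.staticLabels = append(meta.staticLabels, labelPair{"query_internal", "true"})
	}
	return meta
}

func main() {
	insert := metadata{
		name:         "sql.insert.count",
		labeledName:  "sql.count",
		staticLabels: []labelPair{{"query_type", "insert"}},
	}
	// Prints the internal variant: name sql.insert.count.internal, with both
	// the query_type=insert and query_internal=true static labels attached.
	fmt.Printf("%+v\n", internalVariant(insert))
}
```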
21 changes: 21 additions & 0 deletions pkg/util/metric/metric.go
@@ -39,6 +39,13 @@
CardinalityLimit = 2000
)

// Maintaining a list of static label names here to avoid duplication and
// encourage reuse of label names across the codebase.
const (
LabelQueryType = "query_type"
LabelQueryInternal = "query_internal"
)

// Iterable provides a method for synchronized access to interior objects.
type Iterable interface {
// GetName returns the fully-qualified name of the metric.
@@ -1562,3 +1569,17 @@ func (hv *HistogramVec) ToPrometheusMetrics() []*prometheusgo.Metric {

return metrics
}

// MakeLabelPairs builds a slice of label pairs from an alternating list of
// label names and values; it panics if the list has odd length.
func MakeLabelPairs(labelNamesAndValues ...string) []*LabelPair {
if len(labelNamesAndValues)%2 != 0 {
panic("labelNamesAndValues must be a list with even length of label names and values")
}
labelPairs := make([]*LabelPair, 0, len(labelNamesAndValues)/2)
for i := 0; i < len(labelNamesAndValues); i += 2 {
labelPairs = append(labelPairs, &LabelPair{
Name: &labelNamesAndValues[i],
Value: &labelNamesAndValues[i+1],
})
}
return labelPairs
}
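And the MakeLabelPairs contract in isolation: names and values alternate, and an odd-length list panics. The snippet copies the function onto a local LabelPair stand-in so it runs on its own; the real type lives in pkg/util/metric:

```go
package main

import "fmt"

// LabelPair is a local stand-in for the metric package's LabelPair; like
// the usage in the diff, it holds pointers to the name and value strings.
type LabelPair struct {
	Name  *string
	Value *string
}

// makeLabelPairs is a copy of MakeLabelPairs above, against the stand-in type.
func makeLabelPairs(labelNamesAndValues ...string) []*LabelPair {
	if len(labelNamesAndValues)%2 != 0 {
		panic("labelNamesAndValues must be a list with even length of label names and values")
	}
	labelPairs := make([]*LabelPair, 0, len(labelNamesAndValues)/2)
	for i := 0; i < len(labelNamesAndValues); i += 2 {
		labelPairs = append(labelPairs, &LabelPair{
			Name:  &labelNamesAndValues[i],
			Value: &labelNamesAndValues[i+1],
		})
	}
	return labelPairs
}

func main() {
	for _, p := range makeLabelPairs("query_type", "select", "query_internal", "true") {
		fmt.Printf("%s=%q\n", *p.Name, *p.Value)
	}
	// query_type="select"
	// query_internal="true"
}
```

Taking the address of elements of the variadic slice is safe here because each call allocates a fresh backing array, so the returned pointers remain valid after the call.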