7 changes: 7 additions & 0 deletions pkg/jobs/metrics.go
@@ -25,6 +25,7 @@ import (
type Metrics struct {
JobMetrics [jobspb.NumJobTypes]*JobTypeMetrics

RowLevelTTL metric.Struct
Changefeed metric.Struct
StreamIngest metric.Struct

@@ -179,6 +180,9 @@ func (Metrics) MetricStruct() {}

// init initializes the metrics for job monitoring.
func (m *Metrics) init(histogramWindowInterval time.Duration) {
if MakeRowLevelTTLMetricsHook != nil {
m.RowLevelTTL = MakeRowLevelTTLMetricsHook(histogramWindowInterval)
}
if MakeChangefeedMetricsHook != nil {
m.Changefeed = MakeChangefeedMetricsHook(histogramWindowInterval)
}
@@ -215,6 +219,9 @@ var MakeChangefeedMetricsHook func(time.Duration) metric.Struct
// ccl code.
var MakeStreamIngestMetricsHook func(duration time.Duration) metric.Struct

// MakeRowLevelTTLMetricsHook allows for registration of row-level TTL metrics.
var MakeRowLevelTTLMetricsHook func(time.Duration) metric.Struct

// JobTelemetryMetrics is a collection of telemetry metrics for individual job types.
type JobTelemetryMetrics struct {
Successful telemetry.Counter
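For context, the new hook follows the same injection pattern as the changefeed and stream-ingest hooks above: the jobs package declares a nil function variable, and the implementing package assigns it from init(), avoiding an import cycle. A minimal, self-contained sketch of the pattern (hypothetical names; metric.Struct stubbed out as Struct):

    package jobs

    import "time"

    // Struct stands in for metric.Struct in this sketch.
    type Struct interface{ MetricStruct() }

    // MakeRowLevelTTLMetricsHook stays nil unless the implementing
    // package's init() has run, so callers must check before invoking it.
    var MakeRowLevelTTLMetricsHook func(time.Duration) Struct

    func initTTLMetrics(window time.Duration) Struct {
        if MakeRowLevelTTLMetricsHook == nil {
            return nil // hook never registered; TTL metrics stay absent
        }
        return MakeRowLevelTTLMetricsHook(window)
    }
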
4 changes: 4 additions & 0 deletions pkg/sql/ttl/ttljob/BUILD.bazel
@@ -26,9 +26,13 @@ go_library(
"//pkg/sql/sem/tree",
"//pkg/sql/types",
"//pkg/util/ctxgroup",
"//pkg/util/metric",
"//pkg/util/metric/aggmetric",
"//pkg/util/quotapool",
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"@com_github_cockroachdb_errors//:errors",
"@com_github_prometheus_client_model//go",
],
)

185 changes: 180 additions & 5 deletions pkg/sql/ttl/ttljob/ttljob.go
@@ -13,6 +13,7 @@ package ttljob
import (
"context"
"math"
"regexp"
"time"

"github.com/cockroachdb/cockroach/pkg/jobs"
@@ -31,9 +32,13 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/metric/aggmetric"
"github.com/cockroachdb/cockroach/pkg/util/quotapool"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
io_prometheus_client "github.com/prometheus/client_model/go"
)

var (
@@ -72,12 +77,133 @@ type rowLevelTTLResumer struct {
st *cluster.Settings
}

// RowLevelTTLAggMetrics are the row-level TTL job agg metrics.
type RowLevelTTLAggMetrics struct {
RangeTotalDuration *aggmetric.AggHistogram
SelectDuration *aggmetric.AggHistogram
DeleteDuration *aggmetric.AggHistogram
RowSelections *aggmetric.AggCounter
RowDeletions *aggmetric.AggCounter
NumActiveRanges *aggmetric.AggGauge

mu struct {
syncutil.Mutex
m map[string]rowLevelTTLMetrics
}
}

type rowLevelTTLMetrics struct {
RangeTotalDuration *aggmetric.Histogram
SelectDuration *aggmetric.Histogram
DeleteDuration *aggmetric.Histogram
RowSelections *aggmetric.Counter
RowDeletions *aggmetric.Counter
NumActiveRanges *aggmetric.Gauge
}

// MetricStruct implements the metric.Struct interface.
func (m *RowLevelTTLAggMetrics) MetricStruct() {}

var invalidPrometheusRe = regexp.MustCompile(`[^a-zA-Z0-9_]`)

func (m *RowLevelTTLAggMetrics) loadMetrics(relation string) rowLevelTTLMetrics {
m.mu.Lock()
defer m.mu.Unlock()

relation = invalidPrometheusRe.ReplaceAllString(relation, "_")
if ret, ok := m.mu.m[relation]; ok {
return ret
}
ret := rowLevelTTLMetrics{
RangeTotalDuration: m.RangeTotalDuration.AddChild(relation),
SelectDuration: m.SelectDuration.AddChild(relation),
DeleteDuration: m.DeleteDuration.AddChild(relation),
RowSelections: m.RowSelections.AddChild(relation),
RowDeletions: m.RowDeletions.AddChild(relation),
NumActiveRanges: m.NumActiveRanges.AddChild(relation),
}
m.mu.m[relation] = ret
return ret
}
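
For illustration, loadMetrics sanitizes the relation name into a valid Prometheus label value and memoizes the per-relation children. A usage sketch (hypothetical table name; agg assumed to be the *RowLevelTTLAggMetrics returned by makeRowLevelTTLAggMetrics):

    m := agg.loadMetrics(`defaultdb.public."my table"`)
    // invalidPrometheusRe maps the name to "defaultdb_public__my_table_",
    // since every character outside [a-zA-Z0-9_] becomes "_".
    m.RowDeletions.Inc(10)
    // A second call with the same name hits the cache in agg.mu.m and
    // returns the same children, so the counter reads 10, not 0.
    same := agg.loadMetrics(`defaultdb.public."my table"`)
    _ = same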

func makeRowLevelTTLAggMetrics(histogramWindowInterval time.Duration) metric.Struct {
sigFigs := 2
Contributor comment: btw, are we sure we want sigFigs=2? Wouldn't 1 be sufficient?

b := aggmetric.MakeBuilder("relation")
ret := &RowLevelTTLAggMetrics{
RangeTotalDuration: b.Histogram(
metric.Metadata{
Name: "jobs.row_level_ttl.range_total_duration",
Help: "Duration for processing a range during row level TTL.",
Measurement: "nanoseconds",
Unit: metric.Unit_NANOSECONDS,
MetricType: io_prometheus_client.MetricType_HISTOGRAM,
},
histogramWindowInterval,
time.Hour.Nanoseconds(),
sigFigs,
),
SelectDuration: b.Histogram(
metric.Metadata{
Name: "jobs.row_level_ttl.select_duration",
Help: "Duration for select requests during row level TTL.",
Measurement: "nanoseconds",
Unit: metric.Unit_NANOSECONDS,
MetricType: io_prometheus_client.MetricType_HISTOGRAM,
},
histogramWindowInterval,
time.Minute.Nanoseconds(),
sigFigs,
),
DeleteDuration: b.Histogram(
metric.Metadata{
Name: "jobs.row_level_ttl.delete_duration",
Help: "Duration for delete requests during row level TTL.",
Measurement: "nanoseconds",
Unit: metric.Unit_NANOSECONDS,
MetricType: io_prometheus_client.MetricType_HISTOGRAM,
},
histogramWindowInterval,
time.Minute.Nanoseconds(),
sigFigs,
),
RowSelections: b.Counter(
metric.Metadata{
Name: "jobs.row_level_ttl.rows_selected",
Help: "Number of rows selected for deletion by the row level TTL job.",
Measurement: "num_rows",
Unit: metric.Unit_COUNT,
MetricType: io_prometheus_client.MetricType_COUNTER,
},
),
RowDeletions: b.Counter(
metric.Metadata{
Name: "jobs.row_level_ttl.rows_deleted",
Help: "Number of rows deleted by the row level TTL job.",
Measurement: "num_rows",
Unit: metric.Unit_COUNT,
MetricType: io_prometheus_client.MetricType_COUNTER,
},
),
NumActiveRanges: b.Gauge(
metric.Metadata{
Name: "jobs.row_level_ttl.num_active_ranges",
Help: "Number of active workers attempting to delete for row level TTL.",
Measurement: "num_active_workers",
Unit: metric.Unit_COUNT,
},
),
}
ret.mu.m = make(map[string]rowLevelTTLMetrics)
return ret
}

var _ jobs.Resumer = (*rowLevelTTLResumer)(nil)

// Resume implements the jobs.Resumer interface.
func (t rowLevelTTLResumer) Resume(ctx context.Context, execCtx interface{}) error {
p := execCtx.(sql.JobExecContext)
db := p.ExecCfg().DB
descs := p.ExtendedEvalContext().Descs
var knobs sql.TTLTestingKnobs
if ttlKnobs := p.ExecCfg().TTLTestingKnobs; ttlKnobs != nil {
knobs = *ttlKnobs
@@ -103,8 +229,9 @@ func (t rowLevelTTLResumer) Resume(ctx context.Context, execCtx interface{}) err
var pkTypes []*types.T
var pkDirs []descpb.IndexDescriptor_Direction
var ranges []kv.KeyValue
var name string
if err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
desc, err := p.ExtendedEvalContext().Descs.GetImmutableTableByID(
desc, err := descs.GetImmutableTableByID(
ctx,
txn,
details.TableID,
@@ -136,17 +263,49 @@ func (t rowLevelTTLResumer) Resume(ctx context.Context, execCtx interface{}) err
if ttl == nil {
return errors.Newf("unable to find TTL on table %s", desc.GetName())
}
ttlSettings = *ttl

ranges, err = kvclient.ScanMetaKVs(ctx, txn, desc.TableSpan(p.ExecCfg().Codec))
if err != nil {
return err
}
ttlSettings = *ttl

_, dbDesc, err := descs.GetImmutableDatabaseByID(
ctx,
txn,
desc.GetParentID(),
tree.CommonLookupFlags{
Required: true,
},
)
if err != nil {
return err
}
schemaDesc, err := descs.GetImmutableSchemaByID(
ctx,
txn,
desc.GetParentSchemaID(),
tree.CommonLookupFlags{
Required: true,
},
)
if err != nil {
return err
}

tn := tree.MakeTableNameWithSchema(
tree.Name(dbDesc.GetName()),
tree.Name(schemaDesc.GetName()),
tree.Name(desc.GetName()),
)
name = tn.FQString()
return nil
}); err != nil {
return err
}
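
For illustration (assumed descriptor names), the fully-qualified name computed above is the string that keys the per-relation metrics:

    tn := tree.MakeTableNameWithSchema(
        tree.Name("defaultdb"), tree.Name("public"), tree.Name("events"),
    )
    _ = tn.FQString() // "defaultdb.public.events"

loadMetrics then sanitizes that string, so the exported label would be "defaultdb_public_events".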

metrics := p.ExecCfg().JobRegistry.MetricsStruct().RowLevelTTL.(*RowLevelTTLAggMetrics).loadMetrics(name)

var rangeDesc roachpb.RangeDescriptor
var alloc tree.DatumAlloc
type rangeToProcess struct {
@@ -169,11 +328,13 @@ func (t rowLevelTTLResumer) Resume(ctx context.Context, execCtx interface{}) err
for i := 0; i < rangeConcurrency; i++ {
g.GoCtx(func(ctx context.Context) error {
for r := range ch {
if err := runTTLOnRange(
start := timeutil.Now()
err := runTTLOnRange(
ctx,
p.ExecCfg(),
details,
p.ExtendedEvalContext().Descs,
metrics,
initialVersion,
r.startPK,
r.endPK,
@@ -182,7 +343,9 @@ func (t rowLevelTTLResumer) Resume(ctx context.Context, execCtx interface{}) err
deleteBatchSize,
deleteRateLimiter,
*aost,
); err != nil {
)
metrics.RangeTotalDuration.RecordValue(int64(timeutil.Since(start)))
if err != nil {
// Continue until channel is fully read.
// Otherwise, the keys input will be blocked.
for r = range ch {
@@ -261,6 +424,7 @@ func runTTLOnRange(
execCfg *sql.ExecutorConfig,
details jobspb.RowLevelTTLDetails,
descriptors *descs.Collection,
metrics rowLevelTTLMetrics,
tableVersion descpb.DescriptorVersion,
startPK tree.Datums,
endPK tree.Datums,
@@ -269,6 +433,9 @@ func runTTLOnRange(
deleteRateLimiter *quotapool.RateLimiter,
aost tree.DTimestampTZ,
) error {
metrics.NumActiveRanges.Inc(1)
defer metrics.NumActiveRanges.Dec(1)

ie := execCfg.InternalExecutor
db := execCfg.DB

@@ -297,11 +464,14 @@ func runTTLOnRange(

if err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
var err error
start := timeutil.Now()
expiredRowsPKs, err = selectBuilder.run(ctx, ie, txn)
metrics.SelectDuration.RecordValue(int64(timeutil.Since(start)))
return err
}); err != nil {
return errors.Wrapf(err, "error selecting rows to delete")
}
metrics.RowSelections.Inc(int64(len(expiredRowsPKs)))

// Step 2. Delete the rows which have expired.

@@ -341,10 +511,14 @@ func runTTLOnRange(
defer tokens.Consume()

// TODO(#75428): configure admission priority
return deleteBuilder.run(ctx, ie, txn, deleteBatch)
start := timeutil.Now()
err = deleteBuilder.run(ctx, ie, txn, deleteBatch)
metrics.DeleteDuration.RecordValue(int64(timeutil.Since(start)))
return err
}); err != nil {
return errors.Wrapf(err, "error during row deletion")
}
metrics.RowDeletions.Inc(int64(len(deleteBatch)))
}

// Step 3. Early exit if necessary.
@@ -408,4 +582,5 @@ func init() {
st: settings,
}
})
jobs.MakeRowLevelTTLMetricsHook = makeRowLevelTTLAggMetrics
}
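
Assuming a TTL job running against defaultdb.public.events, the aggregated metrics would surface per-relation children in the Prometheus endpoint roughly like this (illustrative output, not captured from a real node; the exporter rewrites "." to "_" in metric names):

    jobs_row_level_ttl_rows_selected{relation="defaultdb_public_events"} 2048
    jobs_row_level_ttl_rows_deleted{relation="defaultdb_public_events"} 2048
    jobs_row_level_ttl_num_active_ranges{relation="defaultdb_public_events"} 4
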
35 changes: 35 additions & 0 deletions pkg/ts/catalog/chart_catalog.go
@@ -2383,6 +2383,41 @@ var charts = []sectionDescription{
},
},
},
{
Organization: [][]string{{SQLLayer, "SQL", "Row Level TTL"}},
Charts: []chartDescription{
{
Title: "Active Range Deletes",
Metrics: []string{
"jobs.row_level_ttl.num_active_ranges",
},
AxisLabel: "Num Running",
},
{
Title: "Processing Count",
Metrics: []string{
"jobs.row_level_ttl.rows_selected",
"jobs.row_level_ttl.rows_deleted",
},
AxisLabel: "Count",
},
{
Title: "Processing Latency",
Metrics: []string{
"jobs.row_level_ttl.select_duration",
"jobs.row_level_ttl.delete_duration",
},
AxisLabel: "Latency (nanoseconds)",
},
{
Title: "Net Processing Latency",
Metrics: []string{
"jobs.row_level_ttl.range_total_duration",
},
AxisLabel: "Latency (nanoseconds)",
},
},
},
{
Organization: [][]string{{SQLLayer, "SQL", "Feature Flag"}},
Charts: []chartDescription{