Skip to content

Commit

Permalink
sql,jobs: create SQL Stats Compaction Job and resumer
Browse files Browse the repository at this point in the history
This commit introduces the SQL Stats Compaction job type
and a barebones implementation of the SQL Stats compaction.

Release note: None
  • Loading branch information
Azhng committed Aug 17, 2021
1 parent 0dfd3fd commit ea43c86
Show file tree
Hide file tree
Showing 17 changed files with 1,430 additions and 361 deletions.
1 change: 1 addition & 0 deletions docs/generated/settings/settings-for-tenants.txt
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ sql.stats.flush.enabled boolean true if set, SQL execution statistics are period
sql.stats.flush.interval duration 1h0m0s the interval at which SQL execution statistics are flushed to disk
sql.stats.histogram_collection.enabled boolean true histogram collection mode
sql.stats.multi_column_collection.enabled boolean true multi-column statistics collection mode
sql.stats.persisted_rows.max integer 10000 maximum number of rows of statement and transaction statistics that will be persisted in the system tables
sql.stats.post_events.enabled boolean false if set, an event is logged for every CREATE STATISTICS job
sql.temp_object_cleaner.cleanup_interval duration 30m0s how often to clean up orphaned temporary objects
sql.trace.log_statement_execute boolean false set to true to enable logging of executed statements
Expand Down
1 change: 1 addition & 0 deletions docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@
<tr><td><code>sql.stats.flush.interval</code></td><td>duration</td><td><code>1h0m0s</code></td><td>the interval at which SQL execution statistics are flushed to disk</td></tr>
<tr><td><code>sql.stats.histogram_collection.enabled</code></td><td>boolean</td><td><code>true</code></td><td>histogram collection mode</td></tr>
<tr><td><code>sql.stats.multi_column_collection.enabled</code></td><td>boolean</td><td><code>true</code></td><td>multi-column statistics collection mode</td></tr>
<tr><td><code>sql.stats.persisted_rows.max</code></td><td>integer</td><td><code>10000</code></td><td>maximum number of rows of statement and transaction statistics that will be persisted in the system tables</td></tr>
<tr><td><code>sql.stats.post_events.enabled</code></td><td>boolean</td><td><code>false</code></td><td>if set, an event is logged for every CREATE STATISTICS job</td></tr>
<tr><td><code>sql.temp_object_cleaner.cleanup_interval</code></td><td>duration</td><td><code>30m0s</code></td><td>how often to clean up orphaned temporary objects</td></tr>
<tr><td><code>sql.trace.log_statement_execute</code></td><td>boolean</td><td><code>false</code></td><td>set to true to enable logging of executed statements</td></tr>
Expand Down
1,110 changes: 757 additions & 353 deletions pkg/jobs/jobspb/jobs.pb.go

Large diffs are not rendered by default.

12 changes: 11 additions & 1 deletion pkg/jobs/jobspb/jobs.proto
Original file line number Diff line number Diff line change
Expand Up @@ -673,6 +673,13 @@ message MigrationProgress {

}

// SQLStatsCompactionDetails is the Payload details message for the SQL Stats
// compaction job. It currently declares no fields.
message SQLStatsCompactionDetails {
}

// SQLStatsCompactionProgress is the Progress details message for the SQL
// Stats compaction job. It currently declares no fields.
message SQLStatsCompactionProgress {

}

message Payload {
string description = 1;
// If empty, the description is assumed to be the statement.
Expand Down Expand Up @@ -716,10 +723,11 @@ message Payload {
StreamIngestionDetails streamIngestion = 23;
NewSchemaChangeDetails newSchemaChange = 24;
MigrationDetails migration = 25;
SQLStatsCompactionDetails sqlStatsCompaction = 27;
}
reserved 26;

// NEXT ID: 27.
// NEXT ID: 28.
}

message Progress {
Expand All @@ -742,6 +750,7 @@ message Progress {
StreamIngestionProgress streamIngest = 18;
NewSchemaChangeProgress newSchemaChange = 19;
MigrationProgress migration = 20;
SQLStatsCompactionProgress sqlStatsCompaction = 22;
}

uint64 trace_id = 21 [(gogoproto.customname) = "TraceID"];
Expand All @@ -766,6 +775,7 @@ enum Type {
STREAM_INGESTION = 10 [(gogoproto.enumvalue_customname) = "TypeStreamIngestion"];
NEW_SCHEMA_CHANGE = 11 [(gogoproto.enumvalue_customname) = "TypeNewSchemaChange"];
MIGRATION = 12 [(gogoproto.enumvalue_customname) = "TypeMigration"];
SQL_STATS_COMPACTION = 13 [(gogoproto.enumvalue_customname) = "TypeSQLStatsCompaction"];
}

message Job {
Expand Down
12 changes: 11 additions & 1 deletion pkg/jobs/jobspb/wrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ func DetailsType(d isPayload_Details) Type {
return TypeNewSchemaChange
case *Payload_Migration:
return TypeMigration
case *Payload_SqlStatsCompaction:
return TypeSQLStatsCompaction
default:
panic(errors.AssertionFailedf("Payload.Type called on a payload with an unknown details type: %T", d))
}
Expand Down Expand Up @@ -128,6 +130,8 @@ func WrapProgressDetails(details ProgressDetails) interface {
return &Progress_NewSchemaChange{NewSchemaChange: &d}
case MigrationProgress:
return &Progress_Migration{Migration: &d}
case SQLStatsCompactionProgress:
return &Progress_SqlStatsCompaction{SqlStatsCompaction: &d}
default:
panic(errors.AssertionFailedf("WrapProgressDetails: unknown details type %T", d))
}
Expand Down Expand Up @@ -159,6 +163,8 @@ func (p *Payload) UnwrapDetails() Details {
return *d.NewSchemaChange
case *Payload_Migration:
return *d.Migration
case *Payload_SqlStatsCompaction:
return *d.SqlStatsCompaction
default:
return nil
}
Expand Down Expand Up @@ -190,6 +196,8 @@ func (p *Progress) UnwrapDetails() ProgressDetails {
return *d.NewSchemaChange
case *Progress_Migration:
return *d.Migration
case *Progress_SqlStatsCompaction:
return *d.SqlStatsCompaction
default:
return nil
}
Expand Down Expand Up @@ -234,6 +242,8 @@ func WrapPayloadDetails(details Details) interface {
return &Payload_NewSchemaChange{NewSchemaChange: &d}
case MigrationDetails:
return &Payload_Migration{Migration: &d}
case SQLStatsCompactionDetails:
return &Payload_SqlStatsCompaction{SqlStatsCompaction: &d}
default:
panic(errors.AssertionFailedf("jobs.WrapPayloadDetails: unknown details type %T", d))
}
Expand Down Expand Up @@ -269,7 +279,7 @@ const (
func (Type) SafeValue() {}

// NumJobTypes is the number of job types.
const NumJobTypes = 13
const NumJobTypes = 14

// MarshalJSONPB redacts sensitive sink URI parameters from ChangefeedDetails.
func (p ChangefeedDetails) MarshalJSONPB(x *jsonpb.Marshaler) ([]byte, error) {
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ go_library(
"comment_on_database.go",
"comment_on_index.go",
"comment_on_table.go",
"compact_sql_stats.go",
"conn_executor.go",
"conn_executor_exec.go",
"conn_executor_prepare.go",
Expand Down
64 changes: 64 additions & 0 deletions pkg/sql/compact_sql_stats.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Copyright 2021 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package sql

import (
"context"

"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/sqlstats/persistedsqlstats"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
)

// sqlStatsCompactionResumer is the jobs.Resumer for jobs of type
// jobspb.TypeSQLStatsCompaction.
type sqlStatsCompactionResumer struct {
	// job is the job this resumer was constructed for.
	job *jobs.Job
	// st holds the cluster settings handed to the stats compactor.
	st *cluster.Settings
}

// Compile-time check that the resumer satisfies jobs.Resumer.
var _ jobs.Resumer = (*sqlStatsCompactionResumer)(nil)

// Resume implements the jobs.Resumer interface. It first checks whether
// another SQL Stats compaction job exists; if one does, it logs and returns
// nil without doing any work. Otherwise it builds a stats compactor and asks
// it to delete the oldest entries.
func (r *sqlStatsCompactionResumer) Resume(ctx context.Context, execCtx interface{}) error {
	log.Infof(ctx, "starting sql stats compaction job")

	execCfg := execCtx.(JobExecContext).ExecCfg()
	executor, kvDB := execCfg.InternalExecutor, execCfg.DB

	// Only a single SQL Stats compaction job is allowed to run at any given
	// time, so look for a concurrently running one before doing anything.
	err := persistedsqlstats.CheckExistingCompactionJob(ctx, r.job, executor, nil /* txn */)
	switch {
	case err == nil:
		// No conflicting job was reported; carry on with the compaction.
	case errors.Is(err, persistedsqlstats.ErrConcurrentSQLStatsCompaction):
		// Another compaction job is already running: this is not a failure.
		log.Infof(ctx, "exiting due to a running sql stats compaction job")
		return nil
	default:
		return err
	}

	compactor := persistedsqlstats.NewStatsCompactor(r.st, executor, kvDB, execCfg.SQLStatsTestingKnobs)
	return compactor.DeleteOldestEntries(ctx)
}

// OnFailOrCancel implements the jobs.Resumer interface. It is a no-op: it
// performs no cleanup and always returns nil.
func (r *sqlStatsCompactionResumer) OnFailOrCancel(ctx context.Context, execCtx interface{}) error {
return nil
}

// init registers the resumer constructor for SQL Stats compaction jobs with
// the jobs package.
func init() {
	// newResumer builds a resumer bound to the given job and cluster settings.
	newResumer := func(job *jobs.Job, settings *cluster.Settings) jobs.Resumer {
		resumer := &sqlStatsCompactionResumer{}
		resumer.job = job
		resumer.st = settings
		return resumer
	}
	jobs.RegisterConstructor(jobspb.TypeSQLStatsCompaction, newResumer)
}
3 changes: 2 additions & 1 deletion pkg/sql/conn_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ type Server struct {

// sqlStats tracks per-application statistics for all applications on each
// node. Newly collected statistics flow into sqlStats.
sqlStats sqlstats.Provider
sqlStats *persistedsqlstats.PersistedSQLStats

// sqlStatsController is the control-plane interface for sqlStats.
sqlStatsController *sslocal.Controller
Expand Down Expand Up @@ -353,6 +353,7 @@ func NewServer(cfg *ExecutorConfig, pool *mon.BytesMonitor) *Server {
InternalExecutor: &sqlStatsInternalExecutor,
KvDB: cfg.DB,
SQLIDContainer: cfg.NodeID,
JobRegistry: s.cfg.JobRegistry,
Knobs: cfg.SQLStatsTestingKnobs,
FlushCounter: metrics.StatsMetrics.SQLStatsFlushStarted,
FailureCounter: metrics.StatsMetrics.SQLStatsFlushFailure,
Expand Down
13 changes: 13 additions & 0 deletions pkg/sql/sqlstats/persistedsqlstats/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ go_library(
srcs = [
"cluster_settings.go",
"combined_iterator.go",
"compaction_exec.go",
"compaction_scheduling.go",
"flush.go",
"mem_iterator.go",
"provider.go",
Expand All @@ -16,6 +18,8 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/base",
"//pkg/jobs",
"//pkg/jobs/jobspb",
"//pkg/kv",
"//pkg/roachpb:with-mocks",
"//pkg/security",
Expand All @@ -41,22 +45,31 @@ go_library(
go_test(
name = "persistedsqlstats_test",
srcs = [
"compaction_test.go",
"flush_test.go",
"main_test.go",
"reader_test.go",
],
deps = [
":persistedsqlstats",
"//pkg/base",
"//pkg/jobs",
"//pkg/jobs/jobspb",
"//pkg/kv/kvserver",
"//pkg/kv/kvserver/kvserverbase",
"//pkg/roachpb:with-mocks",
"//pkg/security",
"//pkg/security/securitytest",
"//pkg/server",
"//pkg/sql",
"//pkg/sql/catalog/descpb",
"//pkg/sql/sqlstats",
"//pkg/sql/sqlutil",
"//pkg/testutils",
"//pkg/testutils/serverutils",
"//pkg/testutils/sqlutils",
"//pkg/testutils/testcluster",
"//pkg/util/encoding",
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/stop",
Expand Down
9 changes: 9 additions & 0 deletions pkg/sql/sqlstats/persistedsqlstats/cluster_settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,12 @@ var SQLStatsFlushJitter = settings.RegisterFloatSetting(
return nil
},
)

// SQLStatsMaxPersistedRows specifies the maximum number of rows that will be
// retained in system.statement_statistics and system.transaction_statistics.
// The setting is registered as public and defaults to 10000.
var SQLStatsMaxPersistedRows = settings.RegisterIntSetting(
"sql.stats.persisted_rows.max",
"maximum number of rows of statement and transaction"+
" statistics that will be persisted in the system tables",
10000, /* defaultValue */
).WithPublic()
Loading

0 comments on commit ea43c86

Please sign in to comment.