Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
59220: sql,gcjob: set RunningStatus when GC job performs r=postamar a=postamar

Previously, the RunningStatus of a GC job was always set to "waiting for
GC TTL". This patch sets it to a more meaningful value when the GC job
is actually performing garbage collection.

Release note (sql change): SHOW JOBS now displays a meaningful value
in the running_status column for GC jobs which are actually performing
garbage collection, as opposed to waiting on a timer.

60484: execstats: include local streams in the physical plan r=RaduBerinde a=asubiotto

Fixes #60463 

Look at commits for individual details. Ignored errors caused this bug to go unnoticed, so the first commit adds a panic when the crdb test flag is specified. This would've caught the issue easily.

Release note: None

60511: sql: Added telemetry for pg_catalog and information_schema tables access r=solongordon a=mnovelodou

Previously, querying an unimplemented table was tracked by telemetry.
This became inadequate because, once these tables were added as empty tables, we no longer captured accesses to tables that are empty and unsupported.
To address this, this patch adds telemetry to track queries against existing
pg_catalog and information_schema tables.

Release note (sql change): Added telemetry to track usage of pg_catalog and
information_schema tables

Fixes #58732

Co-authored-by: Marius Posta <marius@cockroachlabs.com>
Co-authored-by: Alfonso Subiotto Marques <alfonso@cockroachlabs.com>
Co-authored-by: MiguelNovelo <miguel.novelo@digitalonus.com>
  • Loading branch information
4 people committed Feb 16, 2021
4 parents 8f5dc42 + dc23266 + 9a31d0a + bd760b0 commit 314afa3
Show file tree
Hide file tree
Showing 12 changed files with 193 additions and 24 deletions.
3 changes: 2 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -910,7 +910,8 @@ OPTGEN_TARGETS = \
test-targets := \
check test testshort testslow testrace testraceslow testbuild \
stress stressrace \
roachprod-stress roachprod-stressrace
roachprod-stress roachprod-stressrace \
testlogic testbaselogic testccllogic testoptlogic

go-targets-ccl := \
$(COCKROACH) \
Expand Down
25 changes: 10 additions & 15 deletions pkg/sql/execstats/traceanalyzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,9 @@ func NewFlowsMetadata(flows map[roachpb.NodeID]*execinfrapb.FlowSpec) *FlowsMeta
a.processorStats[execinfrapb.ProcessorID(proc.ProcessorID)] = &processorStats{nodeID: nodeID}
for _, output := range proc.Output {
for _, stream := range output.Streams {
if stream.Type == execinfrapb.StreamEndpointSpec_REMOTE {
a.streamStats[stream.StreamID] = &streamStats{
originNodeID: nodeID,
destinationNodeID: stream.TargetNodeID,
}
a.streamStats[stream.StreamID] = &streamStats{
originNodeID: nodeID,
destinationNodeID: stream.TargetNodeID,
}
}
}
Expand Down Expand Up @@ -261,14 +259,7 @@ func (a *TraceAnalyzer) ProcessStats() error {
}

// Process query level stats.
a.queryLevelStats = QueryLevelStats{
NetworkBytesSent: int64(0),
MaxMemUsage: int64(0),
KVBytesRead: int64(0),
KVRowsRead: int64(0),
KVTime: time.Duration(0),
NetworkMessages: int64(0),
}
a.queryLevelStats = QueryLevelStats{}

for _, bytesSentByNode := range a.nodeLevelStats.NetworkBytesSentGroupedByNode {
a.queryLevelStats.NetworkBytesSent += bytesSentByNode
Expand Down Expand Up @@ -317,7 +308,9 @@ func getNetworkBytesFromComponentStats(v *execinfrapb.ComponentStats) (int64, er
if v.NetTx.BytesSent.HasValue() {
return int64(v.NetTx.BytesSent.Value()), nil
}
return 0, errors.Errorf("could not get network bytes; neither BytesReceived and BytesSent is set")
// If neither BytesReceived nor BytesSent is set, this ComponentStat belongs to
// a local component, e.g. a local hashrouter output.
return 0, nil
}

func getNumNetworkMessagesFromComponentsStats(v *execinfrapb.ComponentStats) (int64, error) {
Expand All @@ -335,7 +328,9 @@ func getNumNetworkMessagesFromComponentsStats(v *execinfrapb.ComponentStats) (in
if v.NetTx.MessagesSent.HasValue() {
return int64(v.NetTx.MessagesSent.Value()), nil
}
return 0, errors.Errorf("could not get network messages; neither MessagesReceived and MessagesSent is set")
// If neither MessagesReceived nor MessagesSent is set, this ComponentStat
// belongs to a local component, e.g. a local hashrouter output.
return 0, nil
}

// GetNodeLevelStats returns the node level stats calculated and stored in the
Expand Down
9 changes: 7 additions & 2 deletions pkg/sql/gcjob/gc_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,11 +149,16 @@ func (r schemaChangeGCResumer) Resume(ctx context.Context, execCtx interface{})

if expired {
// Some elements have been marked as DELETING so save the progress.
persistProgress(ctx, execCfg, r.jobID, progress)
persistProgress(ctx, execCfg, r.jobID, progress, runningStatusGC(progress))
if fn := execCfg.GCJobTestingKnobs.RunBeforePerformGC; fn != nil {
if err := fn(r.jobID); err != nil {
return err
}
}
if err := performGC(ctx, execCfg, details, progress); err != nil {
return err
}
persistProgress(ctx, execCfg, r.jobID, progress)
persistProgress(ctx, execCfg, r.jobID, progress, sql.RunningStatusWaitingGC)

// Trigger immediate re-run in case of more expired elements.
timerDuration = 0
Expand Down
81 changes: 79 additions & 2 deletions pkg/sql/gcjob/gc_job_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ package gcjob

import (
"context"
"strconv"
"strings"

"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
Expand Down Expand Up @@ -142,6 +144,72 @@ func isDoneGC(progress *jobspb.SchemaChangeGCProgress) bool {
return true
}

// runningStatusGC builds a RunningStatus message describing which elements
// (tenant, tables, indexes) are currently being garbage collected. The
// resulting string stays bounded in size no matter how large the progress
// struct is: past five ids per category, only a count is printed. When
// nothing is in the DELETING state, the generic waiting status is returned.
func runningStatusGC(progress *jobspb.SchemaChangeGCProgress) jobs.RunningStatus {
	var deletingTables, deletingIndexes []string
	for _, t := range progress.Tables {
		if t.Status == jobspb.SchemaChangeGCProgress_DELETING {
			deletingTables = append(deletingTables, strconv.Itoa(int(t.ID)))
		}
	}
	for _, idx := range progress.Indexes {
		if idx.Status == jobspb.SchemaChangeGCProgress_DELETING {
			deletingIndexes = append(deletingIndexes, strconv.Itoa(int(idx.IndexID)))
		}
	}

	var sb strings.Builder
	sb.WriteString("performing garbage collection on")
	// wroteAny tracks whether any element has been appended yet; it drives
	// both the ';' separator and the final "nothing to GC" fallback.
	wroteAny := false
	if progress.Tenant != nil && progress.Tenant.Status == jobspb.SchemaChangeGCProgress_DELETING {
		sb.WriteString(" tenant")
		wroteAny = true
	}

	groups := []struct {
		ids      []string
		singular string
		plural   string
	}{
		{deletingTables, "table", "tables"},
		{deletingIndexes, "index", "indexes"},
	}
	for _, g := range groups {
		if len(g.ids) == 0 {
			continue
		}
		if wroteAny {
			sb.WriteRune(';')
		}
		sb.WriteRune(' ')
		switch n := len(g.ids); {
		case n == 1:
			// A single id, e.g. "table 123".
			sb.WriteString(g.singular)
			sb.WriteRune(' ')
			sb.WriteString(g.ids[0])
		case n <= 5:
			// A handful of ids, e.g. "tables 123, 456, 789".
			sb.WriteString(g.plural)
			sb.WriteRune(' ')
			sb.WriteString(strings.Join(g.ids, ", "))
		default:
			// Too many ids to enumerate, e.g. "25 tables".
			sb.WriteString(strconv.Itoa(n))
			sb.WriteRune(' ')
			sb.WriteString(g.plural)
		}
		wroteAny = true
	}

	if !wroteAny {
		// Nothing is actively being garbage collected; report the
		// generic waiting status instead.
		return sql.RunningStatusWaitingGC
	}
	return jobs.RunningStatus(sb.String())
}

// getAllTablesWaitingForGC returns a slice with all of the table IDs which have
// not yet been GC'd. This is used to determine which tables' statuses
// need to be updated.
Expand Down Expand Up @@ -178,12 +246,14 @@ func validateDetails(details *jobspb.SchemaChangeGCDetails) error {
return nil
}

// persistProgress sets the current state of the progress back on the job.
// persistProgress sets the current state of the progress and running status
// back on the job.
func persistProgress(
ctx context.Context,
execCfg *sql.ExecutorConfig,
jobID int64,
progress *jobspb.SchemaChangeGCProgress,
runningStatus jobs.RunningStatus,
) {
if err := execCfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
job, err := execCfg.JobRegistry.LoadJobWithTxn(ctx, jobID, txn)
Expand All @@ -194,9 +264,16 @@ func persistProgress(
return err
}
log.Infof(ctx, "updated progress payload: %+v", progress)
err = job.RunningStatus(ctx, func(_ context.Context, _ jobspb.Details) (jobs.RunningStatus, error) {
return runningStatus, nil
})
if err != nil {
return err
}
log.Infof(ctx, "updated running status: %+v", runningStatus)
return nil
}); err != nil {
log.Warningf(ctx, "failed to update job's progress payload err: %+v", err)
log.Warningf(ctx, "failed to update job's progress payload or running status err: %+v", err)
}
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/gcjob/refresh_statuses.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func refreshTables(
}

if expired || haveAnyMissing {
persistProgress(ctx, execCfg, jobID, progress)
persistProgress(ctx, execCfg, jobID, progress, sql.RunningStatusWaitingGC)
}

return expired, earliestDeadline
Expand Down
23 changes: 22 additions & 1 deletion pkg/sql/gcjob_test/gc_job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,15 @@ func TestSchemaChangeGCJob(t *testing.T) {

for _, dropItem := range []DropItem{INDEX, TABLE, DATABASE} {
for _, ttlTime := range []TTLTime{PAST, SOON, FUTURE} {
s, db, kvDB := serverutils.StartServer(t, base.TestServerArgs{})
params := base.TestServerArgs{}
blockGC := make(chan struct{}, 1)
params.Knobs.GCJob = &sql.GCJobTestingKnobs{
RunBeforePerformGC: func(_ int64) error {
<-blockGC
return nil
},
}
s, db, kvDB := serverutils.StartServer(t, params)
ctx := context.Background()
defer s.Stopper().Stop(ctx)
sqlDB := sqlutils.MakeSQLRunner(db)
Expand Down Expand Up @@ -107,6 +115,7 @@ func TestSchemaChangeGCJob(t *testing.T) {
dropTime = 1
}
var details jobspb.SchemaChangeGCDetails
var expectedRunningStatus string
switch dropItem {
case INDEX:
details = jobspb.SchemaChangeGCDetails{
Expand All @@ -122,6 +131,7 @@ func TestSchemaChangeGCJob(t *testing.T) {
myTableDesc.GCMutations = append(myTableDesc.GCMutations, descpb.TableDescriptor_GCDescriptorMutation{
IndexID: descpb.IndexID(2),
})
expectedRunningStatus = "performing garbage collection on index 2"
case TABLE:
details = jobspb.SchemaChangeGCDetails{
Tables: []jobspb.SchemaChangeGCDetails_DroppedID{
Expand All @@ -133,6 +143,7 @@ func TestSchemaChangeGCJob(t *testing.T) {
}
myTableDesc.State = descpb.DescriptorState_DROP
myTableDesc.DropTime = dropTime
expectedRunningStatus = fmt.Sprintf("performing garbage collection on table %d", myTableID)
case DATABASE:
details = jobspb.SchemaChangeGCDetails{
Tables: []jobspb.SchemaChangeGCDetails_DroppedID{
Expand All @@ -151,6 +162,7 @@ func TestSchemaChangeGCJob(t *testing.T) {
myTableDesc.DropTime = dropTime
myOtherTableDesc.State = descpb.DescriptorState_DROP
myOtherTableDesc.DropTime = dropTime
expectedRunningStatus = fmt.Sprintf("performing garbage collection on tables %d, %d", myTableID, myOtherTableID)
}

if err := kvDB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
Expand Down Expand Up @@ -196,6 +208,15 @@ func TestSchemaChangeGCJob(t *testing.T) {
t.Fatal(err)
}

if ttlTime != FUTURE {
// Check that the job eventually blocks right before performing GC, due to the testing knob.
sqlDB.CheckQueryResultsRetry(
t,
fmt.Sprintf("SELECT status, running_status FROM [SHOW JOBS] WHERE job_id = %s", jobIDStr),
[][]string{{"running", expectedRunningStatus}})
}
blockGC <- struct{}{}

if ttlTime == FUTURE {
time.Sleep(500 * time.Millisecond)
} else {
Expand Down
13 changes: 12 additions & 1 deletion pkg/sql/instrumentation.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry"
"github.com/cockroachdb/cockroach/pkg/sql/stmtdiagnostics"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb"
Expand Down Expand Up @@ -80,6 +81,9 @@ type instrumentationHelper struct {
// collectExecStats is set when we are collecting execution statistics for a
// statement.
collectExecStats bool
// startedExplicitTrace is set to true when the instrumentation helper started
// an explicit trace to collect execution stats.
startedExplicitTrace bool

// discardRows is set if we want to discard any results rather than sending
// them back to the client. Used for testing/benchmarking. Note that the
Expand Down Expand Up @@ -178,6 +182,7 @@ func (ih *instrumentationHelper) Setup(
// If we need to collect stats, create a non-verbose child span. Stats
// will be added as structured metadata and processed in Finish.
ih.origCtx = ctx
ih.startedExplicitTrace = true
newCtx, ih.sp = tracing.EnsureChildSpan(ctx, cfg.AmbientCtx.Tracer, "traced statement")
return newCtx, true
}
Expand Down Expand Up @@ -231,7 +236,13 @@ func (ih *instrumentationHelper) Finish(
}
queryLevelStats, err := execstats.GetQueryLevelStats(trace, cfg.TestingKnobs.DeterministicExplainAnalyze, flowsMetadata)
if err != nil {
log.VInfof(ctx, 1, "error getting query level stats for statement %s: %+v", ast, err)
const msg = "error getting query level stats for statement: %s: %+v"
if util.CrdbTestBuild && ih.startedExplicitTrace {
// A panic is much more visible in tests than an error.
// TODO(asubiotto): Remove ih.startedExplicitTrace. See #60609.
panic(fmt.Sprintf(msg, ih.fingerprint, err))
}
log.VInfof(ctx, 1, msg, ih.fingerprint, err)
} else {
stmtStats.mu.Lock()
stmtStats.mu.data.ExecStatCollectionCount++
Expand Down
3 changes: 2 additions & 1 deletion pkg/sql/schema_changer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2009,7 +2009,8 @@ func CreateGCJobRecord(
// Note that this is defined here for testing purposes to avoid cyclic
// dependencies.
type GCJobTestingKnobs struct {
	// RunBeforeResume is a testing hook invoked with the job ID; a non-nil
	// error is propagated to the caller. (Presumably runs when the GC job
	// resumes — confirm against the job's Resume implementation.)
	RunBeforeResume func(jobID int64) error
	// RunBeforePerformGC is a testing hook invoked with the job ID after the
	// GC TTL has expired, right before garbage collection is performed.
	RunBeforePerformGC func(jobID int64) error
}

// ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface.
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/sqltelemetry/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ go_library(
"session.go",
"show.go",
"user_defined_schema.go",
"virtual_schema.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry",
visibility = ["//visibility:public"],
Expand Down
33 changes: 33 additions & 0 deletions pkg/sql/sqltelemetry/virtual_schema.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright 2020 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package sqltelemetry

import (
"fmt"

"github.com/cockroachdb/cockroach/pkg/server/telemetry"
)

// getVirtualSchemaEntry is the format of the telemetry counter name:
// sql.schema.get_virtual_table.<schema>.<table>.
const getVirtualSchemaEntry = "sql.schema.get_virtual_table.%s.%s"

// trackedSchemas have the schemas that we track by telemetry.
var trackedSchemas = map[string]struct{}{
"pg_catalog": {},
"information_schema": {},
}

// IncrementGetVirtualTableEntry bumps the telemetry counter that records an
// access to the given virtual table, but only when the table's schema is one
// of the tracked schemas; accesses to other schemas are ignored.
func IncrementGetVirtualTableEntry(schema, tableName string) {
	if _, tracked := trackedSchemas[schema]; !tracked {
		return
	}
	counterName := fmt.Sprintf(getVirtualSchemaEntry, schema, tableName)
	telemetry.Inc(telemetry.GetCounter(counterName))
}
22 changes: 22 additions & 0 deletions pkg/sql/testdata/telemetry/virtual_schema
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
feature-allowlist
sql.schema.get_virtual_table.*
----

feature-usage
SELECT * FROM pg_catalog.pg_cast
----
sql.schema.get_virtual_table.pg_catalog.pg_cast

feature-usage
SELECT * FROM information_schema.schema_privileges LIMIT 1
----
sql.schema.get_virtual_table.information_schema.schema_privileges

feature-usage
SELECT * FROM crdb_internal.databases LIMIT 1
----

feature-usage
SELECT * FROM pg_catalog.pg_xxx
----
error: pq: relation "pg_catalog.pg_xxx" does not exist
Loading

0 comments on commit 314afa3

Please sign in to comment.