Skip to content

Commit

Permalink
jobs: add job metrics per-type to track success, failure, and cancel
Browse files Browse the repository at this point in the history
Fixes: #59711

Previously, there were only overall counters tracking how many
jobs were completed, cancelled, or failed. This was inadequate
because it didn't make it easy to tell in aggregate what job
types they were. To address this, this patch will add counters
for different job types for tracking success, failure, and
cancellation.

Release justification: Low risk change only adding a metric inside
the crdb_internal.feature_usage table
Release note: None
  • Loading branch information
fqazi committed Mar 4, 2021
1 parent c140198 commit a5df1c8
Show file tree
Hide file tree
Showing 6 changed files with 256 additions and 0 deletions.
14 changes: 14 additions & 0 deletions pkg/ccl/backupccl/testdata/backup-restore/restore-permissions
Original file line number Diff line number Diff line change
Expand Up @@ -97,3 +97,17 @@ exec-sql server=s3 user=testuser
RESTORE TABLE d.t FROM 'nodelocal://0/test/'
----
pq: only users with the admin role are allowed to RESTORE from the specified nodelocal URI


# Validate that the backup / restore job metrics are recorded.
query-sql
SELECT feature_name FROM crdb_internal.feature_usage
WHERE (feature_name LIKE 'sql.schema.job.%successful' OR
feature_name LIKE 'sql.schema.job.%failed' OR
feature_name LIKE 'sql.schema.job.%canceled') AND
usage_count > 1 and usage_count < 100
ORDER BY feature_name DESC
----
sql.schema.job.schema_change_successful
sql.schema.job.restore_successful
sql.schema.job.backup_successful
101 changes: 101 additions & 0 deletions pkg/jobs/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@ import (
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/server/telemetry"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness"
"github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand Down Expand Up @@ -571,6 +573,7 @@ func (j *Job) canceled(
}
ju.UpdateStatus(StatusCanceled)
md.Payload.FinishedMicros = timeutil.ToUnixMicros(j.registry.clock.Now().GoTime())
IncrementJobCanceled(md)
ju.UpdatePayload(md.Payload)
return nil
})
Expand All @@ -594,6 +597,7 @@ func (j *Job) failed(
ju.UpdateStatus(StatusFailed)
md.Payload.Error = err.Error()
md.Payload.FinishedMicros = timeutil.ToUnixMicros(j.registry.clock.Now().GoTime())
IncrementJobFailed(md)
ju.UpdatePayload(md.Payload)
return nil
})
Expand All @@ -618,6 +622,7 @@ func (j *Job) succeeded(
}
ju.UpdateStatus(StatusSucceeded)
md.Payload.FinishedMicros = timeutil.ToUnixMicros(j.registry.clock.Now().GoTime())
IncrementJobSuccess(md)
ju.UpdatePayload(md.Payload)
md.Progress.Progress = &jobspb.Progress_FractionCompleted{
FractionCompleted: 1.0,
Expand Down Expand Up @@ -928,3 +933,99 @@ func (sj *StartableJob) Cancel(ctx context.Context) error {
defer sj.registry.unregister(sj.ID())
return sj.registry.CancelRequested(ctx, nil, sj.ID())
}

// IncrementJobCanceled increments the per-type "canceled" telemetry counter
// that corresponds to the kind of details carried by md.Payload.
//
// Regular, type, and new-style schema change jobs all share the schema change
// counter. Nothing is recorded when md.Payload is nil, or when its Details are
// unset or of a type that has no dedicated counter: telemetry is best-effort
// bookkeeping and must never crash the caller.
func IncrementJobCanceled(md JobMetadata) {
	if md.Payload == nil {
		return
	}
	switch md.Payload.Details.(type) {
	case *jobspb.Payload_SchemaChange,
		*jobspb.Payload_TypeSchemaChange,
		*jobspb.Payload_NewSchemaChange:
		telemetry.Inc(sqltelemetry.JobsForSchemaCanceled)
	case *jobspb.Payload_SchemaChangeGC:
		telemetry.Inc(sqltelemetry.JobsForSchemaGCCanceled)
	case *jobspb.Payload_Backup:
		telemetry.Inc(sqltelemetry.JobsForBackupCanceled)
	case *jobspb.Payload_Restore:
		telemetry.Inc(sqltelemetry.JobsForRestoreCanceled)
	case *jobspb.Payload_Import:
		telemetry.Inc(sqltelemetry.JobsForImportCanceled)
	case *jobspb.Payload_Changefeed:
		telemetry.Inc(sqltelemetry.JobsForChangeFeedCanceled)
	case *jobspb.Payload_CreateStats:
		telemetry.Inc(sqltelemetry.JobsForCreateStatsCanceled)
	case *jobspb.Payload_StreamIngestion:
		telemetry.Inc(sqltelemetry.JobsForStreamIngestionCanceled)
	case *jobspb.Payload_Migration:
		telemetry.Inc(sqltelemetry.JobsForMigrationCanceled)
	default:
		// Deliberately not a panic: a job type added without a matching
		// counter (or a payload with no details) should only cost us a data
		// point, not the process.
	}
}

// IncrementJobSuccess increments the per-type "successful" telemetry counter
// that corresponds to the kind of details carried by md.Payload.
//
// Regular, type, and new-style schema change jobs all share the schema change
// counter. Nothing is recorded when md.Payload is nil, or when its Details are
// unset or of a type that has no dedicated counter: telemetry is best-effort
// bookkeeping and must never crash the caller.
func IncrementJobSuccess(md JobMetadata) {
	if md.Payload == nil {
		return
	}
	switch md.Payload.Details.(type) {
	case *jobspb.Payload_SchemaChange,
		*jobspb.Payload_TypeSchemaChange,
		*jobspb.Payload_NewSchemaChange:
		telemetry.Inc(sqltelemetry.JobsForSchemaSuccess)
	case *jobspb.Payload_SchemaChangeGC:
		telemetry.Inc(sqltelemetry.JobsForSchemaGCSuccess)
	case *jobspb.Payload_Backup:
		telemetry.Inc(sqltelemetry.JobsForBackupSuccess)
	case *jobspb.Payload_Restore:
		telemetry.Inc(sqltelemetry.JobsForRestoreSuccess)
	case *jobspb.Payload_Import:
		telemetry.Inc(sqltelemetry.JobsForImportSuccess)
	case *jobspb.Payload_Changefeed:
		telemetry.Inc(sqltelemetry.JobsForChangeFeedSuccess)
	case *jobspb.Payload_CreateStats:
		telemetry.Inc(sqltelemetry.JobsForCreateStatsSuccess)
	case *jobspb.Payload_StreamIngestion:
		telemetry.Inc(sqltelemetry.JobsForStreamIngestionSuccess)
	case *jobspb.Payload_Migration:
		telemetry.Inc(sqltelemetry.JobsForMigrationSuccess)
	default:
		// Deliberately not a panic: a job type added without a matching
		// counter (or a payload with no details) should only cost us a data
		// point, not the process.
	}
}

// IncrementJobFailed increments the per-type "failed" telemetry counter that
// corresponds to the kind of details carried by md.Payload.
//
// Regular, type, and new-style schema change jobs all share the schema change
// counter. Nothing is recorded when md.Payload is nil, or when its Details are
// unset or of a type that has no dedicated counter: telemetry is best-effort
// bookkeeping and must never crash the caller.
func IncrementJobFailed(md JobMetadata) {
	if md.Payload == nil {
		return
	}
	switch md.Payload.Details.(type) {
	case *jobspb.Payload_SchemaChange,
		*jobspb.Payload_TypeSchemaChange,
		*jobspb.Payload_NewSchemaChange:
		telemetry.Inc(sqltelemetry.JobsForSchemaFailed)
	case *jobspb.Payload_SchemaChangeGC:
		telemetry.Inc(sqltelemetry.JobsForSchemaGCFailed)
	case *jobspb.Payload_Backup:
		telemetry.Inc(sqltelemetry.JobsForBackupFailed)
	case *jobspb.Payload_Restore:
		telemetry.Inc(sqltelemetry.JobsForRestoreFailed)
	case *jobspb.Payload_Import:
		telemetry.Inc(sqltelemetry.JobsForImportFailed)
	case *jobspb.Payload_Changefeed:
		telemetry.Inc(sqltelemetry.JobsForChangeFeedFailed)
	case *jobspb.Payload_CreateStats:
		telemetry.Inc(sqltelemetry.JobsForCreateStatsFailed)
	case *jobspb.Payload_StreamIngestion:
		telemetry.Inc(sqltelemetry.JobsForStreamIngestionFailed)
	case *jobspb.Payload_Migration:
		telemetry.Inc(sqltelemetry.JobsForMigrationFailed)
	default:
		// Deliberately not a panic: a job type added without a matching
		// counter (or a payload with no details) should only cost us a data
		// point, not the process.
	}
}
11 changes: 11 additions & 0 deletions pkg/sql/logictest/testdata/logic_test/alter_table
Original file line number Diff line number Diff line change
Expand Up @@ -1688,3 +1688,14 @@ SELECT count(descriptor_id)
WHERE descriptor_id = ('test.public.t45985'::REGCLASS)::INT8;
----
0

# Validate that the schema change job success and failure metrics are recorded.
query T
SELECT feature_name FROM crdb_internal.feature_usage
WHERE (feature_name LIKE 'sql.schema.job.%successful' OR
feature_name LIKE 'sql.schema.job.%failed' OR
feature_name LIKE 'sql.schema.job.%canceled')
ORDER BY feature_name DESC
----
sql.schema.job.schema_change_successful
sql.schema.job.schema_change_failed
11 changes: 11 additions & 0 deletions pkg/sql/logictest/testdata/logic_test/distsql_stats
Original file line number Diff line number Diff line change
Expand Up @@ -1065,3 +1065,14 @@ SHOW STATISTICS USING JSON FOR TABLE greeting_stats

statement ok
ALTER TABLE greeting_stats INJECT STATISTICS '$stats'

# Validate that the schema change and create stats job success metrics are recorded.
query T
SELECT feature_name FROM crdb_internal.feature_usage
WHERE (feature_name LIKE 'sql.schema.job.%successful' OR
feature_name LIKE 'sql.schema.job.%failed' OR
feature_name LIKE 'sql.schema.job.%canceled')
ORDER BY feature_name DESC
----
sql.schema.job.schema_change_successful
sql.schema.job.createstats_successful
11 changes: 11 additions & 0 deletions pkg/sql/logictest/testdata/logic_test/jobs
Original file line number Diff line number Diff line change
Expand Up @@ -130,3 +130,14 @@ user testuser
# testuser should no longer have the ability to control jobs.
statement error pq: user testuser does not have CONTROLJOB privilege
PAUSE JOB (SELECT job_id FROM [SHOW JOBS] WHERE user_name = 'testuser2' AND job_type = 'SCHEMA CHANGE GC')

# Validate that the schema_change_successful metric is recorded.
query T
SELECT feature_name FROM crdb_internal.feature_usage
WHERE (feature_name LIKE 'sql.schema.job.%successful' OR
feature_name LIKE 'sql.schema.job.%failed' OR
feature_name LIKE 'sql.schema.job.%canceled') AND
usage_count > 1 and usage_count < 100
ORDER BY feature_name DESC
----
sql.schema.job.schema_change_successful
108 changes: 108 additions & 0 deletions pkg/sql/sqltelemetry/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,3 +146,111 @@ var CreateUnloggedTableCounter = telemetry.GetCounterOnce("sql.schema.create_unl
// SchemaRefreshMaterializedView is to be incremented every time a materialized
// view is refreshed.
var SchemaRefreshMaterializedView = telemetry.GetCounterOnce("sql.schema.refresh_materialized_view")

// Telemetry counters for schema change jobs, one per terminal outcome.
var (
	// JobsForSchemaSuccess is a counter that is incremented whenever a
	// schema change job completes successfully.
	JobsForSchemaSuccess = telemetry.GetCounterOnce("sql.schema.job.schema_change_successful")

	// JobsForSchemaFailed is a counter that is incremented whenever a
	// schema change job fails.
	JobsForSchemaFailed = telemetry.GetCounterOnce("sql.schema.job.schema_change_failed")

	// JobsForSchemaCanceled is a counter that is incremented whenever a
	// schema change job gets canceled.
	JobsForSchemaCanceled = telemetry.GetCounterOnce("sql.schema.job.schema_change_canceled")
)

// Telemetry counters for schema GC jobs, one per terminal outcome.
var (
	// JobsForSchemaGCSuccess counts schema GC jobs that completed
	// successfully.
	JobsForSchemaGCSuccess = telemetry.GetCounterOnce("sql.schema.job.schemagc_change_successful")

	// JobsForSchemaGCFailed counts schema GC jobs that failed.
	JobsForSchemaGCFailed = telemetry.GetCounterOnce("sql.schema.job.schemagc_change_failed")

	// JobsForSchemaGCCanceled counts schema GC jobs that were canceled.
	JobsForSchemaGCCanceled = telemetry.GetCounterOnce("sql.schema.job.schemagc_change_canceled")
)

// Telemetry counters for backup jobs, one per terminal outcome.
var (
	// JobsForBackupSuccess counts backup jobs that completed successfully.
	JobsForBackupSuccess = telemetry.GetCounterOnce("sql.schema.job.backup_successful")

	// JobsForBackupFailed counts backup jobs that failed.
	JobsForBackupFailed = telemetry.GetCounterOnce("sql.schema.job.backup_failed")

	// JobsForBackupCanceled counts backup jobs that were canceled.
	JobsForBackupCanceled = telemetry.GetCounterOnce("sql.schema.job.backup_canceled")
)

// Telemetry counters for restore jobs, one per terminal outcome.
var (
	// JobsForRestoreSuccess counts restore jobs that completed successfully.
	JobsForRestoreSuccess = telemetry.GetCounterOnce("sql.schema.job.restore_successful")

	// JobsForRestoreFailed counts restore jobs that failed.
	JobsForRestoreFailed = telemetry.GetCounterOnce("sql.schema.job.restore_failed")

	// JobsForRestoreCanceled counts restore jobs that were canceled.
	JobsForRestoreCanceled = telemetry.GetCounterOnce("sql.schema.job.restore_canceled")
)

// Telemetry counters for import jobs, one per terminal outcome.
var (
	// JobsForImportSuccess counts import jobs that completed successfully.
	JobsForImportSuccess = telemetry.GetCounterOnce("sql.schema.job.import_successful")

	// JobsForImportFailed counts import jobs that failed.
	JobsForImportFailed = telemetry.GetCounterOnce("sql.schema.job.import_failed")

	// JobsForImportCanceled counts import jobs that were canceled.
	JobsForImportCanceled = telemetry.GetCounterOnce("sql.schema.job.import_canceled")
)

// JobsForChangeFeedSuccess is a counter that is incremented whenever a
// changefeed job completes successfully.
//
// The feature name is spelled "changefeed" so that all three changefeed
// counters share one prefix and can be matched together.
var JobsForChangeFeedSuccess = telemetry.GetCounterOnce("sql.schema.job.changefeed_successful")

// JobsForChangeFeedFailed is a counter that is incremented whenever a
// changefeed job fails.
var JobsForChangeFeedFailed = telemetry.GetCounterOnce("sql.schema.job.changefeed_failed")

// JobsForChangeFeedCanceled is a counter that is incremented whenever a
// changefeed job gets canceled.
var JobsForChangeFeedCanceled = telemetry.GetCounterOnce("sql.schema.job.changefeed_canceled")

// Telemetry counters for create stats jobs, one per terminal outcome.
var (
	// JobsForCreateStatsSuccess counts create stats jobs that completed
	// successfully.
	JobsForCreateStatsSuccess = telemetry.GetCounterOnce("sql.schema.job.createstats_successful")

	// JobsForCreateStatsFailed counts create stats jobs that failed.
	JobsForCreateStatsFailed = telemetry.GetCounterOnce("sql.schema.job.createstats_failed")

	// JobsForCreateStatsCanceled counts create stats jobs that were
	// canceled.
	JobsForCreateStatsCanceled = telemetry.GetCounterOnce("sql.schema.job.createstats_canceled")
)

// JobsForStreamIngestionSuccess is a counter that is incremented whenever a
// stream ingestion job completes successfully.
var JobsForStreamIngestionSuccess = telemetry.GetCounterOnce("sql.schema.job.streamingestion_successful")

// JobsForStreamIngestionFailed is a counter that is incremented whenever a
// stream ingestion job fails.
var JobsForStreamIngestionFailed = telemetry.GetCounterOnce("sql.schema.job.streamingestion_failed")

// JobsForStreamIngestionCanceled is a counter that is incremented whenever a
// stream ingestion job gets canceled.
//
// The feature name uses the same "streamingestion" prefix as the success and
// failure counters so the three can be matched together.
var JobsForStreamIngestionCanceled = telemetry.GetCounterOnce("sql.schema.job.streamingestion_canceled")

// Telemetry counters for migration jobs, one per terminal outcome.
var (
	// JobsForMigrationSuccess counts migration jobs that completed
	// successfully.
	JobsForMigrationSuccess = telemetry.GetCounterOnce("sql.schema.job.migration_successful")

	// JobsForMigrationFailed counts migration jobs that failed.
	JobsForMigrationFailed = telemetry.GetCounterOnce("sql.schema.job.migration_failed")

	// JobsForMigrationCanceled counts migration jobs that were canceled.
	JobsForMigrationCanceled = telemetry.GetCounterOnce("sql.schema.job.migration_canceled")
)

0 comments on commit a5df1c8

Please sign in to comment.