Skip to content

Commit

Permalink
backupccl: add trace recordings to the backup execution
Browse files Browse the repository at this point in the history
This change adds trace recordings to noteworthy places during
backup exection.

Release note: None
  • Loading branch information
adityamaru committed Jun 29, 2021
1 parent ecdd850 commit 94d9723
Show file tree
Hide file tree
Showing 11 changed files with 1,376 additions and 125 deletions.
1,345 changes: 1,236 additions & 109 deletions pkg/ccl/backupccl/backup.pb.go

Large diffs are not rendered by default.

31 changes: 31 additions & 0 deletions pkg/ccl/backupccl/backup.proto
Original file line number Diff line number Diff line change
Expand Up @@ -167,3 +167,34 @@ message RestoreProgress {
int64 progressIdx = 2;
roachpb.Span dataSpan = 3 [(gogoproto.nullable) = false];
}

message BackupProcessorPlanningTraceEvent {
map<int32, int64> node_to_num_spans = 1 [(gogoproto.nullable) = false];
int64 total_num_spans = 2;
}

message BackupProgressTraceEvent {
int64 total_num_files = 1;
RowCount total_entry_counts = 2 [(gogoproto.nullable) = false];
util.hlc.Timestamp revision_start_time = 3 [(gogoproto.nullable) = false];
}

// BackupExportTraceRequestEvent is the trace event recorded when an
// ExportRequest has been sent.
message BackupExportTraceRequestEvent {
string span = 1;
int32 attempt = 2;
string priority = 3;
string req_sent_time = 4;
}

// BackupExportTraceResponseEvent is the trace event recorded when we receive a
// response from the ExportRequest.
message BackupExportTraceResponseEvent {
string duration = 1;
int32 num_files = 2;
repeated RowCount file_summaries = 3 [(gogoproto.nullable) = false];
bool has_returned_ssts = 4 [(gogoproto.customname) = "HasReturnedSSTs"];
string retryable_error = 5;
}

28 changes: 28 additions & 0 deletions pkg/ccl/backupccl/backup_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"github.com/gogo/protobuf/types"
Expand Down Expand Up @@ -165,6 +166,7 @@ func backup(
// TODO(dan): Figure out how permissions should work. #6713 is tracking this
// for grpc.

resumerSpan := tracing.SpanFromContext(ctx)
var lastCheckpoint time.Time

var completedSpans, completedIntroducedSpans []roachpb.Span
Expand Down Expand Up @@ -241,6 +243,7 @@ func backup(
// When a processor is done exporting a span, it will send a progress update
// to progCh.
defer close(requestFinishedCh)
var numBackedUpFiles int64
for progress := range progCh {
var progDetails BackupManifest_Progress
if err := types.UnmarshalAny(&progress.ProgressDetails, &progDetails); err != nil {
Expand All @@ -252,13 +255,19 @@ func backup(
for _, file := range progDetails.Files {
backupManifest.Files = append(backupManifest.Files, file)
backupManifest.EntryCounts.add(file.EntryCounts)
numBackedUpFiles++
}

// Signal that an ExportRequest finished to update job progress.
for i := int32(0); i < progDetails.CompletedSpans; i++ {
requestFinishedCh <- struct{}{}
}
if timeutil.Since(lastCheckpoint) > BackupCheckpointInterval {
resumerSpan.RecordStructured(&BackupProgressTraceEvent{
TotalNumFiles: numBackedUpFiles,
TotalEntryCounts: backupManifest.EntryCounts,
RevisionStartTime: backupManifest.RevisionStartTime,
})
err := writeBackupManifest(
ctx, settings, defaultStore, backupManifestCheckpointName, encryption, backupManifest,
)
Expand All @@ -272,6 +281,7 @@ func backup(
return nil
}

resumerSpan.RecordStructured(&types.StringValue{Value: "starting DistSQL backup execution"})
runBackup := func(ctx context.Context) error {
return distBackup(
ctx,
Expand All @@ -291,6 +301,7 @@ func backup(
backupManifest.ID = backupID
// Write additional partial descriptors to each node for partitioned backups.
if len(storageByLocalityKV) > 0 {
resumerSpan.RecordStructured(&types.StringValue{Value: "writing partition descriptors for partitioned backup"})
filesByLocalityKV := make(map[string][]BackupManifest_File)
for _, file := range backupManifest.Files {
filesByLocalityKV[file.LocalityKV] = append(filesByLocalityKV[file.LocalityKV], file)
Expand Down Expand Up @@ -325,6 +336,7 @@ func backup(
}
}

resumerSpan.RecordStructured(&types.StringValue{Value: "writing backup manifest"})
if err := writeBackupManifest(ctx, settings, defaultStore, backupManifestName, encryption, backupManifest); err != nil {
return RowCount{}, err
}
Expand Down Expand Up @@ -353,6 +365,7 @@ func backup(
Statistics: tableStatistics,
}

resumerSpan.RecordStructured(&types.StringValue{Value: "writing backup table statistics"})
if err := writeTableStatistics(ctx, defaultStore, backupStatisticsFileName, encryption, &statsTable); err != nil {
return RowCount{}, err
}
Expand Down Expand Up @@ -388,8 +401,15 @@ type backupResumer struct {
}
}

var _ jobs.TraceableJob = &backupResumer{}

// ForceRealSpan implements the TraceableJob interface.
func (b *backupResumer) ForceRealSpan() {}

// Resume is part of the jobs.Resumer interface.
func (b *backupResumer) Resume(ctx context.Context, execCtx interface{}) error {
// The span is finished by the registry executing the job.
resumerSpan := tracing.SpanFromContext(ctx)
details := b.job.Details().(jobspb.BackupDetails)
p := execCtx.(sql.JobExecContext)

Expand Down Expand Up @@ -417,6 +437,7 @@ func (b *backupResumer) Resume(ctx context.Context, execCtx interface{}) error {

ptsID := details.ProtectedTimestampRecord
if ptsID != nil && !b.testingKnobs.ignoreProtectedTimestamps {
resumerSpan.RecordStructured(&types.StringValue{Value: "verifying protected timestamp"})
if err := p.ExecCfg().ProtectedTimestampProvider.Verify(ctx, *ptsID); err != nil {
if errors.Is(err, protectedts.ErrNotExists) {
// No reason to return an error which might cause problems if it doesn't
Expand Down Expand Up @@ -454,7 +475,14 @@ func (b *backupResumer) Resume(ctx context.Context, execCtx interface{}) error {
// We want to retry a backup if there are transient failures (i.e. worker nodes
// dying), so if we receive a retryable error, re-plan and retry the backup.
var res RowCount
var retryCount int32
for r := retry.StartWithCtx(ctx, retryOpts); r.Next(); {
retryCount++
resumerSpan.RecordStructured(&roachpb.RetryTracingEvent{
Operation: "backupResumer.Resume",
AttemptNumber: retryCount,
RetryError: tracing.RedactAndTruncateError(err),
})
res, err = backup(
ctx,
p,
Expand Down
28 changes: 27 additions & 1 deletion pkg/ccl/backupccl/backup_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
gogotypes "github.com/gogo/protobuf/types"
)
Expand Down Expand Up @@ -193,6 +194,7 @@ func runBackupProcessor(
spec *execinfrapb.BackupDataSpec,
progCh chan execinfrapb.RemoteProducerMetadata_BulkProcessorProgress,
) error {
backupProcessorSpan := tracing.SpanFromContext(ctx)
clusterSettings := flowCtx.Cfg.Settings

todo := make(chan spanAndTime, len(spec.Spans)+len(spec.IntroducedSpans))
Expand Down Expand Up @@ -326,23 +328,36 @@ func runBackupProcessor(
span.span, span.attempts+1, header.UserPriority.String())
var rawRes roachpb.Response
var pErr *roachpb.Error
var reqSentTime time.Time
var respReceivedTime time.Time
exportRequestErr := contextutil.RunWithTimeout(ctx,
fmt.Sprintf("ExportRequest for span %s", span.span),
timeoutPerAttempt.Get(&clusterSettings.SV), func(ctx context.Context) error {
reqSentTime = timeutil.Now()
backupProcessorSpan.RecordStructured(&BackupExportTraceRequestEvent{
Span: span.span.String(),
Attempt: int32(span.attempts + 1),
Priority: header.UserPriority.String(),
ReqSentTime: reqSentTime.String(),
})

rawRes, pErr = kv.SendWrappedWith(ctx, flowCtx.Cfg.DB.NonTransactionalSender(),
header, req)
respReceivedTime = timeutil.Now()
if pErr != nil {
return pErr.GoError()
}
return nil
})
if exportRequestErr != nil {
if _, ok := pErr.GetDetail().(*roachpb.WriteIntentError); ok {
if intentErr, ok := pErr.GetDetail().(*roachpb.WriteIntentError); ok {
span.lastTried = timeutil.Now()
span.attempts++
todo <- span
// TODO(dt): send a progress update to update job progress to note
// the intents being hit.
backupProcessorSpan.RecordStructured(&BackupExportTraceResponseEvent{
RetryableError: tracing.RedactAndTruncateError(intentErr)})
continue
}
// TimeoutError improves the opaque `context deadline exceeded` error
Expand Down Expand Up @@ -382,14 +397,22 @@ func runBackupProcessor(
completedSpans = 1
}

duration := respReceivedTime.Sub(reqSentTime)
exportResponseTraceEvent := &BackupExportTraceResponseEvent{
Duration: duration.String(),
FileSummaries: make([]RowCount, 0),
}
var numFiles int
files := make([]BackupManifest_File, 0)
for i, file := range res.Files {
numFiles++
f := BackupManifest_File{
Span: file.Span,
Path: file.Path,
EntryCounts: countRows(file.Exported, spec.PKIDs),
LocalityKV: file.LocalityKV,
}
exportResponseTraceEvent.FileSummaries = append(exportResponseTraceEvent.FileSummaries, f.EntryCounts)
if span.start != spec.BackupStartTime {
f.StartTime = span.start
f.EndTime = span.end
Expand All @@ -398,6 +421,7 @@ func runBackupProcessor(
// ch for the writer goroutine to handle. Otherwise, go
// ahead and record the file for progress reporting.
if len(file.SST) > 0 {
exportResponseTraceEvent.HasReturnedSSTs = true
ret := returnedSST{f: f, sst: file.SST, revStart: res.StartTime}
// If multiple files were returned for this span, only one -- the
// last -- should count as completing the requested span.
Expand All @@ -413,6 +437,8 @@ func runBackupProcessor(
files = append(files, f)
}
}
exportResponseTraceEvent.NumFiles = int32(numFiles)
backupProcessorSpan.RecordStructured(exportResponseTraceEvent)

// If we have replies for exported files (as oppposed to the
// ones with inline SSTs we had to forward to the uploader
Expand Down
16 changes: 16 additions & 0 deletions pkg/ccl/backupccl/backup_processor_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/storage/cloud"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/logtags"
)
Expand All @@ -38,6 +39,9 @@ func distBackupPlanSpecs(
mvccFilter roachpb.MVCCFilter,
startTime, endTime hlc.Timestamp,
) (map[roachpb.NodeID]*execinfrapb.BackupDataSpec, error) {
ctx, span := tracing.ChildSpan(planCtx.EvalContext().Context, "backup-plan-specs")
_ = ctx // ctx is currently unused, but this new ctx should be used below in the future.
defer span.Finish()
user := execCtx.User()
execCfg := execCtx.ExecCfg()

Expand Down Expand Up @@ -120,6 +124,16 @@ func distBackupPlanSpecs(
}
}

backupPlanningTraceEvent := BackupProcessorPlanningTraceEvent{
NodeToNumSpans: make(map[int32]int64),
}
for node, spec := range nodeToSpec {
numSpans := int64(len(spec.Spans) + len(spec.IntroducedSpans))
backupPlanningTraceEvent.NodeToNumSpans[int32(node)] = numSpans
backupPlanningTraceEvent.TotalNumSpans += numSpans
}
span.RecordStructured(&backupPlanningTraceEvent)

return nodeToSpec, nil
}

Expand All @@ -134,6 +148,8 @@ func distBackup(
progCh chan *execinfrapb.RemoteProducerMetadata_BulkProcessorProgress,
backupSpecs map[roachpb.NodeID]*execinfrapb.BackupDataSpec,
) error {
ctx, span := tracing.ChildSpan(ctx, "backup-distsql")
defer span.Finish()
ctx = logtags.AddTag(ctx, "backup-distsql", nil)
evalCtx := execCtx.ExtendedEvalContext()
var noTxn *kv.Txn
Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/storageccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ go_library(
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_pebble//sstable",
"@com_github_cockroachdb_pebble//vfs",
"@com_github_gogo_protobuf//types",
"@org_golang_x_crypto//pbkdf2",
],
)
Expand Down
24 changes: 22 additions & 2 deletions pkg/ccl/storageccl/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
"github.com/gogo/protobuf/types"
)

// ExportRequestTargetFileSize controls the target file size for SSTs created
Expand Down Expand Up @@ -74,8 +75,18 @@ func evalExport(
h := cArgs.Header
reply := resp.(*roachpb.ExportResponse)

ctx, span := tracing.ChildSpan(ctx, fmt.Sprintf("Export [%s,%s)", args.Key, args.EndKey))
defer span.Finish()
ctx, evalExportSpan := tracing.ChildSpan(ctx, fmt.Sprintf("Export [%s,%s)", args.Key, args.EndKey))
defer evalExportSpan.Finish()

var evalExportTrace types.StringValue
if cArgs.EvalCtx.NodeID() == h.GatewayNodeID {
evalExportTrace.Value = fmt.Sprintf("evaluating Export [%s, %s) on local node %d",
args.Key, args.EndKey, cArgs.EvalCtx.NodeID())
} else {
evalExportTrace.Value = fmt.Sprintf("evaluating Export [%s, %s) on remote node %d",
args.Key, args.EndKey, cArgs.EvalCtx.NodeID())
}
evalExportSpan.RecordStructured(&evalExportTrace)

// For MVCC_All backups with no start time, they'll only be capturing the
// *revisions* since the gc threshold, so noting that in the reply allows the
Expand Down Expand Up @@ -200,13 +211,22 @@ func evalExport(
}

exported.Path = GenerateUniqueSSTName(base.SQLInstanceID(cArgs.EvalCtx.NodeID()))
var attemptNum int
if err := retry.WithMaxAttempts(ctx, base.DefaultRetryOptions(), maxUploadRetries, func() error {
attemptNum++
retryTracingEvent := roachpb.RetryTracingEvent{
Operation: fmt.Sprintf("%s.ExportRequest.WriteFile", exportStore.Conf().Provider.String()),
AttemptNumber: int32(attemptNum),
}
// We blindly retry any error here because we expect the caller to have
// verified the target is writable before sending ExportRequests for it.
if err := cloud.WriteFile(ctx, exportStore, exported.Path, bytes.NewReader(data)); err != nil {
log.VEventf(ctx, 1, "failed to put file: %+v", err)
retryTracingEvent.RetryError = fmt.Sprintf("failed to put file: %s", tracing.RedactAndTruncateError(err))
evalExportSpan.RecordStructured(&retryTracingEvent)
return err
}
evalExportSpan.RecordStructured(&retryTracingEvent)
return nil
}); err != nil {
return result.Result{}, err
Expand Down
1 change: 0 additions & 1 deletion pkg/storage/cloud/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,5 @@ go_library(
"//pkg/util/sysutil",
"//pkg/util/tracing",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_redact//:redact",
],
)
14 changes: 2 additions & 12 deletions pkg/storage/cloud/cloud_io.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/sysutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/redact"
)

// Timeout is a cluster setting used for cloud storage interactions.
Expand Down Expand Up @@ -75,15 +74,6 @@ func MakeHTTPClient(settings *cluster.Settings) (*http.Client, error) {
return &http.Client{Transport: t}, nil
}

func redactAndTruncateError(err error) string {
maxErrLength := 250
redactedErr := string(redact.Sprintf("%v", err))
if len(redactedErr) < maxErrLength {
maxErrLength = len(redactedErr)
}
return redactedErr[:maxErrLength]
}

// MaxDelayedRetryAttempts is the number of times the delayedRetry method will
// re-run the provided function.
const MaxDelayedRetryAttempts = 3
Expand All @@ -104,7 +94,7 @@ func DelayedRetry(
retryEvent := &roachpb.RetryTracingEvent{
Operation: opName,
AttemptNumber: attemptNumber,
RetryError: redactAndTruncateError(err),
RetryError: tracing.RedactAndTruncateError(err),
}
span.RecordStructured(retryEvent)
if customDelay != nil {
Expand Down Expand Up @@ -230,7 +220,7 @@ func (r *ResumingReader) Read(p []byte) (int, error) {
retryEvent := &roachpb.RetryTracingEvent{
Operation: "ResumingReader.Reader.Read",
AttemptNumber: int32(retries + 1),
RetryError: redactAndTruncateError(lastErr),
RetryError: tracing.RedactAndTruncateError(lastErr),
}
span.RecordStructured(retryEvent)
if retries >= maxNoProgressReads {
Expand Down
Loading

0 comments on commit 94d9723

Please sign in to comment.