backupccl: break dependency on gossip
This commit removes the backup / restore dependency on gossip. Previously,
gossip was used to count the number of nodes in the cluster, and that count
was used for telemetry reporting as well as for chunking up the working
spans. However, that seems unnecessary and in some cases gives an incorrect
count.

In particular, for backup telemetry we computed the per-node speed by
dividing by the cluster node count, but the backup might have used fewer
SQL instances, so we should actually divide by the number of SQL instances
used in the backup plan. In restore we use all available SQL instances, and
we now calculate that number right before using it in the telemetry
calculation. It's possible that a different number of instances was used
across the restore operations (of which there appear to be three) if
instances went up or down in between, but that's probably not that
important.

Release note: None
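
For illustration, here is a minimal, self-contained Go sketch of the
per-instance throughput calculation described above, assuming the caller
already knows how many SQL instances participated in the plan. The helper
name recordSpeedPerInstance and the literal inputs are hypothetical
stand-ins for the telemetry.CountBucketed calls shown in the diff below.

package main

import "fmt"

// recordSpeedPerInstance divides the observed throughput by the number of
// SQL instances that actually executed the backup/restore plan, falling
// back to 1 when the plan produced no specs (e.g. nothing to back up).
func recordSpeedPerInstance(dataSizeBytes, durationSec int64, numInstances int) {
	const mb = 1 << 20
	sizeMb := dataSizeBytes / mb
	var mbps int64
	if durationSec > 0 {
		mbps = sizeMb / durationSec
	}
	if numInstances == 0 {
		numInstances = 1
	}
	fmt.Printf("total: %d MB/s, per instance: %d MB/s\n", mbps, mbps/int64(numInstances))
}

func main() {
	// E.g. a 10 GiB backup that took 40 seconds across 4 SQL instances.
	recordSpeedPerInstance(10<<30, 40, 4)
}
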
yuzefovich committed Jun 13, 2023
1 parent 8814ce5 commit 0606fce
Showing 4 changed files with 37 additions and 68 deletions.
1 change: 0 additions & 1 deletion pkg/ccl/backupccl/BUILD.bazel
@@ -55,7 +55,6 @@ go_library(
"//pkg/cloud/cloudprivilege",
"//pkg/clusterversion",
"//pkg/featureflag",
"//pkg/gossip",
"//pkg/jobs",
"//pkg/jobs/joberror",
"//pkg/jobs/jobspb",
73 changes: 24 additions & 49 deletions pkg/ccl/backupccl/backup_job.go
@@ -28,7 +28,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/ccl/utilccl"
"github.com/cockroachdb/cockroach/pkg/cloud"
"github.com/cockroachdb/cockroach/pkg/cloud/cloudpb"
"github.com/cockroachdb/cockroach/pkg/gossip"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/joberror"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
@@ -96,37 +95,16 @@ func filterSpans(includes []roachpb.Span, excludes []roachpb.Span) []roachpb.Spa
return cov.Slice()
}

// clusterNodeCount returns the approximate number of nodes in the cluster.
func clusterNodeCount(gw gossip.OptionalGossip) (int, error) {
g, err := gw.OptionalErr(47970)
if err != nil {
return 0, err
}
var nodes int
err = g.IterateInfos(
gossip.KeyNodeDescPrefix, func(_ string, _ gossip.Info) error {
nodes++
return nil
},
)
if err != nil {
return 0, err
}
// If we somehow got 0 and return it, a caller may panic if they divide by
// such a nonsensical nodecount.
if nodes == 0 {
return 1, errors.New("failed to count nodes")
}
return nodes, nil
}

// backup exports a snapshot of every kv entry into ranged sstables.
//
// The output is an sstable per range with files in the following locations:
// - <dir>/<unique_int>.sst
// - <dir> is given by the user and may be cloud storage
// - Each file contains data for a key range that doesn't overlap with any other
// file.
//
// - numBackupInstances indicates the number of SQL instances that were used to
// execute the backup.
func backup(
ctx context.Context,
execCtx sql.JobExecContext,
@@ -141,7 +119,7 @@ func backup(
encryption *jobspb.BackupEncryptionOptions,
statsCache *stats.TableStatisticsCache,
execLocality roachpb.Locality,
) (roachpb.RowCount, error) {
) (_ roachpb.RowCount, numBackupInstances int, _ error) {
resumerSpan := tracing.SpanFromContext(ctx)
var lastCheckpoint time.Time

@@ -158,12 +136,12 @@ func backup(
iterFactory := backupinfo.NewIterFactory(backupManifest, defaultStore, encryption, &kmsEnv)
it, err := iterFactory.NewFileIter(ctx)
if err != nil {
return roachpb.RowCount{}, err
return roachpb.RowCount{}, 0, err
}
defer it.Close()
for ; ; it.Next() {
if ok, err := it.Valid(); err != nil {
return roachpb.RowCount{}, err
return roachpb.RowCount{}, 0, err
} else if !ok {
break
}
@@ -196,7 +174,7 @@ func backup(
ctx, evalCtx, execCtx.ExecCfg(), physicalplan.DefaultReplicaChooser, execLocality,
)
if err != nil {
return roachpb.RowCount{}, errors.Wrap(err, "failed to determine nodes on which to run")
return roachpb.RowCount{}, 0, errors.Wrap(err, "failed to determine nodes on which to run")
}

backupSpecs, err := distBackupPlanSpecs(
@@ -217,9 +195,10 @@ func backup(
backupManifest.EndTime,
)
if err != nil {
return roachpb.RowCount{}, err
return roachpb.RowCount{}, 0, err
}

numBackupInstances = len(backupSpecs)
numTotalSpans := 0
for _, spec := range backupSpecs {
numTotalSpans += len(spec.IntroducedSpans) + len(spec.Spans)
@@ -337,7 +316,7 @@ func backup(
}

if err := ctxgroup.GoAndWait(ctx, jobProgressLoop, checkpointLoop, storePerNodeProgressLoop, runBackup); err != nil {
return roachpb.RowCount{}, errors.Wrapf(err, "exporting %d ranges", errors.Safe(numTotalSpans))
return roachpb.RowCount{}, 0, errors.Wrapf(err, "exporting %d ranges", errors.Safe(numTotalSpans))
}

backupID := uuid.MakeV4()
@@ -375,7 +354,7 @@ func backup(
return backupinfo.WriteBackupPartitionDescriptor(ctx, store, filename,
encryption, &kmsEnv, &desc)
}(); err != nil {
return roachpb.RowCount{}, err
return roachpb.RowCount{}, 0, err
}
}
}
@@ -388,7 +367,7 @@ func backup(
// `BACKUP_METADATA` instead.
if err := backupinfo.WriteBackupManifest(ctx, defaultStore, backupbase.BackupManifestName,
encryption, &kmsEnv, backupManifest); err != nil {
return roachpb.RowCount{}, err
return roachpb.RowCount{}, 0, err
}

// Write a `BACKUP_METADATA` file along with SSTs for all the alloc heavy
@@ -400,27 +379,27 @@ func backup(
if backupinfo.WriteMetadataWithExternalSSTsEnabled.Get(&settings.SV) {
if err := backupinfo.WriteMetadataWithExternalSSTs(ctx, defaultStore, encryption,
&kmsEnv, backupManifest); err != nil {
return roachpb.RowCount{}, err
return roachpb.RowCount{}, 0, err
}
}

statsTable := getTableStatsForBackup(ctx, statsCache, backupManifest.Descriptors)
if err := backupinfo.WriteTableStatistics(ctx, defaultStore, encryption, &kmsEnv, &statsTable); err != nil {
return roachpb.RowCount{}, err
return roachpb.RowCount{}, 0, err
}

if backupinfo.WriteMetadataSST.Get(&settings.SV) {
if err := backupinfo.WriteBackupMetadataSST(ctx, defaultStore, encryption, &kmsEnv, backupManifest,
statsTable.Statistics); err != nil {
err = errors.Wrap(err, "writing forward-compat metadata sst")
if !build.IsRelease() {
return roachpb.RowCount{}, err
return roachpb.RowCount{}, 0, err
}
log.Warningf(ctx, "%+v", err)
}
}

return backupManifest.EntryCounts, nil
return backupManifest.EntryCounts, numBackupInstances, nil
}

func releaseProtectedTimestamp(
Expand Down Expand Up @@ -742,8 +721,9 @@ func (b *backupResumer) Resume(ctx context.Context, execCtx interface{}) error {
// We want to retry a backup if there are transient failures (i.e. worker nodes
// dying), so if we receive a retryable error, re-plan and retry the backup.
var res roachpb.RowCount
var numBackupInstances int
for r := retry.StartWithCtx(ctx, retryOpts); r.Next(); {
res, err = backup(
res, numBackupInstances, err = backup(
ctx,
p,
details.URI,
@@ -852,15 +832,6 @@ func (b *backupResumer) Resume(ctx context.Context, execCtx interface{}) error {

// Collect telemetry.
{
numClusterNodes, err := clusterNodeCount(p.ExecCfg().Gossip)
if err != nil {
if !build.IsRelease() && p.ExecCfg().Codec.ForSystemTenant() {
return err
}
log.Warningf(ctx, "unable to determine cluster node count: %v", err)
numClusterNodes = 1
}

telemetry.Count("backup.total.succeeded")
const mb = 1 << 20
sizeMb := res.DataSize / mb
@@ -869,16 +840,20 @@ func (b *backupResumer) Resume(ctx context.Context, execCtx interface{}) error {
if sec > 0 {
mbps = mb / sec
}
if numBackupInstances == 0 {
// This can happen when we didn't have anything to back up.
numBackupInstances = 1
}
if details.StartTime.IsEmpty() {
telemetry.CountBucketed("backup.duration-sec.full-succeeded", sec)
telemetry.CountBucketed("backup.size-mb.full", sizeMb)
telemetry.CountBucketed("backup.speed-mbps.full.total", mbps)
telemetry.CountBucketed("backup.speed-mbps.full.per-node", mbps/int64(numClusterNodes))
telemetry.CountBucketed("backup.speed-mbps.full.per-node", mbps/int64(numBackupInstances))
} else {
telemetry.CountBucketed("backup.duration-sec.inc-succeeded", sec)
telemetry.CountBucketed("backup.size-mb.inc", sizeMb)
telemetry.CountBucketed("backup.speed-mbps.inc.total", mbps)
telemetry.CountBucketed("backup.speed-mbps.inc.per-node", mbps/int64(numClusterNodes))
telemetry.CountBucketed("backup.speed-mbps.inc.per-node", mbps/int64(numBackupInstances))
}
logutil.LogJobCompletion(ctx, b.getTelemetryEventType(), b.job.ID(), true, nil, res.Rows)
}
29 changes: 12 additions & 17 deletions pkg/ccl/backupccl/restore_job.go
@@ -16,7 +16,6 @@ import (
"sort"
"time"

"github.com/cockroachdb/cockroach/pkg/build"
"github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backupencryption"
"github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backupinfo"
"github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backuppb"
@@ -150,7 +149,6 @@ func rewriteBackupSpanKey(
func restoreWithRetry(
restoreCtx context.Context,
execCtx sql.JobExecContext,
numNodes int,
backupManifests []backuppb.BackupManifest,
backupLocalityInfo []jobspb.RestoreDetails_BackupLocalityInfo,
endTime hlc.Timestamp,
@@ -178,7 +176,6 @@ func restoreWithRetry(
res, err = restore(
restoreCtx,
execCtx,
numNodes,
backupManifests,
backupLocalityInfo,
endTime,
@@ -256,7 +253,6 @@ func makeBackupLocalityMap(
func restore(
restoreCtx context.Context,
execCtx sql.JobExecContext,
numNodes int,
backupManifests []backuppb.BackupManifest,
backupLocalityInfo []jobspb.RestoreDetails_BackupLocalityInfo,
endTime hlc.Timestamp,
@@ -413,7 +409,6 @@ func restore(
details.URIs,
backupLocalityInfo,
filter,
numNodes,
numImportSpans,
simpleImportSpans,
progCh,
@@ -1644,22 +1639,12 @@ func (r *restoreResumer) doResume(ctx context.Context, execCtx interface{}) erro
return err
}

numNodes, err := clusterNodeCount(p.ExecCfg().Gossip)
if err != nil {
if !build.IsRelease() && p.ExecCfg().Codec.ForSystemTenant() {
return err
}
log.Warningf(ctx, "unable to determine cluster node count: %v", err)
numNodes = 1
}

var resTotal roachpb.RowCount

if !preData.isEmpty() {
res, err := restoreWithRetry(
ctx,
p,
numNodes,
backupManifests,
details.BackupLocalityInfo,
details.EndTime,
@@ -1696,7 +1681,6 @@ func (r *restoreResumer) doResume(ctx context.Context, execCtx interface{}) erro
res, err := restoreWithRetry(
ctx,
p,
numNodes,
backupManifests,
details.BackupLocalityInfo,
details.EndTime,
@@ -1718,7 +1702,6 @@ func (r *restoreResumer) doResume(ctx context.Context, execCtx interface{}) erro
res, err := restoreWithRetry(
ctx,
p,
numNodes,
backupManifests,
details.BackupLocalityInfo,
details.EndTime,
@@ -1813,6 +1796,18 @@ func (r *restoreResumer) doResume(ctx context.Context, execCtx interface{}) erro
// Emit an event now that the restore job has completed.
emitRestoreJobEvent(ctx, p, jobs.StatusSucceeded, r.job)

// Restore used all available SQL instances.
_, sqlInstanceIDs, err := p.DistSQLPlanner().SetupAllNodesPlanning(ctx, p.ExtendedEvalContext(), p.ExecCfg())
if err != nil {
return err
}
numNodes := len(sqlInstanceIDs)
if numNodes == 0 {
// This shouldn't ever happen, but we know that we have at least one
// instance (which is running this code right now).
numNodes = 1
}

// Collect telemetry.
{
telemetry.Count("restore.total.succeeded")
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/restore_processor_planning.go
@@ -76,7 +76,6 @@ func distRestore(
uris []string,
backupLocalityInfo []jobspb.RestoreDetails_BackupLocalityInfo,
spanFilter spanCoveringFilter,
numNodes int,
numImportSpans int,
useSimpleImportSpans bool,
progCh chan *execinfrapb.RemoteProducerMetadata_BulkProcessorProgress,
@@ -117,6 +116,7 @@
return nil, nil, err
}

numNodes := len(sqlInstanceIDs)
p := planCtx.NewPhysicalPlan()

restoreDataSpec := execinfrapb.RestoreDataSpec{
