Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
121303: backup: don't flush mid-row r=stevendanna a=dt

This patch prevents the file sink from flushing mid-row, i.e. when an export
request splits between two column families in the same SQL row. By preventing
this flush from occurring, backup guarantees that the backup file span start and
end keys are valid split points, which online restore can use.

Release note: none.
Epic: none.

122112: sqlstats: rename sizeUnsafe -> sizeUnsafeLocked r=xinhaoz a=xinhaoz

This function accesses data behind a mutex without acquiring it. Ensure that this is clearly communicated so that the function is not misused.

Epic: none

Release note: None

Co-authored-by: David Taylor <tinystatemachine@gmail.com>
Co-authored-by: Xin Hao Zhang <xzhang@cockroachlabs.com>
  • Loading branch information
3 people committed Apr 10, 2024
3 parents c453bd1 + e839e60 + f6332e8 commit e37b1f1
Show file tree
Hide file tree
Showing 7 changed files with 311 additions and 153 deletions.
27 changes: 18 additions & 9 deletions pkg/ccl/backupccl/backup_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ type exportedSpan struct {
dataSST []byte
revStart hlc.Timestamp
completedSpans int32
atKeyBoundary bool
resumeKey roachpb.Key
}

func runBackupProcessor(
Expand Down Expand Up @@ -508,6 +508,7 @@ func runBackupProcessor(
return ctx.Err()
case spans := <-todo:
for _, span := range spans {
resumed := false
for len(span.span.Key) != 0 {
splitMidKey := splitKeysOnTimestamps.Get(&clusterSettings.SV)
// If we started splitting already, we must continue until we reach the end
Expand Down Expand Up @@ -613,9 +614,11 @@ func runBackupProcessor(
span.lastTried = timeutil.Now()
span.attempts++
log.VEventf(ctx, 1, "retrying ExportRequest for span %s; encountered WriteIntentError: %s", span.span, lockErr.Error())
// If we're not mid-key we can put this on the queue to give
// it time to resolve on its own while we work on other spans.
if span.firstKeyTS.IsEmpty() {
// If we're not mid-span we can put this on the queue to
// give it time to resolve on its own while we work on other
// spans; if we've flushed any of this span though we finish it
// so that we get to a known row end key for our backed up span.
if !resumed {
todo <- []spanAndTime{span}
span = spanAndTime{}
}
Expand Down Expand Up @@ -656,7 +659,7 @@ func runBackupProcessor(
if !resp.ResumeSpan.Valid() {
return errors.Errorf("invalid resume span: %s", resp.ResumeSpan)
}

resumed = true
resumeTS := hlc.Timestamp{}
// Taking resume timestamp from the last file of response since files must
// always be consecutive even if we currently expect only one.
Expand Down Expand Up @@ -702,9 +705,12 @@ func runBackupProcessor(
LocalityKV: destLocalityKV,
ApproximatePhysicalSize: uint64(len(file.SST)),
},
dataSST: file.SST,
revStart: resp.StartTime,
atKeyBoundary: file.EndKeyTS.IsEmpty()}
dataSST: file.SST,
revStart: resp.StartTime,
}
if resp.ResumeSpan != nil {
ret.resumeKey = resumeSpan.span.Key
}
if span.start != spec.BackupStartTime {
ret.metadata.StartTime = span.start
ret.metadata.EndTime = span.end
Expand All @@ -715,7 +721,10 @@ func runBackupProcessor(
ret.completedSpans = completedSpans
}

if err := sink.write(ctx, ret); err != nil {
// Cannot set the error to err, which is shared across workers.
var writeErr error
resumeSpan.span.Key, writeErr = sink.write(ctx, ret)
if writeErr != nil {
return err
}
}
Expand Down
23 changes: 13 additions & 10 deletions pkg/ccl/backupccl/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6652,7 +6652,7 @@ INSERT INTO foo.bar VALUES (110), (210), (310), (410), (510)`)
systemDB.Exec(t, `SET CLUSTER SETTING kv.bulk_sst.target_size='50b'`)
tenant10.Exec(t, `BACKUP DATABASE foo TO 'userfile://defaultdb.myfililes/test2'`)
startingSpan = mkSpan(id1, "/Tenant/10/Table/:id/1", "/Tenant/10/Table/:id/2")
resumeSpan := mkSpan(id1, "/Tenant/10/Table/:id/1/510/0", "/Tenant/10/Table/:id/2")
resumeSpan := mkSpan(id1, "/Tenant/10/Table/:id/1/510", "/Tenant/10/Table/:id/2")
mu.Lock()
require.Equal(t, []string{startingSpan.String(), resumeSpan.String()}, mu.exportRequestSpans)
mu.Unlock()
Expand All @@ -6664,10 +6664,10 @@ INSERT INTO foo.bar VALUES (110), (210), (310), (410), (510)`)
var expected []string
for _, resume := range []exportResumePoint{
{mkSpan(id1, "/Tenant/10/Table/:id/1", "/Tenant/10/Table/:id/2"), withoutTS},
{mkSpan(id1, "/Tenant/10/Table/:id/1/210/0", "/Tenant/10/Table/:id/2"), withoutTS},
{mkSpan(id1, "/Tenant/10/Table/:id/1/310/0", "/Tenant/10/Table/:id/2"), withoutTS},
{mkSpan(id1, "/Tenant/10/Table/:id/1/410/0", "/Tenant/10/Table/:id/2"), withoutTS},
{mkSpan(id1, "/Tenant/10/Table/:id/1/510/0", "/Tenant/10/Table/:id/2"), withoutTS},
{mkSpan(id1, "/Tenant/10/Table/:id/1/210", "/Tenant/10/Table/:id/2"), withoutTS},
{mkSpan(id1, "/Tenant/10/Table/:id/1/310", "/Tenant/10/Table/:id/2"), withoutTS},
{mkSpan(id1, "/Tenant/10/Table/:id/1/410", "/Tenant/10/Table/:id/2"), withoutTS},
{mkSpan(id1, "/Tenant/10/Table/:id/1/510", "/Tenant/10/Table/:id/2"), withoutTS},
} {
expected = append(expected, requestSpanStr(resume.Span, resume.timestamp))
}
Expand Down Expand Up @@ -6695,12 +6695,15 @@ INSERT INTO baz.bar VALUES (110, 'a'), (210, 'b'), (310, 'c'), (410, 'd'), (510,
for _, resume := range []exportResumePoint{
{mkSpan(id2, "/Tenant/10/Table/3", "/Tenant/10/Table/4"), withoutTS},
{mkSpan(id2, "/Tenant/10/Table/:id/1", "/Tenant/10/Table/:id/2"), withoutTS},
{mkSpan(id2, "/Tenant/10/Table/:id/1/210/0", "/Tenant/10/Table/:id/2"), withoutTS},
// We have two entries for 210 because of history and super small table size
{mkSpan(id2, "/Tenant/10/Table/:id/1/210", "/Tenant/10/Table/:id/2"), withoutTS},
// We have two entries for 210 because of history and super small table
// size. Note that the second resume span has a start key that includes the
// column family, which implies that the previous export request response
// will not flush until this span completes.
{mkSpan(id2, "/Tenant/10/Table/:id/1/210/0", "/Tenant/10/Table/:id/2"), withTS},
{mkSpan(id2, "/Tenant/10/Table/:id/1/310/0", "/Tenant/10/Table/:id/2"), withoutTS},
{mkSpan(id2, "/Tenant/10/Table/:id/1/410/0", "/Tenant/10/Table/:id/2"), withoutTS},
{mkSpan(id2, "/Tenant/10/Table/:id/1/510/0", "/Tenant/10/Table/:id/2"), withoutTS},
{mkSpan(id2, "/Tenant/10/Table/:id/1/310", "/Tenant/10/Table/:id/2"), withoutTS},
{mkSpan(id2, "/Tenant/10/Table/:id/1/410", "/Tenant/10/Table/:id/2"), withoutTS},
{mkSpan(id2, "/Tenant/10/Table/:id/1/510", "/Tenant/10/Table/:id/2"), withoutTS},
} {
expected = append(expected, requestSpanStr(resume.Span, resume.timestamp))
}
Expand Down
Loading

0 comments on commit e37b1f1

Please sign in to comment.