Skip to content

Commit 0e835dd

Browse files
craig[bot] and adityamaru committed
Merge #95323
95323: streamingccl: display fraction progressed when cutting over r=stevendanna a=adityamaru

This change teaches the logic that issues RevertRange requests on replication cutover to update the fraction progressed of the replication job as revisions in different ranges are reverted to the cutover time. This should give the user a visual cue on the Jobs page in the console about how much time is left until the destination tenant is marked active.

Fixes: #93448

Release note: None

Co-authored-by: adityamaru <adityamaru@gmail.com>
2 parents 4ec5a5f + 706bf0e commit 0e835dd

File tree

6 files changed

+186
-9
lines changed

6 files changed

+186
-9
lines changed

pkg/ccl/streamingccl/streamingest/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,7 @@ go_test(
130130
"//pkg/testutils/storageutils",
131131
"//pkg/testutils/testcluster",
132132
"//pkg/upgrade/upgradebase",
133+
"//pkg/util/ctxgroup",
133134
"//pkg/util/hlc",
134135
"//pkg/util/leaktest",
135136
"//pkg/util/limit",

pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -466,8 +466,36 @@ func maybeRevertToCutoverTimestamp(
466466
}
467467

468468
updateRunningStatus(ctx, j, fmt.Sprintf("starting to cut over to the given timestamp %s", cutoverTime))
469+
470+
origNRanges := -1
469471
spans := []roachpb.Span{sd.Span}
472+
updateJobProgress := func() error {
473+
if spans == nil {
474+
return nil
475+
}
476+
nRanges, err := sql.NumRangesInSpans(ctx, p.ExecCfg().DB, p.DistSQLPlanner(), spans)
477+
if err != nil {
478+
return err
479+
}
480+
if origNRanges == -1 {
481+
origNRanges = nRanges
482+
}
483+
return p.ExecCfg().DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
484+
if nRanges < origNRanges {
485+
fractionRangesFinished := float32(origNRanges-nRanges) / float32(origNRanges)
486+
if err := j.FractionProgressed(ctx, txn,
487+
jobs.FractionUpdater(fractionRangesFinished)); err != nil {
488+
return jobs.SimplifyInvalidStatusError(err)
489+
}
490+
}
491+
return nil
492+
})
493+
}
494+
470495
for len(spans) != 0 {
496+
if err := updateJobProgress(); err != nil {
497+
log.Warningf(ctx, "failed to update replication job progress: %+v", err)
498+
}
471499
var b kv.Batch
472500
for _, span := range spans {
473501
b.AddRawRequest(&roachpb.RevertRangeRequest{
@@ -479,6 +507,9 @@ func maybeRevertToCutoverTimestamp(
479507
})
480508
}
481509
b.Header.MaxSpanRequestKeys = sql.RevertTableDefaultBatchSize
510+
if p.ExecCfg().StreamingTestingKnobs != nil && p.ExecCfg().StreamingTestingKnobs.OverrideRevertRangeBatchSize != 0 {
511+
b.Header.MaxSpanRequestKeys = p.ExecCfg().StreamingTestingKnobs.OverrideRevertRangeBatchSize
512+
}
482513
if err := db.Run(ctx, &b); err != nil {
483514
return false, err
484515
}
@@ -494,7 +525,7 @@ func maybeRevertToCutoverTimestamp(
494525
}
495526
}
496527
}
497-
return true, j.SetProgress(ctx, nil /* txn */, *sp.StreamIngest)
528+
return true, updateJobProgress()
498529
}
499530

500531
func activateTenant(ctx context.Context, execCtx interface{}, newTenantID roachpb.TenantID) error {

pkg/ccl/streamingccl/streamingest/stream_ingestion_job_test.go

Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ package streamingest
1010

1111
import (
1212
"context"
13+
"fmt"
1314
"net/url"
1415
"strings"
1516
"testing"
@@ -25,6 +26,7 @@ import (
2526
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
2627
"github.com/cockroachdb/cockroach/pkg/keys"
2728
"github.com/cockroachdb/cockroach/pkg/kv"
29+
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
2830
"github.com/cockroachdb/cockroach/pkg/repstream/streampb"
2931
"github.com/cockroachdb/cockroach/pkg/roachpb"
3032
"github.com/cockroachdb/cockroach/pkg/security/username"
@@ -35,6 +37,7 @@ import (
3537
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
3638
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
3739
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
40+
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
3841
"github.com/cockroachdb/cockroach/pkg/util/hlc"
3942
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
4043
"github.com/cockroachdb/cockroach/pkg/util/log"
@@ -382,3 +385,138 @@ func TestReplicationJobResumptionStartTime(t *testing.T) {
382385
c.Cutover(producerJobID, replicationJobID, srcTime.GoTime())
383386
jobutils.WaitForJobToSucceed(t, c.DestSysSQL, jobspb.JobID(replicationJobID))
384387
}
388+
389+
func makeTableSpan(codec keys.SQLCodec, tableID uint32) roachpb.Span {
390+
k := codec.TablePrefix(tableID)
391+
return roachpb.Span{Key: k, EndKey: k.PrefixEnd()}
392+
}
393+
394+
func TestCutoverFractionProgressed(t *testing.T) {
395+
defer leaktest.AfterTest(t)()
396+
defer log.Scope(t).Close(t)
397+
398+
ctx := context.Background()
399+
400+
respRecvd := make(chan struct{})
401+
continueRevert := make(chan struct{})
402+
defer close(continueRevert)
403+
s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{
404+
Knobs: base.TestingKnobs{
405+
Store: &kvserver.StoreTestingKnobs{
406+
TestingResponseFilter: func(ctx context.Context, ba *roachpb.BatchRequest, br *roachpb.BatchResponse) *roachpb.Error {
407+
for _, ru := range br.Responses {
408+
switch ru.GetInner().(type) {
409+
case *roachpb.RevertRangeResponse:
410+
respRecvd <- struct{}{}
411+
<-continueRevert
412+
}
413+
}
414+
return nil
415+
},
416+
},
417+
Streaming: &sql.StreamingTestingKnobs{
418+
OverrideRevertRangeBatchSize: 1,
419+
},
420+
},
421+
DisableDefaultTestTenant: true,
422+
})
423+
defer s.Stopper().Stop(ctx)
424+
425+
_, err := sqlDB.Exec(`CREATE TABLE foo(id) AS SELECT generate_series(1, 10)`)
426+
require.NoError(t, err)
427+
428+
cutover := hlc.Timestamp{WallTime: timeutil.Now().UnixNano()}
429+
430+
// Insert some revisions which we can revert to a timestamp before the update.
431+
_, err = sqlDB.Exec(`UPDATE foo SET id = id + 1`)
432+
require.NoError(t, err)
433+
434+
// Split every other row into its own range. Progress updates are on a
435+
// per-range basis so we need >1 range to see the fraction progress.
436+
_, err = sqlDB.Exec(`ALTER TABLE foo SPLIT AT (SELECT rowid FROM foo WHERE rowid % 2 = 0)`)
437+
require.NoError(t, err)
438+
439+
var nRanges int
440+
require.NoError(t, sqlDB.QueryRow(
441+
`SELECT count(*) FROM [SHOW RANGES FROM TABLE foo]`).Scan(&nRanges))
442+
443+
require.Equal(t, nRanges, 6)
444+
var id int
445+
err = sqlDB.QueryRow(`SELECT id FROM system.namespace WHERE name = 'foo'`).Scan(&id)
446+
require.NoError(t, err)
447+
448+
// Create a mock replication job with the `foo` table span so that on cut over
449+
// we can revert the table's ranges.
450+
execCfg := s.ExecutorConfig().(sql.ExecutorConfig)
451+
jobExecCtx := &sql.FakeJobExecContext{ExecutorConfig: &execCfg}
452+
mockReplicationJobDetails := jobspb.StreamIngestionDetails{
453+
Span: makeTableSpan(execCfg.Codec, uint32(id)),
454+
}
455+
mockReplicationJobRecord := jobs.Record{
456+
Details: mockReplicationJobDetails,
457+
Progress: jobspb.StreamIngestionProgress{
458+
CutoverTime: cutover,
459+
},
460+
Username: username.TestUserName(),
461+
}
462+
registry := execCfg.JobRegistry
463+
jobID := registry.MakeJobID()
464+
replicationJob, err := registry.CreateJobWithTxn(ctx, mockReplicationJobRecord, jobID, nil)
465+
require.NoError(t, err)
466+
require.NoError(t, replicationJob.Update(ctx, nil, func(txn *kv.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater) error {
467+
return jobs.UpdateHighwaterProgressed(cutover, md, ju)
468+
}))
469+
470+
g := ctxgroup.WithContext(ctx)
471+
g.GoCtx(func(ctx context.Context) error {
472+
defer close(respRecvd)
473+
revert, err := maybeRevertToCutoverTimestamp(ctx, jobExecCtx, jobID)
474+
require.NoError(t, err)
475+
require.True(t, revert)
476+
return nil
477+
})
478+
479+
loadProgress := func() jobspb.Progress {
480+
j, err := execCfg.JobRegistry.LoadJob(ctx, jobID)
481+
require.NoError(t, err)
482+
return j.Progress()
483+
}
484+
progressMap := map[string]bool{
485+
"0.00": false,
486+
"0.17": false,
487+
"0.33": false,
488+
"0.50": false,
489+
"0.67": false,
490+
"0.83": false,
491+
}
492+
g.GoCtx(func(ctx context.Context) error {
493+
for {
494+
select {
495+
case <-ctx.Done():
496+
return ctx.Err()
497+
case _, ok := <-respRecvd:
498+
if !ok {
499+
return nil
500+
}
501+
sip := loadProgress()
502+
curProgress := sip.GetFractionCompleted()
503+
s := fmt.Sprintf("%.2f", curProgress)
504+
if _, ok := progressMap[s]; !ok {
505+
t.Fatalf("unexpected progress fraction %s", s)
506+
}
507+
progressMap[s] = true
508+
continueRevert <- struct{}{}
509+
}
510+
}
511+
})
512+
require.NoError(t, g.Wait())
513+
sip := loadProgress()
514+
require.Equal(t, sip.GetFractionCompleted(), float32(1))
515+
516+
// Ensure we have hit all our expected progress fractions.
517+
for k, v := range progressMap {
518+
if !v {
519+
t.Fatalf("failed to see progress fraction %s", k)
520+
}
521+
}
522+
}

pkg/sql/backfill.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -844,11 +844,11 @@ func getJobIDForMutationWithDescriptor(
844844
"job not found for table id %d, mutation %d", tableDesc.GetID(), mutationID)
845845
}
846846

847-
// numRangesInSpans returns the number of ranges that cover a set of spans.
847+
// NumRangesInSpans returns the number of ranges that cover a set of spans.
848848
//
849-
// It operates entirely on the current goroutine and is thus able to
850-
// reuse an existing kv.Txn safely.
851-
func numRangesInSpans(
849+
// It operates entirely on the current goroutine and is thus able to reuse an
850+
// existing kv.Txn safely.
851+
func NumRangesInSpans(
852852
ctx context.Context, db *kv.DB, distSQLPlanner *DistSQLPlanner, spans []roachpb.Span,
853853
) (int, error) {
854854
txn := db.NewTxn(ctx, "num-ranges-in-spans")
@@ -1100,7 +1100,7 @@ func (sc *SchemaChanger) distIndexBackfill(
11001100
if updatedTodoSpans == nil {
11011101
return nil
11021102
}
1103-
nRanges, err := numRangesInSpans(ctx, sc.db, sc.distSQLPlanner, updatedTodoSpans)
1103+
nRanges, err := NumRangesInSpans(ctx, sc.db, sc.distSQLPlanner, updatedTodoSpans)
11041104
if err != nil {
11051105
return err
11061106
}
@@ -1253,7 +1253,7 @@ func (sc *SchemaChanger) distColumnBackfill(
12531253
// schema change state machine or from a previous backfill attempt,
12541254
// we scale that fraction of ranges completed by the remaining fraction
12551255
// of the job's progress bar.
1256-
nRanges, err := numRangesInSpans(ctx, sc.db, sc.distSQLPlanner, todoSpans)
1256+
nRanges, err := NumRangesInSpans(ctx, sc.db, sc.distSQLPlanner, todoSpans)
12571257
if err != nil {
12581258
return err
12591259
}
@@ -2925,7 +2925,7 @@ func (sc *SchemaChanger) distIndexMerge(
29252925
// TODO(rui): these can be initialized along with other new schema changer dependencies.
29262926
planner := NewIndexBackfillerMergePlanner(sc.execCfg)
29272927
rc := func(ctx context.Context, spans []roachpb.Span) (int, error) {
2928-
return numRangesInSpans(ctx, sc.db, sc.distSQLPlanner, spans)
2928+
return NumRangesInSpans(ctx, sc.db, sc.distSQLPlanner, spans)
29292929
}
29302930
tracker := NewIndexMergeTracker(progress, sc.job, rc, fractionScaler)
29312931
periodicFlusher := newPeriodicProgressFlusher(sc.settings)

pkg/sql/exec_util.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1662,6 +1662,10 @@ type StreamingTestingKnobs struct {
16621662
// frontier specs generated for the replication job.
16631663
AfterReplicationFlowPlan func([]*execinfrapb.StreamIngestionDataSpec,
16641664
*execinfrapb.StreamIngestionFrontierSpec)
1665+
1666+
// OverrideRevertRangeBatchSize allows overriding the `MaxSpanRequestKeys`
1667+
// used when sending a RevertRange request.
1668+
OverrideRevertRangeBatchSize int64
16651669
}
16661670

16671671
var _ base.ModuleTestingKnobs = &StreamingTestingKnobs{}

pkg/sql/job_exec_context_test_util.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,10 @@ func (p *FakeJobExecContext) SessionDataMutatorIterator() *sessionDataMutatorIte
5252

5353
// DistSQLPlanner implements the JobExecContext interface.
5454
func (p *FakeJobExecContext) DistSQLPlanner() *DistSQLPlanner {
55-
panic("unimplemented")
55+
if p.ExecutorConfig == nil {
56+
panic("unimplemented")
57+
}
58+
return p.ExecutorConfig.DistSQLPlanner
5659
}
5760

5861
// LeaseMgr implements the JobExecContext interface.

0 commit comments

Comments
 (0)