Skip to content

Commit f9f183d

Browse files
authored
Merge pull request #147564 from rafiss/blathers/backport-release-25.2.1-rc-147511
release-25.2.1-rc: sql: fix memory leak in index backfill progress tracking
2 parents da46359 + 6f1dde2 commit f9f183d

File tree

3 files changed

+144
-126
lines changed

3 files changed

+144
-126
lines changed

pkg/sql/execinfra/server_config.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -245,7 +245,7 @@ type TestingKnobs struct {
245245

246246
// RunBeforeIndexBackfillProgressUpdate is called before updating the
247247
// progress for a single index backfill.
248-
RunBeforeIndexBackfillProgressUpdate func(completed []roachpb.Span)
248+
RunBeforeIndexBackfillProgressUpdate func(ctx context.Context, completed []roachpb.Span)
249249

250250
// SerializeIndexBackfillCreationAndIngestion ensures that every index batch
251251
// created during an index backfill is also ingested before moving on to the

pkg/sql/index_backfiller.go

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,10 @@ import (
2121
"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scexec"
2222
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
2323
"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
24+
"github.com/cockroachdb/cockroach/pkg/util/buildutil"
2425
"github.com/cockroachdb/cockroach/pkg/util/hlc"
2526
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
27+
"github.com/cockroachdb/errors"
2628
)
2729

2830
// IndexBackfillPlanner holds dependencies for an index backfiller
@@ -77,17 +79,34 @@ func (ib *IndexBackfillPlanner) BackfillIndexes(
7779
completed.g.Add(c...)
7880
return completed.g.Slice()
7981
}
82+
// Add spans that were already completed before the job resumed.
83+
addCompleted(progress.CompletedSpans...)
8084
updateFunc := func(
8185
ctx context.Context, meta *execinfrapb.ProducerMetadata,
8286
) error {
8387
if meta.BulkProcessorProgress == nil {
8488
return nil
8589
}
86-
progress.CompletedSpans = append(progress.CompletedSpans, addCompleted(
87-
meta.BulkProcessorProgress.CompletedSpans...)...)
90+
progress.CompletedSpans = addCompleted(meta.BulkProcessorProgress.CompletedSpans...)
91+
// Make sure the progress update does not contain overlapping spans.
92+
// This is a sanity check that only runs in test configurations, since it
93+
// is an expensive n^2 check.
94+
if buildutil.CrdbTestBuild {
95+
for i, span1 := range progress.CompletedSpans {
96+
for j, span2 := range progress.CompletedSpans {
97+
if i <= j {
98+
continue
99+
}
100+
if span1.Overlaps(span2) {
101+
return errors.Newf("progress update contains overlapping spans: %s and %s", span1, span2)
102+
}
103+
}
104+
}
105+
}
106+
88107
knobs := &ib.execCfg.DistSQLSrv.TestingKnobs
89108
if knobs.RunBeforeIndexBackfillProgressUpdate != nil {
90-
knobs.RunBeforeIndexBackfillProgressUpdate(progress.CompletedSpans)
109+
knobs.RunBeforeIndexBackfillProgressUpdate(ctx, progress.CompletedSpans)
91110
}
92111
return tracker.SetBackfillProgress(ctx, progress)
93112
}

pkg/sql/indexbackfiller_test.go

Lines changed: 121 additions & 122 deletions
Original file line numberDiff line numberDiff line change
@@ -43,13 +43,12 @@ import (
4343
"github.com/cockroachdb/cockroach/pkg/sql/types"
4444
"github.com/cockroachdb/cockroach/pkg/testutils"
4545
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
46-
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
4746
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
48-
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
4947
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
5048
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
5149
"github.com/cockroachdb/cockroach/pkg/util/log"
5250
"github.com/cockroachdb/cockroach/pkg/util/mon"
51+
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
5352
"github.com/cockroachdb/errors"
5453
"github.com/stretchr/testify/require"
5554
"google.golang.org/protobuf/proto"
@@ -576,61 +575,55 @@ INSERT INTO foo VALUES (1), (10), (100);
576575
func TestIndexBackfillerResumePreservesProgress(t *testing.T) {
577576
defer leaktest.AfterTest(t)()
578577
defer log.Scope(t).Close(t)
579-
skip.UnderStress(t, "timing-sensitive test")
580578

581579
ctx := context.Background()
582-
backfillProgressUpdateCh := make(chan struct{})
583580
backfillProgressCompletedCh := make(chan []roachpb.Span)
584-
checkpointedSpansCh := make(chan []roachpb.Span)
585581
const numSpans = 100
586582
var isBlockingBackfillProgress atomic.Bool
587583
isBlockingBackfillProgress.Store(true)
588-
var isBlockingCheckpoint atomic.Bool
589584

590-
clusterArgs := base.TestClusterArgs{
591-
ServerArgs: base.TestServerArgs{
592-
Knobs: base.TestingKnobs{
593-
DistSQL: &execinfra.TestingKnobs{
594-
// We want to push progress every batch_size rows to control
595-
// the backfill incrementally.
596-
BulkAdderFlushesEveryBatch: true,
597-
RunBeforeIndexBackfillProgressUpdate: func(completed []roachpb.Span) {
598-
if isBlockingBackfillProgress.Load() {
599-
<-backfillProgressUpdateCh
600-
backfillProgressCompletedCh <- completed
585+
// Start the server with testing knob.
586+
tc, db, _ := serverutils.StartServer(t, base.TestServerArgs{
587+
Knobs: base.TestingKnobs{
588+
JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
589+
DistSQL: &execinfra.TestingKnobs{
590+
// We want to push progress every batch_size rows to control
591+
// the backfill incrementally.
592+
BulkAdderFlushesEveryBatch: true,
593+
RunBeforeIndexBackfillProgressUpdate: func(ctx context.Context, completed []roachpb.Span) {
594+
if isBlockingBackfillProgress.Load() {
595+
select {
596+
case <-ctx.Done():
597+
case backfillProgressCompletedCh <- completed:
598+
t.Logf("before index backfill progress update, completed spans: %v", completed)
601599
}
602-
},
600+
}
603601
},
604-
SQLDeclarativeSchemaChanger: &scexec.TestingKnobs{
605-
RunBeforeBackfill: func(progresses []scexec.BackfillProgress) error {
606-
if isBlockingCheckpoint.Load() && progresses != nil && progresses[0].CompletedSpans != nil {
607-
checkpointedSpansCh <- progresses[0].CompletedSpans
608-
}
609-
return nil
610-
},
602+
},
603+
SQLDeclarativeSchemaChanger: &scexec.TestingKnobs{
604+
RunBeforeBackfill: func(progresses []scexec.BackfillProgress) error {
605+
if progresses != nil {
606+
t.Logf("before resuming backfill, checkpointed spans: %v", progresses[0].CompletedSpans)
607+
}
608+
return nil
611609
},
612610
},
613611
},
614-
}
615-
// Start the server with a testing knob
616-
tc := testcluster.NewTestCluster(t, 1, clusterArgs)
617-
tc.Start(t)
612+
})
618613
defer tc.Stopper().Stop(ctx)
619-
db := tc.Conns[0]
620614

621615
_, err := db.Exec(`SET CLUSTER SETTING bulkio.index_backfill.batch_size = 10`)
622616
require.NoError(t, err)
623617
// Ensure that we checkpoint our progress to the backfill job so that
624618
// RESUMEs can get an up-to-date backfill progress.
625-
_, err = db.Exec(`SET CLUSTER SETTING bulkio.index_backfill.checkpoint_interval = '0s'`)
619+
_, err = db.Exec(`SET CLUSTER SETTING bulkio.index_backfill.checkpoint_interval = '10ms'`)
626620
require.NoError(t, err)
627621
_, err = db.Exec(`CREATE TABLE t(i INT PRIMARY KEY)`)
628622
require.NoError(t, err)
629623
_, err = db.Exec(`INSERT INTO t SELECT generate_series(1, $1)`, numSpans)
630624
require.NoError(t, err)
631625
_, err = db.Exec(`ALTER TABLE t SPLIT AT TABLE generate_series(1, $1)`, numSpans)
632626
require.NoError(t, err)
633-
require.NoError(t, tc.WaitForFullReplication())
634627
var descID catid.DescID
635628
descIDRow := db.QueryRow(`SELECT 't'::regclass::oid`)
636629
err = descIDRow.Scan(&descID)
@@ -646,112 +639,118 @@ func TestIndexBackfillerResumePreservesProgress(t *testing.T) {
646639
return nil
647640
})
648641

649-
g.GoCtx(func(ctx context.Context) error {
650-
testutils.SucceedsSoon(t, func() error {
651-
jobIDRow := db.QueryRow(`
642+
testutils.SucceedsWithin(t, func() error {
643+
jobIDRow := db.QueryRow(`
652644
SELECT job_id FROM [SHOW JOBS]
653-
WHERE job_type = 'NEW SCHEMA CHANGE' AND description ILIKE '%ADD COLUMN%'`,
654-
)
655-
if err := jobIDRow.Scan(&jobID); err != nil {
645+
WHERE job_type = 'NEW SCHEMA CHANGE' AND description ILIKE '%ADD COLUMN j%'`,
646+
)
647+
if err := jobIDRow.Scan(&jobID); err != nil {
648+
return err
649+
}
650+
return nil
651+
}, 5*time.Second)
652+
653+
ensureJobState := func(targetState string) {
654+
testutils.SucceedsWithin(t, func() error {
655+
var jobState string
656+
statusRow := db.QueryRow(`SELECT status FROM [SHOW JOB $1]`, jobID)
657+
if err := statusRow.Scan(&jobState); err != nil {
656658
return err
657659
}
660+
if jobState != targetState {
661+
return errors.Errorf("expected job to be %s, but found status: %s",
662+
targetState, jobState)
663+
}
658664
return nil
659-
})
665+
}, 5*time.Second)
666+
}
660667

661-
ensureJobState := func(targetState string) {
662-
testutils.SucceedsSoon(t, func() error {
663-
var jobState string
664-
statusRow := db.QueryRow(`SELECT status FROM [SHOW JOB $1]`, jobID)
665-
if err := statusRow.Scan(&jobState); err != nil {
666-
return err
668+
var completedSpans roachpb.SpanGroup
669+
receiveProgressUpdate := func() {
670+
progressUpdate := <-backfillProgressCompletedCh
671+
672+
// Make sure the progress update does not contain overlapping spans.
673+
for i, span1 := range progressUpdate {
674+
for j, span2 := range progressUpdate {
675+
if i <= j {
676+
continue
667677
}
668-
if jobState != targetState {
669-
return errors.Errorf("expected job to be %s, but found status: %s",
670-
targetState, jobState)
678+
if span1.Overlaps(span2) {
679+
t.Fatalf("progress update contains overlapping spans: %s and %s", span1, span2)
671680
}
672-
return nil
673-
})
681+
}
674682
}
683+
completedSpans.Add(progressUpdate...)
684+
}
675685

676-
// Let the backfill step forward a bit before we do our PAUSE/RESUME
677-
// dance.
678-
var spansCompletedBeforePause []roachpb.Span
679-
for i := 0; i < 2; i++ {
680-
backfillProgressUpdateCh <- struct{}{}
681-
spansCompletedBeforePause = <-backfillProgressCompletedCh
682-
}
686+
ensureCompletedSpansAreCheckpointed := func() {
687+
testutils.SucceedsWithin(t, func() error {
688+
stmt := `SELECT payload FROM crdb_internal.system_jobs WHERE id = $1`
689+
var payloadBytes []byte
690+
if err := db.QueryRowContext(ctx, stmt, jobID).Scan(&payloadBytes); err != nil {
691+
return err
692+
}
683693

684-
_, err := db.Exec(`PAUSE JOB $1`, jobID)
685-
if err != nil {
686-
return err
687-
}
688-
ensureJobState("paused")
694+
payload := &jobspb.Payload{}
695+
if err := protoutil.Unmarshal(payloadBytes, payload); err != nil {
696+
return err
697+
}
689698

690-
_, err = db.Exec(`RESUME JOB $1`, jobID)
691-
if err != nil {
692-
return err
693-
}
694-
ensureJobState("running")
699+
schemaChangeProgress := *(payload.Details.(*jobspb.Payload_NewSchemaChange).NewSchemaChange)
700+
var checkpointedSpans []roachpb.Span
701+
if len(schemaChangeProgress.BackfillProgress) > 0 {
702+
checkpointedSpans = schemaChangeProgress.BackfillProgress[0].CompletedSpans
703+
}
704+
var sg roachpb.SpanGroup
705+
sg.Add(checkpointedSpans...)
706+
// Ensure that the spans we already completed are fully contained in our
707+
// checkpointed completed spans group.
708+
if !sg.Encloses(completedSpans.Slice()...) {
709+
return errors.Errorf("checkpointed spans %v do not enclose completed spans %v",
710+
checkpointedSpans, completedSpans.Slice())
711+
}
695712

696-
for i := 0; i < 2; i++ {
697-
backfillProgressUpdateCh <- struct{}{}
698-
<-backfillProgressCompletedCh
699-
}
700-
isBlockingCheckpoint.Store(true)
713+
return nil
714+
}, 5*time.Second)
715+
}
701716

702-
_, err = db.Exec(`PAUSE JOB $1`, jobID)
703-
if err != nil {
704-
return err
705-
}
706-
ensureJobState("paused")
717+
// Let the backfill step forward a bit before we do our PAUSE/RESUME
718+
// dance.
719+
for i := 0; i < 2; i++ {
720+
receiveProgressUpdate()
721+
}
707722

708-
_, err = db.Exec(`RESUME JOB $1`, jobID)
709-
if err != nil {
710-
return err
711-
}
712-
ensureJobState("running")
713-
714-
var wg sync.WaitGroup
715-
wg.Add(1)
716-
go func() {
717-
defer wg.Done()
718-
timer := time.NewTimer(1 * time.Minute)
719-
defer timer.Stop()
720-
select {
721-
case checkpointed := <-checkpointedSpansCh:
722-
isBlockingCheckpoint.Store(false)
723-
var sg roachpb.SpanGroup
724-
sg.Add(checkpointed...)
725-
// Ensure that the spans we completed before any PAUSE is
726-
// fully contained in our checkpointed completed spans group.
727-
require.True(t, sg.Encloses(spansCompletedBeforePause...))
728-
case <-timer.C:
729-
require.Fail(t, "timed out waiting for checkpoint")
730-
}
731-
}()
732-
733-
wg.Add(1)
734-
go func() {
735-
defer wg.Done()
736-
737-
isBlockingBackfillProgress.Store(false)
738-
timer := time.NewTimer(1 * time.Minute)
739-
defer timer.Stop()
740-
select {
741-
case backfillProgressUpdateCh <- struct{}{}:
742-
<-backfillProgressCompletedCh
743-
case <-timer.C:
744-
require.Fail(t, "timed out waiting for backfill progress")
745-
}
746-
}()
723+
ensureCompletedSpansAreCheckpointed()
724+
t.Logf("pausing backfill")
725+
_, err = db.Exec(`PAUSE JOB $1`, jobID)
726+
require.NoError(t, err)
727+
ensureJobState("paused")
747728

748-
// Wait for both operations to complete
749-
wg.Wait()
729+
t.Logf("resuming backfill")
730+
_, err = db.Exec(`RESUME JOB $1`, jobID)
731+
require.NoError(t, err)
732+
ensureJobState("running")
750733

751-
// Now we can wait for the job to succeed
752-
ensureJobState("succeeded")
753-
return nil
754-
})
734+
// Step forward again before re-pausing.
735+
for i := 0; i < 2; i++ {
736+
receiveProgressUpdate()
737+
}
738+
739+
ensureCompletedSpansAreCheckpointed()
740+
isBlockingBackfillProgress.Store(false)
741+
742+
t.Logf("pausing backfill")
743+
_, err = db.Exec(`PAUSE JOB $1`, jobID)
744+
require.NoError(t, err)
745+
ensureJobState("paused")
746+
747+
t.Logf("resuming backfill")
748+
_, err = db.Exec(`RESUME JOB $1`, jobID)
749+
require.NoError(t, err)
750+
ensureJobState("running")
751+
752+
// Now we can wait for the job to succeed
753+
ensureJobState("succeeded")
755754

756755
if err = g.Wait(); err != nil {
757756
require.NoError(t, err)

0 commit comments

Comments
 (0)