Skip to content

Commit

Permalink
Merge #60958
Browse files Browse the repository at this point in the history
60958: bulkio: use correct context in processor initialization r=adityamaru a=pbardea

This commit ensures that producer goroutines created during the
initialization of bulk processors use the same context as bp.Ctx.

This does change the context that these goroutines were using,
but that shouldn't actually have any effects on the processors, other
than missing tracing capabilities.

Closes #60940.

Closes #60984
Closes #60983
Closes #60977
Closes #60990
Closes #60989
Closes #60979
Closes #60987
Closes #60986
Closes #60985
Closes #60981
Closes #60980
Closes #60978
Closes #60978
Closes #60976
Closes #60975

Release note: None

Co-authored-by: Paul Bardea <pbardea@gmail.com>
  • Loading branch information
craig[bot] and pbardea committed Feb 23, 2021
2 parents bdcdf88 + 7dc5d1e commit 0edd7e8
Show file tree
Hide file tree
Showing 5 changed files with 8 additions and 30 deletions.
8 changes: 1 addition & 7 deletions pkg/ccl/backupccl/backup_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,17 +109,11 @@ func newBackupDataProcessor(

// Start is part of the RowSource interface.
func (bp *backupDataProcessor) Start(ctx context.Context) {
ctx = bp.StartInternal(ctx, backupProcessorName)
go func() {
defer close(bp.progCh)
bp.backupErr = runBackupProcessor(ctx, bp.flowCtx, &bp.spec, bp.progCh)
}()
ctx = bp.StartInternal(ctx, backupProcessorName)
// Go around "this value of ctx is never used" linter error. We do it this
// way instead of omitting the assignment to ctx above so that if in the
// future other initialization is added, the correct ctx is used.
// TODO(bulkio): check whether this context should be used in the closure
// above.
_ = ctx
}

// Next is part of the RowSource interface.
Expand Down
8 changes: 1 addition & 7 deletions pkg/ccl/backupccl/split_and_scatter_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ func newSplitAndScatterProcessor(

// Start is part of the RowSource interface.
func (ssp *splitAndScatterProcessor) Start(ctx context.Context) {
ctx = ssp.StartInternal(ctx, splitAndScatterProcessorName)
go func() {
// Note that the loop over doneScatterCh in Next should prevent this
// goroutine from leaking when there are no errors. However, if that loop
Expand All @@ -213,13 +214,6 @@ func (ssp *splitAndScatterProcessor) Start(ctx context.Context) {
defer close(ssp.doneScatterCh)
ssp.scatterErr = ssp.runSplitAndScatter(scatterCtx, ssp.flowCtx, &ssp.spec, ssp.scatterer)
}()
ctx = ssp.StartInternal(ctx, splitAndScatterProcessorName)
// Go around "this value of ctx is never used" linter error. We do it this
// way instead of omitting the assignment to ctx above so that if in the
// future other initialization is added, the correct ctx is used.
// TODO(bulkio): check whether this context should be used in the closure
// above.
_ = ctx
}

type entryNode struct {
Expand Down
8 changes: 1 addition & 7 deletions pkg/ccl/importccl/import_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,20 +100,14 @@ func newReadImportDataProcessor(

// Start is part of the RowSource interface.
func (idp *readImportDataProcessor) Start(ctx context.Context) {
ctx = idp.StartInternal(ctx, readImportDataProcessorName)
// We don't have to worry about this go routine leaking because next we loop over progCh
// which is closed only after the go routine returns.
go func() {
defer close(idp.progCh)
idp.summary, idp.importErr = runImport(ctx, idp.flowCtx, &idp.spec, idp.progCh,
idp.seqChunkProvider)
}()
ctx = idp.StartInternal(ctx, readImportDataProcessorName)
// Go around "this value of ctx is never used" linter error. We do it this
// way instead of omitting the assignment to ctx above so that if in the
// future other initialization is added, the correct ctx is used.
// TODO(bulkio): check whether this context should be used in the closure
// above.
_ = ctx
}

// Next is part of the RowSource interface.
Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/logictestccl/testdata/logic_test/multi_region_backup
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
# LogicTest: multiregion-9node-3region-3azs

skip flaky # see #60773

query TTTT
SHOW REGIONS
----
Expand Down Expand Up @@ -677,8 +679,6 @@ postgres
system
test

skip flaky # see #60773

statement ok
RESTORE DATABASE "mr-backup-1", "mr-backup-2" FROM 'nodelocal://1/mr-backup-combined/'

Expand Down
10 changes: 3 additions & 7 deletions pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,15 +157,11 @@ func newStreamIngestionDataProcessor(
// Start is part of the RowSource interface.
func (sip *streamIngestionProcessor) Start(ctx context.Context) {
ctx = sip.StartInternal(ctx, streamIngestionProcessorName)
// Go around "this value of ctx is never used" linter error. We do it this
// way instead of omitting the assignment to ctx above so that if in the
// future other initialization is added, the correct ctx is used.
_ = ctx

evalCtx := sip.FlowCtx.EvalCtx
db := sip.FlowCtx.Cfg.DB
var err error
sip.batcher, err = bulk.MakeStreamSSTBatcher(sip.Ctx, db, evalCtx.Settings,
sip.batcher, err = bulk.MakeStreamSSTBatcher(ctx, db, evalCtx.Settings,
func() int64 { return storageccl.MaxImportBatchSize(evalCtx.Settings) })
if err != nil {
sip.MoveToDraining(errors.Wrap(err, "creating stream sst batcher"))
Expand All @@ -176,14 +172,14 @@ func (sip *streamIngestionProcessor) Start(ctx context.Context) {
startTime := timeutil.Unix(0 /* sec */, sip.spec.StartTime.WallTime)
eventChs := make(map[streamingccl.PartitionAddress]chan streamingccl.Event)
for _, partitionAddress := range sip.spec.PartitionAddresses {
eventCh, err := sip.client.ConsumePartition(sip.Ctx, partitionAddress, startTime)
eventCh, err := sip.client.ConsumePartition(ctx, partitionAddress, startTime)
if err != nil {
sip.MoveToDraining(errors.Wrapf(err, "consuming partition %v", partitionAddress))
return
}
eventChs[partitionAddress] = eventCh
}
sip.eventCh = sip.merge(sip.Ctx, eventChs)
sip.eventCh = sip.merge(ctx, eventChs)
}

// Next is part of the RowSource interface.
Expand Down

0 comments on commit 0edd7e8

Please sign in to comment.