Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bulkio: Correctly group producer/consumers when importing data #49995

Merged
merged 1 commit into from
Jun 9, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 60 additions & 0 deletions pkg/ccl/importccl/import_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,66 @@ func TestImportHonorsResumePosition(t *testing.T) {
}
}

// duplicateKeyErrorAdder is a test-only bulk adder whose Add always fails
// with a kvserverbase.DuplicateKeyError. Every other BulkAdder method is a
// no-op inherited from the embedded doNothingKeyAdder.
type duplicateKeyErrorAdder struct {
	doNothingKeyAdder
}

// Compile-time assertion that duplicateKeyErrorAdder implements
// kvserverbase.BulkAdder.
var _ kvserverbase.BulkAdder = &duplicateKeyErrorAdder{}

// Add unconditionally rejects the KV pair, returning a DuplicateKeyError that
// echoes the attempted key and value so callers can inspect what was refused.
func (a *duplicateKeyErrorAdder) Add(_ context.Context, k roachpb.Key, v []byte) error {
	return &kvserverbase.DuplicateKeyError{Key: k, Value: v}
}

// TestImportHandlesDuplicateKVs verifies that a DuplicateKeyError produced by
// the bulk adder during IMPORT is propagated back out of runImport for every
// supported input format.
func TestImportHandlesDuplicateKVs(t *testing.T) {
	defer leaktest.AfterTest(t)()

	const batchSize = 13
	defer row.TestingSetDatumRowConverterBatchSize(batchSize)()

	evalCtx := tree.MakeTestingEvalContext(nil)
	flowCtx := &execinfra.FlowCtx{
		EvalCtx: &evalCtx,
		Cfg: &execinfra.ServerConfig{
			Settings:        &cluster.Settings{},
			ExternalStorage: externalStorageFactory,
			// Hand every import a bulk adder that rejects all KVs with a
			// DuplicateKeyError.
			BulkAdder: func(
				_ context.Context, _ *kv.DB, _ hlc.Timestamp,
				_ kvserverbase.BulkAdderOptions) (kvserverbase.BulkAdder, error) {
				return &duplicateKeyErrorAdder{}, nil
			},
			TestingKnobs: execinfra.TestingKnobs{
				BulkAdderFlushesEveryBatch: true,
			},
		},
	}

	// Each input format below yields at least one KV; since the adder rejects
	// every key, we expect the DuplicateKeyError to surface from runImport.
	for _, tc := range []testSpec{
		newTestSpec(t, csvFormat(), "testdata/csv/data-0"),
		newTestSpec(t, mysqlDumpFormat(), "testdata/mysqldump/simple.sql"),
		newTestSpec(t, mysqlOutFormat(), "testdata/mysqlout/csv-ish/simple.txt"),
		newTestSpec(t, pgCopyFormat(), "testdata/pgcopy/default/test.txt"),
		newTestSpec(t, pgDumpFormat(), "testdata/pgdump/simple.sql"),
		newTestSpec(t, avroFormat(t, roachpb.AvroOptions_JSON_RECORDS), "testdata/avro/simple-sorted.json"),
	} {
		spec := tc.getConverterSpec()

		t.Run(fmt.Sprintf("duplicate-key-%v", spec.Format.Format), func(t *testing.T) {
			progCh := make(chan execinfrapb.RemoteProducerMetadata_BulkProcessorProgress)
			defer close(progCh)
			// Drain progress updates so runImport never blocks sending on progCh.
			go func() {
				for range progCh {
				}
			}()

			_, err := runImport(context.Background(), flowCtx, spec, progCh)
			require.True(t, errors.HasType(err, &kvserverbase.DuplicateKeyError{}))
		})
	}
}

// syncBarrier allows 2 threads (a controller and a worker) to
// synchronize between themselves. A controller portion of the
// barrier waits until worker starts running, and then notifies
Expand Down
27 changes: 13 additions & 14 deletions pkg/ccl/importccl/read_import_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,15 @@ func runImport(
}

// This group holds the go routines that are responsible for producing KV batches
// After this group is done, we need to close kvCh.
// and ingesting produced KVs.
// Depending on the import implementation both conv.start and conv.readFiles can
// produce KVs so we should close the channel only after *both* are finished.
producerGroup := ctxgroup.WithContext(ctx)
conv.start(producerGroup)
group := ctxgroup.WithContext(ctx)
conv.start(group)

// Read input files into kvs
producerGroup.GoCtx(func(ctx context.Context) error {
group.GoCtx(func(ctx context.Context) error {
defer close(kvCh)
ctx, span := tracing.ChildSpan(ctx, "readImportFiles")
defer tracing.FinishSpan(span)
var inputs map[int32]string
Expand All @@ -77,13 +79,6 @@ func runImport(
return conv.readFiles(ctx, inputs, spec.ResumePos, spec.Format, flowCtx.Cfg.ExternalStorage)
})

// This group links together the producers (via producerGroup) and the KV ingester.
group := ctxgroup.WithContext(ctx)
group.Go(func() error {
defer close(kvCh)
return producerGroup.Wait()
})

// Ingest the KVs that the producer group emitted to the chan and the row result
// at the end is one row containing an encoded BulkOpSummary.
var summary *roachpb.BulkOpSummary
Expand All @@ -99,11 +94,15 @@ func runImport(
prog.CompletedFraction[i] = 1.0
prog.ResumePos[i] = math.MaxInt64
}
progCh <- prog
return nil
select {
case <-ctx.Done():
return ctx.Err()
case progCh <- prog:
return nil
}
})

if err := group.Wait(); err != nil {
if err = group.Wait(); err != nil {
return nil, err
}

Expand Down