Skip to content

Commit

Permalink
importccl: add experimental_save_rejected option to CSV IMPORT
Browse files Browse the repository at this point in the history
This was promised to a client and will be backported to 19.2.1.
The feature should stay undocumented for now since the semantics
and UX are still not well understood.

To make the change work, we had to remove the parsing workers from
`conv.start` and move them to `readFile` which means that for each
file a separate set of workers will be brought up and down. Also the
tally for all total number of rejected rows was moved to `read_import_base.go`.

Release note (sql change): add undocumented experimental_save_rejected
option to CSV IMPORT.
  • Loading branch information
Spas Bojanov authored and spaskob committed Nov 1, 2019
1 parent dc7b64e commit 4c22e38
Show file tree
Hide file tree
Showing 8 changed files with 222 additions and 137 deletions.
15 changes: 3 additions & 12 deletions pkg/ccl/importccl/import_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/row"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/storage/cloud"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/stretchr/testify/assert"
)
Expand Down Expand Up @@ -135,16 +134,11 @@ func TestConverterFlushesBatches(t *testing.T) {
t.Fatalf("makeInputConverter() error = %v", err)
}

group := ctxgroup.WithContext(ctx)

group.Go(func() error {
return conv.readFiles(ctx, testCase.inputs, converterSpec.Format, externalStorage)
})

conv.start(group)
go func() {
defer close(kvCh)
err = group.Wait()
if err := conv.readFiles(ctx, testCase.inputs, converterSpec.Format, externalStorage); err != nil {
t.Fatalf("Conversion failed: %v", err)
}
}()

lastBatch := 0
Expand All @@ -160,9 +154,6 @@ func TestConverterFlushesBatches(t *testing.T) {
testNumRecords = testNumRecords + lastBatch
testNumBatches++
}
if err != nil {
t.Fatalf("Conversion failed: %v", err)
}

if batchSize == 0 {
expectedNumRecords = testNumRecords
Expand Down
7 changes: 7 additions & 0 deletions pkg/ccl/importccl/import_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,8 @@ func importPlanHook(
case "CSV":
telemetry.Count("import.format.csv")
format.Format = roachpb.IOFileFormat_CSV
// Set the default CSV separator for the cases when it is not overwritten.
format.Csv.Comma = ','
if override, ok := opts[csvDelimiter]; ok {
comma, err := util.GetSingleRune(override)
if err != nil {
Expand Down Expand Up @@ -268,6 +270,11 @@ func importPlanHook(
if _, ok := opts[csvStrictQuotes]; ok {
format.Csv.StrictQuotes = true
}
// TODO(spaskob): Refactor so that the save rejected option
// is passed in all import formats not just CSV.
if _, ok := opts[importOptionSaveRejected]; ok {
format.Csv.SaveRejected = true
}
case "DELIMITED":
telemetry.Count("import.format.mysqlout")
format.Format = roachpb.IOFileFormat_MysqlOutfile
Expand Down
54 changes: 37 additions & 17 deletions pkg/ccl/importccl/import_stmt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,18 +157,20 @@ d
},
},
{
name: "invalid byte",
create: `b bytes`,
typ: "CSV",
data: `\x0g`,
err: "invalid byte",
name: "invalid byte",
create: `b bytes`,
typ: "CSV",
data: `\x0g`,
rejected: `\x0g` + "\n",
err: "invalid byte",
},
{
name: "bad bytes length",
create: `b bytes`,
typ: "CSV",
data: `\x0`,
err: "odd length hex string",
name: "bad bytes length",
create: `b bytes`,
typ: "CSV",
data: `\x0`,
rejected: `\x0` + "\n",
err: "odd length hex string",
},
{
name: "oversample",
Expand Down Expand Up @@ -252,6 +254,24 @@ d
data: `"abc"de"`,
err: `row 1: reading CSV record: parse error on line 1, column 4: extraneous or missing " in quoted-field`,
},
{
name: "too many imported columns",
create: `i int8`,
typ: "CSV",
data: "1,2\n3\n11,22",
err: "row 1: expected 1 fields, got 2",
rejected: "1,2\n11,22\n",
query: map[string][][]string{`SELECT * from t`: {{"3"}}},
},
{
name: "parsing error",
create: `i int8, j int8`,
typ: "CSV",
data: "not_int,2\n3,4",
err: `row 1: parse "i" as INT8: could not parse "not_int" as type int`,
rejected: "not_int,2\n",
query: map[string][][]string{`SELECT * from t`: {{"3", "4"}}},
},

// MySQL OUTFILE
// If err field is non-empty, the query filed specifies what expect
Expand Down Expand Up @@ -922,7 +942,7 @@ COPY t (a, b, c) FROM stdin;
}

for i, tc := range tests {
if tc.typ != "DELIMITED" && saveRejected {
if tc.typ != "CSV" && tc.typ != "DELIMITED" && saveRejected {
continue
}
if saveRejected {
Expand All @@ -942,7 +962,7 @@ COPY t (a, b, c) FROM stdin;
} else {
q = fmt.Sprintf(`IMPORT %s ($1) %s`, tc.typ, tc.with)
}
t.Log(q, srv.URL, tc.data)
t.Log(q, srv.URL, "\nFile contents:\n", tc.data)
mockRecorder.dataString = tc.data
mockRecorder.rejectedString = ""
if !saveRejected || tc.rejected == "" {
Expand All @@ -955,7 +975,7 @@ COPY t (a, b, c) FROM stdin;
sqlDB.CheckQueryResults(t, query, res)
}
if tc.rejected != mockRecorder.rejectedString {
t.Errorf("expected:\n<%v>\ngot:\n<%v>\n", tc.rejected,
t.Errorf("expected:\n%v\ngot:\n%v\n", tc.rejected,
mockRecorder.rejectedString)
}
}
Expand Down Expand Up @@ -2009,7 +2029,7 @@ func TestImportIntoCSV(t *testing.T) {

stripFilenameQuotes := testFiles.files[0][1 : len(testFiles.files[0])-1]
sqlDB.ExpectErr(
t, fmt.Sprintf("pq: %s: row 1: expected 3 fields, got 2", stripFilenameQuotes),
t, fmt.Sprintf("\"%s\": row 1: expected 3 fields, got 2", stripFilenameQuotes),
fmt.Sprintf(`IMPORT INTO t (a, b, c) CSV DATA (%s)`, testFiles.files[0]),
)
})
Expand All @@ -2022,7 +2042,7 @@ func TestImportIntoCSV(t *testing.T) {

stripFilenameQuotes := testFiles.files[0][1 : len(testFiles.files[0])-1]
sqlDB.ExpectErr(
t, fmt.Sprintf("pq: %s: row 1: expected 1 fields, got 2", stripFilenameQuotes),
t, fmt.Sprintf("\"%s\": row 1: expected 1 fields, got 2", stripFilenameQuotes),
fmt.Sprintf(`IMPORT INTO t (a) CSV DATA (%s)`, testFiles.files[0]),
)
})
Expand Down Expand Up @@ -2345,7 +2365,7 @@ func BenchmarkConvertRecord(b *testing.B) {
if len(batch.r) > batchSize {
recordCh <- batch
batch.r = make([][]string, 0, batchSize)
batch.rowOffset = i
batch.rowOffset = int64(i)
}

batch.r = append(batch.r, tpchLineItemDataRows[i%len(tpchLineItemDataRows)])
Expand Down Expand Up @@ -2491,7 +2511,7 @@ func TestImportWorkerFailure(t *testing.T) {
// TODO(mjibson): Although this test passes most of the time it still
// sometimes fails because not all kinds of failures caused by shutting a
// node down are detected and retried.
t.Skip("flakey due to undetected kinds of failures when the node is shutdown")
t.Skip("flaky due to undetected kinds of failures when the node is shutdown")

defer func(oldInterval time.Duration) {
jobs.DefaultAdoptInterval = oldInterval
Expand Down
22 changes: 17 additions & 5 deletions pkg/ccl/importccl/read_import_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"compress/bzip2"
"compress/gzip"
"context"
"fmt"
"io"
"io/ioutil"
"math"
Expand Down Expand Up @@ -163,19 +164,30 @@ func readInputFiles(
src.Reader = decompressed

var rejected chan string
if format.Format == roachpb.IOFileFormat_MysqlOutfile && format.MysqlOut.SaveRejected {
if (format.Format == roachpb.IOFileFormat_CSV && format.Csv.SaveRejected) ||
(format.Format == roachpb.IOFileFormat_MysqlOutfile && format.MysqlOut.SaveRejected) {
rejected = make(chan string)
}
if rejected != nil {
grp := ctxgroup.WithContext(ctx)
grp.GoCtx(func(ctx context.Context) error {
var buf []byte
atFirstLine := true
var countRejected int64
for s := range rejected {
countRejected++
if countRejected > 1000 { // TODO(spaskob): turn the magic constant into an option
return pgerror.New(
pgcode.DataCorrupted,
fmt.Sprintf(
"too many parsing errors (%d) encountered for file %s",
countRejected,
dataFile,
),
)
}
buf = append(buf, s...)
atFirstLine = false
}
if atFirstLine {
if countRejected == 0 {
// no rejected rows
return nil
}
Expand Down Expand Up @@ -212,7 +224,7 @@ func readInputFiles(
return errors.Wrap(err, dataFile)
}
} else {
if err := fileFunc(ctx, src, dataFileIndex, dataFile, rejected); err != nil {
if err := fileFunc(ctx, src, dataFileIndex, dataFile, nil /* rejected */); err != nil {
return errors.Wrap(err, dataFile)
}
}
Expand Down
Loading

0 comments on commit 4c22e38

Please sign in to comment.