Skip to content

Commit

Permalink
importccl: enabling saving of bad csv rows to a side file
Browse files Browse the repository at this point in the history
When importing from large CSV files it's common to have a few offending rows.
Currently `IMPORT` aborts at the first error, which makes for a tedious
task of manually fixing each problem and re-running. Instead, users can specify
the new option `WITH experimental_save_rejected`, which will not stop on bad rows
but save them in a side file called `<original_csv_file>.rejected` and
continue. The user can then fix the problems in the rejected file and re-run
the import on it using `IMPORT INTO`.

Release note (sql change): enable skipping of faulty rows in IMPORT.
  • Loading branch information
Spas Bojanov committed Oct 14, 2019
1 parent f8d702e commit 97788de
Show file tree
Hide file tree
Showing 11 changed files with 457 additions and 224 deletions.
13 changes: 10 additions & 3 deletions pkg/ccl/importccl/import_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ const (
importOptionOversample = "oversample"
importOptionSkipFKs = "skip_foreign_keys"
importOptionDisableGlobMatch = "disable_glob_matching"
importOptionSaveRejected = "experimental_save_rejected"

pgCopyDelimiter = "delimiter"
pgCopyNull = "nullif"
Expand All @@ -75,9 +76,10 @@ var importOptionExpectValues = map[string]sql.KVStringOptValidate{
mysqlOutfileEnclose: sql.KVStringOptRequireValue,
mysqlOutfileEscape: sql.KVStringOptRequireValue,

importOptionSSTSize: sql.KVStringOptRequireValue,
importOptionDecompress: sql.KVStringOptRequireValue,
importOptionOversample: sql.KVStringOptRequireValue,
importOptionSSTSize: sql.KVStringOptRequireValue,
importOptionDecompress: sql.KVStringOptRequireValue,
importOptionOversample: sql.KVStringOptRequireValue,
importOptionSaveRejected: sql.KVStringOptRequireNoValue,

importOptionSkipFKs: sql.KVStringOptRequireNoValue,
importOptionDisableGlobMatch: sql.KVStringOptRequireNoValue,
Expand Down Expand Up @@ -311,6 +313,11 @@ func importPlanHook(
if override, ok := opts[csvNullIf]; ok {
format.MysqlOut.NullEncoding = &override
}
// TODO(spaskob): Refactor so that the save rejected option
// is passed in all import formats not just DELIMITED.
if _, ok := opts[importOptionSaveRejected]; ok {
format.MysqlOut.SaveRejected = true
}
case "MYSQLDUMP":
telemetry.Count("import.format.mysqldump")
format.Format = roachpb.IOFileFormat_Mysqldump
Expand Down
277 changes: 181 additions & 96 deletions pkg/ccl/importccl/import_stmt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"
)
Expand All @@ -62,13 +63,14 @@ func TestImportData(t *testing.T) {
sqlDB.Exec(t, `SET CLUSTER SETTING kv.bulk_ingest.batch_size = '10KB'`)

tests := []struct {
name string
create string
with string
typ string
data string
err string
query map[string][][]string
name string
create string
with string
typ string
data string
err string
rejected string
query map[string][][]string
}{
{
name: "duplicate unique index key",
Expand Down Expand Up @@ -236,33 +238,64 @@ d
},

// MySQL OUTFILE
// If the err field is non-empty, the query field specifies what we expect
// to get from the rows that are parsed correctly (see option experimental_save_rejected).
{
name: "too many imported columns",
create: `i int8`,
name: "empty file",
create: `a string`,
typ: "DELIMITED",
data: "1\t2",
err: "row 1: too many columns, expected 1",
data: "",
query: map[string][][]string{`SELECT * from t`: {}},
},
{
name: "cannot parse data",
create: `i int8, j int8`,
name: "empty field",
create: `a string, b string`,
typ: "DELIMITED",
data: "bad_int\t2",
err: "row 1: parse",
data: "\t",
query: map[string][][]string{`SELECT * from t`: {{"", ""}}},
},
{
name: "unexpected number of columns",
create: `a string, b string`,
name: "empty line",
create: `a string`,
typ: "DELIMITED",
data: "1,2",
err: "row 1: unexpected number of columns, expected 2 got 1",
data: "\n",
query: map[string][][]string{`SELECT * from t`: {{""}}},
},
{
name: "unexpected number of columns in 1st row",
create: `a string, b string`,
typ: "DELIMITED",
data: "1,2\n3\t4",
err: "row 1: unexpected number of columns, expected 2 got 1",
name: "too many imported columns",
create: `i int8`,
typ: "DELIMITED",
data: "1\t2\n3",
err: "row 1: too many columns, got 2 expected 1",
rejected: "1\t2\n",
query: map[string][][]string{`SELECT * from t`: {{"3"}}},
},
{
name: "cannot parse data",
create: `i int8, j int8`,
typ: "DELIMITED",
data: "bad_int\t2\n3\t4",
err: "row 1: parse",
rejected: "bad_int\t2\n",
query: map[string][][]string{`SELECT * from t`: {{"3", "4"}}},
},
{
name: "unexpected number of columns",
create: `a string, b string`,
typ: "DELIMITED",
data: "1,2\n3\t4",
err: "row 1: unexpected number of columns, expected 2 got 1",
rejected: "1,2\n",
query: map[string][][]string{`SELECT * from t`: {{"3", "4"}}},
},
{
name: "unexpected number of columns in 1st row",
create: `a string, b string`,
typ: "DELIMITED",
data: "1,2\n3\t4",
err: "row 1: unexpected number of columns, expected 2 got 1",
rejected: "1,2\n",
query: map[string][][]string{`SELECT * from t`: {{"3", "4"}}},
},
{
name: "field enclosure",
Expand Down Expand Up @@ -295,44 +328,54 @@ d
},
},
{
name: "unmatched field enclosure",
create: `a string, b string`,
with: `WITH fields_enclosed_by = '$'`,
typ: "DELIMITED",
data: "$foo\tnormal",
err: "row 1: unmatched field enclosure at start of field",
},
{
name: "unmatched field enclosure at end",
create: `a string, b string`,
with: `WITH fields_enclosed_by = '$'`,
typ: "DELIMITED",
data: "foo$\tnormal",
err: "row 1: unmatched field enclosure at end of field",
},
{
name: "unmatched field enclosure 2nd field",
create: `a string, b string`,
with: `WITH fields_enclosed_by = '$'`,
typ: "DELIMITED",
data: "normal\t$foo",
err: "row 1: unmatched field enclosure at start of field",
},
{
name: "unmatched field enclosure at end 2nd field",
create: `a string, b string`,
with: `WITH fields_enclosed_by = '$'`,
typ: "DELIMITED",
data: "normal\tfoo$",
err: "row 1: unmatched field enclosure at end of field",
},
{
name: "unmatched literal",
create: `i int8`,
with: `WITH fields_escaped_by = '\'`,
typ: "DELIMITED",
data: `\`,
err: "row 1: unmatched literal",
name: "unmatched field enclosure",
create: `a string, b string`,
with: `WITH fields_enclosed_by = '$'`,
typ: "DELIMITED",
data: "$foo\tnormal\nbaz\tbar",
err: "row 1: unmatched field enclosure at start of field",
rejected: "$foo\tnormal\nbaz\tbar",
query: map[string][][]string{`SELECT * from t`: {}},
},
{
name: "unmatched field enclosure at end",
create: `a string, b string`,
with: `WITH fields_enclosed_by = '$'`,
typ: "DELIMITED",
data: "foo$\tnormal\nbar\tbaz",
err: "row 1: unmatched field enclosure at end of field",
rejected: "foo$\tnormal\n",
query: map[string][][]string{`SELECT * from t`: {{"bar", "baz"}}},
},
{
name: "unmatched field enclosure 2nd field",
create: `a string, b string`,
with: `WITH fields_enclosed_by = '$'`,
typ: "DELIMITED",
data: "normal\t$foo",
err: "row 1: unmatched field enclosure at start of field",
rejected: "normal\t$foo",
query: map[string][][]string{`SELECT * from t`: {}},
},
{
name: "unmatched field enclosure at end 2nd field",
create: `a string, b string`,
with: `WITH fields_enclosed_by = '$'`,
typ: "DELIMITED",
data: "normal\tfoo$",
err: "row 1: unmatched field enclosure at end of field",
rejected: "normal\tfoo$",
query: map[string][][]string{`SELECT * from t`: {}},
},
{
name: "unmatched literal",
create: `i int8`,
with: `WITH fields_escaped_by = '\'`,
typ: "DELIMITED",
data: `\`,
err: "row 1: unmatched literal",
rejected: `\`,
query: map[string][][]string{`SELECT * from t`: {}},
},
{
name: "escaped field enclosure",
Expand Down Expand Up @@ -366,20 +409,24 @@ d
},
},
{
name: `\N with trailing char`,
create: `s STRING`,
with: `WITH fields_escaped_by = '\'`,
typ: "DELIMITED",
data: "\\N1",
err: "row 1: unexpected data after null encoding",
name: `\N with trailing char`,
create: `s STRING`,
with: `WITH fields_escaped_by = '\'`,
typ: "DELIMITED",
data: "\\N1\nfoo",
err: "row 1: unexpected data after null encoding",
rejected: "\\N1\n",
query: map[string][][]string{`SELECT * from t`: {{"foo"}}},
},
{
name: `double null`,
create: `s STRING`,
with: `WITH fields_escaped_by = '\'`,
typ: "DELIMITED",
data: "\\N\\N",
err: "row 1: unexpected null encoding",
name: `double null`,
create: `s STRING`,
with: `WITH fields_escaped_by = '\'`,
typ: "DELIMITED",
data: `\N\N`,
err: "row 1: unexpected null encoding",
rejected: `\N\N`,
query: map[string][][]string{`SELECT * from t`: {}},
},
{
name: `null and \N without escape`,
Expand Down Expand Up @@ -445,9 +492,7 @@ d
with: `WITH fields_terminated_by = ',', skip = '4'`,
typ: "DELIMITED",
data: "a string, b string\nfoo,normal\nbar,baz",
query: map[string][][]string{
`SELECT * from t`: {},
},
query: map[string][][]string{`SELECT * from t`: {}},
},
{
name: "skip -1 lines",
Expand Down Expand Up @@ -829,10 +874,22 @@ COPY t (a, b, c) FROM stdin;
},
}

var dataString string
var mockRecorder struct {
syncutil.Mutex
dataString, rejectedString string
}
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
mockRecorder.Lock()
defer mockRecorder.Unlock()
if r.Method == "GET" {
fmt.Fprint(w, dataString)
fmt.Fprint(w, mockRecorder.dataString)
}
if r.Method == "PUT" {
body, err := ioutil.ReadAll(r.Body)
if err != nil {
panic(err)
}
mockRecorder.rejectedString = string(body)
}
}))
defer srv.Close()
Expand All @@ -842,29 +899,57 @@ COPY t (a, b, c) FROM stdin;
sqlDB.Exec(t, `CREATE TABLE blah (i int8)`)
sqlDB.Exec(t, `DROP TABLE blah`)

for i, tc := range tests {
t.Run(fmt.Sprintf("%s: %s", tc.typ, tc.name), func(t *testing.T) {
dbName := fmt.Sprintf("d%d", i)
sqlDB.Exec(t, fmt.Sprintf(`CREATE DATABASE %s; USE %[1]s`, dbName))
defer sqlDB.Exec(t, fmt.Sprintf(`DROP DATABASE %s`, dbName))
var q string
if tc.create != "" {
q = fmt.Sprintf(`IMPORT TABLE t (%s) %s DATA ($1) %s`, tc.create, tc.typ, tc.with)
} else {
q = fmt.Sprintf(`IMPORT %s ($1) %s`, tc.typ, tc.with)
for _, saveRejected := range []bool{false, true} {
// This test is big and slow as is, so we can't afford to double it in race-detector builds.
if util.RaceEnabled && saveRejected {
continue
}

for i, tc := range tests {
if tc.typ != "DELIMITED" && saveRejected {
continue
}
t.Log(q)
dataString = tc.data
sqlDB.ExpectErr(t, tc.err, q, srv.URL)
for query, res := range tc.query {
sqlDB.CheckQueryResults(t, query, res)
if saveRejected {
if tc.with == "" {
tc.with = "WITH experimental_save_rejected"
} else {
tc.with += ", experimental_save_rejected"
}
}
})
t.Run(fmt.Sprintf("%s/%s: save_rejected=%v", tc.typ, tc.name, saveRejected), func(t *testing.T) {
dbName := fmt.Sprintf("d%d", i)
sqlDB.Exec(t, fmt.Sprintf(`CREATE DATABASE %s; USE %[1]s`, dbName))
defer sqlDB.Exec(t, fmt.Sprintf(`DROP DATABASE %s`, dbName))
var q string
if tc.create != "" {
q = fmt.Sprintf(`IMPORT TABLE t (%s) %s DATA ($1) %s`, tc.create, tc.typ, tc.with)
} else {
q = fmt.Sprintf(`IMPORT %s ($1) %s`, tc.typ, tc.with)
}
t.Log(q, srv.URL, tc.data)
mockRecorder.dataString = tc.data
mockRecorder.rejectedString = ""
if !saveRejected || tc.rejected == "" {
sqlDB.ExpectErr(t, tc.err, q, srv.URL)
} else {
sqlDB.Exec(t, q, srv.URL)
}
if tc.err == "" || saveRejected {
for query, res := range tc.query {
sqlDB.CheckQueryResults(t, query, res)
}
if tc.rejected != mockRecorder.rejectedString {
t.Errorf("expected:\n<%v>\ngot:\n<%v>\n", tc.rejected,
mockRecorder.rejectedString)
}
}
})
}
}

t.Run("mysqlout multiple", func(t *testing.T) {
sqlDB.Exec(t, `CREATE DATABASE mysqlout; USE mysqlout`)
dataString = "1"
mockRecorder.dataString = "1"
sqlDB.Exec(t, `IMPORT TABLE t (s STRING) DELIMITED DATA ($1, $1)`, srv.URL)
sqlDB.CheckQueryResults(t, `SELECT * FROM t`, [][]string{{"1"}, {"1"}})
})
Expand Down
Loading

0 comments on commit 97788de

Please sign in to comment.