Skip to content

Commit

Permalink
executor: handle \N as NULL in load data statement (#6962)
Browse files Browse the repository at this point in the history
  • Loading branch information
jackysp authored and zz-jason committed Jul 3, 2018
1 parent bfcea6d commit b236944
Show file tree
Hide file tree
Showing 3 changed files with 123 additions and 110 deletions.
52 changes: 52 additions & 0 deletions executor/executor_pkg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,3 +165,55 @@ func generateDatumSlice(vals ...int64) []types.Datum {
}
return datums
}

// TestGetFieldsFromLine checks that getFieldsFromLine splits a line whose
// fields are enclosed by '"' and terminated by "," and that every field is
// unescaped. It also checks that a line without the enclosing character is
// rejected with an error.
func (s *testExecSuite) TestGetFieldsFromLine(c *C) {
	tests := []struct {
		input    string
		expected []string
	}{
		{
			`"1","a string","100.20"`,
			[]string{"1", "a string", "100.20"},
		},
		{
			`"2","a string containing a , comma","102.20"`,
			[]string{"2", "a string containing a , comma", "102.20"},
		},
		{
			`"3","a string containing a \" quote","102.20"`,
			[]string{"3", "a string containing a \" quote", "102.20"},
		},
		{
			`"4","a string containing a \", quote and comma","102.20"`,
			[]string{"4", "a string containing a \", quote and comma", "102.20"},
		},
		// Test some escape char.
		// The input has TWO spaces after `\\\`: the first one is consumed by the
		// backslash-space escape, the second one is a literal space. Together
		// they produce the two ' ' bytes listed in the expected value.
		{
			`"\0\b\n\r\t\Z\\\  \c\'\""`,
			[]string{string([]byte{0, '\b', '\n', '\r', '\t', 26, '\\', ' ', ' ', 'c', '\'', '"'})},
		},
	}

	ldInfo := LoadDataInfo{
		FieldsInfo: &ast.FieldsClause{
			Enclosed:   '"',
			Terminated: ",",
		},
	}

	for _, test := range tests {
		got, err := ldInfo.getFieldsFromLine([]byte(test.input))
		c.Assert(err, IsNil, Commentf("failed: %s", test.input))
		assertEqualStrings(c, got, test.expected)
	}

	// A line that is not wrapped in the enclosing character must be rejected.
	_, err := ldInfo.getFieldsFromLine([]byte(`1,a string,100.20`))
	c.Assert(err, NotNil)
}

// assertEqualStrings asserts that got and expect have the same length and that
// the bytes of each parsed field, read as a string, equal the expected string
// at the same position.
func assertEqualStrings(c *C, got []field, expect []string) {
	c.Assert(len(got), Equals, len(expect))
	for idx, parsed := range got {
		c.Assert(string(parsed.str), Equals, expect[idx])
	}
}
83 changes: 45 additions & 38 deletions executor/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -554,7 +554,7 @@ func (e *LoadDataInfo) getLine(prevData, curData []byte) ([]byte, []byte, bool)
// If the number of inserted rows reaches the batchRows, then the second return value is true.
// If prevData isn't nil and curData is nil, there is no more data to deal with and isEOF is true.
func (e *LoadDataInfo) InsertData(prevData, curData []byte) ([]byte, bool, error) {
// TODO: support enclosed and escape.
// TODO: support escape.
if len(prevData) == 0 && len(curData) == 0 {
return nil, false, nil
}
Expand Down Expand Up @@ -587,7 +587,7 @@ func (e *LoadDataInfo) InsertData(prevData, curData []byte) ([]byte, bool, error
curData = nil
}

cols, err := GetFieldsFromLine(line, e.FieldsInfo)
cols, err := e.getFieldsFromLine(line)
if err != nil {
return nil, false, errors.Trace(err)
}
Expand All @@ -614,55 +614,54 @@ func (e *LoadDataInfo) InsertData(prevData, curData []byte) ([]byte, bool, error
return curData, reachLimit, nil
}

// GetFieldsFromLine splits line according to fieldsInfo, this function is exported for testing.
func GetFieldsFromLine(line []byte, fieldsInfo *ast.FieldsClause) ([]string, error) {
// field is one column value split out of a line by getFieldsFromLine.
type field struct {
// str holds the bytes of the column; escape rewrites them in place.
str []byte
// maybeNull is set by escapeChar when a `\N` escape sequence is seen.
// colsToRow stores NULL only when it is set and str is exactly "N".
maybeNull bool
}

// getFieldsFromLine splits line according to fieldsInfo.
func (e *LoadDataInfo) getFieldsFromLine(line []byte) ([]field, error) {
var sep []byte
if fieldsInfo.Enclosed != 0 {
if line[0] != fieldsInfo.Enclosed || line[len(line)-1] != fieldsInfo.Enclosed {
return nil, errors.Errorf("line %s should begin and end with %c", string(line), fieldsInfo.Enclosed)
if e.FieldsInfo.Enclosed != 0 {
if line[0] != e.FieldsInfo.Enclosed || line[len(line)-1] != e.FieldsInfo.Enclosed {
return nil, errors.Errorf("line %s should begin and end with %c", string(line), e.FieldsInfo.Enclosed)
}
line = line[1 : len(line)-1]
sep = make([]byte, 0, len(fieldsInfo.Terminated)+2)
sep = append(sep, fieldsInfo.Enclosed)
sep = append(sep, fieldsInfo.Terminated...)
sep = append(sep, fieldsInfo.Enclosed)
sep = make([]byte, 0, len(e.FieldsInfo.Terminated)+2)
sep = append(sep, e.FieldsInfo.Enclosed)
sep = append(sep, e.FieldsInfo.Terminated...)
sep = append(sep, e.FieldsInfo.Enclosed)
} else {
sep = []byte(fieldsInfo.Terminated)
sep = []byte(e.FieldsInfo.Terminated)
}
rawCols := bytes.Split(line, sep)
cols := escapeCols(rawCols)
return cols, nil
}

func escapeCols(strs [][]byte) []string {
ret := make([]string, len(strs))
for i, v := range strs {
output := escape(v)
ret[i] = string(output)
fields := make([]field, 0, len(rawCols))
for _, v := range rawCols {
f := field{v, false}
fields = append(fields, f.escape())
}
return ret
return fields, nil
}

// escape handles escape characters when running load data statement.
// TODO: escape need to be improved, it should support ESCAPED BY to specify
// the escape character and handle \N escape.
// See http://dev.mysql.com/doc/refman/5.7/en/load-data.html
func escape(str []byte) []byte {
// TODO: escape only support '\' as the `ESCAPED BY` character, it should support specify characters.
func (f *field) escape() field {
pos := 0
for i := 0; i < len(str); i++ {
c := str[i]
if c == '\\' && i+1 < len(str) {
c = escapeChar(str[i+1])
for i := 0; i < len(f.str); i++ {
c := f.str[i]
if c == '\\' && i+1 < len(f.str) {
c = f.escapeChar(f.str[i+1])
i++
}

str[pos] = c
f.str[pos] = c
pos++
}
return str[:pos]
return field{f.str[:pos], f.maybeNull}
}

func escapeChar(c byte) byte {
func (f *field) escapeChar(c byte) byte {
switch c {
case '0':
return 0
Expand All @@ -676,19 +675,27 @@ func escapeChar(c byte) byte {
return '\t'
case 'Z':
return 26
case '\\':
return '\\'
case 'N':
f.maybeNull = true
return c
default:
return c
}
return c
}

func (e *LoadDataInfo) colsToRow(cols []string) types.DatumRow {
func (e *LoadDataInfo) colsToRow(cols []field) types.DatumRow {
for i := 0; i < len(e.row); i++ {
if i >= len(cols) {
e.row[i].SetString("")
e.row[i].SetNull()
continue
}
e.row[i].SetString(cols[i])
// The field with only "\N" in it is handled as NULL in the csv file.
// See http://dev.mysql.com/doc/refman/5.7/en/load-data.html
if cols[i].maybeNull && string(cols[i].str) == "N" {
e.row[i].SetNull()
} else {
e.row[i].SetString(string(cols[i].str))
}
}
row, err := e.insertVal.fillRowData(e.columns, e.row, true)
if err != nil {
Expand Down
98 changes: 26 additions & 72 deletions executor/write_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1103,13 +1103,13 @@ func (s *testSuite) TestLoadData(c *C) {
// fields and lines are default, InsertData returns data is nil
tests := []testCase{
// data1 = nil, data2 != nil
{nil, []byte("\n"), []string{"1|0||0"}, nil},
{nil, []byte("\t\n"), []string{"2|0||0"}, nil},
{nil, []byte("\n"), []string{"1|<nil>|<nil>|<nil>"}, nil},
{nil, []byte("\t\n"), []string{"2|0|<nil>|<nil>"}, nil},
{nil, []byte("3\t2\t3\t4\n"), []string{"3|2|3|4"}, nil},
{nil, []byte("3*1\t2\t3\t4\n"), []string{"3|2|3|4"}, nil},
{nil, []byte("4\t2\t\t3\t4\n"), []string{"4|2||3"}, nil},
{nil, []byte("\t1\t2\t3\t4\n"), []string{"5|1|2|3"}, nil},
{nil, []byte("6\t2\t3\n"), []string{"6|2|3|0"}, nil},
{nil, []byte("6\t2\t3\n"), []string{"6|2|3|<nil>"}, nil},
{nil, []byte("\t2\t3\t4\n\t22\t33\t44\n"), []string{"7|2|3|4", "8|22|33|44"}, nil},
{nil, []byte("7\t2\t3\t4\n7\t22\t33\t44\n"), []string{"7|2|3|4"}, nil},

Expand All @@ -1124,7 +1124,7 @@ func (s *testSuite) TestLoadData(c *C) {
{[]byte("\t2\t3"), []byte("\t4\t5"), nil, []byte("\t2\t3\t4\t5")},
}
checkCases(tests, ld, c, tk, ctx, selectSQL, deleteSQL)
c.Assert(sc.WarningCount(), Equals, uint16(3))
c.Assert(sc.WarningCount(), Equals, uint16(1))

// lines starting symbol is "" and terminated symbol length is 2, InsertData returns data is nil
ld.LinesInfo.Terminated = "||"
Expand All @@ -1135,9 +1135,9 @@ func (s *testSuite) TestLoadData(c *C) {
{[]byte("2\t2\t3\t4\t5|"), []byte("|3\t22\t33\t44\t55||"),
[]string{"2|2|3|4", "3|22|33|44"}, nil},
{[]byte("3\t2\t3\t4\t5|"), []byte("|4\t22\t33||"), []string{
"3|2|3|4", "4|22|33|0"}, nil},
"3|2|3|4", "4|22|33|<nil>"}, nil},
{[]byte("4\t2\t3\t4\t5|"), []byte("|5\t22\t33||6\t222||"),
[]string{"4|2|3|4", "5|22|33|0", "6|222||0"}, nil},
[]string{"4|2|3|4", "5|22|33|<nil>", "6|222|<nil>|<nil>"}, nil},
{[]byte("6\t2\t3"), []byte("4\t5||"), []string{"6|2|34|5"}, nil},
}
checkCases(tests, ld, c, tk, ctx, selectSQL, deleteSQL)
Expand All @@ -1148,12 +1148,12 @@ func (s *testSuite) TestLoadData(c *C) {
ld.LinesInfo.Terminated = "|!#^"
tests = []testCase{
// data1 = nil, data2 != nil
{nil, []byte("xxx|!#^"), []string{"13|0||0"}, nil},
{nil, []byte("xxx\\|!#^"), []string{"14|0||0"}, nil},
{nil, []byte("xxx|!#^"), []string{"13|<nil>|<nil>|<nil>"}, nil},
{nil, []byte("xxx\\|!#^"), []string{"14|0|<nil>|<nil>"}, nil},
{nil, []byte("xxx3\\2\\3\\4|!#^"), []string{"3|2|3|4"}, nil},
{nil, []byte("xxx4\\2\\\\3\\4|!#^"), []string{"4|2||3"}, nil},
{nil, []byte("xxx\\1\\2\\3\\4|!#^"), []string{"15|1|2|3"}, nil},
{nil, []byte("xxx6\\2\\3|!#^"), []string{"6|2|3|0"}, nil},
{nil, []byte("xxx6\\2\\3|!#^"), []string{"6|2|3|<nil>"}, nil},
{nil, []byte("xxx\\2\\3\\4|!#^xxx\\22\\33\\44|!#^"), []string{
"16|2|3|4",
"17|22|33|44"}, nil},
Expand All @@ -1170,27 +1170,27 @@ func (s *testSuite) TestLoadData(c *C) {
{[]byte("xxx10\\2\\3"), []byte("\\4|!#^"),
[]string{"10|2|3|4"}, nil},
{[]byte("10\\2\\3xx"), []byte("x11\\4\\5|!#^"),
[]string{"11|4|5|0"}, nil},
[]string{"11|4|5|<nil>"}, nil},
{[]byte("xxx21\\2\\3\\4\\5|!"), []byte("#^"),
[]string{"21|2|3|4"}, nil},
{[]byte("xxx22\\2\\3\\4\\5|!"), []byte("#^xxx23\\22\\33\\44\\55|!#^"),
[]string{"22|2|3|4", "23|22|33|44"}, nil},
{[]byte("xxx23\\2\\3\\4\\5|!"), []byte("#^xxx24\\22\\33|!#^"),
[]string{"23|2|3|4", "24|22|33|0"}, nil},
[]string{"23|2|3|4", "24|22|33|<nil>"}, nil},
{[]byte("xxx24\\2\\3\\4\\5|!"), []byte("#^xxx25\\22\\33|!#^xxx26\\222|!#^"),
[]string{"24|2|3|4", "25|22|33|0", "26|222||0"}, nil},
[]string{"24|2|3|4", "25|22|33|<nil>", "26|222|<nil>|<nil>"}, nil},
{[]byte("xxx25\\2\\3\\4\\5|!"), []byte("#^26\\22\\33|!#^xxx27\\222|!#^"),
[]string{"25|2|3|4", "27|222||0"}, nil},
[]string{"25|2|3|4", "27|222|<nil>|<nil>"}, nil},
{[]byte("xxx\\2\\3"), []byte("4\\5|!#^"), []string{"28|2|34|5"}, nil},

// InsertData returns data isn't nil
{nil, []byte("\\2\\3\\4|!#^"), nil, []byte("#^")},
{nil, []byte("\\4\\5"), nil, []byte("\\5")},
{[]byte("\\2\\3"), []byte("\\4\\5"), nil, []byte("\\5")},
{[]byte("xxx1\\2\\3|"), []byte("!#^\\4\\5|!#"),
[]string{"1|2|3|0"}, []byte("!#")},
[]string{"1|2|3|<nil>"}, []byte("!#")},
{[]byte("xxx1\\2\\3\\4\\5|!"), []byte("#^xxx2\\22\\33|!#^3\\222|!#^"),
[]string{"1|2|3|4", "2|22|33|0"}, []byte("#^")},
[]string{"1|2|3|4", "2|22|33|<nil>"}, []byte("#^")},
{[]byte("xx1\\2\\3"), []byte("\\4\\5|!#^"), nil, []byte("#^")},
}
checkCases(tests, ld, c, tk, ctx, selectSQL, deleteSQL)
Expand All @@ -1199,7 +1199,7 @@ func (s *testSuite) TestLoadData(c *C) {
ld.LinesInfo.Terminated = "xxx"
tests = []testCase{
// data1 = nil, data2 != nil
{nil, []byte("xxxxxx"), []string{"29|0||0"}, nil},
{nil, []byte("xxxxxx"), []string{"29|<nil>|<nil>|<nil>"}, nil},
{nil, []byte("xxx3\\2\\3\\4xxx"), []string{"3|2|3|4"}, nil},
{nil, []byte("xxx\\2\\3\\4xxxxxx\\22\\33\\44xxx"),
[]string{"30|2|3|4", "31|22|33|44"}, nil},
Expand All @@ -1214,19 +1214,19 @@ func (s *testSuite) TestLoadData(c *C) {
{[]byte("xxx32\\2\\3\\4\\5x"), []byte("xxxxx33\\22\\33\\44\\55xxx"),
[]string{"32|2|3|4", "33|22|33|44"}, nil},
{[]byte("xxx33\\2\\3\\4\\5xxx"), []byte("xxx34\\22\\33xxx"),
[]string{"33|2|3|4", "34|22|33|0"}, nil},
[]string{"33|2|3|4", "34|22|33|<nil>"}, nil},
{[]byte("xxx34\\2\\3\\4\\5xx"), []byte("xxxx35\\22\\33xxxxxx36\\222xxx"),
[]string{"34|2|3|4", "35|22|33|0", "36|222||0"}, nil},
[]string{"34|2|3|4", "35|22|33|<nil>", "36|222|<nil>|<nil>"}, nil},

// InsertData returns data isn't nil
{nil, []byte("\\2\\3\\4xxxx"), nil, []byte("xxxx")},
{[]byte("\\2\\3\\4xxx"), nil, []string{"37|0||0"}, nil},
{[]byte("\\2\\3\\4xxx"), nil, []string{"37|<nil>|<nil>|<nil>"}, nil},
{[]byte("\\2\\3\\4xxxxxx11\\22\\33\\44xxx"), nil,
[]string{"38|0||0", "39|0||0"}, nil},
[]string{"38|<nil>|<nil>|<nil>", "39|<nil>|<nil>|<nil>"}, nil},
{[]byte("xx10\\2\\3"), []byte("\\4\\5xxx"), nil, []byte("xxx")},
{[]byte("xxx10\\2\\3"), []byte("\\4xxxx"), []string{"10|2|3|4"}, []byte("x")},
{[]byte("xxx10\\2\\3\\4\\5x"), []byte("xx11\\22\\33xxxxxx12\\222xxx"),
[]string{"10|2|3|4", "40|0||0"}, []byte("xxx")},
[]string{"10|2|3|4", "40|<nil>|<nil>|<nil>"}, []byte("xxx")},
}
checkCases(tests, ld, c, tk, ctx, selectSQL, deleteSQL)
}
Expand All @@ -1247,14 +1247,16 @@ func (s *testSuite) TestLoadDataEscape(c *C) {
{nil, []byte("4\tboth \\t\\n\n"), []string{"4|both \t\n"}, nil},
{nil, []byte("5\tstr \\\\\n"), []string{"5|str \\"}, nil},
{nil, []byte("6\t\\r\\t\\n\\0\\Z\\b\n"), []string{"6|" + string([]byte{'\r', '\t', '\n', 0, 26, '\b'})}, nil},
{nil, []byte("7\trtn0ZbN\n"), []string{"7|" + string([]byte{'r', 't', 'n', '0', 'Z', 'b', 'N'})}, nil},
{nil, []byte("8\trtn0Zb\\N\n"), []string{"8|" + string([]byte{'r', 't', 'n', '0', 'Z', 'b', 'N'})}, nil},
}
deleteSQL := "delete from load_data_test"
selectSQL := "select * from load_data_test;"
checkCases(tests, ld, c, tk, ctx, selectSQL, deleteSQL)
}

// TestLoadDataSpecifiedCoumns reuse TestLoadDataEscape's test case :-)
func (s *testSuite) TestLoadDataSpecifiedCoumns(c *C) {
// TestLoadDataSpecifiedColumns reuses TestLoadDataEscape's test cases :-)
func (s *testSuite) TestLoadDataSpecifiedColumns(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test; drop table if exists load_data_test;")
tk.MustExec(`create table load_data_test (id int PRIMARY KEY AUTO_INCREMENT, c1 int, c2 varchar(255) default "def", c3 int default 0);`)
Expand All @@ -1270,6 +1272,7 @@ func (s *testSuite) TestLoadDataSpecifiedCoumns(c *C) {
{nil, []byte("10\tboth \\t\\n\n"), []string{"4|10|both \t\n|0"}, nil},
{nil, []byte("11\tstr \\\\\n"), []string{"5|11|str \\|0"}, nil},
{nil, []byte("12\t\\r\\t\\n\\0\\Z\\b\n"), []string{"6|12|" + string([]byte{'\r', '\t', '\n', 0, 26, '\b'}) + "|0"}, nil},
{nil, []byte("\\N\ta string\n"), []string{"7|<nil>|a string|0"}, nil},
}
deleteSQL := "delete from load_data_test"
selectSQL := "select * from load_data_test;"
Expand Down Expand Up @@ -1415,55 +1418,6 @@ func (s *testSuite) TestNullDefault(c *C) {
tk.MustQuery("select * from test_null_default").Check(testkit.Rows("<nil>", "1970-01-01 08:20:34"))
}

// TestGetFieldsFromLine checks that executor.GetFieldsFromLine splits a line
// whose fields are enclosed by '"' and terminated by "," and that every field
// is unescaped. It also checks that a line without the enclosing character is
// rejected with an error.
func (s *testSuite) TestGetFieldsFromLine(c *C) {
	tests := []struct {
		input    string
		expected []string
	}{
		{
			`"1","a string","100.20"`,
			[]string{"1", "a string", "100.20"},
		},
		{
			`"2","a string containing a , comma","102.20"`,
			[]string{"2", "a string containing a , comma", "102.20"},
		},
		{
			`"3","a string containing a \" quote","102.20"`,
			[]string{"3", "a string containing a \" quote", "102.20"},
		},
		{
			`"4","a string containing a \", quote and comma","102.20"`,
			[]string{"4", "a string containing a \", quote and comma", "102.20"},
		},
		// Test some escape char.
		// The input has TWO spaces after `\\\`: the first one is consumed by the
		// backslash-space escape, the second one is a literal space. Together
		// they produce the two ' ' bytes listed in the expected value.
		{
			`"\0\b\n\r\t\Z\\\  \c\'\""`,
			[]string{string([]byte{0, '\b', '\n', '\r', '\t', 26, '\\', ' ', ' ', 'c', '\'', '"'})},
		},
	}
	fieldsInfo := &ast.FieldsClause{
		Enclosed:   '"',
		Terminated: ",",
	}

	for _, test := range tests {
		got, err := executor.GetFieldsFromLine([]byte(test.input), fieldsInfo)
		c.Assert(err, IsNil, Commentf("failed: %s", test.input))
		assertEqualStrings(c, got, test.expected)
	}

	// A line that is not wrapped in the enclosing character must be rejected.
	_, err := executor.GetFieldsFromLine([]byte(`1,a string,100.20`), fieldsInfo)
	c.Assert(err, NotNil)
}

// assertEqualStrings asserts that got and expect have the same length and hold
// equal strings at every position.
func assertEqualStrings(c *C, got []string, expect []string) {
	c.Assert(len(got), Equals, len(expect))
	for idx, actual := range got {
		c.Assert(actual, Equals, expect[idx])
	}
}

// TestIssue4067 Test issue https://github.com/pingcap/tidb/issues/4067
func (s *testSuite) TestIssue4067(c *C) {
tk := testkit.NewTestKit(c, s.store)
Expand Down

0 comments on commit b236944

Please sign in to comment.