Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: handle \N as NULL in load data statement #6962

Merged
merged 5 commits into from
Jul 3, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 52 additions & 0 deletions executor/executor_pkg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,3 +165,55 @@ func generateDatumSlice(vals ...int64) []types.Datum {
}
return datums
}

// TestGetFieldsFromLine checks that LoadDataInfo.getFieldsFromLine splits a
// line whose fields are enclosed by '"' and terminated by ",", and that the
// backslash escape sequences inside a field are decoded.
func (s *testExecSuite) TestGetFieldsFromLine(c *C) {
	tests := []struct {
		input    string
		expected []string
	}{
		{
			`"1","a string","100.20"`,
			[]string{"1", "a string", "100.20"},
		},
		{
			`"2","a string containing a , comma","102.20"`,
			[]string{"2", "a string containing a , comma", "102.20"},
		},
		{
			`"3","a string containing a \" quote","102.20"`,
			[]string{"3", "a string containing a \" quote", "102.20"},
		},
		{
			`"4","a string containing a \", quote and comma","102.20"`,
			[]string{"4", "a string containing a \", quote and comma", "102.20"},
		},
		// Test some escape char.
		// The input contains an escaped space (`\ `) immediately followed by a
		// literal space, which is why the expected bytes hold two ' ' entries.
		{
			`"\0\b\n\r\t\Z\\\  \c\'\""`,
			[]string{string([]byte{0, '\b', '\n', '\r', '\t', 26, '\\', ' ', ' ', 'c', '\'', '"'})},
		},
	}

	ldInfo := LoadDataInfo{
		FieldsInfo: &ast.FieldsClause{
			Enclosed:   '"',
			Terminated: ",",
		},
	}

	for _, test := range tests {
		got, err := ldInfo.getFieldsFromLine([]byte(test.input))
		c.Assert(err, IsNil, Commentf("failed: %s", test.input))
		assertEqualStrings(c, got, test.expected)
	}

	// A line that is not wrapped in the enclosing character must be rejected.
	_, err := ldInfo.getFieldsFromLine([]byte(`1,a string,100.20`))
	c.Assert(err, NotNil)
}

// assertEqualStrings asserts that got holds exactly the values in expect: the
// same number of fields, and each field's bytes equal to the string at the
// same position.
func assertEqualStrings(c *C, got []field, expect []string) {
	c.Assert(len(got), Equals, len(expect))
	for idx, f := range got {
		c.Assert(string(f.str), Equals, expect[idx])
	}
}
83 changes: 45 additions & 38 deletions executor/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -554,7 +554,7 @@ func (e *LoadDataInfo) getLine(prevData, curData []byte) ([]byte, []byte, bool)
// If the number of inserted rows reaches the batchRows, then the second return value is true.
// If prevData isn't nil and curData is nil, there are no other data to deal with and the isEOF is true.
func (e *LoadDataInfo) InsertData(prevData, curData []byte) ([]byte, bool, error) {
// TODO: support enclosed and escape.
// TODO: support escape.
if len(prevData) == 0 && len(curData) == 0 {
return nil, false, nil
}
Expand Down Expand Up @@ -587,7 +587,7 @@ func (e *LoadDataInfo) InsertData(prevData, curData []byte) ([]byte, bool, error
curData = nil
}

cols, err := GetFieldsFromLine(line, e.FieldsInfo)
cols, err := e.getFieldsFromLine(line)
if err != nil {
return nil, false, errors.Trace(err)
}
Expand All @@ -614,55 +614,54 @@ func (e *LoadDataInfo) InsertData(prevData, curData []byte) ([]byte, bool, error
return curData, reachLimit, nil
}

// GetFieldsFromLine splits line according to fieldsInfo, this function is exported for testing.
func GetFieldsFromLine(line []byte, fieldsInfo *ast.FieldsClause) ([]string, error) {
// field is one column value split out of a line of LOAD DATA input.
type field struct {
// str holds the bytes of the column value.
str []byte
// maybeNull is set by escapeChar when an `\N` escape sequence is seen.
// colsToRow stores NULL only when this flag is set and str is exactly "N".
maybeNull bool
}

// getFieldsFromLine splits line into fields according to e.FieldsInfo and
// runs escape on every resulting field.
// When FieldsInfo.Enclosed is non-zero, the line must begin and end with that
// byte, otherwise an error is returned; the outer pair is stripped and the
// remainder is split on Enclosed+Terminated+Enclosed. When Enclosed is zero
// the line is split on Terminated alone.
// NOTE(review): this listing appears to interleave the removed lines
// (fieldsInfo / escapeCols / []string) with the added ones (e.FieldsInfo /
// field) of the same change.
// NOTE(review): line[0] is read without a length check, so an empty line
// panics when Enclosed is set, and a line made of a single Enclosed byte
// passes the check but makes line[1 : len(line)-1] slice out of range.
// Consider rejecting len(line) < 2 before indexing.
func (e *LoadDataInfo) getFieldsFromLine(line []byte) ([]field, error) {
var sep []byte
if fieldsInfo.Enclosed != 0 {
if line[0] != fieldsInfo.Enclosed || line[len(line)-1] != fieldsInfo.Enclosed {
return nil, errors.Errorf("line %s should begin and end with %c", string(line), fieldsInfo.Enclosed)
if e.FieldsInfo.Enclosed != 0 {
if line[0] != e.FieldsInfo.Enclosed || line[len(line)-1] != e.FieldsInfo.Enclosed {
return nil, errors.Errorf("line %s should begin and end with %c", string(line), e.FieldsInfo.Enclosed)
}
// Drop the leading and trailing enclosing bytes before splitting.
line = line[1 : len(line)-1]
sep = make([]byte, 0, len(fieldsInfo.Terminated)+2)
sep = append(sep, fieldsInfo.Enclosed)
sep = append(sep, fieldsInfo.Terminated...)
sep = append(sep, fieldsInfo.Enclosed)
// The separator between two enclosed fields is Enclosed, Terminated, Enclosed.
sep = make([]byte, 0, len(e.FieldsInfo.Terminated)+2)
sep = append(sep, e.FieldsInfo.Enclosed)
sep = append(sep, e.FieldsInfo.Terminated...)
sep = append(sep, e.FieldsInfo.Enclosed)
} else {
sep = []byte(fieldsInfo.Terminated)
sep = []byte(e.FieldsInfo.Terminated)
}
// The split is a plain byte search; escape sequences are decoded afterwards,
// per field.
rawCols := bytes.Split(line, sep)
cols := escapeCols(rawCols)
return cols, nil
}

func escapeCols(strs [][]byte) []string {
ret := make([]string, len(strs))
for i, v := range strs {
output := escape(v)
ret[i] = string(output)
fields := make([]field, 0, len(rawCols))
for _, v := range rawCols {
f := field{v, false}
fields = append(fields, f.escape())
}
return ret
return fields, nil
}

// escape decodes backslash escape sequences when running a LOAD DATA statement.
// The bytes are compacted in place (the input slice's backing array is
// overwritten) and the shortened result is returned. A backslash that is the
// last byte has nothing to escape and is kept as is.
// TODO: escape needs to be improved; it should support ESCAPED BY to specify
// the escape character and handle the \N escape.
// See http://dev.mysql.com/doc/refman/5.7/en/load-data.html
func escape(str []byte) []byte {
// TODO: escape only supports '\' as the `ESCAPED BY` character; it should support a user-specified character.
func (f *field) escape() field {
// pos is the write index of the compacted output; it never runs ahead of the
// read index i, so writes cannot clobber bytes that are still to be read.
pos := 0
for i := 0; i < len(str); i++ {
c := str[i]
if c == '\\' && i+1 < len(str) {
c = escapeChar(str[i+1])
for i := 0; i < len(f.str); i++ {
c := f.str[i]
if c == '\\' && i+1 < len(f.str) {
c = f.escapeChar(f.str[i+1])
// Skip the escaped byte; it has been consumed together with the backslash.
i++
}

str[pos] = c
f.str[pos] = c
pos++
}
return str[:pos]
return field{f.str[:pos], f.maybeNull}
}

func escapeChar(c byte) byte {
func (f *field) escapeChar(c byte) byte {
switch c {
case '0':
return 0
Expand All @@ -676,19 +675,27 @@ func escapeChar(c byte) byte {
return '\t'
case 'Z':
return 26
case '\\':
return '\\'
case 'N':
f.maybeNull = true
return c
default:
return c
}
return c
}

func (e *LoadDataInfo) colsToRow(cols []string) types.DatumRow {
func (e *LoadDataInfo) colsToRow(cols []field) types.DatumRow {
for i := 0; i < len(e.row); i++ {
if i >= len(cols) {
e.row[i].SetString("")
e.row[i].SetNull()
continue
}
e.row[i].SetString(cols[i])
// The field with only "\N" in it is handled as NULL in the csv file.
// See http://dev.mysql.com/doc/refman/5.7/en/load-data.html
if cols[i].maybeNull && string(cols[i].str) == "N" {
e.row[i].SetNull()
} else {
e.row[i].SetString(string(cols[i].str))
}
}
row, err := e.insertVal.fillRowData(e.columns, e.row, true)
if err != nil {
Expand Down
98 changes: 26 additions & 72 deletions executor/write_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1103,13 +1103,13 @@ func (s *testSuite) TestLoadData(c *C) {
// fields and lines are default, InsertData returns data is nil
tests := []testCase{
// data1 = nil, data2 != nil
{nil, []byte("\n"), []string{"1|0||0"}, nil},
{nil, []byte("\t\n"), []string{"2|0||0"}, nil},
{nil, []byte("\n"), []string{"1|<nil>|<nil>|<nil>"}, nil},
{nil, []byte("\t\n"), []string{"2|0|<nil>|<nil>"}, nil},
{nil, []byte("3\t2\t3\t4\n"), []string{"3|2|3|4"}, nil},
{nil, []byte("3*1\t2\t3\t4\n"), []string{"3|2|3|4"}, nil},
{nil, []byte("4\t2\t\t3\t4\n"), []string{"4|2||3"}, nil},
{nil, []byte("\t1\t2\t3\t4\n"), []string{"5|1|2|3"}, nil},
{nil, []byte("6\t2\t3\n"), []string{"6|2|3|0"}, nil},
{nil, []byte("6\t2\t3\n"), []string{"6|2|3|<nil>"}, nil},
{nil, []byte("\t2\t3\t4\n\t22\t33\t44\n"), []string{"7|2|3|4", "8|22|33|44"}, nil},
{nil, []byte("7\t2\t3\t4\n7\t22\t33\t44\n"), []string{"7|2|3|4"}, nil},

Expand All @@ -1124,7 +1124,7 @@ func (s *testSuite) TestLoadData(c *C) {
{[]byte("\t2\t3"), []byte("\t4\t5"), nil, []byte("\t2\t3\t4\t5")},
}
checkCases(tests, ld, c, tk, ctx, selectSQL, deleteSQL)
c.Assert(sc.WarningCount(), Equals, uint16(3))
c.Assert(sc.WarningCount(), Equals, uint16(1))

// lines starting symbol is "" and terminated symbol length is 2, InsertData returns data is nil
ld.LinesInfo.Terminated = "||"
Expand All @@ -1135,9 +1135,9 @@ func (s *testSuite) TestLoadData(c *C) {
{[]byte("2\t2\t3\t4\t5|"), []byte("|3\t22\t33\t44\t55||"),
[]string{"2|2|3|4", "3|22|33|44"}, nil},
{[]byte("3\t2\t3\t4\t5|"), []byte("|4\t22\t33||"), []string{
"3|2|3|4", "4|22|33|0"}, nil},
"3|2|3|4", "4|22|33|<nil>"}, nil},
{[]byte("4\t2\t3\t4\t5|"), []byte("|5\t22\t33||6\t222||"),
[]string{"4|2|3|4", "5|22|33|0", "6|222||0"}, nil},
[]string{"4|2|3|4", "5|22|33|<nil>", "6|222|<nil>|<nil>"}, nil},
{[]byte("6\t2\t3"), []byte("4\t5||"), []string{"6|2|34|5"}, nil},
}
checkCases(tests, ld, c, tk, ctx, selectSQL, deleteSQL)
Expand All @@ -1148,12 +1148,12 @@ func (s *testSuite) TestLoadData(c *C) {
ld.LinesInfo.Terminated = "|!#^"
tests = []testCase{
// data1 = nil, data2 != nil
{nil, []byte("xxx|!#^"), []string{"13|0||0"}, nil},
{nil, []byte("xxx\\|!#^"), []string{"14|0||0"}, nil},
{nil, []byte("xxx|!#^"), []string{"13|<nil>|<nil>|<nil>"}, nil},
{nil, []byte("xxx\\|!#^"), []string{"14|0|<nil>|<nil>"}, nil},
{nil, []byte("xxx3\\2\\3\\4|!#^"), []string{"3|2|3|4"}, nil},
{nil, []byte("xxx4\\2\\\\3\\4|!#^"), []string{"4|2||3"}, nil},
{nil, []byte("xxx\\1\\2\\3\\4|!#^"), []string{"15|1|2|3"}, nil},
{nil, []byte("xxx6\\2\\3|!#^"), []string{"6|2|3|0"}, nil},
{nil, []byte("xxx6\\2\\3|!#^"), []string{"6|2|3|<nil>"}, nil},
{nil, []byte("xxx\\2\\3\\4|!#^xxx\\22\\33\\44|!#^"), []string{
"16|2|3|4",
"17|22|33|44"}, nil},
Expand All @@ -1170,27 +1170,27 @@ func (s *testSuite) TestLoadData(c *C) {
{[]byte("xxx10\\2\\3"), []byte("\\4|!#^"),
[]string{"10|2|3|4"}, nil},
{[]byte("10\\2\\3xx"), []byte("x11\\4\\5|!#^"),
[]string{"11|4|5|0"}, nil},
[]string{"11|4|5|<nil>"}, nil},
{[]byte("xxx21\\2\\3\\4\\5|!"), []byte("#^"),
[]string{"21|2|3|4"}, nil},
{[]byte("xxx22\\2\\3\\4\\5|!"), []byte("#^xxx23\\22\\33\\44\\55|!#^"),
[]string{"22|2|3|4", "23|22|33|44"}, nil},
{[]byte("xxx23\\2\\3\\4\\5|!"), []byte("#^xxx24\\22\\33|!#^"),
[]string{"23|2|3|4", "24|22|33|0"}, nil},
[]string{"23|2|3|4", "24|22|33|<nil>"}, nil},
{[]byte("xxx24\\2\\3\\4\\5|!"), []byte("#^xxx25\\22\\33|!#^xxx26\\222|!#^"),
[]string{"24|2|3|4", "25|22|33|0", "26|222||0"}, nil},
[]string{"24|2|3|4", "25|22|33|<nil>", "26|222|<nil>|<nil>"}, nil},
{[]byte("xxx25\\2\\3\\4\\5|!"), []byte("#^26\\22\\33|!#^xxx27\\222|!#^"),
[]string{"25|2|3|4", "27|222||0"}, nil},
[]string{"25|2|3|4", "27|222|<nil>|<nil>"}, nil},
{[]byte("xxx\\2\\3"), []byte("4\\5|!#^"), []string{"28|2|34|5"}, nil},

// InsertData returns data isn't nil
{nil, []byte("\\2\\3\\4|!#^"), nil, []byte("#^")},
{nil, []byte("\\4\\5"), nil, []byte("\\5")},
{[]byte("\\2\\3"), []byte("\\4\\5"), nil, []byte("\\5")},
{[]byte("xxx1\\2\\3|"), []byte("!#^\\4\\5|!#"),
[]string{"1|2|3|0"}, []byte("!#")},
[]string{"1|2|3|<nil>"}, []byte("!#")},
{[]byte("xxx1\\2\\3\\4\\5|!"), []byte("#^xxx2\\22\\33|!#^3\\222|!#^"),
[]string{"1|2|3|4", "2|22|33|0"}, []byte("#^")},
[]string{"1|2|3|4", "2|22|33|<nil>"}, []byte("#^")},
{[]byte("xx1\\2\\3"), []byte("\\4\\5|!#^"), nil, []byte("#^")},
}
checkCases(tests, ld, c, tk, ctx, selectSQL, deleteSQL)
Expand All @@ -1199,7 +1199,7 @@ func (s *testSuite) TestLoadData(c *C) {
ld.LinesInfo.Terminated = "xxx"
tests = []testCase{
// data1 = nil, data2 != nil
{nil, []byte("xxxxxx"), []string{"29|0||0"}, nil},
{nil, []byte("xxxxxx"), []string{"29|<nil>|<nil>|<nil>"}, nil},
{nil, []byte("xxx3\\2\\3\\4xxx"), []string{"3|2|3|4"}, nil},
{nil, []byte("xxx\\2\\3\\4xxxxxx\\22\\33\\44xxx"),
[]string{"30|2|3|4", "31|22|33|44"}, nil},
Expand All @@ -1214,19 +1214,19 @@ func (s *testSuite) TestLoadData(c *C) {
{[]byte("xxx32\\2\\3\\4\\5x"), []byte("xxxxx33\\22\\33\\44\\55xxx"),
[]string{"32|2|3|4", "33|22|33|44"}, nil},
{[]byte("xxx33\\2\\3\\4\\5xxx"), []byte("xxx34\\22\\33xxx"),
[]string{"33|2|3|4", "34|22|33|0"}, nil},
[]string{"33|2|3|4", "34|22|33|<nil>"}, nil},
{[]byte("xxx34\\2\\3\\4\\5xx"), []byte("xxxx35\\22\\33xxxxxx36\\222xxx"),
[]string{"34|2|3|4", "35|22|33|0", "36|222||0"}, nil},
[]string{"34|2|3|4", "35|22|33|<nil>", "36|222|<nil>|<nil>"}, nil},

// InsertData returns data isn't nil
{nil, []byte("\\2\\3\\4xxxx"), nil, []byte("xxxx")},
{[]byte("\\2\\3\\4xxx"), nil, []string{"37|0||0"}, nil},
{[]byte("\\2\\3\\4xxx"), nil, []string{"37|<nil>|<nil>|<nil>"}, nil},
{[]byte("\\2\\3\\4xxxxxx11\\22\\33\\44xxx"), nil,
[]string{"38|0||0", "39|0||0"}, nil},
[]string{"38|<nil>|<nil>|<nil>", "39|<nil>|<nil>|<nil>"}, nil},
{[]byte("xx10\\2\\3"), []byte("\\4\\5xxx"), nil, []byte("xxx")},
{[]byte("xxx10\\2\\3"), []byte("\\4xxxx"), []string{"10|2|3|4"}, []byte("x")},
{[]byte("xxx10\\2\\3\\4\\5x"), []byte("xx11\\22\\33xxxxxx12\\222xxx"),
[]string{"10|2|3|4", "40|0||0"}, []byte("xxx")},
[]string{"10|2|3|4", "40|<nil>|<nil>|<nil>"}, []byte("xxx")},
}
checkCases(tests, ld, c, tk, ctx, selectSQL, deleteSQL)
}
Expand All @@ -1247,14 +1247,16 @@ func (s *testSuite) TestLoadDataEscape(c *C) {
{nil, []byte("4\tboth \\t\\n\n"), []string{"4|both \t\n"}, nil},
{nil, []byte("5\tstr \\\\\n"), []string{"5|str \\"}, nil},
{nil, []byte("6\t\\r\\t\\n\\0\\Z\\b\n"), []string{"6|" + string([]byte{'\r', '\t', '\n', 0, 26, '\b'})}, nil},
{nil, []byte("7\trtn0ZbN\n"), []string{"7|" + string([]byte{'r', 't', 'n', '0', 'Z', 'b', 'N'})}, nil},
{nil, []byte("8\trtn0Zb\\N\n"), []string{"8|" + string([]byte{'r', 't', 'n', '0', 'Z', 'b', 'N'})}, nil},
}
deleteSQL := "delete from load_data_test"
selectSQL := "select * from load_data_test;"
checkCases(tests, ld, c, tk, ctx, selectSQL, deleteSQL)
}

// TestLoadDataSpecifiedCoumns reuse TestLoadDataEscape's test case :-)
func (s *testSuite) TestLoadDataSpecifiedCoumns(c *C) {
// TestLoadDataSpecifiedColumns reuses TestLoadDataEscape's test cases :-)
func (s *testSuite) TestLoadDataSpecifiedColumns(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test; drop table if exists load_data_test;")
tk.MustExec(`create table load_data_test (id int PRIMARY KEY AUTO_INCREMENT, c1 int, c2 varchar(255) default "def", c3 int default 0);`)
Expand All @@ -1270,6 +1272,7 @@ func (s *testSuite) TestLoadDataSpecifiedCoumns(c *C) {
{nil, []byte("10\tboth \\t\\n\n"), []string{"4|10|both \t\n|0"}, nil},
{nil, []byte("11\tstr \\\\\n"), []string{"5|11|str \\|0"}, nil},
{nil, []byte("12\t\\r\\t\\n\\0\\Z\\b\n"), []string{"6|12|" + string([]byte{'\r', '\t', '\n', 0, 26, '\b'}) + "|0"}, nil},
{nil, []byte("\\N\ta string\n"), []string{"7|<nil>|a string|0"}, nil},
}
deleteSQL := "delete from load_data_test"
selectSQL := "select * from load_data_test;"
Expand Down Expand Up @@ -1415,55 +1418,6 @@ func (s *testSuite) TestNullDefault(c *C) {
tk.MustQuery("select * from test_null_default").Check(testkit.Rows("<nil>", "1970-01-01 08:20:34"))
}

// TestGetFieldsFromLine checks that executor.GetFieldsFromLine splits a line
// whose fields are enclosed by '"' and terminated by ",", and that the
// backslash escape sequences inside a field are decoded.
func (s *testSuite) TestGetFieldsFromLine(c *C) {
	tests := []struct {
		input    string
		expected []string
	}{
		{
			`"1","a string","100.20"`,
			[]string{"1", "a string", "100.20"},
		},
		{
			`"2","a string containing a , comma","102.20"`,
			[]string{"2", "a string containing a , comma", "102.20"},
		},
		{
			`"3","a string containing a \" quote","102.20"`,
			[]string{"3", "a string containing a \" quote", "102.20"},
		},
		{
			`"4","a string containing a \", quote and comma","102.20"`,
			[]string{"4", "a string containing a \", quote and comma", "102.20"},
		},
		// Test some escape char.
		// The input contains an escaped space (`\ `) immediately followed by a
		// literal space, which is why the expected bytes hold two ' ' entries.
		{
			`"\0\b\n\r\t\Z\\\  \c\'\""`,
			[]string{string([]byte{0, '\b', '\n', '\r', '\t', 26, '\\', ' ', ' ', 'c', '\'', '"'})},
		},
	}
	fieldsInfo := &ast.FieldsClause{
		Enclosed:   '"',
		Terminated: ",",
	}

	for _, test := range tests {
		got, err := executor.GetFieldsFromLine([]byte(test.input), fieldsInfo)
		c.Assert(err, IsNil, Commentf("failed: %s", test.input))
		assertEqualStrings(c, got, test.expected)
	}

	// A line that is not wrapped in the enclosing character must be rejected.
	_, err := executor.GetFieldsFromLine([]byte(`1,a string,100.20`), fieldsInfo)
	c.Assert(err, NotNil)
}

// assertEqualStrings asserts that got and expect have the same length and
// hold equal strings at every position.
func assertEqualStrings(c *C, got []string, expect []string) {
	c.Assert(len(got), Equals, len(expect))
	for idx, s := range got {
		c.Assert(s, Equals, expect[idx])
	}
}

// TestIssue4067 Test issue https://github.com/pingcap/tidb/issues/4067
func (s *testSuite) TestIssue4067(c *C) {
tk := testkit.NewTestKit(c, s.store)
Expand Down