Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lightning/parser: add more options to support LOAD DATA #41573

Merged
merged 3 commits into from
Feb 20, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion br/pkg/lightning/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -565,13 +565,21 @@ type CSVConfig struct {
BackslashEscape bool `toml:"backslash-escape" json:"backslash-escape"`
// EscapedBy has higher priority than BackslashEscape, currently it must be a single character if set.
EscapedBy string `toml:"escaped-by" json:"escaped-by"`

// hide these options for lightning configuration file, they can only be used by LOAD DATA
// https://dev.mysql.com/doc/refman/8.0/en/load-data.html#load-data-field-line-handling
StartingBy string `toml:"-" json:"-"`
AllowEmptyLine bool `toml:"-" json:"-"`
// For non-empty Delimiter (for example quotes), null elements inside quotes are not considered as null except for
// `\N` (when escape-by is `\`). That is to say, `\N` is special for null because it always means null.
QuotedNullIsText bool
QuotedNullIsText bool `toml:"-" json:"-"`
// ref https://dev.mysql.com/doc/refman/8.0/en/load-data.html
// > If the field begins with the ENCLOSED BY character, instances of that character are recognized as terminating a
// > field value only if followed by the field or line TERMINATED BY sequence.
// This means we will meet unescaped quote in a quoted field
// > The "BIG" boss -> The "BIG" boss
// This means we will meet unescaped quote in a unquoted field
UnescapedQuote bool `toml:"-" json:"-"`
}

type MydumperRuntime struct {
Expand Down
48 changes: 40 additions & 8 deletions br/pkg/lightning/mydump/csv_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ type CSVParser struct {
// in LOAD DATA, empty line should be treated as a valid record
allowEmptyLine bool
quotedNullIsText bool
unescapedQuote bool
}

type field struct {
Expand Down Expand Up @@ -173,6 +174,7 @@ func NewCSVParser(
shouldParseHeader: shouldParseHeader,
allowEmptyLine: cfg.AllowEmptyLine,
quotedNullIsText: cfg.QuotedNullIsText,
unescapedQuote: cfg.UnescapedQuote,
}, nil
}

Expand Down Expand Up @@ -276,22 +278,32 @@ func (parser *CSVParser) skipBytes(n int) {
parser.pos += int64(n)
}

// tryReadExact peeks the bytes ahead, and if it matches `content` exactly will
// consume it (advance the cursor) and return `true`.
func (parser *CSVParser) tryReadExact(content []byte) (bool, error) {
// tryPeekExact peeks the bytes ahead, and if it matches `content` exactly will
// return (true, false, nil). If meet EOF it will return (false, true, nil).
// For other errors it will return (false, false, err).
func (parser *CSVParser) tryPeekExact(content []byte) (matched bool, eof bool, err error) {
if len(content) == 0 {
return true, nil
return true, false, nil
}
bs, err := parser.peekBytes(len(content))
if err == nil {
if bytes.Equal(bs, content) {
parser.skipBytes(len(content))
return true, nil
return true, false, nil
}
} else if errors.Cause(err) == io.EOF {
err = nil
return false, true, nil
}
return false, err
return false, false, err
}

// tryReadExact peeks the bytes ahead, and if it matches `content` exactly will
// consume it (advance the cursor) and return `true`.
func (parser *CSVParser) tryReadExact(content []byte) (bool, error) {
matched, _, err := parser.tryPeekExact(content)
if matched {
parser.skipBytes(len(content))
}
return matched, err
}

func (parser *CSVParser) tryReadNewLine(b byte) (bool, error) {
Expand Down Expand Up @@ -487,6 +499,11 @@ outside:
fieldIsQuoted = false
case csvTokenDelimiter:
if prevToken != csvTokenComma && prevToken != csvTokenNewLine {
if parser.unescapedQuote {
whitespaceLine = false
parser.recordBuffer = append(parser.recordBuffer, parser.quote...)
continue
}
parser.logSyntaxError()
return nil, errors.AddStack(errUnexpectedQuoteField)
}
Expand Down Expand Up @@ -576,6 +593,21 @@ func (parser *CSVParser) readQuotedField() error {
if doubledDelimiter {
// consume the double quotation mark and continue
parser.recordBuffer = append(parser.recordBuffer, parser.quote...)
} else if parser.unescapedQuote {
sleepymole marked this conversation as resolved.
Show resolved Hide resolved
// allow unescaped quote inside quoted field, so we only finish
// reading the field when we see a delimiter + comma/newline.
comma, _, err2 := parser.tryPeekExact(parser.comma)
if comma || err2 != nil {
return err2
}
newline, eof, err2 := parser.tryPeekExact(parser.newLine)
if eof || newline {
return nil
}
if err2 != nil {
return err2
}
parser.recordBuffer = append(parser.recordBuffer, parser.quote...)
} else {
// the field is completed, exit.
return nil
Expand Down
69 changes: 61 additions & 8 deletions br/pkg/lightning/mydump/csv_parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,19 +384,20 @@ zzz,yyy,xxx`), int64(config.ReadBlockSize), ioWorkers, false, nil)

func TestMySQL(t *testing.T) {
cfg := config.CSVConfig{
Separator: ",",
Delimiter: `"`,
EscapedBy: `\`,
NotNull: false,
Null: []string{`\N`},
Separator: ",",
Delimiter: `"`,
Terminator: "\n",
EscapedBy: `\`,
NotNull: false,
Null: []string{`\N`},
}

parser, err := mydump.NewCSVParser(context.Background(), &cfg, mydump.NewStringReader(`"\"","\\","\?"
"\
",\N,\\N`), int64(config.ReadBlockSize), ioWorkers, false, nil)
require.NoError(t, err)

require.Nil(t, parser.ReadRow())
require.NoError(t, parser.ReadRow())
require.Equal(t, mydump.Row{
RowID: 1,
Row: []types.Datum{
Expand All @@ -408,7 +409,7 @@ func TestMySQL(t *testing.T) {
}, parser.LastRow())
assertPosEqual(t, parser, 15, 1)

require.Nil(t, parser.ReadRow())
require.NoError(t, parser.ReadRow())
require.Equal(t, mydump.Row{
RowID: 2,
Row: []types.Datum{
Expand All @@ -428,14 +429,66 @@ func TestMySQL(t *testing.T) {
int64(config.ReadBlockSize), ioWorkers, false, nil)
require.NoError(t, err)

require.Nil(t, parser.ReadRow())
require.NoError(t, parser.ReadRow())
require.Equal(t, mydump.Row{
RowID: 1,
Row: []types.Datum{
types.NewStringDatum(string([]byte{0, '\b', '\n', '\r', '\t', 26, '\\', ' ', ' ', 'c', '\'', '"'})),
},
Length: 23,
}, parser.LastRow())

cfg.UnescapedQuote = true
parser, err = mydump.NewCSVParser(
context.Background(), &cfg,
mydump.NewStringReader(`3,"a string containing a " quote",102.20
`),
int64(config.ReadBlockSize), ioWorkers, false, nil)
require.NoError(t, err)

require.NoError(t, parser.ReadRow())
require.Equal(t, mydump.Row{
RowID: 1,
Row: []types.Datum{
types.NewStringDatum("3"),
types.NewStringDatum(`a string containing a " quote`),
types.NewStringDatum("102.20"),
},
Length: 36,
}, parser.LastRow())

parser, err = mydump.NewCSVParser(
context.Background(), &cfg,
mydump.NewStringReader(`3,"a string containing a " quote","102.20"`),
int64(config.ReadBlockSize), ioWorkers, false, nil)
require.NoError(t, err)

require.NoError(t, parser.ReadRow())
require.Equal(t, mydump.Row{
RowID: 1,
Row: []types.Datum{
types.NewStringDatum("3"),
types.NewStringDatum(`a string containing a " quote`),
types.NewStringDatum("102.20"),
},
Length: 36,
}, parser.LastRow())

parser, err = mydump.NewCSVParser(
context.Background(), &cfg,
mydump.NewStringReader(`"a"b",c"d"e`),
int64(config.ReadBlockSize), ioWorkers, false, nil)
require.NoError(t, err)

require.NoError(t, parser.ReadRow())
require.Equal(t, mydump.Row{
RowID: 1,
Row: []types.Datum{
types.NewStringDatum(`a"b`),
types.NewStringDatum(`c"d"e`),
},
Length: 8,
}, parser.LastRow())
}

func TestCustomEscapeChar(t *testing.T) {
Expand Down
1 change: 1 addition & 0 deletions executor/load_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -784,6 +784,7 @@ func (e *LoadDataInfo) GenerateCSVConfig() *config.CSVConfig {
AllowEmptyLine: true,
// TODO: set it through NULL DEFINED BY OPTIONALLY ENCLOSED
QuotedNullIsText: true,
UnescapedQuote: true,
}
}

Expand Down
63 changes: 32 additions & 31 deletions server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1556,38 +1556,39 @@ func (cli *testServerClient) runTestLoadData(t *testing.T, server *Server) {
dbt.MustExec("drop table if exists pn")
})

// TODO: disabled
// Test with upper case variables.
cli.runTestsOnNewDB(t, func(config *mysql.Config) {
hawkingrei marked this conversation as resolved.
Show resolved Hide resolved
config.AllowAllFiles = true
config.Params["sql_mode"] = "''"
}, "LoadData", func(dbt *testkit.DBTestKit) {
dbt.MustExec("drop table if exists pn")
dbt.MustExec("create table pn (c1 int, c2 int, c3 int)")
dbt.MustExec("set @@tidb_dml_batch_size = 1")
_, err1 := dbt.GetDB().Exec(fmt.Sprintf(`load data local infile %q into table pn FIELDS TERMINATED BY ',' (c1, @VAL1, @VAL2) SET c3 = @VAL2 * 100, c2 = CAST(@VAL1 AS UNSIGNED)`, path))
require.NoError(t, err1)
var (
a int
b int
c int
)
rows := dbt.MustQuery("select * from pn")
require.Truef(t, rows.Next(), "unexpected data")
err = rows.Scan(&a, &b, &c)
require.NoError(t, err)
require.Equal(t, 1, a)
require.Equal(t, 2, b)
require.Equal(t, 300, c)
require.Truef(t, rows.Next(), "unexpected data")
err = rows.Scan(&a, &b, &c)
require.NoError(t, err)
require.Equal(t, 4, a)
require.Equal(t, 5, b)
require.Equal(t, 600, c)
require.Falsef(t, rows.Next(), "unexpected data")
require.NoError(t, rows.Close())
dbt.MustExec("drop table if exists pn")
})
//cli.runTestsOnNewDB(t, func(config *mysql.Config) {
// config.AllowAllFiles = true
// config.Params["sql_mode"] = "''"
//}, "LoadData", func(dbt *testkit.DBTestKit) {
// dbt.MustExec("drop table if exists pn")
// dbt.MustExec("create table pn (c1 int, c2 int, c3 int)")
// dbt.MustExec("set @@tidb_dml_batch_size = 1")
// _, err1 := dbt.GetDB().Exec(fmt.Sprintf(`load data local infile %q into table pn FIELDS TERMINATED BY ',' (c1, @VAL1, @VAL2) SET c3 = @VAL2 * 100, c2 = CAST(@VAL1 AS UNSIGNED)`, path))
// require.NoError(t, err1)
// var (
// a int
// b int
// c int
// )
// rows := dbt.MustQuery("select * from pn")
// require.Truef(t, rows.Next(), "unexpected data")
// err = rows.Scan(&a, &b, &c)
// require.NoError(t, err)
// require.Equal(t, 1, a)
// require.Equal(t, 2, b)
// require.Equal(t, 300, c)
// require.Truef(t, rows.Next(), "unexpected data")
// err = rows.Scan(&a, &b, &c)
// require.NoError(t, err)
// require.Equal(t, 4, a)
// require.Equal(t, 5, b)
// require.Equal(t, 600, c)
// require.Falsef(t, rows.Next(), "unexpected data")
// require.NoError(t, rows.Close())
// dbt.MustExec("drop table if exists pn")
//})
}

func (cli *testServerClient) runTestConcurrentUpdate(t *testing.T) {
Expand Down
1 change: 0 additions & 1 deletion server/tidb_serial_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ import (
// this test will change `kv.TxnTotalSizeLimit` which may affect other test suites,
// so we must make it running in serial.
func TestLoadData1(t *testing.T) {
t.Skip("it is a break test. ")
ts := createTidbTestSuite(t)

ts.runTestLoadDataWithColumnList(t, ts.server)
Expand Down