Skip to content

Commit

Permalink
executor: check error message and refine LOAD DATA logic (#41640)
Browse files Browse the repository at this point in the history
ref #40499
  • Loading branch information
lance6716 authored Feb 28, 2023
1 parent bdd8d6d commit 1da60a3
Show file tree
Hide file tree
Showing 19 changed files with 618 additions and 438 deletions.
2 changes: 1 addition & 1 deletion br/pkg/lightning/mydump/csv_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func NewCSVParser(

if len(cfg.StartingBy) > 0 {
if strings.Contains(cfg.StartingBy, terminator) {
return nil, errors.New("starting-by cannot contain (line) terminator")
return nil, errors.Errorf("STARTING BY '%s' cannot contain LINES TERMINATED BY '%s'", cfg.StartingBy, terminator)
}
}

Expand Down
2 changes: 1 addition & 1 deletion br/pkg/lightning/mydump/csv_parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1346,7 +1346,7 @@ yyy",5,xx"xxxx,8
},
}
_, err := mydump.NewCSVParser(context.Background(), &cfg.CSV, nil, 1, ioWorkers, false, nil)
require.ErrorContains(t, err, "starting-by cannot contain (line) terminator")
require.ErrorContains(t, err, "STARTING BY 'x\nxx' cannot contain LINES TERMINATED BY '\n'")
}

func TestCharsetConversion(t *testing.T) {
Expand Down
4 changes: 2 additions & 2 deletions br/pkg/storage/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,10 +157,10 @@ func (options *S3BackendOptions) Apply(s3 *backuppb.S3) error {
return errors.Trace(err)
}
if u.Scheme == "" {
return errors.Annotate(berrors.ErrStorageInvalidConfig, "scheme not found in endpoint")
return errors.Errorf("scheme not found in endpoint")
}
if u.Host == "" {
return errors.Annotate(berrors.ErrStorageInvalidConfig, "host not found in endpoint")
return errors.Errorf("host not found in endpoint")
}
}
// In some cases, we need to set ForcePathStyle to false.
Expand Down
9 changes: 9 additions & 0 deletions errno/errcode.go
Original file line number Diff line number Diff line change
Expand Up @@ -1045,6 +1045,15 @@ const (
ErrTempTableNotAllowedWithTTL = 8151
ErrUnsupportedTTLReferencedByFK = 8152
ErrUnsupportedPrimaryKeyTypeWithTTL = 8153
ErrLoadDataFromServerDisk = 8154
ErrLoadParquetFromLocal = 8155
ErrLoadDataEmptyPath = 8156
ErrLoadDataUnsupportedFormat = 8157
ErrLoadDataInvalidURI = 8158
ErrLoadDataCantAccess = 8159
ErrLoadDataCantRead = 8160
ErrLoadDataPhysicalImportTableNotEmpty = 8161
ErrLoadDataWrongFormatConfig = 8162

// Error codes used by TiDB ddl package
ErrUnsupportedDDLOperation = 8200
Expand Down
8 changes: 8 additions & 0 deletions errno/errname.go
Original file line number Diff line number Diff line change
Expand Up @@ -1040,6 +1040,14 @@ var MySQLErrName = map[uint16]*mysql.ErrMessage{
ErrTempTableNotAllowedWithTTL: mysql.Message("Set TTL for temporary table is not allowed", nil),
ErrUnsupportedTTLReferencedByFK: mysql.Message("Set TTL for a table referenced by foreign key is not allowed", nil),
ErrUnsupportedPrimaryKeyTypeWithTTL: mysql.Message("Unsupported clustered primary key type FLOAT/DOUBLE for TTL", nil),
ErrLoadDataFromServerDisk: mysql.Message("Don't support load data from tidb-server's disk. Or if you want to load local data via client, the path of INFILE '%s' needs to specify the clause of LOCAL first", nil),
ErrLoadParquetFromLocal: mysql.Message("Do not support loading parquet files from local. Please try to load the parquet files from the cloud storage", nil),
ErrLoadDataEmptyPath: mysql.Message("The value of INFILE must not be empty when LOAD DATA from LOCAL", nil),
ErrLoadDataUnsupportedFormat: mysql.Message("The FORMAT '%s' is not supported", nil),
ErrLoadDataInvalidURI: mysql.Message("The URI of INFILE is invalid. Reason: %s. Please provide a valid URI, such as 's3://import/test.csv?access_key_id={your_access_key_id ID}&secret_access_key={your_secret_access_key}&session_token={your_session_token}'", nil),
ErrLoadDataCantAccess: mysql.Message("Access to the source file has been denied. Please check the URI, access key and secret access key are correct", nil),
ErrLoadDataCantRead: mysql.Message("Failed to read source files. Reason: %s. %s", nil),
ErrLoadDataWrongFormatConfig: mysql.Message("", nil),

ErrWarnOptimizerHintInvalidInteger: mysql.Message("integer value is out of range in '%s'", nil),
ErrWarnOptimizerHintUnsupportedHint: mysql.Message("Optimizer hint %s is not supported by TiDB and is ignored", nil),
Expand Down
40 changes: 40 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -1646,6 +1646,46 @@ error = '''
transaction aborted because lazy uniqueness check is enabled and an error occurred: %s
'''

["executor:8154"]
error = '''
Don't support load data from tidb-server's disk. Or if you want to load local data via client, the path of INFILE '%s' needs to specify the clause of LOCAL first
'''

["executor:8155"]
error = '''
Do not support loading parquet files from local. Please try to load the parquet files from the cloud storage
'''

["executor:8156"]
error = '''
The value of INFILE must not be empty when LOAD DATA from LOCAL
'''

["executor:8157"]
error = '''
The FORMAT '%s' is not supported
'''

["executor:8158"]
error = '''
The URI of INFILE is invalid. Reason: %s. Please provide a valid URI, such as 's3://import/test.csv?access_key_id={your_access_key_id ID}&secret_access_key={your_secret_access_key}&session_token={your_session_token}'
'''

["executor:8159"]
error = '''
Access to the source file has been denied. Please check the URI, access key and secret access key are correct
'''

["executor:8160"]
error = '''
Failed to read source files. Reason: %s. %s
'''

["executor:8162"]
error = '''
'''

["executor:8212"]
error = '''
Failed to split region ranges: %s
Expand Down
1 change: 1 addition & 0 deletions executor/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,7 @@ go_test(
flaky = True,
shard_count = 50,
deps = [
"//br/pkg/errors",
"//config",
"//ddl",
"//ddl/placement",
Expand Down
47 changes: 12 additions & 35 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -938,46 +938,23 @@ func (b *executorBuilder) buildLoadData(v *plannercore.LoadData) Executor {
b.err = errors.Errorf("Can not get table %d", v.Table.TableInfo.ID)
return nil
}
insertVal := &InsertValues{
baseExecutor: newBaseExecutor(b.ctx, nil, v.ID()),
Table: tbl,
Columns: v.Columns,
GenExprs: v.GenCols.Exprs,
isLoadData: true,
txnInUse: sync.Mutex{},
}
loadDataInfo := &LoadDataInfo{
row: make([]types.Datum, 0, len(insertVal.insertColumns)),
InsertValues: insertVal,
Path: v.Path,
Format: v.Format,
Table: tbl,
FieldsInfo: v.FieldsInfo,
LinesInfo: v.LinesInfo,
NullInfo: v.NullInfo,
IgnoreLines: v.IgnoreLines,
ColumnAssignments: v.ColumnAssignments,
ColumnsAndUserVars: v.ColumnsAndUserVars,
OnDuplicate: v.OnDuplicate,
Ctx: b.ctx,
}
columnNames := loadDataInfo.initFieldMappings()
err := loadDataInfo.initLoadColumns(columnNames)
if !tbl.Meta().IsBaseTable() {
b.err = plannercore.ErrNonUpdatableTable.GenWithStackByArgs(tbl.Meta().Name.O, "LOAD")
return nil
}

worker, err := NewLoadDataWorker(b.ctx, v, tbl)
if err != nil {
b.err = err
return nil
}
loadDataExec := &LoadDataExec{
baseExecutor: newBaseExecutor(b.ctx, nil, v.ID()),
FileLocRef: v.FileLocRef,
OnDuplicate: v.OnDuplicate,
loadDataInfo: loadDataInfo,
}
var defaultLoadDataBatchCnt uint64 = 20000 // TODO this will be changed to variable in another pr
loadDataExec.loadDataInfo.initQueues()
loadDataExec.loadDataInfo.SetMaxRowsInBatch(defaultLoadDataBatchCnt)

return loadDataExec
return &LoadDataExec{
baseExecutor: newBaseExecutor(b.ctx, nil, v.ID()),
FileLocRef: v.FileLocRef,
OnDuplicate: v.OnDuplicate,
loadDataWorker: worker,
}
}

func (b *executorBuilder) buildLoadStats(v *plannercore.LoadStats) Executor {
Expand Down
9 changes: 9 additions & 0 deletions executor/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,4 +76,13 @@ var (
errUnsupportedFlashbackTmpTable = dbterror.ClassDDL.NewStdErr(mysql.ErrUnsupportedDDLOperation, parser_mysql.Message("Recover/flashback table is not supported on temporary tables", nil))
errTruncateWrongInsertValue = dbterror.ClassTable.NewStdErr(mysql.ErrTruncatedWrongValue, parser_mysql.Message("Incorrect %-.32s value: '%-.128s' for column '%.192s' at row %d", nil))
ErrExistsInHistoryPassword = dbterror.ClassExecutor.NewStd(mysql.ErrExistsInHistoryPassword)

ErrLoadDataFromServerDisk = dbterror.ClassExecutor.NewStd(mysql.ErrLoadDataFromServerDisk)
ErrLoadParquetFromLocal = dbterror.ClassExecutor.NewStd(mysql.ErrLoadParquetFromLocal)
ErrLoadDataEmptyPath = dbterror.ClassExecutor.NewStd(mysql.ErrLoadDataEmptyPath)
ErrLoadDataUnsupportedFormat = dbterror.ClassExecutor.NewStd(mysql.ErrLoadDataUnsupportedFormat)
ErrLoadDataInvalidURI = dbterror.ClassExecutor.NewStd(mysql.ErrLoadDataInvalidURI)
ErrLoadDataCantAccess = dbterror.ClassExecutor.NewStd(mysql.ErrLoadDataCantAccess)
ErrLoadDataCantRead = dbterror.ClassExecutor.NewStd(mysql.ErrLoadDataCantRead)
ErrLoadDataWrongFormatConfig = dbterror.ClassExecutor.NewStd(mysql.ErrLoadDataWrongFormatConfig)
)
Loading

0 comments on commit 1da60a3

Please sign in to comment.