diff --git a/pkg/lightning/restore/check_info.go b/pkg/lightning/restore/check_info.go
index 140f1dffb..d30d7a132 100644
--- a/pkg/lightning/restore/check_info.go
+++ b/pkg/lightning/restore/check_info.go
@@ -16,6 +16,7 @@ package restore
 import (
 	"context"
 	"fmt"
+	"io"
 	"path/filepath"
 	"reflect"
 	"strings"
@@ -285,6 +286,42 @@ func hasDefault(col *model.ColumnInfo) bool {
 		col.IsGenerated() || mysql.HasAutoIncrementFlag(col.Flag)
 }
 
+func (rc *Controller) readColumnsAndCount(ctx context.Context, dataFileMeta md.SourceFileMeta) (cols []string, colCnt int, err error) {
+	var reader storage.ReadSeekCloser
+	if dataFileMeta.Type == md.SourceTypeParquet {
+		reader, err = md.OpenParquetReader(ctx, rc.store, dataFileMeta.Path, dataFileMeta.FileSize)
+	} else {
+		reader, err = rc.store.Open(ctx, dataFileMeta.Path)
+	}
+	if err != nil {
+		return nil, 0, errors.Trace(err)
+	}
+
+	var parser md.Parser
+	blockBufSize := int64(rc.cfg.Mydumper.ReadBlockSize)
+	switch dataFileMeta.Type {
+	case md.SourceTypeCSV:
+		hasHeader := rc.cfg.Mydumper.CSV.Header
+		parser = md.NewCSVParser(&rc.cfg.Mydumper.CSV, reader, blockBufSize, rc.ioWorkers, hasHeader)
+	case md.SourceTypeSQL:
+		parser = md.NewChunkParser(rc.cfg.TiDB.SQLMode, reader, blockBufSize, rc.ioWorkers)
+	case md.SourceTypeParquet:
+		parser, err = md.NewParquetParser(ctx, rc.store, reader, dataFileMeta.Path)
+		if err != nil {
+			return nil, 0, errors.Trace(err)
+		}
+	default:
+		panic(fmt.Sprintf("unknown file type '%s'", dataFileMeta.Type))
+	}
+	defer parser.Close()
+
+	err = parser.ReadRow()
+	if err != nil && errors.Cause(err) != io.EOF {
+		return nil, 0, errors.Trace(err)
+	}
+	return parser.Columns(), len(parser.LastRow().Row), nil
+}
+
 // SchemaIsValid checks the import file and cluster schema is match.
 func (rc *Controller) SchemaIsValid(ctx context.Context, tableInfo *md.MDTableMeta) ([]string, error) {
 	msgs := make([]string, 0)
@@ -294,67 +331,49 @@ func (rc *Controller) SchemaIsValid(ctx context.Context, tableInfo *md.MDTableMe
 			"please give a schema file in source dir or create table manually", tableInfo.DB, tableInfo.Name))
 		return msgs, nil
 	}
-	if info != nil {
-		igCols := make(map[string]struct{})
-		igCol, err := rc.cfg.Mydumper.IgnoreColumns.GetIgnoreColumns(tableInfo.DB, tableInfo.Name, rc.cfg.Mydumper.CaseSensitive)
-		if err != nil {
-			return nil, errors.Trace(err)
-		}
-		for _, col := range igCol.Columns {
-			igCols[col] = struct{}{}
-		}
-		if len(tableInfo.DataFiles) == 0 {
-			log.Info("no data files detected", zap.String("db", tableInfo.DB), zap.String("table", tableInfo.Name))
-			return nil, nil
+	igCols := make(map[string]struct{})
+	igCol, err := rc.cfg.Mydumper.IgnoreColumns.GetIgnoreColumns(tableInfo.DB, tableInfo.Name, rc.cfg.Mydumper.CaseSensitive)
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+	for _, col := range igCol.Columns {
+		igCols[col] = struct{}{}
+	}
+
+	if len(tableInfo.DataFiles) == 0 {
+		log.Info("no data files detected", zap.String("db", tableInfo.DB), zap.String("table", tableInfo.Name))
+		return nil, nil
+	}
+
+	colCountFromTiDB := len(info.Core.Columns)
+	core := info.Core
+	defaultCols := make(map[string]struct{})
+	for _, col := range core.Columns {
+		if hasDefault(col) || (info.Core.ContainsAutoRandomBits() && mysql.HasPriKeyFlag(col.Flag)) {
+			// this column has default value or it's auto random id, so we can ignore it
+			defaultCols[col.Name.L] = struct{}{}
 		}
+	}
+	// tidb_rowid has a default value.
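+	// (the hidden _tidb_rowid handle column is generated by TiDB itself, so data files may omit it)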
+	defaultCols[model.ExtraHandleName.String()] = struct{}{}
+
+	for _, dataFile := range tableInfo.DataFiles {
 		// get columns name from data file.
-		dataFileMeta := tableInfo.DataFiles[0].FileMeta
-		var reader storage.ReadSeekCloser
-		if dataFileMeta.Type == md.SourceTypeParquet {
-			reader, err = md.OpenParquetReader(ctx, rc.store, dataFileMeta.Path, dataFileMeta.FileSize)
-		} else {
-			reader, err = rc.store.Open(ctx, dataFileMeta.Path)
-		}
-		if err != nil {
-			return nil, errors.Trace(err)
-		}
+		dataFileMeta := dataFile.FileMeta
 
-		var parser md.Parser
-		blockBufSize := int64(rc.cfg.Mydumper.ReadBlockSize)
-		switch dataFileMeta.Type {
-		case md.SourceTypeCSV:
-			hasHeader := rc.cfg.Mydumper.CSV.Header
-			parser = md.NewCSVParser(&rc.cfg.Mydumper.CSV, reader, blockBufSize, rc.ioWorkers, hasHeader)
-		case md.SourceTypeSQL:
-			parser = md.NewChunkParser(rc.cfg.TiDB.SQLMode, reader, blockBufSize, rc.ioWorkers)
-		case md.SourceTypeParquet:
-			parser, err = md.NewParquetParser(ctx, rc.store, reader, dataFileMeta.Path)
-			if err != nil {
-				return nil, errors.Trace(err)
-			}
-		default:
+		if tp := dataFileMeta.Type; tp != md.SourceTypeCSV && tp != md.SourceTypeSQL && tp != md.SourceTypeParquet {
 			msgs = append(msgs, fmt.Sprintf("file '%s' with unknown source type '%s'", dataFileMeta.Path, dataFileMeta.Type.String()))
+			return msgs, nil
 		}
 
-		err = parser.ReadRow()
+		colsFromDataFile, colCountFromDataFile, err := rc.readColumnsAndCount(ctx, dataFileMeta)
 		if err != nil {
 			return nil, errors.Trace(err)
 		}
-
-		colsFromDataFile := parser.Columns()
-		colCountFromDataFile := len(parser.LastRow().Row)
-
-		colCountFromTiDB := len(info.Core.Columns)
-		core := info.Core
-		defaultCols := make(map[string]struct{})
-		for _, col := range core.Columns {
-			if hasDefault(col) || (info.Core.ContainsAutoRandomBits() && mysql.HasPriKeyFlag(col.Flag)) {
-				// this column has default value or it's auto random id, so we can ignore it
-				defaultCols[col.Name.L] = struct{}{}
-			}
+		if colsFromDataFile == nil && colCountFromDataFile == 0 {
+			log.Info("file contains no data, skip checking against schema validity", zap.String("path", dataFileMeta.Path))
+			continue
 		}
-		// tidb_rowid have a default value.
-		defaultCols[model.ExtraHandleName.String()] = struct{}{}
 
 		if colsFromDataFile == nil {
 			// when there is no columns name in data file. we must insert data in order.
@@ -392,9 +411,13 @@ func (rc *Controller) SchemaIsValid(ctx context.Context, tableInfo *md.MDTableMe
 		colMap[model.ExtraHandleName.String()] = struct{}{}
 		for _, col := range colsFromDataFile {
 			if _, ok := colMap[col]; !ok {
+				checkMsg := "please check table schema"
+				if dataFileMeta.Type == md.SourceTypeCSV && rc.cfg.Mydumper.CSV.Header {
+					checkMsg += " and csv file header"
+				}
 				msgs = append(msgs, fmt.Sprintf("TiDB schema `%s`.`%s` doesn't have column %s, "+
-					"please check table schema or use tables.ignoreColumns to ignore %s",
-					tableInfo.DB, tableInfo.Name, col, col))
+					"%s or use tables.ignoreColumns to ignore %s",
+					tableInfo.DB, tableInfo.Name, col, checkMsg, col))
 			} else {
 				// remove column for next iteration
 				delete(colMap, col)
@@ -410,6 +433,9 @@ func (rc *Controller) SchemaIsValid(ctx context.Context, tableInfo *md.MDTableMe
 				tableInfo.DB, tableInfo.Name, col, col))
 			}
 		}
+		if len(msgs) > 0 {
+			return msgs, nil
+		}
 	}
 	return msgs, nil
 }
diff --git a/pkg/lightning/restore/restore.go b/pkg/lightning/restore/restore.go
index 981b9320f..4d9448cec 100644
--- a/pkg/lightning/restore/restore.go
+++ b/pkg/lightning/restore/restore.go
@@ -727,10 +727,18 @@ func (rc *Controller) restoreSchema(ctx context.Context) error {
 	}
 	rc.dbInfos = dbInfos
 
-	err = rc.DataCheck(ctx)
-	if err != nil {
-		return errors.Trace(err)
+	if rc.cfg.App.CheckRequirements && rc.tidbGlue.OwnsSQLExecutor() {
+		if err = rc.DataCheck(ctx); err != nil {
+			return errors.Trace(err)
+		}
+		// print check template only if check requirements is true.
+		fmt.Println(rc.checkTemplate.Output())
+		if !rc.checkTemplate.Success() {
+			return errors.Errorf("tidb-lightning pre-check failed." +
+				" Please fix the failed check(s) or set --check-requirements=false to skip checks")
+		}
 	}
+
 	// Load new checkpoints
 	err = rc.checkpointsDB.Initialize(ctx, rc.cfg, dbInfos)
 	if err != nil {
@@ -751,14 +759,6 @@ func (rc *Controller) restoreSchema(ctx context.Context) error {
 		return errors.Trace(err)
 	}
 
-	if rc.cfg.App.CheckRequirements && rc.tidbGlue.OwnsSQLExecutor() {
-		// print check template only if check requirements is true.
-		fmt.Println(rc.checkTemplate.Output())
-		if !rc.checkTemplate.Success() {
-			return errors.Errorf("tidb-lightning pre-check failed." +
-				" Please fix the failed check(s) or set --check-requirements=false to skip checks")
-		}
-	}
 	return nil
 }
 
@@ -1838,7 +1838,7 @@ func (rc *Controller) DataCheck(ctx context.Context) error {
 	for _, tableInfo := range dbInfo.Tables {
 		// if hasCheckpoint is true, the table will start import from the checkpoint
 		// so we can skip TableHasDataInCluster and SchemaIsValid check.
-		var noCheckpoint bool
+		noCheckpoint := true
 		if rc.cfg.Checkpoint.Enable {
 			if msgs, noCheckpoint, err = rc.CheckpointIsValid(ctx, tableInfo); err != nil {
 				return errors.Trace(err)
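Note on the extracted helper: readColumnsAndCount probes only the first row of each data file to learn the column names (if any) and the column count, and maps io.EOF to (nil, 0, nil) so SchemaIsValid can skip empty files. The sketch below reproduces that contract for the CSV case using only the standard library's encoding/csv rather than Lightning's md parsers; firstRowColumns and its signature are illustrative, not part of the codebase.

package main

import (
	"encoding/csv"
	"fmt"
	"io"
	"strings"
)

// firstRowColumns mirrors the contract of readColumnsAndCount for the CSV
// case: read only the first row to learn the column names (when the file has
// a header row) and the column count. An empty file yields (nil, 0, nil),
// which the caller treats as "nothing to validate, skip this file".
func firstRowColumns(r io.Reader, hasHeader bool) (cols []string, colCnt int, err error) {
	rec, err := csv.NewReader(r).Read()
	if err == io.EOF {
		return nil, 0, nil
	}
	if err != nil {
		return nil, 0, err
	}
	if hasHeader {
		cols = rec
	}
	return cols, len(rec), nil
}

func main() {
	cols, cnt, _ := firstRowColumns(strings.NewReader("id,name\n1,a\n"), true)
	fmt.Println(cols, cnt) // [id name] 2

	cols, cnt, _ = firstRowColumns(strings.NewReader(""), true)
	fmt.Println(cols, cnt) // [] 0 -> SchemaIsValid would `continue` here
}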
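Note on the DataCheck fix: with `var noCheckpoint bool` the flag defaulted to false, so the per-table schema check was silently skipped whenever checkpoints were disabled; initializing it to true runs the check unless the table resumes from a valid checkpoint. A minimal, self-contained sketch of that control flow (simplified, hypothetical names; not the Lightning source):

package main

import "fmt"

// shouldRunSchemaCheck condenses the flow around noCheckpoint. With the old
// `var noCheckpoint bool`, the zero value false meant SchemaIsValid was
// skipped whenever checkpoints were disabled; starting from true, the check
// runs unless a valid checkpoint exists for the table.
func shouldRunSchemaCheck(checkpointEnabled, hasValidCheckpoint bool) bool {
	noCheckpoint := true // was: var noCheckpoint bool
	if checkpointEnabled {
		noCheckpoint = !hasValidCheckpoint
	}
	return noCheckpoint
}

func main() {
	fmt.Println(shouldRunSchemaCheck(false, false)) // true: checkpoints disabled, still check
	fmt.Println(shouldRunSchemaCheck(true, true))   // false: resuming from checkpoint, skip
	fmt.Println(shouldRunSchemaCheck(true, false))  // true: checkpoints on but none recorded
}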