
lightning: prettify data check error output #1331

Merged 8 commits on Jul 30, 2021
Changes from all commits

132 changes: 79 additions & 53 deletions pkg/lightning/restore/check_info.go
@@ -16,6 +16,7 @@ package restore
import (
"context"
"fmt"
"io"
"path/filepath"
"reflect"
"strings"
@@ -285,6 +286,42 @@ func hasDefault(col *model.ColumnInfo) bool {
col.IsGenerated() || mysql.HasAutoIncrementFlag(col.Flag)
}

func (rc *Controller) readColumnsAndCount(ctx context.Context, dataFileMeta md.SourceFileMeta) (cols []string, colCnt int, err error) {
var reader storage.ReadSeekCloser
if dataFileMeta.Type == md.SourceTypeParquet {
reader, err = md.OpenParquetReader(ctx, rc.store, dataFileMeta.Path, dataFileMeta.FileSize)
} else {
reader, err = rc.store.Open(ctx, dataFileMeta.Path)
}
if err != nil {
return nil, 0, errors.Trace(err)
}

var parser md.Parser
blockBufSize := int64(rc.cfg.Mydumper.ReadBlockSize)
switch dataFileMeta.Type {
case md.SourceTypeCSV:
hasHeader := rc.cfg.Mydumper.CSV.Header
parser = md.NewCSVParser(&rc.cfg.Mydumper.CSV, reader, blockBufSize, rc.ioWorkers, hasHeader)
case md.SourceTypeSQL:
parser = md.NewChunkParser(rc.cfg.TiDB.SQLMode, reader, blockBufSize, rc.ioWorkers)
case md.SourceTypeParquet:
parser, err = md.NewParquetParser(ctx, rc.store, reader, dataFileMeta.Path)
if err != nil {
return nil, 0, errors.Trace(err)
}
default:
panic(fmt.Sprintf("unknown file type '%s'", dataFileMeta.Type))
}
defer parser.Close()

err = parser.ReadRow()
if err != nil && errors.Cause(err) != io.EOF {
return nil, 0, errors.Trace(err)
}
return parser.Columns(), len(parser.LastRow().Row), nil
}
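
Worth noting about the helper above: an empty data file makes `parser.ReadRow()` return `io.EOF`, which is deliberately swallowed, so the function reports `(nil, 0, nil)` and the caller can treat the file as skippable rather than broken. A self-contained sketch of that contract (generic reader code, not Lightning's actual parser):

```go
package main

import (
	"errors"
	"fmt"
	"io"
	"strings"
)

// readFirstChunk mimics the EOF contract of readColumnsAndCount:
// an empty input is not an error, it simply yields nothing to check.
func readFirstChunk(r io.Reader) (string, error) {
	buf := make([]byte, 64)
	n, err := r.Read(buf)
	if err != nil && !errors.Is(err, io.EOF) {
		return "", err // a real read failure still propagates
	}
	return string(buf[:n]), nil
}

func main() {
	chunk, err := readFirstChunk(strings.NewReader(""))
	fmt.Printf("chunk=%q err=%v\n", chunk, err) // chunk="" err=<nil>
}
```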

// SchemaIsValid checks that the import files match the cluster schema.
func (rc *Controller) SchemaIsValid(ctx context.Context, tableInfo *md.MDTableMeta) ([]string, error) {
msgs := make([]string, 0)
@@ -294,67 +331,49 @@ func (rc *Controller) SchemaIsValid(ctx context.Context, tableInfo *md.MDTableMeta
"please give a schema file in source dir or create table manually", tableInfo.DB, tableInfo.Name))
return msgs, nil
}
if info != nil {
igCols := make(map[string]struct{})
igCol, err := rc.cfg.Mydumper.IgnoreColumns.GetIgnoreColumns(tableInfo.DB, tableInfo.Name, rc.cfg.Mydumper.CaseSensitive)
if err != nil {
return nil, errors.Trace(err)
}
for _, col := range igCol.Columns {
igCols[col] = struct{}{}
}

if len(tableInfo.DataFiles) == 0 {
log.Info("no data files detected", zap.String("db", tableInfo.DB), zap.String("table", tableInfo.Name))
return nil, nil
igCols := make(map[string]struct{})
igCol, err := rc.cfg.Mydumper.IgnoreColumns.GetIgnoreColumns(tableInfo.DB, tableInfo.Name, rc.cfg.Mydumper.CaseSensitive)
if err != nil {
return nil, errors.Trace(err)
}
for _, col := range igCol.Columns {
igCols[col] = struct{}{}
}

if len(tableInfo.DataFiles) == 0 {
log.Info("no data files detected", zap.String("db", tableInfo.DB), zap.String("table", tableInfo.Name))
return nil, nil
}

colCountFromTiDB := len(info.Core.Columns)
core := info.Core
defaultCols := make(map[string]struct{})
for _, col := range core.Columns {
if hasDefault(col) || (info.Core.ContainsAutoRandomBits() && mysql.HasPriKeyFlag(col.Flag)) {
// this column has a default value or is an auto-random ID, so we can ignore it
defaultCols[col.Name.L] = struct{}{}
}
}
// tidb_rowid has a default value.
defaultCols[model.ExtraHandleName.String()] = struct{}{}

for _, dataFile := range tableInfo.DataFiles {
// get column names from the data file.
dataFileMeta := tableInfo.DataFiles[0].FileMeta
var reader storage.ReadSeekCloser
if dataFileMeta.Type == md.SourceTypeParquet {
reader, err = md.OpenParquetReader(ctx, rc.store, dataFileMeta.Path, dataFileMeta.FileSize)
} else {
reader, err = rc.store.Open(ctx, dataFileMeta.Path)
}
if err != nil {
return nil, errors.Trace(err)
}
dataFileMeta := dataFile.FileMeta

var parser md.Parser
blockBufSize := int64(rc.cfg.Mydumper.ReadBlockSize)
switch dataFileMeta.Type {
case md.SourceTypeCSV:
hasHeader := rc.cfg.Mydumper.CSV.Header
parser = md.NewCSVParser(&rc.cfg.Mydumper.CSV, reader, blockBufSize, rc.ioWorkers, hasHeader)
case md.SourceTypeSQL:
parser = md.NewChunkParser(rc.cfg.TiDB.SQLMode, reader, blockBufSize, rc.ioWorkers)
case md.SourceTypeParquet:
parser, err = md.NewParquetParser(ctx, rc.store, reader, dataFileMeta.Path)
if err != nil {
return nil, errors.Trace(err)
}
default:
if tp := dataFileMeta.Type; tp != md.SourceTypeCSV && tp != md.SourceTypeSQL && tp != md.SourceTypeParquet {
msgs = append(msgs, fmt.Sprintf("file '%s' with unknown source type '%s'", dataFileMeta.Path, dataFileMeta.Type.String()))
return msgs, nil
}
err = parser.ReadRow()
colsFromDataFile, colCountFromDataFile, err := rc.readColumnsAndCount(ctx, dataFileMeta)
if err != nil {
return nil, errors.Trace(err)
}

colsFromDataFile := parser.Columns()
colCountFromDataFile := len(parser.LastRow().Row)

colCountFromTiDB := len(info.Core.Columns)
core := info.Core
defaultCols := make(map[string]struct{})
for _, col := range core.Columns {
if hasDefault(col) || (info.Core.ContainsAutoRandomBits() && mysql.HasPriKeyFlag(col.Flag)) {
// this column has a default value or is an auto-random ID, so we can ignore it
defaultCols[col.Name.L] = struct{}{}
}
if colsFromDataFile == nil && colCountFromDataFile == 0 {
log.Info("file contains no data, skip checking against schema validity", zap.String("path", dataFileMeta.Path))
continue
}
// tidb_rowid has a default value.
defaultCols[model.ExtraHandleName.String()] = struct{}{}

if colsFromDataFile == nil {
// when there are no column names in the data file, we must insert data in order.
@@ -392,9 +411,13 @@ func (rc *Controller) SchemaIsValid(ctx context.Context, tableInfo *md.MDTableMeta
colMap[model.ExtraHandleName.String()] = struct{}{}
for _, col := range colsFromDataFile {
if _, ok := colMap[col]; !ok {
checkMsg := "please check table schema"
if dataFileMeta.Type == md.SourceTypeCSV && rc.cfg.Mydumper.CSV.Header {
checkMsg += " and csv file header"
}
msgs = append(msgs, fmt.Sprintf("TiDB schema `%s`.`%s` doesn't have column %s, "+
"please check table schema or use tables.ignoreColumns to ignore %s",
tableInfo.DB, tableInfo.Name, col, col))
"%s or use tables.ignoreColumns to ignore %s",
tableInfo.DB, tableInfo.Name, col, checkMsg, col))
} else {
// remove column for next iteration
delete(colMap, col)
@@ -410,6 +433,9 @@ func (rc *Controller) SchemaIsValid(ctx context.Context, tableInfo *md.MDTableMeta
tableInfo.DB, tableInfo.Name, col, col))
}
}
if len(msgs) > 0 {
return msgs, nil
}
}
return msgs, nil
}
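
Putting the check_info.go changes together: the loop now validates every data file (not just `DataFiles[0]`), skips empty files, tailors the hint for CSV sources with headers, and stops at the first file that yields messages. A condensed sketch of the new flow, reusing names from the diff above; this is an illustration, not the verbatim code:

```go
for _, dataFile := range tableInfo.DataFiles {
	cols, colCnt, err := rc.readColumnsAndCount(ctx, dataFile.FileMeta)
	if err != nil {
		return nil, errors.Trace(err)
	}
	if cols == nil && colCnt == 0 {
		continue // empty file: nothing to validate against the schema
	}
	// ... compare cols/colCnt against the TiDB schema, appending to msgs;
	// for CSV-with-header sources the hint becomes
	// "please check table schema and csv file header or use tables.ignoreColumns ..."
	if len(msgs) > 0 {
		return msgs, nil // report the first problematic file and stop
	}
}
return msgs, nil
```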
24 changes: 12 additions & 12 deletions pkg/lightning/restore/restore.go
@@ -727,10 +727,18 @@ func (rc *Controller) restoreSchema(ctx context.Context) error {
}
rc.dbInfos = dbInfos

err = rc.DataCheck(ctx)
if err != nil {
return errors.Trace(err)
if rc.cfg.App.CheckRequirements && rc.tidbGlue.OwnsSQLExecutor() {
if err = rc.DataCheck(ctx); err != nil {
return errors.Trace(err)
}
// print the check template only if check-requirements is enabled.
fmt.Println(rc.checkTemplate.Output())
if !rc.checkTemplate.Success() {
return errors.Errorf("tidb-lightning pre-check failed." +
" Please fix the failed check(s) or set --check-requirements=false to skip checks")
}
}

// Load new checkpoints
err = rc.checkpointsDB.Initialize(ctx, rc.cfg, dbInfos)
if err != nil {
@@ -751,14 +759,6 @@ func (rc *Controller) restoreSchema(ctx context.Context) error {
return errors.Trace(err)
}

if rc.cfg.App.CheckRequirements && rc.tidbGlue.OwnsSQLExecutor() {
// print the check template only if check-requirements is enabled.
fmt.Println(rc.checkTemplate.Output())
if !rc.checkTemplate.Success() {
return errors.Errorf("tidb-lightning pre-check failed." +
" Please fix the failed check(s) or set --check-requirements=false to skip checks")
}
}
return nil
}
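
The effect of this restore.go hunk is ordering: the pre-check now runs, prints its report, and can abort before checkpoints are initialized, instead of only surfacing its verdict at the very end of restoreSchema. A simplified sketch of the new sequence (the function name is hypothetical; the calls mirror the diff):

```go
// preCheckSketch condenses the reordered restoreSchema flow.
func (rc *Controller) preCheckSketch(ctx context.Context) error {
	// schemas are created and rc.dbInfos is populated first ...
	if rc.cfg.App.CheckRequirements && rc.tidbGlue.OwnsSQLExecutor() {
		if err := rc.DataCheck(ctx); err != nil {
			return errors.Trace(err)
		}
		fmt.Println(rc.checkTemplate.Output()) // report appears before any restore work
		if !rc.checkTemplate.Success() {
			return errors.New("tidb-lightning pre-check failed")
		}
	}
	// ... only now initialize checkpoints and start restoring
	return nil
}
```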

@@ -1838,7 +1838,7 @@ func (rc *Controller) DataCheck(ctx context.Context) error {
for _, tableInfo := range dbInfo.Tables {
// if hasCheckpoint is true, the table will resume importing from the checkpoint,
// so we can skip the TableHasDataInCluster and SchemaIsValid checks.
var noCheckpoint bool
noCheckpoint := true
if rc.cfg.Checkpoint.Enable {
if msgs, noCheckpoint, err = rc.CheckpointIsValid(ctx, tableInfo); err != nil {
return errors.Trace(err)
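
The one-line DataCheck change fixes a zero-value bug: `var noCheckpoint bool` defaults to false, so with checkpoints disabled the guarded checks never ran; initializing it to true restores them. A minimal runnable illustration of the pitfall (variable names invented for the demo):

```go
package main

import "fmt"

func main() {
	checkpointEnabled := false // stand-in for rc.cfg.Checkpoint.Enable

	var oldGate bool // old code: zero value is false
	newGate := true  // new code: assume no checkpoint until one is found
	if checkpointEnabled {
		// CheckpointIsValid would set both gates here
	}
	// With checkpoints disabled, the old gate silently skipped the checks.
	fmt.Println("old gate runs checks:", oldGate) // false
	fmt.Println("new gate runs checks:", newGate) // true
}
```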