Skip to content

Commit

Permalink
syncer: load table structure from dump files (pingcap#1699)
Browse files Browse the repository at this point in the history
  • Loading branch information
lance6716 authored and ti-chi-bot committed Jun 9, 2021
1 parent 5269b0a commit ce43e28
Show file tree
Hide file tree
Showing 13 changed files with 234 additions and 86 deletions.
4 changes: 2 additions & 2 deletions _utils/terror_gen/errors_release.txt
Original file line number Diff line number Diff line change
Expand Up @@ -277,10 +277,10 @@ ErrSyncerUnitInjectDDLOnly,[code=36023:class=sync-unit:scope=internal:level=low]
ErrSyncerUnitInjectDDLWithoutSchema,[code=36024:class=sync-unit:scope=internal:level=low], "Message: injected DDL %s without schema name not valid"
ErrSyncerUnitNotSupportedOperate,[code=36025:class=sync-unit:scope=internal:level=medium], "Message: op %s not supported"
ErrSyncerUnitNilOperatorReq,[code=36026:class=sync-unit:scope=internal:level=medium], "Message: nil request not valid"
ErrSyncerUnitDMLColumnNotMatch,[code=36027:class=sync-unit:scope=internal:level=high], "Message: Column count doesn't match value count: %d (columns) vs %d (values)"
ErrSyncerUnitDMLColumnNotMatch,[code=36027:class=sync-unit:scope=internal:level=high], "Message: Column count doesn't match value count: %d (columns) vs %d (values), Workaround: Please check the log files to see if a related DDL has been skipped, and you could use `operate-schema` to get and set the table structure."
ErrSyncerUnitDMLOldNewValueMismatch,[code=36028:class=sync-unit:scope=internal:level=high], "Message: Old value count doesn't match new value count: %d (old) vs %d (new)"
ErrSyncerUnitDMLPruneColumnMismatch,[code=36029:class=sync-unit:scope=internal:level=high], "Message: prune DML columns and data mismatch in length: %d (columns) %d (data)"
ErrSyncerUnitGenBinlogEventFilter,[code=36030:class=sync-unit:scope=internal:level=high], "Message: generate binlog event filter, Workaround: Pleass check the `filters` config in source and task configuration files."
ErrSyncerUnitGenBinlogEventFilter,[code=36030:class=sync-unit:scope=internal:level=high], "Message: generate binlog event filter, Workaround: Please check the `filters` config in source and task configuration files."
ErrSyncerUnitGenTableRouter,[code=36031:class=sync-unit:scope=internal:level=high], "Message: generate table router, Workaround: Please check `routes` config in task configuration file."
ErrSyncerUnitGenColumnMapping,[code=36032:class=sync-unit:scope=internal:level=high], "Message: generate column mapping, Workaround: Please check the `column-mappings` config in task configuration file."
ErrSyncerUnitDoColumnMapping,[code=36033:class=sync-unit:scope=internal:level=high], "Message: mapping row data %v for table `%s`.`%s`"
Expand Down
4 changes: 2 additions & 2 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -1675,7 +1675,7 @@ tags = ["internal", "medium"]
[error.DM-sync-unit-36027]
message = "Column count doesn't match value count: %d (columns) vs %d (values)"
description = ""
workaround = ""
workaround = "Please check the log files to see if a related DDL has been skipped, and you could use `operate-schema` to get and set the table structure."
tags = ["internal", "high"]

[error.DM-sync-unit-36028]
Expand All @@ -1693,7 +1693,7 @@ tags = ["internal", "high"]
[error.DM-sync-unit-36030]
message = "generate binlog event filter"
description = ""
workaround = "Pleass check the `filters` config in source and task configuration files."
workaround = "Please check the `filters` config in source and task configuration files."
tags = ["internal", "high"]

[error.DM-sync-unit-36031]
Expand Down
45 changes: 17 additions & 28 deletions loader/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -888,22 +888,18 @@ func (l *Loader) prepareDBFiles(files map[string]struct{}) error {
l.totalFileCount.Store(0) // reset
schemaFileCount := 0
for file := range files {
if !strings.HasSuffix(file, "-schema-create.sql") {
db, ok := utils.GetDBFromDumpFilename(file)
if !ok {
continue
}

idx := strings.LastIndex(file, "-schema-create.sql")
if idx > 0 {
schemaFileCount++
db := file[:idx]
if l.skipSchemaAndTable(&filter.Table{Schema: db}) {
l.logger.Warn("ignore schema file", zap.String("schema file", file))
continue
}

l.db2Tables[db] = make(Tables2DataFiles)
l.totalFileCount.Add(1) // for schema
schemaFileCount++
if l.skipSchemaAndTable(&filter.Table{Schema: db}) {
l.logger.Warn("ignore schema file", zap.String("schema file", file))
continue
}

l.db2Tables[db] = make(Tables2DataFiles)
l.totalFileCount.Add(1) // for schema
}

if schemaFileCount == 0 {
Expand All @@ -918,21 +914,11 @@ func (l *Loader) prepareDBFiles(files map[string]struct{}) error {

func (l *Loader) prepareTableFiles(files map[string]struct{}) error {
var tablesNumber float64

for file := range files {
if !strings.HasSuffix(file, "-schema.sql") {
continue
}

idx := strings.LastIndex(file, "-schema.sql")
name := file[:idx]
fields := strings.Split(name, ".")
if len(fields) != 2 {
l.logger.Warn("invalid table schema file", zap.String("file", file))
db, table, ok := utils.GetTableFromDumpFilename(file)
if !ok {
continue
}

db, table := fields[0], fields[1]
if l.skipSchemaAndTable(&filter.Table{Schema: db, Name: table}) {
l.logger.Warn("ignore table file", zap.String("table file", file))
continue
Expand Down Expand Up @@ -1037,7 +1023,7 @@ func (l *Loader) prepare() error {
}

// collect dir files.
files, err := CollectDirFiles(l.cfg.Dir)
files, err := utils.CollectDirFiles(l.cfg.Dir)
if err != nil {
return err
}
Expand Down Expand Up @@ -1484,14 +1470,17 @@ func (l *Loader) cleanDumpFiles() {
l.logger.Warn("error when remove loaded dump folder", zap.String("data folder", l.cfg.Dir), zap.Error(err))
}
} else {
// leave metadata file, only delete sql files
files, err := CollectDirFiles(l.cfg.Dir)
// leave metadata file and table structure files, only delete data files
files, err := utils.CollectDirFiles(l.cfg.Dir)
if err != nil {
l.logger.Warn("fail to collect files", zap.String("data folder", l.cfg.Dir), zap.Error(err))
}
var lastErr error
for f := range files {
if strings.HasSuffix(f, ".sql") {
if strings.HasSuffix(f, "-schema-create.sql") || strings.HasSuffix(f, "-schema.sql") {
continue
}
lastErr = os.Remove(filepath.Join(l.cfg.Dir, f))
}
}
Expand Down
25 changes: 0 additions & 25 deletions loader/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,36 +18,11 @@ import (
"fmt"
"os"
"path"
"path/filepath"
"strings"

"github.com/pingcap/dm/pkg/terror"
)

// CollectDirFiles gets files in path.
func CollectDirFiles(path string) (map[string]struct{}, error) {
files := make(map[string]struct{})
err := filepath.Walk(path, func(_ string, f os.FileInfo, err error) error {
if err != nil {
return err
}

if f == nil {
return nil
}

if f.IsDir() {
return nil
}

name := strings.TrimSpace(f.Name())
files[name] = struct{}{}
return nil
})

return files, err
}

// SQLReplace works like strings.Replace but only supports one replacement.
// It uses backquote pairs to quote the old and new word.
func SQLReplace(s, oldStr, newStr string, ansiquote bool) string {
Expand Down
4 changes: 2 additions & 2 deletions pkg/terror/error_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -964,10 +964,10 @@ var (
ErrSyncerUnitInjectDDLWithoutSchema = New(codeSyncerUnitInjectDDLWithoutSchema, ClassSyncUnit, ScopeInternal, LevelLow, "injected DDL %s without schema name not valid", "")
ErrSyncerUnitNotSupportedOperate = New(codeSyncerUnitNotSupportedOperate, ClassSyncUnit, ScopeInternal, LevelMedium, "op %s not supported", "")
ErrSyncerUnitNilOperatorReq = New(codeSyncerUnitNilOperatorReq, ClassSyncUnit, ScopeInternal, LevelMedium, "nil request not valid", "")
ErrSyncerUnitDMLColumnNotMatch = New(codeSyncerUnitDMLColumnNotMatch, ClassSyncUnit, ScopeInternal, LevelHigh, "Column count doesn't match value count: %d (columns) vs %d (values)", "")
ErrSyncerUnitDMLColumnNotMatch = New(codeSyncerUnitDMLColumnNotMatch, ClassSyncUnit, ScopeInternal, LevelHigh, "Column count doesn't match value count: %d (columns) vs %d (values)", "Please check the log files to see if a related DDL has been skipped, and you could use `operate-schema` to get and set the table structure.")
ErrSyncerUnitDMLOldNewValueMismatch = New(codeSyncerUnitDMLOldNewValueMismatch, ClassSyncUnit, ScopeInternal, LevelHigh, "Old value count doesn't match new value count: %d (old) vs %d (new)", "")
ErrSyncerUnitDMLPruneColumnMismatch = New(codeSyncerUnitDMLPruneColumnMismatch, ClassSyncUnit, ScopeInternal, LevelHigh, "prune DML columns and data mismatch in length: %d (columns) %d (data)", "")
ErrSyncerUnitGenBinlogEventFilter = New(codeSyncerUnitGenBinlogEventFilter, ClassSyncUnit, ScopeInternal, LevelHigh, "generate binlog event filter", "Pleass check the `filters` config in source and task configuration files.")
ErrSyncerUnitGenBinlogEventFilter = New(codeSyncerUnitGenBinlogEventFilter, ClassSyncUnit, ScopeInternal, LevelHigh, "generate binlog event filter", "Please check the `filters` config in source and task configuration files.")
ErrSyncerUnitGenTableRouter = New(codeSyncerUnitGenTableRouter, ClassSyncUnit, ScopeInternal, LevelHigh, "generate table router", "Please check `routes` config in task configuration file.")
ErrSyncerUnitGenColumnMapping = New(codeSyncerUnitGenColumnMapping, ClassSyncUnit, ScopeInternal, LevelHigh, "generate column mapping", "Please check the `column-mappings` config in task configuration file.")
ErrSyncerUnitDoColumnMapping = New(codeSyncerUnitDoColumnMapping, ClassSyncUnit, ScopeInternal, LevelHigh, "mapping row data %v for table `%s`.`%s`", "")
Expand Down
51 changes: 51 additions & 0 deletions pkg/utils/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"io/ioutil"
"os"
"path"
"path/filepath"
"strings"

"go.uber.org/zap"

Expand Down Expand Up @@ -94,3 +96,52 @@ func WriteFileAtomic(filename string, data []byte, perm os.FileMode) error {
}
return os.Rename(f.Name(), filename)
}

// CollectDirFiles gets files in path.
func CollectDirFiles(path string) (map[string]struct{}, error) {
files := make(map[string]struct{})
err := filepath.Walk(path, func(_ string, f os.FileInfo, err error) error {
if err != nil {
return err
}

if f == nil {
return nil
}

if f.IsDir() {
return nil
}

name := strings.TrimSpace(f.Name())
files[name] = struct{}{}
return nil
})

return files, err
}

// GetDBFromDumpFilename extracts db name from dump filename.
func GetDBFromDumpFilename(filename string) (db string, ok bool) {
if !strings.HasSuffix(filename, "-schema-create.sql") {
return "", false
}

idx := strings.LastIndex(filename, "-schema-create.sql")
return filename[:idx], true
}

// GetTableFromDumpFilename extracts db and table name from dump filename.
func GetTableFromDumpFilename(filename string) (db, table string, ok bool) {
if !strings.HasSuffix(filename, "-schema.sql") {
return "", "", false
}

idx := strings.LastIndex(filename, "-schema.sql")
name := filename[:idx]
fields := strings.Split(name, ".")
if len(fields) != 2 {
return "", "", false
}
return fields[0], fields[1], true
}
Loading

0 comments on commit ce43e28

Please sign in to comment.