Skip to content
This repository was archived by the owner on Nov 24, 2023. It is now read-only.

syncer: load table structure from dump files (#1699) #1754

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions _utils/terror_gen/errors_release.txt
Original file line number Diff line number Diff line change
Expand Up @@ -277,10 +277,10 @@ ErrSyncerUnitInjectDDLOnly,[code=36023:class=sync-unit:scope=internal:level=low]
ErrSyncerUnitInjectDDLWithoutSchema,[code=36024:class=sync-unit:scope=internal:level=low], "Message: injected DDL %s without schema name not valid"
ErrSyncerUnitNotSupportedOperate,[code=36025:class=sync-unit:scope=internal:level=medium], "Message: op %s not supported"
ErrSyncerUnitNilOperatorReq,[code=36026:class=sync-unit:scope=internal:level=medium], "Message: nil request not valid"
ErrSyncerUnitDMLColumnNotMatch,[code=36027:class=sync-unit:scope=internal:level=high], "Message: Column count doesn't match value count: %d (columns) vs %d (values)"
ErrSyncerUnitDMLColumnNotMatch,[code=36027:class=sync-unit:scope=internal:level=high], "Message: Column count doesn't match value count: %d (columns) vs %d (values), Workaround: Please check the log files to see if a related DDL has been skipped, and you could use `operate-schema` to get and set the table structure."
ErrSyncerUnitDMLOldNewValueMismatch,[code=36028:class=sync-unit:scope=internal:level=high], "Message: Old value count doesn't match new value count: %d (old) vs %d (new)"
ErrSyncerUnitDMLPruneColumnMismatch,[code=36029:class=sync-unit:scope=internal:level=high], "Message: prune DML columns and data mismatch in length: %d (columns) %d (data)"
ErrSyncerUnitGenBinlogEventFilter,[code=36030:class=sync-unit:scope=internal:level=high], "Message: generate binlog event filter, Workaround: Pleass check the `filters` config in source and task configuration files."
ErrSyncerUnitGenBinlogEventFilter,[code=36030:class=sync-unit:scope=internal:level=high], "Message: generate binlog event filter, Workaround: Please check the `filters` config in source and task configuration files."
ErrSyncerUnitGenTableRouter,[code=36031:class=sync-unit:scope=internal:level=high], "Message: generate table router, Workaround: Please check `routes` config in task configuration file."
ErrSyncerUnitGenColumnMapping,[code=36032:class=sync-unit:scope=internal:level=high], "Message: generate column mapping, Workaround: Please check the `column-mappings` config in task configuration file."
ErrSyncerUnitDoColumnMapping,[code=36033:class=sync-unit:scope=internal:level=high], "Message: mapping row data %v for table `%s`.`%s`"
Expand Down
4 changes: 2 additions & 2 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -1675,7 +1675,7 @@ tags = ["internal", "medium"]
[error.DM-sync-unit-36027]
message = "Column count doesn't match value count: %d (columns) vs %d (values)"
description = ""
workaround = ""
workaround = "Please check the log files to see if a related DDL has been skipped, and you could use `operate-schema` to get and set the table structure."
tags = ["internal", "high"]

[error.DM-sync-unit-36028]
Expand All @@ -1693,7 +1693,7 @@ tags = ["internal", "high"]
[error.DM-sync-unit-36030]
message = "generate binlog event filter"
description = ""
workaround = "Pleass check the `filters` config in source and task configuration files."
workaround = "Please check the `filters` config in source and task configuration files."
tags = ["internal", "high"]

[error.DM-sync-unit-36031]
Expand Down
45 changes: 17 additions & 28 deletions loader/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -888,22 +888,18 @@ func (l *Loader) prepareDBFiles(files map[string]struct{}) error {
l.totalFileCount.Store(0) // reset
schemaFileCount := 0
for file := range files {
if !strings.HasSuffix(file, "-schema-create.sql") {
db, ok := utils.GetDBFromDumpFilename(file)
if !ok {
continue
}

idx := strings.LastIndex(file, "-schema-create.sql")
if idx > 0 {
schemaFileCount++
db := file[:idx]
if l.skipSchemaAndTable(&filter.Table{Schema: db}) {
l.logger.Warn("ignore schema file", zap.String("schema file", file))
continue
}

l.db2Tables[db] = make(Tables2DataFiles)
l.totalFileCount.Add(1) // for schema
schemaFileCount++
if l.skipSchemaAndTable(&filter.Table{Schema: db}) {
l.logger.Warn("ignore schema file", zap.String("schema file", file))
continue
}

l.db2Tables[db] = make(Tables2DataFiles)
l.totalFileCount.Add(1) // for schema
}

if schemaFileCount == 0 {
Expand All @@ -918,21 +914,11 @@ func (l *Loader) prepareDBFiles(files map[string]struct{}) error {

func (l *Loader) prepareTableFiles(files map[string]struct{}) error {
var tablesNumber float64

for file := range files {
if !strings.HasSuffix(file, "-schema.sql") {
continue
}

idx := strings.LastIndex(file, "-schema.sql")
name := file[:idx]
fields := strings.Split(name, ".")
if len(fields) != 2 {
l.logger.Warn("invalid table schema file", zap.String("file", file))
db, table, ok := utils.GetTableFromDumpFilename(file)
if !ok {
continue
}

db, table := fields[0], fields[1]
if l.skipSchemaAndTable(&filter.Table{Schema: db, Name: table}) {
l.logger.Warn("ignore table file", zap.String("table file", file))
continue
Expand Down Expand Up @@ -1037,7 +1023,7 @@ func (l *Loader) prepare() error {
}

// collect dir files.
files, err := CollectDirFiles(l.cfg.Dir)
files, err := utils.CollectDirFiles(l.cfg.Dir)
if err != nil {
return err
}
Expand Down Expand Up @@ -1484,14 +1470,17 @@ func (l *Loader) cleanDumpFiles() {
l.logger.Warn("error when remove loaded dump folder", zap.String("data folder", l.cfg.Dir), zap.Error(err))
}
} else {
// leave metadata file, only delete sql files
files, err := CollectDirFiles(l.cfg.Dir)
// leave metadata file and table structure files, only delete data files
files, err := utils.CollectDirFiles(l.cfg.Dir)
if err != nil {
l.logger.Warn("fail to collect files", zap.String("data folder", l.cfg.Dir), zap.Error(err))
}
var lastErr error
for f := range files {
if strings.HasSuffix(f, ".sql") {
if strings.HasSuffix(f, "-schema-create.sql") || strings.HasSuffix(f, "-schema.sql") {
continue
}
lastErr = os.Remove(filepath.Join(l.cfg.Dir, f))
}
}
Expand Down
25 changes: 0 additions & 25 deletions loader/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,36 +18,11 @@ import (
"fmt"
"os"
"path"
"path/filepath"
"strings"

"github.com/pingcap/dm/pkg/terror"
)

// CollectDirFiles walks the tree rooted at path and returns the set of
// non-directory entries found, keyed by their whitespace-trimmed base name.
// The set built so far is returned together with any error reported by the walk.
func CollectDirFiles(path string) (map[string]struct{}, error) {
	collected := map[string]struct{}{}

	visit := func(_ string, info os.FileInfo, walkErr error) error {
		switch {
		case walkErr != nil:
			// Propagate the walk error unchanged; this aborts the walk.
			return walkErr
		case info == nil || info.IsDir():
			// Nothing to record for directories or entries without file info.
			return nil
		}

		collected[strings.TrimSpace(info.Name())] = struct{}{}
		return nil
	}

	walkErr := filepath.Walk(path, visit)
	return collected, walkErr
}

// SQLReplace works like strings.Replace but only supports one replacement.
// It uses backquote pairs to quote the old and new word.
func SQLReplace(s, oldStr, newStr string, ansiquote bool) string {
Expand Down
4 changes: 2 additions & 2 deletions pkg/terror/error_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -964,10 +964,10 @@ var (
ErrSyncerUnitInjectDDLWithoutSchema = New(codeSyncerUnitInjectDDLWithoutSchema, ClassSyncUnit, ScopeInternal, LevelLow, "injected DDL %s without schema name not valid", "")
ErrSyncerUnitNotSupportedOperate = New(codeSyncerUnitNotSupportedOperate, ClassSyncUnit, ScopeInternal, LevelMedium, "op %s not supported", "")
ErrSyncerUnitNilOperatorReq = New(codeSyncerUnitNilOperatorReq, ClassSyncUnit, ScopeInternal, LevelMedium, "nil request not valid", "")
ErrSyncerUnitDMLColumnNotMatch = New(codeSyncerUnitDMLColumnNotMatch, ClassSyncUnit, ScopeInternal, LevelHigh, "Column count doesn't match value count: %d (columns) vs %d (values)", "")
ErrSyncerUnitDMLColumnNotMatch = New(codeSyncerUnitDMLColumnNotMatch, ClassSyncUnit, ScopeInternal, LevelHigh, "Column count doesn't match value count: %d (columns) vs %d (values)", "Please check the log files to see if a related DDL has been skipped, and you could use `operate-schema` to get and set the table structure.")
ErrSyncerUnitDMLOldNewValueMismatch = New(codeSyncerUnitDMLOldNewValueMismatch, ClassSyncUnit, ScopeInternal, LevelHigh, "Old value count doesn't match new value count: %d (old) vs %d (new)", "")
ErrSyncerUnitDMLPruneColumnMismatch = New(codeSyncerUnitDMLPruneColumnMismatch, ClassSyncUnit, ScopeInternal, LevelHigh, "prune DML columns and data mismatch in length: %d (columns) %d (data)", "")
ErrSyncerUnitGenBinlogEventFilter = New(codeSyncerUnitGenBinlogEventFilter, ClassSyncUnit, ScopeInternal, LevelHigh, "generate binlog event filter", "Pleass check the `filters` config in source and task configuration files.")
ErrSyncerUnitGenBinlogEventFilter = New(codeSyncerUnitGenBinlogEventFilter, ClassSyncUnit, ScopeInternal, LevelHigh, "generate binlog event filter", "Please check the `filters` config in source and task configuration files.")
ErrSyncerUnitGenTableRouter = New(codeSyncerUnitGenTableRouter, ClassSyncUnit, ScopeInternal, LevelHigh, "generate table router", "Please check `routes` config in task configuration file.")
ErrSyncerUnitGenColumnMapping = New(codeSyncerUnitGenColumnMapping, ClassSyncUnit, ScopeInternal, LevelHigh, "generate column mapping", "Please check the `column-mappings` config in task configuration file.")
ErrSyncerUnitDoColumnMapping = New(codeSyncerUnitDoColumnMapping, ClassSyncUnit, ScopeInternal, LevelHigh, "mapping row data %v for table `%s`.`%s`", "")
Expand Down
51 changes: 51 additions & 0 deletions pkg/utils/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"io/ioutil"
"os"
"path"
"path/filepath"
"strings"

"go.uber.org/zap"

Expand Down Expand Up @@ -94,3 +96,52 @@ func WriteFileAtomic(filename string, data []byte, perm os.FileMode) error {
}
return os.Rename(f.Name(), filename)
}

// CollectDirFiles gathers the names of all non-directory entries found while
// walking the tree rooted at path. Each key of the returned set is the entry's
// base name with surrounding whitespace trimmed. When the walk fails, the
// entries collected up to that point are returned alongside the error.
func CollectDirFiles(path string) (map[string]struct{}, error) {
	names := map[string]struct{}{}

	record := func(_ string, entry os.FileInfo, walkErr error) error {
		if walkErr != nil {
			// Returning the error stops the walk and surfaces it to the caller.
			return walkErr
		}
		if entry == nil || entry.IsDir() {
			return nil
		}

		trimmed := strings.TrimSpace(entry.Name())
		names[trimmed] = struct{}{}
		return nil
	}

	return names, filepath.Walk(path, record)
}

// GetDBFromDumpFilename extracts db name from dump filename.
//
// The filename is expected to look like `<db>-schema-create.sql`; db is
// everything before that suffix. ok is false when filename does not end with
// the suffix, or when nothing precedes it, so a caller never receives an
// empty database name.
func GetDBFromDumpFilename(filename string) (db string, ok bool) {
	const suffix = "-schema-create.sql"
	if !strings.HasSuffix(filename, suffix) {
		return "", false
	}

	db = strings.TrimSuffix(filename, suffix)
	if db == "" {
		// A file named exactly "-schema-create.sql" carries no database name.
		return "", false
	}
	return db, true
}

// GetTableFromDumpFilename extracts db and table name from dump filename.
//
// The filename is expected to look like `<db>.<table>-schema.sql`. ok is false
// when filename does not end with `-schema.sql`, when the part before the
// suffix does not split into exactly two dot-separated fields, or when either
// field is empty.
func GetTableFromDumpFilename(filename string) (db, table string, ok bool) {
	const suffix = "-schema.sql"
	if !strings.HasSuffix(filename, suffix) {
		return "", "", false
	}

	fields := strings.Split(strings.TrimSuffix(filename, suffix), ".")
	if len(fields) != 2 {
		return "", "", false
	}

	db, table = fields[0], fields[1]
	if db == "" || table == "" {
		// Names such as ".t-schema.sql" or "db.-schema.sql" do not identify a table.
		return "", "", false
	}
	return db, table, true
}
Loading