This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

Commit

dm-ha/: add remove metadata feature (#651)
* remove remove-meta from dm-worker and from the task configuration.
* on dm-master, remove shard-DDL info in etcd, and remove checkpoint, online-DDL, and shard-DDL meta info in the downstream database via SQL.
lichunzhu authored May 20, 2020
1 parent b308a2d commit 24b0be9
Showing 86 changed files with 1,037 additions and 225 deletions.
1 change: 1 addition & 0 deletions _utils/terror_gen/errors_release.txt
@@ -339,6 +339,7 @@ ErrMasterRequestIsNotForwardToLeader,[code=38043:class=dm-master:scope=internal:
ErrMasterIsNotAsyncRequest,[code=38044:class=dm-master:scope=internal:level=medium],"request %s is not an async one, needn't wait for ok"
ErrMasterFailToGetExpectResult,[code=38045:class=dm-master:scope=internal:level=medium],"fail to get expected result"
ErrMasterPessimistNotStarted,[code=38046:class=dm-master:scope=internal:level=medium],"the shardddl pessimist has not started"
ErrMasterOptimistNotStarted,[code=38047:class=dm-master:scope=internal:level=medium],"the shardddl optimist has not started"
ErrWorkerParseFlagSet,[code=40001:class=dm-worker:scope=internal:level=medium],"parse dm-worker config flag set"
ErrWorkerInvalidFlag,[code=40002:class=dm-worker:scope=internal:level=medium],"'%s' is an invalid flag"
ErrWorkerDecodeConfigFromFile,[code=40003:class=dm-worker:scope=internal:level=medium],"toml decode file"
1 change: 0 additions & 1 deletion dm/config/subtask.go
@@ -149,7 +149,6 @@ type SubTaskConfig struct {
ServerID uint32 `toml:"server-id" json:"server-id"`
Flavor string `toml:"flavor" json:"flavor"`
MetaSchema string `toml:"meta-schema" json:"meta-schema"`
RemoveMeta bool `toml:"remove-meta" json:"remove-meta"`
HeartbeatUpdateInterval int `toml:"heartbeat-update-interval" json:"heartbeat-update-interval"`
HeartbeatReportInterval int `toml:"heartbeat-report-interval" json:"heartbeat-report-interval"`
EnableHeartbeat bool `toml:"enable-heartbeat" json:"enable-heartbeat"`
5 changes: 1 addition & 4 deletions dm/config/task.go
@@ -256,9 +256,7 @@ type TaskConfig struct {
// we store detail status in meta
// don't save configuration into it
MetaSchema string `yaml:"meta-schema"`
// remove meta from downstreaming database
// now we delete checkpoint and online ddl information
RemoveMeta bool `yaml:"remove-meta"`

EnableHeartbeat bool `yaml:"enable-heartbeat"`
HeartbeatUpdateInterval int `yaml:"heartbeat-update-interval"`
HeartbeatReportInterval int `yaml:"heartbeat-report-interval"`
@@ -510,7 +508,6 @@ func (c *TaskConfig) SubTaskConfigs(sources map[string]DBConfig) ([]*SubTaskConf
cfg.Mode = c.TaskMode
cfg.CaseSensitive = c.CaseSensitive
cfg.MetaSchema = c.MetaSchema
cfg.RemoveMeta = c.RemoveMeta
cfg.EnableHeartbeat = c.EnableHeartbeat
cfg.HeartbeatUpdateInterval = c.HeartbeatUpdateInterval
cfg.HeartbeatReportInterval = c.HeartbeatReportInterval
8 changes: 1 addition & 7 deletions dm/config/task_test.go
@@ -29,7 +29,6 @@ name: test
task-mode: all
is-sharding: true
meta-schema: "dm_meta"
remove-meta: false
enable-heartbeat: true
timezone: "Asia/Shanghai"
ignore-checking-items: ["all"]
@@ -56,7 +55,6 @@ name: test1
task-mode: all
is-sharding: true
meta-schema: "dm_meta"
remove-meta: false
enable-heartbeat: true
timezone: "Asia/Shanghai"
ignore-checking-items: ["all"]
@@ -80,7 +78,7 @@ mysql-instances:
err := taskConfig.Decode(errorTaskConfig1)
// field server-id is not a member of TaskConfig
c.Check(err, NotNil)
c.Assert(err, ErrorMatches, "*line 19: field server-id not found in type config.MySQLInstance*")
c.Assert(err, ErrorMatches, "*line 18: field server-id not found in type config.MySQLInstance*")

err = taskConfig.Decode(errorTaskConfig2)
// field name duplicate
@@ -94,7 +92,6 @@ name: test
task-mode: all
is-sharding: true
meta-schema: "dm_meta"
remove-meta: false
enable-heartbeat: true
timezone: "Asia/Shanghai"
ignore-checking-items: ["all"]
@@ -113,7 +110,6 @@ task-mode: all
task-mode: all
is-sharding: true
meta-schema: "dm_meta"
remove-meta: false
enable-heartbeat: true
timezone: "Asia/Shanghai"
ignore-checking-items: ["all"]
@@ -131,7 +127,6 @@ name: test
task-mode: all
is-sharding: true
meta-schema: "dm_meta"
remove-meta: false
enable-heartbeat: true
heartbeat-update-interval: 1
heartbeat-report-interval: 1
@@ -210,7 +205,6 @@ task-mode: all
is-sharding: true
shard-mode: "optimistic"
meta-schema: "dm_meta"
remove-meta: false
enable-heartbeat: true
heartbeat-update-interval: 1
heartbeat-report-interval: 1
15 changes: 12 additions & 3 deletions dm/ctl/master/start_task.go
@@ -23,15 +23,17 @@ import (
"github.com/pingcap/dm/checker"
"github.com/pingcap/dm/dm/ctl/common"
"github.com/pingcap/dm/dm/pb"
"github.com/pingcap/dm/pkg/terror"
)

// NewStartTaskCmd creates a StartTask command
func NewStartTaskCmd() *cobra.Command {
cmd := &cobra.Command{
Use: "start-task [-s source ...] <config-file>",
Use: "start-task [-s source ...] [--remove-meta] <config-file>",
Short: "start a task as defined in the config file",
Run: startTaskFunc,
}
cmd.Flags().BoolP("remove-meta", "", false, "whether to remove task's meta data")
return cmd
}

@@ -54,14 +56,21 @@ func startTaskFunc(cmd *cobra.Command, _ []string) {
return
}

removeMeta, err := cmd.Flags().GetBool("remove-meta")
if err != nil {
common.PrintLines("%s", terror.Message(err))
return
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// start task
cli := common.MasterClient()
resp, err := cli.StartTask(ctx, &pb.StartTaskRequest{
Task: string(content),
Sources: sources,
Task: string(content),
Sources: sources,
RemoveMeta: removeMeta,
})
if err != nil {
common.PrintLines("can not start task:\n%v", errors.ErrorStack(err))
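As context for the flag wiring above, here is a minimal sketch (not part of this commit) of driving the same option programmatically through the master's gRPC API. The StartTaskRequest fields and the remove-meta semantics come from this diff; the master address, the task file name, and the plain-text connection are illustrative assumptions.

package main

import (
	"context"
	"fmt"
	"io/ioutil"

	"google.golang.org/grpc"

	"github.com/pingcap/dm/dm/pb"
)

func main() {
	// hypothetical dm-master address; adjust for a real deployment
	conn, err := grpc.Dial("127.0.0.1:8261", grpc.WithInsecure())
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	// task config file content; remove-meta no longer belongs in this file
	content, err := ioutil.ReadFile("task.yaml")
	if err != nil {
		panic(err)
	}

	cli := pb.NewMasterClient(conn)
	resp, err := cli.StartTask(context.Background(), &pb.StartTaskRequest{
		Task:       string(content),
		Sources:    nil,  // optional source filter, like dmctl's -s
		RemoveMeta: true, // equivalent to dmctl start-task --remove-meta
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(resp.Result, resp.Msg)
}

This is the same request dmctl builds in startTaskFunc after reading the --remove-meta flag.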
4 changes: 1 addition & 3 deletions dm/dm-ansible/conf/task_advanced.yaml.example
@@ -3,7 +3,6 @@ name: test # global unique
task-mode: all # full/incremental/all
is-sharding: true # whether multi dm-worker do one sharding job
meta-schema: "dm_meta" # meta schema in downstreaming database to store meta informaton of dm
remove-meta: false # remove meta from downstreaming database, now we delete checkpoint and online ddl information
enable-heartbeat: false # whether to enable heartbeat for calculating lag between master and syncer
# heartbeat-update-interval: 1 # interval to do heartbeat and save timestamp, default 1s
# heartbeat-report-interval: 10 # interval to report time lap to prometheus, default 10s
@@ -23,8 +22,7 @@ mysql-instances: # one or more source database, config more source d
# `full` / `all`:
# never be used
# `incremental`:
# if `remove-meta` is true, this will be used
# else if checkpoints already exists in `meta-schema`, this will not be used
# if checkpoints already exists in `meta-schema`, this will not be used
# otherwise, this will be used
meta:
binlog-name: mysql-bin.000001
78 changes: 74 additions & 4 deletions dm/master/server.go
@@ -38,6 +38,8 @@ import (
"github.com/pingcap/dm/dm/master/workerrpc"
"github.com/pingcap/dm/dm/pb"
"github.com/pingcap/dm/pkg/conn"
tcontext "github.com/pingcap/dm/pkg/context"
"github.com/pingcap/dm/pkg/cputil"
"github.com/pingcap/dm/pkg/election"
"github.com/pingcap/dm/pkg/etcdutil"
"github.com/pingcap/dm/pkg/log"
@@ -81,6 +83,9 @@ type Server struct {
leaderClient pb.MasterClient
leaderGrpcConn *grpc.ClientConn

// removeMetaLock locks start task when removing meta
removeMetaLock sync.RWMutex

// WaitGroup for background functions.
bgFunWg sync.WaitGroup

@@ -360,15 +365,31 @@ func (s *Server) StartTask(ctx context.Context, req *pb.StartTaskRequest) (*pb.S
if len(sourceRespCh) > 0 {
sourceResps = sortCommonWorkerResults(sourceRespCh)
} else {
sources := make([]string, 0, len(stCfgs))
for _, stCfg := range stCfgs {
sources = append(sources, stCfg.SourceID)
}
s.removeMetaLock.Lock()
if req.RemoveMeta {
if scm := s.scheduler.GetSubTaskCfgsByTask(cfg.Name); len(scm) > 0 {
resp.Msg = terror.Annotate(terror.ErrSchedulerSubTaskExist.Generate(cfg.Name, sources),
"while remove-meta is true").Error()
s.removeMetaLock.Unlock()
return resp, nil
}
err = s.removeMetaData(ctx, cfg)
if err != nil {
resp.Msg = terror.Annotate(err, "while removing metadata").Error()
s.removeMetaLock.Unlock()
return resp, nil
}
}
err = s.scheduler.AddSubTasks(subtaskCfgPointersToInstances(stCfgs...)...)
s.removeMetaLock.Unlock()
if err != nil {
resp.Msg = errors.ErrorStack(err)
return resp, nil
}
sources := make([]string, 0, len(stCfgs))
for _, stCfg := range stCfgs {
sources = append(sources, stCfg.SourceID)
}
resp.Result = true
sourceResps = s.getSourceRespsAfterOperation(ctx, cfg.Name, sources, []string{}, req)
}
@@ -1332,6 +1353,55 @@ func (s *Server) generateSubTask(ctx context.Context, task string) (*config.Task
return cfg, stCfgs, nil
}

func (s *Server) removeMetaData(ctx context.Context, cfg *config.TaskConfig) error {
toDB := *cfg.TargetDB
toDB.Adjust()
if len(toDB.Password) > 0 {
pswdTo, err := utils.Decrypt(toDB.Password)
if err != nil {
return err
}
toDB.Password = pswdTo
}

// clear shard meta data for pessimistic/optimist
err := s.pessimist.RemoveMetaData(cfg.Name)
if err != nil {
return err
}
err = s.optimist.RemoveMetaData(cfg.Name)
if err != nil {
return err
}

// set up db and clear meta data in downstream db
baseDB, err := conn.DefaultDBProvider.Apply(toDB)
if err != nil {
return terror.WithScope(err, terror.ScopeDownstream)
}
defer baseDB.Close()
dbConn, err := baseDB.GetBaseConn(ctx)
if err != nil {
return terror.WithScope(err, terror.ScopeDownstream)
}
defer baseDB.CloseBaseConn(dbConn)
ctctx := tcontext.Background().WithContext(ctx).WithLogger(log.With(zap.String("job", "remove metadata")))

sqls := make([]string, 0, 4)
// clear loader and syncer checkpoints
sqls = append(sqls, fmt.Sprintf("DROP TABLE IF EXISTS `%s`.`%s`",
cfg.MetaSchema, cputil.LoaderCheckpoint(cfg.Name)))
sqls = append(sqls, fmt.Sprintf("DROP TABLE IF EXISTS `%s`.`%s`",
cfg.MetaSchema, cputil.SyncerCheckpoint(cfg.Name)))
sqls = append(sqls, fmt.Sprintf("DROP TABLE IF EXISTS `%s`.`%s`",
cfg.MetaSchema, cputil.SyncerShardMeta(cfg.Name)))
sqls = append(sqls, fmt.Sprintf("DROP TABLE IF EXISTS `%s`.`%s`",
cfg.MetaSchema, cputil.SyncerOnlineDDL(cfg.Name)))

_, err = dbConn.ExecuteSQL(ctctx, nil, cfg.Name, sqls)
return err
}

func extractWorkerError(result *pb.ProcessResult) error {
if result != nil && len(result.Errors) > 0 {
return terror.ErrMasterOperRespNotSuccess.Generate(utils.JoinProcessErrors(result.Errors))
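To make the downstream cleanup concrete, here is a small sketch (not part of the commit) that prints the statements removeMetaData issues. The task name and meta-schema are illustrative values; the actual table names come from the pkg/cputil helpers imported above, whose naming scheme is not shown in this diff. The etcd-side shard-DDL state is cleared separately through s.pessimist.RemoveMetaData and s.optimist.RemoveMetaData.

package main

import (
	"fmt"

	"github.com/pingcap/dm/pkg/cputil"
)

func main() {
	taskName, metaSchema := "test", "dm_meta" // illustrative values
	tables := []string{
		cputil.LoaderCheckpoint(taskName), // loader checkpoint
		cputil.SyncerCheckpoint(taskName), // syncer checkpoint
		cputil.SyncerShardMeta(taskName),  // sharding meta
		cputil.SyncerOnlineDDL(taskName),  // online-DDL tracking
	}
	for _, table := range tables {
		fmt.Printf("DROP TABLE IF EXISTS `%s`.`%s`\n", metaSchema, table)
	}
}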
(The remaining changed files of the 86 in this commit are not shown.)
