Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

dm-ha/: add remove metadata feature #651

Merged
merged 41 commits into from
May 20, 2020
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
14d16ba
add remove metadata feature
lichunzhu May 7, 2020
76d4eec
Merge branch 'master' into fixRemoveMeta
lichunzhu May 7, 2020
18d0f7b
add remove-meta to integration test
lichunzhu May 7, 2020
47c7a7a
add annotation
lichunzhu May 7, 2020
9b6214a
add adjust db
lichunzhu May 7, 2020
a0c0061
add decrypt
lichunzhu May 7, 2020
9a7e9de
Merge branch 'master' into fixRemoveMeta
lichunzhu May 7, 2020
9e2d625
add close
lichunzhu May 7, 2020
571f0ee
Merge branch 'fixRemoveMeta' of https://github.com/lichunzhu/dm into …
lichunzhu May 7, 2020
54d62c0
address comments
lichunzhu May 8, 2020
8cffb59
remove metrics
lichunzhu May 8, 2020
ffd5528
move remove-meta from config file to command
lichunzhu May 8, 2020
872471e
fix test
lichunzhu May 8, 2020
8ecc213
fix ut
lichunzhu May 8, 2020
562d42f
address comments
lichunzhu May 12, 2020
0024702
Merge branch 'master' into fixRemoveMeta
lichunzhu May 12, 2020
0713a35
add ut
lichunzhu May 12, 2020
152fcdf
Merge branch 'fixRemoveMeta' of https://github.com/lichunzhu/dm into …
lichunzhu May 12, 2020
6dd58b3
Merge branch 'master' into fixRemoveMeta
lichunzhu May 12, 2020
cf8906c
fix remove meta ut
lichunzhu May 13, 2020
7ad15b8
Merge branch 'master' into fixRemoveMeta
lichunzhu May 13, 2020
e04447d
remove both pesi/opti for any shardmode
lichunzhu May 13, 2020
fb7c8b3
remove shardmeta.MetaTableFormat
lichunzhu May 13, 2020
d2876ef
add part of integration test for remove meta
lichunzhu May 13, 2020
8517c98
fix ut again
lichunzhu May 13, 2020
9dc0a9e
complete sequence_sharding_removemeta test
lichunzhu May 13, 2020
efb33b7
add db operation
lichunzhu May 13, 2020
2e9b9ce
add check ddl locks
lichunzhu May 13, 2020
3f86e9b
address comment
lichunzhu May 14, 2020
b6fb025
add mutex test
lichunzhu May 14, 2020
95b2e41
Merge branch 'master' into fixRemoveMeta
lichunzhu May 14, 2020
22fe815
merge master branch
lichunzhu May 15, 2020
37b8666
Merge branch 'fixRemoveMeta' of https://github.com/lichunzhu/dm into …
lichunzhu May 15, 2020
c578d6b
Merge branch 'master' into fixRemoveMeta
lichunzhu May 15, 2020
2f57b00
fix ut
lichunzhu May 15, 2020
60a0aca
Merge branch 'master' into fixRemoveMeta
lichunzhu May 15, 2020
025b209
merge master
lichunzhu May 19, 2020
59a69a8
add sleep
lichunzhu May 19, 2020
52926d0
add sleep to avoid canceled ddl error
lichunzhu May 19, 2020
c721261
change kill 3 master to kill 2
lichunzhu May 20, 2020
a891447
add todo
lichunzhu May 20, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion dm/config/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -510,7 +510,6 @@ func (c *TaskConfig) SubTaskConfigs(sources map[string]DBConfig) ([]*SubTaskConf
cfg.Mode = c.TaskMode
cfg.CaseSensitive = c.CaseSensitive
cfg.MetaSchema = c.MetaSchema
cfg.RemoveMeta = c.RemoveMeta
csuzhangxc marked this conversation as resolved.
Show resolved Hide resolved
cfg.EnableHeartbeat = c.EnableHeartbeat
cfg.HeartbeatUpdateInterval = c.HeartbeatUpdateInterval
cfg.HeartbeatReportInterval = c.HeartbeatReportInterval
Expand Down
39 changes: 39 additions & 0 deletions dm/master/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Copyright 2020 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package master

import (
"github.com/pingcap/dm/pkg/metricsproxy"
"github.com/prometheus/client_golang/prometheus"
)

var (
// stmtHistogram is a histogram vector for statement durations in seconds,
// labelled by "type" and "task". With the namespace, subsystem and name
// below, Prometheus exposes it as dm_master_stmt_duration_time.
stmtHistogram = metricsproxy.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "dm",
Subsystem: "master",
Name: "stmt_duration_time",
Help: "Bucketed histogram of every statement query time (s).",
// 25 exponential buckets, starting at 5µs and doubling each step.
Buckets: prometheus.ExponentialBuckets(0.000005, 2, 25),
}, []string{"type", "task"})
)

// RegisterMetrics creates a new Prometheus registry and registers the process
// collector, the Go collector and stmtHistogram on it, in that order.
// MustRegister panics if any of the registrations fails.
//
// NOTE(review): the registry is local to this function and is not stored or
// installed anywhere visible here — confirm how these metrics are exposed.
func RegisterMetrics() {
	reg := prometheus.NewRegistry()
	reg.MustRegister(
		prometheus.NewProcessCollector(prometheus.ProcessCollectorOpts{}),
		prometheus.NewGoCollector(),
		stmtHistogram,
	)
}
81 changes: 77 additions & 4 deletions dm/master/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,14 @@ import (
"github.com/pingcap/dm/dm/master/workerrpc"
"github.com/pingcap/dm/dm/pb"
"github.com/pingcap/dm/pkg/conn"
tcontext "github.com/pingcap/dm/pkg/context"
"github.com/pingcap/dm/pkg/election"
"github.com/pingcap/dm/pkg/etcdutil"
"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/dm/pkg/terror"
"github.com/pingcap/dm/pkg/tracing"
"github.com/pingcap/dm/pkg/utils"
shardmeta "github.com/pingcap/dm/syncer/sharding-meta"
)

const (
Expand Down Expand Up @@ -165,6 +167,9 @@ func (s *Server) Start(ctx context.Context) (err error) {
// gRPC API server
gRPCSvr := func(gs *grpc.Server) { pb.RegisterMasterServer(gs, s) }

// register metrics before serving
RegisterMetrics()

// start embed etcd server, gRPC API server and HTTP (API, status and debug) server.
s.etcd, err = startEtcd(etcdCfg, gRPCSvr, userHandles, etcdStartTimeout)
if err != nil {
Expand Down Expand Up @@ -360,15 +365,27 @@ func (s *Server) StartTask(ctx context.Context, req *pb.StartTaskRequest) (*pb.S
if len(sourceRespCh) > 0 {
sourceResps = sortCommonWorkerResults(sourceRespCh)
} else {
sources := make([]string, 0, len(stCfgs))
for _, stCfg := range stCfgs {
sources = append(sources, stCfg.SourceID)
}
if cfg.RemoveMeta {
if scm := s.scheduler.GetSubTaskCfgsByTask(cfg.Name); len(scm) > 0 {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need any mechanism to prevent start-task in another client when removing meta?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add removeMetaLock

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need a lock? Could a task be started with the same task name?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm... could a user run start-task again while the previous start-task is still removing meta?

resp.Msg = terror.Annotate(terror.ErrSchedulerSubTaskExist.Generate(cfg.Name, sources),
"while remove-meta is true").Error()
return resp, nil
}
err = s.removeMetaData(ctx, cfg)
if err != nil {
resp.Msg = terror.Annotate(err, "while removing metadata").Error()
return resp, nil
}
}
err = s.scheduler.AddSubTasks(subtaskCfgPointersToInstances(stCfgs...)...)
if err != nil {
resp.Msg = errors.ErrorStack(err)
return resp, nil
}
sources := make([]string, 0, len(stCfgs))
for _, stCfg := range stCfgs {
sources = append(sources, stCfg.SourceID)
}
resp.Result = true
sourceResps = s.getSourceRespsAfterOperation(ctx, cfg.Name, sources, []string{}, req)
}
Expand Down Expand Up @@ -1332,6 +1349,62 @@ func (s *Server) generateSubTask(ctx context.Context, task string) (*config.Task
return cfg, stCfgs, nil
}

func (s *Server) removeMetaData(ctx context.Context, cfg *config.TaskConfig) error {
csuzhangxc marked this conversation as resolved.
Show resolved Hide resolved
toDB := *cfg.TargetDB
toDB.Adjust()
if len(toDB.Password) > 0 {
pswdTo, err := utils.Decrypt(toDB.Password)
if err != nil {
return err
}
toDB.Password = pswdTo
}

sqls := make([]string, 0, 3)
sqls = append(sqls, fmt.Sprintf("CREATE SCHEMA IF NOT EXISTS `%s`", cfg.MetaSchema))
lichunzhu marked this conversation as resolved.
Show resolved Hide resolved
// clear loader and syncer checkpoints
sqls = append(sqls, fmt.Sprintf("DROP TABLE IF EXISTS `%s`.`%s`",
cfg.MetaSchema, cfg.Name+"_loader_checkpoint"))
csuzhangxc marked this conversation as resolved.
Show resolved Hide resolved
sqls = append(sqls, fmt.Sprintf("DROP TABLE IF EXISTS `%s`.`%s`",
cfg.MetaSchema, cfg.Name+"_syncer_checkpoint"))

// clear etcd data
if cfg.ShardMode == config.ShardPessimistic {
err := s.pessimist.RemoveMetaData(cfg.Name)
if err != nil {
return err
}
// clear shard meta data for pessimistic
sqls = append(sqls, fmt.Sprintf("DROP TABLE IF EXISTS `%s`.`%s`",
cfg.MetaSchema, fmt.Sprintf(shardmeta.MetaTableFormat, cfg.Name)))
} else if cfg.ShardMode == config.ShardOptimistic {
err := s.optimist.RemoveMetaData(cfg.Name)
if err != nil {
return err
}
}

// clear online ddl schema
if cfg.OnlineDDLScheme != "" {
csuzhangxc marked this conversation as resolved.
Show resolved Hide resolved
sqls = append(sqls, fmt.Sprintf("DROP TABLE IF EXISTS `%s`.`%s`",
cfg.MetaSchema, fmt.Sprintf("%s_onlineddl", cfg.Name)))
}

baseDB, err := conn.DefaultDBProvider.Apply(toDB)
if err != nil {
return terror.WithScope(err, terror.ScopeDownstream)
}
defer baseDB.Close()
dbConn, err := baseDB.GetBaseConn(ctx)
if err != nil {
return terror.WithScope(err, terror.ScopeDownstream)
}
defer baseDB.CloseBaseConn(dbConn)
ctctx := tcontext.Background().WithContext(ctx).WithLogger(log.With(zap.String("unit", "remove metadata")))
WangXiangUSTC marked this conversation as resolved.
Show resolved Hide resolved
_, err = dbConn.ExecuteSQL(ctctx, stmtHistogram, cfg.Name, sqls)
return err
}

func extractWorkerError(result *pb.ProcessResult) error {
if result != nil && len(result.Errors) > 0 {
return terror.ErrMasterOperRespNotSuccess.Generate(utils.JoinProcessErrors(result.Errors))
Expand Down
25 changes: 25 additions & 0 deletions dm/master/shardddl/optimist.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,31 @@ func (o *Optimist) ShowLocks(task string, sources []string) []*pb.DDLLock {
return ret
}

// RemoveMetaData removes meta data for a specified task
// NOTE: this function can only be used when the specified task is not running
func (o *Optimist) RemoveMetaData(task string) error {
o.mu.Lock()
defer o.mu.Unlock()
if o.closed {
return nil
csuzhangxc marked this conversation as resolved.
Show resolved Hide resolved
}

infos, ops, _, err := optimism.GetInfosOperationsThroughTask(o.cli, task)
if err != nil {
return err
}
for _, info := range infos {
o.lk.RemoveLockThroughInfo(info)
}
for _, op := range ops {
o.lk.RemoveLock(op.ID)
}

o.tk.RemoveTableThroughTask(task)
_, err = optimism.DeleteInfosOperationsTablesThroughTask(o.cli, task)
return err
}

// run runs jobs in the background.
func (o *Optimist) run(ctx context.Context, revSource, revInfo, revOperation int64) error {
for {
Expand Down
21 changes: 21 additions & 0 deletions dm/master/shardddl/pessimist.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,27 @@ func (p *Pessimist) UnlockLock(ctx context.Context, ID, replaceOwner string, for
return nil
}

// RemoveMetaData removes the shard DDL metadata of the specified task.
// It reads the task's infos and operations from etcd, removes the matching
// locks from the in-memory lock keeper, and finally deletes the task's infos
// and operations from etcd.
// NOTE: this function can only be used when the specified task is not running.
//
// It returns ErrMasterPessimistNotStarted when the pessimist is closed, and
// any error hit while reading from or deleting in etcd.
func (p *Pessimist) RemoveMetaData(task string) error {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.closed {
		return terror.ErrMasterPessimistNotStarted.Generate()
	}

	infos, ops, _, err := pessimism.GetInfosOperationsThroughTask(p.cli, task)
	if err != nil {
		// do not touch the in-memory locks or etcd when the current state
		// could not be read; report the failure instead of hiding it.
		return err
	}
	for _, info := range infos {
		p.lk.RemoveLockThroughInfo(info)
	}
	for _, op := range ops {
		p.lk.RemoveLock(op.ID)
	}

	_, err = pessimism.DeleteInfosOperationsThroughTask(p.cli, task)
	return err
}

// recoverLocks recovers shard DDL locks based on shard DDL info and shard DDL lock operation.
func (p *Pessimist) recoverLocks(ifm map[string]map[string]pessimism.Info, opm map[string]map[string]pessimism.Operation) error {
// construct locks based on the shard DDL info.
Expand Down
1 change: 1 addition & 0 deletions dm/worker/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ func (s *Server) JoinMaster(endpoints []string) error {
ctx1, cancel1 = context.WithTimeout(ctx, 3*time.Second)
resp, err := client.RegisterWorker(ctx1, req)
cancel1()
conn.Close()
if err != nil {
log.L().Error("fail to register worker", zap.Error(err))
continue
Expand Down
19 changes: 19 additions & 0 deletions pkg/shardddl/optimism/keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,12 @@ func (lk *LockKeeper) RemoveLock(lockID string) bool {
return ok
}

// RemoveLockThroughInfo removes the lock whose ID is generated from the given
// info, and returns the result of RemoveLock for that ID.
func (lk *LockKeeper) RemoveLockThroughInfo(info Info) bool {
	return lk.RemoveLock(genDDLLockID(info))
}

// FindLock finds a lock.
func (lk *LockKeeper) FindLock(lockID string) *Lock {
lk.mu.RLock()
Expand Down Expand Up @@ -193,6 +199,19 @@ func (tk *TableKeeper) RemoveTable(task, source, upSchema, upTable, downSchema,
return removed
}

// RemoveTableThroughTask removes all source tables recorded for the given task.
// It returns whether anything was removed (i.e. the task existed before).
func (tk *TableKeeper) RemoveTableThroughTask(task string) bool {
	tk.mu.Lock()
	defer tk.mu.Unlock()

	_, existed := tk.tables[task]
	if existed {
		delete(tk.tables, task)
	}
	return existed
}

// FindTables finds source tables by task name and downstream table name.
func (tk *TableKeeper) FindTables(task, downSchema, downTable string) []TargetTable {
tk.mu.RLock()
Expand Down
33 changes: 33 additions & 0 deletions pkg/shardddl/optimism/operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,39 @@ func GetAllOperations(cli *clientv3.Client) (map[string]map[string]map[string]ma
return opm, resp.Header.Revision, nil
}

// GetInfosOperationsThroughTask gets the shard DDL infos and operations that
// are currently stored in etcd under the given task's key prefixes.
// Both prefixes are read in one transaction; the returned revision is the one
// reported by that transaction's response header.
// If any stored value fails to decode, the decode error is returned and no
// infos or operations are returned.
// This function should often be called by DM-master.
func GetInfosOperationsThroughTask(cli *clientv3.Client, task string) ([]Info, []Operation, int64, error) {
	getInfos := clientv3.OpGet(common.ShardDDLOptimismInfoKeyAdapter.Encode(task), clientv3.WithPrefix())
	getOps := clientv3.OpGet(common.ShardDDLOptimismOperationKeyAdapter.Encode(task), clientv3.WithPrefix())

	txnResp, _, err := etcdutil.DoOpsInOneTxnWithRetry(cli, getInfos, getOps)
	if err != nil {
		return nil, nil, 0, err
	}

	// responses come back in the same order as the ops above.
	infoKVs := txnResp.Responses[0].GetResponseRange().Kvs
	opKVs := txnResp.Responses[1].GetResponseRange().Kvs

	infos := make([]Info, 0, len(infoKVs))
	ops := make([]Operation, 0, len(opKVs))

	for _, kv := range infoKVs {
		info, decodeErr := infoFromJSON(string(kv.Value))
		if decodeErr != nil {
			return nil, nil, 0, decodeErr
		}
		infos = append(infos, info)
	}
	for _, kv := range opKVs {
		op, decodeErr := operationFromJSON(string(kv.Value))
		if decodeErr != nil {
			return nil, nil, 0, decodeErr
		}
		ops = append(ops, op)
	}

	return infos, ops, txnResp.Header.Revision, nil
}

// WatchOperationPut watches PUT operations for DDL lock operation.
// If want to watch all operations matching, pass empty string for `task`, `source`, `upSchema` and `upTable`.
// This function can be called by DM-worker and DM-master.
Expand Down
11 changes: 11 additions & 0 deletions pkg/shardddl/optimism/ops.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package optimism
import (
"go.etcd.io/etcd/clientv3"

"github.com/pingcap/dm/dm/common"
"github.com/pingcap/dm/pkg/etcdutil"
)

Expand Down Expand Up @@ -59,3 +60,13 @@ func DeleteInfosOperations(cli *clientv3.Client, infos []Info, ops []Operation)
_, rev, err := etcdutil.DoOpsInOneTxnWithRetry(cli, opsDel...)
return rev, err
}

// DeleteInfosOperationsTablesThroughTask deletes, in one transaction, every
// key in etcd under the given task's shard DDL info, operation and source
// tables prefixes. It returns the revision reported by the transaction helper.
func DeleteInfosOperationsTablesThroughTask(cli *clientv3.Client, task string) (int64, error) {
	delInfos := clientv3.OpDelete(common.ShardDDLOptimismInfoKeyAdapter.Encode(task), clientv3.WithPrefix())
	delOps := clientv3.OpDelete(common.ShardDDLOptimismOperationKeyAdapter.Encode(task), clientv3.WithPrefix())
	delTables := clientv3.OpDelete(common.ShardDDLOptimismSourceTablesKeyAdapter.Encode(task), clientv3.WithPrefix())

	_, rev, err := etcdutil.DoOpsInOneTxnWithRetry(cli, delInfos, delOps, delTables)
	return rev, err
}
6 changes: 6 additions & 0 deletions pkg/shardddl/pessimism/keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,12 @@ func (lk *LockKeeper) RemoveLock(lockID string) bool {
return ok
}

// RemoveLockThroughInfo removes the lock whose ID is generated from the given
// info, and returns the result of RemoveLock for that ID.
func (lk *LockKeeper) RemoveLockThroughInfo(info Info) bool {
	return lk.RemoveLock(genDDLLockID(info))
}

// FindLock finds a lock.
func (lk *LockKeeper) FindLock(lockID string) *Lock {
lk.mu.RLock()
Expand Down
32 changes: 32 additions & 0 deletions pkg/shardddl/pessimism/operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,38 @@ func GetAllOperations(cli *clientv3.Client) (map[string]map[string]Operation, in
return opm, resp.Header.Revision, nil
}

// GetInfosOperationsThroughTask gets the DDL lock infos and operations that
// are currently stored in etcd under the given task's key prefixes.
// Both prefixes are read in one transaction; the returned revision is the one
// reported by that transaction's response header.
// If any stored value fails to decode, the decode error is returned and no
// infos or operations are returned.
func GetInfosOperationsThroughTask(cli *clientv3.Client, task string) ([]Info, []Operation, int64, error) {
	getInfos := clientv3.OpGet(common.ShardDDLPessimismInfoKeyAdapter.Encode(task), clientv3.WithPrefix())
	getOps := clientv3.OpGet(common.ShardDDLPessimismOperationKeyAdapter.Encode(task), clientv3.WithPrefix())

	txnResp, _, err := etcdutil.DoOpsInOneTxnWithRetry(cli, getInfos, getOps)
	if err != nil {
		return nil, nil, 0, err
	}

	// responses come back in the same order as the ops above.
	infoKVs := txnResp.Responses[0].GetResponseRange().Kvs
	opKVs := txnResp.Responses[1].GetResponseRange().Kvs

	infos := make([]Info, 0, len(infoKVs))
	ops := make([]Operation, 0, len(opKVs))

	for _, kv := range infoKVs {
		info, decodeErr := infoFromJSON(string(kv.Value))
		if decodeErr != nil {
			return nil, nil, 0, decodeErr
		}
		infos = append(infos, info)
	}
	for _, kv := range opKVs {
		op, decodeErr := operationFromJSON(string(kv.Value))
		if decodeErr != nil {
			return nil, nil, 0, decodeErr
		}
		ops = append(ops, op)
	}

	return infos, ops, txnResp.Header.Revision, nil
}

// WatchOperationPut watches PUT operations for DDL lock operation.
// If want to watch all operations, pass empty string for `task` and `source`.
// This function can be called by DM-worker and DM-master.
Expand Down
Loading