Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

shardddl: add infoOpLock for pessimist shardddl #1257

Merged
merged 3 commits into from
Nov 2, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 43 additions & 4 deletions dm/master/shardddl/pessimist.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/dm/pkg/shardddl/pessimism"
"github.com/pingcap/dm/pkg/terror"
"github.com/pingcap/failpoint"
)

var (
Expand All @@ -52,6 +53,8 @@ type Pessimist struct {

// taskSources used to get all sources relative to the given task.
taskSources func(task string) []string

infoOpMu sync.Mutex
}

// NewPessimist creates a new Pessimist instance.
Expand Down Expand Up @@ -464,17 +467,18 @@ func (p *Pessimist) handleInfoPut(ctx context.Context, infoCh <-chan pessimism.I
return
}
p.logger.Info("receive a shard DDL info", zap.Stringer("info", info))

p.infoOpMu.Lock()
lockID, synced, remain, err := p.lk.TrySync(info, p.taskSources(info.Task))
if err != nil {
// if the lock become synced, and `done` for `exec`/`skip` operation received,
// but the `done` operations have not been deleted,
// then the DM-worker should not put any new DDL info until the old operation has been deleted.
p.logger.Error("fail to try sync shard DDL lock", zap.Stringer("info", info), log.ShortError(err))
// currently, only DDL mismatch will cause error
metrics.ReportDDLError(info.Task, metrics.InfoErrSyncLock)
p.infoOpMu.Unlock()
continue
} else if !synced {
p.logger.Info("the shard DDL lock has not synced", zap.String("lock", lockID), zap.Int("remain", remain))
p.infoOpMu.Unlock()
continue
}
p.logger.Info("the shard DDL lock has synced", zap.String("lock", lockID))
Expand All @@ -483,8 +487,8 @@ func (p *Pessimist) handleInfoPut(ctx context.Context, infoCh <-chan pessimism.I
if err != nil {
p.logger.Error("fail to handle the shard DDL lock", zap.String("lock", lockID), log.ShortError(err))
metrics.ReportDDLError(info.Task, metrics.InfoErrHandleLock)
continue
}
p.infoOpMu.Unlock()
}
}
}
Expand All @@ -505,14 +509,17 @@ func (p *Pessimist) handleOperationPut(ctx context.Context, opCh <-chan pessimis
continue
}

p.infoOpMu.Lock()
lock := p.lk.FindLock(op.ID)
if lock == nil {
p.logger.Warn("no lock for the shard DDL lock operation exist", zap.Stringer("operation", op))
p.infoOpMu.Unlock()
continue
} else if synced, _ := lock.IsSynced(); !synced {
// this should not happen in normal case.
p.logger.Warn("the lock for the shard DDL lock operation has not synced", zap.Stringer("operation", op))
metrics.ReportDDLError(op.Task, metrics.OpErrLockUnSynced)
p.infoOpMu.Unlock()
continue
}

Expand All @@ -527,13 +534,15 @@ func (p *Pessimist) handleOperationPut(ctx context.Context, opCh <-chan pessimis
metrics.ReportDDLError(op.Task, metrics.OpErrRemoveLock)
}
p.logger.Info("the lock info for the shard DDL lock operation has been cleared", zap.Stringer("operation", op))
p.infoOpMu.Unlock()
continue
}

// one of the non-owner dm-worker instance has done the operation,
// still need to wait for more `done` from other non-owner dm-worker instances.
if op.Source != lock.Owner {
p.logger.Info("the shard DDL lock operation of a non-owner has done", zap.Stringer("operation", op), zap.String("owner", lock.Owner))
p.infoOpMu.Unlock()
continue
}

Expand All @@ -544,6 +553,7 @@ func (p *Pessimist) handleOperationPut(ctx context.Context, opCh <-chan pessimis
p.logger.Error("fail to put skip shard DDL lock operations for non-owner", zap.String("lock", lock.ID), log.ShortError(err))
metrics.ReportDDLError(op.Task, metrics.OpErrPutNonOwnerOp)
}
p.infoOpMu.Unlock()
}
}
}
Expand Down Expand Up @@ -631,6 +641,35 @@ func (p *Pessimist) removeLock(lock *pessimism.Lock) error {
if err != nil {
return err
}

failpoint.Inject("SleepWhenRemoveLock", func(val failpoint.Value) {
t := val.(int)
log.L().Info("wait new ddl info putted into etcd",
zap.String("failpoint", "SleepWhenRemoveLock"),
zap.Int("max wait second", t))

ticker := time.NewTicker(time.Second)
defer ticker.Stop()
timer := time.NewTimer(time.Duration(t) * time.Second)
defer timer.Stop()
OUTER:
for {
select {
case <-timer.C:
log.L().Info("failed to wait new DDL info", zap.Int("wait second", t))
break OUTER
case <-ticker.C:
// manually check etcd
infos, _, err := pessimism.GetAllInfo(p.cli)
if err == nil {
if _, ok := infos[lock.Task]; ok {
log.L().Info("found new DDL info")
break OUTER
}
}
}
}
})
p.lk.RemoveLock(lock.ID)
metrics.ReportDDLPending(lock.Task, metrics.DDLPendingSynced, metrics.DDLPendingNone)
return nil
Expand Down
2 changes: 1 addition & 1 deletion tests/_utils/test_prepare
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ function check_log_contain_with_retry() {
log2=$3
fi
rc=0
for ((k=1;k<11;k++)); do
for ((k=1;k<31;k++)); do
if [[ ! -f $log1 ]]; then
sleep 2
echo "check log contain failed $k-th time (file not exist), retry later"
Expand Down
13 changes: 11 additions & 2 deletions tests/shardddl3/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -686,7 +686,11 @@ function DM_RemoveLock_CASE() {
check_log_contain_with_retry "wait new ddl info putted into etcd" $WORK_DIR/master/log/dm-master.log
run_sql_source1 "alter table ${shardddl1}.${tb1} drop column b;"

check_log_contain_with_retry "fail to delete shard DDL infos and lock operations" $WORK_DIR/master/log/dm-master.log
if [[ "$1" = "pessimistic" ]]; then
check_log_contain_with_retry "found new DDL info" $WORK_DIR/master/log/dm-master.log
else
check_log_contain_with_retry "fail to delete shard DDL infos and lock operations" $WORK_DIR/master/log/dm-master.log
fi

run_sql_source1 "alter table ${shardddl1}.${tb1} change a a bigint default 10;"
run_sql_source2 "alter table ${shardddl1}.${tb1} drop column b;"
Expand All @@ -700,13 +704,18 @@ function DM_RemoveLock_CASE() {
function DM_RemoveLock() {
ps aux | grep dm-master |awk '{print $2}'|xargs kill || true
check_port_offline $MASTER_PORT1 20
export GO_FAILPOINTS="github.com/pingcap/dm/dm/master/shardddl/SleepWhenRemoveLock=return(10)"
export GO_FAILPOINTS="github.com/pingcap/dm/dm/master/shardddl/SleepWhenRemoveLock=return(30)"
run_dm_master $WORK_DIR/master $MASTER_PORT $cur/conf/dm-master.toml
check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT
run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"list-member -w" \
"bound" 2

run_case RemoveLock "double-source-pessimistic" \
"run_sql_source1 \"create table ${shardddl1}.${tb1} (a int, b varchar(10));\"; \
run_sql_source2 \"create table ${shardddl1}.${tb1} (a int, b varchar(10));\"; \
run_sql_source2 \"create table ${shardddl1}.${tb2} (a int, b varchar(10));\"" \
"clean_table" "pessimistic"
run_case RemoveLock "double-source-optimistic" \
"run_sql_source1 \"create table ${shardddl1}.${tb1} (a int, b varchar(10));\"; \
run_sql_source2 \"create table ${shardddl1}.${tb1} (a int, b varchar(10));\"; \
Expand Down