This repository has been archived by the owner on Nov 24, 2023. It is now read-only.
shardddl: add lock and version for optimistic info (#1035) (#1051)
Co-authored-by: gmhdbjd <gmhdbjd@gmail.com>
ti-srebot and GMHDBJD authored Sep 17, 2020
1 parent 5da777d commit 062420d
Showing 13 changed files with 470 additions and 114 deletions.
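In brief, the diff below makes three related changes: dm-master's optimist now holds a mutex across the handling of each shard DDL info and operation, so a lock cannot be removed while a new info for it is being processed; removeLock and deleteInfosOps now report whether the etcd deletion actually took effect, and the in-memory lock is only dropped when it did; and each optimistic Info carries the etcd per-key Version (set when the info is read or watched from etcd and threaded through LockKeeper.TrySync), so stale infos can be told apart from ones put after the lock resolved.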
64 changes: 46 additions & 18 deletions dm/master/shardddl/optimist.go
@@ -20,6 +20,7 @@ import (
"sync"
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb-tools/pkg/dbutil"
"go.etcd.io/etcd/clientv3"
"go.uber.org/zap"
@@ -387,11 +388,15 @@ func (o *Optimist) handleInfo(ctx context.Context, infoCh <-chan optimism.Info)
}
o.logger.Info("receive a shard DDL info", zap.Stringer("info", info), zap.Bool("is deleted", info.IsDeleted))

// hold the mutex so a new DDL info cannot be handled while a previous one is being resolved and its lock removed;
// the lock granularity can be narrowed later if needed
o.mu.Lock()
if info.IsDeleted {
lock := o.lk.FindLockByInfo(info)
if lock == nil {
// this often happens after the lock is resolved.
o.logger.Debug("lock for info not found", zap.Stringer("info", info))
o.mu.Unlock()
continue
}
// handle `DROP TABLE`, need to remove the table schema from the lock,
@@ -400,6 +405,7 @@ func (o *Optimist) handleInfo(ctx context.Context, infoCh <-chan optimism.Info)
o.logger.Debug("the table name remove from the table keeper", zap.Bool("removed", removed), zap.Stringer("info", info))
removed = o.tk.RemoveTable(info.Task, info.Source, info.UpSchema, info.UpTable, info.DownSchema, info.DownTable)
o.logger.Debug("a table removed for info from the lock", zap.Bool("removed", removed), zap.Stringer("info", info))
o.mu.Unlock()
continue
}

@@ -424,8 +430,10 @@ func (o *Optimist) handleInfo(ctx context.Context, infoCh <-chan optimism.Info)
if err != nil {
o.logger.Error("fail to handle the shard DDL lock", zap.Stringer("info", info), log.ShortError(err))
metrics.ReportDDLError(info.Task, metrics.InfoErrHandleLock)
o.mu.Unlock()
continue
}
o.mu.Unlock()
}
}
}
@@ -446,9 +454,13 @@ func (o *Optimist) handleOperationPut(ctx context.Context, opCh <-chan optimism.
continue
}

// hold the mutex so a new DDL info cannot be handled while a previous one is being resolved and its lock removed;
// the lock granularity can be narrowed later if needed
o.mu.Lock()
lock := o.lk.FindLock(op.ID)
if lock == nil {
o.logger.Warn("no lock for the shard DDL lock operation exist", zap.Stringer("operation", op))
o.mu.Unlock()
continue
}

@@ -459,17 +471,21 @@ func (o *Optimist) handleOperationPut(ctx context.Context, opCh <-chan optimism.
o.logger.Info("mark operation for a table as done", zap.Bool("done", done), zap.Stringer("operation", op))
if !lock.IsResolved() {
o.logger.Info("the lock is still not resolved", zap.Stringer("operation", op))
o.mu.Unlock()
continue
}

// the lock has done, remove the lock.
o.logger.Info("the lock for the shard DDL lock operation has been resolved", zap.Stringer("operation", op))
err := o.removeLock(lock)
deleted, err := o.removeLock(lock)
if err != nil {
o.logger.Error("fail to delete the shard DDL infos and lock operations", zap.String("lock", lock.ID), log.ShortError(err))
metrics.ReportDDLError(op.Task, metrics.OpErrRemoveLock)
}
o.logger.Info("the shard DDL infos and lock operations have been cleared", zap.Stringer("operation", op))
if deleted {
o.logger.Info("the shard DDL infos and lock operations have been cleared", zap.Stringer("operation", op))
}
o.mu.Unlock()
}
}
}
@@ -500,17 +516,15 @@ func (o *Optimist) handleLock(info optimism.Info, tts []optimism.TargetTable, sk

lock := o.lk.FindLock(lockID)
if lock == nil {
// this always means others removed the lock concurrently when resolving the ddl.
// simply try again.
o.logger.Warn("lock not found after try sync for shard DDL info, try handle lock again", zap.String("lock", lockID), zap.Stringer("info", info))
return o.handleLock(info, tts, skipDone)
// should not happen
return terror.ErrMasterLockNotFound.Generate(lockID)
}

// check whether the lock has resolved.
if lock.IsResolved() {
// remove all operations for this shard DDL lock.
// this is to handle the case where dm-master exit before deleting operations for them.
err = o.removeLock(lock)
_, err = o.removeLock(lock)
if err != nil {
return err
}
@@ -528,35 +542,49 @@ }
}

// removeLock removes the lock in memory and its information in etcd.
func (o *Optimist) removeLock(lock *optimism.Lock) error {
err := o.deleteInfosOps(lock)
func (o *Optimist) removeLock(lock *optimism.Lock) (bool, error) {
failpoint.Inject("SleepWhenRemoveLock", func(val failpoint.Value) {
t := val.(int)
log.L().Info("wait new ddl info putted into etcd", zap.String("failpoint", "SleepWhenRemoveLock"))
time.Sleep(time.Duration(t) * time.Second)
})
deleted, err := o.deleteInfosOps(lock)
if err != nil {
return err
return deleted, err
}
if !deleted {
return false, nil
}
o.lk.RemoveLock(lock.ID)
metrics.ReportDDLPending(lock.Task, metrics.DDLPendingSynced, metrics.DDLPendingNone)
return nil
return true, nil
}

// deleteInfosOps DELETEs shard DDL lock info and operations.
func (o *Optimist) deleteInfosOps(lock *optimism.Lock) error {
func (o *Optimist) deleteInfosOps(lock *optimism.Lock) (bool, error) {
infos := make([]optimism.Info, 0)
ops := make([]optimism.Operation, 0)
for source, schemaTables := range lock.Ready() {
for schema, tables := range schemaTables {
for table := range tables {
// NOTE: we rely only on `task`, `source`, `upSchema`, and `upTable` for deletion.
infos = append(infos, optimism.NewInfo(lock.Task, source, schema, table, lock.DownSchema, lock.DownTable, nil, nil, nil))
// NOTE: we rely only on `task`, `source`, `upSchema`, `upTable`, and `Version` for deletion.
info := optimism.NewInfo(lock.Task, source, schema, table, lock.DownSchema, lock.DownTable, nil, nil, nil)
info.Version = lock.GetVersion(source, schema, table)
infos = append(infos, info)
ops = append(ops, optimism.NewOperation(lock.ID, lock.Task, source, schema, table, nil, optimism.ConflictNone, false))
}
}
}
// NOTE: we rely only on `task`, `downSchema`, and `downTable` for deletion.
initSchema := optimism.NewInitSchema(lock.Task, lock.DownSchema, lock.DownTable, nil)
rev, err := optimism.DeleteInfosOperationsSchema(o.cli, infos, ops, initSchema)
rev, deleted, err := optimism.DeleteInfosOperationsSchema(o.cli, infos, ops, initSchema)
if err != nil {
return err
return deleted, err
}
o.logger.Info("delete shard DDL infos and lock operations", zap.String("lock", lock.ID), zap.Int64("revision", rev))
return nil
if deleted {
o.logger.Info("delete shard DDL infos and lock operations", zap.String("lock", lock.ID), zap.Int64("revision", rev))
} else {
o.logger.Info("fail to delete shard DDL infos and lock operations", zap.String("lock", lock.ID), zap.Int64("revision", rev))
}
return deleted, nil
}
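For readers skimming the diff: the new `o.mu` critical sections have to be released on every early `continue`, because the watch loops never return and a deferred unlock would not run per iteration. Below is a minimal standalone sketch of that pattern; the names are illustrative, not the DM API.

```go
package main

import (
	"fmt"
	"sync"
)

type info struct{ isDeleted bool }

type optimist struct {
	mu    sync.Mutex
	locks map[string]struct{}
}

// handleInfo mirrors the pattern above: the whole per-event critical
// section is guarded by mu, and every early-exit path must unlock
// explicitly, since a deferred unlock inside a loop body would only
// run when the enclosing function returns.
func (o *optimist) handleInfo(infoCh <-chan info) {
	for in := range infoCh {
		o.mu.Lock()
		if in.isDeleted {
			o.mu.Unlock() // early-exit path: release before continue
			continue
		}
		o.locks["demo"] = struct{}{} // resolve the lock under mu
		o.mu.Unlock()
	}
}

func main() {
	ch := make(chan info, 2)
	ch <- info{isDeleted: true}
	ch <- info{}
	close(ch)
	o := &optimist{locks: map[string]struct{}{}}
	o.handleInfo(ch)
	fmt.Println(len(o.locks)) // 1
}
```

An alternative is to wrap each iteration in a closure and `defer` the unlock; the commit opts for explicit unlocks on each exit path. The `SleepWhenRemoveLock` failpoint above deliberately widens exactly this race window in tests, which is why the unlock discipline matters.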
12 changes: 6 additions & 6 deletions pkg/binlog/reader/util.go
@@ -62,9 +62,9 @@ func GetGTIDsForPos(ctx context.Context, r Reader, endPos gmysql.Position) (gtid
// NOTE: only update endPos/GTIDs for DDL/XID to get a complete transaction.
switch ev := e.Event.(type) {
case *replication.QueryEvent:
parser2, err := event.GetParserForStatusVars(ev.StatusVars)
if err != nil {
log.L().Warn("can't determine sql_mode from binlog status_vars, use default parser instead", zap.Error(err))
parser2, err2 := event.GetParserForStatusVars(ev.StatusVars)
if err2 != nil {
log.L().Warn("can't determine sql_mode from binlog status_vars, use default parser instead", zap.Error(err2))
parser2 = parser.New()
}

@@ -74,9 +74,9 @@ func GetGTIDsForPos(ctx context.Context, r Reader, endPos gmysql.Position) (gtid
// GTID not enabled, can't get GTIDs for the position.
return nil, errors.Errorf("should have a GTIDEvent before the DDL QueryEvent %+v", e.Header)
}
err = latestGSet.Update(nextGTIDStr)
if err != nil {
return nil, terror.Annotatef(err, "update GTID set %v with GTID %s", latestGSet, nextGTIDStr)
err2 = latestGSet.Update(nextGTIDStr)
if err2 != nil {
return nil, terror.Annotatef(err2, "update GTID set %v with GTID %s", latestGSet, nextGTIDStr)
}
latestPos = e.Header.LogPos
}
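The `err` → `err2` renames above exist because `parser2, err := ...` inside the `case` block re-declares `err` with `:=`, shadowing the function-level error, so later assignments are easy to misread. A tiny self-contained illustration of that pitfall (not DM code, just the general Go behavior):

```go
package main

import (
	"errors"
	"fmt"
)

func step() (int, error) { return 0, errors.New("boom") }

// work demonstrates the pitfall: `:=` inside the loop declares a NEW err
// that shadows the named return value, so the failure never escapes.
func work() (err error) {
	for i := 0; i < 1; i++ {
		v, err := step() // shadows the outer `err`
		_ = v
		if err != nil {
			continue // the outer (named) err is still nil here
		}
	}
	return err // always nil: the inner err was a different variable
}

func main() {
	fmt.Println(work()) // prints <nil>, even though step failed
}
```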
10 changes: 5 additions & 5 deletions pkg/conn/basedb.go
@@ -52,8 +52,8 @@ func init() {
DefaultDBProvider = &DefaultDBProviderImpl{}
}

// mock is used in unit test
var mock sqlmock.Sqlmock
// mockDB is used in unit test
var mockDB sqlmock.Sqlmock

// Apply will build BaseDB with DBConfig
func (d *DefaultDBProviderImpl) Apply(config config.DBConfig) (*BaseDB, error) {
@@ -105,9 +105,9 @@ func (d *DefaultDBProviderImpl) Apply(config config.DBConfig) (*BaseDB, error) {
}
failpoint.Inject("failDBPing", func(_ failpoint.Value) {
db.Close()
db, mock, _ = sqlmock.New()
mock.ExpectPing()
mock.ExpectClose()
db, mockDB, _ = sqlmock.New()
mockDB.ExpectPing()
mockDB.ExpectClose()
})

err = db.Ping()
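For context, here is a minimal sketch of the go-sqlmock flow this failpoint drives, assuming github.com/DATA-DOG/go-sqlmock (recent versions require ping monitoring to be enabled explicitly; the version pinned by this repo may not need the option):

```go
package main

import (
	"log"

	sqlmock "github.com/DATA-DOG/go-sqlmock"
)

func main() {
	// MonitorPingsOption is needed on recent go-sqlmock versions for
	// ExpectPing to match; older versions may not accept this option.
	db, mockDB, err := sqlmock.New(sqlmock.MonitorPingsOption(true))
	if err != nil {
		log.Fatal(err)
	}
	mockDB.ExpectPing()
	mockDB.ExpectClose()

	if err = db.Ping(); err != nil {
		log.Fatal(err)
	}
	if err = db.Close(); err != nil {
		log.Fatal(err)
	}
	// Fails if any registered expectation was not exercised.
	if err = mockDB.ExpectationsWereMet(); err != nil {
		log.Fatal(err)
	}
}
```

The rename from `mock` to `mockDB` keeps the package-level variable shared between the failpoint in basedb.go and the assertion in basedb_test.go unambiguous.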
2 changes: 1 addition & 1 deletion pkg/conn/basedb_test.go
@@ -72,6 +72,6 @@ func (t *testBaseDBSuite) TestFailDBPing(c *C) {
c.Assert(db, IsNil)
c.Assert(err, NotNil)

err = mock.ExpectationsWereMet()
err = mockDB.ExpectationsWereMet()
c.Assert(err, IsNil)
}
5 changes: 5 additions & 0 deletions pkg/shardddl/optimism/info.go
@@ -50,6 +50,9 @@ type Info struct {
// only used to report to the caller of the watcher, do not marshal it.
// if it's true, it means the Info has been deleted in etcd.
IsDeleted bool `json:"-"`

// only set when getting/watching from etcd
Version int64 `json:"-"`
}

// NewInfo creates a new Info instance.
@@ -125,6 +128,7 @@ func GetAllInfo(cli *clientv3.Client) (map[string]map[string]map[string]map[stri
if err2 != nil {
return nil, 0, err2
}
info.Version = kv.Version

if _, ok := ifm[info.Task]; !ok {
ifm[info.Task] = make(map[string]map[string]map[string]Info)
@@ -174,6 +178,7 @@ func WatchInfo(ctx context.Context, cli *clientv3.Client, revision int64,
switch ev.Type {
case mvccpb.PUT:
info, err = infoFromJSON(string(ev.Kv.Value))
info.Version = ev.Kv.Version
case mvccpb.DELETE:
info, err = infoFromJSON(string(ev.PrevKv.Value))
info.IsDeleted = true
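The new `Version` field surfaces etcd's per-key version counter: it is 1 when a key is first put, increments on every subsequent put, and restarts at 1 if the key is deleted and re-created, which is the behavior the updated test below walks through. A hedged standalone sketch of those semantics against a local etcd, using the same go.etcd.io/etcd/clientv3 client as this file:

```go
package main

import (
	"context"
	"fmt"
	"time"

	"go.etcd.io/etcd/clientv3"
)

func main() {
	// Assumes a local etcd listening on 127.0.0.1:2379.
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"127.0.0.1:2379"},
		DialTimeout: 3 * time.Second,
	})
	if err != nil {
		panic(err)
	}
	defer cli.Close()

	ctx := context.Background()
	key := "demo/version"
	must := func(err error) {
		if err != nil {
			panic(err)
		}
	}
	version := func() int64 {
		resp, err := cli.Get(ctx, key)
		must(err)
		if len(resp.Kvs) == 0 {
			return 0
		}
		return resp.Kvs[0].Version
	}

	_, err = cli.Put(ctx, key, "a")
	must(err)
	fmt.Println(version()) // 1: the first Put of a key sets Version to 1

	_, err = cli.Put(ctx, key, "b")
	must(err)
	fmt.Println(version()) // 2: every further Put increments Version

	_, err = cli.Delete(ctx, key)
	must(err)
	_, err = cli.Put(ctx, key, "c")
	must(err)
	fmt.Println(version()) // 1 again: deletion resets the per-key Version
}
```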
56 changes: 43 additions & 13 deletions pkg/shardddl/optimism/info_test.go
@@ -126,7 +126,9 @@ func (t *testForEtcd) TestInfoEtcd(c *C) {
c.Assert(ifm[task1], HasLen, 1)
c.Assert(ifm[task1][source1], HasLen, 1)
c.Assert(ifm[task1][source1][upSchema], HasLen, 1)
c.Assert(ifm[task1][source1][upSchema][upTable], DeepEquals, i11)
i11WithVer := i11
i11WithVer.Version = 2
c.Assert(ifm[task1][source1][upSchema][upTable], DeepEquals, i11WithVer)

// put another key and get again with 2 info.
rev4, err := PutInfo(etcdTestCli, i12)
@@ -136,8 +138,10 @@
c.Assert(ifm, HasLen, 1)
c.Assert(ifm, HasKey, task1)
c.Assert(ifm[task1], HasLen, 2)
c.Assert(ifm[task1][source1][upSchema][upTable], DeepEquals, i11)
c.Assert(ifm[task1][source2][upSchema][upTable], DeepEquals, i12)
c.Assert(ifm[task1][source1][upSchema][upTable], DeepEquals, i11WithVer)
i12WithVer := i12
i12WithVer.Version = 1
c.Assert(ifm[task1][source2][upSchema][upTable], DeepEquals, i12WithVer)

// start the watcher.
wch := make(chan Info, 10)
@@ -149,24 +153,50 @@
ctx, cancel := context.WithTimeout(context.Background(), watchTimeout)
defer cancel()
WatchInfo(ctx, etcdTestCli, rev4+1, wch, ech) // revision+1
close(wch) // close the chan
close(ech)
}()

// put another key for a different task.
// version starts from 1
_, err = PutInfo(etcdTestCli, i21)
c.Assert(err, IsNil)
wg.Wait()
infoWithVer := <-wch
i21WithVer := i21
i21WithVer.Version = 1
c.Assert(infoWithVer, DeepEquals, i21WithVer)
c.Assert(len(ech), Equals, 0)

// watch should only get i21.
c.Assert(len(wch), Equals, 1)
c.Assert(<-wch, DeepEquals, i21)
// put again
// version increases
_, err = PutInfo(etcdTestCli, i21)
c.Assert(err, IsNil)
infoWithVer = <-wch
i21WithVer.Version++
c.Assert(infoWithVer, DeepEquals, i21WithVer)
c.Assert(len(ech), Equals, 0)

// delete i12.
deleteOp := deleteInfoOp(i12)
// delete i21.
deleteOp := deleteInfoOp(i21)
resp, err := etcdTestCli.Txn(context.Background()).Then(deleteOp).Commit()
c.Assert(err, IsNil)
<-wch

// put again
// version resets to 1
_, err = PutInfo(etcdTestCli, i21)
c.Assert(err, IsNil)
infoWithVer = <-wch
i21WithVer.Version = 1
c.Assert(infoWithVer, DeepEquals, i21WithVer)
c.Assert(len(ech), Equals, 0)

close(wch) // close the chan
close(ech)
wg.Wait()

// delete i12.
deleteOp = deleteInfoOp(i12)
resp, err = etcdTestCli.Txn(context.Background()).Then(deleteOp).Commit()
c.Assert(err, IsNil)

// get again.
ifm, _, err = GetAllInfo(etcdTestCli)
@@ -175,9 +205,9 @@
c.Assert(ifm, HasKey, task1)
c.Assert(ifm, HasKey, task2)
c.Assert(ifm[task1], HasLen, 1)
c.Assert(ifm[task1][source1][upSchema][upTable], DeepEquals, i11)
c.Assert(ifm[task1][source1][upSchema][upTable], DeepEquals, i11WithVer)
c.Assert(ifm[task2], HasLen, 1)
c.Assert(ifm[task2][source1][upSchema][upTable], DeepEquals, i21)
c.Assert(ifm[task2][source1][upSchema][upTable], DeepEquals, i21WithVer)

// watch the deletion for i12.
wch = make(chan Info, 10)
2 changes: 1 addition & 1 deletion pkg/shardddl/optimism/keeper.go
@@ -50,7 +50,7 @@ func (lk *LockKeeper) TrySync(info Info, tts []TargetTable) (string, []string, e
l = lk.locks[lockID]
}

newDDLs, err := l.TrySync(info.Source, info.UpSchema, info.UpTable, info.DDLs, info.TableInfoAfter, tts)
newDDLs, err := l.TrySync(info.Source, info.UpSchema, info.UpTable, info.DDLs, info.TableInfoAfter, tts, info.Version)
return lockID, newDDLs, err
}

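TrySync now threads `info.Version` into the lock, and `deleteInfosOps` reads it back via `lock.GetVersion`, so deletions can be fenced against infos that arrived after the lock resolved. A rough sketch of such bookkeeping follows; the real optimism.Lock internals are not shown in this diff, so the layout below is a guess, not the DM implementation.

```go
package main

import "fmt"

// lockVersions is an illustrative stand-in for the version bookkeeping
// the lock gains in this commit; the field layout is hypothetical.
type lockVersions struct {
	// source -> upSchema -> upTable -> newest etcd KV Version seen.
	vers map[string]map[string]map[string]int64
}

// recvVersion records the Version carried by an info passed to TrySync,
// keeping the newest value per upstream table.
func (l *lockVersions) recvVersion(source, schema, table string, ver int64) {
	if l.vers[source] == nil {
		l.vers[source] = make(map[string]map[string]int64)
	}
	if l.vers[source][schema] == nil {
		l.vers[source][schema] = make(map[string]int64)
	}
	if ver > l.vers[source][schema][table] {
		l.vers[source][schema][table] = ver
	}
}

// getVersion mirrors how deleteInfosOps uses lock.GetVersion: the stored
// version is stamped onto the info to delete, so the etcd transaction can
// refuse to remove a newer info that was put after the lock resolved.
func (l *lockVersions) getVersion(source, schema, table string) int64 {
	return l.vers[source][schema][table]
}

func main() {
	l := &lockVersions{vers: map[string]map[string]map[string]int64{}}
	l.recvVersion("mysql-01", "db", "tbl", 1)
	l.recvVersion("mysql-01", "db", "tbl", 2)
	fmt.Println(l.getVersion("mysql-01", "db", "tbl")) // 2
}
```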
