Skip to content

Commit

Permalink
Merge branch 'master' into model2-ut
Browse files Browse the repository at this point in the history
  • Loading branch information
qw4990 authored Nov 11, 2022
2 parents c0a60cc + a3fff42 commit 8a398a4
Showing 1 changed file with 21 additions and 4 deletions.
25 changes: 21 additions & 4 deletions ddl/syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ func (s *schemaVersionSyncer) OwnerCheckAllVersions(ctx context.Context, jobID i
// If MDL is enabled, updatedMap is used to check if all the servers report the least version.
// updatedMap is initialed to record all the server in every loop. We delete a server from the map if it gets the metadata lock(the key version equal the given version.
// updatedMap should be empty if all the servers get the metadata lock.
updatedMap := make(map[string]struct{})
updatedMap := make(map[string]string)
for {
if util.IsContextDone(ctx) {
// ctx is canceled or timeout.
Expand All @@ -278,9 +278,23 @@ func (s *schemaVersionSyncer) OwnerCheckAllVersions(ctx context.Context, jobID i
if err != nil {
return err
}
updatedMap = make(map[string]struct{})
updatedMap = make(map[string]string)
instance2id := make(map[string]string)

// Set updatedMap according to the serverInfos, and remove some invalid serverInfos.
for _, info := range serverInfos {
updatedMap[info.ID] = struct{}{}
instance := fmt.Sprintf("%s:%d", info.IP, info.Port)
if id, ok := instance2id[instance]; ok {
if info.StartTimestamp > serverInfos[id].StartTimestamp {
// Replace it.
delete(updatedMap, id)
updatedMap[info.ID] = fmt.Sprintf("instance ip %s, port %d, id %s", info.IP, info.Port, info.ID)
instance2id[instance] = info.ID
}
} else {
updatedMap[info.ID] = fmt.Sprintf("instance ip %s, port %d, id %s", info.IP, info.Port, info.ID)
instance2id[instance] = info.ID
}
}
}

Expand Down Expand Up @@ -315,6 +329,9 @@ func (s *schemaVersionSyncer) OwnerCheckAllVersions(ctx context.Context, jobID i
}
if len(updatedMap) > 0 {
succ = false
for _, info := range updatedMap {
logutil.BgLogger().Info("[ddl] syncer check all versions, someone is not synced", zap.String("info", info), zap.Any("ddl id", jobID), zap.Any("ver", latestVer))
}
}
} else {
for _, kv := range resp.Kvs {
Expand All @@ -337,7 +354,7 @@ func (s *schemaVersionSyncer) OwnerCheckAllVersions(ctx context.Context, jobID i
notMatchVerCnt++
break
}
updatedMap[string(kv.Key)] = struct{}{}
updatedMap[string(kv.Key)] = ""
}
}

Expand Down

0 comments on commit 8a398a4

Please sign in to comment.