Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

Commit

Permalink
worker, utils: show same master status in one response #817 (#830)
Browse files Browse the repository at this point in the history
  • Loading branch information
lance6716 authored Jul 27, 2020
1 parent 763a438 commit dc72ee8
Show file tree
Hide file tree
Showing 2 changed files with 185 additions and 0 deletions.
75 changes: 75 additions & 0 deletions dm/worker/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -522,6 +522,9 @@ func (s *Server) QueryStatus(ctx context.Context, req *pb.QueryStatusRequest) (*
if w.relayHolder != nil {
sourceStatus.RelayStatus = w.relayHolder.Status()
}

unifyMasterBinlogPos(resp, w.cfg.EnableGTID)

if len(resp.SubTaskStatus) == 0 {
resp.Msg = "no sub task started"
}
Expand Down Expand Up @@ -901,3 +904,75 @@ func getMinPosForSubTask(ctx context.Context, subTaskCfg *config.SubTaskConfig)
location := checkpoint.GlobalPoint()
return &location.Position, nil
}

// unifyMasterBinlogPos eliminates different masterBinlog in one response
// see https://github.com/pingcap/dm/issues/727
func unifyMasterBinlogPos(resp *pb.QueryStatusResponse, enableGTID bool) {
var (
syncStatus []*pb.SubTaskStatus_Sync
syncMasterBinlog []*mysql.Position
lastestMasterBinlog mysql.Position // not pointer, to make use of zero value and avoid nil check
relayMasterBinlog *mysql.Position
)

// uninitialized mysql.Position is less than any initialized mysql.Position
if resp.SourceStatus.RelayStatus != nil && resp.SourceStatus.RelayStatus.Stage != pb.Stage_Stopped {
var err error
relayMasterBinlog, err = utils.DecodeBinlogPosition(resp.SourceStatus.RelayStatus.MasterBinlog)
if err != nil {
log.L().Error("failed to decode relay's master binlog position", zap.Stringer("response", resp), zap.Error(err))
return
}
lastestMasterBinlog = *relayMasterBinlog
}

for _, stStatus := range resp.SubTaskStatus {
if stStatus.Unit == pb.UnitType_Sync {
s := stStatus.Status.(*pb.SubTaskStatus_Sync)
syncStatus = append(syncStatus, s)

position, err := utils.DecodeBinlogPosition(s.Sync.MasterBinlog)
if err != nil {
log.L().Error("failed to decode sync's master binlog position", zap.Stringer("response", resp), zap.Error(err))
return
}
if lastestMasterBinlog.Compare(*position) < 0 {
lastestMasterBinlog = *position
}
syncMasterBinlog = append(syncMasterBinlog, position)
}
}

// re-check relay
if resp.SourceStatus.RelayStatus != nil && resp.SourceStatus.RelayStatus.Stage != pb.Stage_Stopped &&
lastestMasterBinlog.Compare(*relayMasterBinlog) != 0 {

resp.SourceStatus.RelayStatus.MasterBinlog = lastestMasterBinlog.String()

// if enableGTID, modify output binlog position doesn't affect RelayCatchUpMaster, skip check
if !enableGTID {
relayPos, err := utils.DecodeBinlogPosition(resp.SourceStatus.RelayStatus.RelayBinlog)
if err != nil {
log.L().Error("failed to decode relay binlog position", zap.Stringer("response", resp), zap.Error(err))
return
}
catchUp := lastestMasterBinlog.Compare(*relayPos) == 0

resp.SourceStatus.RelayStatus.RelayCatchUpMaster = catchUp
}
}
// re-check syncer
for i, sStatus := range syncStatus {
if lastestMasterBinlog.Compare(*syncMasterBinlog[i]) != 0 {
syncerPos, err := utils.DecodeBinlogPosition(sStatus.Sync.SyncerBinlog)
if err != nil {
log.L().Error("failed to decode syncer binlog position", zap.Stringer("response", resp), zap.Error(err))
return
}
synced := lastestMasterBinlog.Compare(*syncerPos) == 0

sStatus.Sync.MasterBinlog = lastestMasterBinlog.String()
sStatus.Sync.Synced = synced
}
}
}
110 changes: 110 additions & 0 deletions dm/worker/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -452,6 +452,116 @@ func (t *testServer) TestQueryError(c *C) {
c.Assert(resp.SubTaskError[0].String(), Matches, `[\s\S]*mockSubtaskFail[\s\S]*`)
}

func (t *testServer) TestUnifyMasterBinlogPos(c *C) {
var (
pos1 = "(bin.000001, 3134)"
pos2 = "(bin.000001, 3234)"
pos3 = "(bin.000001, 3334)"
pos4 = "(bin.000001, 3434)"
)

// 1. should modify nothing
resp := &pb.QueryStatusResponse{
SubTaskStatus: []*pb.SubTaskStatus{{
Name: "test",
Status: &pb.SubTaskStatus_Msg{Msg: "sub task not started"},
}},
SourceStatus: &pb.SourceStatus{},
}
resp2 := &pb.QueryStatusResponse{
SubTaskStatus: []*pb.SubTaskStatus{{
Name: "test",
Status: &pb.SubTaskStatus_Msg{Msg: "sub task not started"},
}},
SourceStatus: &pb.SourceStatus{RelayStatus: &pb.RelayStatus{
Stage: pb.Stage_Stopped,
}},
}
resp3 := &pb.QueryStatusResponse{
SubTaskStatus: []*pb.SubTaskStatus{{
Name: "test",
Status: &pb.SubTaskStatus_Msg{Msg: "sub task not started"},
}},
SourceStatus: &pb.SourceStatus{RelayStatus: &pb.RelayStatus{
MasterBinlog: pos1, RelayBinlog: pos1, RelayCatchUpMaster: true,
}},
}
resp4 := &pb.QueryStatusResponse{
SubTaskStatus: []*pb.SubTaskStatus{{
Unit: pb.UnitType_Load,
}, {
Unit: pb.UnitType_Sync,
Status: &pb.SubTaskStatus_Sync{Sync: &pb.SyncStatus{MasterBinlog: pos2, SyncerBinlog: pos2, Synced: true}},
}},
SourceStatus: &pb.SourceStatus{},
}

for _, r := range []*pb.QueryStatusResponse{resp, resp2, resp3, resp4} {
// clone resp
bytes, _ := r.Marshal()
originReps := &pb.QueryStatusResponse{}
err := originReps.Unmarshal(bytes)
c.Assert(err, IsNil)

unifyMasterBinlogPos(r, false)
c.Assert(r, DeepEquals, originReps)
}

// 2. could work on multiple status
resp = &pb.QueryStatusResponse{
SubTaskStatus: []*pb.SubTaskStatus{{
Unit: pb.UnitType_Load,
}, {
Unit: pb.UnitType_Sync,
Status: &pb.SubTaskStatus_Sync{Sync: &pb.SyncStatus{MasterBinlog: pos2, SyncerBinlog: pos2, Synced: true}},
}, {
Unit: pb.UnitType_Sync,
Status: &pb.SubTaskStatus_Sync{Sync: &pb.SyncStatus{MasterBinlog: pos4, SyncerBinlog: pos3, Synced: false}},
}},
SourceStatus: &pb.SourceStatus{RelayStatus: &pb.RelayStatus{
MasterBinlog: pos1, RelayBinlog: pos1, RelayCatchUpMaster: true,
}},
}
unifyMasterBinlogPos(resp, false)

sync1 := resp.SubTaskStatus[1].Status.(*pb.SubTaskStatus_Sync).Sync
c.Assert(sync1.MasterBinlog, Equals, pos4)
c.Assert(sync1.Synced, IsFalse)
sync2 := resp.SubTaskStatus[2].Status.(*pb.SubTaskStatus_Sync).Sync
c.Assert(sync2.MasterBinlog, Equals, pos4)
c.Assert(sync2.Synced, IsFalse)
relay := resp.SourceStatus.RelayStatus
c.Assert(relay.MasterBinlog, Equals, pos4)
c.Assert(relay.RelayCatchUpMaster, IsFalse)

// 3. test unifyMasterBinlogPos(..., enableGTID = true)
resp = &pb.QueryStatusResponse{
SubTaskStatus: []*pb.SubTaskStatus{{
Unit: pb.UnitType_Load,
}, {
Unit: pb.UnitType_Sync,
Status: &pb.SubTaskStatus_Sync{Sync: &pb.SyncStatus{MasterBinlog: pos2, SyncerBinlog: pos2, Synced: true}},
}, {
Unit: pb.UnitType_Sync,
Status: &pb.SubTaskStatus_Sync{Sync: &pb.SyncStatus{MasterBinlog: pos4, SyncerBinlog: pos3, Synced: false}},
}},
SourceStatus: &pb.SourceStatus{RelayStatus: &pb.RelayStatus{
MasterBinlog: pos1, RelayBinlog: pos1, RelayCatchUpMaster: true,
}},
}
unifyMasterBinlogPos(resp, true)

sync1 = resp.SubTaskStatus[1].Status.(*pb.SubTaskStatus_Sync).Sync
c.Assert(sync1.MasterBinlog, Equals, pos4)
c.Assert(sync1.Synced, IsFalse)
sync2 = resp.SubTaskStatus[2].Status.(*pb.SubTaskStatus_Sync).Sync
c.Assert(sync2.MasterBinlog, Equals, pos4)
c.Assert(sync2.Synced, IsFalse)
relay = resp.SourceStatus.RelayStatus
c.Assert(relay.MasterBinlog, Equals, pos4)
c.Assert(relay.RelayCatchUpMaster, IsTrue)
}

func getFakePosForSubTask(ctx context.Context, subTaskCfg *config.SubTaskConfig) (minPos *mysql.Position, err error) {
switch subTaskCfg.Name {
case "test1":
Expand Down

0 comments on commit dc72ee8

Please sign in to comment.