Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

worker, utils: show same master status in one response (#817) #830

Merged
merged 2 commits into from
Jul 27, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 75 additions & 0 deletions dm/worker/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -522,6 +522,9 @@ func (s *Server) QueryStatus(ctx context.Context, req *pb.QueryStatusRequest) (*
if w.relayHolder != nil {
sourceStatus.RelayStatus = w.relayHolder.Status()
}

unifyMasterBinlogPos(resp, w.cfg.EnableGTID)

if len(resp.SubTaskStatus) == 0 {
resp.Msg = "no sub task started"
}
Expand Down Expand Up @@ -901,3 +904,75 @@ func getMinPosForSubTask(ctx context.Context, subTaskCfg *config.SubTaskConfig)
location := checkpoint.GlobalPoint()
return &location.Position, nil
}

// unifyMasterBinlogPos eliminates different masterBinlog in one response
// see https://github.com/pingcap/dm/issues/727
func unifyMasterBinlogPos(resp *pb.QueryStatusResponse, enableGTID bool) {
var (
syncStatus []*pb.SubTaskStatus_Sync
syncMasterBinlog []*mysql.Position
lastestMasterBinlog mysql.Position // not pointer, to make use of zero value and avoid nil check
relayMasterBinlog *mysql.Position
)

// uninitialized mysql.Position is less than any initialized mysql.Position
if resp.SourceStatus.RelayStatus != nil && resp.SourceStatus.RelayStatus.Stage != pb.Stage_Stopped {
var err error
relayMasterBinlog, err = utils.DecodeBinlogPosition(resp.SourceStatus.RelayStatus.MasterBinlog)
if err != nil {
log.L().Error("failed to decode relay's master binlog position", zap.Stringer("response", resp), zap.Error(err))
return
}
lastestMasterBinlog = *relayMasterBinlog
}

for _, stStatus := range resp.SubTaskStatus {
if stStatus.Unit == pb.UnitType_Sync {
s := stStatus.Status.(*pb.SubTaskStatus_Sync)
syncStatus = append(syncStatus, s)

position, err := utils.DecodeBinlogPosition(s.Sync.MasterBinlog)
if err != nil {
log.L().Error("failed to decode sync's master binlog position", zap.Stringer("response", resp), zap.Error(err))
return
}
if lastestMasterBinlog.Compare(*position) < 0 {
lastestMasterBinlog = *position
}
syncMasterBinlog = append(syncMasterBinlog, position)
}
}

// re-check relay
if resp.SourceStatus.RelayStatus != nil && resp.SourceStatus.RelayStatus.Stage != pb.Stage_Stopped &&
lastestMasterBinlog.Compare(*relayMasterBinlog) != 0 {

resp.SourceStatus.RelayStatus.MasterBinlog = lastestMasterBinlog.String()

// if enableGTID, modify output binlog position doesn't affect RelayCatchUpMaster, skip check
if !enableGTID {
relayPos, err := utils.DecodeBinlogPosition(resp.SourceStatus.RelayStatus.RelayBinlog)
if err != nil {
log.L().Error("failed to decode relay binlog position", zap.Stringer("response", resp), zap.Error(err))
return
}
catchUp := lastestMasterBinlog.Compare(*relayPos) == 0

resp.SourceStatus.RelayStatus.RelayCatchUpMaster = catchUp
}
}
// re-check syncer
for i, sStatus := range syncStatus {
if lastestMasterBinlog.Compare(*syncMasterBinlog[i]) != 0 {
syncerPos, err := utils.DecodeBinlogPosition(sStatus.Sync.SyncerBinlog)
if err != nil {
log.L().Error("failed to decode syncer binlog position", zap.Stringer("response", resp), zap.Error(err))
return
}
synced := lastestMasterBinlog.Compare(*syncerPos) == 0

sStatus.Sync.MasterBinlog = lastestMasterBinlog.String()
sStatus.Sync.Synced = synced
}
}
}
110 changes: 110 additions & 0 deletions dm/worker/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -452,6 +452,116 @@ func (t *testServer) TestQueryError(c *C) {
c.Assert(resp.SubTaskError[0].String(), Matches, `[\s\S]*mockSubtaskFail[\s\S]*`)
}

func (t *testServer) TestUnifyMasterBinlogPos(c *C) {
var (
pos1 = "(bin.000001, 3134)"
pos2 = "(bin.000001, 3234)"
pos3 = "(bin.000001, 3334)"
pos4 = "(bin.000001, 3434)"
)

// 1. should modify nothing
resp := &pb.QueryStatusResponse{
SubTaskStatus: []*pb.SubTaskStatus{{
Name: "test",
Status: &pb.SubTaskStatus_Msg{Msg: "sub task not started"},
}},
SourceStatus: &pb.SourceStatus{},
}
resp2 := &pb.QueryStatusResponse{
SubTaskStatus: []*pb.SubTaskStatus{{
Name: "test",
Status: &pb.SubTaskStatus_Msg{Msg: "sub task not started"},
}},
SourceStatus: &pb.SourceStatus{RelayStatus: &pb.RelayStatus{
Stage: pb.Stage_Stopped,
}},
}
resp3 := &pb.QueryStatusResponse{
SubTaskStatus: []*pb.SubTaskStatus{{
Name: "test",
Status: &pb.SubTaskStatus_Msg{Msg: "sub task not started"},
}},
SourceStatus: &pb.SourceStatus{RelayStatus: &pb.RelayStatus{
MasterBinlog: pos1, RelayBinlog: pos1, RelayCatchUpMaster: true,
}},
}
resp4 := &pb.QueryStatusResponse{
SubTaskStatus: []*pb.SubTaskStatus{{
Unit: pb.UnitType_Load,
}, {
Unit: pb.UnitType_Sync,
Status: &pb.SubTaskStatus_Sync{Sync: &pb.SyncStatus{MasterBinlog: pos2, SyncerBinlog: pos2, Synced: true}},
}},
SourceStatus: &pb.SourceStatus{},
}

for _, r := range []*pb.QueryStatusResponse{resp, resp2, resp3, resp4} {
// clone resp
bytes, _ := r.Marshal()
originReps := &pb.QueryStatusResponse{}
err := originReps.Unmarshal(bytes)
c.Assert(err, IsNil)

unifyMasterBinlogPos(r, false)
c.Assert(r, DeepEquals, originReps)
}

// 2. could work on multiple status
resp = &pb.QueryStatusResponse{
SubTaskStatus: []*pb.SubTaskStatus{{
Unit: pb.UnitType_Load,
}, {
Unit: pb.UnitType_Sync,
Status: &pb.SubTaskStatus_Sync{Sync: &pb.SyncStatus{MasterBinlog: pos2, SyncerBinlog: pos2, Synced: true}},
}, {
Unit: pb.UnitType_Sync,
Status: &pb.SubTaskStatus_Sync{Sync: &pb.SyncStatus{MasterBinlog: pos4, SyncerBinlog: pos3, Synced: false}},
}},
SourceStatus: &pb.SourceStatus{RelayStatus: &pb.RelayStatus{
MasterBinlog: pos1, RelayBinlog: pos1, RelayCatchUpMaster: true,
}},
}
unifyMasterBinlogPos(resp, false)

sync1 := resp.SubTaskStatus[1].Status.(*pb.SubTaskStatus_Sync).Sync
c.Assert(sync1.MasterBinlog, Equals, pos4)
c.Assert(sync1.Synced, IsFalse)
sync2 := resp.SubTaskStatus[2].Status.(*pb.SubTaskStatus_Sync).Sync
c.Assert(sync2.MasterBinlog, Equals, pos4)
c.Assert(sync2.Synced, IsFalse)
relay := resp.SourceStatus.RelayStatus
c.Assert(relay.MasterBinlog, Equals, pos4)
c.Assert(relay.RelayCatchUpMaster, IsFalse)

// 3. test unifyMasterBinlogPos(..., enableGTID = true)
resp = &pb.QueryStatusResponse{
SubTaskStatus: []*pb.SubTaskStatus{{
Unit: pb.UnitType_Load,
}, {
Unit: pb.UnitType_Sync,
Status: &pb.SubTaskStatus_Sync{Sync: &pb.SyncStatus{MasterBinlog: pos2, SyncerBinlog: pos2, Synced: true}},
}, {
Unit: pb.UnitType_Sync,
Status: &pb.SubTaskStatus_Sync{Sync: &pb.SyncStatus{MasterBinlog: pos4, SyncerBinlog: pos3, Synced: false}},
}},
SourceStatus: &pb.SourceStatus{RelayStatus: &pb.RelayStatus{
MasterBinlog: pos1, RelayBinlog: pos1, RelayCatchUpMaster: true,
}},
}
unifyMasterBinlogPos(resp, true)

sync1 = resp.SubTaskStatus[1].Status.(*pb.SubTaskStatus_Sync).Sync
c.Assert(sync1.MasterBinlog, Equals, pos4)
c.Assert(sync1.Synced, IsFalse)
sync2 = resp.SubTaskStatus[2].Status.(*pb.SubTaskStatus_Sync).Sync
c.Assert(sync2.MasterBinlog, Equals, pos4)
c.Assert(sync2.Synced, IsFalse)
relay = resp.SourceStatus.RelayStatus
c.Assert(relay.MasterBinlog, Equals, pos4)
c.Assert(relay.RelayCatchUpMaster, IsTrue)
}

func getFakePosForSubTask(ctx context.Context, subTaskCfg *config.SubTaskConfig) (minPos *mysql.Position, err error) {
switch subTaskCfg.Name {
case "test1":
Expand Down