From dc72ee8bf4a83c97b4962102649608899704c77c Mon Sep 17 00:00:00 2001 From: lance6716 Date: Mon, 27 Jul 2020 13:51:14 +0800 Subject: [PATCH] worker, utils: show same master status in one response #817 (#830) --- dm/worker/server.go | 75 ++++++++++++++++++++++++++ dm/worker/server_test.go | 110 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 185 insertions(+) diff --git a/dm/worker/server.go b/dm/worker/server.go index 1ac211a781..25b6c2ce58 100644 --- a/dm/worker/server.go +++ b/dm/worker/server.go @@ -522,6 +522,9 @@ func (s *Server) QueryStatus(ctx context.Context, req *pb.QueryStatusRequest) (* if w.relayHolder != nil { sourceStatus.RelayStatus = w.relayHolder.Status() } + + unifyMasterBinlogPos(resp, w.cfg.EnableGTID) + if len(resp.SubTaskStatus) == 0 { resp.Msg = "no sub task started" } @@ -901,3 +904,75 @@ func getMinPosForSubTask(ctx context.Context, subTaskCfg *config.SubTaskConfig) location := checkpoint.GlobalPoint() return &location.Position, nil } + +// unifyMasterBinlogPos eliminates different masterBinlog in one response +// see https://github.com/pingcap/dm/issues/727 +func unifyMasterBinlogPos(resp *pb.QueryStatusResponse, enableGTID bool) { + var ( + syncStatus []*pb.SubTaskStatus_Sync + syncMasterBinlog []*mysql.Position + lastestMasterBinlog mysql.Position // not pointer, to make use of zero value and avoid nil check + relayMasterBinlog *mysql.Position + ) + + // uninitialized mysql.Position is less than any initialized mysql.Position + if resp.SourceStatus.RelayStatus != nil && resp.SourceStatus.RelayStatus.Stage != pb.Stage_Stopped { + var err error + relayMasterBinlog, err = utils.DecodeBinlogPosition(resp.SourceStatus.RelayStatus.MasterBinlog) + if err != nil { + log.L().Error("failed to decode relay's master binlog position", zap.Stringer("response", resp), zap.Error(err)) + return + } + lastestMasterBinlog = *relayMasterBinlog + } + + for _, stStatus := range resp.SubTaskStatus { + if stStatus.Unit == pb.UnitType_Sync { + s := stStatus.Status.(*pb.SubTaskStatus_Sync) + syncStatus = append(syncStatus, s) + + position, err := utils.DecodeBinlogPosition(s.Sync.MasterBinlog) + if err != nil { + log.L().Error("failed to decode sync's master binlog position", zap.Stringer("response", resp), zap.Error(err)) + return + } + if lastestMasterBinlog.Compare(*position) < 0 { + lastestMasterBinlog = *position + } + syncMasterBinlog = append(syncMasterBinlog, position) + } + } + + // re-check relay + if resp.SourceStatus.RelayStatus != nil && resp.SourceStatus.RelayStatus.Stage != pb.Stage_Stopped && + lastestMasterBinlog.Compare(*relayMasterBinlog) != 0 { + + resp.SourceStatus.RelayStatus.MasterBinlog = lastestMasterBinlog.String() + + // if enableGTID, modify output binlog position doesn't affect RelayCatchUpMaster, skip check + if !enableGTID { + relayPos, err := utils.DecodeBinlogPosition(resp.SourceStatus.RelayStatus.RelayBinlog) + if err != nil { + log.L().Error("failed to decode relay binlog position", zap.Stringer("response", resp), zap.Error(err)) + return + } + catchUp := lastestMasterBinlog.Compare(*relayPos) == 0 + + resp.SourceStatus.RelayStatus.RelayCatchUpMaster = catchUp + } + } + // re-check syncer + for i, sStatus := range syncStatus { + if lastestMasterBinlog.Compare(*syncMasterBinlog[i]) != 0 { + syncerPos, err := utils.DecodeBinlogPosition(sStatus.Sync.SyncerBinlog) + if err != nil { + log.L().Error("failed to decode syncer binlog position", zap.Stringer("response", resp), zap.Error(err)) + return + } + synced := lastestMasterBinlog.Compare(*syncerPos) == 0 + + sStatus.Sync.MasterBinlog = lastestMasterBinlog.String() + sStatus.Sync.Synced = synced + } + } +} diff --git a/dm/worker/server_test.go b/dm/worker/server_test.go index 9565056167..fe25c2b039 100644 --- a/dm/worker/server_test.go +++ b/dm/worker/server_test.go @@ -452,6 +452,116 @@ func (t *testServer) TestQueryError(c *C) { c.Assert(resp.SubTaskError[0].String(), Matches, `[\s\S]*mockSubtaskFail[\s\S]*`) } +func (t *testServer) TestUnifyMasterBinlogPos(c *C) { + var ( + pos1 = "(bin.000001, 3134)" + pos2 = "(bin.000001, 3234)" + pos3 = "(bin.000001, 3334)" + pos4 = "(bin.000001, 3434)" + ) + + // 1. should modify nothing + resp := &pb.QueryStatusResponse{ + SubTaskStatus: []*pb.SubTaskStatus{{ + Name: "test", + Status: &pb.SubTaskStatus_Msg{Msg: "sub task not started"}, + }}, + SourceStatus: &pb.SourceStatus{}, + } + resp2 := &pb.QueryStatusResponse{ + SubTaskStatus: []*pb.SubTaskStatus{{ + Name: "test", + Status: &pb.SubTaskStatus_Msg{Msg: "sub task not started"}, + }}, + SourceStatus: &pb.SourceStatus{RelayStatus: &pb.RelayStatus{ + Stage: pb.Stage_Stopped, + }}, + } + resp3 := &pb.QueryStatusResponse{ + SubTaskStatus: []*pb.SubTaskStatus{{ + Name: "test", + Status: &pb.SubTaskStatus_Msg{Msg: "sub task not started"}, + }}, + SourceStatus: &pb.SourceStatus{RelayStatus: &pb.RelayStatus{ + MasterBinlog: pos1, RelayBinlog: pos1, RelayCatchUpMaster: true, + }}, + } + resp4 := &pb.QueryStatusResponse{ + SubTaskStatus: []*pb.SubTaskStatus{{ + Unit: pb.UnitType_Load, + }, { + Unit: pb.UnitType_Sync, + Status: &pb.SubTaskStatus_Sync{Sync: &pb.SyncStatus{MasterBinlog: pos2, SyncerBinlog: pos2, Synced: true}}, + }}, + SourceStatus: &pb.SourceStatus{}, + } + + for _, r := range []*pb.QueryStatusResponse{resp, resp2, resp3, resp4} { + // clone resp + bytes, _ := r.Marshal() + originReps := &pb.QueryStatusResponse{} + err := originReps.Unmarshal(bytes) + c.Assert(err, IsNil) + + unifyMasterBinlogPos(r, false) + c.Assert(r, DeepEquals, originReps) + } + + // 2. could work on multiple status + resp = &pb.QueryStatusResponse{ + SubTaskStatus: []*pb.SubTaskStatus{{ + Unit: pb.UnitType_Load, + }, { + Unit: pb.UnitType_Sync, + Status: &pb.SubTaskStatus_Sync{Sync: &pb.SyncStatus{MasterBinlog: pos2, SyncerBinlog: pos2, Synced: true}}, + }, { + Unit: pb.UnitType_Sync, + Status: &pb.SubTaskStatus_Sync{Sync: &pb.SyncStatus{MasterBinlog: pos4, SyncerBinlog: pos3, Synced: false}}, + }}, + SourceStatus: &pb.SourceStatus{RelayStatus: &pb.RelayStatus{ + MasterBinlog: pos1, RelayBinlog: pos1, RelayCatchUpMaster: true, + }}, + } + unifyMasterBinlogPos(resp, false) + + sync1 := resp.SubTaskStatus[1].Status.(*pb.SubTaskStatus_Sync).Sync + c.Assert(sync1.MasterBinlog, Equals, pos4) + c.Assert(sync1.Synced, IsFalse) + sync2 := resp.SubTaskStatus[2].Status.(*pb.SubTaskStatus_Sync).Sync + c.Assert(sync2.MasterBinlog, Equals, pos4) + c.Assert(sync2.Synced, IsFalse) + relay := resp.SourceStatus.RelayStatus + c.Assert(relay.MasterBinlog, Equals, pos4) + c.Assert(relay.RelayCatchUpMaster, IsFalse) + + // 3. test unifyMasterBinlogPos(..., enableGTID = true) + resp = &pb.QueryStatusResponse{ + SubTaskStatus: []*pb.SubTaskStatus{{ + Unit: pb.UnitType_Load, + }, { + Unit: pb.UnitType_Sync, + Status: &pb.SubTaskStatus_Sync{Sync: &pb.SyncStatus{MasterBinlog: pos2, SyncerBinlog: pos2, Synced: true}}, + }, { + Unit: pb.UnitType_Sync, + Status: &pb.SubTaskStatus_Sync{Sync: &pb.SyncStatus{MasterBinlog: pos4, SyncerBinlog: pos3, Synced: false}}, + }}, + SourceStatus: &pb.SourceStatus{RelayStatus: &pb.RelayStatus{ + MasterBinlog: pos1, RelayBinlog: pos1, RelayCatchUpMaster: true, + }}, + } + unifyMasterBinlogPos(resp, true) + + sync1 = resp.SubTaskStatus[1].Status.(*pb.SubTaskStatus_Sync).Sync + c.Assert(sync1.MasterBinlog, Equals, pos4) + c.Assert(sync1.Synced, IsFalse) + sync2 = resp.SubTaskStatus[2].Status.(*pb.SubTaskStatus_Sync).Sync + c.Assert(sync2.MasterBinlog, Equals, pos4) + c.Assert(sync2.Synced, IsFalse) + relay = resp.SourceStatus.RelayStatus + c.Assert(relay.MasterBinlog, Equals, pos4) + c.Assert(relay.RelayCatchUpMaster, IsTrue) +} + func getFakePosForSubTask(ctx context.Context, subTaskCfg *config.SubTaskConfig) (minPos *mysql.Position, err error) { switch subTaskCfg.Name { case "test1":