Skip to content

Commit

Permalink
openapi(dm): support show task status in list task API (#3971)
Browse files Browse the repository at this point in the history
ref #3583
  • Loading branch information
Ehco1996 authored Jan 4, 2022
1 parent 1df27c6 commit 35dc03c
Show file tree
Hide file tree
Showing 8 changed files with 286 additions and 124 deletions.
167 changes: 99 additions & 68 deletions dm/dm/master/openapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -521,10 +521,33 @@ func (s *Server) DMAPIDeleteTask(c *gin.Context, taskName string, params openapi
}

// DMAPIGetTaskList url is:(GET /api/v1/tasks).
func (s *Server) DMAPIGetTaskList(c *gin.Context) {
func (s *Server) DMAPIGetTaskList(c *gin.Context, params openapi.DMAPIGetTaskListParams) {
// get sub task config by task name task name->source name->subtask config
subTaskConfigMap := s.scheduler.GetSubTaskCfgs()
taskList := config.SubTaskConfigsToOpenAPITask(subTaskConfigMap)

// fill status
if params.WithStatus != nil && *params.WithStatus {
// get source list for all task
sourceNameM := make(map[string]struct{}) // use map to avoid duplicate source name
for _, task := range taskList {
for _, sourceConf := range task.SourceConfig.SourceConf {
sourceNameM[sourceConf.SourceName] = struct{}{}
}
}
sourceList := utils.SetToSlice(sourceNameM)
// get status from workers
workerStatusList := s.getStatusFromWorkers(c.Request.Context(), sourceList, "", false)
// fill status for every task
for idx := range taskList {
subTaskStatusList, err := getOpenAPISubtaskStatusByTaskName(taskList[idx].Name, workerStatusList)
if err != nil {
_ = c.Error(err)
return
}
taskList[idx].StatusList = &subTaskStatusList
}
}
resp := openapi.GetTaskListResponse{Total: len(taskList), Data: taskList}
c.IndentedJSON(http.StatusOK, resp)
}
Expand All @@ -548,73 +571,10 @@ func (s *Server) DMAPIGetTaskStatus(c *gin.Context, taskName string, params open
}
// 2. get status from workers
workerStatusList := s.getStatusFromWorkers(c.Request.Context(), sourceList, taskName, specifiedSource)
subTaskStatusList := make([]openapi.SubTaskStatus, len(workerStatusList))
for i, workerStatus := range workerStatusList {
if workerStatus == nil || workerStatus.SourceStatus == nil {
// this should not happen unless the rpc in the worker server has been modified
_ = c.Error(terror.ErrOpenAPICommonError.New("worker's query-status response is nil"))
return
}
sourceStatus := workerStatus.SourceStatus
// find right task name
var subTaskStatus *pb.SubTaskStatus
for _, cfg := range workerStatus.SubTaskStatus {
if cfg.Name == taskName {
subTaskStatus = cfg
}
}
if subTaskStatus == nil {
// this may not happen
_ = c.Error(terror.ErrOpenAPICommonError.Generatef("can not find subtask status task name: %s.", taskName))
return
}
openapiSubTaskStatus := openapi.SubTaskStatus{
Name: taskName,
SourceName: sourceStatus.GetSource(),
WorkerName: sourceStatus.GetWorker(),
Stage: subTaskStatus.GetStage().String(),
Unit: subTaskStatus.GetUnit().String(),
UnresolvedDdlLockId: &subTaskStatus.UnresolvedDDLLockID,
}
// add load status
if loadS := subTaskStatus.GetLoad(); loadS != nil {
openapiSubTaskStatus.LoadStatus = &openapi.LoadStatus{
FinishedBytes: loadS.FinishedBytes,
MetaBinlog: loadS.MetaBinlog,
MetaBinlogGtid: loadS.MetaBinlogGTID,
Progress: loadS.Progress,
TotalBytes: loadS.TotalBytes,
}
}
// add syncer status
if syncerS := subTaskStatus.GetSync(); syncerS != nil {
openapiSubTaskStatus.SyncStatus = &openapi.SyncStatus{
BinlogType: syncerS.GetBinlogType(),
BlockingDdls: syncerS.GetBlockingDDLs(),
MasterBinlog: syncerS.GetMasterBinlog(),
MasterBinlogGtid: syncerS.GetMasterBinlogGtid(),
RecentTps: syncerS.RecentTps,
SecondsBehindMaster: syncerS.SecondsBehindMaster,
Synced: syncerS.Synced,
SyncerBinlog: syncerS.SyncerBinlog,
SyncerBinlogGtid: syncerS.SyncerBinlogGtid,
TotalEvents: syncerS.TotalEvents,
TotalTps: syncerS.TotalTps,
}
if unResolvedGroups := syncerS.GetUnresolvedGroups(); len(unResolvedGroups) > 0 {
openapiSubTaskStatus.SyncStatus.UnresolvedGroups = make([]openapi.ShardingGroup, len(unResolvedGroups))
for i, unResolvedGroup := range unResolvedGroups {
openapiSubTaskStatus.SyncStatus.UnresolvedGroups[i] = openapi.ShardingGroup{
DdlList: unResolvedGroup.DDLs,
FirstLocation: unResolvedGroup.FirstLocation,
Synced: unResolvedGroup.Synced,
Target: unResolvedGroup.Target,
Unsynced: unResolvedGroup.Unsynced,
}
}
}
}
subTaskStatusList[i] = openapiSubTaskStatus
subTaskStatusList, err := getOpenAPISubtaskStatusByTaskName(taskName, workerStatusList)
if err != nil {
_ = c.Error(err)
return
}
resp := openapi.GetTaskStatusResponse{Total: len(subTaskStatusList), Data: subTaskStatusList}
c.IndentedJSON(http.StatusOK, resp)
Expand Down Expand Up @@ -958,3 +918,74 @@ func terrorHTTPErrorHandler() gin.HandlerFunc {
c.IndentedJSON(http.StatusBadRequest, openapi.ErrorWithMessage{ErrorMsg: msg, ErrorCode: code})
}
}

func getOpenAPISubtaskStatusByTaskName(taskName string,
workerStatusList []*pb.QueryStatusResponse) ([]openapi.SubTaskStatus, error) {
subTaskStatusList := make([]openapi.SubTaskStatus, 0, len(workerStatusList))
for _, workerStatus := range workerStatusList {
if workerStatus == nil || workerStatus.SourceStatus == nil {
// this should not happen unless the rpc in the worker server has been modified
return nil, terror.ErrOpenAPICommonError.New("worker's query-status response is nil")
}
sourceStatus := workerStatus.SourceStatus
// find right task name
var subTaskStatus *pb.SubTaskStatus
for _, cfg := range workerStatus.SubTaskStatus {
if cfg.Name == taskName {
subTaskStatus = cfg
}
}
if subTaskStatus == nil {
// not find
continue
}
openapiSubTaskStatus := openapi.SubTaskStatus{
Name: taskName,
SourceName: sourceStatus.GetSource(),
WorkerName: sourceStatus.GetWorker(),
Stage: subTaskStatus.GetStage().String(),
Unit: subTaskStatus.GetUnit().String(),
UnresolvedDdlLockId: &subTaskStatus.UnresolvedDDLLockID,
}
// add load status
if loadS := subTaskStatus.GetLoad(); loadS != nil {
openapiSubTaskStatus.LoadStatus = &openapi.LoadStatus{
FinishedBytes: loadS.FinishedBytes,
MetaBinlog: loadS.MetaBinlog,
MetaBinlogGtid: loadS.MetaBinlogGTID,
Progress: loadS.Progress,
TotalBytes: loadS.TotalBytes,
}
}
// add sync status
if syncerS := subTaskStatus.GetSync(); syncerS != nil {
openapiSubTaskStatus.SyncStatus = &openapi.SyncStatus{
BinlogType: syncerS.GetBinlogType(),
BlockingDdls: syncerS.GetBlockingDDLs(),
MasterBinlog: syncerS.GetMasterBinlog(),
MasterBinlogGtid: syncerS.GetMasterBinlogGtid(),
RecentTps: syncerS.RecentTps,
SecondsBehindMaster: syncerS.SecondsBehindMaster,
Synced: syncerS.Synced,
SyncerBinlog: syncerS.SyncerBinlog,
SyncerBinlogGtid: syncerS.SyncerBinlogGtid,
TotalEvents: syncerS.TotalEvents,
TotalTps: syncerS.TotalTps,
}
if unResolvedGroups := syncerS.GetUnresolvedGroups(); len(unResolvedGroups) > 0 {
openapiSubTaskStatus.SyncStatus.UnresolvedGroups = make([]openapi.ShardingGroup, len(unResolvedGroups))
for i, unResolvedGroup := range unResolvedGroups {
openapiSubTaskStatus.SyncStatus.UnresolvedGroups[i] = openapi.ShardingGroup{
DdlList: unResolvedGroup.DDLs,
FirstLocation: unResolvedGroup.FirstLocation,
Synced: unResolvedGroup.Synced,
Target: unResolvedGroup.Target,
Unsynced: unResolvedGroup.Unsynced,
}
}
}
}
subTaskStatusList = append(subTaskStatusList, openapiSubTaskStatus)
}
return subTaskStatusList, nil
}
18 changes: 17 additions & 1 deletion dm/dm/master/openapi_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -558,6 +558,22 @@ func (t *openAPISuite) TestTaskAPI(c *check.C) {
c.Assert(err, check.IsNil)
c.Assert(resultTaskStatusWithStatus, check.DeepEquals, resultTaskStatus)

// list task with status
result = testutil.NewRequest().Get(taskURL+"?with_status=true").GoWithHTTPHandler(t.testT, s.openapiHandles)
// check http status code
c.Assert(result.Code(), check.Equals, http.StatusOK)
var resultListTask openapi.GetTaskListResponse
err = result.UnmarshalBodyToObject(&resultListTask)
c.Assert(err, check.IsNil)
c.Assert(resultListTask.Data, check.HasLen, 1)
c.Assert(resultListTask.Total, check.Equals, 1)
c.Assert(resultListTask.Data[0].StatusList, check.NotNil)
statusList := *resultListTask.Data[0].StatusList
c.Assert(statusList, check.HasLen, 1)
status := statusList[0]
c.Assert(status.WorkerName, check.Equals, workerName1)
c.Assert(status.Name, check.Equals, task.Name)

// stop task
result = testutil.NewRequest().Delete(fmt.Sprintf("%s/%s", taskURL, task.Name)).GoWithHTTPHandler(t.testT, s.openapiHandles)
c.Assert(result.Code(), check.Equals, http.StatusNoContent)
Expand Down Expand Up @@ -845,7 +861,7 @@ func mockTaskQueryStatus(
}
mockWorkerClient.EXPECT().QueryStatus(
gomock.Any(),
&pb.QueryStatusRequest{Name: taskName},
gomock.Any(),
).Return(queryResp, nil).MaxTimes(maxRetryNum)
}

Expand Down
32 changes: 25 additions & 7 deletions dm/openapi/gen.client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 35dc03c

Please sign in to comment.