From 35dc03c988dba8565a4f763d99f8986991d86f9f Mon Sep 17 00:00:00 2001 From: Ehco Date: Tue, 4 Jan 2022 11:16:35 +0800 Subject: [PATCH] openapi(dm): support show task status in list task API (#3971) ref pingcap/tiflow#3583 --- dm/dm/master/openapi.go | 167 ++++++++++++--------- dm/dm/master/openapi_test.go | 18 ++- dm/openapi/gen.client.go | 32 +++- dm/openapi/gen.server.go | 97 +++++++----- dm/openapi/gen.types.go | 7 + dm/openapi/spec/dm.yaml | 12 ++ dm/tests/openapi/client/openapi_task_check | 24 ++- dm/tests/openapi/run.sh | 53 ++++++- 8 files changed, 286 insertions(+), 124 deletions(-) diff --git a/dm/dm/master/openapi.go b/dm/dm/master/openapi.go index 3fc03950243..96bf8169d80 100644 --- a/dm/dm/master/openapi.go +++ b/dm/dm/master/openapi.go @@ -521,10 +521,33 @@ func (s *Server) DMAPIDeleteTask(c *gin.Context, taskName string, params openapi } // DMAPIGetTaskList url is:(GET /api/v1/tasks). -func (s *Server) DMAPIGetTaskList(c *gin.Context) { +func (s *Server) DMAPIGetTaskList(c *gin.Context, params openapi.DMAPIGetTaskListParams) { // get sub task config by task name task name->source name->subtask config subTaskConfigMap := s.scheduler.GetSubTaskCfgs() taskList := config.SubTaskConfigsToOpenAPITask(subTaskConfigMap) + + // fill status + if params.WithStatus != nil && *params.WithStatus { + // get source list for all task + sourceNameM := make(map[string]struct{}) // use map to avoid duplicate source name + for _, task := range taskList { + for _, sourceConf := range task.SourceConfig.SourceConf { + sourceNameM[sourceConf.SourceName] = struct{}{} + } + } + sourceList := utils.SetToSlice(sourceNameM) + // get status from workers + workerStatusList := s.getStatusFromWorkers(c.Request.Context(), sourceList, "", false) + // fill status for every task + for idx := range taskList { + subTaskStatusList, err := getOpenAPISubtaskStatusByTaskName(taskList[idx].Name, workerStatusList) + if err != nil { + _ = c.Error(err) + return + } + taskList[idx].StatusList = &subTaskStatusList + } + } resp := openapi.GetTaskListResponse{Total: len(taskList), Data: taskList} c.IndentedJSON(http.StatusOK, resp) } @@ -548,73 +571,10 @@ func (s *Server) DMAPIGetTaskStatus(c *gin.Context, taskName string, params open } // 2. get status from workers workerStatusList := s.getStatusFromWorkers(c.Request.Context(), sourceList, taskName, specifiedSource) - subTaskStatusList := make([]openapi.SubTaskStatus, len(workerStatusList)) - for i, workerStatus := range workerStatusList { - if workerStatus == nil || workerStatus.SourceStatus == nil { - // this should not happen unless the rpc in the worker server has been modified - _ = c.Error(terror.ErrOpenAPICommonError.New("worker's query-status response is nil")) - return - } - sourceStatus := workerStatus.SourceStatus - // find right task name - var subTaskStatus *pb.SubTaskStatus - for _, cfg := range workerStatus.SubTaskStatus { - if cfg.Name == taskName { - subTaskStatus = cfg - } - } - if subTaskStatus == nil { - // this may not happen - _ = c.Error(terror.ErrOpenAPICommonError.Generatef("can not find subtask status task name: %s.", taskName)) - return - } - openapiSubTaskStatus := openapi.SubTaskStatus{ - Name: taskName, - SourceName: sourceStatus.GetSource(), - WorkerName: sourceStatus.GetWorker(), - Stage: subTaskStatus.GetStage().String(), - Unit: subTaskStatus.GetUnit().String(), - UnresolvedDdlLockId: &subTaskStatus.UnresolvedDDLLockID, - } - // add load status - if loadS := subTaskStatus.GetLoad(); loadS != nil { - openapiSubTaskStatus.LoadStatus = &openapi.LoadStatus{ - FinishedBytes: loadS.FinishedBytes, - MetaBinlog: loadS.MetaBinlog, - MetaBinlogGtid: loadS.MetaBinlogGTID, - Progress: loadS.Progress, - TotalBytes: loadS.TotalBytes, - } - } - // add syncer status - if syncerS := subTaskStatus.GetSync(); syncerS != nil { - openapiSubTaskStatus.SyncStatus = &openapi.SyncStatus{ - BinlogType: syncerS.GetBinlogType(), - BlockingDdls: syncerS.GetBlockingDDLs(), - MasterBinlog: syncerS.GetMasterBinlog(), - MasterBinlogGtid: syncerS.GetMasterBinlogGtid(), - RecentTps: syncerS.RecentTps, - SecondsBehindMaster: syncerS.SecondsBehindMaster, - Synced: syncerS.Synced, - SyncerBinlog: syncerS.SyncerBinlog, - SyncerBinlogGtid: syncerS.SyncerBinlogGtid, - TotalEvents: syncerS.TotalEvents, - TotalTps: syncerS.TotalTps, - } - if unResolvedGroups := syncerS.GetUnresolvedGroups(); len(unResolvedGroups) > 0 { - openapiSubTaskStatus.SyncStatus.UnresolvedGroups = make([]openapi.ShardingGroup, len(unResolvedGroups)) - for i, unResolvedGroup := range unResolvedGroups { - openapiSubTaskStatus.SyncStatus.UnresolvedGroups[i] = openapi.ShardingGroup{ - DdlList: unResolvedGroup.DDLs, - FirstLocation: unResolvedGroup.FirstLocation, - Synced: unResolvedGroup.Synced, - Target: unResolvedGroup.Target, - Unsynced: unResolvedGroup.Unsynced, - } - } - } - } - subTaskStatusList[i] = openapiSubTaskStatus + subTaskStatusList, err := getOpenAPISubtaskStatusByTaskName(taskName, workerStatusList) + if err != nil { + _ = c.Error(err) + return } resp := openapi.GetTaskStatusResponse{Total: len(subTaskStatusList), Data: subTaskStatusList} c.IndentedJSON(http.StatusOK, resp) @@ -958,3 +918,74 @@ func terrorHTTPErrorHandler() gin.HandlerFunc { c.IndentedJSON(http.StatusBadRequest, openapi.ErrorWithMessage{ErrorMsg: msg, ErrorCode: code}) } } + +func getOpenAPISubtaskStatusByTaskName(taskName string, + workerStatusList []*pb.QueryStatusResponse) ([]openapi.SubTaskStatus, error) { + subTaskStatusList := make([]openapi.SubTaskStatus, 0, len(workerStatusList)) + for _, workerStatus := range workerStatusList { + if workerStatus == nil || workerStatus.SourceStatus == nil { + // this should not happen unless the rpc in the worker server has been modified + return nil, terror.ErrOpenAPICommonError.New("worker's query-status response is nil") + } + sourceStatus := workerStatus.SourceStatus + // find right task name + var subTaskStatus *pb.SubTaskStatus + for _, cfg := range workerStatus.SubTaskStatus { + if cfg.Name == taskName { + subTaskStatus = cfg + } + } + if subTaskStatus == nil { + // not find + continue + } + openapiSubTaskStatus := openapi.SubTaskStatus{ + Name: taskName, + SourceName: sourceStatus.GetSource(), + WorkerName: sourceStatus.GetWorker(), + Stage: subTaskStatus.GetStage().String(), + Unit: subTaskStatus.GetUnit().String(), + UnresolvedDdlLockId: &subTaskStatus.UnresolvedDDLLockID, + } + // add load status + if loadS := subTaskStatus.GetLoad(); loadS != nil { + openapiSubTaskStatus.LoadStatus = &openapi.LoadStatus{ + FinishedBytes: loadS.FinishedBytes, + MetaBinlog: loadS.MetaBinlog, + MetaBinlogGtid: loadS.MetaBinlogGTID, + Progress: loadS.Progress, + TotalBytes: loadS.TotalBytes, + } + } + // add sync status + if syncerS := subTaskStatus.GetSync(); syncerS != nil { + openapiSubTaskStatus.SyncStatus = &openapi.SyncStatus{ + BinlogType: syncerS.GetBinlogType(), + BlockingDdls: syncerS.GetBlockingDDLs(), + MasterBinlog: syncerS.GetMasterBinlog(), + MasterBinlogGtid: syncerS.GetMasterBinlogGtid(), + RecentTps: syncerS.RecentTps, + SecondsBehindMaster: syncerS.SecondsBehindMaster, + Synced: syncerS.Synced, + SyncerBinlog: syncerS.SyncerBinlog, + SyncerBinlogGtid: syncerS.SyncerBinlogGtid, + TotalEvents: syncerS.TotalEvents, + TotalTps: syncerS.TotalTps, + } + if unResolvedGroups := syncerS.GetUnresolvedGroups(); len(unResolvedGroups) > 0 { + openapiSubTaskStatus.SyncStatus.UnresolvedGroups = make([]openapi.ShardingGroup, len(unResolvedGroups)) + for i, unResolvedGroup := range unResolvedGroups { + openapiSubTaskStatus.SyncStatus.UnresolvedGroups[i] = openapi.ShardingGroup{ + DdlList: unResolvedGroup.DDLs, + FirstLocation: unResolvedGroup.FirstLocation, + Synced: unResolvedGroup.Synced, + Target: unResolvedGroup.Target, + Unsynced: unResolvedGroup.Unsynced, + } + } + } + } + subTaskStatusList = append(subTaskStatusList, openapiSubTaskStatus) + } + return subTaskStatusList, nil +} diff --git a/dm/dm/master/openapi_test.go b/dm/dm/master/openapi_test.go index e929927e7cf..9a2c28c44cd 100644 --- a/dm/dm/master/openapi_test.go +++ b/dm/dm/master/openapi_test.go @@ -558,6 +558,22 @@ func (t *openAPISuite) TestTaskAPI(c *check.C) { c.Assert(err, check.IsNil) c.Assert(resultTaskStatusWithStatus, check.DeepEquals, resultTaskStatus) + // list task with status + result = testutil.NewRequest().Get(taskURL+"?with_status=true").GoWithHTTPHandler(t.testT, s.openapiHandles) + // check http status code + c.Assert(result.Code(), check.Equals, http.StatusOK) + var resultListTask openapi.GetTaskListResponse + err = result.UnmarshalBodyToObject(&resultListTask) + c.Assert(err, check.IsNil) + c.Assert(resultListTask.Data, check.HasLen, 1) + c.Assert(resultListTask.Total, check.Equals, 1) + c.Assert(resultListTask.Data[0].StatusList, check.NotNil) + statusList := *resultListTask.Data[0].StatusList + c.Assert(statusList, check.HasLen, 1) + status := statusList[0] + c.Assert(status.WorkerName, check.Equals, workerName1) + c.Assert(status.Name, check.Equals, task.Name) + // stop task result = testutil.NewRequest().Delete(fmt.Sprintf("%s/%s", taskURL, task.Name)).GoWithHTTPHandler(t.testT, s.openapiHandles) c.Assert(result.Code(), check.Equals, http.StatusNoContent) @@ -845,7 +861,7 @@ func mockTaskQueryStatus( } mockWorkerClient.EXPECT().QueryStatus( gomock.Any(), - &pb.QueryStatusRequest{Name: taskName}, + gomock.Any(), ).Return(queryResp, nil).MaxTimes(maxRetryNum) } diff --git a/dm/openapi/gen.client.go b/dm/openapi/gen.client.go index d4aebd80ee9..d98ff44c1d2 100644 --- a/dm/openapi/gen.client.go +++ b/dm/openapi/gen.client.go @@ -172,7 +172,7 @@ type ClientInterface interface { DMAPUpdateTaskConfig(ctx context.Context, taskName string, reqEditors ...RequestEditorFn) (*http.Response, error) // DMAPIGetTaskList request - DMAPIGetTaskList(ctx context.Context, reqEditors ...RequestEditorFn) (*http.Response, error) + DMAPIGetTaskList(ctx context.Context, params *DMAPIGetTaskListParams, reqEditors ...RequestEditorFn) (*http.Response, error) // DMAPIStartTask request with any body DMAPIStartTaskWithBody(ctx context.Context, contentType string, body io.Reader, reqEditors ...RequestEditorFn) (*http.Response, error) @@ -561,8 +561,8 @@ func (c *Client) DMAPUpdateTaskConfig(ctx context.Context, taskName string, reqE return c.Client.Do(req) } -func (c *Client) DMAPIGetTaskList(ctx context.Context, reqEditors ...RequestEditorFn) (*http.Response, error) { - req, err := NewDMAPIGetTaskListRequest(c.Server) +func (c *Client) DMAPIGetTaskList(ctx context.Context, params *DMAPIGetTaskListParams, reqEditors ...RequestEditorFn) (*http.Response, error) { + req, err := NewDMAPIGetTaskListRequest(c.Server, params) if err != nil { return nil, err } @@ -1582,7 +1582,7 @@ func NewDMAPUpdateTaskConfigRequest(server string, taskName string) (*http.Reque } // NewDMAPIGetTaskListRequest generates requests for DMAPIGetTaskList -func NewDMAPIGetTaskListRequest(server string) (*http.Request, error) { +func NewDMAPIGetTaskListRequest(server string, params *DMAPIGetTaskListParams) (*http.Request, error) { var err error serverURL, err := url.Parse(server) @@ -1600,6 +1600,24 @@ func NewDMAPIGetTaskListRequest(server string) (*http.Request, error) { return nil, err } + queryValues := queryURL.Query() + + if params.WithStatus != nil { + if queryFrag, err := runtime.StyleParamWithLocation("form", true, "with_status", runtime.ParamLocationQuery, *params.WithStatus); err != nil { + return nil, err + } else if parsed, err := url.ParseQuery(queryFrag); err != nil { + return nil, err + } else { + for k, v := range parsed { + for _, v2 := range v { + queryValues.Add(k, v2) + } + } + } + } + + queryURL.RawQuery = queryValues.Encode() + req, err := http.NewRequest("GET", queryURL.String(), nil) if err != nil { return nil, err @@ -2238,7 +2256,7 @@ type ClientWithResponsesInterface interface { DMAPUpdateTaskConfigWithResponse(ctx context.Context, taskName string, reqEditors ...RequestEditorFn) (*DMAPUpdateTaskConfigResponse, error) // DMAPIGetTaskList request - DMAPIGetTaskListWithResponse(ctx context.Context, reqEditors ...RequestEditorFn) (*DMAPIGetTaskListResponse, error) + DMAPIGetTaskListWithResponse(ctx context.Context, params *DMAPIGetTaskListParams, reqEditors ...RequestEditorFn) (*DMAPIGetTaskListResponse, error) // DMAPIStartTask request with any body DMAPIStartTaskWithBodyWithResponse(ctx context.Context, contentType string, body io.Reader, reqEditors ...RequestEditorFn) (*DMAPIStartTaskResponse, error) @@ -3299,8 +3317,8 @@ func (c *ClientWithResponses) DMAPUpdateTaskConfigWithResponse(ctx context.Conte } // DMAPIGetTaskListWithResponse request returning *DMAPIGetTaskListResponse -func (c *ClientWithResponses) DMAPIGetTaskListWithResponse(ctx context.Context, reqEditors ...RequestEditorFn) (*DMAPIGetTaskListResponse, error) { - rsp, err := c.DMAPIGetTaskList(ctx, reqEditors...) +func (c *ClientWithResponses) DMAPIGetTaskListWithResponse(ctx context.Context, params *DMAPIGetTaskListParams, reqEditors ...RequestEditorFn) (*DMAPIGetTaskListResponse, error) { + rsp, err := c.DMAPIGetTaskList(ctx, params, reqEditors...) if err != nil { return nil, err } diff --git a/dm/openapi/gen.server.go b/dm/openapi/gen.server.go index 518da77bf7c..af78dd4e77b 100644 --- a/dm/openapi/gen.server.go +++ b/dm/openapi/gen.server.go @@ -91,7 +91,7 @@ type ServerInterface interface { DMAPUpdateTaskConfig(c *gin.Context, taskName string) // get task list // (GET /api/v1/tasks) - DMAPIGetTaskList(c *gin.Context) + DMAPIGetTaskList(c *gin.Context, params DMAPIGetTaskListParams) // create and start task // (POST /api/v1/tasks) DMAPIStartTask(c *gin.Context) @@ -532,11 +532,26 @@ func (siw *ServerInterfaceWrapper) DMAPUpdateTaskConfig(c *gin.Context) { // DMAPIGetTaskList operation middleware func (siw *ServerInterfaceWrapper) DMAPIGetTaskList(c *gin.Context) { + var err error + + // Parameter object where we will unmarshal all parameters from the context + var params DMAPIGetTaskListParams + + // ------------- Optional query parameter "with_status" ------------- + if paramValue := c.Query("with_status"); paramValue != "" { + } + + err = runtime.BindQueryParameter("form", true, false, "with_status", c.Request.URL.Query(), ¶ms.WithStatus) + if err != nil { + c.JSON(http.StatusBadRequest, gin.H{"msg": fmt.Sprintf("Invalid format for parameter with_status: %s", err)}) + return + } + for _, middleware := range siw.HandlerMiddlewares { middleware(c) } - siw.Handler.DMAPIGetTaskList(c) + siw.Handler.DMAPIGetTaskList(c, params) } // DMAPIStartTask operation middleware @@ -998,45 +1013,45 @@ var swaggerSpec = []string{ "v4LR59Loti6YD5ic0sWvarILOZcr/Y/IEpIATXUH8DTr91hCskCtZWorD6BDIsDTREZNYE51t69pLA7D", "CCRRusCkS+OvKtVrSMouWBgPTN9iJa9Zb7tUEEiPK6vdemsOxaTe7lW/2bcZgl+5YzVKpmGqYhPhmG1J", "byX+lpCEOj04j3AgUKh2omLNNJbCSG8Qu2VYl99V6+MXl4mXAj6Nne2Pkh63cKWKAJRKPQAFkibFWiVB", - "nJsyda/fK2rW7sW0Se2WoFB+mfqgyFLoNpgYLxgUKOf3KrYlX5kxQI3pd+8EU7J+pj+uyEAlpbfGNibq", - "gyMo4AfIUdaN68F6Brmp82eInqdRJDdCAoZiRHTXFoxUJ1DBVFAN6uTfFCC0CHWFIav7d1KlSmu3WnWo", - "HFcSXCAllHJiDqDICoMRukFRTSXiBaEMaSPkyE/Ix5n7mTNFw5gSakEYR100uIHBdL/Ve2QSKARiKjLS", - "qtsPjG94Adf/HzGatEN156GAFi5vw3uhRdZJ8uZfqWgj6xsQKE4iw0LN3SvFql9awPb1Ic4hjlA4VUxe", - "yyc2pLMcXXymY93xtpaMz4baveGuHdSKTGkQIM494K5XEqjP1a9jw4fWX9MoMtpPYtgRUpaKd3QOpF7K", - "ta1coJ4vDCjhmAtEAkeJURkXIhiNQGZvMDHOq6oa6j4cyqSJm6t2/Hw2ADlPmdRcZUlNBXUJhJzOXZSW", - "dl+GyCFmdUO9NczWnxoTW5tZD5iKJUMwLLdB7VbFRCFMfyDxF1BifHSn449j78zjPefU+ovWqX364IQE", - "bD0OsEyShwEYSqLpDIqg3Mg4rjdq2XNJv33JKMH/ypdScwD0JwpS9Uhqx+sUEoHVUu4uqyTqiL7qRu6N", - "Q3+skLuCjZGCzzF0RQqFi1TXbJXAtVhitDMPRtt7O4Ptd8HbwXiM3g7g3pudwV4wmr3bDd+8n++M9seD", - "t6Pd8e72Tn/0ZvftbrgTWMPf7bzZHmyPdsLZ9u5eGO6E++PB+O3IeWisnKS1DoGpF6Zxp+HLhJYxtOuM", - "9J+mENCQmvfp0JLT6gFlwJC0hGFLX6UU6NyxCgyN27zNqg6/017j2vNUNUHZgfciubqjzq63xcmtNs6C", - "w0eGmifv7zTTIYOg9nE8O4DgHQPvijZWL9UEGec5pF2+7ibtvLFA35Gj7CjZk8Tog1schQFkYRadl8Pf", - "2eCXB+avayVLX15b6KqHO8TrAKtwwtpYbjMIytZ2em95S4cvq/CYxAgp4oBQkadKsh3zClnG98RgxwXE", - "rIN6bEOeE/UNIlyKmxsQXqRxmjG+iS0j63WM3KeR44l6JJq7IlxEr5T0mso4DQ6Uv9BeV6rFit4ypqlX", - "cpAJtaCm+M+baphtRah7NAY0twLcqUBESGpFRzRwJBOOzsCnBJGDzyfg6NOhpAmLevu9pRAJ3x8OQxrw", - "rQSTRQCTrYDGw38thwKHs4EUroE2iJiSIdfSrfyKOVURKhZqJ7UFbhDjeu03WztbI5XdTBCBCe7t93ak", - "ZCmWEEsF7RAmeHgzHppTfUOdk1evjMLNT3OfhGq5g88nrhPRquCgUwPq6+3RyASiWdMjTHQ+S+7nn1z3", - "+xXquElwGk9gKyJUxEjH43Lru48IRu3ku2NpHfYrNuJpHEO26u1LTAKDYPvqiUyeBFxwyWtmSO+L/NpD", - "mOE3/Yey33ea3yKkc0UOSn2azyNMkEbbuc48JpDBGGkq/1FLhVrgZR6UfC4Zppdl23sWDD1bXnS1oMBm", - "l2tBvtQYZ9ehGF8YRanGa+UikU6EzPRYRwkrjus/j4Q5rgfYMAmzLkBZS8IMYYbfjHFYS8KMUesgYTZ4", - "fgmzYPixJax8nU0jIcN4KwPOKVkfkTiiwf9cfjr3iFIZLDlXfrSgzm4hDYBaroAqpEEFIuMTNIDz98nZ", - "aSdw5MAWcJZCF0t84Gj3sl31FJdstDGzXNk4reqsUt6+qlj6OkVsZfE0FstpPsLBw+5is4N/H1XxOa4U", - "cTCpfZwmyprfKiSoDilIkUVdKuLgPtTr25Ausz5W4wV/oOHq0fabXXdS36BZDczkcnc1lI+fAYSXpoP0", - "5Q+AoFubti6y1oVs+M1KtLSbEfsup1ahi+hMHaNOCb5Oyye9/BalnPfpZFG85wbu+rXMG9Xd6zTReXwY", - "cdODmvXYqjDO1Cpc2kHN8EC9sPtoPOO8W2sDWFYzGYAPZdhhAlOuM5xK+TRorc9y5EV2Ov2FM+6XLqb2", - "pRFV0cJqVJ+nRPd5Zy1cDyU2QzyNu1H7Qg19JfcTkltT4ynpbd152sER1Ceju7iDT0Bc/wmrJ/ULK6fB", - "NyQEzg6R6WKGzwftyh7Db/qPwoXpwCyqBvjyeKXfUPDxLF/svePyznrQk3JpuU16s5hU18Puz6MCMtHJ", - "YhWn9TbFYD1B2Fc7sXhXrnRIYO820ViapvanNJb5oaIutjI/v/tyGK2x2eZZkiuVizg3RFHZN4bb164/", - "BkvRpKPuMic+f2TVVTn0+lfRXCHmT626BIOEz811+X4um5hhG5N+eiJWq3cm/FV4LWOE3PmiAOqrDXV9", - "pYW7BORXQ93m024Ii7MCz1C1rN1M7cCISkhG5lL+l2NdrGMaFS9Y3/jfpVZQ4PqJ6gXm9wm+X7XAB8AL", - "rRVYVK0T1CNRQ31+oEVJn6hBz0Lw8gGl9am//STQbI7zaM6D3IsXvqmzCOsUjUossZbltk/JOkx2DklH", - "g+07RbGZDQimkqIOcPmI2O9qCTeHMqMfTnNnxriByknqobK+kfyVzptA51TRqpnUFa3czdl9dXPb3Nx7", - "+LcqYznRN3A8hZ9T/+WxVy+3s5cL9a/OGR+nXYzu6dW8VG3a73JZpKPFpfaTcva6D7pdcqNdLM1NpnNo", - "PWbSbSpdGlReMj99ecpeP7s8eLe53S/34A3dR9Gpn+WVOzaVO0yzzD3Y44GtMXlTzIeV5J4DEt4vff4S", - "jNZrs8738owbO3YezMVrdvDkvTuvLP3aU7SxsuRsLHpkUZLfzSK0ZkRj/1rhq0y9MJnq+y9s8KE844DO", - "OPf8Cumm58fLksctFl83X/4qIa8S8h261Bp+bnljDWCjGHoLGif5z+u+iuLai/8ogvj4yYjWH3X+qzRi", - "Fb9AvYa8Nnut3dqTrd/c+JGy6mtlwJ7BymxoJ7Ti1ox7qtyprsBiNxk3lW84WtF0K6QxxETdb9STSDYT", - "eH/2rvlKpZAGD7xHaXid4uBqoI+Q6JLWwCx+V2GrnkvZml9zeRYgDXj524Fa/q4kfg4gswsy8nHZg7sv", - "d/8OAAD//7mGQmsCjQAA", + "nJsyda/fK2rW7sW0Se2WoFB+mfrAylLcJ0HQ2iqlYvMYLxgUKBeiKgkls5oxQI3pd28vUwrkTH9cEaxK", + "nnAN3EzUB0dQwA+Qo6zF10PKDHLTPJBRb55GkdwICRiKEdGtYDBS7UUFp0I1qJPTVIDQoikqXF7dv5Mq", + "VQZy62qHHnNl1gVSki4n5gCKrNoYoRsU1fQsXhDKkLZsjqSHfJz5tDlTNIwpoRaEcdTFLBgYTEtdvfEm", + "gUIgpsItbQ/8wPiGF3D9/xGjSTtUdx4KaIn1dtEXqmmdzHH+lQphsmYEgeIkMizU3BJTrPqlBWxfc+Mc", + "4giFU8XkNR3UkCNztAaaNnjH21qGPxtqN5y7dlCrXKVBgDj3gLtenaE+V7+ODR9af02jyGg/iWFHnFqq", + "CNI5kHop17ZygXoSMqCEYy4QCRx1S2WxiGA0ApkRw8R4xKoUqZt7KJN2c656/PPZAOQ8ZVJzlSU1FdQl", + "EHI6d6VbOhMy7g4xq1v/rWG2/tTY7drMesBULBmCYbm3arcqJgph+gOJv4AS4/g7owkce2ce7zmn1l+0", + "Tu3TByckYOtxgGWSPAzAUBJNZ1AE5e7Icb37y55LBgNLRgn+V76UmgOgP1GQqkdSO16nkAislnK3biVR", + "R/RVN3JvHPoDkNy/bAw/fN6mK/wo/K66ZqtEw8USo515MNre2xlsvwveDsZj9HYA997sDPaC0ezdbvjm", + "/XxntD8evB3tjne3d/qjN7tvd8OdwBr+bufN9mB7tBPOtnf3wnAn3B8Pxm9HzpNo5cyvdbJMvTDdQA1f", + "JrSMoV1n+uBpqgsN+X6fDi15wh5QBgxJSxi2NGtKgc4dq8DQuM3brOrwO+01rj1PVROUowIvkqs76ux6", + "W5zcauMsOHxkqHny/vY1HTIIap/xswMI3jGar2hj9VJNkHGeQ9rl627Szhur/h05yg69PZmRPrjFURhA", + "FmYhfzmmng1+eWBSvFYH9SXLhS6luEO8DrAKJ6yNNTyDoGxtp/eW94n4UhWPSYyQIg4IFXn+Jdsxr5Bl", + "fE8MdlxAzDqoxzbkOVHfIMKluLkB4UVuqBnjm9iHsl4byn26Q56o8aK51cJF9EqdsKk21OBA+av3daVa", + "rOitjZoiKAeZUAtqOgp4U2G0rbJ1j26D5v6COxWICEmt6IgGjmTC0Rn4lCBy8PkEHH06lDRhUW+/txQi", + "4fvDYUgDvpVgsghgshXQePiv5VDgcDaQwjXQBhFTMuRaupVfMacqQsVC7aS2wA1iXK/9Zmtna6RSpgki", + "MMG9/d6OlCzFEmKpoB3CBA9vxkNzVHCoE/3qlVG4+RHxk1Atd/D5xHXMWlUxdGpAfb09GplANOukhInO", + "Z8n9/JPrJsJCHTcJTuOxbkWEihjpeFxuffcRwagdp3csrcN+xUY8jWPIVr19iUlgEGzfZ5HJk4ALLnnN", + "DOl9kV97CDP8pv9Q9vtO81uEdK7IQalP83mECdJoO9eZxwQyGCNN5T9qqVALvMyDks8lw/SyFH7PgqFn", + "y4suQRTY7HLXyJca4+w6FOMLoyjVeK3cTtKJkJke6yhhxR0AzyNhjjsHNkzCrFtV1pIwQ5jhN2Mc1pIw", + "Y9Q6SJgNnl/CLBh+bAkr35HTSMgw3sqAc0rWRySOaPA/l5/OPaJUBkvOlZ9XqLNbSAOgliugCmlQgcj4", + "BA3g/H1ydtoJHDmwBZyl0MUSHzjavWxXPcXNHW3MLFc2Tqs6AJX3xCqWvk4RW1k8jcVymo9w8LC7gu3g", + "30dVfI57ShxMap/RibKOugoJqkMKUmRRl4o4uA/1+oqly6w51njBH2i4erT9Zneo1DdoVgMzudxdDeXj", + "ZwDhpekgfaMEIOjWpq2LrHUhG36zEi3tZsS+IKpV6CI6U2ezU4Kv0/LxMb9FKed9OlkU72GEu34t80Z1", + "SzxNdB4fRtw0tmaNuyqMM7UKl3ZQMzxQL+w+Gs84L+zaAJbVTAbgQxl2mMCU6wynUj4NWuuzHHmRHXl/", + "4Yz7pYupfWlEVbSwut/nKdHN41lf2EOJzRBP427UvlBDX8n9hOTW1HhKelsXqXZwBPVx6y7u4BMQ139s", + "60n9wsoR8w0JgbOTabqY4fNBu7LH8Jv+o3BhOjCLqgG+PF7pNxR8PMsXe++4vLMe9KRcWu693iwm1fWw", + "+/OogEx0sljFEcBNMVhPEPbVjkHelSsdEti7TTSWplP+KY1lflKpi63MDwW/HEZrbLZ5luRK5XbPDVFU", + "9jXk9l3uj8FSNOmou8wx0h9ZdVVO0v5VNFeI+VOrLsEg4XNzB7+fyyZm2Makn56I1eqdCX8VXssYIXe+", + "KID6vkRdX2nhLgH51VC3+bQbwuKswDNULWvXXTswohKSkbnp/+VYF+uYRsUL1j8j0KVWUOD6ieoF5kcP", + "vl+1wAfAC60VWFStE9QjUUN9fqBFSZ+oQc9C8PIBpfWpv/0k0GyO82jOg9yLF76pswjrFI1KLLGW5baP", + "3jpMdg5JR4PtO0WxmQ0IppKiDnD5iNjvagk3hzKjH05zZ8a4gcpJ6qGyvub8lc6bQOdU0aqZ1BWt3M3Z", + "7dooo4i9sW0yG+5p38PFVknTib5Z5Clcrfovqr062p0dbah/Tc+4We2SfE/H6qUq9H6XSzAdyqX2U3n2", + "ug+6NXOjvTzNTaZ5aT1m0p0yXXpkXjI/fXnKdkO7Qnm3uQ049+AN3crRqaXmlTs2lTtMv8492OOB3Tl5", + "X86HleSeAxLeL4P/EozWa7/Q9/KMG5uGHszFazYR5e1Dryz92ta0sbLk7G16ZFGS380itGZEY/8K46tM", + "vTCZ6vvvjPChPOOAzjj3/Lrqpqfoy5LHLRZfN2X/KiGvEvIdGuUafkZ6Yw1goxh6ayon+c8Gv4ri2ov/", + "KIL4+MmI1h+r/qv0ghW/rL2GvDZ7rd06pK2Lpn+krPpaGbBnsDIb2oytuDXjnip3qlu42E3GTeVLllY0", + "3QppDDFRVyz1JJLNBN6f82u+1SmkwQOvchpepzi4GuhTLLqkNTCL31XYqudStuZXap4FSANe/naglr8r", + "iZ8DyOyOjnxc9uDuy92/AwAA///+CW8W2o0AAA==", } // GetSwagger returns the content of the embedded swagger specification file diff --git a/dm/openapi/gen.types.go b/dm/openapi/gen.types.go index b9e0e80806b..65f09abd8e4 100644 --- a/dm/openapi/gen.types.go +++ b/dm/openapi/gen.types.go @@ -361,6 +361,7 @@ type Task struct { // source-related configuration SourceConfig TaskSourceConfig `json:"source_config"` + StatusList *[]SubTaskStatus `json:"status_list,omitempty"` // table migrate rule TableMigrateRule []TaskTableMigrateRule `json:"table_migrate_rule"` @@ -543,6 +544,12 @@ type DMAPICreateTaskConfigJSONBody Task // DMAPIImportTaskConfigJSONBody defines parameters for DMAPIImportTaskConfig. type DMAPIImportTaskConfigJSONBody TaskConfigRequest +// DMAPIGetTaskListParams defines parameters for DMAPIGetTaskList. +type DMAPIGetTaskListParams struct { + // get task with status + WithStatus *bool `json:"with_status,omitempty"` +} + // DMAPIStartTaskJSONBody defines parameters for DMAPIStartTask. type DMAPIStartTaskJSONBody CreateTaskRequest diff --git a/dm/openapi/spec/dm.yaml b/dm/openapi/spec/dm.yaml index 160642c8aeb..fc4b6c6f366 100644 --- a/dm/openapi/spec/dm.yaml +++ b/dm/openapi/spec/dm.yaml @@ -371,6 +371,14 @@ paths: tags: - task summary: "get task list" + parameters: + - name: "with_status" + in: query + required: false + description: "get task with status" + schema: + type: boolean + example: true operationId: "DMAPIGetTaskList" responses: "200": @@ -1483,6 +1491,10 @@ components: $ref: "#/components/schemas/TaskTableMigrateRule" source_config: $ref: "#/components/schemas/TaskSourceConfig" + status_list: + type: array + items: + $ref: "#/components/schemas/SubTaskStatus" required: - "name" - "task_mode" diff --git a/dm/tests/openapi/client/openapi_task_check b/dm/tests/openapi/client/openapi_task_check index d6548dc94fe..adca44395d6 100755 --- a/dm/tests/openapi/client/openapi_task_check +++ b/dm/tests/openapi/client/openapi_task_check @@ -3,7 +3,6 @@ import sys import requests -NO_SHARD_TASK_NAME = "test-no-shard" SHARD_TASK_NAME = "test-shard" ILLEGAL_CHAR_TASK_NAME = "t-Ë!s`t" SOURCE1_NAME = "mysql-01" @@ -57,9 +56,9 @@ def start_task_failed(): assert resp.status_code == 400 -def start_noshard_task_success(): +def start_noshard_task_success(task_name, tartget_table_name=""): task = { - "name": NO_SHARD_TASK_NAME, + "name": task_name, "task_mode": "all", "shard_mode": "pessimistic", "meta_schema": "dm-meta", @@ -78,6 +77,7 @@ def start_noshard_task_success(): "schema": "openapi", "table": "*", }, + "target": {"schema": "openapi", "table": tartget_table_name}, }, { "source": { @@ -85,6 +85,7 @@ def start_noshard_task_success(): "schema": "openapi", "table": "*", }, + "target": {"schema": "openapi", "table": tartget_table_name}, }, ], "source_config": { @@ -192,6 +193,22 @@ def get_task_list(task_count): assert data["total"] == int(task_count) +def get_task_list_with_status(task_count, task_name, status_count): + url = API_ENDPOINT + "?with_status=true" + resp = requests.get(url=url) + data = resp.json() + assert resp.status_code == 200 + print("get_task_list_with_status resp=", data) + + assert data["total"] == int(task_count) + find_task = False + for task in data["data"]: + if task["name"] == task_name: + find_task = True + assert len(task["status_list"]) == int(status_count) + assert find_task + + def pause_task_success(task_name, source_name): url = API_ENDPOINT + "/" + task_name + "/pause" resp = requests.post( @@ -288,6 +305,7 @@ if __name__ == "__main__": "resume_task_success": resume_task_success, "stop_task_success": stop_task_success, "get_task_list": get_task_list, + "get_task_list_with_status": get_task_list_with_status, "get_task_status_failed": get_task_status_failed, "get_illegal_char_task_status_failed": get_illegal_char_task_status_failed, "get_task_status_success": get_task_status_success, diff --git a/dm/tests/openapi/run.sh b/dm/tests/openapi/run.sh index 1f0a4eb3600..ee3a04eeef5 100644 --- a/dm/tests/openapi/run.sh +++ b/dm/tests/openapi/run.sh @@ -18,9 +18,9 @@ function prepare_database() { function init_noshard_data() { run_sql_source1 "CREATE TABLE openapi.t1(i TINYINT, j INT UNIQUE KEY);" - run_sql_source2 "CREATE TABLE openapi.t2(i TINYINT, j INT UNIQUE KEY);" - run_sql_source1 "INSERT INTO openapi.t1(i,j) VALUES (1, 2);" + + run_sql_source2 "CREATE TABLE openapi.t2(i TINYINT, j INT UNIQUE KEY);" run_sql_source2 "INSERT INTO openapi.t2(i,j) VALUES (3, 4);" } @@ -230,6 +230,7 @@ function test_noshard_task() { prepare_database task_name="test-no-shard" + target_table_name="" # empty means no route # create source succesfully openapi_source_check "create_source1_success" @@ -250,7 +251,7 @@ function test_noshard_task() { openapi_task_check "start_task_failed" # start no shard task success - openapi_task_check "start_noshard_task_success" + openapi_task_check "start_noshard_task_success" $task_name $target_table_name run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ "query-status $task_name" \ @@ -294,6 +295,49 @@ function test_noshard_task() { echo ">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>TEST OPENAPI: NO SHARD TASK SUCCESS" } +function test_multi_tasks() { + echo ">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>START TEST OPENAPI: MULTI TASK" + prepare_database + + task1="test-1" + task1_target_table_name="task1_target_table" + + task2="test-2" + task2_target_db_name="task2_target_table" + + # create and check source + openapi_source_check "create_source1_success" + openapi_source_check "create_source2_success" + openapi_source_check "list_source_success" 2 + + init_noshard_data + + openapi_task_check "start_noshard_task_success" $task1 $task1_target_table_name + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "query-status $task1" \ + "\"stage\": \"Running\"" 2 + + # test get task list with status, now we have 1 task with two status + openapi_task_check "get_task_list_with_status" 1 $task1 2 + + openapi_task_check "start_noshard_task_success" $task2 $task2_target_db_name + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "query-status $task2" \ + "\"stage\": \"Running\"" 2 + + # now we have 2 task and each one has two status + openapi_task_check "get_task_list_with_status" 2 $task2 2 + + # delete source success and clean data for other test + openapi_source_check "delete_source_with_force_success" "mysql-01" + openapi_source_check "delete_source_with_force_success" "mysql-02" + openapi_source_check "list_source_success" 0 + run_sql_tidb "DROP DATABASE if exists openapi;" + openapi_task_check "get_task_list" 0 + echo ">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>TEST OPENAPI: MULTI TASK SUCCESS" + +} + function test_cluster() { # list master and worker node openapi_cluster_check "list_master_success" 2 @@ -332,8 +376,9 @@ function run() { test_relay test_shard_task + test_multi_tasks test_noshard_task - test_cluster + test_cluster # note that this test case should running at last, becasue it will offline some memebrs of cluster } cleanup_data openapi