Skip to content

Commit

Permalink
dm/openpai: add more task api (pingcap#3171)
Browse files Browse the repository at this point in the history
  • Loading branch information
Ehco1996 authored and ti-chi-bot committed Nov 4, 2021
1 parent 6c6ca9e commit 0fc8540
Show file tree
Hide file tree
Showing 18 changed files with 655 additions and 309 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ dm_integration_test: check_third_party_binary_for_dm install_test_python_dep
@which bin/dm-master.test
@which bin/dm-worker.test
@which bin/dm-syncer.test
ln -srf bin dm/
cd dm && ln -sf ../bin .
cd dm && ./tests/run.sh $(CASE)

dm_compatibility_test: check_third_party_binary_for_dm
Expand Down
3 changes: 3 additions & 0 deletions dm/_utils/terror_gen/errors_release.txt
Original file line number Diff line number Diff line change
Expand Up @@ -486,6 +486,9 @@ ErrSchemaTrackerCannotCreateTable,[code=44003:class=schema-tracker:scope=interna
ErrSchemaTrackerCannotSerialize,[code=44004:class=schema-tracker:scope=internal:level=high], "Message: failed to serialize table info for `%s`.`%s`"
ErrSchemaTrackerCannotGetTable,[code=44005:class=schema-tracker:scope=internal:level=high], "Message: cannot get table info for %v from schema tracker"
ErrSchemaTrackerCannotExecDDL,[code=44006:class=schema-tracker:scope=internal:level=high], "Message: cannot track DDL: %s"
ErrSchemaTrackerMarshalJSON,[code=44013:class=schema-tracker:scope=downstream:level=high], "Message: can not marshal struct maybe `%v` is unable to serialize"
ErrSchemaTrackerUnMarshalJSON,[code=44014:class=schema-tracker:scope=downstream:level=high], "Message: can not unmarshal json maybe `%s` is not proper JSON"
ErrSchemaTrackerUnSchemaNotExist,[code=44015:class=schema-tracker:scope=downstream:level=high], "Message: can not find `%s` in tracker"
ErrSchemaTrackerCannotFetchDownstreamTable,[code=44007:class=schema-tracker:scope=downstream:level=medium], "Message: cannot fetch downstream table schema of %v to initialize upstream schema %v in schema tracker"
ErrSchemaTrackerCannotParseDownstreamTable,[code=44008:class=schema-tracker:scope=internal:level=high], "Message: cannot parse downstream table schema of %v to initialize upstream schema %v in schema tracker"
ErrSchemaTrackerInvalidCreateTableStmt,[code=44009:class=schema-tracker:scope=internal:level=medium], "Message: %s is not a valid `CREATE TABLE` statement"
Expand Down
196 changes: 171 additions & 25 deletions dm/dm/master/openapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package master

import (
"context"
"encoding/json"
"fmt"
"net/http"

Expand All @@ -29,6 +30,7 @@ import (
"github.com/pingcap/ticdc/dm/dm/config"
"github.com/pingcap/ticdc/dm/dm/ctl/common"
"github.com/pingcap/ticdc/dm/dm/master/scheduler"
"github.com/pingcap/ticdc/dm/dm/master/workerrpc"
"github.com/pingcap/ticdc/dm/dm/pb"
"github.com/pingcap/ticdc/dm/openapi"
"github.com/pingcap/ticdc/dm/pkg/conn"
Expand Down Expand Up @@ -479,9 +481,13 @@ func (s *Server) DMAPIGetTaskList(ctx echo.Context) error {

// DMAPIGetTaskStatus url is:(GET /api/v1/tasks/{task-name}/status).
func (s *Server) DMAPIGetTaskStatus(ctx echo.Context, taskName string, params openapi.DMAPIGetTaskStatusParams) error {
// todo support params
// 1. get task source list from scheduler
sourceList := s.getTaskResources(taskName)
var sourceList []string
if params.SourceNameList == nil {
sourceList = s.getTaskResources(taskName)
} else {
sourceList = *params.SourceNameList
}
if len(sourceList) == 0 {
return terror.ErrSchedulerTaskNotExist.Generate(taskName)
}
Expand Down Expand Up @@ -557,43 +563,183 @@ func (s *Server) DMAPIGetTaskStatus(ctx echo.Context, taskName string, params op
return ctx.JSON(http.StatusOK, resp)
}

// DMAPPauseTask pause task url is: (POST /api/v1/tasks/{task-name}/pause).
func (s *Server) DMAPPauseTask(ctx echo.Context, taskName string) error {
return nil
// DMAPIPauseTask pause task url is: (POST /api/v1/tasks/{task-name}/pause).
func (s *Server) DMAPIPauseTask(ctx echo.Context, taskName string) error {
var sourceName openapi.SchemaNameList
if err := ctx.Bind(&sourceName); err != nil {
return err
}
if len(sourceName) == 0 {
sourceName = s.getTaskResources(taskName)
}
return s.scheduler.UpdateExpectSubTaskStage(pb.Stage_Paused, taskName, sourceName...)
}

// DMAPIResumeTask resume task url is: (POST /api/v1/tasks/{task-name}/resume).
func (s *Server) DMAPIResumeTask(ctx echo.Context, taskName string) error {
return nil
}

// DMAPIGetTaskSourceSchemaList get task source schema list url is: (GET /api/v1/tasks/{task-name}/sources/{source-name}/schemas).
func (s *Server) DMAPIGetTaskSourceSchemaList(ctx echo.Context, taskName string, sourceName string) error {
return nil
var sourceName openapi.SchemaNameList
if err := ctx.Bind(&sourceName); err != nil {
return err
}
if len(sourceName) == 0 {
sourceName = s.getTaskResources(taskName)
}
return s.scheduler.UpdateExpectSubTaskStage(pb.Stage_Running, taskName, sourceName...)
}

// DMAPIGetTaskSchemaStructure get task source schema structure url is: (GET /api/v1/tasks/{task-name}/sources/{source-name}/schemas/{schema-name}).
func (s *Server) DMAPIGetTaskSchemaStructure(ctx echo.Context, taskName string, sourceName string, schemaName string) error {
return nil
// DMAPIGetSchemaListByTaskAndSource get task source schema list url is: (GET /api/v1/tasks/{task-name}/sources/{source-name}/schemas).
func (s *Server) DMAPIGetSchemaListByTaskAndSource(ctx echo.Context, taskName string, sourceName string) error {
worker := s.scheduler.GetWorkerBySource(sourceName)
if worker == nil {
return terror.ErrWorkerNoStart
}
workerReq := workerrpc.Request{
Type: workerrpc.CmdOperateSchema,
OperateSchema: &pb.OperateWorkerSchemaRequest{
Op: pb.SchemaOp_ListSchema,
Task: taskName,
Source: sourceName,
},
}
newCtx := ctx.Request().Context()
resp, err := worker.SendRequest(newCtx, &workerReq, s.cfg.RPCTimeout)
if err != nil {
return err
}
if !resp.OperateSchema.Result {
return terror.ErrOpenAPICommonError.New(resp.OperateSchema.Msg)
}
schemaList := openapi.SchemaNameList{}
if err := json.Unmarshal([]byte(resp.OperateSchema.Msg), &schemaList); err != nil {
return terror.ErrSchemaTrackerUnMarshalJSON.Delegate(err, resp.OperateSchema.Msg)
}
return ctx.JSON(http.StatusOK, schemaList)
}

// DMAPIGetTaskSourceTableList get task source table list url is: (GET /api/v1/tasks/{task-name}/sources/{source-name}/schemas/{schema-name}).
func (s *Server) DMAPIGetTaskSourceTableList(ctx echo.Context, taskName string, sourceName string, schemaName string) error {
return nil
// DMAPIGetTableListByTaskAndSource get task source table list url is: (GET /api/v1/tasks/{task-name}/sources/{source-name}/schemas/{schema-name}).
func (s *Server) DMAPIGetTableListByTaskAndSource(ctx echo.Context, taskName string, sourceName string, schemaName string) error {
worker := s.scheduler.GetWorkerBySource(sourceName)
if worker == nil {
return terror.ErrWorkerNoStart
}
workerReq := workerrpc.Request{
Type: workerrpc.CmdOperateSchema,
OperateSchema: &pb.OperateWorkerSchemaRequest{
Op: pb.SchemaOp_ListTable,
Task: taskName,
Source: sourceName,
Database: schemaName,
},
}
newCtx := ctx.Request().Context()
resp, err := worker.SendRequest(newCtx, &workerReq, s.cfg.RPCTimeout)
if err != nil {
return err
}
if !resp.OperateSchema.Result {
return terror.ErrOpenAPICommonError.New(resp.OperateSchema.Msg)
}
tableList := openapi.TableNameList{}
if err := json.Unmarshal([]byte(resp.OperateSchema.Msg), &tableList); err != nil {
return terror.ErrSchemaTrackerUnMarshalJSON.Delegate(err, resp.OperateSchema.Msg)
}
return ctx.JSON(http.StatusOK, tableList)
}

// DMAPIDeleteTaskSourceTableStructure delete task source table structure url is: (DELETE /api/v1/tasks/{task-name}/sources/{source-name}/schemas/{schema-name}/{table-name}).
func (s *Server) DMAPIDeleteTaskSourceTableStructure(ctx echo.Context, taskName string, sourceName string, schemaName string, tableName string) error {
return nil
// DMAPIGetTableStructure get task source table structure url is: (GET /api/v1/tasks/{task-name}/sources/{source-name}/schemas/{schema-name}/{table-name}).
func (s *Server) DMAPIGetTableStructure(ctx echo.Context, taskName string, sourceName string, schemaName string, tableName string) error {
worker := s.scheduler.GetWorkerBySource(sourceName)
if worker == nil {
return terror.ErrWorkerNoStart
}
workerReq := workerrpc.Request{
Type: workerrpc.CmdOperateSchema,
OperateSchema: &pb.OperateWorkerSchemaRequest{
Op: pb.SchemaOp_GetSchema,
Task: taskName,
Source: sourceName,
Database: schemaName,
Table: tableName,
},
}
newCtx := ctx.Request().Context()
resp, err := worker.SendRequest(newCtx, &workerReq, s.cfg.RPCTimeout)
if err != nil {
return err
}
if !resp.OperateSchema.Result {
return terror.ErrOpenAPICommonError.New(resp.OperateSchema.Msg)
}
taskTableStruct := openapi.GetTaskTableStructureResponse{
SchemaCreateSql: &resp.OperateSchema.Msg,
SchemaName: &schemaName,
TableName: tableName,
}
return ctx.JSON(http.StatusOK, taskTableStruct)
}

// DMAPIGetTaskSourceTableStructure get task source table structure url is: (GET /api/v1/tasks/{task-name}/sources/{source-name}/schemas/{schema-name}/{table-name}).
func (s *Server) DMAPIGetTaskSourceTableStructure(ctx echo.Context, taskName string, sourceName string, schemaName string, tableName string) error {
return nil
// DMAPIDeleteTableStructure delete task source table structure url is: (DELETE /api/v1/tasks/{task-name}/sources/{source-name}/schemas/{schema-name}/{table-name}).
func (s *Server) DMAPIDeleteTableStructure(ctx echo.Context, taskName string, sourceName string, schemaName string, tableName string) error {
worker := s.scheduler.GetWorkerBySource(sourceName)
if worker == nil {
return terror.ErrWorkerNoStart
}
workerReq := workerrpc.Request{
Type: workerrpc.CmdOperateSchema,
OperateSchema: &pb.OperateWorkerSchemaRequest{
Op: pb.SchemaOp_RemoveSchema,
Task: taskName,
Source: sourceName,
Database: schemaName,
Table: tableName,
},
}
newCtx := ctx.Request().Context()
resp, err := worker.SendRequest(newCtx, &workerReq, s.cfg.RPCTimeout)
if err != nil {
return err
}
if !resp.OperateSchema.Result {
return terror.ErrOpenAPICommonError.New(resp.OperateSchema.Msg)
}
return ctx.NoContent(http.StatusNoContent)
}

// DMAPIOperateTaskSourceTableStructure operate task source table structure url is: (PUT /api/v1/tasks/{task-name}/sources/{source-name}/schemas/{schema-name}/{table-name}).
func (s *Server) DMAPIOperateTaskSourceTableStructure(ctx echo.Context, taskName string, sourceName string, schemaName string, tableName string) error {
// DMAPIOperateTableStructure operate task source table structure url is: (PUT /api/v1/tasks/{task-name}/sources/{source-name}/schemas/{schema-name}/{table-name}).
func (s *Server) DMAPIOperateTableStructure(ctx echo.Context, taskName string, sourceName string, schemaName string, tableName string) error {
var req openapi.OperateTaskTableStructureRequest
if err := ctx.Bind(&req); err != nil {
return err
}
worker := s.scheduler.GetWorkerBySource(sourceName)
if worker == nil {
return terror.ErrWorkerNoStart
}
opReq := &pb.OperateWorkerSchemaRequest{
Op: pb.SchemaOp_SetSchema,
Task: taskName,
Source: sourceName,
Database: schemaName,
Table: tableName,
Schema: req.SqlContent,
Sync: *req.Sync,
Flush: *req.Flush,
}
if req.Sync != nil {
opReq.Sync = *req.Sync
}
if req.Flush != nil {
opReq.Flush = *req.Flush
}
workerReq := workerrpc.Request{Type: workerrpc.CmdOperateSchema, OperateSchema: opReq}
newCtx := ctx.Request().Context()
resp, err := worker.SendRequest(newCtx, &workerReq, s.cfg.RPCTimeout)
if err != nil {
return err
}
if !resp.OperateSchema.Result {
return terror.ErrOpenAPICommonError.New(resp.OperateSchema.Msg)
}
return nil
}

Expand Down
48 changes: 34 additions & 14 deletions dm/dm/master/openapi_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -452,52 +452,72 @@ func (t *openAPISuite) TestTaskAPI(c *check.C) {
task.TargetConfig.Password = dbCfg.Password

createTaskReq := openapi.CreateTaskRequest{RemoveMeta: false, Task: task}
result2 := testutil.NewRequest().Post(taskURL).WithJsonBody(createTaskReq).Go(t.testT, s.echo)
c.Assert(result2.Code(), check.Equals, http.StatusCreated)
result = testutil.NewRequest().Post(taskURL).WithJsonBody(createTaskReq).Go(t.testT, s.echo)
c.Assert(result.Code(), check.Equals, http.StatusCreated)
var createTaskResp openapi.Task
err = result2.UnmarshalBodyToObject(&createTaskResp)
err = result.UnmarshalBodyToObject(&createTaskResp)
c.Assert(err, check.IsNil)
c.Assert(task.Name, check.Equals, createTaskResp.Name)
subTaskM := s.scheduler.GetSubTaskCfgsByTask(task.Name)
c.Assert(len(subTaskM) == 1, check.IsTrue)
c.Assert(subTaskM[source1Name].Name, check.Equals, task.Name)

// list tasks
result3 := testutil.NewRequest().Get(taskURL).Go(t.testT, s.echo)
c.Assert(result3.Code(), check.Equals, http.StatusOK)
result = testutil.NewRequest().Get(taskURL).Go(t.testT, s.echo)
c.Assert(result.Code(), check.Equals, http.StatusOK)
var resultTaskList openapi.GetTaskListResponse
err = result3.UnmarshalBodyToObject(&resultTaskList)
err = result.UnmarshalBodyToObject(&resultTaskList)
c.Assert(err, check.IsNil)
c.Assert(resultTaskList.Total, check.Equals, 1)
c.Assert(resultTaskList.Data[0].Name, check.Equals, task.Name)

// pause and resume task
pauseTaskURL := fmt.Sprintf("%s/%s/pause", taskURL, task.Name)
result = testutil.NewRequest().Post(pauseTaskURL).Go(t.testT, s.echo)
c.Assert(result.Code(), check.Equals, http.StatusOK)
c.Assert(s.scheduler.GetExpectSubTaskStage(task.Name, source1Name).Expect, check.Equals, pb.Stage_Paused)

resumeTaskURL := fmt.Sprintf("%s/%s/resume", taskURL, task.Name)
result = testutil.NewRequest().Post(resumeTaskURL).Go(t.testT, s.echo)
c.Assert(result.Code(), check.Equals, http.StatusOK)
c.Assert(s.scheduler.GetExpectSubTaskStage(task.Name, source1Name).Expect, check.Equals, pb.Stage_Running)

// get task status
mockWorkerClient := pbmock.NewMockWorkerClient(ctrl)
mockTaskQueryStatus(mockWorkerClient, task.Name, source1.SourceName, workerName1)
s.scheduler.SetWorkerClientForTest(workerName1, newMockRPCClient(mockWorkerClient))
taskStatusURL := fmt.Sprintf("%s/%s/status", taskURL, task.Name)
result4 := testutil.NewRequest().Get(taskStatusURL).Go(t.testT, s.echo)
c.Assert(result4.Code(), check.Equals, http.StatusOK)
result = testutil.NewRequest().Get(taskStatusURL).Go(t.testT, s.echo)
c.Assert(result.Code(), check.Equals, http.StatusOK)
var resultTaskStatus openapi.GetTaskStatusResponse
err = result4.UnmarshalBodyToObject(&resultTaskStatus)
err = result.UnmarshalBodyToObject(&resultTaskStatus)
c.Assert(err, check.IsNil)
c.Assert(resultTaskStatus.Total, check.Equals, 1) // only 1 subtask
c.Assert(resultTaskStatus.Data[0].Name, check.Equals, task.Name)
c.Assert(resultTaskStatus.Data[0].Stage, check.Equals, pb.Stage_Running.String())
c.Assert(resultTaskStatus.Data[0].WorkerName, check.Equals, workerName1)

// get task status with source name
taskStatusURL = fmt.Sprintf("%s/%s/status?source_name_list=%s", taskURL, task.Name, source1Name)
result = testutil.NewRequest().Get(taskStatusURL).Go(t.testT, s.echo)
c.Assert(result.Code(), check.Equals, http.StatusOK)
var resultTaskStatusWithStatus openapi.GetTaskStatusResponse
err = result.UnmarshalBodyToObject(&resultTaskStatusWithStatus)
c.Assert(err, check.IsNil)
c.Assert(resultTaskStatusWithStatus, check.DeepEquals, resultTaskStatus)

// stop task
result5 := testutil.NewRequest().Delete(fmt.Sprintf("%s/%s", taskURL, task.Name)).Go(t.testT, s.echo)
c.Assert(result5.Code(), check.Equals, http.StatusNoContent)
result = testutil.NewRequest().Delete(fmt.Sprintf("%s/%s", taskURL, task.Name)).Go(t.testT, s.echo)
c.Assert(result.Code(), check.Equals, http.StatusNoContent)
subTaskM = s.scheduler.GetSubTaskCfgsByTask(task.Name)
c.Assert(len(subTaskM) == 0, check.IsTrue)
c.Assert(failpoint.Disable("github.com/pingcap/ticdc/dm/dm/master/MockSkipAdjustTargetDB"), check.IsNil)

// list tasks
result6 := testutil.NewRequest().Get(taskURL).Go(t.testT, s.echo)
c.Assert(result6.Code(), check.Equals, http.StatusOK)
result = testutil.NewRequest().Get(taskURL).Go(t.testT, s.echo)
c.Assert(result.Code(), check.Equals, http.StatusOK)
var resultTaskList2 openapi.GetTaskListResponse
err = result6.UnmarshalBodyToObject(&resultTaskList2)
err = result.UnmarshalBodyToObject(&resultTaskList2)
c.Assert(err, check.IsNil)
c.Assert(resultTaskList2.Total, check.Equals, 0)
}
Expand Down
Loading

0 comments on commit 0fc8540

Please sign in to comment.