Skip to content

Commit

Permalink
openapi(engine): use application/json in submit job api (#6579)
Browse files Browse the repository at this point in the history
ref #6255
  • Loading branch information
amyangfei authored Aug 3, 2022
1 parent 16ca2e3 commit 0f8e58e
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 63 deletions.
72 changes: 45 additions & 27 deletions engine/servermaster/openapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"net/http"
"net/http/httputil"
"net/url"
"strconv"
"strings"

"github.com/gin-gonic/gin"
Expand All @@ -35,12 +34,21 @@ const (
apiOpVarProjectID = "project_id"
// apiOpVarJobID is the key of job id in HTTP API.
apiOpVarJobID = "job_id"
// apiOpVarJobID is the key of job type in HTTP API.
apiOpJobType = "job_type"
// apiOpVarJobID is the key of job config in HTTP API.
apiOpJobConfig = "job_config"
)

// APICreateJobRequest defines the json fields when creating a job with OpenAPI
type APICreateJobRequest struct {
JobType int32 `json:"job_type"`
JobConfig string `json:"job_config"`
}

// APIQueryJobResponse defines the json fields of query job response
type APIQueryJobResponse struct {
JobType int32 `json:"job_type"`
JobConfig string `json:"job_config"`
Status int32 `json:"status"`
}

// ServerInfoProvider provides server info.
type ServerInfoProvider interface {
// IsLeader returns whether the server is leader.
Expand Down Expand Up @@ -129,28 +137,16 @@ func (o *OpenAPI) ListJobs(c *gin.Context) {
// @Router /api/v1/jobs [post]
// TODO: use gRPC gateway to serve OpenAPI in the future
func (o *OpenAPI) SubmitJob(c *gin.Context) {
tpStr := c.PostForm(apiOpJobType)
tp, err := strconv.ParseInt(tpStr, 10, 64)
data := &APICreateJobRequest{}
err := c.ShouldBindJSON(data)
if err != nil {
_ = c.AbortWithError(http.StatusBadRequest, errors.New("job-type is not valid"))
_ = c.AbortWithError(http.StatusBadRequest, err)
return
}
tenantID := c.PostForm(apiOpVarTenantID)
projectID := c.PostForm(apiOpVarProjectID)
if tenantID == "" {
_ = c.AbortWithError(http.StatusBadRequest, errors.New("tenant_id must be provided"))
return
}
if projectID == "" {
_ = c.AbortWithError(http.StatusBadRequest, errors.New("project_id must be provided"))
return
}

projInfo := &pb.ProjectInfo{
TenantId: tenantID,
ProjectId: projectID,
TenantId: c.Query(apiOpVarTenantID),
ProjectId: c.Query(apiOpVarProjectID),
}
cfg := c.PostForm(apiOpJobConfig)

jobMgr, ok := o.infoProvider.JobManager()
if !ok {
Expand All @@ -159,8 +155,8 @@ func (o *OpenAPI) SubmitJob(c *gin.Context) {
}
ctx := c.Request.Context()
req := &pb.SubmitJobRequest{
Tp: int32(tp),
Config: []byte(cfg),
Tp: data.JobType,
Config: []byte(data.JobConfig),
ProjectInfo: projInfo,
}
resp := jobMgr.SubmitJob(ctx, req)
Expand All @@ -185,9 +181,31 @@ func (o *OpenAPI) QueryJob(c *gin.Context) {
tenantID := c.Query(apiOpVarTenantID)
projectID := c.Query(apiOpVarProjectID)
jobID := c.Param(apiOpVarJobID)
_, _, _ = tenantID, projectID, jobID
// TODO: Implement it.
c.AbortWithStatus(http.StatusNotImplemented)

jobMgr, ok := o.infoProvider.JobManager()
if !ok {
_ = c.AbortWithError(http.StatusServiceUnavailable, errors.New("job manager is not initialized"))
return
}
req := &pb.QueryJobRequest{
JobId: jobID,
ProjectInfo: &pb.ProjectInfo{
TenantId: tenantID,
ProjectId: projectID,
},
}
ctx := c.Request.Context()
resp := jobMgr.QueryJob(ctx, req)
if resp.Err != nil {
_ = c.AbortWithError(http.StatusBadRequest, errors.New(resp.Err.String()))
return
}
queryResp := &APIQueryJobResponse{
JobType: resp.GetTp(),
JobConfig: string(resp.GetConfig()),
Status: int32(resp.GetStatus()),
}
c.IndentedJSON(http.StatusOK, queryResp)
}

// PauseJob pauses a job.
Expand Down
2 changes: 1 addition & 1 deletion engine/servermaster/openapi_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func TestOpenAPIBasic(t *testing.T) {
{
method: http.MethodGet,
path: "/api/v1/jobs/job1",
statusCode: http.StatusNotImplemented,
statusCode: http.StatusOK,
shouldForward: false,
},
{
Expand Down
18 changes: 7 additions & 11 deletions engine/test/e2e/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"github.com/stretchr/testify/require"
"google.golang.org/grpc"

"github.com/pingcap/tiflow/engine/client"
pb "github.com/pingcap/tiflow/engine/enginepb"
cvs "github.com/pingcap/tiflow/engine/jobmaster/cvsjob"
engineModel "github.com/pingcap/tiflow/engine/model"
Expand Down Expand Up @@ -135,8 +134,6 @@ func testSubmitTest(t *testing.T, cfg *cvs.Config, config *Config, demoAddr stri
democlient, err := NewDemoClient(ctx, demoAddr)
require.Nil(t, err)
fmt.Printf("connect clients\n")
masterclient, err := client.NewMasterClient(ctx, config.MasterAddrs)
require.Nil(t, err)

for {
resp, err := democlient.client.IsReady(ctx, &pb.IsReadyRequest{})
Expand All @@ -159,19 +156,18 @@ func testSubmitTest(t *testing.T, cfg *cvs.Config, config *Config, demoAddr stri

fmt.Printf("job id %s\n", jobID)

queryReq := &pb.QueryJobRequest{
JobId: jobID,
}
// continue to query
for {
ctx1, cancel := context.WithTimeout(ctx, 3*time.Second)
queryResp, err := masterclient.QueryJob(ctx1, queryReq)
queryResp, err := e2e.QueryJobViaOpenAPI(ctx1, config.MasterAddrs[0],
tenantID, projectID, jobID,
)
require.NoError(t, err)
require.Nil(t, queryResp.Err)
require.Equal(t, queryResp.Tp, int32(engineModel.JobTypeCVSDemo))
cancel()
fmt.Printf("query id %s, status %d, time %s\n", jobID, int(queryResp.Status), time.Now().Format("2006-01-02 15:04:05"))
if queryResp.Status == pb.QueryJobResponse_finished {
require.Equal(t, int32(engineModel.JobTypeCVSDemo), queryResp.JobType)
fmt.Printf("query id %s, status %d, time %s\n",
jobID, int(queryResp.Status), time.Now().Format("2006-01-02 15:04:05"))
if queryResp.Status == int32(pb.QueryJobResponse_finished) {
break
}
time.Sleep(time.Second)
Expand Down
65 changes: 41 additions & 24 deletions engine/test/e2e/e2e_test_cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@
package e2e

import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/url"
"net/http"
"os/exec"
"strconv"
"time"

"github.com/pingcap/errors"
Expand All @@ -37,8 +37,8 @@ import (
"github.com/pingcap/tiflow/engine/pkg/meta"
metaModel "github.com/pingcap/tiflow/engine/pkg/meta/model"
"github.com/pingcap/tiflow/engine/pkg/tenant"
"github.com/pingcap/tiflow/engine/servermaster"
server "github.com/pingcap/tiflow/engine/servermaster"
"github.com/pingcap/tiflow/pkg/httputil"
)

func init() {
Expand Down Expand Up @@ -139,21 +139,12 @@ func (cli *ChaosCli) PauseJob(ctx context.Context, jobID string) error {
func (cli *ChaosCli) CheckJobStatus(
ctx context.Context, jobID string, expectedStatus pb.QueryJobResponse_JobStatus,
) (bool, error) {
req := &pb.QueryJobRequest{
JobId: jobID,
ProjectInfo: &pb.ProjectInfo{
TenantId: cli.project.TenantID(),
ProjectId: cli.project.ProjectID(),
},
}
resp, err := cli.masterCli.QueryJob(ctx, req)
resp, err := QueryJobViaOpenAPI(ctx, cli.masterAddrs[0],
cli.project.TenantID(), cli.project.ProjectID(), jobID)
if err != nil {
return false, err
}
if resp.Err != nil {
return false, errors.New(resp.Err.String())
}
return resp.Status == expectedStatus, nil
return resp.Status == int32(expectedStatus), nil
}

// UpdateFakeJobKey updates the etcd value of a worker belonging to a fake job
Expand Down Expand Up @@ -297,26 +288,52 @@ func CreateJobViaOpenAPI(
ctx context.Context, apiEndpoint string, tenantID string, projectID string,
tp engineModel.JobType, cfg string,
) (string, error) {
cli, err := httputil.NewClient(nil)
data := &servermaster.APICreateJobRequest{
JobType: int32(tp),
JobConfig: cfg,
}
postData, err := json.Marshal(data)
if err != nil {
return "", err
}
data := url.Values{
"job_type": {strconv.Itoa(int(tp))},
"job_config": {cfg},
"tenant_id": {tenantID},
"project_id": {projectID},
}
apiURL := "http://" + apiEndpoint + "/api/v1/jobs"
resp, err := cli.PostForm(ctx, apiURL, data)

resp, err := http.Post(
fmt.Sprintf("http://%s/api/v1/jobs?tenant_id=%s&project_id=%s",
apiEndpoint, tenantID, projectID,
),
"application/json",
bytes.NewReader(postData),
)
if err != nil {
return "", err
}
body, err := io.ReadAll(resp.Body)
if err != nil {
return "", err
}

var jobID string
err = json.Unmarshal(body, &jobID)
return jobID, err
}

// QueryJobViaOpenAPI wraps OpenAPI to query a job
func QueryJobViaOpenAPI(
ctx context.Context, apiEndpoint string, tenantID, projectID, jobID string,
) (result *servermaster.APIQueryJobResponse, err error) {
url := fmt.Sprintf(
"http://%s/api/v1/jobs/%s?tenant_id=%s&project_id=%s",
apiEndpoint, jobID, tenantID, projectID,
)
resp, err := http.Get(url) // #nosec G107
if err != nil {
return
}
body, err := io.ReadAll(resp.Body)
if err != nil {
return
}
result = &servermaster.APIQueryJobResponse{}
err = json.Unmarshal(body, result)
return
}

0 comments on commit 0f8e58e

Please sign in to comment.