Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(backend): fix UploadPipeline/UploadPipelineVersion v2 API returns v1 object. #9064

Merged
merged 1 commit into from
Mar 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions backend/src/apiserver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,8 @@ func startHttpProxy(resourceManager *resource.ResourceManager) {
// https://github.com/grpc-ecosystem/grpc-gateway/issues/410
sharedPipelineUploadServer := server.NewPipelineUploadServer(resourceManager, &server.PipelineUploadServerOptions{CollectMetrics: *collectMetricsFlag})
// API v1beta1
topMux.HandleFunc("/apis/v1beta1/pipelines/upload", sharedPipelineUploadServer.UploadPipeline)
topMux.HandleFunc("/apis/v1beta1/pipelines/upload_version", sharedPipelineUploadServer.UploadPipelineVersion)
topMux.HandleFunc("/apis/v1beta1/pipelines/upload", sharedPipelineUploadServer.UploadPipelineV1)
topMux.HandleFunc("/apis/v1beta1/pipelines/upload_version", sharedPipelineUploadServer.UploadPipelineVersionV1)
topMux.HandleFunc("/apis/v1beta1/healthz", func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
io.WriteString(w, `{"commit_sha":"`+common.GetStringConfigWithDefault("COMMIT_SHA", "unknown")+`", "tag_name":"`+common.GetStringConfigWithDefault("TAG_NAME", "unknown")+`", "multi_user":`+strconv.FormatBool(common.IsMultiUserMode())+`}`)
Expand Down
42 changes: 35 additions & 7 deletions backend/src/apiserver/server/pipeline_upload_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,14 +70,22 @@ type PipelineUploadServer struct {
options *PipelineUploadServerOptions
}

func (s *PipelineUploadServer) UploadPipelineV1(w http.ResponseWriter, r *http.Request) {
s.uploadPipeline("v1beta1", w, r)
}

func (s *PipelineUploadServer) UploadPipeline(w http.ResponseWriter, r *http.Request) {
s.uploadPipeline("v2beta1", w, r)
}

// Creates a pipeline and a pipeline version.
// HTTP multipart endpoint for uploading pipeline file.
// https://www.w3.org/Protocols/rfc1341/7_2_Multipart.html
// This endpoint is not exposed through grpc endpoint, since grpc-gateway can't convert the gRPC
// endpoint to the HTTP endpoint.
// See https://github.com/grpc-ecosystem/grpc-gateway/issues/500
// Thus we create the HTTP endpoint directly and using swagger to auto generate the HTTP client.
func (s *PipelineUploadServer) UploadPipeline(w http.ResponseWriter, r *http.Request) {
func (s *PipelineUploadServer) uploadPipeline(api_version string, w http.ResponseWriter, r *http.Request) {
if s.options.CollectMetrics {
uploadPipelineRequests.Inc()
uploadPipelineVersionRequests.Inc()
Expand Down Expand Up @@ -155,21 +163,38 @@ func (s *PipelineUploadServer) UploadPipeline(w http.ResponseWriter, r *http.Req

w.Header().Set("Content-Type", "application/json")
marshaler := &jsonpb.Marshaler{EnumsAsInts: false, OrigName: true}
err = marshaler.Marshal(w, toApiPipelineV1(newPipeline, newPipelineVersion))

if api_version == "v1beta1" {
err = marshaler.Marshal(w, toApiPipelineV1(newPipeline, newPipelineVersion))
} else if api_version == "v2beta1" {
err = marshaler.Marshal(w, toApiPipeline(newPipeline))
} else {
s.writeErrorToResponse(w, http.StatusInternalServerError, util.Wrap(err, "Failed to create a pipeline. Invalid API version"))
return
}

if err != nil {
s.writeErrorToResponse(w, http.StatusInternalServerError, util.Wrap(err, "Failed to create a pipeline due to error marshalling the pipeline"))
return
}
}

func (s *PipelineUploadServer) UploadPipelineVersionV1(w http.ResponseWriter, r *http.Request) {
s.uploadPipelineVersion("v1beta1", w, r)
}

func (s *PipelineUploadServer) UploadPipelineVersion(w http.ResponseWriter, r *http.Request) {
s.uploadPipelineVersion("v2beta1", w, r)
}

// Creates a pipeline version under an existing pipeline.
// HTTP multipart endpoint for uploading pipeline version file.
// https://www.w3.org/Protocols/rfc1341/7_2_Multipart.html
// This endpoint is not exposed through grpc endpoint, since grpc-gateway can't convert the gRPC
// endpoint to the HTTP endpoint.
// See https://github.com/grpc-ecosystem/grpc-gateway/issues/500
// Thus we create the HTTP endpoint directly and using swagger to auto generate the HTTP client.
func (s *PipelineUploadServer) UploadPipelineVersion(w http.ResponseWriter, r *http.Request) {
func (s *PipelineUploadServer) uploadPipelineVersion(api_version string, w http.ResponseWriter, r *http.Request) {
if s.options.CollectMetrics {
uploadPipelineVersionRequests.Inc()
}
Expand Down Expand Up @@ -235,12 +260,15 @@ func (s *PipelineUploadServer) UploadPipelineVersion(w http.ResponseWriter, r *h

w.Header().Set("Content-Type", "application/json")
marshaler := &jsonpb.Marshaler{EnumsAsInts: false, OrigName: true}
createdPipelineVersion := toApiPipelineVersionV1(newPipelineVersion)
if createdPipelineVersion == nil {
s.writeErrorToResponse(w, http.StatusInternalServerError, util.NewInternalServerError(errors.New("Failed to convert internal pipeline version representation to its v1beta1 API counterpart"), "Failed to create a pipeline version due to error converting it to API"))
if api_version == "v1beta1" {
err = marshaler.Marshal(w, toApiPipelineVersionV1(newPipelineVersion))
Copy link
Contributor

@Linchin Linchin Mar 30, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the previous code, we have a check for the output of toApiPipelineVersionV1() to be nil or not. I wonder if we should keep it.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

toApiPipelineVersionV1() per its current implementation never returns nil. So the previous code wasn't useful.

} else if api_version == "v2beta1" {
err = marshaler.Marshal(w, toApiPipelineVersion(newPipelineVersion))
} else {
s.writeErrorToResponse(w, http.StatusInternalServerError, util.Wrap(err, "Failed to create a pipeline version. Invalid API version"))
return
}
err = marshaler.Marshal(w, createdPipelineVersion)

if err != nil {
s.writeErrorToResponse(w, http.StatusInternalServerError, util.Wrap(err, "Failed to create a pipeline version due to marshalling error"))
return
Expand Down
57 changes: 43 additions & 14 deletions backend/src/apiserver/server/pipeline_upload_server_test.go
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we have a test for UploadPipeline(), maybe we can add a test for UploadPipelineVersion() too?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

UploadPipelineVersion() is highly similar to UploadPipeline(), but that isn't a good excuse at all. I agree with you we should have better test coverage here. I will follow up on improving the test coverage shortly.

Original file line number Diff line number Diff line change
Expand Up @@ -45,36 +45,65 @@ const (
func TestUploadPipeline(t *testing.T) {
// TODO(v2): when we add a field to distinguish between v1 and v2 template, verify it's in the response
tt := []struct {
name string
spec []byte
name string
spec []byte
api_version string
}{{
name: "upload argo workflow YAML",
spec: []byte("apiVersion: argoproj.io/v1alpha1\nkind: Workflow"),
name: "upload argo workflow YAML",
spec: []byte("apiVersion: argoproj.io/v1alpha1\nkind: Workflow"),
api_version: "v1beta1",
}, {
name: "upload pipeline v2 job in proto yaml",
spec: []byte(v2SpecHelloWorld),
name: "upload argo workflow YAML",
spec: []byte("apiVersion: argoproj.io/v1alpha1\nkind: Workflow"),
api_version: "v2beta1",
}, {
name: "upload pipeline v2 job in proto yaml",
spec: []byte(v2SpecHelloWorld),
api_version: "v1beta1",
}, {
name: "upload pipeline v2 job in proto yaml",
spec: []byte(v2SpecHelloWorld),
api_version: "v2beta1",
}}
for _, test := range tt {
t.Run(test.name, func(t *testing.T) {
clientManager, server := setupClientManagerAndServer()
bytesBuffer, writer := setupWriter("")
setWriterWithBuffer("uploadfile", "hello-world.yaml", string(test.spec), writer)
response := uploadPipeline("/apis/v1beta1/pipelines/upload",
bytes.NewReader(bytesBuffer.Bytes()), writer, server.UploadPipeline)
var response *httptest.ResponseRecorder
if test.api_version == "v1beta1" {
response = uploadPipeline("/apis/v1beta1/pipelines/upload",
bytes.NewReader(bytesBuffer.Bytes()), writer, server.UploadPipelineV1)
} else if test.api_version == "v2beta1" {
response = uploadPipeline("/apis/v2beta1/pipelines/upload",
bytes.NewReader(bytesBuffer.Bytes()), writer, server.UploadPipeline)
}

if response.Code != 200 {
t.Fatalf("Upload response is not 200, message: %s", response.Body.String())
}

// Verify time format is RFC3339.
parsedResponse := struct {
CreatedAt string `json:"created_at"`
DefaultVersion struct {
CreatedAt string `json:"created_at"`
} `json:"default_version"`
// v1 API only field
ID string `json:"id"`
// v2 API only field
PipelineID string `json:"pipeline_id"`
// v1 API and v2 API shared field
CreatedAt string `json:"created_at"`
}{}
json.Unmarshal(response.Body.Bytes(), &parsedResponse)

// Verify time format is RFC3339.
assert.Equal(t, "1970-01-01T00:00:01Z", parsedResponse.CreatedAt)
assert.Equal(t, "1970-01-01T00:00:02Z", parsedResponse.DefaultVersion.CreatedAt)

// Verify v1 API returns v1 object while v2 API returns v2 object.
if test.api_version == "v1beta1" {
assert.Equal(t, "123e4567-e89b-12d3-a456-426655440000", parsedResponse.ID)
assert.Equal(t, "", parsedResponse.PipelineID)
} else if test.api_version == "v2beta1" {
assert.Equal(t, "", parsedResponse.ID)
assert.Equal(t, "123e4567-e89b-12d3-a456-426655440000", parsedResponse.PipelineID)
}

// Verify stored in object store
objStore := clientManager.ObjectStore()
Expand Down