From 1fce7a54e46087e3c760da85f789103247991bf7 Mon Sep 17 00:00:00 2001 From: Chen Sun Date: Thu, 30 Mar 2023 10:09:46 +0000 Subject: [PATCH] fix UploadPipeline/UploadPipelineVersion v2 API returns v1 object. --- backend/src/apiserver/main.go | 4 +- .../server/pipeline_upload_server.go | 42 +++++++++++--- .../server/pipeline_upload_server_test.go | 57 ++++++++++++++----- 3 files changed, 80 insertions(+), 23 deletions(-) diff --git a/backend/src/apiserver/main.go b/backend/src/apiserver/main.go index 07a700018a4..d702e881176 100644 --- a/backend/src/apiserver/main.go +++ b/backend/src/apiserver/main.go @@ -170,8 +170,8 @@ func startHttpProxy(resourceManager *resource.ResourceManager) { // https://github.com/grpc-ecosystem/grpc-gateway/issues/410 sharedPipelineUploadServer := server.NewPipelineUploadServer(resourceManager, &server.PipelineUploadServerOptions{CollectMetrics: *collectMetricsFlag}) // API v1beta1 - topMux.HandleFunc("/apis/v1beta1/pipelines/upload", sharedPipelineUploadServer.UploadPipeline) - topMux.HandleFunc("/apis/v1beta1/pipelines/upload_version", sharedPipelineUploadServer.UploadPipelineVersion) + topMux.HandleFunc("/apis/v1beta1/pipelines/upload", sharedPipelineUploadServer.UploadPipelineV1) + topMux.HandleFunc("/apis/v1beta1/pipelines/upload_version", sharedPipelineUploadServer.UploadPipelineVersionV1) topMux.HandleFunc("/apis/v1beta1/healthz", func(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json") io.WriteString(w, `{"commit_sha":"`+common.GetStringConfigWithDefault("COMMIT_SHA", "unknown")+`", "tag_name":"`+common.GetStringConfigWithDefault("TAG_NAME", "unknown")+`", "multi_user":`+strconv.FormatBool(common.IsMultiUserMode())+`}`) diff --git a/backend/src/apiserver/server/pipeline_upload_server.go b/backend/src/apiserver/server/pipeline_upload_server.go index 7c644b57a55..aa85bf67da6 100644 --- a/backend/src/apiserver/server/pipeline_upload_server.go +++ b/backend/src/apiserver/server/pipeline_upload_server.go @@ -70,6 +70,14 @@ type PipelineUploadServer struct { options *PipelineUploadServerOptions } +func (s *PipelineUploadServer) UploadPipelineV1(w http.ResponseWriter, r *http.Request) { + s.uploadPipeline("v1beta1", w, r) +} + +func (s *PipelineUploadServer) UploadPipeline(w http.ResponseWriter, r *http.Request) { + s.uploadPipeline("v2beta1", w, r) +} + // Creates a pipeline and a pipeline version. // HTTP multipart endpoint for uploading pipeline file. // https://www.w3.org/Protocols/rfc1341/7_2_Multipart.html @@ -77,7 +85,7 @@ type PipelineUploadServer struct { // endpoint to the HTTP endpoint. // See https://github.com/grpc-ecosystem/grpc-gateway/issues/500 // Thus we create the HTTP endpoint directly and using swagger to auto generate the HTTP client. -func (s *PipelineUploadServer) UploadPipeline(w http.ResponseWriter, r *http.Request) { +func (s *PipelineUploadServer) uploadPipeline(api_version string, w http.ResponseWriter, r *http.Request) { if s.options.CollectMetrics { uploadPipelineRequests.Inc() uploadPipelineVersionRequests.Inc() @@ -155,13 +163,30 @@ func (s *PipelineUploadServer) UploadPipeline(w http.ResponseWriter, r *http.Req w.Header().Set("Content-Type", "application/json") marshaler := &jsonpb.Marshaler{EnumsAsInts: false, OrigName: true} - err = marshaler.Marshal(w, toApiPipelineV1(newPipeline, newPipelineVersion)) + + if api_version == "v1beta1" { + err = marshaler.Marshal(w, toApiPipelineV1(newPipeline, newPipelineVersion)) + } else if api_version == "v2beta1" { + err = marshaler.Marshal(w, toApiPipeline(newPipeline)) + } else { + s.writeErrorToResponse(w, http.StatusInternalServerError, util.Wrap(err, "Failed to create a pipeline. Invalid API version")) + return + } + if err != nil { s.writeErrorToResponse(w, http.StatusInternalServerError, util.Wrap(err, "Failed to create a pipeline due to error marshalling the pipeline")) return } } +func (s *PipelineUploadServer) UploadPipelineVersionV1(w http.ResponseWriter, r *http.Request) { + s.uploadPipelineVersion("v1beta1", w, r) +} + +func (s *PipelineUploadServer) UploadPipelineVersion(w http.ResponseWriter, r *http.Request) { + s.uploadPipelineVersion("v2beta1", w, r) +} + // Creates a pipeline version under an existing pipeline. // HTTP multipart endpoint for uploading pipeline version file. // https://www.w3.org/Protocols/rfc1341/7_2_Multipart.html @@ -169,7 +194,7 @@ func (s *PipelineUploadServer) UploadPipeline(w http.ResponseWriter, r *http.Req // endpoint to the HTTP endpoint. // See https://github.com/grpc-ecosystem/grpc-gateway/issues/500 // Thus we create the HTTP endpoint directly and using swagger to auto generate the HTTP client. -func (s *PipelineUploadServer) UploadPipelineVersion(w http.ResponseWriter, r *http.Request) { +func (s *PipelineUploadServer) uploadPipelineVersion(api_version string, w http.ResponseWriter, r *http.Request) { if s.options.CollectMetrics { uploadPipelineVersionRequests.Inc() } @@ -235,12 +260,15 @@ func (s *PipelineUploadServer) UploadPipelineVersion(w http.ResponseWriter, r *h w.Header().Set("Content-Type", "application/json") marshaler := &jsonpb.Marshaler{EnumsAsInts: false, OrigName: true} - createdPipelineVersion := toApiPipelineVersionV1(newPipelineVersion) - if createdPipelineVersion == nil { - s.writeErrorToResponse(w, http.StatusInternalServerError, util.NewInternalServerError(errors.New("Failed to convert internal pipeline version representation to its v1beta1 API counterpart"), "Failed to create a pipeline version due to error converting it to API")) + if api_version == "v1beta1" { + err = marshaler.Marshal(w, toApiPipelineVersionV1(newPipelineVersion)) + } else if api_version == "v2beta1" { + err = marshaler.Marshal(w, toApiPipelineVersion(newPipelineVersion)) + } else { + s.writeErrorToResponse(w, http.StatusInternalServerError, util.Wrap(err, "Failed to create a pipeline version. Invalid API version")) return } - err = marshaler.Marshal(w, createdPipelineVersion) + if err != nil { s.writeErrorToResponse(w, http.StatusInternalServerError, util.Wrap(err, "Failed to create a pipeline version due to marshalling error")) return diff --git a/backend/src/apiserver/server/pipeline_upload_server_test.go b/backend/src/apiserver/server/pipeline_upload_server_test.go index 2491fd6e024..8515033b671 100644 --- a/backend/src/apiserver/server/pipeline_upload_server_test.go +++ b/backend/src/apiserver/server/pipeline_upload_server_test.go @@ -45,36 +45,65 @@ const ( func TestUploadPipeline(t *testing.T) { // TODO(v2): when we add a field to distinguish between v1 and v2 template, verify it's in the response tt := []struct { - name string - spec []byte + name string + spec []byte + api_version string }{{ - name: "upload argo workflow YAML", - spec: []byte("apiVersion: argoproj.io/v1alpha1\nkind: Workflow"), + name: "upload argo workflow YAML", + spec: []byte("apiVersion: argoproj.io/v1alpha1\nkind: Workflow"), + api_version: "v1beta1", }, { - name: "upload pipeline v2 job in proto yaml", - spec: []byte(v2SpecHelloWorld), + name: "upload argo workflow YAML", + spec: []byte("apiVersion: argoproj.io/v1alpha1\nkind: Workflow"), + api_version: "v2beta1", + }, { + name: "upload pipeline v2 job in proto yaml", + spec: []byte(v2SpecHelloWorld), + api_version: "v1beta1", + }, { + name: "upload pipeline v2 job in proto yaml", + spec: []byte(v2SpecHelloWorld), + api_version: "v2beta1", }} for _, test := range tt { t.Run(test.name, func(t *testing.T) { clientManager, server := setupClientManagerAndServer() bytesBuffer, writer := setupWriter("") setWriterWithBuffer("uploadfile", "hello-world.yaml", string(test.spec), writer) - response := uploadPipeline("/apis/v1beta1/pipelines/upload", - bytes.NewReader(bytesBuffer.Bytes()), writer, server.UploadPipeline) + var response *httptest.ResponseRecorder + if test.api_version == "v1beta1" { + response = uploadPipeline("/apis/v1beta1/pipelines/upload", + bytes.NewReader(bytesBuffer.Bytes()), writer, server.UploadPipelineV1) + } else if test.api_version == "v2beta1" { + response = uploadPipeline("/apis/v2beta1/pipelines/upload", + bytes.NewReader(bytesBuffer.Bytes()), writer, server.UploadPipeline) + } + if response.Code != 200 { t.Fatalf("Upload response is not 200, message: %s", response.Body.String()) } - // Verify time format is RFC3339. parsedResponse := struct { - CreatedAt string `json:"created_at"` - DefaultVersion struct { - CreatedAt string `json:"created_at"` - } `json:"default_version"` + // v1 API only field + ID string `json:"id"` + // v2 API only field + PipelineID string `json:"pipeline_id"` + // v1 API and v2 API shared field + CreatedAt string `json:"created_at"` }{} json.Unmarshal(response.Body.Bytes(), &parsedResponse) + + // Verify time format is RFC3339. assert.Equal(t, "1970-01-01T00:00:01Z", parsedResponse.CreatedAt) - assert.Equal(t, "1970-01-01T00:00:02Z", parsedResponse.DefaultVersion.CreatedAt) + + // Verify v1 API returns v1 object while v2 API returns v2 object. + if test.api_version == "v1beta1" { + assert.Equal(t, "123e4567-e89b-12d3-a456-426655440000", parsedResponse.ID) + assert.Equal(t, "", parsedResponse.PipelineID) + } else if test.api_version == "v2beta1" { + assert.Equal(t, "", parsedResponse.ID) + assert.Equal(t, "123e4567-e89b-12d3-a456-426655440000", parsedResponse.PipelineID) + } // Verify stored in object store objStore := clientManager.ObjectStore()