From 59d571609ff7a1b36de86b31d23c0256be831e21 Mon Sep 17 00:00:00 2001 From: d4x1 <1507509064@qq.com> Date: Tue, 13 Aug 2024 15:41:52 +0800 Subject: [PATCH] feat(circleci): pagination --- .../plugins/circleci/tasks/job_collector.go | 37 ++++--------------- .../circleci/tasks/pipeline_collector.go | 37 ++++--------------- backend/plugins/circleci/tasks/shared.go | 29 +++++++++++++++ backend/plugins/circleci/tasks/task_data.go | 3 ++ .../circleci/tasks/workflow_collector.go | 37 ++++--------------- 5 files changed, 53 insertions(+), 90 deletions(-) diff --git a/backend/plugins/circleci/tasks/job_collector.go b/backend/plugins/circleci/tasks/job_collector.go index 23fbedbc684..1f2f858f5ec 100644 --- a/backend/plugins/circleci/tasks/job_collector.go +++ b/backend/plugins/circleci/tasks/job_collector.go @@ -18,14 +18,11 @@ limitations under the License. package tasks import ( - "encoding/json" "github.com/apache/incubator-devlake/core/dal" "github.com/apache/incubator-devlake/core/errors" "github.com/apache/incubator-devlake/core/plugin" "github.com/apache/incubator-devlake/helpers/pluginhelper/api" "github.com/apache/incubator-devlake/plugins/circleci/models" - "net/http" - "net/url" "reflect" ) @@ -63,33 +60,13 @@ func CollectJobs(taskCtx plugin.SubTaskContext) errors.Error { } collector, err := api.NewApiCollector(api.ApiCollectorArgs{ - RawDataSubTaskArgs: *rawDataSubTaskArgs, - ApiClient: data.ApiClient, - UrlTemplate: "/v2/workflow/{{ .Input.Id }}/job", - Input: iterator, - GetNextPageCustomData: func(prevReqData *api.RequestData, prevPageResponse *http.Response) (interface{}, errors.Error) { - res := CircleciPageTokenResp[any]{} - err := api.UnmarshalResponse(prevPageResponse, &res) - if err != nil { - return nil, err - } - if res.NextPageToken == "" { - return nil, api.ErrFinishCollect - } - return res.NextPageToken, nil - }, - Query: func(reqData *api.RequestData) (url.Values, errors.Error) { - query := url.Values{} - if pageToken, ok := reqData.CustomData.(string); ok && pageToken != "" { - query.Set("page_token", reqData.CustomData.(string)) - } - return query, nil - }, - ResponseParser: func(res *http.Response) ([]json.RawMessage, errors.Error) { - data := CircleciPageTokenResp[[]json.RawMessage]{} - err := api.UnmarshalResponse(res, &data) - return data.Items, err - }, + RawDataSubTaskArgs: *rawDataSubTaskArgs, + ApiClient: data.ApiClient, + UrlTemplate: "/v2/workflow/{{ .Input.Id }}/job", + Input: iterator, + GetNextPageCustomData: ExtractNextPageToken, + Query: BuildQueryParamsWithPageToken, + ResponseParser: ParseCircleciPageTokenResp, }) if err != nil { logger.Error(err, "collect jobs error") diff --git a/backend/plugins/circleci/tasks/pipeline_collector.go b/backend/plugins/circleci/tasks/pipeline_collector.go index 586f9ae6dee..b7940e6c8bf 100644 --- a/backend/plugins/circleci/tasks/pipeline_collector.go +++ b/backend/plugins/circleci/tasks/pipeline_collector.go @@ -18,12 +18,9 @@ limitations under the License. package tasks import ( - "encoding/json" "github.com/apache/incubator-devlake/core/errors" "github.com/apache/incubator-devlake/core/plugin" "github.com/apache/incubator-devlake/helpers/pluginhelper/api" - "net/http" - "net/url" ) const RAW_PIPELINE_TABLE = "circleci_api_pipelines" @@ -43,33 +40,13 @@ func CollectPipelines(taskCtx plugin.SubTaskContext) errors.Error { logger := taskCtx.GetLogger() logger.Info("collect pipelines") collector, err := api.NewApiCollector(api.ApiCollectorArgs{ - RawDataSubTaskArgs: *rawDataSubTaskArgs, - ApiClient: data.ApiClient, - UrlTemplate: "/v2/project/{{ .Params.ProjectSlug }}/pipeline", - PageSize: int(data.Options.PageSize), - GetNextPageCustomData: func(prevReqData *api.RequestData, prevPageResponse *http.Response) (interface{}, errors.Error) { - res := CircleciPageTokenResp[any]{} - err := api.UnmarshalResponse(prevPageResponse, &res) - if err != nil { - return nil, err - } - if res.NextPageToken == "" { - return nil, api.ErrFinishCollect - } - return res.NextPageToken, nil - }, - Query: func(reqData *api.RequestData) (url.Values, errors.Error) { - query := url.Values{} - if pageToken, ok := reqData.CustomData.(string); ok && pageToken != "" { - query.Set("page_token", reqData.CustomData.(string)) - } - return query, nil - }, - ResponseParser: func(res *http.Response) ([]json.RawMessage, errors.Error) { - data := CircleciPageTokenResp[[]json.RawMessage]{} - err := api.UnmarshalResponse(res, &data) - return data.Items, err - }, + RawDataSubTaskArgs: *rawDataSubTaskArgs, + ApiClient: data.ApiClient, + UrlTemplate: "/v2/project/{{ .Params.ProjectSlug }}/pipeline", + PageSize: int(data.Options.PageSize), + GetNextPageCustomData: ExtractNextPageToken, + Query: BuildQueryParamsWithPageToken, + ResponseParser: ParseCircleciPageTokenResp, }) if err != nil { logger.Error(err, "collect pipelines error") diff --git a/backend/plugins/circleci/tasks/shared.go b/backend/plugins/circleci/tasks/shared.go index c720daecba4..dd0cdf8c7f2 100644 --- a/backend/plugins/circleci/tasks/shared.go +++ b/backend/plugins/circleci/tasks/shared.go @@ -18,12 +18,15 @@ limitations under the License. package tasks import ( + "encoding/json" "github.com/apache/incubator-devlake/core/dal" "github.com/apache/incubator-devlake/core/errors" "github.com/apache/incubator-devlake/core/models/domainlayer/didgen" "github.com/apache/incubator-devlake/core/plugin" "github.com/apache/incubator-devlake/helpers/pluginhelper/api" "github.com/apache/incubator-devlake/plugins/circleci/models" + "net/http" + "net/url" ) var accountIdGen *didgen.DomainIdGenerator @@ -91,3 +94,29 @@ func findPipelineById(db dal.Dal, id string) (*models.CircleciPipeline, errors.E } return pipeline, nil } + +func ExtractNextPageToken(prevReqData *api.RequestData, prevPageResponse *http.Response) (interface{}, errors.Error) { + res := CircleciPageTokenResp[any]{} + err := api.UnmarshalResponse(prevPageResponse, &res) + if err != nil { + return nil, err + } + if res.NextPageToken == "" { + return nil, api.ErrFinishCollect + } + return res.NextPageToken, nil +} + +func BuildQueryParamsWithPageToken(reqData *api.RequestData) (url.Values, errors.Error) { + query := url.Values{} + if pageToken, ok := reqData.CustomData.(string); ok && pageToken != "" { + query.Set("page-token", pageToken) + } + return query, nil +} + +func ParseCircleciPageTokenResp(res *http.Response) ([]json.RawMessage, errors.Error) { + data := CircleciPageTokenResp[[]json.RawMessage]{} + err := api.UnmarshalResponse(res, &data) + return data.Items, err +} diff --git a/backend/plugins/circleci/tasks/task_data.go b/backend/plugins/circleci/tasks/task_data.go index 960325696c3..ba30ba8d960 100644 --- a/backend/plugins/circleci/tasks/task_data.go +++ b/backend/plugins/circleci/tasks/task_data.go @@ -46,5 +46,8 @@ func DecodeAndValidateTaskOptions(options map[string]interface{}) (*CircleciOpti if op.ConnectionId == 0 { return nil, errors.Default.New("connectionId is invalid") } + if op.PageSize == 0 { + op.PageSize = 20 // CircleCI API default page size + } return &op, nil } diff --git a/backend/plugins/circleci/tasks/workflow_collector.go b/backend/plugins/circleci/tasks/workflow_collector.go index a1ba0acaa9b..8234eceac7d 100644 --- a/backend/plugins/circleci/tasks/workflow_collector.go +++ b/backend/plugins/circleci/tasks/workflow_collector.go @@ -18,14 +18,11 @@ limitations under the License. package tasks import ( - "encoding/json" "github.com/apache/incubator-devlake/core/dal" "github.com/apache/incubator-devlake/core/errors" "github.com/apache/incubator-devlake/core/plugin" "github.com/apache/incubator-devlake/helpers/pluginhelper/api" "github.com/apache/incubator-devlake/plugins/circleci/models" - "net/http" - "net/url" "reflect" ) @@ -63,33 +60,13 @@ func CollectWorkflows(taskCtx plugin.SubTaskContext) errors.Error { } collector, err := api.NewApiCollector(api.ApiCollectorArgs{ - RawDataSubTaskArgs: *rawDataSubTaskArgs, - ApiClient: data.ApiClient, - UrlTemplate: "/v2/pipeline/{{ .Input.Id }}/workflow", - Input: iterator, - GetNextPageCustomData: func(prevReqData *api.RequestData, prevPageResponse *http.Response) (interface{}, errors.Error) { - res := CircleciPageTokenResp[any]{} - err := api.UnmarshalResponse(prevPageResponse, &res) - if err != nil { - return nil, err - } - if res.NextPageToken == "" { - return nil, api.ErrFinishCollect - } - return res.NextPageToken, nil - }, - Query: func(reqData *api.RequestData) (url.Values, errors.Error) { - query := url.Values{} - if pageToken, ok := reqData.CustomData.(string); ok && pageToken != "" { - query.Set("page_token", reqData.CustomData.(string)) - } - return query, nil - }, - ResponseParser: func(res *http.Response) ([]json.RawMessage, errors.Error) { - data := CircleciPageTokenResp[[]json.RawMessage]{} - err := api.UnmarshalResponse(res, &data) - return data.Items, err - }, + RawDataSubTaskArgs: *rawDataSubTaskArgs, + ApiClient: data.ApiClient, + UrlTemplate: "/v2/pipeline/{{ .Input.Id }}/workflow", + Input: iterator, + GetNextPageCustomData: ExtractNextPageToken, + Query: BuildQueryParamsWithPageToken, + ResponseParser: ParseCircleciPageTokenResp, }) if err != nil { logger.Error(err, "collect workflows error")