diff --git a/backend/helpers/pluginhelper/api/api_collector.go b/backend/helpers/pluginhelper/api/api_collector.go index 440113269f0..c34fb7d02d3 100644 --- a/backend/helpers/pluginhelper/api/api_collector.go +++ b/backend/helpers/pluginhelper/api/api_collector.go @@ -56,6 +56,8 @@ type RequestData struct { // AsyncResponseHandler FIXME ... type AsyncResponseHandler func(res *http.Response) error +var ErrUndetermined = errors.BadInput.New("undetermined") + // ApiCollectorArgs FIXME ... type ApiCollectorArgs struct { RawDataSubTaskArgs @@ -259,7 +261,7 @@ func (collector *ApiCollector) exec(input interface{}) { collector.fetchPagesDetermined(reqData) // fetch pages in parallel without number of total pages } else { - collector.fetchPagesUndetermined(reqData) + collector.fetchPagesUndetermined(reqData, false) } } @@ -296,6 +298,12 @@ func (collector *ApiCollector) fetchPagesDetermined(reqData *RequestData) { collector.fetchAsync(reqData, func(count int, body []byte, res *http.Response) errors.Error { totalPages, err := collector.args.GetTotalPages(res, collector.args) if err != nil { + // Some APIs might or might not return total pages/records based on total number of records + // check https://github.com/apache/incubator-devlake/issues/8187 for details + if err == ErrUndetermined { + collector.fetchPagesUndetermined(reqData, true) + return nil + } return errors.Default.Wrap(err, "fetchPagesDetermined get totalPages failed") } // spawn a none blocking go routine to fetch other pages @@ -319,7 +327,7 @@ func (collector *ApiCollector) fetchPagesDetermined(reqData *RequestData) { } // fetchPagesUndetermined fetches data of all pages for APIs that do NOT return paging information -func (collector *ApiCollector) fetchPagesUndetermined(reqData *RequestData) { +func (collector *ApiCollector) fetchPagesUndetermined(reqData *RequestData, skipFirstPage bool) { //logger := collector.args.Ctx.GetLogger() //logger.Debug("fetch all pages in parallel with specified concurrency: %d", collector.args.Concurrency) // if api doesn't return total number of pages, employ a step concurrent technique @@ -351,6 +359,9 @@ func (collector *ApiCollector) fetchPagesUndetermined(reqData *RequestData) { Input: reqData.Input, InputJSON: reqData.InputJSON, } + if skipFirstPage && reqDataCopy.Pager.Page == 1 { + reqDataCopy.Pager.Page += concurrency + } var collect func() errors.Error collect = func() errors.Error { collector.fetchAsync(&reqDataCopy, func(count int, body []byte, res *http.Response) errors.Error { diff --git a/backend/plugins/gitlab/tasks/shared.go b/backend/plugins/gitlab/tasks/shared.go index 59ce8b921ef..e8f8c650bdb 100644 --- a/backend/plugins/gitlab/tasks/shared.go +++ b/backend/plugins/gitlab/tasks/shared.go @@ -33,7 +33,7 @@ import ( "github.com/apache/incubator-devlake/plugins/gitlab/models" "github.com/apache/incubator-devlake/core/plugin" - helper "github.com/apache/incubator-devlake/helpers/pluginhelper/api" + "github.com/apache/incubator-devlake/helpers/pluginhelper/api" ) const ( @@ -62,10 +62,10 @@ type GitlabInput struct { Iid int } -func GetTotalPagesFromResponse(res *http.Response, args *helper.ApiCollectorArgs) (int, errors.Error) { +func GetTotalPagesFromResponse(res *http.Response, args *api.ApiCollectorArgs) (int, errors.Error) { total := res.Header.Get("X-Total-Pages") if total == "" { - return 0, nil + return 0, api.ErrUndetermined } totalInt, err := strconv.Atoi(total) if err != nil { @@ -140,13 +140,13 @@ func GetRawMessageUpdatedAtAfter(timeAfter *time.Time) func(res *http.Response) } } if isFinish { - return filterRawMessages, helper.ErrFinishCollect + return filterRawMessages, api.ErrFinishCollect } return filterRawMessages, nil } } -func GetQuery(reqData *helper.RequestData) (url.Values, errors.Error) { +func GetQuery(reqData *api.RequestData) (url.Values, errors.Error) { query := url.Values{} query.Set("with_stats", "true") query.Set("sort", "asc") @@ -155,9 +155,9 @@ func GetQuery(reqData *helper.RequestData) (url.Values, errors.Error) { return query, nil } -func CreateRawDataSubTaskArgs(subtaskCtx plugin.SubTaskContext, Table string) (*helper.RawDataSubTaskArgs, *GitlabTaskData) { +func CreateRawDataSubTaskArgs(subtaskCtx plugin.SubTaskContext, Table string) (*api.RawDataSubTaskArgs, *GitlabTaskData) { data := subtaskCtx.GetData().(*GitlabTaskData) - rawDataSubTaskArgs := &helper.RawDataSubTaskArgs{ + rawDataSubTaskArgs := &api.RawDataSubTaskArgs{ Ctx: subtaskCtx, Params: models.GitlabApiParams{ ProjectId: data.Options.ProjectId, @@ -168,9 +168,9 @@ func CreateRawDataSubTaskArgs(subtaskCtx plugin.SubTaskContext, Table string) (* return rawDataSubTaskArgs, data } -func CreateSubtaskCommonArgs(subtaskCtx plugin.SubTaskContext, table string) (*helper.SubtaskCommonArgs, *GitlabTaskData) { +func CreateSubtaskCommonArgs(subtaskCtx plugin.SubTaskContext, table string) (*api.SubtaskCommonArgs, *GitlabTaskData) { data := subtaskCtx.GetData().(*GitlabTaskData) - args := &helper.SubtaskCommonArgs{ + args := &api.SubtaskCommonArgs{ SubTaskContext: subtaskCtx, Table: table, Params: models.GitlabApiParams{ @@ -181,7 +181,7 @@ func CreateSubtaskCommonArgs(subtaskCtx plugin.SubTaskContext, table string) (*h return args, data } -func GetMergeRequestsIterator(taskCtx plugin.SubTaskContext, apiCollector *helper.StatefulApiCollector) (*helper.DalCursorIterator, errors.Error) { +func GetMergeRequestsIterator(taskCtx plugin.SubTaskContext, apiCollector *api.StatefulApiCollector) (*api.DalCursorIterator, errors.Error) { db := taskCtx.GetDal() data := taskCtx.GetData().(*GitlabTaskData) clauses := []dal.Clause{ @@ -204,5 +204,5 @@ func GetMergeRequestsIterator(taskCtx plugin.SubTaskContext, apiCollector *helpe return nil, err } - return helper.NewDalCursorIterator(db, cursor, reflect.TypeOf(GitlabInput{})) + return api.NewDalCursorIterator(db, cursor, reflect.TypeOf(GitlabInput{})) }