Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cherrypick #8201 to release v1.0 #8202

Merged
merged 1 commit into from
Nov 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions backend/helpers/pluginhelper/api/api_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ type RequestData struct {
// AsyncResponseHandler FIXME ...
type AsyncResponseHandler func(res *http.Response) error

var ErrUndetermined = errors.BadInput.New("undetermined")

// ApiCollectorArgs FIXME ...
type ApiCollectorArgs struct {
RawDataSubTaskArgs
Expand Down Expand Up @@ -259,7 +261,7 @@ func (collector *ApiCollector) exec(input interface{}) {
collector.fetchPagesDetermined(reqData)
// fetch pages in parallel without number of total pages
} else {
collector.fetchPagesUndetermined(reqData)
collector.fetchPagesUndetermined(reqData, false)
}
}

Expand Down Expand Up @@ -296,6 +298,12 @@ func (collector *ApiCollector) fetchPagesDetermined(reqData *RequestData) {
collector.fetchAsync(reqData, func(count int, body []byte, res *http.Response) errors.Error {
totalPages, err := collector.args.GetTotalPages(res, collector.args)
if err != nil {
// Some APIs might or might not return total pages/records based on total number of records
// check https://github.com/apache/incubator-devlake/issues/8187 for details
if err == ErrUndetermined {
collector.fetchPagesUndetermined(reqData, true)
return nil
}
return errors.Default.Wrap(err, "fetchPagesDetermined get totalPages failed")
}
// spawn a none blocking go routine to fetch other pages
Expand All @@ -319,7 +327,7 @@ func (collector *ApiCollector) fetchPagesDetermined(reqData *RequestData) {
}

// fetchPagesUndetermined fetches data of all pages for APIs that do NOT return paging information
func (collector *ApiCollector) fetchPagesUndetermined(reqData *RequestData) {
func (collector *ApiCollector) fetchPagesUndetermined(reqData *RequestData, skipFirstPage bool) {
//logger := collector.args.Ctx.GetLogger()
//logger.Debug("fetch all pages in parallel with specified concurrency: %d", collector.args.Concurrency)
// if api doesn't return total number of pages, employ a step concurrent technique
Expand Down Expand Up @@ -351,6 +359,9 @@ func (collector *ApiCollector) fetchPagesUndetermined(reqData *RequestData) {
Input: reqData.Input,
InputJSON: reqData.InputJSON,
}
if skipFirstPage && reqDataCopy.Pager.Page == 1 {
reqDataCopy.Pager.Page += concurrency
}
var collect func() errors.Error
collect = func() errors.Error {
collector.fetchAsync(&reqDataCopy, func(count int, body []byte, res *http.Response) errors.Error {
Expand Down
22 changes: 11 additions & 11 deletions backend/plugins/gitlab/tasks/shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (
"github.com/apache/incubator-devlake/plugins/gitlab/models"

"github.com/apache/incubator-devlake/core/plugin"
helper "github.com/apache/incubator-devlake/helpers/pluginhelper/api"
"github.com/apache/incubator-devlake/helpers/pluginhelper/api"
)

const (
Expand Down Expand Up @@ -62,10 +62,10 @@ type GitlabInput struct {
Iid int
}

func GetTotalPagesFromResponse(res *http.Response, args *helper.ApiCollectorArgs) (int, errors.Error) {
func GetTotalPagesFromResponse(res *http.Response, args *api.ApiCollectorArgs) (int, errors.Error) {
total := res.Header.Get("X-Total-Pages")
if total == "" {
return 0, nil
return 0, api.ErrUndetermined
}
totalInt, err := strconv.Atoi(total)
if err != nil {
Expand Down Expand Up @@ -140,13 +140,13 @@ func GetRawMessageUpdatedAtAfter(timeAfter *time.Time) func(res *http.Response)
}
}
if isFinish {
return filterRawMessages, helper.ErrFinishCollect
return filterRawMessages, api.ErrFinishCollect
}
return filterRawMessages, nil
}
}

func GetQuery(reqData *helper.RequestData) (url.Values, errors.Error) {
func GetQuery(reqData *api.RequestData) (url.Values, errors.Error) {
query := url.Values{}
query.Set("with_stats", "true")
query.Set("sort", "asc")
Expand All @@ -155,9 +155,9 @@ func GetQuery(reqData *helper.RequestData) (url.Values, errors.Error) {
return query, nil
}

func CreateRawDataSubTaskArgs(subtaskCtx plugin.SubTaskContext, Table string) (*helper.RawDataSubTaskArgs, *GitlabTaskData) {
func CreateRawDataSubTaskArgs(subtaskCtx plugin.SubTaskContext, Table string) (*api.RawDataSubTaskArgs, *GitlabTaskData) {
data := subtaskCtx.GetData().(*GitlabTaskData)
rawDataSubTaskArgs := &helper.RawDataSubTaskArgs{
rawDataSubTaskArgs := &api.RawDataSubTaskArgs{
Ctx: subtaskCtx,
Params: models.GitlabApiParams{
ProjectId: data.Options.ProjectId,
Expand All @@ -168,9 +168,9 @@ func CreateRawDataSubTaskArgs(subtaskCtx plugin.SubTaskContext, Table string) (*
return rawDataSubTaskArgs, data
}

func CreateSubtaskCommonArgs(subtaskCtx plugin.SubTaskContext, table string) (*helper.SubtaskCommonArgs, *GitlabTaskData) {
func CreateSubtaskCommonArgs(subtaskCtx plugin.SubTaskContext, table string) (*api.SubtaskCommonArgs, *GitlabTaskData) {
data := subtaskCtx.GetData().(*GitlabTaskData)
args := &helper.SubtaskCommonArgs{
args := &api.SubtaskCommonArgs{
SubTaskContext: subtaskCtx,
Table: table,
Params: models.GitlabApiParams{
Expand All @@ -181,7 +181,7 @@ func CreateSubtaskCommonArgs(subtaskCtx plugin.SubTaskContext, table string) (*h
return args, data
}

func GetMergeRequestsIterator(taskCtx plugin.SubTaskContext, apiCollector *helper.StatefulApiCollector) (*helper.DalCursorIterator, errors.Error) {
func GetMergeRequestsIterator(taskCtx plugin.SubTaskContext, apiCollector *api.StatefulApiCollector) (*api.DalCursorIterator, errors.Error) {
db := taskCtx.GetDal()
data := taskCtx.GetData().(*GitlabTaskData)
clauses := []dal.Clause{
Expand All @@ -204,5 +204,5 @@ func GetMergeRequestsIterator(taskCtx plugin.SubTaskContext, apiCollector *helpe
return nil, err
}

return helper.NewDalCursorIterator(db, cursor, reflect.TypeOf(GitlabInput{}))
return api.NewDalCursorIterator(db, cursor, reflect.TypeOf(GitlabInput{}))
}
Loading