diff --git a/cmd/catchup.go b/cmd/catchup.go index 82f870d..f580604 100644 --- a/cmd/catchup.go +++ b/cmd/catchup.go @@ -11,12 +11,12 @@ import ( "sync" "syscall" - "github.com/cluttrdev/cli" + "github.com/alitto/pond" "github.com/oklog/run" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/collectors" - "github.com/cluttrdev/gitlab-exporter/pkg/worker" + "github.com/cluttrdev/cli" "github.com/cluttrdev/gitlab-exporter/internal/config" "github.com/cluttrdev/gitlab-exporter/internal/exporter" @@ -104,25 +104,11 @@ func (c *CatchUpConfig) Exec(ctx context.Context, args []string) error { g := &run.Group{} - pool := worker.NewWorkerPool(42) - { // worker pool - ctx, cancel := context.WithCancel(context.Background()) - - g.Add(func() error { // execute - slog.Info("Starting worker pool") - pool.Start(ctx) - <-ctx.Done() - return ctx.Err() - }, func(err error) { // interrupt - defer cancel() - slog.Info("Stopping worker pool...") - pool.Stop() - slog.Info("Stopping worker pool... done") - }) - } - if len(cfg.Projects) > 0 { // jobs - ctx, cancel := context.WithCancel(context.Background()) + ctxJobs, cancelJobs := context.WithCancel(context.Background()) + + slog.Info("Starting worker pool") + pool := pond.New(42, 1024, pond.Context(ctxJobs)) g.Add(func() error { // execute var wg sync.WaitGroup @@ -149,8 +135,12 @@ func (c *CatchUpConfig) Exec(ctx context.Context, args []string) error { return nil }, func(err error) { // interrupt slog.Info("Cancelling jobs...") - cancel() - <-ctx.Done() + cancelJobs() + + slog.Info("Stopping worker pool...") + pool.StopAndWait() + slog.Info("Stopping worker pool... done") + slog.Info("Cancelling jobs... done") }) } else { diff --git a/cmd/fetch_pipeline.go b/cmd/fetch_pipeline.go index 5c2ec4d..5169fcd 100644 --- a/cmd/fetch_pipeline.go +++ b/cmd/fetch_pipeline.go @@ -89,8 +89,8 @@ func (c *FetchPipelineConfig) Exec(ctx context.Context, args []string) error { FetchSections: c.fetchSections, } - phr := <-glc.GetPipelineHierarchy(ctx, projectID, pipelineID, opt) - if err := phr.Error; err != nil { + phr, err := glc.GetPipelineHierarchy(ctx, projectID, pipelineID, opt) + if err != nil { return fmt.Errorf("error fetching pipeline hierarchy: %w", err) } ph := phr.PipelineHierarchy diff --git a/cmd/run.go b/cmd/run.go index f0c3a74..0f744be 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -16,6 +16,7 @@ import ( "sync" "syscall" + "github.com/alitto/pond" "github.com/oklog/run" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/collectors" @@ -24,8 +25,6 @@ import ( "github.com/cluttrdev/cli" - "github.com/cluttrdev/gitlab-exporter/pkg/worker" - "github.com/cluttrdev/gitlab-exporter/internal/config" "github.com/cluttrdev/gitlab-exporter/internal/exporter" "github.com/cluttrdev/gitlab-exporter/internal/gitlab" @@ -134,6 +133,7 @@ func (c *RunConfig) Exec(ctx context.Context, _ []string) error { } // gather projects from config + slog.Info("Resolving projects to export...") projects, err := resolveProjects(ctx, cfg, gitlabclient) if err != nil { return fmt.Errorf("error resolving projects: %w", err) @@ -155,25 +155,12 @@ func (c *RunConfig) Exec(ctx context.Context, _ []string) error { g := &run.Group{} - pool := worker.NewWorkerPool(42) - { // worker pool - ctx, cancel := context.WithCancel(context.Background()) - - g.Add(func() error { // execute - slog.Info("Starting worker pool") - pool.Start(ctx) - <-ctx.Done() - return ctx.Err() - }, func(err error) { // interrupt - defer cancel() - slog.Info("Stopping worker pool...") - pool.Stop() - slog.Info("Stopping worker pool... done") - }) - } - if len(projects) > 0 { // jobs - ctx, cancel := context.WithCancel(context.Background()) + slog.Info(fmt.Sprintf("Found %d projects to export", len(projects))) + ctxJobs, cancelJobs := context.WithCancel(context.Background()) + + slog.Info("Starting worker pool") + pool := pond.New(42, 1024, pond.Context(ctxJobs)) g.Add(func() error { // execute var wg sync.WaitGroup @@ -189,7 +176,7 @@ func (c *RunConfig) Exec(ctx context.Context, _ []string) error { wg.Add(1) go func() { defer wg.Done() - job.Run(ctx) + job.Run(ctxJobs) }() } @@ -203,7 +190,7 @@ func (c *RunConfig) Exec(ctx context.Context, _ []string) error { wg.Add(1) go func() { defer wg.Done() - job.Run(ctx) + job.Run(ctxJobs) }() } @@ -211,8 +198,7 @@ func (c *RunConfig) Exec(ctx context.Context, _ []string) error { return nil }, func(err error) { // interrupt slog.Info("Cancelling jobs...") - cancel() - <-ctx.Done() + cancelJobs() slog.Info("Cancelling jobs... done") }) } else { @@ -296,43 +282,47 @@ func serveHTTP(cfg config.HTTP, reg *prometheus.Registry) (func() error, func(er } func resolveProjects(ctx context.Context, cfg config.Config, glab *gitlab.Client) ([]config.Project, error) { - pm := make(map[int64]config.Project) + projectConfigs := make(map[int64]config.Project) opt := gitlab.ListNamespaceProjectsOptions{} for _, namespace := range cfg.Namespaces { opt.Kind = namespace.Kind - opt.Visibility = (*gitlab.VisibilityValue)(&namespace.Visibility) + opt.Visibility = (*_gitlab.VisibilityValue)(&namespace.Visibility) opt.WithShared = namespace.WithShared opt.IncludeSubgroups = namespace.IncludeSubgroups - ps, err := glab.ListNamespaceProjects(ctx, namespace.Id, opt) - if err != nil { - return nil, err - } - - for _, p := range ps { - pm[p.Id] = config.Project{ - ProjectSettings: namespace.ProjectSettings, - Id: p.Id, + err := glab.ListNamespaceProjects(ctx, namespace.Id, opt, func(projects []*_gitlab.Project) bool { + for _, project := range projects { + projectID := int64(project.ID) + projectConfigs[projectID] = config.Project{ + ProjectSettings: namespace.ProjectSettings, + Id: projectID, + } } - } - for _, pid := range namespace.ExcludeProjects { - p, _, err := glab.Client().Projects.GetProject(pid, nil, _gitlab.WithContext(ctx)) - if err != nil { - return nil, fmt.Errorf("error getting project %q: %w", pid, err) + for _, pid := range namespace.ExcludeProjects { + p, _, err := glab.Client().Projects.GetProject(pid, nil, _gitlab.WithContext(ctx)) + if err != nil { + slog.Error("error getting namespace project", "namespace_id", namespace.Id, "project", pid, "error", err) + return false + } + delete(projectConfigs, int64(p.ID)) } - delete(pm, int64(p.ID)) + + return true + }) + if err != nil { + return nil, err } } // overwrite with explicitly configured projects for _, p := range cfg.Projects { - pm[p.Id] = p + projectConfigs[p.Id] = p } - projects := make([]config.Project, 0, len(pm)) - for _, p := range pm { + projects := make([]config.Project, 0, len(projectConfigs)) + for _, p := range projectConfigs { projects = append(projects, p) } diff --git a/configs/gitlab-exporter.yaml b/configs/gitlab-exporter.yaml index e9e0103..5d45215 100644 --- a/configs/gitlab-exporter.yaml +++ b/configs/gitlab-exporter.yaml @@ -38,11 +38,11 @@ project_defaults: # requires fetching entire job logs to parse for embedded metrics. enabled: true - note_events: true - mergerequests: # Whether or not to export merge request data enabled: true + + note_events: true catch_up: # Whether to export data from past pipelines at startup. diff --git a/go.mod b/go.mod index d0c7bec..462a61a 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module github.com/cluttrdev/gitlab-exporter go 1.22 require ( + github.com/alitto/pond v1.9.2 github.com/cluttrdev/cli v0.0.0-20240318153739-b60d492ac8ff github.com/creasty/defaults v1.7.0 github.com/google/go-cmp v0.6.0 diff --git a/go.sum b/go.sum index 6bc3cf7..94dc1d7 100644 --- a/go.sum +++ b/go.sum @@ -33,6 +33,8 @@ cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9 dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= +github.com/alitto/pond v1.9.2 h1:9Qb75z/scEZVCoSU+osVmQ0I0JOeLfdTDafrbcJ8CLs= +github.com/alitto/pond v1.9.2/go.mod h1:xQn3P/sHTYcU/1BR3i86IGIrilcrGC2LiS+E2+CJWsI= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= diff --git a/internal/gitlab/client.go b/internal/gitlab/client.go index 901497a..28e4904 100644 --- a/internal/gitlab/client.go +++ b/internal/gitlab/client.go @@ -99,131 +99,108 @@ type GetPipelineHierarchyOptions struct { type GetPipelineHierarchyResult struct { PipelineHierarchy *PipelineHierarchy Metrics []*typespb.Metric - Error error } -func (c *Client) GetPipelineHierarchy(ctx context.Context, projectID int64, pipelineID int64, opt *GetPipelineHierarchyOptions) <-chan GetPipelineHierarchyResult { - ch := make(chan GetPipelineHierarchyResult) +func (c *Client) GetPipelineHierarchy(ctx context.Context, projectID int64, pipelineID int64, opt *GetPipelineHierarchyOptions) (*GetPipelineHierarchyResult, error) { + pipeline, err := c.GetPipeline(ctx, projectID, pipelineID) + if err != nil { + return nil, err + } - go func() { - defer close(ch) + jobs := []*typespb.Job{} + sections := []*typespb.Section{} + metrics := []*typespb.Metric{} - pipeline, err := c.GetPipeline(ctx, projectID, pipelineID) - if err != nil { - ch <- GetPipelineHierarchyResult{ - Error: err, + js, err := c.GetPipelineJobs(ctx, projectID, pipelineID) + if err != nil { + return nil, fmt.Errorf("get pipeline jobs: %w", err) + } + + jobs = append(jobs, js...) + if opt.FetchSections || opt.FetchJobMetrics { + for _, job := range js { + r, err := c.GetJobLog(ctx, projectID, job.Id) + if err != nil { + return nil, fmt.Errorf("get job log: %w", err) } - return - } - jobs := []*typespb.Job{} - sections := []*typespb.Section{} - metrics := []*typespb.Metric{} - for jr := range c.ListPipelineJobs(ctx, projectID, pipelineID) { - if jr.Error != nil { - ch <- GetPipelineHierarchyResult{ - Error: fmt.Errorf("[ListPipelineJobs] %w", jr.Error), - } - return + data, err := ParseJobLog(r) + if err != nil { + return nil, fmt.Errorf("parse job log: %w", err) } - jobs = append(jobs, jr.Job) - - if opt.FetchSections || opt.FetchJobMetrics { - job := jr.Job - r, err := c.GetJobLog(ctx, projectID, job.Id) - if err != nil { - ch <- GetPipelineHierarchyResult{ - Error: fmt.Errorf("get job log: %w", err), - } - return - } - data, err := ParseJobLog(r) - if err != nil { - ch <- GetPipelineHierarchyResult{ - Error: fmt.Errorf("parse job log: %w", err), + if opt.FetchSections { + for secnum, secdat := range data.Sections { + section := &typespb.Section{ + Id: job.Id*1000 + int64(secnum), + Job: &typespb.JobReference{ + Id: job.Id, + Name: job.Name, + Status: job.Status, + }, + Pipeline: job.Pipeline, + Name: secdat.Name, + StartedAt: types.ConvertUnixSeconds(secdat.Start), + FinishedAt: types.ConvertUnixSeconds(secdat.End), + Duration: types.ConvertDuration(float64(secdat.End - secdat.Start)), } - return - } - if opt.FetchSections { - for secnum, secdat := range data.Sections { - section := &typespb.Section{ - Id: job.Id*1000 + int64(secnum), - Job: &typespb.JobReference{ - Id: job.Id, - Name: job.Name, - Status: job.Status, - }, - Pipeline: job.Pipeline, - Name: secdat.Name, - StartedAt: types.ConvertUnixSeconds(secdat.Start), - FinishedAt: types.ConvertUnixSeconds(secdat.End), - Duration: types.ConvertDuration(float64(secdat.End - secdat.Start)), - } - - sections = append(sections, section) - } + sections = append(sections, section) } + } - if opt.FetchJobMetrics { - var metricIID int = 0 - for _, m := range data.Metrics { - metricIID++ - metric := &typespb.Metric{ - Id: []byte(fmt.Sprintf("%d-%d", job.Id, metricIID)), - Iid: int64(metricIID), - JobId: job.Id, - Name: m.Name, - Labels: convertLabels(m.Labels), - Value: m.Value, - Timestamp: types.ConvertUnixMilli(m.Timestamp), - } - metrics = append(metrics, metric) + if opt.FetchJobMetrics { + var metricIID int = 0 + for _, m := range data.Metrics { + metricIID++ + metric := &typespb.Metric{ + Id: []byte(fmt.Sprintf("%d-%d", job.Id, metricIID)), + Iid: int64(metricIID), + JobId: job.Id, + Name: m.Name, + Labels: convertLabels(m.Labels), + Value: m.Value, + Timestamp: types.ConvertUnixMilli(m.Timestamp), } + metrics = append(metrics, metric) } } } + } - bridges := []*typespb.Bridge{} - dps := []*PipelineHierarchy{} - for br := range c.ListPipelineBridges(ctx, projectID, pipelineID) { - if br.Error != nil { - ch <- GetPipelineHierarchyResult{ - Error: fmt.Errorf("[ListPipelineBridges] %w", br.Error), - } - return - } - bridges = append(bridges, br.Bridge) + bridges := []*typespb.Bridge{} + dps := []*PipelineHierarchy{} - dp := br.Bridge.DownstreamPipeline - if dp == nil || dp.Id == 0 { - continue - } + bs, err := c.GetPipelineBridges(ctx, projectID, pipelineID) + if err != nil { + return nil, fmt.Errorf("get pipeline bridges: %w", err) + } - dpr := <-c.GetPipelineHierarchy(ctx, dp.ProjectId, dp.Id, opt) - if dpr.Error != nil { - ch <- GetPipelineHierarchyResult{ - Error: fmt.Errorf("[GetPipelineHierarchy] %w", dpr.Error), - } - return - } - dps = append(dps, dpr.PipelineHierarchy) + bridges = append(bridges, bs...) + for _, b := range bs { + dp := b.DownstreamPipeline + if dp == nil || dp.Id == 0 { + continue } - ch <- GetPipelineHierarchyResult{ - PipelineHierarchy: &PipelineHierarchy{ - Pipeline: pipeline, - Jobs: jobs, - Sections: sections, - Bridges: bridges, - DownstreamPipelines: dps, - }, - Metrics: metrics, + dpr, err := c.GetPipelineHierarchy(ctx, dp.ProjectId, dp.Id, opt) + if err != nil { + return nil, fmt.Errorf("get downstream pipeline hierarchy: %w", err) } - }() + dps = append(dps, dpr.PipelineHierarchy) + metrics = append(metrics, dpr.Metrics...) + } - return ch + return &GetPipelineHierarchyResult{ + PipelineHierarchy: &PipelineHierarchy{ + Pipeline: pipeline, + Jobs: jobs, + Sections: sections, + Bridges: bridges, + DownstreamPipelines: dps, + }, + Metrics: metrics, + }, nil } func convertLabels(labels map[string]string) []*typespb.Metric_Label { diff --git a/internal/gitlab/gitlab.go b/internal/gitlab/gitlab.go deleted file mode 100644 index 418895e..0000000 --- a/internal/gitlab/gitlab.go +++ /dev/null @@ -1,21 +0,0 @@ -package gitlab - -import ( - "fmt" - "strconv" -) - -func Ptr[T any](v T) *T { - return &v -} - -func parseID(id interface{}) (string, error) { - switch v := id.(type) { - case int: - return strconv.Itoa(v), nil - case string: - return v, nil - default: - return "", fmt.Errorf("invalid ID type %#v, the ID must be an int or a string", id) - } -} diff --git a/internal/gitlab/job_logs.go b/internal/gitlab/job_logs.go index 0aa4be6..bf845d9 100644 --- a/internal/gitlab/job_logs.go +++ b/internal/gitlab/job_logs.go @@ -11,9 +11,7 @@ import ( ) func (c *Client) GetJobLog(ctx context.Context, projectID int64, jobID int64) (*bytes.Reader, error) { - c.RLock() trace, _, err := c.client.Jobs.GetTraceFile(int(projectID), int(jobID), _gitlab.WithContext(ctx)) - c.RUnlock() return trace, err } diff --git a/internal/gitlab/jobs.go b/internal/gitlab/jobs.go index b391beb..0facf6a 100644 --- a/internal/gitlab/jobs.go +++ b/internal/gitlab/jobs.go @@ -9,95 +9,82 @@ import ( "github.com/cluttrdev/gitlab-exporter/protobuf/typespb" ) -type ListPipelineJobsResult struct { - Job *typespb.Job - Error error -} - -func (c *Client) ListPipelineJobs(ctx context.Context, projectID int64, pipelineID int64) <-chan ListPipelineJobsResult { - ch := make(chan ListPipelineJobsResult) - - go func() { - defer close(ch) +func (c *Client) GetPipelineJobs(ctx context.Context, projectID int64, pipelineID int64) ([]*typespb.Job, error) { + var jobs []*typespb.Job + + opt := &gitlab.ListJobsOptions{ + ListOptions: gitlab.ListOptions{ + Pagination: "keyset", + PerPage: 100, + OrderBy: "updated_at", + Sort: "desc", + }, + IncludeRetried: gitlab.Ptr(false), + } + + options := []gitlab.RequestOptionFunc{ + gitlab.WithContext(ctx), + } + + for { + js, resp, err := c.client.Jobs.ListPipelineJobs(int(projectID), int(pipelineID), opt, options...) + if err != nil { + return nil, err + } - opts := &gitlab.ListJobsOptions{ - ListOptions: gitlab.ListOptions{ - PerPage: 100, - Page: 1, - }, - IncludeRetried: &[]bool{false}[0], + for _, j := range js { + jobs = append(jobs, types.ConvertJob(j)) } - for { - c.RLock() - jobs, res, err := c.client.Jobs.ListPipelineJobs(int(projectID), int(pipelineID), opts, gitlab.WithContext(ctx)) - c.RUnlock() - if err != nil { - ch <- ListPipelineJobsResult{ - Error: err, - } - return - } - - for _, j := range jobs { - ch <- ListPipelineJobsResult{ - Job: types.ConvertJob(j), - } - } - - if res.NextPage == 0 { - break - } - - opts.Page = res.NextPage + if resp.NextLink == "" { + break } - }() - return ch -} + options = []gitlab.RequestOptionFunc{ + gitlab.WithContext(ctx), + gitlab.WithKeysetPaginationParameters(resp.NextLink), + } + } -type ListPipelineBridgesResult struct { - Bridge *typespb.Bridge - Error error + return jobs, nil } -func (c *Client) ListPipelineBridges(ctx context.Context, projectID int64, pipelineID int64) <-chan ListPipelineBridgesResult { - ch := make(chan ListPipelineBridgesResult) +func (c *Client) GetPipelineBridges(ctx context.Context, projectID int64, pipelineID int64) ([]*typespb.Bridge, error) { + var bridges []*typespb.Bridge + + opts := &gitlab.ListJobsOptions{ + ListOptions: gitlab.ListOptions{ + Pagination: "keyset", + PerPage: 100, + OrderBy: "updated_at", + Sort: "desc", + }, + IncludeRetried: gitlab.Ptr(false), + } + + options := []gitlab.RequestOptionFunc{ + gitlab.WithContext(ctx), + } + + for { + bs, resp, err := c.client.Jobs.ListPipelineBridges(int(projectID), int(pipelineID), opts, options...) + if err != nil { + return nil, err + } - go func() { - defer close(ch) + for _, b := range bs { + bridges = append(bridges, types.ConvertBridge(b)) + } - opts := &gitlab.ListJobsOptions{ - ListOptions: gitlab.ListOptions{ - PerPage: 100, - Page: 1, - }, + if resp.NextLink == "" { + break } - for { - c.RLock() - bridges, res, err := c.client.Jobs.ListPipelineBridges(int(projectID), int(pipelineID), opts, gitlab.WithContext(ctx)) - c.RUnlock() - if err != nil { - ch <- ListPipelineBridgesResult{ - Error: err, - } - return - } - - for _, b := range bridges { - ch <- ListPipelineBridgesResult{ - Bridge: types.ConvertBridge(b), - } - } - - if res.NextPage == 0 { - break - } - - opts.Page = res.NextPage + options = []gitlab.RequestOptionFunc{ + gitlab.WithContext(ctx), + gitlab.WithKeysetPaginationParameters(resp.NextLink), } - }() + } - return ch + return bridges, nil } diff --git a/internal/gitlab/mergerequests.go b/internal/gitlab/mergerequests.go new file mode 100644 index 0000000..40f672e --- /dev/null +++ b/internal/gitlab/mergerequests.go @@ -0,0 +1,43 @@ +package gitlab + +import ( + "context" + + _gitlab "github.com/xanzy/go-gitlab" +) + +func ListProjectMergeRequests(ctx context.Context, glab *_gitlab.Client, pid int64, opt _gitlab.ListProjectMergeRequestsOptions, yield func(p []*_gitlab.MergeRequest) bool) error { + opt.ListOptions.Pagination = "keyset" + if opt.ListOptions.OrderBy == "" { + opt.ListOptions.OrderBy = "updated_at" + } + if opt.ListOptions.Sort == "" { + opt.ListOptions.Sort = "desc" + } + + options := []_gitlab.RequestOptionFunc{ + _gitlab.WithContext(ctx), + } + + for { + mrs, resp, err := glab.MergeRequests.ListProjectMergeRequests(int(pid), &opt, options...) + if err != nil { + return err + } + + if !yield(mrs) { + break + } + + if resp.NextLink == "" { + break + } + + options = []_gitlab.RequestOptionFunc{ + _gitlab.WithContext(ctx), + _gitlab.WithKeysetPaginationParameters(resp.NextLink), + } + } + + return nil +} diff --git a/internal/gitlab/pipelines.go b/internal/gitlab/pipelines.go index 1a1a684..104a1c5 100644 --- a/internal/gitlab/pipelines.go +++ b/internal/gitlab/pipelines.go @@ -4,56 +4,65 @@ import ( "context" "fmt" - gitlab "github.com/xanzy/go-gitlab" + _gitlab "github.com/xanzy/go-gitlab" "github.com/cluttrdev/gitlab-exporter/internal/types" "github.com/cluttrdev/gitlab-exporter/protobuf/typespb" ) -type ListOptions = gitlab.ListOptions -type ListProjectPipelinesOptions = gitlab.ListProjectPipelinesOptions +func ListProjectPipelines(ctx context.Context, glab *_gitlab.Client, pid int64, opt _gitlab.ListProjectPipelinesOptions, yield func(p []*_gitlab.PipelineInfo) bool) error { + opt.ListOptions.Pagination = "keyset" + if opt.ListOptions.OrderBy == "" { + opt.ListOptions.OrderBy = "updated_at" + } + if opt.ListOptions.Sort == "" { + opt.ListOptions.Sort = "desc" + } -type ListProjectPipelinesResult struct { - Pipeline *typespb.PipelineInfo - Error error -} + options := []_gitlab.RequestOptionFunc{ + _gitlab.WithContext(ctx), + } -func (c *Client) ListProjectPipelines(ctx context.Context, projectID int64, opt ListProjectPipelinesOptions) <-chan ListProjectPipelinesResult { - out := make(chan ListProjectPipelinesResult) - - go func() { - defer close(out) - - for { - c.RLock() - ps, resp, err := c.client.Pipelines.ListProjectPipelines(int(projectID), &opt, gitlab.WithContext(ctx)) - c.RUnlock() - if err != nil { - out <- ListProjectPipelinesResult{ - Error: err, - } - return - } - - for _, pi := range ps { - out <- ListProjectPipelinesResult{ - Pipeline: types.ConvertPipelineInfo(pi), - } - } - - if resp.NextPage == 0 { - break - } - opt.Page = resp.NextPage + for { + ps, resp, err := glab.Pipelines.ListProjectPipelines(int(pid), &opt, options...) + if err != nil { + return err } - }() - return out + if !yield(ps) { + break + } + + if resp.NextLink == "" { + break + } + + options = []_gitlab.RequestOptionFunc{ + _gitlab.WithContext(ctx), + _gitlab.WithKeysetPaginationParameters(resp.NextLink), + } + } + + return nil +} + +func FetchProjectPipelines(ctx context.Context, glab *_gitlab.Client, pid int64, opt _gitlab.ListProjectPipelinesOptions) ([]*_gitlab.PipelineInfo, error) { + var pipelines []*_gitlab.PipelineInfo + + err := ListProjectPipelines(ctx, glab, pid, opt, func(ps []*_gitlab.PipelineInfo) bool { + pipelines = append(pipelines, ps...) + return true + }) + if err != nil { + return nil, err + } + + return pipelines, nil } func (c *Client) GetPipeline(ctx context.Context, projectID int64, pipelineID int64) (*typespb.Pipeline, error) { c.RLock() - pipeline, _, err := c.client.Pipelines.GetPipeline(int(projectID), int(pipelineID), gitlab.WithContext(ctx)) + pipeline, _, err := c.client.Pipelines.GetPipeline(int(projectID), int(pipelineID), _gitlab.WithContext(ctx)) c.RUnlock() if err != nil { return nil, fmt.Errorf("error getting pipeline: %w", err) diff --git a/internal/gitlab/projects.go b/internal/gitlab/projects.go index c3828ff..95bbab9 100644 --- a/internal/gitlab/projects.go +++ b/internal/gitlab/projects.go @@ -11,35 +11,9 @@ import ( "github.com/cluttrdev/gitlab-exporter/protobuf/typespb" ) -type ListProjectsOptions = gitlab.ListProjectsOptions -type ListGroupProjectsOptions = gitlab.ListGroupProjectsOptions -type VisibilityValue = gitlab.VisibilityValue - -func (c *Client) ListProjects(ctx context.Context, opt ListProjectsOptions) ([]*typespb.Project, error) { - var projects []*typespb.Project - - for { - ps, resp, err := c.client.Projects.ListProjects(&opt, gitlab.WithContext(ctx)) - if err != nil { - return projects, err - } - - for _, p := range ps { - projects = append(projects, types.ConvertProject(p)) - } - - if resp.NextPage == 0 { - break - } - opt.Page = resp.NextPage - } - - return projects, nil -} - func (c *Client) GetProject(ctx context.Context, id int64) (*typespb.Project, error) { opt := gitlab.GetProjectOptions{ - Statistics: Ptr(true), + Statistics: gitlab.Ptr(true), } p, _, err := c.client.Projects.GetProject(int(id), &opt, gitlab.WithContext(ctx)) @@ -58,68 +32,98 @@ type ListNamespaceProjectsOptions struct { IncludeSubgroups bool } -func (c *Client) ListNamespaceProjects(ctx context.Context, id interface{}, opt ListNamespaceProjectsOptions) ([]*typespb.Project, error) { +func (c *Client) ListNamespaceProjects(ctx context.Context, id interface{}, opt ListNamespaceProjectsOptions, yield func(projects []*gitlab.Project) bool) error { kind := strings.ToLower(opt.Kind) if !(strings.EqualFold(kind, "user") || strings.EqualFold(kind, "group")) { n, _, err := c.client.Namespaces.GetNamespace(id, gitlab.WithContext(ctx)) if err != nil { - return nil, fmt.Errorf("error determining namespace kind: %w", err) + return fmt.Errorf("error determining namespace kind: %w", err) } kind = n.Kind } if kind == "user" { - return c.ListUserProjects(ctx, id, opt.ListProjectsOptions) + opts := opt.ListProjectsOptions + return c.ListUserProjects(ctx, id, opts, yield) } else if kind == "group" { - return c.ListGroupProjects(ctx, id, gitlab.ListGroupProjectsOptions{ + opts := gitlab.ListGroupProjectsOptions{ ListOptions: opt.ListOptions, WithShared: &opt.WithShared, IncludeSubGroups: &opt.IncludeSubgroups, - }) + } + return c.ListGroupProjects(ctx, id, opts, yield) } - return nil, fmt.Errorf("invalid namespace kind: %v", kind) + return fmt.Errorf("invalid namespace kind: %v", kind) } -func (c *Client) ListUserProjects(ctx context.Context, uid interface{}, opt ListProjectsOptions) ([]*typespb.Project, error) { - var projects []*typespb.Project +func (c *Client) ListUserProjects(ctx context.Context, uid interface{}, opt gitlab.ListProjectsOptions, yield func(projects []*gitlab.Project) bool) error { + opt.ListOptions.Pagination = "keyset" + if opt.ListOptions.OrderBy == "" { + opt.ListOptions.OrderBy = "updated_at" + } + if opt.ListOptions.Sort == "" { + opt.ListOptions.Sort = "desc" + } + + options := []gitlab.RequestOptionFunc{ + gitlab.WithContext(ctx), + } for { - ps, resp, err := c.client.Projects.ListUserProjects(uid, &opt, gitlab.WithContext(ctx)) + ps, resp, err := c.client.Projects.ListUserProjects(uid, &opt, options...) if err != nil { - return projects, err + return err } - for _, p := range ps { - projects = append(projects, types.ConvertProject(p)) + if !yield(ps) { + break } - if resp.NextPage == 0 { + if resp.NextLink == "" { break } - opt.Page = resp.NextPage + + options = []gitlab.RequestOptionFunc{ + gitlab.WithContext(ctx), + gitlab.WithKeysetPaginationParameters(resp.NextLink), + } } - return projects, nil + return nil } -func (c *Client) ListGroupProjects(ctx context.Context, gid interface{}, opt ListGroupProjectsOptions) ([]*typespb.Project, error) { - var projects []*typespb.Project +func (c *Client) ListGroupProjects(ctx context.Context, gid interface{}, opt gitlab.ListGroupProjectsOptions, yield func(projects []*gitlab.Project) bool) error { + opt.ListOptions.Pagination = "keyset" + if opt.ListOptions.OrderBy == "" { + opt.ListOptions.OrderBy = "updated_at" + } + if opt.ListOptions.Sort == "" { + opt.ListOptions.Sort = "desc" + } + + options := []gitlab.RequestOptionFunc{ + gitlab.WithContext(ctx), + } for { - ps, resp, err := c.client.Groups.ListGroupProjects(gid, &opt, gitlab.WithContext(ctx)) + ps, resp, err := c.client.Groups.ListGroupProjects(gid, &opt, options...) if err != nil { - return projects, err + return err } - for _, p := range ps { - projects = append(projects, types.ConvertProject(p)) + if !yield(ps) { + break } - if resp.NextPage == 0 { + if resp.NextLink == "" { break } - opt.Page = resp.NextPage + + options = []gitlab.RequestOptionFunc{ + gitlab.WithContext(ctx), + gitlab.WithKeysetPaginationParameters(resp.NextLink), + } } - return projects, nil + return nil } diff --git a/internal/gitlab/reports.go b/internal/gitlab/reports.go index 45cb9bf..3511c02 100644 --- a/internal/gitlab/reports.go +++ b/internal/gitlab/reports.go @@ -19,21 +19,19 @@ type PipelineTestReportData struct { } func (c *Client) GetPipelineTestReport(ctx context.Context, projectID int64, pipelineID int64) (*PipelineTestReportData, error) { - c.RLock() - defer c.RUnlock() report, _, err := c.client.Pipelines.GetPipelineTestReport(int(projectID), int(pipelineID), gitlab.WithContext(ctx)) if err != nil { - return nil, fmt.Errorf("error getting pipeline test report: %w", err) + return nil, fmt.Errorf("get pipeline test report: %w", err) } summary, err := c.GetPipelineTestReportSummary(ctx, projectID, pipelineID) if err != nil { - return nil, fmt.Errorf("error getting pipeline test report summary: %w", err) + return nil, fmt.Errorf("get pipeline test report summary: %w", err) } testreport, testsuites, testcases := types.ConvertTestReport(pipelineID, report) if err := overrideIDs(pipelineID, summary, testreport, testsuites, testcases); err != nil { - return nil, fmt.Errorf("error setting test report ids: %w", err) + return nil, fmt.Errorf("set test report ids: %w", err) } return &PipelineTestReportData{ diff --git a/internal/jobs/catchup.go b/internal/jobs/catchup.go index a8a48bd..abd7fd4 100644 --- a/internal/jobs/catchup.go +++ b/internal/jobs/catchup.go @@ -2,7 +2,6 @@ package jobs import ( "context" - "errors" "log/slog" "sync" "time" @@ -13,8 +12,6 @@ import ( "github.com/cluttrdev/gitlab-exporter/internal/exporter" "github.com/cluttrdev/gitlab-exporter/internal/gitlab" "github.com/cluttrdev/gitlab-exporter/internal/tasks" - "github.com/cluttrdev/gitlab-exporter/internal/types" - "github.com/cluttrdev/gitlab-exporter/pkg/worker" "github.com/cluttrdev/gitlab-exporter/protobuf/typespb" ) @@ -23,7 +20,7 @@ type ProjectCatchUpJob struct { GitLab *gitlab.Client Exporter *exporter.Exporter - WorkerPool *worker.Pool + WorkerPool WorkerPool } func (j *ProjectCatchUpJob) Run(ctx context.Context) { @@ -32,7 +29,13 @@ func (j *ProjectCatchUpJob) Run(ctx context.Context) { wg.Add(1) go func() { defer wg.Done() - j.exportProjectPipelines(ctx, &wg) + j.exportProject(ctx) + }() + + wg.Add(1) + go func() { + defer wg.Done() + j.exportProjectPipelines(ctx) }() wg.Add(1) @@ -44,21 +47,48 @@ func (j *ProjectCatchUpJob) Run(ctx context.Context) { wg.Wait() } -func (j *ProjectCatchUpJob) exportProjectPipelines(ctx context.Context, wg *sync.WaitGroup) { +func (j *ProjectCatchUpJob) submit(task func()) bool { + if j.WorkerPool.Stopped() { + return false + } + + j.WorkerPool.Submit(task) + return true +} + +func (j *ProjectCatchUpJob) exportProject(ctx context.Context) { + projectID := j.Config.Id + + _ = j.submit(func() { + project, err := j.GitLab.GetProject(ctx, projectID) + if err != nil { + slog.Error("error getting project", "project_id", projectID, "error", err) + return + } + + if err := j.Exporter.ExportProjects(ctx, []*typespb.Project{project}); err != nil { + slog.Error("error exporting project", "project_id", projectID, "error", err) + } + }) +} + +func (j *ProjectCatchUpJob) exportProjectPipelines(ctx context.Context) { projectID := j.Config.Id + glab := j.GitLab.Client() - opt := gitlab.ListProjectPipelinesOptions{ - ListOptions: gitlab.ListOptions{ + opt := _gitlab.ListProjectPipelinesOptions{ + ListOptions: _gitlab.ListOptions{ PerPage: 100, - Page: 1, + OrderBy: "updated_at", + Sort: "desc", }, - - Scope: gitlab.Ptr("finished"), + Scope: _gitlab.Ptr("finished"), } if j.Config.CatchUp.UpdatedAfter != "" { after, err := time.Parse("2006-01-02T15:04:05Z", j.Config.CatchUp.UpdatedAfter) if err != nil { slog.Error("error parsing catchup update_after", "error", err) + return } else { opt.UpdatedAfter = &after } @@ -67,51 +97,53 @@ func (j *ProjectCatchUpJob) exportProjectPipelines(ctx context.Context, wg *sync before, err := time.Parse("2006-01-02T15:04:05Z", j.Config.CatchUp.UpdatedBefore) if err != nil { slog.Error("error parsing catchup update_before", "error", err) + return } else { opt.UpdatedBefore = &before } } - pipelines := j.GitLab.ListProjectPipelines(ctx, j.Config.Id, opt) - for r := range pipelines { - if r.Error != nil { - if errors.Is(r.Error, context.Canceled) { - return - } else { - slog.Error("error listing project pipelines", "error", r.Error) + err := gitlab.ListProjectPipelines(ctx, glab, projectID, opt, func(pipelines []*_gitlab.PipelineInfo) bool { + for _, pipeline := range pipelines { + pipelineID := int64(pipeline.ID) + submitted := j.submit(func() { + err := tasks.ExportPipelineHierarchy(ctx, j.GitLab, j.Exporter, tasks.ExportPipelineHierarchyOptions{ + ProjectID: projectID, + PipelineID: pipelineID, + + ExportSections: j.Config.Export.Sections.Enabled, + ExportTestReports: j.Config.Export.TestReports.Enabled, + ExportTraces: j.Config.Export.Traces.Enabled, + ExportMetrics: j.Config.Export.Metrics.Enabled, + }) + if err != nil { + slog.Error("error exporting pipeline hierarchy", "project_id", projectID, "pipeline_id", pipelineID, "error", err) + } + }) + + if !submitted { + return false } - continue } - pipelineID := r.Pipeline.Id - wg.Add(1) - j.WorkerPool.Submit(func(ctx context.Context) { - defer wg.Done() - err := tasks.ExportPipelineHierarchy(ctx, j.GitLab, j.Exporter, tasks.ExportPipelineHierarchyOptions{ - ProjectID: projectID, - PipelineID: pipelineID, - - ExportSections: j.Config.Export.Sections.Enabled, - ExportTestReports: j.Config.Export.TestReports.Enabled, - ExportTraces: j.Config.Export.Traces.Enabled, - ExportMetrics: j.Config.Export.Metrics.Enabled, - }) - if err != nil { - slog.Error(err.Error()) - } - }) + return true + }) + + if err != nil { + slog.Error("error listing project pipelines", "project_id", projectID, "error", err) } } func (j *ProjectCatchUpJob) exportProjectMergeRequests(ctx context.Context) { - projectID := j.Config.Id + projectID := int(j.Config.Id) + glab := j.GitLab.Client() + exp := j.Exporter opt := _gitlab.ListProjectMergeRequestsOptions{ ListOptions: _gitlab.ListOptions{ - Pagination: "keyset", - PerPage: 100, - OrderBy: "updated_at", - Sort: "desc", + PerPage: 100, + OrderBy: "updated_at", + Sort: "desc", }, View: _gitlab.Ptr("simple"), } @@ -133,59 +165,28 @@ func (j *ProjectCatchUpJob) exportProjectMergeRequests(ctx context.Context) { } } - options := []_gitlab.RequestOptionFunc{ - _gitlab.WithContext(ctx), - } - - var wg sync.WaitGroup - for { - mrs, resp, err := j.GitLab.Client().MergeRequests.ListProjectMergeRequests(int(projectID), &opt, options...) - if err != nil { - slog.Error("error fetching project merge requests", "project", projectID, "error", err) - break - } - + err := gitlab.ListProjectMergeRequests(ctx, glab, int64(projectID), opt, func(mrs []*_gitlab.MergeRequest) bool { iids := make([]int, 0, len(mrs)) for _, mr := range mrs { iids = append(iids, mr.IID) } - wg.Add(1) - j.WorkerPool.Submit(func(ctx context.Context) { - defer wg.Done() - mergerequests := make([]*typespb.MergeRequest, 0, len(iids)) - - opt := _gitlab.GetMergeRequestsOptions{} - for _, iid := range iids { - mr, _, err := j.GitLab.Client().MergeRequests.GetMergeRequest(int(projectID), iid, &opt, _gitlab.WithContext(ctx)) - if err != nil { - if errors.Is(err, context.Canceled) { - break - } - slog.Error(err.Error()) - continue - } + submitted := j.submit(func() { + opt := tasks.ExportProjectMergeRequestsOptions{ + ProjectID: projectID, + MergeRequestIIDs: iids, - mergerequests = append(mergerequests, types.ConvertMergeRequest(mr)) + ExportNoteEvents: j.Config.Export.MergeRequests.NoteEvents, } - - if len(mergerequests) == 0 { - return - } - - if err := j.Exporter.ExportMergeRequests(ctx, mergerequests); err != nil { - slog.Error(err.Error()) + if err := tasks.ExportProjectMergeRequests(ctx, glab, exp, opt); err != nil { + slog.Error("error exporting project merge requests", "project_id", projectID, "error", err) } }) - if resp.NextLink == "" { - break - } + return submitted + }) - options = []_gitlab.RequestOptionFunc{ - _gitlab.WithContext(ctx), - _gitlab.WithKeysetPaginationParameters(resp.NextLink), - } + if err != nil { + slog.Error("error listing project merge requests", "project_id", projectID, "error", err) } - wg.Wait() } diff --git a/internal/jobs/export.go b/internal/jobs/export.go index 96a25d7..ca26da9 100644 --- a/internal/jobs/export.go +++ b/internal/jobs/export.go @@ -2,8 +2,6 @@ package jobs import ( "context" - "errors" - "fmt" "log/slog" "sync" "time" @@ -14,8 +12,6 @@ import ( "github.com/cluttrdev/gitlab-exporter/internal/exporter" "github.com/cluttrdev/gitlab-exporter/internal/gitlab" "github.com/cluttrdev/gitlab-exporter/internal/tasks" - "github.com/cluttrdev/gitlab-exporter/internal/types" - "github.com/cluttrdev/gitlab-exporter/pkg/worker" "github.com/cluttrdev/gitlab-exporter/protobuf/typespb" ) @@ -24,7 +20,7 @@ type ProjectExportJob struct { GitLab *gitlab.Client Exporter *exporter.Exporter - WorkerPool *worker.Pool + WorkerPool WorkerPool lastUpdate time.Time } @@ -34,8 +30,6 @@ func (j *ProjectExportJob) Run(ctx context.Context) { j.lastUpdate = time.Now().UTC() - projectID := j.Config.Id - ticker := time.NewTicker(period) var iteration int = 0 var wg sync.WaitGroup @@ -44,19 +38,16 @@ func (j *ProjectExportJob) Run(ctx context.Context) { case <-ctx.Done(): return case <-ticker.C: - iteration++ - slog.Debug("Exporting project", "id", projectID, "iteration", iteration) - wg.Add(1) - j.WorkerPool.Submit(func(ctx context.Context) { + go func() { defer wg.Done() j.exportProject(ctx, iteration == 1) - }) + }() wg.Add(1) go func() { defer wg.Done() - j.exportProjectPipelines(ctx, &wg) + j.exportProjectPipelines(ctx) }() wg.Add(1) @@ -71,12 +62,21 @@ func (j *ProjectExportJob) Run(ctx context.Context) { } } +func (j *ProjectExportJob) submit(task func()) bool { + if j.WorkerPool.Stopped() { + return false + } + + j.WorkerPool.Submit(task) + return true +} + func (j *ProjectExportJob) exportProject(ctx context.Context, first bool) { projectID := j.Config.Id project, err := j.GitLab.GetProject(ctx, projectID) if err != nil { - slog.Error("error fetching project", "project", projectID, "error", err) + slog.Error("error fetching project", "project_id", projectID, "error", err) return } else if !project.LastActivityAt.AsTime().After(j.lastUpdate) && !first { return @@ -87,76 +87,68 @@ func (j *ProjectExportJob) exportProject(ctx context.Context, first bool) { } } -func (j *ProjectExportJob) exportProjectPipelines(ctx context.Context, wg *sync.WaitGroup) { +func (j *ProjectExportJob) exportProjectPipelines(ctx context.Context) { projectID := j.Config.Id + glab := j.GitLab.Client() - opt := gitlab.ListProjectPipelinesOptions{ - ListOptions: gitlab.ListOptions{ + opt := _gitlab.ListProjectPipelinesOptions{ + ListOptions: _gitlab.ListOptions{ PerPage: 100, - Page: 1, + OrderBy: "updated_at", + Sort: "desc", }, - - Scope: gitlab.Ptr("finished"), + Scope: _gitlab.Ptr("finished"), UpdatedAfter: &j.lastUpdate, } - pipelines := j.GitLab.ListProjectPipelines(ctx, projectID, opt) - for r := range pipelines { - if r.Error != nil { - if errors.Is(r.Error, context.Canceled) { - return + err := gitlab.ListProjectPipelines(ctx, glab, projectID, opt, func(pipelines []*_gitlab.PipelineInfo) bool { + for _, pipeline := range pipelines { + pipelineID := int64(pipeline.ID) + submitted := j.submit(func() { + err := tasks.ExportPipelineHierarchy(ctx, j.GitLab, j.Exporter, tasks.ExportPipelineHierarchyOptions{ + ProjectID: projectID, + PipelineID: pipelineID, + + ExportSections: j.Config.Export.Sections.Enabled, + ExportTestReports: j.Config.Export.TestReports.Enabled, + ExportTraces: j.Config.Export.Traces.Enabled, + ExportMetrics: j.Config.Export.Metrics.Enabled, + }) + if err != nil { + slog.Error("error exporting pipeline hierarchy", "project_id", projectID, "pipeline_id", pipelineID, "error", err) + } + }) + if !submitted { + return false } - slog.Error("error listing project pipelines", "error", r.Error) - continue } - pipelineID := r.Pipeline.Id - wg.Add(1) - j.WorkerPool.Submit(func(ctx context.Context) { - defer wg.Done() - err := tasks.ExportPipelineHierarchy(ctx, j.GitLab, j.Exporter, tasks.ExportPipelineHierarchyOptions{ - ProjectID: projectID, - PipelineID: pipelineID, - - ExportSections: j.Config.Export.Sections.Enabled, - ExportTestReports: j.Config.Export.TestReports.Enabled, - ExportTraces: j.Config.Export.Traces.Enabled, - ExportMetrics: j.Config.Export.Metrics.Enabled, - }) - if err != nil { - slog.Error(err.Error()) - } - }) + return true + }) + + if err != nil { + slog.Error("error listing project pipelines", "project_id", projectID, "error", err) } } func (j *ProjectExportJob) exportProjectMergeRequests(ctx context.Context) { projectID := int(j.Config.Id) glab := j.GitLab.Client() + exp := j.Exporter opt := _gitlab.ListProjectMergeRequestsOptions{ ListOptions: _gitlab.ListOptions{ - Pagination: "keyset", - PerPage: 100, - OrderBy: "updated_at", - Sort: "desc", + PerPage: 100, + OrderBy: "updated_at", + Sort: "desc", }, - View: _gitlab.Ptr("simple"), - + View: _gitlab.Ptr("simple"), UpdatedAfter: &j.lastUpdate, } - options := []_gitlab.RequestOptionFunc{ - _gitlab.WithContext(ctx), - } - - var wg sync.WaitGroup - for { - // get iids of updated merge requests - mrs, resp, err := glab.MergeRequests.ListProjectMergeRequests(projectID, &opt, options...) - if err != nil { - slog.Error("error fetching project merge requests", "project", projectID, "error", err) - break + err := gitlab.ListProjectMergeRequests(ctx, glab, int64(projectID), opt, func(mrs []*_gitlab.MergeRequest) bool { + if len(mrs) == 0 { + return true } iids := make([]int, 0, len(mrs)) @@ -164,66 +156,22 @@ func (j *ProjectExportJob) exportProjectMergeRequests(ctx context.Context) { iids = append(iids, mr.IID) } - wg.Add(1) - j.WorkerPool.Submit(func(ctx context.Context) { - defer wg.Done() - mergerequests := make([]*typespb.MergeRequest, 0, len(iids)) - mrNoteEvents := make([]*typespb.MergeRequestNoteEvent, 0, len(iids)) - - opt := _gitlab.GetMergeRequestsOptions{} - for _, iid := range iids { - mr, _, err := glab.MergeRequests.GetMergeRequest(projectID, iid, &opt, _gitlab.WithContext(ctx)) - if err != nil { - if errors.Is(err, context.Canceled) { - break - } - slog.Error("error fetching merge request", "project_id", projectID, "iid", iid) - continue - } - - mergerequests = append(mergerequests, types.ConvertMergeRequest(mr)) - - if j.Config.Export.MergeRequests.NoteEvents { - notes, err := tasks.FetchMergeRequestNotes(ctx, glab, projectID, iid) - if err != nil { - if errors.Is(err, context.Canceled) { - break - } - slog.Error("error fetching merge request note events", "project_id", projectID, "iid", iid) - continue - } - - for _, note := range notes { - if ev := types.ConvertToMergeRequestNoteEvent(note); ev != nil { - mrNoteEvents = append(mrNoteEvents, ev) - } - } - } - } + submitted := j.submit(func() { + opt := tasks.ExportProjectMergeRequestsOptions{ + ProjectID: projectID, + MergeRequestIIDs: iids, - if len(mergerequests) > 0 { - if err := j.Exporter.ExportMergeRequests(ctx, mergerequests); err != nil { - slog.Error(fmt.Sprintf("error exporting merge requests: %v", err)) - } + ExportNoteEvents: j.Config.Export.MergeRequests.NoteEvents, } - - if len(mrNoteEvents) > 0 { - if err := j.Exporter.ExportMergeRequestNoteEvents(ctx, mrNoteEvents); err != nil { - slog.Error(fmt.Sprintf("error exporting merge request note events: %v", err)) - } + if err := tasks.ExportProjectMergeRequests(ctx, glab, exp, opt); err != nil { + slog.Error("error exporting project merge requests", "project_id", projectID, "error", err) } }) - if resp.NextLink == "" { - break - } + return submitted + }) - options = []_gitlab.RequestOptionFunc{ - _gitlab.WithContext(ctx), - _gitlab.WithKeysetPaginationParameters(resp.NextLink), - } + if err != nil { + slog.Error("error listing project merge requests", "project_id", projectID, "error", err) } - - // wait for all paginated exports to finish - wg.Wait() } diff --git a/internal/jobs/pool.go b/internal/jobs/pool.go new file mode 100644 index 0000000..305f889 --- /dev/null +++ b/internal/jobs/pool.go @@ -0,0 +1,8 @@ +package jobs + +type WorkerPool interface { + StopAndWait() + Stopped() bool + + Submit(func()) +} diff --git a/internal/tasks/mergerequests.go b/internal/tasks/mergerequests.go index 14330a6..9157c25 100644 --- a/internal/tasks/mergerequests.go +++ b/internal/tasks/mergerequests.go @@ -2,20 +2,16 @@ package tasks import ( "context" + "errors" + "fmt" + "log/slog" gitlab "github.com/xanzy/go-gitlab" -) - -func FetchProjectMergeRequest(ctx context.Context, glab *gitlab.Client, pid int64, iid int64) (*gitlab.MergeRequest, error) { - opt := gitlab.GetMergeRequestsOptions{} - - mr, _, err := glab.MergeRequests.GetMergeRequest(int(pid), int(iid), &opt, gitlab.WithContext(ctx)) - if err != nil { - return nil, err - } - return mr, nil -} + "github.com/cluttrdev/gitlab-exporter/internal/exporter" + "github.com/cluttrdev/gitlab-exporter/internal/types" + "github.com/cluttrdev/gitlab-exporter/protobuf/typespb" +) func FetchMergeRequestNotes(ctx context.Context, glab *gitlab.Client, pid interface{}, iid int) ([]*gitlab.Note, error) { opts := gitlab.ListMergeRequestNotesOptions{ @@ -52,3 +48,56 @@ func FetchMergeRequestNotes(ctx context.Context, glab *gitlab.Client, pid interf return notes, nil } + +type ExportProjectMergeRequestsOptions struct { + ProjectID int + MergeRequestIIDs []int + + ExportNoteEvents bool +} + +func ExportProjectMergeRequests(ctx context.Context, glab *gitlab.Client, exp *exporter.Exporter, opt ExportProjectMergeRequestsOptions) error { + mergerequests := make([]*typespb.MergeRequest, 0, len(opt.MergeRequestIIDs)) + mrNoteEvents := make([]*typespb.MergeRequestNoteEvent, 0, len(opt.MergeRequestIIDs)) + + _opt := gitlab.GetMergeRequestsOptions{} + for _, iid := range opt.MergeRequestIIDs { + mr, _, err := glab.MergeRequests.GetMergeRequest(opt.ProjectID, iid, &_opt, gitlab.WithContext(ctx)) + if err != nil { + if errors.Is(err, context.Canceled) { + break + } + slog.Error("error fetching merge request", "project_id", opt.ProjectID, "iid", iid, "error", err) + continue + } + + mergerequests = append(mergerequests, types.ConvertMergeRequest(mr)) + + if opt.ExportNoteEvents { + notes, err := FetchMergeRequestNotes(ctx, glab, opt.ProjectID, iid) + if err != nil { + if errors.Is(err, context.Canceled) { + break + } + slog.Error("error fetching merge request note events", "project_id", opt.ProjectID, "iid", iid) + continue + } + + for _, note := range notes { + if ev := types.ConvertToMergeRequestNoteEvent(note); ev != nil { + mrNoteEvents = append(mrNoteEvents, ev) + } + } + } + } + + if err := exp.ExportMergeRequests(ctx, mergerequests); err != nil { + return fmt.Errorf("export merge requests: %w", err) + } + + if err := exp.ExportMergeRequestNoteEvents(ctx, mrNoteEvents); err != nil { + return fmt.Errorf("export merge request note events: %w", err) + } + + return nil +} diff --git a/internal/tasks/pipeline.go b/internal/tasks/pipeline.go index f6d473b..b1d7e0a 100644 --- a/internal/tasks/pipeline.go +++ b/internal/tasks/pipeline.go @@ -25,32 +25,32 @@ func ExportPipelineHierarchy(ctx context.Context, glc *gitlab.Client, exp *expor FetchJobMetrics: opts.ExportSections, } - phr := <-glc.GetPipelineHierarchy(ctx, opts.ProjectID, opts.PipelineID, opt) - if err := phr.Error; err != nil { - return fmt.Errorf("error getting pipeline hierarchy (project=%d pipeline=%d): %w", opts.ProjectID, opts.PipelineID, err) + phr, err := glc.GetPipelineHierarchy(ctx, opts.ProjectID, opts.PipelineID, opt) + if err != nil { + return fmt.Errorf("get pipeline hierarchy: %w", err) } ph := phr.PipelineHierarchy if err := exp.ExportPipelineHierarchy(ctx, ph); err != nil { - return fmt.Errorf("error exporting pipeline hierarchy (project=%d pipeline=%d): %w", opts.ProjectID, opts.PipelineID, err) + return fmt.Errorf("export pipeline hierarchy: %w", err) } if opts.ExportTraces { traces := ph.GetAllTraces() if err := exp.ExportTraces(ctx, traces); err != nil { - return fmt.Errorf("error exporting traces (project=%d pipeline=%d): %w", opts.ProjectID, opts.PipelineID, err) + return fmt.Errorf("export traces: %w", err) } } if opts.ExportMetrics { if err := exp.ExportMetrics(ctx, phr.Metrics); err != nil { - return fmt.Errorf("error exporting metrics (project=%d pipeline=%d): %w", opts.ProjectID, opts.PipelineID, err) + return fmt.Errorf("export metrics: %w", err) } } if opts.ExportTestReports { if err := exportPipelineHierarchyTestReports(ctx, glc, exp, ph); err != nil { - return err + return fmt.Errorf("export testreports: %w", err) } }