From 4a21cbac2ab09ef43965650071db9d064a521dd4 Mon Sep 17 00:00:00 2001 From: cluttrdev Date: Mon, 4 Dec 2023 16:55:17 +0100 Subject: [PATCH] Enable job embedded metrics export This change enables export of metrics embedded in job logs. For now, we export those metrics if sections are exported, i.e. if we fetch the job logs anyway. --- cmd/export_pipeline.go | 1 + internal/worker/catchup.go | 1 + internal/worker/export.go | 1 + pkg/gitlab/client.go | 77 +++++++++++++++++++++++++++++++++----- pkg/tasks/export.go | 4 +- 5 files changed, 74 insertions(+), 10 deletions(-) diff --git a/cmd/export_pipeline.go b/cmd/export_pipeline.go index b7a4791..59198f7 100644 --- a/cmd/export_pipeline.go +++ b/cmd/export_pipeline.go @@ -85,6 +85,7 @@ func (c *ExportPipelineConfig) Exec(ctx context.Context, args []string) error { ExportSections: c.exportSections, ExportTestReports: c.exportTestReports, ExportTraces: c.exportTraces, + ExportJobMetrics: c.exportSections, // for now, export metrics if we fetch the logs for sections anyway } return tasks.ExportPipelineHierarchy(ctx, opts, &ctl.GitLab, ctl.DataStore) diff --git a/internal/worker/catchup.go b/internal/worker/catchup.go index ad5106e..b4dc9b8 100644 --- a/internal/worker/catchup.go +++ b/internal/worker/catchup.go @@ -147,6 +147,7 @@ func (w *catchUpProjectWorker) process(ctx context.Context, pipelineChan <-chan ExportSections: w.project.Export.Sections.Enabled, ExportTestReports: w.project.Export.TestReports.Enabled, ExportTraces: w.project.Export.Traces.Enabled, + ExportJobMetrics: w.project.Export.Sections.Enabled, // for now, export metrics if we fetch the logs for sections anyway } if err := tasks.ExportPipelineHierarchy(ctx, opts, w.gitlab, w.datastore); err != nil { diff --git a/internal/worker/export.go b/internal/worker/export.go index 7eeaf79..827eac0 100644 --- a/internal/worker/export.go +++ b/internal/worker/export.go @@ -105,6 +105,7 @@ func (w *exportProjectWorker) run(ctx context.Context) { ExportSections: w.project.Export.Sections.Enabled, ExportTestReports: w.project.Export.TestReports.Enabled, ExportTraces: w.project.Export.Traces.Enabled, + ExportJobMetrics: w.project.Export.Sections.Enabled, // for now, export metrics if we fetch the logs for sections anyway } if err := tasks.ExportPipelineHierarchy(ctx, opts, w.gitlab, w.datastore); err != nil { diff --git a/pkg/gitlab/client.go b/pkg/gitlab/client.go index cf65ea4..90e25e1 100644 --- a/pkg/gitlab/client.go +++ b/pkg/gitlab/client.go @@ -5,6 +5,7 @@ import ( "fmt" "net/http" "sync" + "time" "golang.org/x/time/rate" @@ -87,11 +88,13 @@ func (c *Client) CheckReadiness(ctx context.Context) error { } type GetPipelineHierarchyOptions struct { - FetchSections bool + FetchSections bool + FetchJobMetrics bool } type GetPipelineHierarchyResult struct { PipelineHierarchy *models.PipelineHierarchy + JobMetrics []*models.JobMetric Error error } @@ -101,6 +104,12 @@ func (c *Client) GetPipelineHierarchy(ctx context.Context, projectID int64, pipe go func() { defer close(ch) + unixTime := func(ts int64) *time.Time { + const nsec int64 = 0 + t := time.Unix(ts, nsec) + return &t + } + pipeline, err := c.GetPipeline(ctx, projectID, pipelineID) if err != nil { ch <- GetPipelineHierarchyResult{ @@ -111,6 +120,7 @@ func (c *Client) GetPipelineHierarchy(ctx context.Context, projectID int64, pipe jobs := []*models.Job{} sections := []*models.Section{} + metrics := []*models.JobMetric{} for jr := range c.ListPipelineJobs(ctx, projectID, pipelineID) { if jr.Error != nil { ch <- GetPipelineHierarchyResult{ @@ -120,16 +130,64 @@ func (c *Client) GetPipelineHierarchy(ctx context.Context, projectID int64, pipe } jobs = append(jobs, jr.Job) - if opt.FetchSections { - jobID := jr.Job.ID - for sr := range c.ListJobSections(ctx, projectID, jobID) { - if sr.Error != nil { - ch <- GetPipelineHierarchyResult{ - Error: fmt.Errorf("[ListJobSections] %w", sr.Error), + if opt.FetchSections || opt.FetchJobMetrics { + job := jr.Job + r, err := c.GetJobLog(ctx, projectID, job.ID) + if err != nil { + ch <- GetPipelineHierarchyResult{ + Error: fmt.Errorf("get job log: %w", err), + } + return + } + + data, err := parseJobLog(r) + if err != nil { + ch <- GetPipelineHierarchyResult{ + Error: fmt.Errorf("parse job log: %w", err), + } + return + } + + if opt.FetchSections { + for secnum, secdat := range data.Sections { + section := &models.Section{ + Name: secdat.Name, + StartedAt: unixTime(secdat.Start), + FinishedAt: unixTime(secdat.End), + Duration: float64(secdat.End - secdat.Start), } - return + + section.ID = job.ID*1000 + int64(secnum) + section.Job.ID = int64(job.ID) + section.Job.Name = job.Name + section.Job.Status = job.Status + section.Pipeline.ID = int64(job.Pipeline.ID) + section.Pipeline.ProjectID = int64(job.Pipeline.ProjectID) + section.Pipeline.Ref = job.Pipeline.Ref + section.Pipeline.Sha = job.Pipeline.Sha + section.Pipeline.Status = job.Pipeline.Status + + sections = append(sections, section) + } + } + + if opt.FetchJobMetrics { + for _, m := range data.Metrics { + metric := &models.JobMetric{ + Name: m.Name, + Value: m.Value, + Timestamp: m.TimestampMs, + } + + for _, pair := range m.Labels { + metric.Labels[pair.Name] = pair.Value + } + + metric.Job.ID = job.ID + metric.Job.Name = job.Name + + metrics = append(metrics, metric) } - sections = append(sections, sr.Section) } } } @@ -168,6 +226,7 @@ func (c *Client) GetPipelineHierarchy(ctx context.Context, projectID int64, pipe Bridges: bridges, DownstreamPipelines: dps, }, + JobMetrics: metrics, } }() diff --git a/pkg/tasks/export.go b/pkg/tasks/export.go index 4316c10..57f1822 100644 --- a/pkg/tasks/export.go +++ b/pkg/tasks/export.go @@ -17,6 +17,7 @@ type ExportPipelineHierarchyOptions struct { ExportSections bool ExportTestReports bool ExportTraces bool + ExportJobMetrics bool } func ExportPipelineHierarchy(ctx context.Context, opts ExportPipelineHierarchyOptions, gl *gitlab.Client, ds datastore.DataStore) error { @@ -30,7 +31,8 @@ func exportPipelineHierarchy(ctx context.Context, opts ExportPipelineHierarchyOp defer close(out) opt := &gitlab.GetPipelineHierarchyOptions{ - FetchSections: opts.ExportSections, + FetchSections: opts.ExportSections, + FetchJobMetrics: opts.ExportSections, } phr := <-gl.GetPipelineHierarchy(ctx, opts.ProjectID, opts.PipelineID, opt)