From 74888842a514c28e4a3b6f8d793ef9158f3a298d Mon Sep 17 00:00:00 2001 From: cluttrdev Date: Wed, 22 Nov 2023 20:56:54 +0100 Subject: [PATCH] Replace clickhouse.Client usage with new DataStore interface --- cmd/export_pipeline.go | 2 +- pkg/controller/controller.go | 38 +++++++++++++++++++++--------------- pkg/tasks/export.go | 19 +++++++++--------- pkg/worker/catchup.go | 21 ++++++++++---------- pkg/worker/export.go | 19 +++++++++--------- 5 files changed, 54 insertions(+), 45 deletions(-) diff --git a/cmd/export_pipeline.go b/cmd/export_pipeline.go index 26d257d..1cee02c 100644 --- a/cmd/export_pipeline.go +++ b/cmd/export_pipeline.go @@ -87,5 +87,5 @@ func (c *ExportPipelineConfig) Exec(ctx context.Context, args []string) error { ExportTraces: c.exportTraces, } - return tasks.ExportPipelineHierarchy(ctx, opts, &ctl.GitLab, &ctl.ClickHouse) + return tasks.ExportPipelineHierarchy(ctx, opts, &ctl.GitLab, ctl.DataStore) } diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 922a45e..bc66de1 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -13,12 +13,14 @@ import ( "github.com/cluttrdev/gitlab-clickhouse-exporter/pkg/config" gitlab "github.com/cluttrdev/gitlab-clickhouse-exporter/pkg/gitlab" "github.com/cluttrdev/gitlab-clickhouse-exporter/pkg/worker" + + "github.com/cluttrdev/gitlab-clickhouse-exporter/internal/datastore" ) type Controller struct { - config config.Config - GitLab gitlab.Client - ClickHouse clickhouse.Client + config config.Config + GitLab gitlab.Client + DataStore datastore.DataStore workers []worker.Worker } @@ -40,7 +42,7 @@ func (c *Controller) configure(cfg config.Config) error { return err } - if err := c.configureClickHouseClient(cfg.ClickHouse); err != nil { + if err := c.configureClickHouseDataStore(cfg.ClickHouse); err != nil { return err } @@ -60,14 +62,22 @@ func (c *Controller) configureGitLabClient(cfg config.GitLab) error { }) } -func (c *Controller) configureClickHouseClient(cfg config.ClickHouse) error { - return c.ClickHouse.Configure(clickhouse.ClientConfig{ +func (c *Controller) configureClickHouseDataStore(cfg config.ClickHouse) error { + var client clickhouse.Client + + conf := clickhouse.ClientConfig{ Host: cfg.Host, Port: cfg.Port, Database: cfg.Database, User: cfg.User, Password: cfg.Password, - }) + } + if err := client.Configure(conf); err != nil { + return err + } + + c.DataStore = clickhouse.NewClickHouseDataStore(&client) + return nil } func (c *Controller) configureWorkers(cfg config.Config) error { @@ -75,9 +85,9 @@ func (c *Controller) configureWorkers(cfg config.Config) error { for _, prj := range cfg.Projects { if prj.CatchUp.Enabled { - workers = append(workers, worker.NewCatchUpProjectWorker(prj, &c.GitLab, &c.ClickHouse)) + workers = append(workers, worker.NewCatchUpProjectWorker(prj, &c.GitLab, c.DataStore)) } - workers = append(workers, worker.NewExportProjectWorker(prj, &c.GitLab, &c.ClickHouse)) + workers = append(workers, worker.NewExportProjectWorker(prj, &c.GitLab, c.DataStore)) } c.workers = workers @@ -86,12 +96,8 @@ func (c *Controller) configureWorkers(cfg config.Config) error { } func (c *Controller) init(ctx context.Context) error { - if err := c.ClickHouse.CreateDatabase(ctx); err != nil { - return fmt.Errorf("error creating database: %w", err) - } - - if err := c.ClickHouse.CreateTables(ctx); err != nil { - return fmt.Errorf("error creating tables: %w", err) + if err := c.DataStore.Initialize(ctx); err != nil { + return err } return nil @@ -102,7 +108,7 @@ func (c *Controller) CheckReadiness(ctx context.Context) error { return err } - if err := c.ClickHouse.CheckReadiness(ctx); err != nil { + if err := c.DataStore.CheckReadiness(ctx); err != nil { return err } diff --git a/pkg/tasks/export.go b/pkg/tasks/export.go index c51ad61..41175ab 100644 --- a/pkg/tasks/export.go +++ b/pkg/tasks/export.go @@ -4,9 +4,10 @@ import ( "context" "fmt" - "github.com/cluttrdev/gitlab-clickhouse-exporter/pkg/clickhouse" gitlab "github.com/cluttrdev/gitlab-clickhouse-exporter/pkg/gitlab" "github.com/cluttrdev/gitlab-clickhouse-exporter/pkg/models" + + "github.com/cluttrdev/gitlab-clickhouse-exporter/internal/datastore" ) type ExportPipelineHierarchyOptions struct { @@ -18,11 +19,11 @@ type ExportPipelineHierarchyOptions struct { ExportTraces bool } -func ExportPipelineHierarchy(ctx context.Context, opts ExportPipelineHierarchyOptions, gl *gitlab.Client, ch *clickhouse.Client) error { - return <-exportPipelineHierarchy(ctx, opts, gl, ch) +func ExportPipelineHierarchy(ctx context.Context, opts ExportPipelineHierarchyOptions, gl *gitlab.Client, ds datastore.DataStore) error { + return <-exportPipelineHierarchy(ctx, opts, gl, ds) } -func exportPipelineHierarchy(ctx context.Context, opts ExportPipelineHierarchyOptions, gl *gitlab.Client, ch *clickhouse.Client) <-chan error { +func exportPipelineHierarchy(ctx context.Context, opts ExportPipelineHierarchyOptions, gl *gitlab.Client, ds datastore.DataStore) <-chan error { out := make(chan error) go func() { @@ -39,14 +40,14 @@ func exportPipelineHierarchy(ctx context.Context, opts ExportPipelineHierarchyOp } ph := phr.PipelineHierarchy - if err := clickhouse.InsertPipelineHierarchy(ctx, ph, ch); err != nil { + if err := ds.InsertPipelineHierarchy(ctx, ph); err != nil { out <- fmt.Errorf("error inserting pipeline hierarchy: %w", err) return } if opts.ExportTraces { pts := ph.GetAllTraces() - if err := clickhouse.InsertTraces(ctx, pts, ch); err != nil { + if err := ds.InsertTraces(ctx, pts); err != nil { out <- fmt.Errorf("error inserting traces: %w", err) return } @@ -66,15 +67,15 @@ func exportPipelineHierarchy(ctx context.Context, opts ExportPipelineHierarchyOp tcs = append(tcs, ts.TestCases...) } } - if err = clickhouse.InsertTestReports(ctx, trs, ch); err != nil { + if err = ds.InsertTestReports(ctx, trs); err != nil { out <- fmt.Errorf("error inserting testreports: %w", err) return } - if err = clickhouse.InsertTestSuites(ctx, tss, ch); err != nil { + if err = ds.InsertTestSuites(ctx, tss); err != nil { out <- fmt.Errorf("error inserting testsuites: %w", err) return } - if err = clickhouse.InsertTestCases(ctx, tcs, ch); err != nil { + if err = ds.InsertTestCases(ctx, tcs); err != nil { out <- fmt.Errorf("error inserting testcases: %w", err) return } diff --git a/pkg/worker/catchup.go b/pkg/worker/catchup.go index bd6a4a4..9ff5124 100644 --- a/pkg/worker/catchup.go +++ b/pkg/worker/catchup.go @@ -7,10 +7,11 @@ import ( "sync" "time" - "github.com/cluttrdev/gitlab-clickhouse-exporter/pkg/clickhouse" "github.com/cluttrdev/gitlab-clickhouse-exporter/pkg/config" gitlab "github.com/cluttrdev/gitlab-clickhouse-exporter/pkg/gitlab" "github.com/cluttrdev/gitlab-clickhouse-exporter/pkg/tasks" + + "github.com/cluttrdev/gitlab-clickhouse-exporter/internal/datastore" ) type catchUpProjectWorker struct { @@ -22,18 +23,18 @@ type catchUpProjectWorker struct { // ensure the worker can only be stopped once stop sync.Once - project config.Project - gitlab *gitlab.Client - clickhouse *clickhouse.Client + project config.Project + gitlab *gitlab.Client + datastore datastore.DataStore } -func NewCatchUpProjectWorker(cfg config.Project, gl *gitlab.Client, ch *clickhouse.Client) Worker { +func NewCatchUpProjectWorker(cfg config.Project, gl *gitlab.Client, ds datastore.DataStore) Worker { return &catchUpProjectWorker{ done: make(chan struct{}), - project: cfg, - gitlab: gl, - clickhouse: ch, + project: cfg, + gitlab: gl, + datastore: ds, } } @@ -91,7 +92,7 @@ func (w *catchUpProjectWorker) produce(ctx context.Context, opt gitlab.ListProje go func() { defer close(ch) - latestUpdates, err := w.clickhouse.QueryProjectPipelinesLatestUpdate(ctx, w.project.Id) + latestUpdates, err := w.datastore.QueryProjectPipelinesLatestUpdate(ctx, w.project.Id) if err != nil { if errors.Is(err, context.Canceled) { return @@ -148,7 +149,7 @@ func (w *catchUpProjectWorker) process(ctx context.Context, pipelineChan <-chan ExportTraces: w.project.Export.Traces.Enabled, } - if err := tasks.ExportPipelineHierarchy(ctx, opts, w.gitlab, w.clickhouse); err != nil { + if err := tasks.ExportPipelineHierarchy(ctx, opts, w.gitlab, w.datastore); err != nil { if !errors.Is(err, context.Canceled) { log.Printf("error exporting pipeline hierarchy: %s\n", err) } diff --git a/pkg/worker/export.go b/pkg/worker/export.go index 5dc0194..5939a37 100644 --- a/pkg/worker/export.go +++ b/pkg/worker/export.go @@ -6,10 +6,11 @@ import ( "sync" "time" - "github.com/cluttrdev/gitlab-clickhouse-exporter/pkg/clickhouse" "github.com/cluttrdev/gitlab-clickhouse-exporter/pkg/config" gitlab "github.com/cluttrdev/gitlab-clickhouse-exporter/pkg/gitlab" "github.com/cluttrdev/gitlab-clickhouse-exporter/pkg/tasks" + + "github.com/cluttrdev/gitlab-clickhouse-exporter/internal/datastore" ) type exportProjectWorker struct { @@ -21,18 +22,18 @@ type exportProjectWorker struct { // ensure the worker can only be stopped once stop sync.Once - project config.Project - gitlab *gitlab.Client - clickhouse *clickhouse.Client + project config.Project + gitlab *gitlab.Client + datastore datastore.DataStore } -func NewExportProjectWorker(cfg config.Project, gl *gitlab.Client, ch *clickhouse.Client) Worker { +func NewExportProjectWorker(cfg config.Project, gl *gitlab.Client, ds datastore.DataStore) Worker { return &exportProjectWorker{ done: make(chan struct{}), - project: cfg, - gitlab: gl, - clickhouse: ch, + project: cfg, + gitlab: gl, + datastore: ds, } } @@ -106,7 +107,7 @@ func (w *exportProjectWorker) run(ctx context.Context) { ExportTraces: w.project.Export.Traces.Enabled, } - if err := tasks.ExportPipelineHierarchy(ctx, opts, w.gitlab, w.clickhouse); err != nil { + if err := tasks.ExportPipelineHierarchy(ctx, opts, w.gitlab, w.datastore); err != nil { log.Printf("error exporting pipeline hierarchy: %s\n", err) } else { log.Printf("Exported projects/%d/pipelines/%d\n", opts.ProjectID, opts.PipelineID)