Skip to content

Commit

Permalink
Replace clickhouse.Client usage with new DataStore interface
Browse files Browse the repository at this point in the history
  • Loading branch information
cluttrdev committed Nov 22, 2023
1 parent 6b635de commit 7488884
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 45 deletions.
2 changes: 1 addition & 1 deletion cmd/export_pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,5 +87,5 @@ func (c *ExportPipelineConfig) Exec(ctx context.Context, args []string) error {
ExportTraces: c.exportTraces,
}

return tasks.ExportPipelineHierarchy(ctx, opts, &ctl.GitLab, &ctl.ClickHouse)
return tasks.ExportPipelineHierarchy(ctx, opts, &ctl.GitLab, ctl.DataStore)
}
38 changes: 22 additions & 16 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@ import (
"github.com/cluttrdev/gitlab-clickhouse-exporter/pkg/config"
gitlab "github.com/cluttrdev/gitlab-clickhouse-exporter/pkg/gitlab"
"github.com/cluttrdev/gitlab-clickhouse-exporter/pkg/worker"

"github.com/cluttrdev/gitlab-clickhouse-exporter/internal/datastore"
)

type Controller struct {
config config.Config
GitLab gitlab.Client
ClickHouse clickhouse.Client
config config.Config
GitLab gitlab.Client
DataStore datastore.DataStore

workers []worker.Worker
}
Expand All @@ -40,7 +42,7 @@ func (c *Controller) configure(cfg config.Config) error {
return err
}

if err := c.configureClickHouseClient(cfg.ClickHouse); err != nil {
if err := c.configureClickHouseDataStore(cfg.ClickHouse); err != nil {
return err
}

Expand All @@ -60,24 +62,32 @@ func (c *Controller) configureGitLabClient(cfg config.GitLab) error {
})
}

func (c *Controller) configureClickHouseClient(cfg config.ClickHouse) error {
return c.ClickHouse.Configure(clickhouse.ClientConfig{
func (c *Controller) configureClickHouseDataStore(cfg config.ClickHouse) error {
var client clickhouse.Client

conf := clickhouse.ClientConfig{
Host: cfg.Host,
Port: cfg.Port,
Database: cfg.Database,
User: cfg.User,
Password: cfg.Password,
})
}
if err := client.Configure(conf); err != nil {
return err
}

c.DataStore = clickhouse.NewClickHouseDataStore(&client)
return nil
}

func (c *Controller) configureWorkers(cfg config.Config) error {
workers := []worker.Worker{}

for _, prj := range cfg.Projects {
if prj.CatchUp.Enabled {
workers = append(workers, worker.NewCatchUpProjectWorker(prj, &c.GitLab, &c.ClickHouse))
workers = append(workers, worker.NewCatchUpProjectWorker(prj, &c.GitLab, c.DataStore))
}
workers = append(workers, worker.NewExportProjectWorker(prj, &c.GitLab, &c.ClickHouse))
workers = append(workers, worker.NewExportProjectWorker(prj, &c.GitLab, c.DataStore))
}

c.workers = workers
Expand All @@ -86,12 +96,8 @@ func (c *Controller) configureWorkers(cfg config.Config) error {
}

func (c *Controller) init(ctx context.Context) error {
if err := c.ClickHouse.CreateDatabase(ctx); err != nil {
return fmt.Errorf("error creating database: %w", err)
}

if err := c.ClickHouse.CreateTables(ctx); err != nil {
return fmt.Errorf("error creating tables: %w", err)
if err := c.DataStore.Initialize(ctx); err != nil {
return err
}

return nil
Expand All @@ -102,7 +108,7 @@ func (c *Controller) CheckReadiness(ctx context.Context) error {
return err
}

if err := c.ClickHouse.CheckReadiness(ctx); err != nil {
if err := c.DataStore.CheckReadiness(ctx); err != nil {
return err
}

Expand Down
19 changes: 10 additions & 9 deletions pkg/tasks/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@ import (
"context"
"fmt"

"github.com/cluttrdev/gitlab-clickhouse-exporter/pkg/clickhouse"
gitlab "github.com/cluttrdev/gitlab-clickhouse-exporter/pkg/gitlab"
"github.com/cluttrdev/gitlab-clickhouse-exporter/pkg/models"

"github.com/cluttrdev/gitlab-clickhouse-exporter/internal/datastore"
)

type ExportPipelineHierarchyOptions struct {
Expand All @@ -18,11 +19,11 @@ type ExportPipelineHierarchyOptions struct {
ExportTraces bool
}

func ExportPipelineHierarchy(ctx context.Context, opts ExportPipelineHierarchyOptions, gl *gitlab.Client, ch *clickhouse.Client) error {
return <-exportPipelineHierarchy(ctx, opts, gl, ch)
func ExportPipelineHierarchy(ctx context.Context, opts ExportPipelineHierarchyOptions, gl *gitlab.Client, ds datastore.DataStore) error {
return <-exportPipelineHierarchy(ctx, opts, gl, ds)
}

func exportPipelineHierarchy(ctx context.Context, opts ExportPipelineHierarchyOptions, gl *gitlab.Client, ch *clickhouse.Client) <-chan error {
func exportPipelineHierarchy(ctx context.Context, opts ExportPipelineHierarchyOptions, gl *gitlab.Client, ds datastore.DataStore) <-chan error {
out := make(chan error)

go func() {
Expand All @@ -39,14 +40,14 @@ func exportPipelineHierarchy(ctx context.Context, opts ExportPipelineHierarchyOp
}
ph := phr.PipelineHierarchy

if err := clickhouse.InsertPipelineHierarchy(ctx, ph, ch); err != nil {
if err := ds.InsertPipelineHierarchy(ctx, ph); err != nil {
out <- fmt.Errorf("error inserting pipeline hierarchy: %w", err)
return
}

if opts.ExportTraces {
pts := ph.GetAllTraces()
if err := clickhouse.InsertTraces(ctx, pts, ch); err != nil {
if err := ds.InsertTraces(ctx, pts); err != nil {
out <- fmt.Errorf("error inserting traces: %w", err)
return
}
Expand All @@ -66,15 +67,15 @@ func exportPipelineHierarchy(ctx context.Context, opts ExportPipelineHierarchyOp
tcs = append(tcs, ts.TestCases...)
}
}
if err = clickhouse.InsertTestReports(ctx, trs, ch); err != nil {
if err = ds.InsertTestReports(ctx, trs); err != nil {
out <- fmt.Errorf("error inserting testreports: %w", err)
return
}
if err = clickhouse.InsertTestSuites(ctx, tss, ch); err != nil {
if err = ds.InsertTestSuites(ctx, tss); err != nil {
out <- fmt.Errorf("error inserting testsuites: %w", err)
return
}
if err = clickhouse.InsertTestCases(ctx, tcs, ch); err != nil {
if err = ds.InsertTestCases(ctx, tcs); err != nil {
out <- fmt.Errorf("error inserting testcases: %w", err)
return
}
Expand Down
21 changes: 11 additions & 10 deletions pkg/worker/catchup.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@ import (
"sync"
"time"

"github.com/cluttrdev/gitlab-clickhouse-exporter/pkg/clickhouse"
"github.com/cluttrdev/gitlab-clickhouse-exporter/pkg/config"
gitlab "github.com/cluttrdev/gitlab-clickhouse-exporter/pkg/gitlab"
"github.com/cluttrdev/gitlab-clickhouse-exporter/pkg/tasks"

"github.com/cluttrdev/gitlab-clickhouse-exporter/internal/datastore"
)

type catchUpProjectWorker struct {
Expand All @@ -22,18 +23,18 @@ type catchUpProjectWorker struct {
// ensure the worker can only be stopped once
stop sync.Once

project config.Project
gitlab *gitlab.Client
clickhouse *clickhouse.Client
project config.Project
gitlab *gitlab.Client
datastore datastore.DataStore
}

func NewCatchUpProjectWorker(cfg config.Project, gl *gitlab.Client, ch *clickhouse.Client) Worker {
func NewCatchUpProjectWorker(cfg config.Project, gl *gitlab.Client, ds datastore.DataStore) Worker {
return &catchUpProjectWorker{
done: make(chan struct{}),

project: cfg,
gitlab: gl,
clickhouse: ch,
project: cfg,
gitlab: gl,
datastore: ds,
}
}

Expand Down Expand Up @@ -91,7 +92,7 @@ func (w *catchUpProjectWorker) produce(ctx context.Context, opt gitlab.ListProje
go func() {
defer close(ch)

latestUpdates, err := w.clickhouse.QueryProjectPipelinesLatestUpdate(ctx, w.project.Id)
latestUpdates, err := w.datastore.QueryProjectPipelinesLatestUpdate(ctx, w.project.Id)
if err != nil {
if errors.Is(err, context.Canceled) {
return
Expand Down Expand Up @@ -148,7 +149,7 @@ func (w *catchUpProjectWorker) process(ctx context.Context, pipelineChan <-chan
ExportTraces: w.project.Export.Traces.Enabled,
}

if err := tasks.ExportPipelineHierarchy(ctx, opts, w.gitlab, w.clickhouse); err != nil {
if err := tasks.ExportPipelineHierarchy(ctx, opts, w.gitlab, w.datastore); err != nil {
if !errors.Is(err, context.Canceled) {
log.Printf("error exporting pipeline hierarchy: %s\n", err)
}
Expand Down
19 changes: 10 additions & 9 deletions pkg/worker/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@ import (
"sync"
"time"

"github.com/cluttrdev/gitlab-clickhouse-exporter/pkg/clickhouse"
"github.com/cluttrdev/gitlab-clickhouse-exporter/pkg/config"
gitlab "github.com/cluttrdev/gitlab-clickhouse-exporter/pkg/gitlab"
"github.com/cluttrdev/gitlab-clickhouse-exporter/pkg/tasks"

"github.com/cluttrdev/gitlab-clickhouse-exporter/internal/datastore"
)

type exportProjectWorker struct {
Expand All @@ -21,18 +22,18 @@ type exportProjectWorker struct {
// ensure the worker can only be stopped once
stop sync.Once

project config.Project
gitlab *gitlab.Client
clickhouse *clickhouse.Client
project config.Project
gitlab *gitlab.Client
datastore datastore.DataStore
}

func NewExportProjectWorker(cfg config.Project, gl *gitlab.Client, ch *clickhouse.Client) Worker {
func NewExportProjectWorker(cfg config.Project, gl *gitlab.Client, ds datastore.DataStore) Worker {
return &exportProjectWorker{
done: make(chan struct{}),

project: cfg,
gitlab: gl,
clickhouse: ch,
project: cfg,
gitlab: gl,
datastore: ds,
}
}

Expand Down Expand Up @@ -106,7 +107,7 @@ func (w *exportProjectWorker) run(ctx context.Context) {
ExportTraces: w.project.Export.Traces.Enabled,
}

if err := tasks.ExportPipelineHierarchy(ctx, opts, w.gitlab, w.clickhouse); err != nil {
if err := tasks.ExportPipelineHierarchy(ctx, opts, w.gitlab, w.datastore); err != nil {
log.Printf("error exporting pipeline hierarchy: %s\n", err)
} else {
log.Printf("Exported projects/%d/pipelines/%d\n", opts.ProjectID, opts.PipelineID)
Expand Down

0 comments on commit 7488884

Please sign in to comment.