From 9a073f429d1b7639cee7bb699e20a80c23545cb8 Mon Sep 17 00:00:00 2001 From: cluttrdev Date: Fri, 19 Jan 2024 23:09:07 +0100 Subject: [PATCH] BREAKING: Remove ClickHouse/Datastore functionality in favor of gRPC exporter --- cmd/deduplicate.go | 107 ------- cmd/export.go | 2 +- cmd/export_pipeline.go | 5 +- cmd/fetch_pipeline.go | 19 +- cmd/root.go | 22 -- configs/gitlab-exporter.yaml | 7 - go.mod | 16 +- go.sum | 71 +---- internal/datastore/datastore.go | 29 -- internal/worker/catchup.go | 164 ---------- internal/worker/export.go | 121 -------- internal/worker/worker.go | 19 +- main.go | 2 - pkg/clickhouse/client.go | 132 -------- pkg/clickhouse/ddl.go | 431 -------------------------- pkg/clickhouse/dml.go | 514 -------------------------------- pkg/clickhouse/store.go | 75 ----- pkg/config/config.go | 17 +- pkg/config/parser.go | 5 +- pkg/controller/controller.go | 54 +--- pkg/controller/tasks.go | 242 +++++++++++++++ pkg/tasks/deduplicate.go | 148 --------- pkg/tasks/export.go | 88 ------ test/config/config_test.go | 10 - test/tasks/deduplicate_test.go | 235 --------------- 25 files changed, 299 insertions(+), 2236 deletions(-) delete mode 100644 cmd/deduplicate.go delete mode 100644 internal/datastore/datastore.go delete mode 100644 internal/worker/catchup.go delete mode 100644 internal/worker/export.go delete mode 100644 pkg/clickhouse/client.go delete mode 100644 pkg/clickhouse/ddl.go delete mode 100644 pkg/clickhouse/dml.go delete mode 100644 pkg/clickhouse/store.go create mode 100644 pkg/controller/tasks.go delete mode 100644 pkg/tasks/deduplicate.go delete mode 100644 pkg/tasks/export.go delete mode 100644 test/tasks/deduplicate_test.go diff --git a/cmd/deduplicate.go b/cmd/deduplicate.go deleted file mode 100644 index 69af28a..0000000 --- a/cmd/deduplicate.go +++ /dev/null @@ -1,107 +0,0 @@ -package cmd - -import ( - "context" - "flag" - "fmt" - "strings" - - "github.com/peterbourgon/ff/v3/ffcli" - - "github.com/cluttrdev/gitlab-exporter/pkg/clickhouse" - 
"github.com/cluttrdev/gitlab-exporter/pkg/config" - "github.com/cluttrdev/gitlab-exporter/pkg/tasks" -) - -type DeduplicateConfig struct { - rootConfig *RootConfig - - database string - final bool - by columnList - except columnList - throwIfNoop bool - - flags *flag.FlagSet -} - -type columnList []string - -func (f *columnList) String() string { - return fmt.Sprintf("%v", []string(*f)) -} - -func (f *columnList) Set(value string) error { - values := strings.Split(value, ",") - for _, v := range values { - *f = append(*f, v) - } - return nil -} - -func NewDeduplicateCmd(rootConfig *RootConfig) *ffcli.Command { - fs := flag.NewFlagSet(fmt.Sprintf("%s deduplicate", exeName), flag.ContinueOnError) - - cfg := DeduplicateConfig{ - rootConfig: rootConfig, - - flags: fs, - } - - cfg.RegisterFlags(fs) - - return &ffcli.Command{ - Name: "deduplicate", - ShortUsage: fmt.Sprintf("%s deduplicate [flags] table", exeName), - ShortHelp: "Deduplicate database table", - UsageFunc: usageFunc, - FlagSet: fs, - Options: rootCmdOptions, - Exec: cfg.Exec, - } -} - -func (c *DeduplicateConfig) RegisterFlags(fs *flag.FlagSet) { - c.rootConfig.RegisterFlags(fs) - - fs.StringVar(&c.database, "database", "gitlab_ci", "The database name. (default: 'gitlab_ci')") - fs.BoolVar(&c.final, "final", true, "Optimize even if all data is already in one part. (default: true)") - fs.Var(&c.by, "by", "Comma separated list of columns to deduplicate by. (default: [])") - fs.Var(&c.except, "except", "Comma separated list of columns to not deduplicate by. (default: [])") - fs.BoolVar(&c.throwIfNoop, "throw-if-noop", true, "Notify if deduplication is not performed. 
(default: true)") -} - -func (c *DeduplicateConfig) Exec(ctx context.Context, args []string) error { - if len(args) != 1 { - return fmt.Errorf("invalid number of positional arguments: %v", args) - } - - table := args[0] - - cfg := config.Default() - if err := loadConfig(c.rootConfig.filename, c.flags, &cfg); err != nil { - return fmt.Errorf("error loading configuration: %w", err) - } - - ch, err := clickhouse.NewClickHouseClient(clickhouse.ClientConfig{ - Host: cfg.ClickHouse.Host, - Port: cfg.ClickHouse.Port, - Database: cfg.ClickHouse.Database, - User: cfg.ClickHouse.User, - Password: cfg.ClickHouse.Password, - }) - if err != nil { - return fmt.Errorf("error creating clickhouse client: %w", err) - } - - opt := tasks.DeduplicateTableOptions{ - Database: c.database, - Table: table, - Final: &c.final, - By: c.by, - Except: c.except, - ThrowIfNoop: &c.throwIfNoop, - } - - return tasks.DeduplicateTable(ctx, opt, ch) -} diff --git a/cmd/export.go b/cmd/export.go index 7396db5..51ea540 100644 --- a/cmd/export.go +++ b/cmd/export.go @@ -36,7 +36,7 @@ func NewExportCmd(rootConfig *RootConfig, out io.Writer) *ffcli.Command { return &ffcli.Command{ Name: "export", ShortUsage: fmt.Sprintf("%s export [flags] [...]", exeName), - ShortHelp: "Export data from the GitLab API to ClickHouse", + ShortHelp: "Export data from the GitLab API", UsageFunc: usageFunc, FlagSet: fs, Subcommands: []*ffcli.Command{ diff --git a/cmd/export_pipeline.go b/cmd/export_pipeline.go index 59198f7..be3ed1d 100644 --- a/cmd/export_pipeline.go +++ b/cmd/export_pipeline.go @@ -10,7 +10,6 @@ import ( "github.com/cluttrdev/gitlab-exporter/pkg/config" "github.com/cluttrdev/gitlab-exporter/pkg/controller" - "github.com/cluttrdev/gitlab-exporter/pkg/tasks" ) type ExportPipelineConfig struct { @@ -78,7 +77,7 @@ func (c *ExportPipelineConfig) Exec(ctx context.Context, args []string) error { return fmt.Errorf("error constructing controller: %w", err) } - opts := tasks.ExportPipelineHierarchyOptions{ + opts := 
controller.ExportPipelineHierarchyOptions{ ProjectID: projectID, PipelineID: pipelineID, @@ -88,5 +87,5 @@ func (c *ExportPipelineConfig) Exec(ctx context.Context, args []string) error { ExportJobMetrics: c.exportSections, // for now, export metrics if we fetch the logs for sections anyway } - return tasks.ExportPipelineHierarchy(ctx, opts, &ctl.GitLab, ctl.DataStore) + return controller.ExportPipelineHierarchy(ctl, ctx, opts) } diff --git a/cmd/fetch_pipeline.go b/cmd/fetch_pipeline.go index 54254e7..f872b12 100644 --- a/cmd/fetch_pipeline.go +++ b/cmd/fetch_pipeline.go @@ -21,6 +21,8 @@ type FetchPipelineConfig struct { fetchHierarchy bool fetchSections bool + outputTrace bool + flags *flag.FlagSet } @@ -51,6 +53,7 @@ func (c *FetchPipelineConfig) RegisterFlags(fs *flag.FlagSet) { fs.BoolVar(&c.fetchHierarchy, "hierarchy", false, "Fetch pipeline hierarchy. (default: false)") fs.BoolVar(&c.fetchSections, "fetch-sections", true, "Fetch job sections. (default: true)") + fs.BoolVar(&c.outputTrace, "trace", false, "Output pipeline trace. 
(default: false)") } func (c *FetchPipelineConfig) Exec(ctx context.Context, args []string) error { @@ -81,7 +84,7 @@ func (c *FetchPipelineConfig) Exec(ctx context.Context, args []string) error { } var b []byte - if c.fetchHierarchy { + if c.fetchHierarchy || c.outputTrace { opt := &gitlab.GetPipelineHierarchyOptions{ FetchSections: c.fetchSections, } @@ -92,9 +95,17 @@ func (c *FetchPipelineConfig) Exec(ctx context.Context, args []string) error { } ph := phr.PipelineHierarchy - b, err = json.Marshal(ph) - if err != nil { - return fmt.Errorf("error marshalling pipeline hierarchy: %w", err) + if c.outputTrace { + ts := ph.GetAllTraces() + b, err = json.Marshal(ts) + if err != nil { + return fmt.Errorf("error marshalling pipeline traces: %w", err) + } + } else { + b, err = json.Marshal(ph) + if err != nil { + return fmt.Errorf("error marshalling pipeline hierarchy: %w", err) + } } } else { p, err := ctl.GitLab.GetPipeline(ctx, projectID, pipelineID) diff --git a/cmd/root.go b/cmd/root.go index d43c039..b6f5cbe 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -57,12 +57,6 @@ func (c *RootConfig) RegisterFlags(fs *flag.FlagSet) { fs.String("gitlab-api-url", defaults.GitLab.Api.URL, fmt.Sprintf("The GitLab API URL (default: '%s').", defaults.GitLab.Api.URL)) fs.String("gitlab-api-token", defaults.GitLab.Api.Token, fmt.Sprintf("The GitLab API Token (default: '%s').", defaults.GitLab.Api.Token)) - fs.String("clickhouse-host", defaults.ClickHouse.Host, fmt.Sprintf("The ClickHouse server name (default: '%s').", defaults.ClickHouse.Host)) - fs.String("clickhouse-port", defaults.ClickHouse.Port, fmt.Sprintf("The ClickHouse port to connect to (default: '%s')", defaults.ClickHouse.Port)) - fs.String("clickhouse-database", defaults.ClickHouse.Database, fmt.Sprintf("Select the current default ClickHouse database (default: '%s').", defaults.ClickHouse.Database)) - fs.String("clickhouse-user", defaults.ClickHouse.User, fmt.Sprintf("The ClickHouse username to connect with (default: 
'%s').", defaults.ClickHouse.User)) - fs.String("clickhouse-password", defaults.ClickHouse.Password, fmt.Sprintf("The ClickHouse password (default: '%s').", defaults.ClickHouse.Password)) - fs.StringVar(&c.filename, "config", "", "Configuration file to use.") } @@ -83,16 +77,6 @@ func loadConfig(filename string, flags *flag.FlagSet, cfg *config.Config) error cfg.GitLab.Api.URL = f.Value.String() case "gitlab-api-token": cfg.GitLab.Api.Token = f.Value.String() - case "clickhouse-host": - cfg.ClickHouse.Host = f.Value.String() - case "clickhouse-port": - cfg.ClickHouse.Port = f.Value.String() - case "clickhouse-database": - cfg.ClickHouse.Database = f.Value.String() - case "clickhouse-user": - cfg.ClickHouse.User = f.Value.String() - case "clickhouse-password": - cfg.ClickHouse.Password = f.Value.String() } }) @@ -104,12 +88,6 @@ func writeConfig(cfg config.Config, out io.Writer) { fmt.Fprintf(out, "GitLab URL: %s\n", cfg.GitLab.Api.URL) fmt.Fprintf(out, "GitLab Token: %x\n", sha256String(cfg.GitLab.Api.Token)) fmt.Fprintln(out, "----") - fmt.Fprintf(out, "ClickHouse Host: %s\n", cfg.ClickHouse.Host) - fmt.Fprintf(out, "ClickHouse Port: %s\n", cfg.ClickHouse.Port) - fmt.Fprintf(out, "ClickHouse Database: %s\n", cfg.ClickHouse.Database) - fmt.Fprintf(out, "ClickHouse User: %s\n", cfg.ClickHouse.User) - fmt.Fprintf(out, "ClickHouse Password: %x\n", sha256String(cfg.ClickHouse.Password)) - fmt.Fprintln(out, "----") projects := []int64{} for _, p := range cfg.Projects { diff --git a/configs/gitlab-exporter.yaml b/configs/gitlab-exporter.yaml index 2a940bb..c4dca0d 100644 --- a/configs/gitlab-exporter.yaml +++ b/configs/gitlab-exporter.yaml @@ -10,13 +10,6 @@ gitlab: rate: limit: 0.0 -clickhouse: - host: "127.0.0.1" - port: "9000" - database: "default" - user: "default" - password: "" - # List of gRPC server endpoints to export to endpoints: [] # - address: "127.0.0.1:36275" diff --git a/go.mod b/go.mod index 05745f8..c8ad8b3 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,6 
@@ module github.com/cluttrdev/gitlab-exporter go 1.20 require ( - github.com/ClickHouse/clickhouse-go/v2 v2.12.0 github.com/creasty/defaults v1.7.0 github.com/google/go-cmp v0.5.9 github.com/peterbourgon/ff/v3 v3.4.0 @@ -18,29 +17,18 @@ require ( ) require ( - github.com/ClickHouse/ch-go v0.52.1 // indirect - github.com/andybalholm/brotli v1.0.5 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect - github.com/go-faster/city v1.0.1 // indirect - github.com/go-faster/errors v0.6.1 // indirect github.com/golang/protobuf v1.5.3 // indirect github.com/google/go-querystring v1.1.0 // indirect - github.com/google/uuid v1.3.1 // indirect github.com/hashicorp/go-cleanhttp v0.5.2 // indirect github.com/hashicorp/go-retryablehttp v0.7.4 // indirect - github.com/klauspost/compress v1.15.15 // indirect + github.com/kr/text v0.1.0 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect - github.com/paulmach/orb v0.9.0 // indirect - github.com/pierrec/lz4/v4 v4.1.17 // indirect - github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_model v0.4.1-0.20230718164431-9a2bf3000d16 // indirect github.com/prometheus/common v0.44.0 // indirect github.com/prometheus/procfs v0.11.1 // indirect - github.com/segmentio/asm v1.2.0 // indirect - github.com/shopspring/decimal v1.3.1 // indirect - go.opentelemetry.io/otel v1.16.0 // indirect - go.opentelemetry.io/otel/trace v1.16.0 // indirect + github.com/stretchr/testify v1.8.3 // indirect golang.org/x/net v0.19.0 // indirect golang.org/x/oauth2 v0.13.0 // indirect golang.org/x/sys v0.15.0 // indirect diff --git a/go.sum b/go.sum index 55d001b..0260e97 100644 --- a/go.sum +++ b/go.sum @@ -1,65 +1,36 @@ -github.com/ClickHouse/ch-go v0.52.1 h1:nucdgfD1BDSHjbNaG3VNebonxJzD8fX8jbuBpfo5VY0= -github.com/ClickHouse/ch-go v0.52.1/go.mod h1:B9htMJ0hii/zrC2hljUKdnagRBuLqtRG/GrU3jqCwRk= -github.com/ClickHouse/clickhouse-go/v2 v2.12.0 
h1:k0Q0qiuwGeGZC7/6Ff9J3C9Od+rzy9FXgGOcAfIxrF0= -github.com/ClickHouse/clickhouse-go/v2 v2.12.0/go.mod h1:W/UQ/GchOF+Q0k5iv6ZanLKQNukA4Oiyt4sMFDsv8QY= -github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs= -github.com/andybalholm/brotli v1.0.5/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/creasty/defaults v1.7.0 h1:eNdqZvc5B509z18lD8yc212CAqJNvfT1Jq6L8WowdBA= github.com/creasty/defaults v1.7.0/go.mod h1:iGzKe6pbEHnpMPtfDXZEr0NVxWnPTjb1bbDy08fPzYM= -github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/go-faster/city v1.0.1 h1:4WAxSZ3V2Ws4QRDrscLEDcibJY8uf41H6AhXDrNDcGw= -github.com/go-faster/city v1.0.1/go.mod h1:jKcUJId49qdW3L1qKHH/3wPeUstCVpVSXTM6vO3VcTw= -github.com/go-faster/errors v0.6.1 h1:nNIPOBkprlKzkThvS/0YaX8Zs9KewLCOSFQS5BU06FI= -github.com/go-faster/errors v0.6.1/go.mod h1:5MGV2/2T9yvlrbhe9pD9LO5Z/2zCSq2T8j+Jpi2LAyY= -github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= 
-github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/go-querystring v1.1.0 h1:AnCroh3fv4ZBgVIf1Iwtovgjaw/GiKJo8M8yD/fhyJ8= github.com/google/go-querystring v1.1.0/go.mod h1:Kcdr2DB4koayq7X8pmAG4sNG59So17icRSOU623lUBU= -github.com/google/uuid v1.3.1 h1:KjJaJ9iWZ3jOFZIf1Lqf4laDRCasjl0BCmnEGxkdLb4= -github.com/google/uuid v1.3.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/hashicorp/go-cleanhttp v0.5.2 h1:035FKYIWjmULyFRBKPs8TBQoi0x6d9G4xc9neXJWAZQ= github.com/hashicorp/go-cleanhttp v0.5.2/go.mod h1:kO/YDlP8L1346E6Sodw+PrpBSV4/SoxCXGY6BqNFT48= github.com/hashicorp/go-hclog v0.9.2 h1:CG6TE5H9/JXsFWJCfoIVpKFIkFe6ysEuHirp4DxCsHI= github.com/hashicorp/go-hclog v0.9.2/go.mod h1:5CU+agLiy3J7N7QjHK5d05KxGsuXiQLrjA0H7acj2lQ= github.com/hashicorp/go-retryablehttp v0.7.4 h1:ZQgVdpTdAL7WpMIwLzCfbalOcSUdkDZnpUv3/+BxzFA= github.com/hashicorp/go-retryablehttp v0.7.4/go.mod h1:Jy/gPYAdjqffZ/yFGCFV2doI5wjtH1ewM9u8iYVjtX8= -github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= -github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= -github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= -github.com/klauspost/compress v1.15.15 h1:EF27CXIuDsYJ6mmvtBRlEuB2UVOqHG1tAXgZ7yIO+lw= -github.com/klauspost/compress v1.15.15/go.mod h1:ZcK2JAFqKOpnBlxcLsJzYfrS9X1akm9fHZNnD9+Vo/4= -github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pty v1.1.1/go.mod 
h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo= github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4= -github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc= -github.com/paulmach/orb v0.9.0 h1:MwA1DqOKtvCgm7u9RZ/pnYejTeDJPnr0+0oFajBbJqk= -github.com/paulmach/orb v0.9.0/go.mod h1:SudmOk85SXtmXAB3sLGyJ6tZy/8pdfrV0o6ef98Xc30= -github.com/paulmach/protoscan v0.2.1/go.mod h1:SpcSwydNLrxUGSDvXvO0P7g7AuhJ7lcKfDlhJCDw2gY= github.com/peterbourgon/ff/v3 v3.4.0 h1:QBvM/rizZM1cB0p0lGMdmR7HxZeI/ZrBWB4DqLkMUBc= github.com/peterbourgon/ff/v3 v3.4.0/go.mod h1:zjJVUhx+twciwfDl0zBcFzl4dW8axCRyXE/eKY9RztQ= -github.com/pierrec/lz4/v4 v4.1.17 h1:kV4Ip+/hUBC+8T6+2EgburRtkE9ef4nbY3f4dFhGjMc= -github.com/pierrec/lz4/v4 v4.1.17/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= -github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= -github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prometheus/client_golang v1.17.0 h1:rl2sfwZMtSthVU752MqfjQozy7blglC+1SOtjMAMh+Q= @@ -71,47 +42,21 @@ github.com/prometheus/common v0.44.0/go.mod h1:ofAIvZbQ1e/nugmZGz4/qCb9Ap1VoSTIO github.com/prometheus/procfs v0.11.1 h1:xRC8Iq1yyca5ypa9n1EZnWZkt7dwcoRPQwX/5gwaUuI= github.com/prometheus/procfs v0.11.1/go.mod h1:eesXgaPo1q7lBpVMoMy0ZOFTth9hBn4W/y0/p/ScXhY= github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= -github.com/segmentio/asm v1.2.0 
h1:9BQrFxC+YOHJlTlHGkTrFWf59nbL3XnCoFLTwDCI7ys= -github.com/segmentio/asm v1.2.0/go.mod h1:BqMnlJP91P8d+4ibuonYZw9mfnzI9HfxselHZr5aAcs= -github.com/shopspring/decimal v1.3.1 h1:2Usl1nmF/WZucqkFZhnfFYxxxu8LG21F6nPQBE5gKV8= -github.com/shopspring/decimal v1.3.1/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o= -github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= -github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.3 h1:RP3t2pwF7cMEbC1dqtB6poj3niw/9gnV4Cjg5oW5gtY= -github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= +github.com/stretchr/testify v1.8.3/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/xanzy/go-gitlab v0.88.0 h1:9GHBrxyCUNZZNuAsbJ1NbEH6XAYsKyTn6NfE0wYO5SY= github.com/xanzy/go-gitlab v0.88.0/go.mod h1:5ryv+MnpZStBH8I/77HuQBsMbBGANtVpLWC15qOjWAw= -github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= -github.com/xdg-go/scram v1.1.1/go.mod h1:RaEWvsqvNKKvBPvcKeFjrG2cJqOkHTiyTpzz23ni57g= -github.com/xdg-go/stringprep v1.0.3/go.mod h1:W3f5j4i+9rC0kuIEJL0ky1VpHXQU3ocBgklLGvcBnW8= -github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d/go.mod h1:rHwXgn7JulP+udvsHwJoVG1YGAP6VLg4y9I5dyZdqmA= -github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= -github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= -go.mongodb.org/mongo-driver v1.11.1/go.mod h1:s7p5vEtfbeR1gYi6pnj3c3/urpbLv2T5Sfd6Rp2HBB8= -go.opentelemetry.io/otel v1.16.0 h1:Z7GVAX/UkAXPKsy94IU+i6thsQS4nb7LviLpnaNeW8s= -go.opentelemetry.io/otel v1.16.0/go.mod h1:vl0h9NUa1D5s1nv3A5vZOYWn8av4K8Ml6JDeHrT/bx4= -go.opentelemetry.io/otel/trace v1.16.0 
h1:8JRpaObFoW0pxuVPapkgH8UhHQj+bJW8jJsCZEu5MQs= -go.opentelemetry.io/otel/trace v1.16.0/go.mod h1:Yt9vYq1SdNz3xdjZZK7wcXv1qv2pwLkqr2QVwea0ef0= go.opentelemetry.io/proto/otlp v1.0.0 h1:T0TX0tmXU8a3CbNXzEKGeU5mIVOdf0oykP+u2lIVU/I= go.opentelemetry.io/proto/otlp v1.0.0/go.mod h1:Sy6pihPLfYHkr3NkUbEhGHFhINUSI/v80hjKIs5JXpM= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= -golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= -golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= -golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/exp v0.0.0-20230801115018-d63ba01acd4b h1:r+vk0EmXNmekl0S0BascoeeoHk/L7wmaW2QF90K+kYI= golang.org/x/exp v0.0.0-20230801115018-d63ba01acd4b/go.mod h1:FXUEEKJgO7OQYeo8N01OfiKP8RXMtf6e8aTskBGqWdc= -golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= -golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= -golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= -golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= -golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= -golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod 
h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= golang.org/x/net v0.19.0 h1:zTwKpTd2XuCqf8huc7Fo2iSy+4RHPd10s4KzeTnVr1c= golang.org/x/net v0.19.0/go.mod h1:CfAk/cbD4CthTvqiEl8NpboMuiuOYsAr/7NOjZJtv1U= @@ -119,15 +64,9 @@ golang.org/x/oauth2 v0.13.0 h1:jDDenyj+WgFtmV3zYVoi8aE2BwtXFLWOA67ZfNWftiY= golang.org/x/oauth2 v0.13.0/go.mod h1:/JMhi4ZRXAf4HG9LiNmxvk+45+96RUlVThiH8FzNBn0= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= -golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= @@ 
-137,7 +76,6 @@ golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9sn golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ= golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= @@ -146,13 +84,9 @@ golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4= golang.org/x/time v0.3.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= -golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= -golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/appengine v1.6.8 h1:IhEN5q69dyKagZPYMSdIjS2HqprW324FRQZJcGqPAsM= google.golang.org/appengine v1.6.8/go.mod h1:1jJ3jBArFh5pcgW8gCtRJnepW8FzD1V44FJffLiz/Ds= 
google.golang.org/genproto/googleapis/rpc v0.0.0-20240102182953-50ed04b92917 h1:6G8oQ016D88m1xAKljMlBOOGWDZkes4kMhgGFlf8WcQ= @@ -161,12 +95,9 @@ google.golang.org/grpc v1.60.1 h1:26+wFr+cNqSGFcOXcabYC0lUVJVRa2Sb2ortSK7VrEU= google.golang.org/grpc v1.60.1/go.mod h1:OlCHIeLYqSSsLi6i49B5QGdzaMZK9+M7LXN2FKz4eGM= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= -google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.32.0 h1:pPC6BG5ex8PDFnkbrGU3EixyhKcQ2aDuBS36lqK/C7I= google.golang.org/protobuf v1.32.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= -gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/internal/datastore/datastore.go b/internal/datastore/datastore.go deleted file mode 100644 index 5352117..0000000 --- a/internal/datastore/datastore.go +++ /dev/null @@ -1,29 +0,0 @@ -package datastore - -import ( - "context" - "time" - - "github.com/cluttrdev/gitlab-exporter/pkg/models" -) - -type DataStore interface { - Initialize(context.Context) error - CheckReadiness(context.Context) error - - InsertPipelines(context.Context, []*models.Pipeline) error - InsertJobs(context.Context, []*models.Job) error - InsertSections(context.Context, []*models.Section) error - InsertBridges(context.Context, []*models.Bridge) error - - 
InsertPipelineHierarchy(context.Context, *models.PipelineHierarchy) error - - InsertTestReports(context.Context, []*models.PipelineTestReport) error - InsertTestSuites(context.Context, []*models.PipelineTestSuite) error - InsertTestCases(context.Context, []*models.PipelineTestCase) error - - InsertTraces(context.Context, []models.Trace) error - InsertJobMetrics(context.Context, []*models.JobMetric) error - - QueryProjectPipelinesLatestUpdate(context.Context, int64) (map[int64]time.Time, error) -} diff --git a/internal/worker/catchup.go b/internal/worker/catchup.go deleted file mode 100644 index b4dc9b8..0000000 --- a/internal/worker/catchup.go +++ /dev/null @@ -1,164 +0,0 @@ -package worker - -import ( - "context" - "errors" - "log" - "sync" - "time" - - "github.com/cluttrdev/gitlab-exporter/pkg/config" - "github.com/cluttrdev/gitlab-exporter/pkg/gitlab" - "github.com/cluttrdev/gitlab-exporter/pkg/tasks" - - "github.com/cluttrdev/gitlab-exporter/internal/datastore" -) - -type catchUpProjectWorker struct { - cancel context.CancelFunc - done chan struct{} - - // ensure the worker can only be started once - start sync.Once - // ensure the worker can only be stopped once - stop sync.Once - - project config.Project - gitlab *gitlab.Client - datastore datastore.DataStore -} - -func NewCatchUpProjectWorker(cfg config.Project, gl *gitlab.Client, ds datastore.DataStore) Worker { - return &catchUpProjectWorker{ - done: make(chan struct{}), - - project: cfg, - gitlab: gl, - datastore: ds, - } -} - -func (w *catchUpProjectWorker) Start(ctx context.Context) { - ctx, w.cancel = context.WithCancel(ctx) - go func() { - w.start.Do(func() { - w.run(ctx) - }) - }() -} - -func (w *catchUpProjectWorker) Stop() { - w.stop.Do(func() { - w.cancel() - close(w.done) - }) -} - -func (w *catchUpProjectWorker) Done() <-chan struct{} { - return w.done -} - -func (w *catchUpProjectWorker) run(ctx context.Context) { - opt := gitlab.ListProjectPipelineOptions{ - PerPage: 100, - Page: 1, - - Scope: 
&[]string{"finished"}[0], - } - if w.project.CatchUp.UpdatedAfter != "" { - after, err := time.Parse("2006-01-02T15:04:05Z", w.project.CatchUp.UpdatedAfter) - if err != nil { - log.Println(err) - } else { - opt.UpdatedAfter = &after - } - } - if w.project.CatchUp.UpdatedBefore != "" { - before, err := time.Parse("2006-01-02T15:04:05Z", w.project.CatchUp.UpdatedBefore) - if err != nil { - log.Println(err) - } else { - opt.UpdatedBefore = &before - } - } - - ch := w.produce(ctx, opt) - w.process(ctx, ch) -} - -func (w *catchUpProjectWorker) produce(ctx context.Context, opt gitlab.ListProjectPipelineOptions) <-chan int64 { - ch := make(chan int64) - - go func() { - defer close(ch) - - latestUpdates, err := w.datastore.QueryProjectPipelinesLatestUpdate(ctx, w.project.Id) - if err != nil { - if errors.Is(err, context.Canceled) { - return - } - log.Println(err) - } - - resChan := w.gitlab.ListProjectPipelines(ctx, w.project.Id, opt) - for { - select { - case <-ctx.Done(): - return - case r, ok := <-resChan: - if !ok { // channel closed - return - } - - if r.Error != nil && !errors.Is(r.Error, context.Canceled) { - log.Println(r.Error) - continue - } - - if !w.project.CatchUp.Forced { - // if not forced, skip pipelines that have not been updated - lastUpdatedAt, ok := latestUpdates[r.Pipeline.ID] - if ok && r.Pipeline.UpdatedAt.Compare(lastUpdatedAt) <= 0 { - continue - } - } - - ch <- r.Pipeline.ID - } - } - }() - - return ch -} - -func (w *catchUpProjectWorker) process(ctx context.Context, pipelineChan <-chan int64) { - numWorkers := 10 - var wg sync.WaitGroup - for i := 0; i < numWorkers; i++ { - wg.Add(1) - go func() { - defer wg.Done() - - for pipelineID := range pipelineChan { - opts := tasks.ExportPipelineHierarchyOptions{ - ProjectID: w.project.Id, - PipelineID: pipelineID, - - ExportSections: w.project.Export.Sections.Enabled, - ExportTestReports: w.project.Export.TestReports.Enabled, - ExportTraces: w.project.Export.Traces.Enabled, - ExportJobMetrics: 
w.project.Export.Sections.Enabled, // for now, export metrics if we fetch the logs for sections anyway - } - - if err := tasks.ExportPipelineHierarchy(ctx, opts, w.gitlab, w.datastore); err != nil { - if !errors.Is(err, context.Canceled) { - log.Printf("error exporting pipeline hierarchy: %s\n", err) - } - } else { - log.Printf("Caught up on projects/%d/pipelines/%d\n", opts.ProjectID, opts.PipelineID) - } - } - }() - } - wg.Wait() -} diff --git a/internal/worker/export.go b/internal/worker/export.go deleted file mode 100644 index 827eac0..0000000 --- a/internal/worker/export.go +++ /dev/null @@ -1,121 +0,0 @@ -package worker - -import ( - "context" - "log" - "sync" - "time" - - "github.com/cluttrdev/gitlab-exporter/pkg/config" - "github.com/cluttrdev/gitlab-exporter/pkg/gitlab" - "github.com/cluttrdev/gitlab-exporter/pkg/tasks" - - "github.com/cluttrdev/gitlab-exporter/internal/datastore" -) - -type exportProjectWorker struct { - cancel context.CancelFunc - done chan struct{} - - // ensure the worker can only be started once - start sync.Once - // ensure the worker can only be stopped once - stop sync.Once - - project config.Project - gitlab *gitlab.Client - datastore datastore.DataStore -} - -func NewExportProjectWorker(cfg config.Project, gl *gitlab.Client, ds datastore.DataStore) Worker { - return &exportProjectWorker{ - done: make(chan struct{}), - - project: cfg, - gitlab: gl, - datastore: ds, - } -} - -func (w *exportProjectWorker) Start(ctx context.Context) { - ctx, w.cancel = context.WithCancel(ctx) - go func() { - w.start.Do(func() { - w.run(ctx) - }) - }() -} - -func (w *exportProjectWorker) Stop() { - w.stop.Do(func() { - w.cancel() - close(w.done) - }) -} - -func (w *exportProjectWorker) Done() <-chan struct{} { - return w.done -} - -func (w *exportProjectWorker) run(ctx context.Context) { - interval := 60 * time.Second - - opt := gitlab.ListProjectPipelineOptions{ - PerPage: 100, - Page: 1, - - Scope: &[]string{"finished"}[0], - } - - before := 
time.Now().UTC().Add(-interval) - opt.UpdatedBefore = &before - - var first bool = true - ticker := time.NewTicker(1 * time.Millisecond) - for { - select { - case <-ctx.Done(): - return - case <-ticker.C: - if first { - ticker.Stop() - ticker = time.NewTicker(interval) - first = false - } - - now := time.Now().UTC() - opt.UpdatedAfter = opt.UpdatedBefore - opt.UpdatedBefore = &now - - var wg sync.WaitGroup - for r := range w.gitlab.ListProjectPipelines(ctx, w.project.Id, opt) { - if r.Error != nil { - log.Println(r.Error) - continue - } - - wg.Add(1) - go func(pid int64) { - defer wg.Done() - - opts := tasks.ExportPipelineHierarchyOptions{ - ProjectID: w.project.Id, - PipelineID: pid, - - ExportSections: w.project.Export.Sections.Enabled, - ExportTestReports: w.project.Export.TestReports.Enabled, - ExportTraces: w.project.Export.Traces.Enabled, - ExportJobMetrics: w.project.Export.Sections.Enabled, // for now, export metrics if we fetch the logs for sections anyway - } - - if err := tasks.ExportPipelineHierarchy(ctx, opts, w.gitlab, w.datastore); err != nil { - log.Printf("error exporting pipeline hierarchy: %s\n", err) - } else { - log.Printf("Exported projects/%d/pipelines/%d\n", opts.ProjectID, opts.PipelineID) - } - }(r.Pipeline.ID) - } - wg.Wait() - } - } -} diff --git a/internal/worker/worker.go b/internal/worker/worker.go index 8c68862..1bc02b1 100644 --- a/internal/worker/worker.go +++ b/internal/worker/worker.go @@ -2,6 +2,7 @@ package worker import ( "context" + "sync" ) type Worker interface { @@ -14,17 +15,29 @@ type worker struct { cancel context.CancelFunc done chan struct{} + // ensure the worker can only be started once + start sync.Once + // ensure the worker can only be stopped once + stop sync.Once + run func(context.Context) } func (w *worker) Start(ctx context.Context) { ctx, w.cancel = context.WithCancel(ctx) - go w.run(ctx) + go func() { + w.start.Do(func() { + defer w.cancel() + w.run(ctx) + }) + }() } func (w *worker) Stop() { - w.cancel() 
- close(w.done) + w.stop.Do(func() { + w.cancel() + close(w.done) + }) } func (w *worker) Done() <-chan struct{} { diff --git a/main.go b/main.go index 4b3eb4c..23f784f 100644 --- a/main.go +++ b/main.go @@ -24,7 +24,6 @@ func main() { runCmd = cmd.NewRunCmd(rootConfig, out) fetchCmd = cmd.NewFetchCmd(rootConfig, out) exportCmd = cmd.NewExportCmd(rootConfig, out) - deduplicateCmd = cmd.NewDeduplicateCmd(rootConfig) ) rootCmd.Subcommands = []*ffcli.Command{ @@ -32,7 +31,6 @@ func main() { runCmd, fetchCmd, exportCmd, - deduplicateCmd, } if len(os.Args[1:]) == 0 { diff --git a/pkg/clickhouse/client.go b/pkg/clickhouse/client.go deleted file mode 100644 index cf71675..0000000 --- a/pkg/clickhouse/client.go +++ /dev/null @@ -1,132 +0,0 @@ -package clickhouse - -import ( - "context" - "fmt" - "sync" - "time" - - "github.com/ClickHouse/clickhouse-go/v2" - "github.com/ClickHouse/clickhouse-go/v2/lib/driver" -) - -type Client struct { - sync.RWMutex - conn driver.Conn - - dbName string -} - -type ClientConfig struct { - Host string - Port string - Database string - User string - Password string -} - -func NewClickHouseClient(cfg ClientConfig) (*Client, error) { - var client Client - - if err := client.Configure(cfg); err != nil { - return nil, err - } - - return &client, nil -} - -func (c *Client) Configure(cfg ClientConfig) error { - addr := fmt.Sprintf("%s:%s", cfg.Host, cfg.Port) - - conn, err := clickhouse.Open(&clickhouse.Options{ - Addr: []string{addr}, - Auth: clickhouse.Auth{ - Database: cfg.Database, - Username: cfg.User, - Password: cfg.Password, - }, - ClientInfo: clickhouse.ClientInfo{ - Products: []struct { - Name string - Version string - }{ - {Name: "gitlab-exporter", Version: "v0.0.0+unknown"}, - }, - }, - }) - if err != nil { - return err - } - - c.Lock() - c.conn = conn - c.dbName = cfg.Database - c.Unlock() - return nil -} - -func (c *Client) CheckReadiness(ctx context.Context) error { - if err := c.conn.Ping(ctx); err != nil { - if exception, ok := 
err.(*clickhouse.Exception); ok { - return fmt.Errorf("clickhouse exception: [%d] %s", exception.Code, exception.Message) - } else { - return fmt.Errorf("error pinging clickhouse: %w", err) - } - } - return nil -} - -func WithParameters(ctx context.Context, params map[string]string) context.Context { - return clickhouse.Context(ctx, clickhouse.WithParameters(params)) -} - -func (c *Client) Exec(ctx context.Context, query string, args ...any) error { - c.RLock() - defer c.RUnlock() - return c.conn.Exec(ctx, query, args...) -} - -func (c *Client) Select(ctx context.Context, dest any, query string, args ...any) error { - c.RLock() - defer c.RUnlock() - return c.conn.Select(ctx, dest, query, args...) -} - -func (c *Client) PrepareBatch(ctx context.Context, query string) (driver.Batch, error) { - c.RLock() - defer c.RUnlock() - return c.conn.PrepareBatch(ctx, query) -} - -func (c *Client) CreateTables(ctx context.Context) error { - return createTables(ctx, c.dbName, c) -} - -func (c *Client) QueryProjectPipelinesLatestUpdate(ctx context.Context, projectID int64) (map[int64]time.Time, error) { - const ( - msPerSecond float64 = 1000 - ) - - var results []struct { - PipelineID int64 `ch:"id"` - LatestUpdate float64 `ch:"latest_update"` - } - - query := fmt.Sprintf(` - SELECT id, max(updated_at) AS latest_update - FROM %s.pipelines - WHERE project_id = %d - GROUP BY id - `, c.dbName, projectID) - - if err := c.Select(ctx, &results, query); err != nil { - return nil, err - } - - m := map[int64]time.Time{} - for _, r := range results { - m[r.PipelineID] = time.UnixMilli(int64(r.LatestUpdate * msPerSecond)).UTC() - } - - return m, nil -} diff --git a/pkg/clickhouse/ddl.go b/pkg/clickhouse/ddl.go deleted file mode 100644 index aeb35e4..0000000 --- a/pkg/clickhouse/ddl.go +++ /dev/null @@ -1,431 +0,0 @@ -package clickhouse - -import ( - "context" - "fmt" -) - -const ( - createPipelinesTableSQL = ` -CREATE TABLE IF NOT EXISTS %s.%s ( - id Int64, - iid Int64, - project_id Int64, - 
status String, - source String, - ref String, - sha String, - before_sha String, - tag Bool, - yaml_errors String, - created_at Float64, - updated_at Float64, - started_at Float64, - finished_at Float64, - committed_at Float64, - duration Float64, - queued_duration Float64, - coverage Float64, - web_url String -) -ENGINE ReplacingMergeTree(updated_at) -ORDER BY id -; - ` - - createJobsTableSQL = ` -CREATE TABLE IF NOT EXISTS %s.%s ( - coverage Float64, - allow_failure Bool, - created_at Float64, - started_at Float64, - finished_at Float64, - erased_at Float64, - duration Float64, - queued_duration Float64, - tag_list Array(String), - id Int64, - name String, - pipeline Tuple( - id Int64, - project_id Int64, - ref String, - sha String, - status String - ), - ref String, - stage String, - status String, - failure_reason String, - tag Bool, - web_url String -) -ENGINE ReplacingMergeTree() -ORDER BY id -; - ` - - createBridgesTableSQL = ` -CREATE TABLE IF NOT EXISTS %s.%s ( - coverage Float64, - allow_failure Bool, - created_at Float64, - started_at Float64, - finished_at Float64, - erased_at Float64, - duration Float64, - queued_duration Float64, - id Int64, - name String, - pipeline Tuple( - id Int64, - iid Int64, - project_id Int64, - status String, - source String, - ref String, - sha String, - web_url String, - created_at Float64, - updated_at Float64 - ), - ref String, - stage String, - status String, - failure_reason String, - tag Bool, - web_url String, - downstream_pipeline Tuple( - id Int64, - iid Int64, - project_id Int64, - status String, - source String, - ref String, - sha String, - web_url String, - created_at Float64, - updated_at Float64 - ) -) -ENGINE ReplacingMergeTree() -ORDER BY id -; - ` - - createSectionsTableSQL = ` -CREATE TABLE IF NOT EXISTS %s.%s ( - id Int64, - name String, - job Tuple( - id Int64, - name String, - status String - ), - pipeline Tuple( - id Int64, - project_id Int64, - ref String, - sha String, - status String - ), - 
started_at Float64, - finished_at Float64, - duration Float64 -) -ENGINE ReplacingMergeTree() -ORDER BY id -; - ` - - createTestReportsTableSQL = ` -CREATE TABLE IF NOT EXISTS %s.%s ( - id Int64, - pipeline_id Int64, - total_time Float64, - total_count Int64, - success_count Int64, - failed_count Int64, - skipped_count Int64, - error_count Int64, - test_suites Nested( - id Int64, - name String, - total_time Float64, - total_count Int64 - ) -) -ENGINE ReplacingMergeTree() -ORDER BY id -; - ` - - createTestSuitesTableSQL = ` -CREATE TABLE IF NOT EXISTS %s.%s ( - id Int64, - testreport Tuple( - id Int64, - pipeline_id Int64 - ), - name String, - total_time Float64, - total_count Int64, - success_count Int64, - failed_count Int64, - skipped_count Int64, - error_count Int64, - test_cases Nested( - id Int64, - status String, - name String - ) -) -ENGINE ReplacingMergeTree() -ORDER BY id -; - ` - - createTestCasesTableSQL = ` -CREATE TABLE IF NOT EXISTS %s.%s ( - id Int64, - testsuite Tuple( - id Int64 - ), - testreport Tuple( - id Int64, - pipeline_id Int64 - ), - status String, - name String, - classname String, - file String, - execution_time Float64, - system_output String, - stack_trace String, - attachment_url String, - recent_failures Tuple( - count Int64, - base_branch String - ) -) -ENGINE ReplacingMergeTree() -ORDER BY id -; - ` - - createJobMetricsTableSQL = ` -CREATE TABLE IF NOT EXISTS %s.%s ( - name String, - labels Map(string, string), - value Float64, - timestamp Int64, - job Tuple( - id Int64, - name String - ) -) -ENGINE MergeTree() -ORDER BY (job.id, name, timestamp) -; - ` -) - -const ( - // OpenTelemetry Traces - // schemas taken from https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/exporter/clickhouseexporter/exporter_traces.go - - createTracesTableSQL = ` -CREATE TABLE IF NOT EXISTS %s.%s ( - Timestamp DateTime64(9) CODEC(Delta, ZSTD(1)), - TraceId String CODEC(ZSTD(1)), - SpanId String CODEC(ZSTD(1)), - ParentSpanId 
String CODEC(ZSTD(1)), - TraceState String CODEC(ZSTD(1)), - SpanName LowCardinality(String) CODEC(ZSTD(1)), - SpanKind LowCardinality(String) CODEC(ZSTD(1)), - ServiceName LowCardinality(String) CODEC(ZSTD(1)), - ResourceAttributes Map(LowCardinality(String), String) CODEC(ZSTD(1)), - ScopeName String CODEC(ZSTD(1)), - ScopeVersion String CODEC(ZSTD(1)), - SpanAttributes Map(LowCardinality(String), String) CODEC(ZSTD(1)), - Duration Int64 CODEC(ZSTD(1)), - StatusCode LowCardinality(String) CODEC(ZSTD(1)), - StatusMessage String CODEC(ZSTD(1)), - Events Nested ( - Timestamp DateTime64(9), - Name LowCardinality(String), - Attributes Map(LowCardinality(String), String) - ) CODEC(ZSTD(1)), - Links Nested ( - TraceId String, - SpanId String, - TraceState String, - Attributes Map(LowCardinality(String), String) - ) CODEC(ZSTD(1)), - INDEX idx_trace_id TraceId TYPE bloom_filter(0.001) GRANULARITY 1, - INDEX idx_res_attr_key mapKeys(ResourceAttributes) TYPE bloom_filter(0.01) GRANULARITY 1, - INDEX idx_res_attr_value mapValues(ResourceAttributes) TYPE bloom_filter(0.01) GRANULARITY 1, - INDEX idx_span_attr_key mapKeys(SpanAttributes) TYPE bloom_filter(0.01) GRANULARITY 1, - INDEX idx_span_attr_value mapValues(SpanAttributes) TYPE bloom_filter(0.01) GRANULARITY 1, - INDEX idx_duration Duration TYPE minmax GRANULARITY 1 -) ENGINE MergeTree() -PARTITION BY toDate(Timestamp) -ORDER BY (ServiceName, SpanName, toUnixTimestamp(Timestamp), TraceId) -SETTINGS index_granularity=8192, ttl_only_drop_parts = 1 -; - ` - - createTraceIdTsTableSQL = ` -CREATE TABLE IF NOT EXISTS %s.%s_trace_id_ts ( - TraceId String CODEC(ZSTD(1)), - Start DateTime64(9) CODEC(Delta, ZSTD(1)), - End DateTime64(9) CODEC(Delta, ZSTD(1)), - INDEX idx_trace_id TraceId TYPE bloom_filter(0.01) GRANULARITY 1 -) ENGINE MergeTree() -ORDER BY (TraceId, toUnixTimestamp(Start)) -SETTINGS index_granularity=8192 -; - ` - - createTraceIdTsMaterializedViewSQL = ` -CREATE MATERIALIZED VIEW IF NOT EXISTS 
%s.%s_trace_id_ts_mv -TO %s.%s_trace_id_ts -AS SELECT - TraceId, - min(Timestamp) as Start, - max(Timestamp) as End -FROM %s.%s -WHERE TraceId != '' -GROUP BY TraceId -; - ` - - createTraceViewSQL = ` -CREATE VIEW IF NOT EXISTS %s.%s AS -SELECT - TraceId AS traceID, - SpanId AS spanID, - SpanName AS operationName, - ParentSpanId AS parentSpanID, - ServiceName AS serviceName, - Duration / 1000000 AS duration, - Timestamp AS startTime, - arrayMap(key -> map('key', key, 'value', SpanAttributes[key]), mapKeys(SpanAttributes)) AS tags, - arrayMap(key -> map('key', key, 'value', ResourceAttributes[key]), mapKeys(ResourceAttributes)) AS serviceTags -FROM %s.%s -WHERE TraceId = {trace_id:String} -; - ` -) - -func createTables(ctx context.Context, db string, client *Client) error { - if err := client.Exec(ctx, renderCreatePipelinesTableSQL(db)); err != nil { - return fmt.Errorf("exec create pipelines table: %w", err) - } - if err := client.Exec(ctx, renderCreateJobsTableSQL(db)); err != nil { - return fmt.Errorf("exec create jobs table: %w", err) - } - if err := client.Exec(ctx, renderCreateSectionsTableSQL(db)); err != nil { - return fmt.Errorf("exec create sections table: %w", err) - } - if err := client.Exec(ctx, renderCreateBridgesTableSQL(db)); err != nil { - return fmt.Errorf("exec create bridges table: %w", err) - } - if err := client.Exec(ctx, renderCreateTestReportsTableSQL(db)); err != nil { - return fmt.Errorf("exec create testreports table: %w", err) - } - if err := client.Exec(ctx, renderCreateTestSuitesTableSQL(db)); err != nil { - return fmt.Errorf("exec create testsuites table: %w", err) - } - if err := client.Exec(ctx, renderCreateTestCasesTableSQL(db)); err != nil { - return fmt.Errorf("exec create testcases table: %w", err) - } - if err := client.Exec(ctx, renderCreateJobMetricsTableSQL(db)); err != nil { - return fmt.Errorf("exec create job metrics table: %w", err) - } - - if err := client.Exec(ctx, renderCreateTracesTableSQL(db)); err != nil { - return 
fmt.Errorf("exec create traces table: %w", err) - } - if err := client.Exec(ctx, renderCreateTraceIdTsTableSQL(db)); err != nil { - return fmt.Errorf("exec create traceIdTs table: %w", err) - } - if err := client.Exec(ctx, renderCreateTraceIdTsMaterializedViewSQL(db)); err != nil { - return fmt.Errorf("exec create traceIdTs view: %w", err) - } - if err := client.Exec(ctx, renderTraceViewSQL(db)); err != nil { - return fmt.Errorf("exec create trace view: %w", err) - } - - return nil -} - -func renderCreatePipelinesTableSQL(db string) string { - const tableName string = "pipelines" - return fmt.Sprintf(createPipelinesTableSQL, db, tableName) -} - -func renderCreateJobsTableSQL(db string) string { - const tableName string = "jobs" - return fmt.Sprintf(createJobsTableSQL, db, tableName) -} - -func renderCreateBridgesTableSQL(db string) string { - const tableName string = "bridges" - return fmt.Sprintf(createBridgesTableSQL, db, tableName) -} - -func renderCreateSectionsTableSQL(db string) string { - const tableName string = "sections" - return fmt.Sprintf(createSectionsTableSQL, db, tableName) -} - -func renderCreateTestReportsTableSQL(db string) string { - const tableName string = "testreports" - return fmt.Sprintf(createTestReportsTableSQL, db, tableName) -} - -func renderCreateTestSuitesTableSQL(db string) string { - const tableName string = "testsuites" - return fmt.Sprintf(createTestSuitesTableSQL, db, tableName) -} - -func renderCreateTestCasesTableSQL(db string) string { - const tableName string = "testcases" - return fmt.Sprintf(createTestCasesTableSQL, db, tableName) -} - -func renderCreateJobMetricsTableSQL(db string) string { - const tableName string = "job_metrics" - return fmt.Sprintf(createJobMetricsTableSQL, db, tableName) -} - -func renderCreateTracesTableSQL(db string) string { - const tableName string = "traces" - return fmt.Sprintf(createTracesTableSQL, db, tableName) -} - -func renderCreateTraceIdTsTableSQL(db string) string { - const 
tracesTableName string = "traces" - return fmt.Sprintf(createTraceIdTsTableSQL, db, tracesTableName) -} - -func renderCreateTraceIdTsMaterializedViewSQL(db string) string { - const tracesTableName string = "traces" - return fmt.Sprintf( - createTraceIdTsMaterializedViewSQL, - db, tracesTableName, - db, tracesTableName, - db, tracesTableName, - ) -} - -func renderTraceViewSQL(db string) string { - const viewName string = "trace_view" - const tracesTableName string = "traces" - return fmt.Sprintf( - createTraceViewSQL, - db, viewName, - db, tracesTableName, - ) -} diff --git a/pkg/clickhouse/dml.go b/pkg/clickhouse/dml.go deleted file mode 100644 index a42efd0..0000000 --- a/pkg/clickhouse/dml.go +++ /dev/null @@ -1,514 +0,0 @@ -package clickhouse - -import ( - "context" - "fmt" - "time" - - "github.com/cluttrdev/gitlab-exporter/pkg/models" -) - -func timestamp(t *time.Time) float64 { - const msPerS float64 = 1000.0 - if t == nil { - return 0.0 - } - return float64(t.UnixMilli()) / msPerS -} - -func InsertPipelines(ctx context.Context, pipelines []*models.Pipeline, c *Client) error { - const query string = `INSERT INTO {db: Identifier}.pipelines` - var params = map[string]string{ - "db": c.dbName, - } - - ctx = WithParameters(ctx, params) - - batch, err := c.PrepareBatch(ctx, query) - if err != nil { - return fmt.Errorf("[clickhouse.Client.InsertPipelines] %w", err) - } - - for _, p := range pipelines { - err = batch.Append( - p.ID, - p.IID, - p.ProjectID, - p.Status, - p.Source, - p.Ref, - p.SHA, - p.BeforeSHA, - p.Tag, - p.YamlErrors, - timestamp(p.CreatedAt), - timestamp(p.UpdatedAt), - timestamp(p.StartedAt), - timestamp(p.FinishedAt), - timestamp(p.CommittedAt), - p.Duration, - p.QueuedDuration, - p.Coverage, - p.WebURL, - ) - if err != nil { - return fmt.Errorf("[clickhouse.Client.InsertPipelines] %w", err) - } - } - - return batch.Send() -} - -func InsertJobs(ctx context.Context, jobs []*models.Job, c *Client) error { - const query string = `INSERT INTO {db: 
Identifier}.jobs` - var params = map[string]string{ - "db": c.dbName, - } - - ctx = WithParameters(ctx, params) - - batch, err := c.PrepareBatch(ctx, query) - if err != nil { - return fmt.Errorf("[clickhouse.Client.InsertJobs] %w", err) - } - - for _, j := range jobs { - err = batch.Append( - j.Coverage, - j.AllowFailure, - timestamp(j.CreatedAt), - timestamp(j.StartedAt), - timestamp(j.FinishedAt), - timestamp(j.ErasedAt), - j.Duration, - j.QueuedDuration, - j.TagList, - j.ID, - j.Name, - map[string]interface{}{ - "id": j.Pipeline.ID, - "project_id": j.Pipeline.ProjectID, - "ref": j.Pipeline.Ref, - "sha": j.Pipeline.Sha, - "status": j.Pipeline.Status, - }, - j.Ref, - j.Stage, - j.Status, - j.FailureReason, - j.Tag, - j.WebURL, - ) - if err != nil { - return fmt.Errorf("[clickhouse.Client.InsertJobs] %w", err) - } - } - - return batch.Send() -} - -func InsertBridges(ctx context.Context, bridges []*models.Bridge, c *Client) error { - const query string = `INSERT INTO {db: Identifier}.bridges` - var params = map[string]string{ - "db": c.dbName, - } - - ctx = WithParameters(ctx, params) - - batch, err := c.PrepareBatch(ctx, query) - if err != nil { - return fmt.Errorf("[clickhouse.Client.InsertBridges] %w", err) - } - - for _, b := range bridges { - err = batch.Append( - b.Coverage, - b.AllowFailure, - timestamp(b.CreatedAt), - timestamp(b.StartedAt), - timestamp(b.FinishedAt), - timestamp(b.ErasedAt), - b.Duration, - b.QueuedDuration, - b.ID, - b.Name, - map[string]interface{}{ - "id": b.Pipeline.ID, - "iid": b.Pipeline.IID, - "project_id": b.Pipeline.ProjectID, - "status": b.Pipeline.Status, - "source": b.Pipeline.Source, - "ref": b.Pipeline.Source, - "sha": b.Pipeline.SHA, - "web_url": b.Pipeline.WebURL, - "created_at": timestamp(b.Pipeline.CreatedAt), - "updated_at": timestamp(b.Pipeline.UpdatedAt), - }, - b.Ref, - b.Stage, - b.Status, - b.FailureReason, - b.Tag, - b.WebURL, - map[string]interface{}{ - "id": b.DownstreamPipeline.ID, - "iid": 
b.DownstreamPipeline.IID, - "project_id": b.DownstreamPipeline.ProjectID, - "status": b.DownstreamPipeline.Status, - "source": b.DownstreamPipeline.Source, - "ref": b.DownstreamPipeline.Source, - "sha": b.DownstreamPipeline.SHA, - "web_url": b.DownstreamPipeline.WebURL, - "created_at": timestamp(b.DownstreamPipeline.CreatedAt), - "updated_at": timestamp(b.DownstreamPipeline.UpdatedAt), - }, - ) - if err != nil { - return fmt.Errorf("[clickhouse.Client.InsertBridges] %w", err) - } - } - - return batch.Send() -} - -func InsertSections(ctx context.Context, sections []*models.Section, client *Client) error { - const query string = `INSERT INTO {db: Identifier}.sections` - var params = map[string]string{ - "db": client.dbName, - } - - ctx = WithParameters(ctx, params) - - batch, err := client.PrepareBatch(ctx, query) - if err != nil { - return fmt.Errorf("[clickhouse.Client.InsertSections] %w", err) - } - - for _, s := range sections { - err = batch.Append( - s.ID, - s.Name, - map[string]interface{}{ - "id": s.Job.ID, - "name": s.Job.Name, - "status": s.Job.Status, - }, - map[string]interface{}{ - "id": s.Pipeline.ID, - "project_id": s.Pipeline.ProjectID, - "ref": s.Pipeline.Ref, - "sha": s.Pipeline.Sha, - "status": s.Pipeline.Status, - }, - timestamp(s.StartedAt), - timestamp(s.FinishedAt), - s.Duration, - ) - if err != nil { - return fmt.Errorf("[clickhouse.Client.InsertSections] %w", err) - } - } - - return batch.Send() -} - -func InsertPipelineHierarchy(ctx context.Context, hierarchy *models.PipelineHierarchy, client *Client) error { - if err := InsertPipelines(ctx, hierarchy.GetAllPipelines(), client); err != nil { - return fmt.Errorf("[InsertPipelineHierarchy] %w", err) - } - - if err := InsertJobs(ctx, hierarchy.GetAllJobs(), client); err != nil { - return fmt.Errorf("[InsertPipelineHierarchy] %w", err) - } - - if err := InsertSections(ctx, hierarchy.GetAllSections(), client); err != nil { - return fmt.Errorf("[InsertPipelineHierarchy] %w", err) - } - - if err := 
InsertBridges(ctx, hierarchy.GetAllBridges(), client); err != nil { - return fmt.Errorf("[InsertPipelineHierarchy] %w", err) - } - - return nil -} - -func InsertTestReports(ctx context.Context, reports []*models.PipelineTestReport, client *Client) error { - const query string = `INSERT INTO {db: Identifier}.testreports` - var params = map[string]string{ - "db": client.dbName, - } - - ctx = WithParameters(ctx, params) - - batch, err := client.PrepareBatch(ctx, query) - if err != nil { - return fmt.Errorf("[clickhouse.Client.InsertTestReports] %w", err) - } - - for _, tr := range reports { - ids, names, times, counts := convertTestSuitesSummary(tr.TestSuites) - - err = batch.Append( - tr.ID, - tr.PipelineID, - tr.TotalTime, - tr.TotalCount, - tr.SuccessCount, - tr.FailedCount, - tr.SkippedCount, - tr.ErrorCount, - ids, - names, - times, - counts, - ) - if err != nil { - return fmt.Errorf("[clickhouse.Client.InsertTestReports] %w", err) - } - } - - return batch.Send() -} - -func convertTestSuitesSummary(suites []*models.PipelineTestSuite) (ids []int64, names []string, times []float64, counts []int64) { - for _, ts := range suites { - ids = append(ids, ts.ID) - names = append(names, ts.Name) - times = append(times, ts.TotalTime) - counts = append(counts, ts.TotalCount) - } - - return -} - -func InsertTestSuites(ctx context.Context, suites []*models.PipelineTestSuite, client *Client) error { - const query string = `INSERT INTO {db: Identifier}.testsuites` - var params = map[string]string{ - "db": client.dbName, - } - - ctx = WithParameters(ctx, params) - - batch, err := client.PrepareBatch(ctx, query) - if err != nil { - return fmt.Errorf("[clickhouse.Client.InsertTestReports] %w", err) - } - - for _, ts := range suites { - ids, statuses, names := convertTestCasesSummary(ts.TestCases) - - err = batch.Append( - ts.ID, - map[string]interface{}{ - "id": ts.TestReport.ID, - "pipeline_id": ts.TestReport.PipelineID, - }, - ts.Name, - ts.TotalTime, - ts.TotalCount, - 
ts.SuccessCount, - ts.FailedCount, - ts.SkippedCount, - ts.ErrorCount, - ids, - statuses, - names, - ) - if err != nil { - return fmt.Errorf("[clickhouse.Client.InsertTestSuites] %w", err) - } - } - - return batch.Send() -} - -func convertTestCasesSummary(cases []*models.PipelineTestCase) (ids []int64, statuses []string, names []string) { - for _, tc := range cases { - ids = append(ids, tc.ID) - statuses = append(statuses, tc.Status) - names = append(names, tc.Name) - } - - return -} - -func InsertTestCases(ctx context.Context, cases []*models.PipelineTestCase, client *Client) error { - const query string = `INSERT INTO {db: Identifier}.testcases` - var params = map[string]string{ - "db": client.dbName, - } - - ctx = WithParameters(ctx, params) - - batch, err := client.PrepareBatch(ctx, query) - if err != nil { - return fmt.Errorf("[clickhouse.Client.InsertTestReports] %w", err) - } - - for _, tc := range cases { - err = batch.Append( - tc.ID, - map[string]interface{}{ - "id": tc.TestSuite.ID, - }, - map[string]interface{}{ - "id": tc.TestReport.ID, - "pipeline_id": tc.TestReport.PipelineID, - }, - tc.Status, - tc.Name, - tc.Classname, - tc.File, - tc.ExecutionTime, - tc.SystemOutput, - tc.StackTrace, - tc.AttachmentURL, - map[string]interface{}{ - "count": tc.RecentFailures.Count, - "base_branch": tc.RecentFailures.BaseBranch, - }, - ) - if err != nil { - return fmt.Errorf("[clickhouse.Client.InsertTestCases] %w", err) - } - } - - return batch.Send() -} - -func InsertJobMetrics(ctx context.Context, metrics []*models.JobMetric, client *Client) error { - const query string = `INSERT INTO {db: Identifier}.{table: Identifier}` - var params = map[string]string{ - "db": client.dbName, - "table": "job_metrics", - } - - ctx = WithParameters(ctx, params) - - batch, err := client.PrepareBatch(ctx, query) - if err != nil { - return fmt.Errorf("prepare batch: %w", err) - } - - for _, m := range metrics { - err = batch.Append( - m.Name, - m.Labels, - m.Value, - m.Timestamp, - 
map[string]interface{}{ - "id": m.Job.ID, - "name": m.Job.Name, - }, - ) - if err != nil { - return fmt.Errorf("append batch: %w", err) - } - } - - if err := batch.Send(); err != nil { - return fmt.Errorf("send batch: %w", err) - } - return nil -} - -func timeFromUnixNano(ts int64) time.Time { - const nsecPerSecond int64 = 1e09 - sec := ts / nsecPerSecond - nsec := ts - (sec * nsecPerSecond) - return time.Unix(sec, nsec) -} - -func convertEvents(events []models.SpanEvent) ([]time.Time, []string, []map[string]string) { - var ( - times []time.Time - names []string - attrs []map[string]string - ) - for _, event := range events { - times = append(times, timeFromUnixNano(int64(event.Time))) - names = append(names, event.Name) - attrs = append(attrs, event.Attributes) - } - return times, names, attrs -} - -func convertLinks(links []models.SpanLink) ([]string, []string, []string, []map[string]string) { - var ( - traceIDs []string - spanIDs []string - states []string - attrs []map[string]string - ) - for _, link := range links { - traceIDs = append(traceIDs, link.TraceID) - spanIDs = append(spanIDs, link.SpanID) - states = append(states, link.TraceState) - attrs = append(attrs, link.Attributes) - } - return traceIDs, spanIDs, states, attrs -} - -func InsertTraces(ctx context.Context, traces []models.Trace, client *Client) error { - const query string = `INSERT INTO {db: Identifier}.traces` - var params = map[string]string{ - "db": client.dbName, - } - - ctx = WithParameters(ctx, params) - - batch, err := client.PrepareBatch(ctx, query) - if err != nil { - return fmt.Errorf("[clickhouse.Client.InsertTraces] %w", err) - } - - scopeName := "" - scopeVersion := "" - - for _, trace := range traces { - for _, span := range trace { - serviceName := "" - if sn, ok := span.Resource.Attributes["service.name"]; ok { - serviceName = sn - } - - eventTimes, eventNames, eventAttrs := convertEvents(span.Events) - linkTraceIDs, linkSpanIDs, linkStates, linkAttrs := convertLinks(span.Links) 
- - err = batch.Append( - timeFromUnixNano(int64(span.StartTime)), - span.TraceID, - span.SpanID, - span.ParentSpanID, - span.TraceState, - span.Name, - span.Kind.Name(), - serviceName, - span.Resource.Attributes, - scopeName, - scopeVersion, - span.Attributes, - int64(span.EndTime-span.StartTime), - span.Status.Code.Name(), - span.Status.Message, - eventTimes, - eventNames, - eventAttrs, - linkTraceIDs, - linkSpanIDs, - linkStates, - linkAttrs, - ) - - if err != nil { - return fmt.Errorf("[clickhouse.Client.InsertTraces] %w", err) - } - } - } - - return batch.Send() -} - -func InsertTrace(ctx context.Context, trace []*models.Span, client *Client) error { - return InsertTraces(ctx, []models.Trace{trace}, client) -} diff --git a/pkg/clickhouse/store.go b/pkg/clickhouse/store.go deleted file mode 100644 index 1f4105e..0000000 --- a/pkg/clickhouse/store.go +++ /dev/null @@ -1,75 +0,0 @@ -package clickhouse - -import ( - "context" - "fmt" - "time" - - "github.com/cluttrdev/gitlab-exporter/pkg/models" -) - -type ClickHouseDataStore struct { - client *Client -} - -func NewClickHouseDataStore(c *Client) *ClickHouseDataStore { - return &ClickHouseDataStore{ - client: c, - } -} - -func (ds *ClickHouseDataStore) Initialize(ctx context.Context) error { - if err := ds.client.CreateTables(ctx); err != nil { - return fmt.Errorf("error creating tables: %w", err) - } - - return nil -} - -func (ds *ClickHouseDataStore) CheckReadiness(ctx context.Context) error { - return ds.client.CheckReadiness(ctx) -} - -func (ds *ClickHouseDataStore) InsertPipelines(ctx context.Context, pipelines []*models.Pipeline) error { - return InsertPipelines(ctx, pipelines, ds.client) -} - -func (ds *ClickHouseDataStore) InsertJobs(ctx context.Context, jobs []*models.Job) error { - return InsertJobs(ctx, jobs, ds.client) -} - -func (ds *ClickHouseDataStore) InsertSections(ctx context.Context, sections []*models.Section) error { - return InsertSections(ctx, sections, ds.client) -} - -func (ds 
*ClickHouseDataStore) InsertBridges(ctx context.Context, bridges []*models.Bridge) error { - return InsertBridges(ctx, bridges, ds.client) -} - -func (ds *ClickHouseDataStore) InsertPipelineHierarchy(ctx context.Context, hierarchy *models.PipelineHierarchy) error { - return InsertPipelineHierarchy(ctx, hierarchy, ds.client) -} - -func (ds *ClickHouseDataStore) InsertTraces(ctx context.Context, traces []models.Trace) error { - return InsertTraces(ctx, traces, ds.client) -} - -func (ds *ClickHouseDataStore) InsertTestReports(ctx context.Context, reports []*models.PipelineTestReport) error { - return InsertTestReports(ctx, reports, ds.client) -} - -func (ds *ClickHouseDataStore) InsertTestSuites(ctx context.Context, suites []*models.PipelineTestSuite) error { - return InsertTestSuites(ctx, suites, ds.client) -} - -func (ds *ClickHouseDataStore) InsertTestCases(ctx context.Context, cases []*models.PipelineTestCase) error { - return InsertTestCases(ctx, cases, ds.client) -} - -func (ds *ClickHouseDataStore) InsertJobMetrics(ctx context.Context, metrics []*models.JobMetric) error { - return InsertJobMetrics(ctx, metrics, ds.client) -} - -func (ds *ClickHouseDataStore) QueryProjectPipelinesLatestUpdate(ctx context.Context, projectID int64) (map[int64]time.Time, error) { - return ds.client.QueryProjectPipelinesLatestUpdate(ctx, projectID) -} diff --git a/pkg/config/config.go b/pkg/config/config.go index 8aa014e..2e3952a 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -5,11 +5,10 @@ import ( ) type Config struct { - GitLab GitLab `default:"{}" yaml:"gitlab"` - ClickHouse ClickHouse `default:"{}" yaml:"clickhouse"` - Endpoints []Endpoint `default:"[]" yaml:"endpoints"` - Projects []Project `default:"[]" yaml:"projects"` - Server Server `default:"{}" yaml:"server"` + GitLab GitLab `default:"{}" yaml:"gitlab"` + Endpoints []Endpoint `default:"[]" yaml:"endpoints"` + Projects []Project `default:"[]" yaml:"projects"` + Server Server `default:"{}" yaml:"server"` } 
type GitLab struct { @@ -25,14 +24,6 @@ type GitLab struct { } `yaml:"client"` } -type ClickHouse struct { - Host string `default:"localhost" yaml:"host"` - Port string `default:"9000" yaml:"port"` - Database string `default:"default" yaml:"database"` - User string `default:"default" yaml:"user"` - Password string `default:"" yaml:"password"` -} - type Endpoint struct { Address string `default:"" yaml:"address"` } diff --git a/pkg/config/parser.go b/pkg/config/parser.go index 176c002..cfe7bff 100644 --- a/pkg/config/parser.go +++ b/pkg/config/parser.go @@ -12,8 +12,7 @@ import ( // Config implements the Unmarshaler interface func (c *Config) UnmarshalYAML(v *yaml.Node) error { type _Config struct { - GitLab GitLab `yaml:"gitlab"` - ClickHouse ClickHouse `yaml:"clickhouse"` + GitLab GitLab `yaml:"gitlab"` Endpoints []Endpoint `yaml:"endpoints"` @@ -24,7 +23,6 @@ func (c *Config) UnmarshalYAML(v *yaml.Node) error { var _cfg _Config _cfg.GitLab = c.GitLab - _cfg.ClickHouse = c.ClickHouse _cfg.Endpoints = c.Endpoints _cfg.Server = c.Server @@ -33,7 +31,6 @@ func (c *Config) UnmarshalYAML(v *yaml.Node) error { } c.GitLab = _cfg.GitLab - c.ClickHouse = _cfg.ClickHouse c.Endpoints = _cfg.Endpoints c.Server = _cfg.Server diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index a8c9f5a..eab44ce 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -14,19 +14,16 @@ import ( grpc_client "github.com/cluttrdev/gitlab-exporter/grpc/client" - "github.com/cluttrdev/gitlab-exporter/pkg/clickhouse" "github.com/cluttrdev/gitlab-exporter/pkg/config" "github.com/cluttrdev/gitlab-exporter/pkg/gitlab" - "github.com/cluttrdev/gitlab-exporter/internal/datastore" "github.com/cluttrdev/gitlab-exporter/internal/worker" ) type Controller struct { - config config.Config - GitLab gitlab.Client - Exporter *Exporter - DataStore datastore.DataStore + config config.Config + GitLab gitlab.Client + Exporter *Exporter workers []worker.Worker } @@ -48,10 
+45,6 @@ func (c *Controller) configure(cfg config.Config) error { return err } - if err := c.configureClickHouseDataStore(cfg.ClickHouse); err != nil { - return err - } - if err := c.configureExporter(cfg.Endpoints); err != nil { return err } @@ -72,24 +65,6 @@ func (c *Controller) configureGitLabClient(cfg config.GitLab) error { }) } -func (c *Controller) configureClickHouseDataStore(cfg config.ClickHouse) error { - var client clickhouse.Client - - conf := clickhouse.ClientConfig{ - Host: cfg.Host, - Port: cfg.Port, - Database: cfg.Database, - User: cfg.User, - Password: cfg.Password, - } - if err := client.Configure(conf); err != nil { - return err - } - - c.DataStore = clickhouse.NewClickHouseDataStore(&client) - return nil -} - func (c *Controller) configureExporter(cfg []config.Endpoint) error { endpoints := make([]grpc_client.EndpointConfig, 0, len(cfg)) for _, cc := range cfg { @@ -113,9 +88,19 @@ func (c *Controller) configureWorkers(cfg config.Config) error { for _, prj := range cfg.Projects { if prj.CatchUp.Enabled { - workers = append(workers, worker.NewCatchUpProjectWorker(prj, &c.GitLab, c.DataStore)) + task := ProjectCatchUpTask{ + Config: prj, + } + workers = append(workers, worker.NewWorker(func(ctx context.Context) { + task.Run(c, ctx) + })) + } + task := ProjectExportTask{ + Config: prj, } - workers = append(workers, worker.NewExportProjectWorker(prj, &c.GitLab, c.DataStore)) + workers = append(workers, worker.NewWorker(func(ctx context.Context) { + task.Run(c, ctx) + })) } c.workers = workers @@ -124,10 +109,6 @@ func (c *Controller) configureWorkers(cfg config.Config) error { } func (c *Controller) init(ctx context.Context) error { - if err := c.DataStore.Initialize(ctx); err != nil { - return err - } - return nil } @@ -135,11 +116,6 @@ func (c *Controller) CheckReadiness(ctx context.Context) error { if err := c.GitLab.CheckReadiness(ctx); err != nil { return err } - - if err := c.DataStore.CheckReadiness(ctx); err != nil { - return err - } - 
return nil } diff --git a/pkg/controller/tasks.go b/pkg/controller/tasks.go new file mode 100644 index 0000000..628ddc0 --- /dev/null +++ b/pkg/controller/tasks.go @@ -0,0 +1,242 @@ +package controller + +import ( + "context" + "errors" + "fmt" + "log" + "sync" + "time" + + "github.com/cluttrdev/gitlab-exporter/pkg/config" + "github.com/cluttrdev/gitlab-exporter/pkg/gitlab" + "github.com/cluttrdev/gitlab-exporter/pkg/models" +) + +type ExportPipelineHierarchyOptions struct { + ProjectID int64 + PipelineID int64 + + ExportSections bool + ExportTestReports bool + ExportTraces bool + ExportJobMetrics bool +} + +func ExportPipelineHierarchy(ctl *Controller, ctx context.Context, opts ExportPipelineHierarchyOptions) error { + opt := &gitlab.GetPipelineHierarchyOptions{ + FetchSections: opts.ExportSections, + FetchJobMetrics: opts.ExportJobMetrics, + } + + phr := <-ctl.GitLab.GetPipelineHierarchy(ctx, opts.ProjectID, opts.PipelineID, opt) + if err := phr.Error; err != nil { + return fmt.Errorf("error getting pipeline hierarchy: %w", err) + } + ph := phr.PipelineHierarchy + + if err := ctl.Exporter.RecordPipelineHierarchy(ctx, ph); err != nil { + return fmt.Errorf("error exporting pipeline hierarchy: %w", err) + } + + if opts.ExportTraces { + traces := ph.GetAllTraces() + ts := make([]*models.Trace, 0, len(traces)) + for i := range traces { + ts = append(ts, &traces[i]) + } + if err := ctl.Exporter.RecordTraces(ctx, ts); err != nil { + return fmt.Errorf("error exporting traces: %w", err) + } + } + + if opts.ExportTestReports { + trs, err := ctl.GitLab.GetPipelineHierarchyTestReports(ctx, ph) + if err != nil { + return fmt.Errorf("error getting testreports: %w", err) + } + tss := []*models.PipelineTestSuite{} + tcs := []*models.PipelineTestCase{} + for _, tr := range trs { + tss = append(tss, tr.TestSuites...) + for _, ts := range tr.TestSuites { + tcs = append(tcs, ts.TestCases...) 
+ } + } + if err := ctl.Exporter.RecordTestReports(ctx, trs); err != nil { + return fmt.Errorf("error exporting testreports: %w", err) + } + if err := ctl.Exporter.RecordTestSuites(ctx, tss); err != nil { + return fmt.Errorf("error exporting testsuites: %w", err) + } + if err := ctl.Exporter.RecordTestCases(ctx, tcs); err != nil { + return fmt.Errorf("error exporting testcases: %w", err) + } + } + + return nil +} + +// =========================================================================== + +type ProjectExportTask struct { + Config config.Project +} + +func (t *ProjectExportTask) Run(ctl *Controller, ctx context.Context) { + interval := 60 * time.Second + + opt := gitlab.ListProjectPipelineOptions{ + PerPage: 100, + Page: 1, + + Scope: &[]string{"finished"}[0], + } + + before := time.Now().UTC().Add(-interval) + opt.UpdatedBefore = &before + + var first bool = true + ticker := time.NewTicker(1 * time.Millisecond) + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + if first { + ticker.Stop() + ticker = time.NewTicker(interval) + first = false + } + + now := time.Now().UTC() + opt.UpdatedAfter = opt.UpdatedBefore + opt.UpdatedBefore = &now + + var wg sync.WaitGroup + for r := range ctl.GitLab.ListProjectPipelines(ctx, t.Config.Id, opt) { + if r.Error != nil { + log.Println(r.Error) + continue + } + + wg.Add(1) + go func(pid int64) { + defer wg.Done() + + opts := ExportPipelineHierarchyOptions{ + ProjectID: t.Config.Id, + PipelineID: pid, + + ExportSections: t.Config.Export.Sections.Enabled, + ExportTestReports: t.Config.Export.TestReports.Enabled, + ExportTraces: t.Config.Export.Traces.Enabled, + ExportJobMetrics: t.Config.Export.Sections.Enabled, // for now, export metrics if we fetch the logs for sections anyway + } + + if err := ExportPipelineHierarchy(ctl, ctx, opts); err != nil { + log.Printf("error exporting pipeline hierarchy: %s\n", err) + } else { + log.Printf("Exported projects/%d/pipelines/%d\n", opts.ProjectID, opts.PipelineID) + } 
+ }(r.Pipeline.ID) + } + wg.Wait() + } + } +} + +// =========================================================================== + +type ProjectCatchUpTask struct { + Config config.Project +} + +func (t *ProjectCatchUpTask) Run(ctl *Controller, ctx context.Context) { + opt := gitlab.ListProjectPipelineOptions{ + PerPage: 100, + Page: 1, + + Scope: &[]string{"finished"}[0], + } + if t.Config.CatchUp.UpdatedAfter != "" { + after, err := time.Parse("2006-01-02T15:04:05Z", t.Config.CatchUp.UpdatedAfter) + if err != nil { + log.Println(err) + } else { + opt.UpdatedAfter = &after + } + } + if t.Config.CatchUp.UpdatedBefore != "" { + before, err := time.Parse("2006-01-02T15:04:05Z", t.Config.CatchUp.UpdatedBefore) + if err != nil { + log.Println(err) + } else { + opt.UpdatedBefore = &before + } + } + + ch := t.produce(ctl, ctx, opt) + t.process(ctl, ctx, ch) +} + +func (t *ProjectCatchUpTask) produce(ctl *Controller, ctx context.Context, opt gitlab.ListProjectPipelineOptions) <-chan int64 { + ch := make(chan int64) + + go func() { + defer close(ch) + + resChan := ctl.GitLab.ListProjectPipelines(ctx, t.Config.Id, opt) + for { + select { + case <-ctx.Done(): + return + case r, ok := <-resChan: + if !ok { // channel closed + return + } + + if r.Error != nil && !errors.Is(r.Error, context.Canceled) { + log.Println(r.Error) + continue + } + + ch <- r.Pipeline.ID + } + } + }() + + return ch +} + +func (t *ProjectCatchUpTask) process(ctl *Controller, ctx context.Context, pipelineChan <-chan int64) { + numWorkers := 10 + var wg sync.WaitGroup + for i := 0; i < numWorkers; i++ { + wg.Add(1) + go func() { + defer wg.Done() + + for pipelineID := range pipelineChan { + opts := ExportPipelineHierarchyOptions{ + ProjectID: t.Config.Id, + PipelineID: pipelineID, + + ExportSections: t.Config.Export.Sections.Enabled, + ExportTestReports: t.Config.Export.TestReports.Enabled, + ExportTraces: t.Config.Export.Traces.Enabled, + ExportJobMetrics: t.Config.Export.Sections.Enabled, // for now, 
export metrics if we fetch the logs for sections anyway + } + + if err := ExportPipelineHierarchy(ctl, ctx, opts); err != nil { + if !errors.Is(err, context.Canceled) { + log.Printf("error exporting pipeline hierarchy: %s\n", err) + } + } else { + log.Printf("Caught up on projects/%d/pipelines/%d\n", opts.ProjectID, opts.PipelineID) + } + } + }() + } + wg.Wait() +} diff --git a/pkg/tasks/deduplicate.go b/pkg/tasks/deduplicate.go deleted file mode 100644 index a1960a9..0000000 --- a/pkg/tasks/deduplicate.go +++ /dev/null @@ -1,148 +0,0 @@ -package tasks - -import ( - "context" - "fmt" - "regexp" - "strings" - - "github.com/cluttrdev/gitlab-exporter/pkg/clickhouse" - "golang.org/x/exp/slices" -) - -type DeduplicateTableOptions struct { - Database string - Table string - Final *bool - By []string - Except []string - ThrowIfNoop *bool -} - -func DeduplicateTable(ctx context.Context, opt DeduplicateTableOptions, ch *clickhouse.Client) error { - if err := validateDeduplicateTableOptions(ctx, opt, ch); err != nil { - return fmt.Errorf("error validating deduplication options: %w", err) - } - - query, params := PrepareDeduplicateQuery(opt) - - ctx = clickhouse.WithParameters(ctx, params) - return ch.Exec(ctx, query) -} - -func PrepareDeduplicateQuery(opt DeduplicateTableOptions) (string, map[string]string) { - var ( - query string - params = map[string]string{} - ) - - // OPTIMIZE - query = "OPTIMIZE TABLE {database:Identifier}.{table:Identifier}" - - var dbName string = opt.Database - if dbName == "" { - dbName = "gitlab_ci" - } - params["database"] = dbName - params["table"] = opt.Table - - // FINAL - if opt.Final == nil || *opt.Final { - query += " FINAL" - } - - // DEDUPLICATE - query += " DEDUPLICATE" - - // BY - if len(opt.By) > 0 { - query += " BY " + strings.Join(opt.By, ",") - } else if len(opt.Except) > 0 { - query += " BY *" - } - - // EXCEPT - if len(opt.Except) == 1 { - query += " EXCEPT " + opt.Except[0] - } else if len(opt.Except) > 1 { - query += " EXCEPT (" 
+ strings.Join(opt.Except, ",") + ")" - } - - // SETTINGS - if opt.ThrowIfNoop != nil { - if *opt.ThrowIfNoop { - query += " SETTINGS optimize_throw_if_noop=1" - } else { - query += " SETTINGS optimize_throw_if_noop=0" - } - } - - return query, params -} - -func validateDeduplicateTableOptions(ctx context.Context, opt DeduplicateTableOptions, ch *clickhouse.Client) error { - var dbName string = opt.Database - if dbName == "" { - dbName = "gitlab_ci" - } - - // validate database identifier - if err := matchIdentifier(dbName); err != nil { - return err - } - - // validate table identifier - if err := matchIdentifier(opt.Table); err != nil { - return err - } - - // validate column identifiers - cols, err := getColumnNames(ctx, dbName, opt.Table, ch) - if err != nil { - return fmt.Errorf("error getting table column names: %w", err) - } - - for _, c := range append(append([]string{}, opt.By...), opt.Except...) { - if !slices.Contains(cols, c) { - return fmt.Errorf("Table `%s` has no column `%s`", opt.Table, c) - } - } - - return nil -} - -func matchIdentifier(s string) error { - pattern := `^[a-zA-Z_][0-9a-zA-Z_]*$` - matched, err := regexp.MatchString(pattern, s) - if err != nil { - return err - } else if !matched { - return fmt.Errorf("invalid identifier: `%s`", s) - } - return nil -} - -func getColumnNames(ctx context.Context, database string, table string, ch *clickhouse.Client) ([]string, error) { - var columnNames []string - - query_tpl := ` - SELECT DISTINCT COLUMN_NAME FROM information_schema.COLUMNS - WHERE (TABLE_SCHEMA = '%s') AND (TABLE_NAME = '%s') - ` - - query := fmt.Sprintf(query_tpl, database, table) - - var results []struct { - ColumnName string `ch:"COLUMN_NAME"` - } - - if err := ch.Select(ctx, &results, query); err != nil { - return nil, err - } - - for _, r := range results { - columnNames = append(columnNames, r.ColumnName) - } - - return columnNames, nil -} diff --git a/pkg/tasks/export.go b/pkg/tasks/export.go deleted file mode 100644 index 
57f1822..0000000 --- a/pkg/tasks/export.go +++ /dev/null @@ -1,88 +0,0 @@ -package tasks - -import ( - "context" - "fmt" - - "github.com/cluttrdev/gitlab-exporter/pkg/gitlab" - "github.com/cluttrdev/gitlab-exporter/pkg/models" - - "github.com/cluttrdev/gitlab-exporter/internal/datastore" -) - -type ExportPipelineHierarchyOptions struct { - ProjectID int64 - PipelineID int64 - - ExportSections bool - ExportTestReports bool - ExportTraces bool - ExportJobMetrics bool -} - -func ExportPipelineHierarchy(ctx context.Context, opts ExportPipelineHierarchyOptions, gl *gitlab.Client, ds datastore.DataStore) error { - return <-exportPipelineHierarchy(ctx, opts, gl, ds) -} - -func exportPipelineHierarchy(ctx context.Context, opts ExportPipelineHierarchyOptions, gl *gitlab.Client, ds datastore.DataStore) <-chan error { - out := make(chan error) - - go func() { - defer close(out) - - opt := &gitlab.GetPipelineHierarchyOptions{ - FetchSections: opts.ExportSections, - FetchJobMetrics: opts.ExportSections, - } - - phr := <-gl.GetPipelineHierarchy(ctx, opts.ProjectID, opts.PipelineID, opt) - if err := phr.Error; err != nil { - out <- fmt.Errorf("error getting pipeline hierarchy: %w", err) - return - } - ph := phr.PipelineHierarchy - - if err := ds.InsertPipelineHierarchy(ctx, ph); err != nil { - out <- fmt.Errorf("error inserting pipeline hierarchy: %w", err) - return - } - - if opts.ExportTraces { - pts := ph.GetAllTraces() - if err := ds.InsertTraces(ctx, pts); err != nil { - out <- fmt.Errorf("error inserting traces: %w", err) - return - } - } - - if opts.ExportTestReports { - trs, err := gl.GetPipelineHierarchyTestReports(ctx, ph) - if err != nil { - out <- fmt.Errorf("error getting testreports: %w", err) - return - } - tss := []*models.PipelineTestSuite{} - tcs := []*models.PipelineTestCase{} - for _, tr := range trs { - tss = append(tss, tr.TestSuites...) - for _, ts := range tr.TestSuites { - tcs = append(tcs, ts.TestCases...) 
- } - } - if err = ds.InsertTestReports(ctx, trs); err != nil { - out <- fmt.Errorf("error inserting testreports: %w", err) - return - } - if err = ds.InsertTestSuites(ctx, tss); err != nil { - out <- fmt.Errorf("error inserting testsuites: %w", err) - return - } - if err = ds.InsertTestCases(ctx, tcs); err != nil { - out <- fmt.Errorf("error inserting testcases: %w", err) - return - } - } - }() - - return out -} diff --git a/test/config/config_test.go b/test/config/config_test.go index 99e1f3e..e55fea4 100644 --- a/test/config/config_test.go +++ b/test/config/config_test.go @@ -15,12 +15,6 @@ func defaultConfig() config.Config { cfg.GitLab.Api.Token = "" cfg.GitLab.Client.Rate.Limit = 0.0 - cfg.ClickHouse.Host = "localhost" - cfg.ClickHouse.Port = "9000" - cfg.ClickHouse.Database = "default" - cfg.ClickHouse.User = "default" - cfg.ClickHouse.Password = "" - cfg.Endpoints = []config.Endpoint{} cfg.Projects = []config.Project{} @@ -142,15 +136,11 @@ func TestLoad_DataWithDefaults(t *testing.T) { client: rate: limit: 20 - - clickhouse: - host: clickhouse.example.com `) expected := defaultConfig() expected.GitLab.Api.Token = "glpat-xxxxxxxxxxxxxxxxxxxx" expected.GitLab.Client.Rate.Limit = 20 - expected.ClickHouse.Host = "clickhouse.example.com" cfg := defaultConfig() if err := config.Load(data, &cfg); err != nil { diff --git a/test/tasks/deduplicate_test.go b/test/tasks/deduplicate_test.go deleted file mode 100644 index 4ec69ee..0000000 --- a/test/tasks/deduplicate_test.go +++ /dev/null @@ -1,235 +0,0 @@ -package tasks_test - -import ( - "testing" - - "github.com/google/go-cmp/cmp" - - "github.com/cluttrdev/gitlab-exporter/pkg/tasks" -) - -func checkQuery(t *testing.T, want string, got string) { - if want != got { - t.Errorf("Expected `%s`, got `%s`", want, got) - } -} - -func checkParams(t *testing.T, want map[string]string, got map[string]string) { - if diff := cmp.Diff(want, got); diff != "" { - t.Errorf("Config mismatch (-want +got):\n%s", diff) - } -} - -func 
TestPrepareDeduplicateQuery_Minimal(t *testing.T) { - opt := tasks.DeduplicateTableOptions{ - Database: "", - Table: "pipelines", - Final: &[]bool{false}[0], - By: []string{}, - Except: []string{}, - ThrowIfNoop: nil, - } - - expectedQuery := "OPTIMIZE TABLE {database:Identifier}.{table:Identifier} DEDUPLICATE" - expectedParams := map[string]string{ - "database": "gitlab_ci", - "table": "pipelines", - } - - query, params := tasks.PrepareDeduplicateQuery(opt) - - checkQuery(t, expectedQuery, query) - checkParams(t, expectedParams, params) -} - -func TestPrepareDeduplicateQuery_Full(t *testing.T) { - opt := tasks.DeduplicateTableOptions{ - Database: "gitlab_ci", - Table: "pipelines", - Final: &[]bool{true}[0], - By: []string{"id", "project_id"}, - Except: []string{"finished_at", "status"}, - ThrowIfNoop: &[]bool{true}[0], - } - - expectedQuery := "" + - "OPTIMIZE TABLE {database:Identifier}.{table:Identifier} FINAL DEDUPLICATE" + - " BY id,project_id" + - " EXCEPT (finished_at,status)" + - " SETTINGS optimize_throw_if_noop=1" - expectedParams := map[string]string{ - "database": "gitlab_ci", - "table": "pipelines", - } - - query, params := tasks.PrepareDeduplicateQuery(opt) - - checkQuery(t, expectedQuery, query) - checkParams(t, expectedParams, params) -} - -func TestPrepareDeduplicateQuery_WithFinal(t *testing.T) { - opt := tasks.DeduplicateTableOptions{ - Database: "", - Table: "pipelines", - Final: &[]bool{true}[0], - By: []string{}, - Except: []string{}, - ThrowIfNoop: nil, - } - - expectedQuery := "OPTIMIZE TABLE {database:Identifier}.{table:Identifier} FINAL DEDUPLICATE" - expectedParams := map[string]string{ - "database": "gitlab_ci", - "table": "pipelines", - } - - query, params := tasks.PrepareDeduplicateQuery(opt) - - checkQuery(t, expectedQuery, query) - checkParams(t, expectedParams, params) -} - -func TestPrepareDeduplicateQuery_WithThrowIfNoopTrue(t *testing.T) { - opt := tasks.DeduplicateTableOptions{ - Database: "", - Table: "pipelines", - Final: 
&[]bool{false}[0], - By: []string{}, - Except: []string{}, - ThrowIfNoop: &[]bool{true}[0], - } - - expectedQuery := "" + - "OPTIMIZE TABLE {database:Identifier}.{table:Identifier} DEDUPLICATE" + - " SETTINGS optimize_throw_if_noop=1" - expectedParams := map[string]string{ - "database": "gitlab_ci", - "table": "pipelines", - } - - query, params := tasks.PrepareDeduplicateQuery(opt) - - checkQuery(t, expectedQuery, query) - checkParams(t, expectedParams, params) -} - -func TestPrepareDeduplicateQuery_WithThrowIfNoopFalse(t *testing.T) { - opt := tasks.DeduplicateTableOptions{ - Database: "", - Table: "pipelines", - Final: &[]bool{false}[0], - By: []string{}, - Except: []string{}, - ThrowIfNoop: &[]bool{false}[0], - } - - expectedQuery := "" + - "OPTIMIZE TABLE {database:Identifier}.{table:Identifier} DEDUPLICATE" + - " SETTINGS optimize_throw_if_noop=0" - expectedParams := map[string]string{ - "database": "gitlab_ci", - "table": "pipelines", - } - - query, params := tasks.PrepareDeduplicateQuery(opt) - - checkQuery(t, expectedQuery, query) - checkParams(t, expectedParams, params) -} - -func TestPrepareDeduplicateQuery_WithBy(t *testing.T) { - opt := tasks.DeduplicateTableOptions{ - Database: "", - Table: "pipelines", - Final: &[]bool{false}[0], - By: []string{"id", "project_id"}, - Except: []string{}, - ThrowIfNoop: nil, - } - - expectedQuery := "" + - "OPTIMIZE TABLE {database:Identifier}.{table:Identifier} DEDUPLICATE" + - " BY id,project_id" - expectedParams := map[string]string{ - "database": "gitlab_ci", - "table": "pipelines", - } - - query, params := tasks.PrepareDeduplicateQuery(opt) - - checkQuery(t, expectedQuery, query) - checkParams(t, expectedParams, params) -} - -func TestPrepareDeduplicateQuery_WithSingleExcept(t *testing.T) { - opt := tasks.DeduplicateTableOptions{ - Database: "", - Table: "pipelines", - Final: &[]bool{false}[0], - By: []string{}, - Except: []string{"project_id"}, - ThrowIfNoop: nil, - } - - expectedQuery := "" + - "OPTIMIZE TABLE 
{database:Identifier}.{table:Identifier} DEDUPLICATE" + - " BY * EXCEPT project_id" - expectedParams := map[string]string{ - "database": "gitlab_ci", - "table": "pipelines", - } - - query, params := tasks.PrepareDeduplicateQuery(opt) - - checkQuery(t, expectedQuery, query) - checkParams(t, expectedParams, params) -} - -func TestPrepareDeduplicateQuery_WithMultipleExcept(t *testing.T) { - opt := tasks.DeduplicateTableOptions{ - Database: "", - Table: "pipelines", - Final: &[]bool{false}[0], - By: []string{}, - Except: []string{"project_id", "status"}, - ThrowIfNoop: nil, - } - - expectedQuery := "" + - "OPTIMIZE TABLE {database:Identifier}.{table:Identifier} DEDUPLICATE" + - " BY * EXCEPT (project_id,status)" - expectedParams := map[string]string{ - "database": "gitlab_ci", - "table": "pipelines", - } - - query, params := tasks.PrepareDeduplicateQuery(opt) - - checkQuery(t, expectedQuery, query) - checkParams(t, expectedParams, params) -} - -func TestPrepareDeduplicateQuery_WithByAndExcept(t *testing.T) { - opt := tasks.DeduplicateTableOptions{ - Database: "", - Table: "pipelines", - Final: &[]bool{false}[0], - By: []string{"id"}, - Except: []string{"project_id", "status"}, - ThrowIfNoop: nil, - } - - expectedQuery := "" + - "OPTIMIZE TABLE {database:Identifier}.{table:Identifier} DEDUPLICATE" + - " BY id EXCEPT (project_id,status)" - expectedParams := map[string]string{ - "database": "gitlab_ci", - "table": "pipelines", - } - - query, params := tasks.PrepareDeduplicateQuery(opt) - - checkQuery(t, expectedQuery, query) - checkParams(t, expectedParams, params) -}