diff --git a/cmd/catchup.go b/cmd/catchup.go index cd3e2ab..c9cf83d 100644 --- a/cmd/catchup.go +++ b/cmd/catchup.go @@ -11,6 +11,9 @@ import ( "syscall" "github.com/cluttrdev/cli" + "github.com/oklog/run" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/collectors" "github.com/cluttrdev/gitlab-exporter/pkg/worker" @@ -53,10 +56,6 @@ func (c *CatchUpConfig) RegisterFlags(fs *flag.FlagSet) { } func (c *CatchUpConfig) Exec(ctx context.Context, args []string) error { - // setup daemon - ctx, cancel := signal.NotifyContext(ctx, syscall.SIGINT, syscall.SIGTERM) - defer cancel() - // load configuration cfg := config.Default() if err := loadConfig(c.RootConfig.filename, c.flags, &cfg); err != nil { @@ -73,6 +72,12 @@ func (c *CatchUpConfig) Exec(ctx context.Context, args []string) error { } }) + if c.debug { + cfg.HTTP.Enabled = true + cfg.HTTP.Debug = true + cfg.Log.Level = "debug" + } + if cfg.Log.Level == "debug" { writeConfig(c.out, cfg) } @@ -96,39 +101,84 @@ func (c *CatchUpConfig) Exec(ctx context.Context, args []string) error { return err } - // configure workers - pool := worker.NewWorkerPool(42) - var wg sync.WaitGroup - for _, p := range cfg.Projects { - if !p.CatchUp.Enabled { - continue - } + g := &run.Group{} - job := jobs.ProjectCatchUpJob{ - Config: p, - GitLab: gitlabclient, - Exporter: exp, + pool := worker.NewWorkerPool(42) + { // worker pool + ctx, cancel := context.WithCancel(context.Background()) + + g.Add(func() error { // execute + slog.Info("Starting worker pool") + pool.Start(ctx) + <-ctx.Done() + return ctx.Err() + }, func(err error) { // interrupt + defer cancel() + slog.Info("Stopping worker pool...") + pool.Stop() + slog.Info("Stopping worker pool... done") + }) + } - WorkerPool: pool, - } - wg.Add(1) - go func() { - defer wg.Done() - job.Run(ctx) - }() + { // jobs + ctx, cancel := context.WithCancel(context.Background()) + + g.Add(func() error { // execute + var wg sync.WaitGroup + for _, p := range cfg.Projects { + if !p.CatchUp.Enabled { + continue + } + + job := jobs.ProjectCatchUpJob{ + Config: p, + GitLab: gitlabclient, + Exporter: exp, + + WorkerPool: pool, + } + wg.Add(1) + go func() { + defer wg.Done() + job.Run(ctx) + }() + } + + wg.Wait() + return nil + }, func(err error) { // interrupt + slog.Info("Cancelling jobs...") + cancel() + <-ctx.Done() + slog.Info("Cancelling jobs... done") + }) } - go func() { - // cancel context when work is done to stop worker pool - wg.Wait() - cancel() - }() + if cfg.HTTP.Enabled { + colls := []prometheus.Collector{ + collectors.NewGoCollector(), + collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}), + } + for _, endpoint := range cfg.Endpoints { + if mc := exp.MetricsCollectorFor(endpoint.Address); mc != nil { + colls = append(colls, mc) + } + } + reg := prometheus.NewRegistry() + reg.MustRegister(colls...) - slog.Info("Starting workers") - pool.Start(ctx) + g.Add(serveHTTP(cfg.HTTP, reg)) + } - go startServer(ctx, cfg.HTTP, func() error { return nil }) + { // signal handler + ctx, cancel := signal.NotifyContext(ctx, syscall.SIGINT, syscall.SIGTERM) + g.Add(func() error { // execute + <-ctx.Done() + return ctx.Err() + }, func(err error) { // interrupt + cancel() + }) + } - <-ctx.Done() - return nil + return g.Run() } diff --git a/cmd/root.go b/cmd/root.go index 89306ed..29c64e9 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -25,6 +25,8 @@ type RootConfig struct { out io.Writer flags *flag.FlagSet + + debug bool } func NewRootCmd(out io.Writer) *cli.Command { @@ -52,6 +54,7 @@ func (c *RootConfig) RegisterFlags(fs *flag.FlagSet) { fs.String("gitlab-api-token", defaults.GitLab.Api.Token, fmt.Sprintf("The GitLab API Token (default: '%s').", defaults.GitLab.Api.Token)) fs.StringVar(&c.filename, "config", "", "Configuration file to use.") + fs.BoolVar(&c.debug, "debug", false, "Enable debug mode.") } func (c *RootConfig) Exec(context.Context, []string) error { diff --git a/cmd/run.go b/cmd/run.go index 53de2c9..44c464b 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -2,10 +2,13 @@ package cmd import ( "context" + "errors" "flag" "fmt" "io" "log/slog" + "net/http" + "net/http/pprof" "os/signal" "strconv" "strings" @@ -14,16 +17,19 @@ import ( "golang.org/x/exp/slices" + "github.com/oklog/run" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/collectors" + "github.com/prometheus/client_golang/prometheus/promhttp" + "github.com/cluttrdev/cli" - "github.com/cluttrdev/gitlab-exporter/pkg/healthz" "github.com/cluttrdev/gitlab-exporter/pkg/worker" "github.com/cluttrdev/gitlab-exporter/internal/config" "github.com/cluttrdev/gitlab-exporter/internal/exporter" "github.com/cluttrdev/gitlab-exporter/internal/gitlab" "github.com/cluttrdev/gitlab-exporter/internal/jobs" - "github.com/cluttrdev/gitlab-exporter/internal/server" ) type RunConfig struct { @@ -81,10 +87,6 @@ func (c *RunConfig) RegisterFlags(fs *flag.FlagSet) { } func (c *RunConfig) Exec(ctx context.Context, _ []string) error { - // setup daemon - ctx, cancel := signal.NotifyContext(ctx, syscall.SIGINT, syscall.SIGTERM) - defer cancel() - // load configuration cfg := config.Default() if err := loadConfig(c.RootConfig.filename, c.flags, &cfg); err != nil { @@ -101,6 +103,12 @@ func (c *RunConfig) Exec(ctx context.Context, _ []string) error { } }) + if c.debug { + cfg.HTTP.Enabled = true + cfg.HTTP.Debug = true + cfg.Log.Level = "debug" + } + // add projects passed to run command for _, pid := range c.projects { exists := slices.ContainsFunc(cfg.Projects, func(p config.Project) bool { @@ -138,66 +146,136 @@ func (c *RunConfig) Exec(ctx context.Context, _ []string) error { return err } - // configure workers + g := &run.Group{} + pool := worker.NewWorkerPool(42) - var wg sync.WaitGroup - for _, p := range cfg.Projects { - if c.catchup && p.CatchUp.Enabled { - job := jobs.ProjectCatchUpJob{ - Config: p, - GitLab: gitlabclient, - Exporter: exp, - - WorkerPool: pool, - } - wg.Add(1) - go func() { - defer wg.Done() - job.Run(ctx) - }() - } + { // worker pool + ctx, cancel := context.WithCancel(context.Background()) + + g.Add(func() error { // execute + slog.Info("Starting worker pool") + pool.Start(ctx) + <-ctx.Done() + return ctx.Err() + }, func(err error) { // interrupt + defer cancel() + slog.Info("Stopping worker pool...") + pool.Stop() + slog.Info("Stopping worker pool... done") + }) + } - job := jobs.ProjectExportJob{ - Config: p, - GitLab: gitlabclient, - Exporter: exp, + { // jobs + ctx, cancel := context.WithCancel(context.Background()) + + g.Add(func() error { // execute + var wg sync.WaitGroup + for _, p := range cfg.Projects { + if c.catchup && p.CatchUp.Enabled { + job := jobs.ProjectCatchUpJob{ + Config: p, + GitLab: gitlabclient, + Exporter: exp, + + WorkerPool: pool, + } + wg.Add(1) + go func() { + defer wg.Done() + job.Run(ctx) + }() + } + + job := jobs.ProjectExportJob{ + Config: p, + GitLab: gitlabclient, + Exporter: exp, + + WorkerPool: pool, + } + wg.Add(1) + go func() { + defer wg.Done() + job.Run(ctx) + }() + } - WorkerPool: pool, - } - wg.Add(1) - go func() { - defer wg.Done() - job.Run(ctx) - }() + wg.Wait() + return nil + }, func(err error) { // interrupt + slog.Info("Cancelling jobs...") + cancel() + <-ctx.Done() + slog.Info("Cancelling jobs... done") + }) } - go func() { - // cancel context when work is done to stop worker pool - wg.Wait() - cancel() - }() + if cfg.HTTP.Enabled { + colls := []prometheus.Collector{ + collectors.NewGoCollector(), + collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}), + } + for _, endpoint := range cfg.Endpoints { + if mc := exp.MetricsCollectorFor(endpoint.Address); mc != nil { + colls = append(colls, mc) + } + } + reg := prometheus.NewRegistry() + reg.MustRegister(colls...) - go startServer(ctx, cfg.HTTP, func() error { - return gitlabclient.CheckReadiness(ctx) - }) + g.Add(serveHTTP(cfg.HTTP, reg)) + } - slog.Info("Starting workers") - pool.Start(ctx) + { // signal handler + ctx, cancel := signal.NotifyContext(ctx, syscall.SIGINT, syscall.SIGTERM) + g.Add(func() error { // execute + <-ctx.Done() + return ctx.Err() + }, func(err error) { // interrupt + cancel() + }) + } - <-ctx.Done() - return nil + return g.Run() } -func startServer(ctx context.Context, cfg config.HTTP, ready healthz.Check) { - srv := server.New(server.ServerConfig{ - Host: cfg.Host, - Port: cfg.Port, - Debug: false, +func serveHTTP(cfg config.HTTP, reg *prometheus.Registry) (func() error, func(error)) { + m := http.NewServeMux() + + m.Handle( + "/metrics", + promhttp.InstrumentMetricHandler( + reg, promhttp.HandlerFor(reg, promhttp.HandlerOpts{}), + ), + ) + + if cfg.Debug { + m.HandleFunc("/debug/pprof/", pprof.Index) + m.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline) + m.HandleFunc("/debug/pprof/profile", pprof.Profile) + m.HandleFunc("/debug/pprof/symbol", pprof.Symbol) + m.HandleFunc("/debug/pprof/trace", pprof.Trace) + } + + httpServer := &http.Server{ + Addr: fmt.Sprintf("%s:%s", cfg.Host, cfg.Port), + Handler: m, + } - ReadinessCheck: ready, - }) + execute := func() error { + slog.Info("Starting http server", "addr", httpServer.Addr) + return httpServer.ListenAndServe() + } - if err := srv.Serve(ctx); err != nil { - slog.Error("error during server shutdown", "error", err) + interrupt := func(error) { + slog.Info("Stopping http server...") + if err := httpServer.Shutdown(context.Background()); err != nil { + if !errors.Is(err, http.ErrServerClosed) { + slog.Error("error shutting down http server", "error", err) + } + } + slog.Info("Stopping http server... done") } + + return execute, interrupt }