Skip to content

Commit

Permalink
Refactor run and catchup commands
Browse files Browse the repository at this point in the history
  • Loading branch information
cluttrdev committed Feb 25, 2024
1 parent f563081 commit 10bad08
Show file tree
Hide file tree
Showing 3 changed files with 218 additions and 87 deletions.
114 changes: 82 additions & 32 deletions cmd/catchup.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ import (
"syscall"

"github.com/cluttrdev/cli"
"github.com/oklog/run"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/collectors"

"github.com/cluttrdev/gitlab-exporter/pkg/worker"

Expand Down Expand Up @@ -53,10 +56,6 @@ func (c *CatchUpConfig) RegisterFlags(fs *flag.FlagSet) {
}

func (c *CatchUpConfig) Exec(ctx context.Context, args []string) error {
// setup daemon
ctx, cancel := signal.NotifyContext(ctx, syscall.SIGINT, syscall.SIGTERM)
defer cancel()

// load configuration
cfg := config.Default()
if err := loadConfig(c.RootConfig.filename, c.flags, &cfg); err != nil {
Expand All @@ -73,6 +72,12 @@ func (c *CatchUpConfig) Exec(ctx context.Context, args []string) error {
}
})

if c.debug {
cfg.HTTP.Enabled = true
cfg.HTTP.Debug = true
cfg.Log.Level = "debug"
}

if cfg.Log.Level == "debug" {
writeConfig(c.out, cfg)
}
Expand All @@ -96,39 +101,84 @@ func (c *CatchUpConfig) Exec(ctx context.Context, args []string) error {
return err
}

// configure workers
pool := worker.NewWorkerPool(42)
var wg sync.WaitGroup
for _, p := range cfg.Projects {
if !p.CatchUp.Enabled {
continue
}
g := &run.Group{}

job := jobs.ProjectCatchUpJob{
Config: p,
GitLab: gitlabclient,
Exporter: exp,
pool := worker.NewWorkerPool(42)
{ // worker pool
ctx, cancel := context.WithCancel(context.Background())

g.Add(func() error { // execute
slog.Info("Starting worker pool")
pool.Start(ctx)
<-ctx.Done()
return ctx.Err()
}, func(err error) { // interrupt
defer cancel()
slog.Info("Stopping worker pool...")
pool.Stop()
slog.Info("Stopping worker pool... done")
})
}

WorkerPool: pool,
}
wg.Add(1)
go func() {
defer wg.Done()
job.Run(ctx)
}()
{ // jobs
ctx, cancel := context.WithCancel(context.Background())

g.Add(func() error { // execute
var wg sync.WaitGroup
for _, p := range cfg.Projects {
if !p.CatchUp.Enabled {
continue
}

job := jobs.ProjectCatchUpJob{
Config: p,
GitLab: gitlabclient,
Exporter: exp,

WorkerPool: pool,
}
wg.Add(1)
go func() {
defer wg.Done()
job.Run(ctx)
}()
}

wg.Wait()
return nil
}, func(err error) { // interrupt
slog.Info("Cancelling jobs...")
cancel()
<-ctx.Done()
slog.Info("Cancelling jobs... done")
})
}

go func() {
// cancel context when work is done to stop worker pool
wg.Wait()
cancel()
}()
if cfg.HTTP.Enabled {
colls := []prometheus.Collector{
collectors.NewGoCollector(),
collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}),
}
for _, endpoint := range cfg.Endpoints {
if mc := exp.MetricsCollectorFor(endpoint.Address); mc != nil {
colls = append(colls, mc)
}
}
reg := prometheus.NewRegistry()
reg.MustRegister(colls...)

slog.Info("Starting workers")
pool.Start(ctx)
g.Add(serveHTTP(cfg.HTTP, reg))
}

go startServer(ctx, cfg.HTTP, func() error { return nil })
{ // signal handler
ctx, cancel := signal.NotifyContext(ctx, syscall.SIGINT, syscall.SIGTERM)
g.Add(func() error { // execute
<-ctx.Done()
return ctx.Err()
}, func(err error) { // interrupt
cancel()
})
}

<-ctx.Done()
return nil
return g.Run()
}
3 changes: 3 additions & 0 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ type RootConfig struct {

out io.Writer
flags *flag.FlagSet

debug bool
}

func NewRootCmd(out io.Writer) *cli.Command {
Expand Down Expand Up @@ -52,6 +54,7 @@ func (c *RootConfig) RegisterFlags(fs *flag.FlagSet) {
fs.String("gitlab-api-token", defaults.GitLab.Api.Token, fmt.Sprintf("The GitLab API Token (default: '%s').", defaults.GitLab.Api.Token))

fs.StringVar(&c.filename, "config", "", "Configuration file to use.")
fs.BoolVar(&c.debug, "debug", false, "Enable debug mode.")
}

func (c *RootConfig) Exec(context.Context, []string) error {
Expand Down
Loading

0 comments on commit 10bad08

Please sign in to comment.