Skip to content

Commit

Permalink
Merge pull request #1 from ethpandaops/feat/shell-event-structure
Browse files Browse the repository at this point in the history
feat(contributoor): add events shell + sink(s)
  • Loading branch information
mattevans authored Jan 7, 2025
2 parents 88b2e39 + 68cd403 commit c436b98
Show file tree
Hide file tree
Showing 16 changed files with 1,893 additions and 109 deletions.
202 changes: 126 additions & 76 deletions cmd/sentry/main.go
Original file line number Diff line number Diff line change
@@ -1,96 +1,146 @@
package main

import (
"encoding/json"
"context"
"fmt"
"os"
"os/signal"
"strings"
"syscall"

"github.com/bufbuild/protovalidate-go"
"github.com/ethpandaops/contributoor/pkg/config/v1"
"google.golang.org/protobuf/encoding/protojson"

"github.com/ethpandaops/contributoor/pkg/ethereum"
"github.com/ethpandaops/contributoor/pkg/sinks"
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"gopkg.in/yaml.v3"
)

var (
cfgFile string
log = logrus.New()
"github.com/urfave/cli/v2"
)

var rootCmd = &cobra.Command{
Use: "sentry",
Short: "Contributoor sentry node",
Run: func(cmd *cobra.Command, args []string) {
config, err := loadConfig(cfgFile)
if err != nil {
log.Fatal(err)
}

logCtx := log.WithFields(logrus.Fields{
"config_path": config.ContributoorDirectory,
"run_method": config.RunMethod,
"network_name": config.NetworkName,
"beacon_address": config.BeaconNodeAddress,
"output_server": config.OutputServer.Address,
"version": config.Version,
})

logCtx.Info("Starting sentry...")

// Wait for interrupt signal
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
<-sigChan

log.Info("Shutting down...")
},
}

func init() {
rootCmd.PersistentFlags().StringVar(&cfgFile, "config", "", "config file path")
}
var log = logrus.New()

func main() {
if err := rootCmd.Execute(); err != nil {
log.Fatal(err)
os.Exit(1)
}
}

func loadConfig(path string) (*config.Config, error) {
data, err := os.ReadFile(path)
if err != nil {
return nil, err
}

// First unmarshal YAML into a map
var yamlMap map[string]interface{}
if err := yaml.Unmarshal(data, &yamlMap); err != nil {
return nil, err
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Convert YAML to JSON
jsonBytes, err := json.Marshal(yamlMap)
if err != nil {
return nil, err
}
// Handle graceful shutdown.
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)

cfg := &config.Config{}
if err := protojson.Unmarshal(jsonBytes, cfg); err != nil {
return nil, err
}
go func() {
<-sigChan

validator, err := protovalidate.New()
if err != nil {
return nil, err
log.Info("Received shutdown signal")

cancel()
}()

app := &cli.App{
Name: "sentry",
Usage: "Contributoor sentry node",
Flags: []cli.Flag{
&cli.StringFlag{
Name: "config",
Usage: "config file path",
Required: true,
},
&cli.BoolFlag{
Name: "debug",
Usage: "debug mode",
Value: false,
},
},
Action: func(c *cli.Context) error {
cfg, err := config.NewConfigFromPath(c.String("config"))
if err != nil {
return err
}

name := fmt.Sprintf("%s_contributoor", strings.ToLower(cfg.NetworkName.String()))

log.WithFields(logrus.Fields{
"config_path": cfg.ContributoorDirectory,
"name": name,
"version": cfg.Version,
}).Info("Starting sentry")

var activeSinks []sinks.ContributoorSink

// Always create stdout sink in debug mode.
if c.Bool("debug") {
stdoutSink, serr := sinks.NewStdoutSink(log, cfg, name)
if serr != nil {
return serr
}

activeSinks = append(activeSinks, stdoutSink)
}

xatuSink, err := sinks.NewXatuSink(log, cfg, name)
if err != nil {
return err
}

activeSinks = append(activeSinks, xatuSink)

for _, sink := range activeSinks {
if serr := sink.Start(c.Context); serr != nil {
return serr
}
}

// Create beacon node with sinks
beaconOpts := ethereum.Options{}
ethConf := &ethereum.Config{
BeaconNodeAddress: cfg.BeaconNodeAddress,
OverrideNetworkName: name,
}

b, err := ethereum.NewBeaconNode(log, ethConf, name, activeSinks, &beaconOpts)
if err != nil {
return err
}

// Publish on done channel when we're finished cleaning up.
done := make(chan struct{})

go func() {
<-c.Context.Done()

log.Info("Context cancelled, starting shutdown")

if err := b.Stop(context.Background()); err != nil {
log.WithError(err).Error("Failed to stop beacon node")
}

for _, sink := range activeSinks {
if err := sink.Stop(context.Background()); err != nil {
log.WithError(err).WithField("sink", sink.Name()).Error("Failed to stop sink")
}
}

log.Info("Shutdown complete")

close(done)
}()

if err := b.Start(c.Context); err != nil {
if err == context.Canceled {
// Wait for cleanup to complete.
<-done

return nil
}

return err
}

// Wait for shutdown to complete.
<-done

return nil
},
}

if err = validator.Validate(cfg); err != nil {
return nil, err
if err := app.RunContext(ctx, os.Args); err != nil && err != context.Canceled {
log.Fatal(err)
}

return cfg, nil
}
Loading

0 comments on commit c436b98

Please sign in to comment.