Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(contributoor): add events shell + sink(s) #1

Merged
merged 2 commits into from
Jan 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
202 changes: 126 additions & 76 deletions cmd/sentry/main.go
Original file line number Diff line number Diff line change
@@ -1,96 +1,146 @@
package main

import (
"encoding/json"
"context"
"fmt"
"os"
"os/signal"
"strings"
"syscall"

"github.com/bufbuild/protovalidate-go"
"github.com/ethpandaops/contributoor/pkg/config/v1"
"google.golang.org/protobuf/encoding/protojson"

"github.com/ethpandaops/contributoor/pkg/ethereum"
"github.com/ethpandaops/contributoor/pkg/sinks"
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"gopkg.in/yaml.v3"
)

var (
cfgFile string
log = logrus.New()
"github.com/urfave/cli/v2"
)

var rootCmd = &cobra.Command{
Use: "sentry",
Short: "Contributoor sentry node",
Run: func(cmd *cobra.Command, args []string) {
config, err := loadConfig(cfgFile)
if err != nil {
log.Fatal(err)
}

logCtx := log.WithFields(logrus.Fields{
"config_path": config.ContributoorDirectory,
"run_method": config.RunMethod,
"network_name": config.NetworkName,
"beacon_address": config.BeaconNodeAddress,
"output_server": config.OutputServer.Address,
"version": config.Version,
})

logCtx.Info("Starting sentry...")

// Wait for interrupt signal
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
<-sigChan

log.Info("Shutting down...")
},
}

func init() {
rootCmd.PersistentFlags().StringVar(&cfgFile, "config", "", "config file path")
}
var log = logrus.New()

func main() {
if err := rootCmd.Execute(); err != nil {
log.Fatal(err)
os.Exit(1)
}
}

func loadConfig(path string) (*config.Config, error) {
data, err := os.ReadFile(path)
if err != nil {
return nil, err
}

// First unmarshal YAML into a map
var yamlMap map[string]interface{}
if err := yaml.Unmarshal(data, &yamlMap); err != nil {
return nil, err
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Convert YAML to JSON
jsonBytes, err := json.Marshal(yamlMap)
if err != nil {
return nil, err
}
// Handle graceful shutdown.
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)

cfg := &config.Config{}
if err := protojson.Unmarshal(jsonBytes, cfg); err != nil {
return nil, err
}
go func() {
<-sigChan

validator, err := protovalidate.New()
if err != nil {
return nil, err
log.Info("Received shutdown signal")

cancel()
}()

app := &cli.App{
Name: "sentry",
Usage: "Contributoor sentry node",
Flags: []cli.Flag{
&cli.StringFlag{
Name: "config",
Usage: "config file path",
Required: true,
},
&cli.BoolFlag{
Name: "debug",
Usage: "debug mode",
Value: false,
},
},
Action: func(c *cli.Context) error {
cfg, err := config.NewConfigFromPath(c.String("config"))
if err != nil {
return err
}

name := fmt.Sprintf("%s_contributoor", strings.ToLower(cfg.NetworkName.String()))

log.WithFields(logrus.Fields{
"config_path": cfg.ContributoorDirectory,
"name": name,
"version": cfg.Version,
}).Info("Starting sentry")

var activeSinks []sinks.ContributoorSink

// Always create stdout sink in debug mode.
if c.Bool("debug") {
stdoutSink, serr := sinks.NewStdoutSink(log, cfg, name)
if serr != nil {
return serr
}

activeSinks = append(activeSinks, stdoutSink)
}

xatuSink, err := sinks.NewXatuSink(log, cfg, name)
if err != nil {
return err
}

activeSinks = append(activeSinks, xatuSink)

for _, sink := range activeSinks {
if serr := sink.Start(c.Context); serr != nil {
return serr
}
}

// Create beacon node with sinks
beaconOpts := ethereum.Options{}
ethConf := &ethereum.Config{
BeaconNodeAddress: cfg.BeaconNodeAddress,
OverrideNetworkName: name,
}

b, err := ethereum.NewBeaconNode(log, ethConf, name, activeSinks, &beaconOpts)
if err != nil {
return err
}

// Publish on done channel when we're finished cleaning up.
done := make(chan struct{})

go func() {
<-c.Context.Done()

log.Info("Context cancelled, starting shutdown")

if err := b.Stop(context.Background()); err != nil {
log.WithError(err).Error("Failed to stop beacon node")
}

for _, sink := range activeSinks {
if err := sink.Stop(context.Background()); err != nil {
log.WithError(err).WithField("sink", sink.Name()).Error("Failed to stop sink")
}
}

log.Info("Shutdown complete")

close(done)
}()

if err := b.Start(c.Context); err != nil {
if err == context.Canceled {
// Wait for cleanup to complete.
<-done

return nil
}

return err
}

// Wait for shutdown to complete.
<-done

return nil
},
}

if err = validator.Validate(cfg); err != nil {
return nil, err
if err := app.RunContext(ctx, os.Args); err != nil && err != context.Canceled {
log.Fatal(err)
}

return cfg, nil
}
Loading