Skip to content

Commit

Permalink
feat: rename relayer to streamer
Browse files Browse the repository at this point in the history
  • Loading branch information
Mikelle committed Dec 10, 2024
1 parent 29dd768 commit 74f9b77
Show file tree
Hide file tree
Showing 11 changed files with 364 additions and 363 deletions.
52 changes: 26 additions & 26 deletions cl/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ go build -o consensus-client cmd/redisapp/main.go

The consensus client can be configured via command-line flags, environment variables, or a YAML configuration file.

#### Command-Line Flags for Relayer
#### Command-Line Flags for Streamer

- `--instance-id`: **(Required)** Unique instance ID for this node.
- `--eth-client-url`: Ethereum client URL (default: `http://localhost:8551`).
Expand Down Expand Up @@ -209,55 +209,55 @@ Run the client with the configuration file:

- **Multiple Instances**: You can run multiple instances of the consensus client by changing the `--instance-id` and `--eth-client-url` parameters.

## Running the Relayer
## Running the Streamer

The Relayer is responsible for streaming payloads to member nodes, allowing them to apply these payloads to their respective Geth instances.
The Streamer is responsible for streaming payloads to member nodes, allowing them to apply these payloads to their respective Geth instances.

### Build the Relayer
### Build the Streamer

Ensure all dependencies are installed and build the Relayer application:
Ensure all dependencies are installed and build the Streamer application:

```bash
go mod tidy
go build -o relayer cmd/relayer/main.go
go build -o streamer cmd/streamer/main.go
```

### Relayer Configuration
### Streamer Configuration

The Relayer can be configured via command-line flags, environment variables, or a YAML configuration file.
The Streamer can be configured via command-line flags, environment variables, or a YAML configuration file.

#### Command-Line Flags

- `--config`: Path to config file.
- `--redis-addr`: Redis address (default: 127.0.0.1:7001).
- `--listen-addr`: Relayer listen address (default: :50051).
- `--listen-addr`: Streamer listen address (default: :50051).
- `--log-fmt`: Log format to use, options are text or json (default: text).
- `--log-level`: Log level to use, options are debug, info, warn, error (default: info).

#### Environment Variables

- `RELAYER_CONFIG`
- `RELAYER_REDIS_ADDR`
- `RELAYER_LISTEN_ADDR`
- `RELAYER_LOG_FMT`
- `RELAYER_LOG_LEVEL`
- `STREAMER_CONFIG`
- `STREAMER_REDIS_ADDR`
- `STREAMER_LISTEN_ADDR`
- `STREAMER_LOG_FMT`
- `STREAMER_LOG_LEVEL`

#### Run the Relayer
#### Run the Streamer

Run the Relayer using command-line flags:
Run the Streamer using command-line flags:

```bash
./relayer start \
./streamer start \
--config "config.yaml" \
--redis-addr "127.0.0.1:7001" \
--listen-addr ":50051" \
--log-fmt "json" \
--log-level "info"
```

#### Using a Configuration File for Relayer
#### Using a Configuration File for Streamer

Create a `relayer_config.yaml` file:
Create a `streamer_config.yaml` file:

```yaml
redis-addr: "127.0.0.1:7001"
Expand All @@ -266,15 +266,15 @@ log-fmt: "json"
log-level: "info"
```
Run the Relayer with the configuration file:
Run the Streamer with the configuration file:
```bash
./relayer start --config relayer_config.yaml
./streamer start --config streamer_config.yaml
```

## Running member nodes

Member nodes connect to the Relayer to receive payloads from the stream and apply them to their Geth instances.
Member nodes connect to the Streamer to receive payloads from the stream and apply them to their Geth instances.

### Build the Member Client

Expand All @@ -293,7 +293,7 @@ The Member Client can be configured via command-line flags, environment variable

- `--config`: Path to config file.
- `--client-id`: (Required) Unique client ID for this member.
- `--relayer-addr`: (Required) Relayer address.
- `--streamer-addr`: (Required) Streamer address.
- `--eth-client-url`: Ethereum client URL (default: <http://localhost:8551>).
- `--jwt-secret`: JWT secret for Ethereum client.
- `--log-fmt`: Log format to use, options are text or json (default: text).
Expand All @@ -303,7 +303,7 @@ The Member Client can be configured via command-line flags, environment variable

- `MEMBER_CONFIG`
- `MEMBER_CLIENT_ID`
- `MEMBER_RELAYER_ADDR`
- `MEMBER_STREAMER_ADDR`
- `MEMBER_ETH_CLIENT_URL`
- `MEMBER_JWT_SECRET`
- `MEMBER_LOG_FMT`
Expand All @@ -316,7 +316,7 @@ Run the Member Client using command-line flags:
```bash
./memberclient start \
--client-id "member1" \
--relayer-addr "http://localhost:50051" \
--streamer-addr "http://localhost:50051" \
--eth-client-url "http://localhost:8551" \
--jwt-secret "your_jwt_secret" \
--log-fmt "json" \
Expand All @@ -333,7 +333,7 @@ Create a member_config.yaml file:

```yaml
client-id: "member1"
relayer-addr: "http://localhost:50051"
streamer-addr: "http://localhost:50051"
eth-client-url: "http://localhost:8551"
jwt-secret: "your_jwt_secret"
log-fmt: "json"
Expand Down
18 changes: 9 additions & 9 deletions cl/cmd/member/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,14 @@ var (
Required: true,
})

relayerAddrFlag = altsrc.NewStringFlag(&cli.StringFlag{
Name: "relayer-addr",
Usage: "Relayer address",
EnvVars: []string{"MEMBER_RELAYER_ADDR"},
streamerAddrFlag = altsrc.NewStringFlag(&cli.StringFlag{
Name: "streamer-addr",
Usage: "Streamer address",
EnvVars: []string{"MEMBER_STREAMER_ADDR"},
Required: true,
Action: func(_ *cli.Context, s string) error {
if _, err := url.Parse(s); err != nil {
return fmt.Errorf("invalid relayer-addr: %v", err)
return fmt.Errorf("invalid streamer-addr: %v", err)
}
return nil
},
Expand Down Expand Up @@ -87,7 +87,7 @@ var (

type Config struct {
ClientID string
RelayerAddr string
StreamerAddr string
EthClientURL string
JWTSecret string
}
Expand All @@ -96,7 +96,7 @@ func main() {
flags := []cli.Flag{
configFlag,
clientIDFlag,
relayerAddrFlag,
streamerAddrFlag,
ethClientURLFlag,
jwtSecretFlag,
logFmtFlag,
Expand Down Expand Up @@ -138,15 +138,15 @@ func startMemberClient(c *cli.Context) error {

cfg := Config{
ClientID: c.String(clientIDFlag.Name),
RelayerAddr: c.String(relayerAddrFlag.Name),
StreamerAddr: c.String(streamerAddrFlag.Name),
EthClientURL: c.String(ethClientURLFlag.Name),
JWTSecret: c.String(jwtSecretFlag.Name),
}

log.Info("Starting member client with configuration", "config", cfg)

// Initialize the MemberClient
memberClient, err := member.NewMemberClient(cfg.ClientID, cfg.RelayerAddr, cfg.EthClientURL, cfg.JWTSecret, log)
memberClient, err := member.NewMemberClient(cfg.ClientID, cfg.StreamerAddr, cfg.EthClientURL, cfg.JWTSecret, log)
if err != nil {
log.Error("Failed to initialize MemberClient", "error", err)
return err
Expand Down
42 changes: 21 additions & 21 deletions cl/cmd/relayer/main.go → cl/cmd/streamer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"strconv"
"syscall"

"github.com/primev/mev-commit/cl/relayer"
"github.com/primev/mev-commit/cl/streamer"
"github.com/primev/mev-commit/x/util"
"github.com/urfave/cli/v2"
"github.com/urfave/cli/v2/altsrc"
Expand All @@ -18,13 +18,13 @@ var (
configFlag = &cli.StringFlag{
Name: "config",
Usage: "Path to config file",
EnvVars: []string{"RELAYER_CONFIG"},
EnvVars: []string{"STREAMER_CONFIG"},
}

redisAddrFlag = altsrc.NewStringFlag(&cli.StringFlag{
Name: "redis-addr",
Usage: "Redis address",
EnvVars: []string{"RELAYER_REDIS_ADDR"},
EnvVars: []string{"STREAMER_REDIS_ADDR"},
Value: "127.0.0.1:7001",
Action: func(_ *cli.Context, s string) error {
host, port, err := net.SplitHostPort(s)
Expand All @@ -43,22 +43,22 @@ var (

listenAddrFlag = altsrc.NewStringFlag(&cli.StringFlag{
Name: "listen-addr",
Usage: "Relayer listen address",
EnvVars: []string{"RELAYER_LISTEN_ADDR"},
Usage: "Streamer listen address",
EnvVars: []string{"STREAMER_LISTEN_ADDR"},
Value: ":50051",
})

logFmtFlag = altsrc.NewStringFlag(&cli.StringFlag{
Name: "log-fmt",
Usage: "Log format to use, options are 'text' or 'json'",
EnvVars: []string{"RELAYER_LOG_FMT"},
EnvVars: []string{"STREAMER_LOG_FMT"},
Value: "text",
})

logLevelFlag = altsrc.NewStringFlag(&cli.StringFlag{
Name: "log-level",
Usage: "Log level to use, options are 'debug', 'info', 'warn', 'error'",
EnvVars: []string{"RELAYER_LOG_LEVEL"},
EnvVars: []string{"STREAMER_LOG_LEVEL"},
Value: "info",
})
)
Expand All @@ -78,8 +78,8 @@ func main() {
}

app := &cli.App{
Name: "relayer",
Usage: "Start the relayer",
Name: "streamer",
Usage: "Start the streamer",
Flags: flags,
Before: altsrc.InitInputSourceWithContext(flags,
func(c *cli.Context) (altsrc.InputSourceContext, error) {
Expand All @@ -90,16 +90,16 @@ func main() {
return &altsrc.MapInputSource{}, nil
}),
Action: func(c *cli.Context) error {
return startRelayer(c)
return startStreamer(c)
},
}

if err := app.Run(os.Args); err != nil {
fmt.Println("Error running relayer:", err)
fmt.Println("Error running streamer:", err)
}
}

func startRelayer(c *cli.Context) error {
func startStreamer(c *cli.Context) error {
log, err := util.NewLogger(
c.String(logLevelFlag.Name),
c.String(logFmtFlag.Name),
Expand All @@ -116,30 +116,30 @@ func startRelayer(c *cli.Context) error {
ListenAddr: c.String(listenAddrFlag.Name),
}

log.Info("Starting relayer with configuration", "config", cfg)
log.Info("Starting streamer with configuration", "config", cfg)

// Initialize the Relayer
relayer, err := relayer.NewRelayer(cfg.RedisAddr, log)
// Initialize the Streamer
streamer, err := streamer.NewPayloadStreamer(cfg.RedisAddr, log)
if err != nil {
log.Error("Failed to initialize Relayer", "error", err)
log.Error("Failed to initialize Streamer", "error", err)
return err
}

ctx, stop := signal.NotifyContext(c.Context, os.Interrupt, syscall.SIGTERM)
defer stop()

// Start the relayer
// Start the streamer
go func() {
if err := relayer.Start(cfg.ListenAddr); err != nil {
log.Error("Relayer exited with error", "error", err)
if err := streamer.Start(cfg.ListenAddr); err != nil {
log.Error("Streamer exited with error", "error", err)
stop()
}
}()

<-ctx.Done()

relayer.Stop()
streamer.Stop()

log.Info("Relayer shutdown completed")
log.Info("Streamer shutdown completed")
return nil
}
34 changes: 17 additions & 17 deletions cl/member/member.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@ import (
)

type MemberClient struct {
clientID string
relayerAddr string
conn *grpc.ClientConn
client pb.RelayerClient
logger *slog.Logger
engineCl EngineClient
bb BlockBuilder
clientID string
streamerAddr string
conn *grpc.ClientConn
client pb.PayloadStreamerClient
logger *slog.Logger
engineCl EngineClient
bb BlockBuilder
}

type EngineClient interface {
Expand All @@ -39,12 +39,12 @@ type BlockBuilder interface {
FinalizeBlock(ctx context.Context, payloadIDStr, executionPayloadStr, msgID string) error
}

func NewMemberClient(clientID, relayerAddr, ecURL, jwtSecret string, logger *slog.Logger) (*MemberClient, error) {
conn, err := grpc.NewClient(relayerAddr, grpc.WithTransportCredentials(insecure.NewCredentials()))
func NewMemberClient(clientID, streamerAddr, ecURL, jwtSecret string, logger *slog.Logger) (*MemberClient, error) {
conn, err := grpc.NewClient(streamerAddr, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return nil, err
}
client := pb.NewRelayerClient(conn)
client := pb.NewPayloadStreamerClient(conn)

bytes, err := hex.DecodeString(jwtSecret)
if err != nil {
Expand All @@ -59,13 +59,13 @@ func NewMemberClient(clientID, relayerAddr, ecURL, jwtSecret string, logger *slo
bb := blockbuilder.NewMemberBlockBuilder(engineCL, logger)

return &MemberClient{
clientID: clientID,
relayerAddr: relayerAddr,
conn: conn,
client: client,
engineCl: engineCL,
logger: logger,
bb: bb,
clientID: clientID,
streamerAddr: streamerAddr,
conn: conn,
client: client,
engineCl: engineCL,
logger: logger,
bb: bb,
}, nil
}

Expand Down
Loading

0 comments on commit 74f9b77

Please sign in to comment.