diff --git a/cl/README.md b/cl/README.md index 0f8973342..106edba73 100644 --- a/cl/README.md +++ b/cl/README.md @@ -152,7 +152,7 @@ go build -o consensus-client cmd/redisapp/main.go The consensus client can be configured via command-line flags, environment variables, or a YAML configuration file. -#### Command-Line Flags for Relayer +#### Command-Line Flags for Streamer - `--instance-id`: **(Required)** Unique instance ID for this node. - `--eth-client-url`: Ethereum client URL (default: `http://localhost:8551`). @@ -209,45 +209,45 @@ Run the client with the configuration file: - **Multiple Instances**: You can run multiple instances of the consensus client by changing the `--instance-id` and `--eth-client-url` parameters. -## Running the Relayer +## Running the Streamer -The Relayer is responsible for streaming payloads to member nodes, allowing them to apply these payloads to their respective Geth instances. +The Streamer is responsible for streaming payloads to member nodes, allowing them to apply these payloads to their respective Geth instances. -### Build the Relayer +### Build the Streamer -Ensure all dependencies are installed and build the Relayer application: +Ensure all dependencies are installed and build the Streamer application: ```bash go mod tidy -go build -o relayer cmd/relayer/main.go +go build -o streamer cmd/streamer/main.go ``` -### Relayer Configuration +### Streamer Configuration -The Relayer can be configured via command-line flags, environment variables, or a YAML configuration file. +The Streamer can be configured via command-line flags, environment variables, or a YAML configuration file. #### Command-Line Flags - `--config`: Path to config file. - `--redis-addr`: Redis address (default: 127.0.0.1:7001). -- `--listen-addr`: Relayer listen address (default: :50051). +- `--listen-addr`: Streamer listen address (default: :50051). - `--log-fmt`: Log format to use, options are text or json (default: text). - `--log-level`: Log level to use, options are debug, info, warn, error (default: info). #### Environment Variables -- `RELAYER_CONFIG` -- `RELAYER_REDIS_ADDR` -- `RELAYER_LISTEN_ADDR` -- `RELAYER_LOG_FMT` -- `RELAYER_LOG_LEVEL` +- `STREAMER_CONFIG` +- `STREAMER_REDIS_ADDR` +- `STREAMER_LISTEN_ADDR` +- `STREAMER_LOG_FMT` +- `STREAMER_LOG_LEVEL` -#### Run the Relayer +#### Run the Streamer -Run the Relayer using command-line flags: +Run the Streamer using command-line flags: ```bash -./relayer start \ +./streamer start \ --config "config.yaml" \ --redis-addr "127.0.0.1:7001" \ --listen-addr ":50051" \ @@ -255,9 +255,9 @@ Run the Relayer using command-line flags: --log-level "info" ``` -#### Using a Configuration File for Relayer +#### Using a Configuration File for Streamer -Create a `relayer_config.yaml` file: +Create a `streamer_config.yaml` file: ```yaml redis-addr: "127.0.0.1:7001" @@ -266,15 +266,15 @@ log-fmt: "json" log-level: "info" ``` -Run the Relayer with the configuration file: +Run the Streamer with the configuration file: ```bash -./relayer start --config relayer_config.yaml +./streamer start --config streamer_config.yaml ``` ## Running member nodes -Member nodes connect to the Relayer to receive payloads from the stream and apply them to their Geth instances. +Member nodes connect to the Streamer to receive payloads from the stream and apply them to their Geth instances. ### Build the Member Client @@ -293,7 +293,7 @@ The Member Client can be configured via command-line flags, environment variable - `--config`: Path to config file. - `--client-id`: (Required) Unique client ID for this member. -- `--relayer-addr`: (Required) Relayer address. +- `--streamer-addr`: (Required) Streamer address. - `--eth-client-url`: Ethereum client URL (default: ). - `--jwt-secret`: JWT secret for Ethereum client. - `--log-fmt`: Log format to use, options are text or json (default: text). @@ -303,7 +303,7 @@ The Member Client can be configured via command-line flags, environment variable - `MEMBER_CONFIG` - `MEMBER_CLIENT_ID` -- `MEMBER_RELAYER_ADDR` +- `MEMBER_STREAMER_ADDR` - `MEMBER_ETH_CLIENT_URL` - `MEMBER_JWT_SECRET` - `MEMBER_LOG_FMT` @@ -316,7 +316,7 @@ Run the Member Client using command-line flags: ```bash ./memberclient start \ --client-id "member1" \ - --relayer-addr "http://localhost:50051" \ + --streamer-addr "http://localhost:50051" \ --eth-client-url "http://localhost:8551" \ --jwt-secret "your_jwt_secret" \ --log-fmt "json" \ @@ -333,7 +333,7 @@ Create a member_config.yaml file: ```yaml client-id: "member1" -relayer-addr: "http://localhost:50051" +streamer-addr: "http://localhost:50051" eth-client-url: "http://localhost:8551" jwt-secret: "your_jwt_secret" log-fmt: "json" diff --git a/cl/cmd/member/main.go b/cl/cmd/member/main.go index 405b78235..19daaef57 100644 --- a/cl/cmd/member/main.go +++ b/cl/cmd/member/main.go @@ -28,14 +28,14 @@ var ( Required: true, }) - relayerAddrFlag = altsrc.NewStringFlag(&cli.StringFlag{ - Name: "relayer-addr", - Usage: "Relayer address", - EnvVars: []string{"MEMBER_RELAYER_ADDR"}, + streamerAddrFlag = altsrc.NewStringFlag(&cli.StringFlag{ + Name: "streamer-addr", + Usage: "Streamer address", + EnvVars: []string{"MEMBER_STREAMER_ADDR"}, Required: true, Action: func(_ *cli.Context, s string) error { if _, err := url.Parse(s); err != nil { - return fmt.Errorf("invalid relayer-addr: %v", err) + return fmt.Errorf("invalid streamer-addr: %v", err) } return nil }, @@ -87,7 +87,7 @@ var ( type Config struct { ClientID string - RelayerAddr string + StreamerAddr string EthClientURL string JWTSecret string } @@ -96,7 +96,7 @@ func main() { flags := []cli.Flag{ configFlag, clientIDFlag, - relayerAddrFlag, + streamerAddrFlag, ethClientURLFlag, jwtSecretFlag, logFmtFlag, @@ -138,7 +138,7 @@ func startMemberClient(c *cli.Context) error { cfg := Config{ ClientID: c.String(clientIDFlag.Name), - RelayerAddr: c.String(relayerAddrFlag.Name), + StreamerAddr: c.String(streamerAddrFlag.Name), EthClientURL: c.String(ethClientURLFlag.Name), JWTSecret: c.String(jwtSecretFlag.Name), } @@ -146,7 +146,7 @@ func startMemberClient(c *cli.Context) error { log.Info("Starting member client with configuration", "config", cfg) // Initialize the MemberClient - memberClient, err := member.NewMemberClient(cfg.ClientID, cfg.RelayerAddr, cfg.EthClientURL, cfg.JWTSecret, log) + memberClient, err := member.NewMemberClient(cfg.ClientID, cfg.StreamerAddr, cfg.EthClientURL, cfg.JWTSecret, log) if err != nil { log.Error("Failed to initialize MemberClient", "error", err) return err diff --git a/cl/cmd/relayer/main.go b/cl/cmd/streamer/main.go similarity index 73% rename from cl/cmd/relayer/main.go rename to cl/cmd/streamer/main.go index af15f1d27..a955796a0 100644 --- a/cl/cmd/relayer/main.go +++ b/cl/cmd/streamer/main.go @@ -8,7 +8,7 @@ import ( "strconv" "syscall" - "github.com/primev/mev-commit/cl/relayer" + "github.com/primev/mev-commit/cl/streamer" "github.com/primev/mev-commit/x/util" "github.com/urfave/cli/v2" "github.com/urfave/cli/v2/altsrc" @@ -18,13 +18,13 @@ var ( configFlag = &cli.StringFlag{ Name: "config", Usage: "Path to config file", - EnvVars: []string{"RELAYER_CONFIG"}, + EnvVars: []string{"STREAMER_CONFIG"}, } redisAddrFlag = altsrc.NewStringFlag(&cli.StringFlag{ Name: "redis-addr", Usage: "Redis address", - EnvVars: []string{"RELAYER_REDIS_ADDR"}, + EnvVars: []string{"STREAMER_REDIS_ADDR"}, Value: "127.0.0.1:7001", Action: func(_ *cli.Context, s string) error { host, port, err := net.SplitHostPort(s) @@ -43,22 +43,22 @@ var ( listenAddrFlag = altsrc.NewStringFlag(&cli.StringFlag{ Name: "listen-addr", - Usage: "Relayer listen address", - EnvVars: []string{"RELAYER_LISTEN_ADDR"}, + Usage: "Streamer listen address", + EnvVars: []string{"STREAMER_LISTEN_ADDR"}, Value: ":50051", }) logFmtFlag = altsrc.NewStringFlag(&cli.StringFlag{ Name: "log-fmt", Usage: "Log format to use, options are 'text' or 'json'", - EnvVars: []string{"RELAYER_LOG_FMT"}, + EnvVars: []string{"STREAMER_LOG_FMT"}, Value: "text", }) logLevelFlag = altsrc.NewStringFlag(&cli.StringFlag{ Name: "log-level", Usage: "Log level to use, options are 'debug', 'info', 'warn', 'error'", - EnvVars: []string{"RELAYER_LOG_LEVEL"}, + EnvVars: []string{"STREAMER_LOG_LEVEL"}, Value: "info", }) ) @@ -78,8 +78,8 @@ func main() { } app := &cli.App{ - Name: "relayer", - Usage: "Start the relayer", + Name: "streamer", + Usage: "Start the streamer", Flags: flags, Before: altsrc.InitInputSourceWithContext(flags, func(c *cli.Context) (altsrc.InputSourceContext, error) { @@ -90,16 +90,16 @@ func main() { return &altsrc.MapInputSource{}, nil }), Action: func(c *cli.Context) error { - return startRelayer(c) + return startStreamer(c) }, } if err := app.Run(os.Args); err != nil { - fmt.Println("Error running relayer:", err) + fmt.Println("Error running streamer:", err) } } -func startRelayer(c *cli.Context) error { +func startStreamer(c *cli.Context) error { log, err := util.NewLogger( c.String(logLevelFlag.Name), c.String(logFmtFlag.Name), @@ -116,30 +116,30 @@ func startRelayer(c *cli.Context) error { ListenAddr: c.String(listenAddrFlag.Name), } - log.Info("Starting relayer with configuration", "config", cfg) + log.Info("Starting streamer with configuration", "config", cfg) - // Initialize the Relayer - relayer, err := relayer.NewRelayer(cfg.RedisAddr, log) + // Initialize the Streamer + streamer, err := streamer.NewPayloadStreamer(cfg.RedisAddr, log) if err != nil { - log.Error("Failed to initialize Relayer", "error", err) + log.Error("Failed to initialize Streamer", "error", err) return err } ctx, stop := signal.NotifyContext(c.Context, os.Interrupt, syscall.SIGTERM) defer stop() - // Start the relayer + // Start the streamer go func() { - if err := relayer.Start(cfg.ListenAddr); err != nil { - log.Error("Relayer exited with error", "error", err) + if err := streamer.Start(cfg.ListenAddr); err != nil { + log.Error("Streamer exited with error", "error", err) stop() } }() <-ctx.Done() - relayer.Stop() + streamer.Stop() - log.Info("Relayer shutdown completed") + log.Info("Streamer shutdown completed") return nil } diff --git a/cl/member/member.go b/cl/member/member.go index 2cb349abe..5ce0abb33 100644 --- a/cl/member/member.go +++ b/cl/member/member.go @@ -20,13 +20,13 @@ import ( ) type MemberClient struct { - clientID string - relayerAddr string - conn *grpc.ClientConn - client pb.RelayerClient - logger *slog.Logger - engineCl EngineClient - bb BlockBuilder + clientID string + streamerAddr string + conn *grpc.ClientConn + client pb.PayloadStreamerClient + logger *slog.Logger + engineCl EngineClient + bb BlockBuilder } type EngineClient interface { @@ -39,12 +39,12 @@ type BlockBuilder interface { FinalizeBlock(ctx context.Context, payloadIDStr, executionPayloadStr, msgID string) error } -func NewMemberClient(clientID, relayerAddr, ecURL, jwtSecret string, logger *slog.Logger) (*MemberClient, error) { - conn, err := grpc.NewClient(relayerAddr, grpc.WithTransportCredentials(insecure.NewCredentials())) +func NewMemberClient(clientID, streamerAddr, ecURL, jwtSecret string, logger *slog.Logger) (*MemberClient, error) { + conn, err := grpc.NewClient(streamerAddr, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { return nil, err } - client := pb.NewRelayerClient(conn) + client := pb.NewPayloadStreamerClient(conn) bytes, err := hex.DecodeString(jwtSecret) if err != nil { @@ -59,13 +59,13 @@ func NewMemberClient(clientID, relayerAddr, ecURL, jwtSecret string, logger *slo bb := blockbuilder.NewMemberBlockBuilder(engineCL, logger) return &MemberClient{ - clientID: clientID, - relayerAddr: relayerAddr, - conn: conn, - client: client, - engineCl: engineCL, - logger: logger, - bb: bb, + clientID: clientID, + streamerAddr: streamerAddr, + conn: conn, + client: client, + engineCl: engineCL, + logger: logger, + bb: bb, }, nil } diff --git a/cl/member/member_test.go b/cl/member/member_test.go index 9bbef7889..386ae80e8 100644 --- a/cl/member/member_test.go +++ b/cl/member/member_test.go @@ -61,9 +61,9 @@ func (f *mockBlockBuilder) Calls() []finalizeCall { return append([]finalizeCall(nil), f.finalizeCalls...) } -// fakeRelayerServer simulates the Relayer gRPC service for testing. -type fakeRelayerServer struct { - pb.UnimplementedRelayerServer +// fakePayloadStreamerServer simulates the PayloadStreamer gRPC service for testing. +type fakePayloadStreamerServer struct { + pb.UnimplementedPayloadStreamerServer mu sync.Mutex subscribed bool @@ -72,7 +72,7 @@ type fakeRelayerServer struct { serverStopped bool } -func (s *fakeRelayerServer) Subscribe(stream pb.Relayer_SubscribeServer) error { +func (s *fakePayloadStreamerServer) Subscribe(stream pb.PayloadStreamer_SubscribeServer) error { for { msg, err := stream.Recv() if err == io.EOF || s.serverStopped { @@ -121,8 +121,8 @@ func TestMemberClientRun(t *testing.T) { s := grpc.NewServer() defer s.Stop() - relayerServer := &fakeRelayerServer{} - pb.RegisterRelayerServer(s, relayerServer) + streamerServer := &fakePayloadStreamerServer{} + pb.RegisterPayloadStreamerServer(s, streamerServer) errChan := make(chan error, 1) go func() { @@ -139,26 +139,26 @@ func TestMemberClientRun(t *testing.T) { } clientID := "test-client-id" - relayerAddr := lis.Addr().String() + streamerAddr := lis.Addr().String() logger := slog.Default() engineClient := &mockEngineClient{} blockBuilder := &mockBlockBuilder{} - conn, err := grpc.NewClient(relayerAddr, grpc.WithTransportCredentials(insecure.NewCredentials())) + conn, err := grpc.NewClient(streamerAddr, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { t.Fatalf("failed to dial test server: %v", err) } - relayerClient := pb.NewRelayerClient(conn) + streamerClient := pb.NewPayloadStreamerClient(conn) mc := &MemberClient{ - clientID: clientID, - relayerAddr: relayerAddr, - conn: conn, - client: relayerClient, - logger: logger, - engineCl: engineClient, - bb: blockBuilder, + clientID: clientID, + streamerAddr: streamerAddr, + conn: conn, + client: streamerClient, + logger: logger, + engineCl: engineClient, + bb: blockBuilder, } ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) @@ -169,10 +169,10 @@ func TestMemberClientRun(t *testing.T) { t.Errorf("MemberClient.Run returned an error: %v", err) } - relayerServer.mu.Lock() - subscribed := relayerServer.subscribed - sentPayload := relayerServer.sentPayload - relayerServer.mu.Unlock() + streamerServer.mu.Lock() + subscribed := streamerServer.subscribed + sentPayload := streamerServer.sentPayload + streamerServer.mu.Unlock() if !subscribed { t.Errorf("Server did not receive subscription from client") diff --git a/cl/pb/pb/relayer_grpc.pb.go b/cl/pb/pb/relayer_grpc.pb.go deleted file mode 100644 index 329d17c57..000000000 --- a/cl/pb/pb/relayer_grpc.pb.go +++ /dev/null @@ -1,141 +0,0 @@ -// Code generated by protoc-gen-go-grpc. DO NOT EDIT. -// versions: -// - protoc-gen-go-grpc v1.3.0 -// - protoc v3.19.1 -// source: relayer.proto - -package pb - -import ( - context "context" - grpc "google.golang.org/grpc" - codes "google.golang.org/grpc/codes" - status "google.golang.org/grpc/status" -) - -// This is a compile-time assertion to ensure that this generated file -// is compatible with the grpc package it is being compiled against. -// Requires gRPC-Go v1.32.0 or later. -const _ = grpc.SupportPackageIsVersion7 - -const ( - Relayer_Subscribe_FullMethodName = "/pb.Relayer/Subscribe" -) - -// RelayerClient is the client API for Relayer service. -// -// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. -type RelayerClient interface { - Subscribe(ctx context.Context, opts ...grpc.CallOption) (Relayer_SubscribeClient, error) -} - -type relayerClient struct { - cc grpc.ClientConnInterface -} - -func NewRelayerClient(cc grpc.ClientConnInterface) RelayerClient { - return &relayerClient{cc} -} - -func (c *relayerClient) Subscribe(ctx context.Context, opts ...grpc.CallOption) (Relayer_SubscribeClient, error) { - stream, err := c.cc.NewStream(ctx, &Relayer_ServiceDesc.Streams[0], Relayer_Subscribe_FullMethodName, opts...) - if err != nil { - return nil, err - } - x := &relayerSubscribeClient{stream} - return x, nil -} - -type Relayer_SubscribeClient interface { - Send(*ClientMessage) error - Recv() (*PayloadMessage, error) - grpc.ClientStream -} - -type relayerSubscribeClient struct { - grpc.ClientStream -} - -func (x *relayerSubscribeClient) Send(m *ClientMessage) error { - return x.ClientStream.SendMsg(m) -} - -func (x *relayerSubscribeClient) Recv() (*PayloadMessage, error) { - m := new(PayloadMessage) - if err := x.ClientStream.RecvMsg(m); err != nil { - return nil, err - } - return m, nil -} - -// RelayerServer is the server API for Relayer service. -// All implementations must embed UnimplementedRelayerServer -// for forward compatibility -type RelayerServer interface { - Subscribe(Relayer_SubscribeServer) error - mustEmbedUnimplementedRelayerServer() -} - -// UnimplementedRelayerServer must be embedded to have forward compatible implementations. -type UnimplementedRelayerServer struct { -} - -func (UnimplementedRelayerServer) Subscribe(Relayer_SubscribeServer) error { - return status.Errorf(codes.Unimplemented, "method Subscribe not implemented") -} -func (UnimplementedRelayerServer) mustEmbedUnimplementedRelayerServer() {} - -// UnsafeRelayerServer may be embedded to opt out of forward compatibility for this service. -// Use of this interface is not recommended, as added methods to RelayerServer will -// result in compilation errors. -type UnsafeRelayerServer interface { - mustEmbedUnimplementedRelayerServer() -} - -func RegisterRelayerServer(s grpc.ServiceRegistrar, srv RelayerServer) { - s.RegisterService(&Relayer_ServiceDesc, srv) -} - -func _Relayer_Subscribe_Handler(srv interface{}, stream grpc.ServerStream) error { - return srv.(RelayerServer).Subscribe(&relayerSubscribeServer{stream}) -} - -type Relayer_SubscribeServer interface { - Send(*PayloadMessage) error - Recv() (*ClientMessage, error) - grpc.ServerStream -} - -type relayerSubscribeServer struct { - grpc.ServerStream -} - -func (x *relayerSubscribeServer) Send(m *PayloadMessage) error { - return x.ServerStream.SendMsg(m) -} - -func (x *relayerSubscribeServer) Recv() (*ClientMessage, error) { - m := new(ClientMessage) - if err := x.ServerStream.RecvMsg(m); err != nil { - return nil, err - } - return m, nil -} - -// Relayer_ServiceDesc is the grpc.ServiceDesc for Relayer service. -// It's only intended for direct use with grpc.RegisterService, -// and not to be introspected or modified (even as a copy) -var Relayer_ServiceDesc = grpc.ServiceDesc{ - ServiceName: "pb.Relayer", - HandlerType: (*RelayerServer)(nil), - Methods: []grpc.MethodDesc{}, - Streams: []grpc.StreamDesc{ - { - StreamName: "Subscribe", - Handler: _Relayer_Subscribe_Handler, - ServerStreams: true, - ClientStreams: true, - }, - }, - Metadata: "relayer.proto", -} diff --git a/cl/pb/pb/relayer.pb.go b/cl/pb/pb/streamer.pb.go similarity index 60% rename from cl/pb/pb/relayer.pb.go rename to cl/pb/pb/streamer.pb.go index 652b326a9..c5fe2ad62 100644 --- a/cl/pb/pb/relayer.pb.go +++ b/cl/pb/pb/streamer.pb.go @@ -2,7 +2,7 @@ // versions: // protoc-gen-go v1.34.2 // protoc v3.19.1 -// source: relayer.proto +// source: streamer.proto package pb @@ -35,7 +35,7 @@ type ClientMessage struct { func (x *ClientMessage) Reset() { *x = ClientMessage{} if protoimpl.UnsafeEnabled { - mi := &file_relayer_proto_msgTypes[0] + mi := &file_streamer_proto_msgTypes[0] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -48,7 +48,7 @@ func (x *ClientMessage) String() string { func (*ClientMessage) ProtoMessage() {} func (x *ClientMessage) ProtoReflect() protoreflect.Message { - mi := &file_relayer_proto_msgTypes[0] + mi := &file_streamer_proto_msgTypes[0] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -61,7 +61,7 @@ func (x *ClientMessage) ProtoReflect() protoreflect.Message { // Deprecated: Use ClientMessage.ProtoReflect.Descriptor instead. func (*ClientMessage) Descriptor() ([]byte, []int) { - return file_relayer_proto_rawDescGZIP(), []int{0} + return file_streamer_proto_rawDescGZIP(), []int{0} } func (m *ClientMessage) GetMessage() isClientMessage_Message { @@ -112,7 +112,7 @@ type SubscribeRequest struct { func (x *SubscribeRequest) Reset() { *x = SubscribeRequest{} if protoimpl.UnsafeEnabled { - mi := &file_relayer_proto_msgTypes[1] + mi := &file_streamer_proto_msgTypes[1] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -125,7 +125,7 @@ func (x *SubscribeRequest) String() string { func (*SubscribeRequest) ProtoMessage() {} func (x *SubscribeRequest) ProtoReflect() protoreflect.Message { - mi := &file_relayer_proto_msgTypes[1] + mi := &file_streamer_proto_msgTypes[1] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -138,7 +138,7 @@ func (x *SubscribeRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use SubscribeRequest.ProtoReflect.Descriptor instead. func (*SubscribeRequest) Descriptor() ([]byte, []int) { - return file_relayer_proto_rawDescGZIP(), []int{1} + return file_streamer_proto_rawDescGZIP(), []int{1} } func (x *SubscribeRequest) GetClientId() string { @@ -162,7 +162,7 @@ type PayloadMessage struct { func (x *PayloadMessage) Reset() { *x = PayloadMessage{} if protoimpl.UnsafeEnabled { - mi := &file_relayer_proto_msgTypes[2] + mi := &file_streamer_proto_msgTypes[2] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -175,7 +175,7 @@ func (x *PayloadMessage) String() string { func (*PayloadMessage) ProtoMessage() {} func (x *PayloadMessage) ProtoReflect() protoreflect.Message { - mi := &file_relayer_proto_msgTypes[2] + mi := &file_streamer_proto_msgTypes[2] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -188,7 +188,7 @@ func (x *PayloadMessage) ProtoReflect() protoreflect.Message { // Deprecated: Use PayloadMessage.ProtoReflect.Descriptor instead. func (*PayloadMessage) Descriptor() ([]byte, []int) { - return file_relayer_proto_rawDescGZIP(), []int{2} + return file_streamer_proto_rawDescGZIP(), []int{2} } func (x *PayloadMessage) GetPayloadId() string { @@ -232,7 +232,7 @@ type AckPayloadRequest struct { func (x *AckPayloadRequest) Reset() { *x = AckPayloadRequest{} if protoimpl.UnsafeEnabled { - mi := &file_relayer_proto_msgTypes[3] + mi := &file_streamer_proto_msgTypes[3] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -245,7 +245,7 @@ func (x *AckPayloadRequest) String() string { func (*AckPayloadRequest) ProtoMessage() {} func (x *AckPayloadRequest) ProtoReflect() protoreflect.Message { - mi := &file_relayer_proto_msgTypes[3] + mi := &file_streamer_proto_msgTypes[3] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -258,7 +258,7 @@ func (x *AckPayloadRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use AckPayloadRequest.ProtoReflect.Descriptor instead. func (*AckPayloadRequest) Descriptor() ([]byte, []int) { - return file_relayer_proto_rawDescGZIP(), []int{3} + return file_streamer_proto_rawDescGZIP(), []int{3} } func (x *AckPayloadRequest) GetClientId() string { @@ -282,72 +282,73 @@ func (x *AckPayloadRequest) GetMessageId() string { return "" } -var File_relayer_proto protoreflect.FileDescriptor - -var file_relayer_proto_rawDesc = []byte{ - 0x0a, 0x0d, 0x72, 0x65, 0x6c, 0x61, 0x79, 0x65, 0x72, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, - 0x02, 0x70, 0x62, 0x22, 0x99, 0x01, 0x0a, 0x0d, 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x4d, 0x65, - 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x43, 0x0a, 0x11, 0x73, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, - 0x62, 0x65, 0x5f, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, - 0x32, 0x14, 0x2e, 0x70, 0x62, 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, - 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x48, 0x00, 0x52, 0x10, 0x73, 0x75, 0x62, 0x73, 0x63, 0x72, - 0x69, 0x62, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x38, 0x0a, 0x0b, 0x61, 0x63, - 0x6b, 0x5f, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, - 0x15, 0x2e, 0x70, 0x62, 0x2e, 0x41, 0x63, 0x6b, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x52, - 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x48, 0x00, 0x52, 0x0a, 0x61, 0x63, 0x6b, 0x50, 0x61, 0x79, - 0x6c, 0x6f, 0x61, 0x64, 0x42, 0x09, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, - 0x2f, 0x0a, 0x10, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x71, 0x75, - 0x65, 0x73, 0x74, 0x12, 0x1b, 0x0a, 0x09, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x5f, 0x69, 0x64, - 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x49, 0x64, - 0x22, 0xa9, 0x01, 0x0a, 0x0e, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x4d, 0x65, 0x73, 0x73, - 0x61, 0x67, 0x65, 0x12, 0x1d, 0x0a, 0x0a, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x5f, 0x69, - 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, - 0x49, 0x64, 0x12, 0x2b, 0x0a, 0x11, 0x65, 0x78, 0x65, 0x63, 0x75, 0x74, 0x69, 0x6f, 0x6e, 0x5f, - 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x10, 0x65, - 0x78, 0x65, 0x63, 0x75, 0x74, 0x69, 0x6f, 0x6e, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, - 0x2c, 0x0a, 0x12, 0x73, 0x65, 0x6e, 0x64, 0x65, 0x72, 0x5f, 0x69, 0x6e, 0x73, 0x74, 0x61, 0x6e, - 0x63, 0x65, 0x5f, 0x69, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x10, 0x73, 0x65, 0x6e, - 0x64, 0x65, 0x72, 0x49, 0x6e, 0x73, 0x74, 0x61, 0x6e, 0x63, 0x65, 0x49, 0x64, 0x12, 0x1d, 0x0a, - 0x0a, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x5f, 0x69, 0x64, 0x18, 0x04, 0x20, 0x01, 0x28, - 0x09, 0x52, 0x09, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x49, 0x64, 0x22, 0x6e, 0x0a, 0x11, - 0x41, 0x63, 0x6b, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, - 0x74, 0x12, 0x1b, 0x0a, 0x09, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x5f, 0x69, 0x64, 0x18, 0x01, - 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x49, 0x64, 0x12, 0x1d, - 0x0a, 0x0a, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x5f, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, - 0x28, 0x09, 0x52, 0x09, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x49, 0x64, 0x12, 0x1d, 0x0a, - 0x0a, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x5f, 0x69, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, - 0x09, 0x52, 0x09, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x49, 0x64, 0x32, 0x41, 0x0a, 0x07, - 0x52, 0x65, 0x6c, 0x61, 0x79, 0x65, 0x72, 0x12, 0x36, 0x0a, 0x09, 0x53, 0x75, 0x62, 0x73, 0x63, - 0x72, 0x69, 0x62, 0x65, 0x12, 0x11, 0x2e, 0x70, 0x62, 0x2e, 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, - 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x1a, 0x12, 0x2e, 0x70, 0x62, 0x2e, 0x50, 0x61, 0x79, - 0x6c, 0x6f, 0x61, 0x64, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x28, 0x01, 0x30, 0x01, 0x42, - 0x06, 0x5a, 0x04, 0x2e, 0x2f, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +var File_streamer_proto protoreflect.FileDescriptor + +var file_streamer_proto_rawDesc = []byte{ + 0x0a, 0x0e, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x65, 0x72, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x12, 0x02, 0x70, 0x62, 0x22, 0x99, 0x01, 0x0a, 0x0d, 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x4d, + 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x43, 0x0a, 0x11, 0x73, 0x75, 0x62, 0x73, 0x63, 0x72, + 0x69, 0x62, 0x65, 0x5f, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x0b, 0x32, 0x14, 0x2e, 0x70, 0x62, 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, + 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x48, 0x00, 0x52, 0x10, 0x73, 0x75, 0x62, 0x73, 0x63, + 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x38, 0x0a, 0x0b, 0x61, + 0x63, 0x6b, 0x5f, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, + 0x32, 0x15, 0x2e, 0x70, 0x62, 0x2e, 0x41, 0x63, 0x6b, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, + 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x48, 0x00, 0x52, 0x0a, 0x61, 0x63, 0x6b, 0x50, 0x61, + 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x42, 0x09, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, + 0x22, 0x2f, 0x0a, 0x10, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x71, + 0x75, 0x65, 0x73, 0x74, 0x12, 0x1b, 0x0a, 0x09, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x5f, 0x69, + 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x49, + 0x64, 0x22, 0xa9, 0x01, 0x0a, 0x0e, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x4d, 0x65, 0x73, + 0x73, 0x61, 0x67, 0x65, 0x12, 0x1d, 0x0a, 0x0a, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x5f, + 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, + 0x64, 0x49, 0x64, 0x12, 0x2b, 0x0a, 0x11, 0x65, 0x78, 0x65, 0x63, 0x75, 0x74, 0x69, 0x6f, 0x6e, + 0x5f, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x10, + 0x65, 0x78, 0x65, 0x63, 0x75, 0x74, 0x69, 0x6f, 0x6e, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, + 0x12, 0x2c, 0x0a, 0x12, 0x73, 0x65, 0x6e, 0x64, 0x65, 0x72, 0x5f, 0x69, 0x6e, 0x73, 0x74, 0x61, + 0x6e, 0x63, 0x65, 0x5f, 0x69, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x10, 0x73, 0x65, + 0x6e, 0x64, 0x65, 0x72, 0x49, 0x6e, 0x73, 0x74, 0x61, 0x6e, 0x63, 0x65, 0x49, 0x64, 0x12, 0x1d, + 0x0a, 0x0a, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x5f, 0x69, 0x64, 0x18, 0x04, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x09, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x49, 0x64, 0x22, 0x6e, 0x0a, + 0x11, 0x41, 0x63, 0x6b, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x52, 0x65, 0x71, 0x75, 0x65, + 0x73, 0x74, 0x12, 0x1b, 0x0a, 0x09, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x5f, 0x69, 0x64, 0x18, + 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x49, 0x64, 0x12, + 0x1d, 0x0a, 0x0a, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x5f, 0x69, 0x64, 0x18, 0x02, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x09, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x49, 0x64, 0x12, 0x1d, + 0x0a, 0x0a, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x5f, 0x69, 0x64, 0x18, 0x03, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x09, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x49, 0x64, 0x32, 0x49, 0x0a, + 0x0f, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x65, 0x72, + 0x12, 0x36, 0x0a, 0x09, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x12, 0x11, 0x2e, + 0x70, 0x62, 0x2e, 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, + 0x1a, 0x12, 0x2e, 0x70, 0x62, 0x2e, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x4d, 0x65, 0x73, + 0x73, 0x61, 0x67, 0x65, 0x28, 0x01, 0x30, 0x01, 0x42, 0x06, 0x5a, 0x04, 0x2e, 0x2f, 0x70, 0x62, + 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( - file_relayer_proto_rawDescOnce sync.Once - file_relayer_proto_rawDescData = file_relayer_proto_rawDesc + file_streamer_proto_rawDescOnce sync.Once + file_streamer_proto_rawDescData = file_streamer_proto_rawDesc ) -func file_relayer_proto_rawDescGZIP() []byte { - file_relayer_proto_rawDescOnce.Do(func() { - file_relayer_proto_rawDescData = protoimpl.X.CompressGZIP(file_relayer_proto_rawDescData) +func file_streamer_proto_rawDescGZIP() []byte { + file_streamer_proto_rawDescOnce.Do(func() { + file_streamer_proto_rawDescData = protoimpl.X.CompressGZIP(file_streamer_proto_rawDescData) }) - return file_relayer_proto_rawDescData + return file_streamer_proto_rawDescData } -var file_relayer_proto_msgTypes = make([]protoimpl.MessageInfo, 4) -var file_relayer_proto_goTypes = []any{ +var file_streamer_proto_msgTypes = make([]protoimpl.MessageInfo, 4) +var file_streamer_proto_goTypes = []any{ (*ClientMessage)(nil), // 0: pb.ClientMessage (*SubscribeRequest)(nil), // 1: pb.SubscribeRequest (*PayloadMessage)(nil), // 2: pb.PayloadMessage (*AckPayloadRequest)(nil), // 3: pb.AckPayloadRequest } -var file_relayer_proto_depIdxs = []int32{ +var file_streamer_proto_depIdxs = []int32{ 1, // 0: pb.ClientMessage.subscribe_request:type_name -> pb.SubscribeRequest 3, // 1: pb.ClientMessage.ack_payload:type_name -> pb.AckPayloadRequest - 0, // 2: pb.Relayer.Subscribe:input_type -> pb.ClientMessage - 2, // 3: pb.Relayer.Subscribe:output_type -> pb.PayloadMessage + 0, // 2: pb.PayloadStreamer.Subscribe:input_type -> pb.ClientMessage + 2, // 3: pb.PayloadStreamer.Subscribe:output_type -> pb.PayloadMessage 3, // [3:4] is the sub-list for method output_type 2, // [2:3] is the sub-list for method input_type 2, // [2:2] is the sub-list for extension type_name @@ -355,13 +356,13 @@ var file_relayer_proto_depIdxs = []int32{ 0, // [0:2] is the sub-list for field type_name } -func init() { file_relayer_proto_init() } -func file_relayer_proto_init() { - if File_relayer_proto != nil { +func init() { file_streamer_proto_init() } +func file_streamer_proto_init() { + if File_streamer_proto != nil { return } if !protoimpl.UnsafeEnabled { - file_relayer_proto_msgTypes[0].Exporter = func(v any, i int) any { + file_streamer_proto_msgTypes[0].Exporter = func(v any, i int) any { switch v := v.(*ClientMessage); i { case 0: return &v.state @@ -373,7 +374,7 @@ func file_relayer_proto_init() { return nil } } - file_relayer_proto_msgTypes[1].Exporter = func(v any, i int) any { + file_streamer_proto_msgTypes[1].Exporter = func(v any, i int) any { switch v := v.(*SubscribeRequest); i { case 0: return &v.state @@ -385,7 +386,7 @@ func file_relayer_proto_init() { return nil } } - file_relayer_proto_msgTypes[2].Exporter = func(v any, i int) any { + file_streamer_proto_msgTypes[2].Exporter = func(v any, i int) any { switch v := v.(*PayloadMessage); i { case 0: return &v.state @@ -397,7 +398,7 @@ func file_relayer_proto_init() { return nil } } - file_relayer_proto_msgTypes[3].Exporter = func(v any, i int) any { + file_streamer_proto_msgTypes[3].Exporter = func(v any, i int) any { switch v := v.(*AckPayloadRequest); i { case 0: return &v.state @@ -410,7 +411,7 @@ func file_relayer_proto_init() { } } } - file_relayer_proto_msgTypes[0].OneofWrappers = []any{ + file_streamer_proto_msgTypes[0].OneofWrappers = []any{ (*ClientMessage_SubscribeRequest)(nil), (*ClientMessage_AckPayload)(nil), } @@ -418,18 +419,18 @@ func file_relayer_proto_init() { out := protoimpl.TypeBuilder{ File: protoimpl.DescBuilder{ GoPackagePath: reflect.TypeOf(x{}).PkgPath(), - RawDescriptor: file_relayer_proto_rawDesc, + RawDescriptor: file_streamer_proto_rawDesc, NumEnums: 0, NumMessages: 4, NumExtensions: 0, NumServices: 1, }, - GoTypes: file_relayer_proto_goTypes, - DependencyIndexes: file_relayer_proto_depIdxs, - MessageInfos: file_relayer_proto_msgTypes, + GoTypes: file_streamer_proto_goTypes, + DependencyIndexes: file_streamer_proto_depIdxs, + MessageInfos: file_streamer_proto_msgTypes, }.Build() - File_relayer_proto = out.File - file_relayer_proto_rawDesc = nil - file_relayer_proto_goTypes = nil - file_relayer_proto_depIdxs = nil + File_streamer_proto = out.File + file_streamer_proto_rawDesc = nil + file_streamer_proto_goTypes = nil + file_streamer_proto_depIdxs = nil } diff --git a/cl/pb/pb/streamer_grpc.pb.go b/cl/pb/pb/streamer_grpc.pb.go new file mode 100644 index 000000000..9b43768d8 --- /dev/null +++ b/cl/pb/pb/streamer_grpc.pb.go @@ -0,0 +1,141 @@ +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. +// versions: +// - protoc-gen-go-grpc v1.3.0 +// - protoc v3.19.1 +// source: streamer.proto + +package pb + +import ( + context "context" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.32.0 or later. +const _ = grpc.SupportPackageIsVersion7 + +const ( + PayloadStreamer_Subscribe_FullMethodName = "/pb.PayloadStreamer/Subscribe" +) + +// PayloadStreamerClient is the client API for PayloadStreamer service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type PayloadStreamerClient interface { + Subscribe(ctx context.Context, opts ...grpc.CallOption) (PayloadStreamer_SubscribeClient, error) +} + +type payloadStreamerClient struct { + cc grpc.ClientConnInterface +} + +func NewPayloadStreamerClient(cc grpc.ClientConnInterface) PayloadStreamerClient { + return &payloadStreamerClient{cc} +} + +func (c *payloadStreamerClient) Subscribe(ctx context.Context, opts ...grpc.CallOption) (PayloadStreamer_SubscribeClient, error) { + stream, err := c.cc.NewStream(ctx, &PayloadStreamer_ServiceDesc.Streams[0], PayloadStreamer_Subscribe_FullMethodName, opts...) + if err != nil { + return nil, err + } + x := &payloadStreamerSubscribeClient{stream} + return x, nil +} + +type PayloadStreamer_SubscribeClient interface { + Send(*ClientMessage) error + Recv() (*PayloadMessage, error) + grpc.ClientStream +} + +type payloadStreamerSubscribeClient struct { + grpc.ClientStream +} + +func (x *payloadStreamerSubscribeClient) Send(m *ClientMessage) error { + return x.ClientStream.SendMsg(m) +} + +func (x *payloadStreamerSubscribeClient) Recv() (*PayloadMessage, error) { + m := new(PayloadMessage) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +// PayloadStreamerServer is the server API for PayloadStreamer service. +// All implementations must embed UnimplementedPayloadStreamerServer +// for forward compatibility +type PayloadStreamerServer interface { + Subscribe(PayloadStreamer_SubscribeServer) error + mustEmbedUnimplementedPayloadStreamerServer() +} + +// UnimplementedPayloadStreamerServer must be embedded to have forward compatible implementations. +type UnimplementedPayloadStreamerServer struct { +} + +func (UnimplementedPayloadStreamerServer) Subscribe(PayloadStreamer_SubscribeServer) error { + return status.Errorf(codes.Unimplemented, "method Subscribe not implemented") +} +func (UnimplementedPayloadStreamerServer) mustEmbedUnimplementedPayloadStreamerServer() {} + +// UnsafePayloadStreamerServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to PayloadStreamerServer will +// result in compilation errors. +type UnsafePayloadStreamerServer interface { + mustEmbedUnimplementedPayloadStreamerServer() +} + +func RegisterPayloadStreamerServer(s grpc.ServiceRegistrar, srv PayloadStreamerServer) { + s.RegisterService(&PayloadStreamer_ServiceDesc, srv) +} + +func _PayloadStreamer_Subscribe_Handler(srv interface{}, stream grpc.ServerStream) error { + return srv.(PayloadStreamerServer).Subscribe(&payloadStreamerSubscribeServer{stream}) +} + +type PayloadStreamer_SubscribeServer interface { + Send(*PayloadMessage) error + Recv() (*ClientMessage, error) + grpc.ServerStream +} + +type payloadStreamerSubscribeServer struct { + grpc.ServerStream +} + +func (x *payloadStreamerSubscribeServer) Send(m *PayloadMessage) error { + return x.ServerStream.SendMsg(m) +} + +func (x *payloadStreamerSubscribeServer) Recv() (*ClientMessage, error) { + m := new(ClientMessage) + if err := x.ServerStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +// PayloadStreamer_ServiceDesc is the grpc.ServiceDesc for PayloadStreamer service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var PayloadStreamer_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "pb.PayloadStreamer", + HandlerType: (*PayloadStreamerServer)(nil), + Methods: []grpc.MethodDesc{}, + Streams: []grpc.StreamDesc{ + { + StreamName: "Subscribe", + Handler: _PayloadStreamer_Subscribe_Handler, + ServerStreams: true, + ClientStreams: true, + }, + }, + Metadata: "streamer.proto", +} diff --git a/cl/pb/relayer.proto b/cl/pb/streamer.proto similarity index 95% rename from cl/pb/relayer.proto rename to cl/pb/streamer.proto index 9b22685af..09ba27dcf 100644 --- a/cl/pb/relayer.proto +++ b/cl/pb/streamer.proto @@ -4,7 +4,7 @@ package pb; option go_package = "./pb"; -service Relayer { +service PayloadStreamer { rpc Subscribe(stream ClientMessage) returns (stream PayloadMessage); } diff --git a/cl/relayer/relayer.go b/cl/streamer/streamer.go similarity index 57% rename from cl/relayer/relayer.go rename to cl/streamer/streamer.go index fafd77e30..c3a728036 100644 --- a/cl/relayer/relayer.go +++ b/cl/streamer/streamer.go @@ -1,4 +1,4 @@ -package relayer +package streamer import ( "context" @@ -26,14 +26,14 @@ const ( RedisMsgTypeNew RedisMsgType = ">" ) -type Relayer struct { - pb.UnimplementedRelayerServer +type PayloadStreamer struct { + pb.UnimplementedPayloadStreamerServer redisClient *redis.Client logger *slog.Logger server *grpc.Server } -func NewRelayer(redisAddr string, logger *slog.Logger) (*Relayer, error) { +func NewPayloadStreamer(redisAddr string, logger *slog.Logger) (*PayloadStreamer, error) { redisClient := redis.NewClient(&redis.Options{ Addr: redisAddr, }) @@ -44,40 +44,40 @@ func NewRelayer(redisAddr string, logger *slog.Logger) (*Relayer, error) { return nil, err } - return &Relayer{ + return &PayloadStreamer{ redisClient: redisClient, logger: logger, server: grpc.NewServer(), }, nil } -func (r *Relayer) Start(address string) error { +func (s *PayloadStreamer) Start(address string) error { lis, err := net.Listen("tcp", address) if err != nil { return err } - pb.RegisterRelayerServer(r.server, r) - reflection.Register(r.server) + pb.RegisterPayloadStreamerServer(s.server, s) + reflection.Register(s.server) - r.logger.Info("Relayer is listening", "address", address) - return r.server.Serve(lis) + s.logger.Info("PayloadStreamer is listening", "address", address) + return s.server.Serve(lis) } -func (r *Relayer) Stop() { - r.server.GracefulStop() - if err := r.redisClient.Close(); err != nil { - r.logger.Error("Error closing Redis client in Relayer", "error", err) +func (s *PayloadStreamer) Stop() { + s.server.GracefulStop() + if err := s.redisClient.Close(); err != nil { + s.logger.Error("Error closing Redis client in PayloadStreamer", "error", err) } } -func (r *Relayer) Subscribe(stream pb.Relayer_SubscribeServer) error { +func (s *PayloadStreamer) Subscribe(stream pb.PayloadStreamer_SubscribeServer) error { ctx := stream.Context() var clientID string firstMessage, err := stream.Recv() if err != nil { - r.logger.Error("Failed to receive initial message", "error", err) + s.logger.Error("Failed to receive initial message", "error", err) return err } if req := firstMessage.GetSubscribeRequest(); req != nil { @@ -89,34 +89,34 @@ func (r *Relayer) Subscribe(stream pb.Relayer_SubscribeServer) error { groupName := "member_group:" + clientID consumerName := "member_consumer:" + clientID - err = r.createConsumerGroup(ctx, groupName) + err = s.createConsumerGroup(ctx, groupName) if err != nil { - r.logger.Error("Failed to create consumer group", "clientID", clientID, "error", err) + s.logger.Error("Failed to create consumer group", "clientID", clientID, "error", err) return err } - r.logger.Info("Subscriber connected", "clientID", clientID) - return r.handleBidirectionalStream(stream, clientID, groupName, consumerName) + s.logger.Info("Subscriber connected", "clientID", clientID) + return s.handleBidirectionalStream(stream, clientID, groupName, consumerName) } -func (r *Relayer) createConsumerGroup(ctx context.Context, groupName string) error { - err := r.redisClient.XGroupCreateMkStream(ctx, blockStreamName, groupName, "0").Err() +func (s *PayloadStreamer) createConsumerGroup(ctx context.Context, groupName string) error { + err := s.redisClient.XGroupCreateMkStream(ctx, blockStreamName, groupName, "0").Err() if err != nil && !strings.Contains(err.Error(), "BUSYGROUP") { return err } return nil } -func (r *Relayer) handleBidirectionalStream(stream pb.Relayer_SubscribeServer, clientID, groupName, consumerName string) error { +func (s *PayloadStreamer) handleBidirectionalStream(stream pb.PayloadStreamer_SubscribeServer, clientID, groupName, consumerName string) error { ctx := stream.Context() var pendingMessageID string for { if pendingMessageID == "" { // No pending message, read the next message from Redis - messages, err := r.readMessages(ctx, groupName, consumerName) + messages, err := s.readMessages(ctx, groupName, consumerName) if err != nil { - r.logger.Error("Error reading messages", "clientID", clientID, "error", err) + s.logger.Error("Error reading messages", "clientID", clientID, "error", err) return err } if len(messages) == 0 { @@ -131,11 +131,11 @@ func (r *Relayer) handleBidirectionalStream(stream pb.Relayer_SubscribeServer, c executionPayloadStr, okPayload := field.Values["execution_payload"].(string) senderInstanceID, okSenderID := field.Values["sender_instance_id"].(string) if !ok || !okPayload || !okSenderID { - r.logger.Error("Invalid message format", "clientID", clientID) + s.logger.Error("Invalid message format", "clientID", clientID) // Acknowledge malformed messages to prevent reprocessing - err = r.ackMessage(ctx, field.ID, groupName) + err = s.ackMessage(ctx, field.ID, groupName) if err != nil { - r.logger.Error("Failed to acknowledge malformed message", "clientID", clientID, "error", err) + s.logger.Error("Failed to acknowledge malformed message", "clientID", clientID, "error", err) } pendingMessageID = "" continue @@ -148,43 +148,43 @@ func (r *Relayer) handleBidirectionalStream(stream pb.Relayer_SubscribeServer, c MessageId: field.ID, }) if err != nil { - r.logger.Error("Failed to send message to client", "clientID", clientID, "error", err) + s.logger.Error("Failed to send message to client", "clientID", clientID, "error", err) return err } } clientMsg, err := stream.Recv() if err != nil { - r.logger.Error("Failed to receive acknowledgment", "clientID", clientID, "error", err) + s.logger.Error("Failed to receive acknowledgment", "clientID", clientID, "error", err) return err } if ack := clientMsg.GetAckPayload(); ack != nil { if ack.MessageId == pendingMessageID { - err := r.ackMessage(ctx, pendingMessageID, groupName) + err := s.ackMessage(ctx, pendingMessageID, groupName) if err != nil { - r.logger.Error("Failed to acknowledge message", "clientID", clientID, "error", err) + s.logger.Error("Failed to acknowledge message", "clientID", clientID, "error", err) return err } - r.logger.Info("Message acknowledged", "clientID", clientID, "messageID", pendingMessageID) + s.logger.Info("Message acknowledged", "clientID", clientID, "messageID", pendingMessageID) pendingMessageID = "" } else { - r.logger.Error("Received acknowledgment for unknown message ID", "clientID", clientID, "messageID", ack.MessageId) + s.logger.Error("Received acknowledgment for unknown message ID", "clientID", clientID, "messageID", ack.MessageId) } } else { - r.logger.Error("Expected AckPayloadRequest, got something else", "clientID", clientID) + s.logger.Error("Expected AckPayloadRequest, got something else", "clientID", clientID) } } } -func (r *Relayer) readMessages(ctx context.Context, groupName, consumerName string) ([]redis.XStream, error) { - messages, err := r.readMessagesFromStream(ctx, RedisMsgTypePending, groupName, consumerName) +func (s *PayloadStreamer) readMessages(ctx context.Context, groupName, consumerName string) ([]redis.XStream, error) { + messages, err := s.readMessagesFromStream(ctx, RedisMsgTypePending, groupName, consumerName) if err != nil { return nil, err } if len(messages) == 0 || len(messages[0].Messages) == 0 { - messages, err = r.readMessagesFromStream(ctx, RedisMsgTypeNew, groupName, consumerName) + messages, err = s.readMessagesFromStream(ctx, RedisMsgTypeNew, groupName, consumerName) if err != nil { return nil, err } @@ -193,7 +193,7 @@ func (r *Relayer) readMessages(ctx context.Context, groupName, consumerName stri return messages, nil } -func (s *Relayer) readMessagesFromStream(ctx context.Context, msgType RedisMsgType, groupName, consumerName string) ([]redis.XStream, error) { +func (s *PayloadStreamer) readMessagesFromStream(ctx context.Context, msgType RedisMsgType, groupName, consumerName string) ([]redis.XStream, error) { args := &redis.XReadGroupArgs{ Group: groupName, Consumer: consumerName, @@ -210,6 +210,6 @@ func (s *Relayer) readMessagesFromStream(ctx context.Context, msgType RedisMsgTy return messages, nil } -func (r *Relayer) ackMessage(ctx context.Context, messageID, groupName string) error { - return r.redisClient.XAck(ctx, blockStreamName, groupName, messageID).Err() +func (s *PayloadStreamer) ackMessage(ctx context.Context, messageID, groupName string) error { + return s.redisClient.XAck(ctx, blockStreamName, groupName, messageID).Err() } diff --git a/cl/relayer/relayer_test.go b/cl/streamer/streamer_test.go similarity index 96% rename from cl/relayer/relayer_test.go rename to cl/streamer/streamer_test.go index 1d304741d..eb7c6dbd9 100644 --- a/cl/relayer/relayer_test.go +++ b/cl/streamer/streamer_test.go @@ -1,4 +1,4 @@ -package relayer +package streamer import ( "context" @@ -23,7 +23,7 @@ func TestCreateConsumerGroup(t *testing.T) { logger := slog.Default() db, mock := redismock.NewClientMock() - r := &Relayer{ + r := &PayloadStreamer{ redisClient: db, logger: logger, server: grpc.NewServer(), @@ -52,7 +52,7 @@ func TestAckMessage(t *testing.T) { logger := slog.Default() db, mock := redismock.NewClientMock() - r := &Relayer{ + r := &PayloadStreamer{ redisClient: db, logger: logger, } @@ -75,7 +75,7 @@ func TestReadMessages(t *testing.T) { logger := slog.Default() db, mock := redismock.NewClientMock() - r := &Relayer{ + r := &PayloadStreamer{ redisClient: db, logger: logger, } @@ -129,13 +129,13 @@ func TestSubscribe(t *testing.T) { logger := slog.Default() db, mock := redismock.NewClientMock() - r := &Relayer{ + r := &PayloadStreamer{ redisClient: db, logger: logger, server: grpc.NewServer(), } - pb.RegisterRelayerServer(r.server, r) + pb.RegisterPayloadStreamerServer(r.server, r) lis := bufconn.Listen(1024 * 1024) @@ -224,7 +224,7 @@ func TestSubscribe(t *testing.T) { } defer conn.Close() - client := pb.NewRelayerClient(conn) + client := pb.NewPayloadStreamerClient(conn) // Call Subscribe stream, err := client.Subscribe(ctx)