diff --git a/beacon-chain/node/node.go b/beacon-chain/node/node.go index 576991c15b96..a986dcf1b37d 100644 --- a/beacon-chain/node/node.go +++ b/beacon-chain/node/node.go @@ -665,13 +665,13 @@ func (b *BeaconNode) registerGRPCGateway() error { ethpb.RegisterNodeHandler, ethpb.RegisterBeaconChainHandler, ethpb.RegisterBeaconNodeValidatorHandler, - ethpbv1.RegisterEventsHandler, pbrpc.RegisterHealthHandler, } v1Registrations := []gateway.PbHandlerRegistration{ ethpbv1.RegisterBeaconNodeHandler, ethpbv1.RegisterBeaconChainHandler, ethpbv1.RegisterBeaconValidatorHandler, + ethpbv1.RegisterEventsHandler, } if enableDebugRPCEndpoints { v1Alpha1Registrations = append(v1Alpha1Registrations, pbrpc.RegisterDebugHandler) diff --git a/beacon-chain/rpc/apimiddleware/BUILD.bazel b/beacon-chain/rpc/apimiddleware/BUILD.bazel index cd0c1617225f..8d1c7104da4d 100644 --- a/beacon-chain/rpc/apimiddleware/BUILD.bazel +++ b/beacon-chain/rpc/apimiddleware/BUILD.bazel @@ -12,11 +12,13 @@ go_library( importpath = "github.com/prysmaticlabs/prysm/beacon-chain/rpc/apimiddleware", visibility = ["//beacon-chain:__subpackages__"], deps = [ + "//beacon-chain/rpc/eventsv1:go_default_library", "//shared/bytesutil:go_default_library", "//shared/gateway:go_default_library", "//shared/grpcutils:go_default_library", "@com_github_ethereum_go_ethereum//common/hexutil:go_default_library", "@com_github_pkg_errors//:go_default_library", + "@com_github_r3labs_sse//:go_default_library", ], ) @@ -28,10 +30,12 @@ go_test( ], embed = [":go_default_library"], deps = [ + "//beacon-chain/rpc/eventsv1:go_default_library", "//shared/bytesutil:go_default_library", "//shared/gateway:go_default_library", "//shared/grpcutils:go_default_library", "//shared/testutil/assert:go_default_library", "//shared/testutil/require:go_default_library", + "@com_github_r3labs_sse//:go_default_library", ], ) diff --git a/beacon-chain/rpc/apimiddleware/custom_handlers.go b/beacon-chain/rpc/apimiddleware/custom_handlers.go index 1b5d13a99a58..0ab4a8306f50 100644 --- a/beacon-chain/rpc/apimiddleware/custom_handlers.go +++ b/beacon-chain/rpc/apimiddleware/custom_handlers.go @@ -3,6 +3,8 @@ package apimiddleware import ( "bytes" "encoding/base64" + "encoding/json" + "fmt" "io" "io/ioutil" "net/http" @@ -10,8 +12,10 @@ import ( "strings" "github.com/pkg/errors" + "github.com/prysmaticlabs/prysm/beacon-chain/rpc/eventsv1" "github.com/prysmaticlabs/prysm/shared/gateway" "github.com/prysmaticlabs/prysm/shared/grpcutils" + "github.com/r3labs/sse" ) type sszConfig struct { @@ -157,3 +161,120 @@ func writeSSZResponseHeaderAndBody(grpcResp *http.Response, w http.ResponseWrite } return nil } + +func handleEvents(m *gateway.ApiProxyMiddleware, _ gateway.Endpoint, w http.ResponseWriter, req *http.Request) (handled bool) { + sseClient := sse.NewClient("http://" + m.GatewayAddress + req.URL.RequestURI()) + eventChan := make(chan *sse.Event) + + // We use grpc-gateway as the server side of events, not the sse library. + // Because of this subscribing to streams doesn't work as intended, resulting in each event being handled by all subscriptions. + // To handle events properly, we subscribe just once using a placeholder value ('events') and handle all topics inside this subscription. + if err := sseClient.SubscribeChan("events", eventChan); err != nil { + gateway.WriteError(w, &gateway.DefaultErrorJson{Message: err.Error(), Code: http.StatusInternalServerError}, nil) + sseClient.Unsubscribe(eventChan) + return + } + + errJson := receiveEvents(eventChan, w, req) + if errJson != nil { + gateway.WriteError(w, errJson, nil) + } + + sseClient.Unsubscribe(eventChan) + return true +} + +func receiveEvents(eventChan <-chan *sse.Event, w http.ResponseWriter, req *http.Request) gateway.ErrorJson { + for { + select { + case msg := <-eventChan: + var data interface{} + + switch strings.TrimSpace(string(msg.Event)) { + case eventsv1.HeadTopic: + data = &eventHeadJson{} + case eventsv1.BlockTopic: + data = &receivedBlockDataJson{} + case eventsv1.AttestationTopic: + data = &attestationJson{} + + // Data received in the event does not fit the expected event stream output. + // We extract the underlying attestation from event data + // and assign the attestation back to event data for further processing. + eventData := &aggregatedAttReceivedDataJson{} + if err := json.Unmarshal(msg.Data, eventData); err != nil { + return &gateway.DefaultErrorJson{Message: err.Error(), Code: http.StatusInternalServerError} + } + attData, err := json.Marshal(eventData.Aggregate) + if err != nil { + return &gateway.DefaultErrorJson{Message: err.Error(), Code: http.StatusInternalServerError} + } + msg.Data = attData + case eventsv1.VoluntaryExitTopic: + data = &signedVoluntaryExitJson{} + case eventsv1.FinalizedCheckpointTopic: + data = &eventFinalizedCheckpointJson{} + case eventsv1.ChainReorgTopic: + data = &eventChainReorgJson{} + case "error": + data = &eventErrorJson{} + default: + return &gateway.DefaultErrorJson{ + Message: fmt.Sprintf("Event type '%s' not supported", strings.TrimSpace(string(msg.Event))), + Code: http.StatusInternalServerError, + } + } + + if errJson := writeEvent(msg, w, data); errJson != nil { + return errJson + } + if errJson := flushEvent(w); errJson != nil { + return errJson + } + case <-req.Context().Done(): + return nil + } + } +} + +func writeEvent(msg *sse.Event, w http.ResponseWriter, data interface{}) gateway.ErrorJson { + if err := json.Unmarshal(msg.Data, data); err != nil { + return &gateway.DefaultErrorJson{Message: err.Error(), Code: http.StatusInternalServerError} + } + if errJson := gateway.ProcessMiddlewareResponseFields(data); errJson != nil { + return errJson + } + dataJson, errJson := gateway.SerializeMiddlewareResponseIntoJson(data) + if errJson != nil { + return errJson + } + + w.Header().Set("Content-Type", "text/event-stream") + + if _, err := w.Write([]byte("event: ")); err != nil { + return &gateway.DefaultErrorJson{Message: err.Error(), Code: http.StatusInternalServerError} + } + if _, err := w.Write(msg.Event); err != nil { + return &gateway.DefaultErrorJson{Message: err.Error(), Code: http.StatusInternalServerError} + } + if _, err := w.Write([]byte("\ndata: ")); err != nil { + return &gateway.DefaultErrorJson{Message: err.Error(), Code: http.StatusInternalServerError} + } + if _, err := w.Write(dataJson); err != nil { + return &gateway.DefaultErrorJson{Message: err.Error(), Code: http.StatusInternalServerError} + } + if _, err := w.Write([]byte("\n\n")); err != nil { + return &gateway.DefaultErrorJson{Message: err.Error(), Code: http.StatusInternalServerError} + } + + return nil +} + +func flushEvent(w http.ResponseWriter) gateway.ErrorJson { + flusher, ok := w.(http.Flusher) + if !ok { + return &gateway.DefaultErrorJson{Message: fmt.Sprintf("Flush not supported in %T", w), Code: http.StatusInternalServerError} + } + flusher.Flush() + return nil +} diff --git a/beacon-chain/rpc/apimiddleware/custom_handlers_test.go b/beacon-chain/rpc/apimiddleware/custom_handlers_test.go index 46e9981f111c..cd7209a248ff 100644 --- a/beacon-chain/rpc/apimiddleware/custom_handlers_test.go +++ b/beacon-chain/rpc/apimiddleware/custom_handlers_test.go @@ -2,15 +2,20 @@ package apimiddleware import ( "bytes" + "context" + "encoding/json" "net/http" "net/http/httptest" "strings" "testing" + "time" + "github.com/prysmaticlabs/prysm/beacon-chain/rpc/eventsv1" "github.com/prysmaticlabs/prysm/shared/gateway" "github.com/prysmaticlabs/prysm/shared/grpcutils" "github.com/prysmaticlabs/prysm/shared/testutil/assert" "github.com/prysmaticlabs/prysm/shared/testutil/require" + "github.com/r3labs/sse" ) func TestSSZRequested(t *testing.T) { @@ -136,3 +141,74 @@ func TestWriteSSZResponseHeaderAndBody(t *testing.T) { assert.Equal(t, http.StatusInternalServerError, errJson.StatusCode()) }) } + +func TestReceiveEvents(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + ch := make(chan *sse.Event) + w := httptest.NewRecorder() + w.Body = &bytes.Buffer{} + req := httptest.NewRequest("GET", "http://foo.example", &bytes.Buffer{}) + req = req.WithContext(ctx) + + go func() { + base64Val := "Zm9v" + data := &eventFinalizedCheckpointJson{ + Block: base64Val, + State: base64Val, + Epoch: "1", + } + bData, err := json.Marshal(data) + require.NoError(t, err) + msg := &sse.Event{ + Data: bData, + Event: []byte(eventsv1.FinalizedCheckpointTopic), + } + ch <- msg + time.Sleep(time.Second) + cancel() + }() + + errJson := receiveEvents(ch, w, req) + assert.Equal(t, true, errJson == nil) +} + +func TestReceiveEvents_EventNotSupported(t *testing.T) { + ch := make(chan *sse.Event) + w := httptest.NewRecorder() + w.Body = &bytes.Buffer{} + req := httptest.NewRequest("GET", "http://foo.example", &bytes.Buffer{}) + + go func() { + msg := &sse.Event{ + Data: []byte("foo"), + Event: []byte("not_supported"), + } + ch <- msg + }() + + errJson := receiveEvents(ch, w, req) + require.NotNil(t, errJson) + assert.Equal(t, "Event type 'not_supported' not supported", errJson.Msg()) +} + +func TestWriteEvent(t *testing.T) { + base64Val := "Zm9v" + data := &eventFinalizedCheckpointJson{ + Block: base64Val, + State: base64Val, + Epoch: "1", + } + bData, err := json.Marshal(data) + require.NoError(t, err) + msg := &sse.Event{ + Data: bData, + Event: []byte("test_event"), + } + w := httptest.NewRecorder() + w.Body = &bytes.Buffer{} + + errJson := writeEvent(msg, w, &eventFinalizedCheckpointJson{}) + require.Equal(t, true, errJson == nil) + written := w.Body.String() + assert.Equal(t, "event: test_event\ndata: {\"block\":\"0x666f6f\",\"state\":\"0x666f6f\",\"epoch\":\"1\"}\n\n", written) +} diff --git a/beacon-chain/rpc/apimiddleware/endpoint_factory.go b/beacon-chain/rpc/apimiddleware/endpoint_factory.go index 494c3f8ac253..6e8d6b3a1101 100644 --- a/beacon-chain/rpc/apimiddleware/endpoint_factory.go +++ b/beacon-chain/rpc/apimiddleware/endpoint_factory.go @@ -45,6 +45,7 @@ func (f *BeaconEndpointFactory) Paths() []string { "/eth/v1/config/fork_schedule", "/eth/v1/config/deposit_contract", "/eth/v1/config/spec", + "/eth/v1/events", } } @@ -217,6 +218,13 @@ func (f *BeaconEndpointFactory) Create(path string) (*gateway.Endpoint, error) { GetResponse: &specResponseJson{}, Err: &gateway.DefaultErrorJson{}, } + case "/eth/v1/events": + endpoint = gateway.Endpoint{ + Err: &gateway.DefaultErrorJson{}, + Hooks: gateway.HookCollection{ + CustomHandlers: []gateway.CustomHandler{handleEvents}, + }, + } default: return nil, errors.New("invalid path") } diff --git a/beacon-chain/rpc/apimiddleware/structs.go b/beacon-chain/rpc/apimiddleware/structs.go index b1b86b378dce..d941dc0efe38 100644 --- a/beacon-chain/rpc/apimiddleware/structs.go +++ b/beacon-chain/rpc/apimiddleware/structs.go @@ -455,6 +455,45 @@ func (ssz *beaconStateSSZResponseJson) SSZData() string { return ssz.Data } +// TODO: Documentation +// --------------- +// Events. +// --------------- + +type eventHeadJson struct { + Slot string `json:"slot"` + Block string `json:"block" hex:"true"` + State string `json:"state" hex:"true"` + EpochTransition bool `json:"epoch_transition"` + PreviousDutyDependentRoot string `json:"previous_duty_dependent_root" hex:"true"` + CurrentDutyDependentRoot string `json:"current_duty_dependent_root" hex:"true"` +} + +type receivedBlockDataJson struct { + Slot string `json:"slot"` + Block string `json:"block" hex:"true"` +} + +type aggregatedAttReceivedDataJson struct { + Aggregate *attestationJson `json:"aggregate"` +} + +type eventFinalizedCheckpointJson struct { + Block string `json:"block" hex:"true"` + State string `json:"state" hex:"true"` + Epoch string `json:"epoch"` +} + +type eventChainReorgJson struct { + Slot string `json:"slot"` + Depth string `json:"depth"` + OldHeadBlock string `json:"old_head_block" hex:"true"` + NewHeadBlock string `json:"old_head_state" hex:"true"` + OldHeadState string `json:"new_head_block" hex:"true"` + NewHeadState string `json:"new_head_state" hex:"true"` + Epoch string `json:"epoch"` +} + // --------------- // Error handling. // --------------- @@ -470,3 +509,8 @@ type singleAttestationVerificationFailureJson struct { Index int `json:"index"` Message string `json:"message"` } + +type eventErrorJson struct { + StatusCode int `json:"status_code"` + Message string `json:"message"` +} diff --git a/beacon-chain/rpc/eventsv1/BUILD.bazel b/beacon-chain/rpc/eventsv1/BUILD.bazel index 9391cbb18119..c221d63bc6ef 100644 --- a/beacon-chain/rpc/eventsv1/BUILD.bazel +++ b/beacon-chain/rpc/eventsv1/BUILD.bazel @@ -17,6 +17,7 @@ go_library( "//proto/eth/v1:go_default_library", "//proto/migration:go_default_library", "@com_github_grpc_ecosystem_grpc_gateway_v2//proto/gateway:go_default_library", + "@com_github_pkg_errors//:go_default_library", "@org_golang_google_grpc//codes:go_default_library", "@org_golang_google_grpc//status:go_default_library", "@org_golang_google_protobuf//proto:go_default_library", diff --git a/beacon-chain/rpc/eventsv1/events.go b/beacon-chain/rpc/eventsv1/events.go index 075cc1aba2e4..565632c891a6 100644 --- a/beacon-chain/rpc/eventsv1/events.go +++ b/beacon-chain/rpc/eventsv1/events.go @@ -2,6 +2,7 @@ package eventsv1 import ( gwpb "github.com/grpc-ecosystem/grpc-gateway/v2/proto/gateway" + "github.com/pkg/errors" "github.com/prysmaticlabs/prysm/beacon-chain/core/feed" blockfeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/block" "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/operation" @@ -15,21 +16,27 @@ import ( ) const ( - headTopic = "head" - blockTopic = "block" - attestationTopic = "attestation" - voluntaryExitTopic = "voluntary_exit" - finalizedCheckpointTopic = "finalized_checkpoint" - chainReorgTopic = "chain_reorg" + // HeadTopic represents a new chain head event topic. + HeadTopic = "head" + // BlockTopic represents a new produced block event topic. + BlockTopic = "block" + // AttestationTopic represents a new submitted attestation event topic. + AttestationTopic = "attestation" + // VoluntaryExitTopic represents a new performed voluntary exit event topic. + VoluntaryExitTopic = "voluntary_exit" + // FinalizedCheckpointTopic represents a new finalized checkpoint event topic. + FinalizedCheckpointTopic = "finalized_checkpoint" + // ChainReorgTopic represents a chain reorganization event topic. + ChainReorgTopic = "chain_reorg" ) var casesHandled = map[string]bool{ - headTopic: true, - blockTopic: true, - attestationTopic: true, - voluntaryExitTopic: true, - finalizedCheckpointTopic: true, - chainReorgTopic: true, + HeadTopic: true, + BlockTopic: true, + AttestationTopic: true, + VoluntaryExitTopic: true, + FinalizedCheckpointTopic: true, + ChainReorgTopic: true, } // StreamEvents allows requesting all events from a set of topics defined in the eth2.0-apis standard. @@ -39,13 +46,13 @@ func (s *Server) StreamEvents( req *ethpb.StreamEventsRequest, stream ethpb.Events_StreamEventsServer, ) error { if req == nil || len(req.Topics) == 0 { - return status.Error(codes.InvalidArgument, "no topics specified to subscribe to") + return status.Error(codes.InvalidArgument, "No topics specified to subscribe to") } // Check if the topics in the request are valid. requestedTopics := make(map[string]bool) for _, topic := range req.Topics { if _, ok := casesHandled[topic]; !ok { - return status.Errorf(codes.InvalidArgument, "topic %s not allowed for event subscriptions", topic) + return status.Errorf(codes.InvalidArgument, "Topic %s not allowed for event subscriptions", topic) } requestedTopics[topic] = true } @@ -69,20 +76,20 @@ func (s *Server) StreamEvents( select { case event := <-blockChan: if err := s.handleBlockEvents(stream, requestedTopics, event); err != nil { - return status.Errorf(codes.Internal, "could not handle block event: %v", err) + return status.Errorf(codes.Internal, "Could not handle block event: %v", err) } case event := <-opsChan: if err := s.handleBlockOperationEvents(stream, requestedTopics, event); err != nil { - return status.Errorf(codes.Internal, "could not handle block operations event: %v", err) + return status.Errorf(codes.Internal, "Could not handle block operations event: %v", err) } case event := <-stateChan: if err := s.handleStateEvents(stream, requestedTopics, event); err != nil { - return status.Errorf(codes.Internal, "could not handle state event: %v", err) + return status.Errorf(codes.Internal, "Could not handle state event: %v", err) } case <-s.Ctx.Done(): - return status.Errorf(codes.Canceled, "context canceled") + return status.Errorf(codes.Canceled, "Context canceled") case <-stream.Context().Done(): - return status.Errorf(codes.Canceled, "context canceled") + return status.Errorf(codes.Canceled, "Context canceled") } } } @@ -92,7 +99,7 @@ func (s *Server) handleBlockEvents( ) error { switch event.Type { case blockfeed.ReceivedBlock: - if _, ok := requestedTopics[blockTopic]; !ok { + if _, ok := requestedTopics[BlockTopic]; !ok { return nil } blkData, ok := event.Data.(*blockfeed.ReceivedBlockData) @@ -105,13 +112,13 @@ func (s *Server) handleBlockEvents( } item, err := v1Data.HashTreeRoot() if err != nil { - return status.Errorf(codes.Internal, "could not hash tree root block %v", err) + return errors.Wrap(err, "could not hash tree root block") } eventBlock := ðpb.EventBlock{ Slot: v1Data.Message.Slot, Block: item[:], } - return s.streamData(stream, blockTopic, eventBlock) + return s.streamData(stream, BlockTopic, eventBlock) default: return nil } @@ -122,7 +129,7 @@ func (s *Server) handleBlockOperationEvents( ) error { switch event.Type { case operation.AggregatedAttReceived: - if _, ok := requestedTopics[attestationTopic]; !ok { + if _, ok := requestedTopics[AttestationTopic]; !ok { return nil } attData, ok := event.Data.(*operation.AggregatedAttReceivedData) @@ -130,9 +137,9 @@ func (s *Server) handleBlockOperationEvents( return nil } v1Data := migration.V1Alpha1AggregateAttAndProofToV1(attData.Attestation) - return s.streamData(stream, attestationTopic, v1Data) + return s.streamData(stream, AttestationTopic, v1Data) case operation.UnaggregatedAttReceived: - if _, ok := requestedTopics[attestationTopic]; !ok { + if _, ok := requestedTopics[AttestationTopic]; !ok { return nil } attData, ok := event.Data.(*operation.UnAggregatedAttReceivedData) @@ -140,9 +147,9 @@ func (s *Server) handleBlockOperationEvents( return nil } v1Data := migration.V1Alpha1AttestationToV1(attData.Attestation) - return s.streamData(stream, attestationTopic, v1Data) + return s.streamData(stream, AttestationTopic, v1Data) case operation.ExitReceived: - if _, ok := requestedTopics[voluntaryExitTopic]; !ok { + if _, ok := requestedTopics[VoluntaryExitTopic]; !ok { return nil } exitData, ok := event.Data.(*operation.ExitReceivedData) @@ -150,7 +157,7 @@ func (s *Server) handleBlockOperationEvents( return nil } v1Data := migration.V1Alpha1ExitToV1(exitData.Exit) - return s.streamData(stream, voluntaryExitTopic, v1Data) + return s.streamData(stream, VoluntaryExitTopic, v1Data) default: return nil } @@ -161,32 +168,32 @@ func (s *Server) handleStateEvents( ) error { switch event.Type { case statefeed.NewHead: - if _, ok := requestedTopics[headTopic]; !ok { + if _, ok := requestedTopics[HeadTopic]; !ok { return nil } head, ok := event.Data.(*ethpb.EventHead) if !ok { return nil } - return s.streamData(stream, headTopic, head) + return s.streamData(stream, HeadTopic, head) case statefeed.FinalizedCheckpoint: - if _, ok := requestedTopics[finalizedCheckpointTopic]; !ok { + if _, ok := requestedTopics[FinalizedCheckpointTopic]; !ok { return nil } finalizedCheckpoint, ok := event.Data.(*ethpb.EventFinalizedCheckpoint) if !ok { return nil } - return s.streamData(stream, finalizedCheckpointTopic, finalizedCheckpoint) + return s.streamData(stream, FinalizedCheckpointTopic, finalizedCheckpoint) case statefeed.Reorg: - if _, ok := requestedTopics[chainReorgTopic]; !ok { + if _, ok := requestedTopics[ChainReorgTopic]; !ok { return nil } reorg, ok := event.Data.(*ethpb.EventChainReorg) if !ok { return nil } - return s.streamData(stream, chainReorgTopic, reorg) + return s.streamData(stream, ChainReorgTopic, reorg) default: return nil } diff --git a/beacon-chain/rpc/eventsv1/events_test.go b/beacon-chain/rpc/eventsv1/events_test.go index e462d838df6f..865e792c3f5a 100644 --- a/beacon-chain/rpc/eventsv1/events_test.go +++ b/beacon-chain/rpc/eventsv1/events_test.go @@ -30,7 +30,7 @@ func TestStreamEvents_Preconditions(t *testing.T) { defer ctrl.Finish() mockStream := mock.NewMockEvents_StreamEventsServer(ctrl) err := srv.StreamEvents(ðpb.StreamEventsRequest{Topics: nil}, mockStream) - require.ErrorContains(t, "no topics specified", err) + require.ErrorContains(t, "No topics specified", err) }) t.Run("topic_not_allowed", func(t *testing.T) { srv := &Server{} @@ -38,12 +38,12 @@ func TestStreamEvents_Preconditions(t *testing.T) { defer ctrl.Finish() mockStream := mock.NewMockEvents_StreamEventsServer(ctrl) err := srv.StreamEvents(ðpb.StreamEventsRequest{Topics: []string{"foobar"}}, mockStream) - require.ErrorContains(t, "topic foobar not allowed", err) + require.ErrorContains(t, "Topic foobar not allowed", err) }) } func TestStreamEvents_BlockEvents(t *testing.T) { - t.Run(blockTopic, func(t *testing.T) { + t.Run(BlockTopic, func(t *testing.T) { ctx := context.Background() srv, ctrl, mockStream := setupServer(ctx, t) defer ctrl.Finish() @@ -61,14 +61,14 @@ func TestStreamEvents_BlockEvents(t *testing.T) { }) require.NoError(t, err) wantedMessage := &gateway.EventSource{ - Event: blockTopic, + Event: BlockTopic, Data: genericResponse, } assertFeedSendAndReceive(ctx, &assertFeedArgs{ t: t, srv: srv, - topics: []string{blockTopic}, + topics: []string{BlockTopic}, stream: mockStream, shouldReceive: wantedMessage, itemToSend: &feed.Event{ @@ -98,14 +98,14 @@ func TestStreamEvents_OperationsEvents(t *testing.T) { require.NoError(t, err) wantedMessage := &gateway.EventSource{ - Event: attestationTopic, + Event: AttestationTopic, Data: genericResponse, } assertFeedSendAndReceive(ctx, &assertFeedArgs{ t: t, srv: srv, - topics: []string{attestationTopic}, + topics: []string{AttestationTopic}, stream: mockStream, shouldReceive: wantedMessage, itemToSend: &feed.Event{ @@ -130,14 +130,14 @@ func TestStreamEvents_OperationsEvents(t *testing.T) { require.NoError(t, err) wantedMessage := &gateway.EventSource{ - Event: attestationTopic, + Event: AttestationTopic, Data: genericResponse, } assertFeedSendAndReceive(ctx, &assertFeedArgs{ t: t, srv: srv, - topics: []string{attestationTopic}, + topics: []string{AttestationTopic}, stream: mockStream, shouldReceive: wantedMessage, itemToSend: &feed.Event{ @@ -149,7 +149,7 @@ func TestStreamEvents_OperationsEvents(t *testing.T) { feed: srv.OperationNotifier.OperationFeed(), }) }) - t.Run(voluntaryExitTopic, func(t *testing.T) { + t.Run(VoluntaryExitTopic, func(t *testing.T) { ctx := context.Background() srv, ctrl, mockStream := setupServer(ctx, t) defer ctrl.Finish() @@ -166,14 +166,14 @@ func TestStreamEvents_OperationsEvents(t *testing.T) { require.NoError(t, err) wantedMessage := &gateway.EventSource{ - Event: voluntaryExitTopic, + Event: VoluntaryExitTopic, Data: genericResponse, } assertFeedSendAndReceive(ctx, &assertFeedArgs{ t: t, srv: srv, - topics: []string{voluntaryExitTopic}, + topics: []string{VoluntaryExitTopic}, stream: mockStream, shouldReceive: wantedMessage, itemToSend: &feed.Event{ @@ -188,7 +188,7 @@ func TestStreamEvents_OperationsEvents(t *testing.T) { } func TestStreamEvents_StateEvents(t *testing.T) { - t.Run(headTopic, func(t *testing.T) { + t.Run(HeadTopic, func(t *testing.T) { ctx := context.Background() srv, ctrl, mockStream := setupServer(ctx, t) defer ctrl.Finish() @@ -204,14 +204,14 @@ func TestStreamEvents_StateEvents(t *testing.T) { genericResponse, err := anypb.New(wantedHead) require.NoError(t, err) wantedMessage := &gateway.EventSource{ - Event: headTopic, + Event: HeadTopic, Data: genericResponse, } assertFeedSendAndReceive(ctx, &assertFeedArgs{ t: t, srv: srv, - topics: []string{headTopic}, + topics: []string{HeadTopic}, stream: mockStream, shouldReceive: wantedMessage, itemToSend: &feed.Event{ @@ -221,7 +221,7 @@ func TestStreamEvents_StateEvents(t *testing.T) { feed: srv.StateNotifier.StateFeed(), }) }) - t.Run(finalizedCheckpointTopic, func(t *testing.T) { + t.Run(FinalizedCheckpointTopic, func(t *testing.T) { ctx := context.Background() srv, ctrl, mockStream := setupServer(ctx, t) defer ctrl.Finish() @@ -234,14 +234,14 @@ func TestStreamEvents_StateEvents(t *testing.T) { genericResponse, err := anypb.New(wantedCheckpoint) require.NoError(t, err) wantedMessage := &gateway.EventSource{ - Event: finalizedCheckpointTopic, + Event: FinalizedCheckpointTopic, Data: genericResponse, } assertFeedSendAndReceive(ctx, &assertFeedArgs{ t: t, srv: srv, - topics: []string{finalizedCheckpointTopic}, + topics: []string{FinalizedCheckpointTopic}, stream: mockStream, shouldReceive: wantedMessage, itemToSend: &feed.Event{ @@ -251,7 +251,7 @@ func TestStreamEvents_StateEvents(t *testing.T) { feed: srv.StateNotifier.StateFeed(), }) }) - t.Run(chainReorgTopic, func(t *testing.T) { + t.Run(ChainReorgTopic, func(t *testing.T) { ctx := context.Background() srv, ctrl, mockStream := setupServer(ctx, t) defer ctrl.Finish() @@ -268,14 +268,14 @@ func TestStreamEvents_StateEvents(t *testing.T) { genericResponse, err := anypb.New(wantedReorg) require.NoError(t, err) wantedMessage := &gateway.EventSource{ - Event: chainReorgTopic, + Event: ChainReorgTopic, Data: genericResponse, } assertFeedSendAndReceive(ctx, &assertFeedArgs{ t: t, srv: srv, - topics: []string{chainReorgTopic}, + topics: []string{ChainReorgTopic}, stream: mockStream, shouldReceive: wantedMessage, itemToSend: &feed.Event{ diff --git a/beacon-chain/server/main.go b/beacon-chain/server/main.go index f7b354d5b3a5..f45ed3296add 100644 --- a/beacon-chain/server/main.go +++ b/beacon-chain/server/main.go @@ -46,13 +46,13 @@ func main() { ethpb.RegisterNodeHandler, ethpb.RegisterBeaconChainHandler, ethpb.RegisterBeaconNodeValidatorHandler, - ethpbv1.RegisterEventsHandler, pbrpc.RegisterHealthHandler, } v1Registrations := []gateway.PbHandlerRegistration{ ethpbv1.RegisterBeaconNodeHandler, ethpbv1.RegisterBeaconChainHandler, ethpbv1.RegisterBeaconValidatorHandler, + ethpbv1.RegisterEventsHandler, } if *enableDebugRPCEndpoints { v1Alpha1Registrations = append(v1Alpha1Registrations, pbrpc.RegisterDebugHandler) diff --git a/deps.bzl b/deps.bzl index 927cd7292468..d9c52bdd8673 100644 --- a/deps.bzl +++ b/deps.bzl @@ -1800,6 +1800,7 @@ def prysm_deps(): sum = "h1:CCtW0xUnWGVINKvE/WWOYKdsPV6mawAtvQuSl8guwQs=", version = "v1.2.3", ) + go_repository( name = "com_github_klauspost_cpuid_v2", importpath = "github.com/klauspost/cpuid/v2", @@ -2805,6 +2806,13 @@ def prysm_deps(): sum = "h1:JCHLVE3B+kJde7bIEo5N4J+ZbLhp0J1Fs+ulyRws4gE=", version = "v0.0.0-20160726150825-5bd2802263f2", ) + go_repository( + name = "com_github_r3labs_sse", + importpath = "github.com/r3labs/sse", + sum = "h1:zAsgcP8MhzAbhMnB1QQ2O7ZhWYVGYSR2iVcjzQuPV+o=", + version = "v0.0.0-20210224172625-26fe804710bc", + ) + go_repository( name = "com_github_rcrowley_go_metrics", importpath = "github.com/rcrowley/go-metrics", @@ -3411,6 +3419,13 @@ def prysm_deps(): sum = "h1:stTHdEoWg1pQ8riaP5ROrjS6zy6wewH/Q2iwnLCQUXY=", version = "v1.0.0-20160220154919-db14e161995a", ) + go_repository( + name = "in_gopkg_cenkalti_backoff_v1", + importpath = "gopkg.in/cenkalti/backoff.v1", + sum = "h1:Arh75ttbsvlpVA7WtVpH4u9h6Zl46xuptxqLxPiSo4Y=", + version = "v1.1.0", + ) + go_repository( name = "in_gopkg_check_v1", importpath = "gopkg.in/check.v1", diff --git a/go.mod b/go.mod index 4a91b4299a00..575e038b8b0d 100644 --- a/go.mod +++ b/go.mod @@ -90,6 +90,7 @@ require ( github.com/prysmaticlabs/go-bitfield v0.0.0-20210607200045-4da71aaf6c2d github.com/prysmaticlabs/prombbolt v0.0.0-20210126082820-9b7adba6db7c github.com/prysmaticlabs/protoc-gen-go-cast v0.0.0-20210504233148-1e141af6a0a1 + github.com/r3labs/sse v0.0.0-20210224172625-26fe804710bc github.com/rs/cors v1.7.0 github.com/schollz/progressbar/v3 v3.3.4 github.com/sirupsen/logrus v1.6.0 @@ -133,6 +134,6 @@ replace github.com/ethereum/go-ethereum => github.com/prysmaticlabs/bazel-go-eth replace github.com/json-iterator/go => github.com/prestonvanloon/go v1.1.7-0.20190722034630-4f2e55fcf87b // See https://github.com/prysmaticlabs/grpc-gateway/issues/2 -replace github.com/grpc-ecosystem/grpc-gateway/v2 => github.com/prysmaticlabs/grpc-gateway/v2 v2.3.1-0.20210604200058-f148bcf3f503 +replace github.com/grpc-ecosystem/grpc-gateway/v2 => github.com/prysmaticlabs/grpc-gateway/v2 v2.3.1-0.20210622145107-ca3041e1b380 replace github.com/ferranbt/fastssz => github.com/rauljordan/fastssz v0.0.0-20210622230010-a131010e198f diff --git a/go.sum b/go.sum index 2c7c76cd1dd2..0bfcc14d797f 100644 --- a/go.sum +++ b/go.sum @@ -1071,12 +1071,14 @@ github.com/prysmaticlabs/eth2-types v0.0.0-20210303084904-c9735a06829d/go.mod h1 github.com/prysmaticlabs/go-bitfield v0.0.0-20210108222456-8e92c3709aa0/go.mod h1:hCwmef+4qXWjv0jLDbQdWnL0Ol7cS7/lCSS26WR+u6s= github.com/prysmaticlabs/go-bitfield v0.0.0-20210607200045-4da71aaf6c2d h1:46gKr69IlRpv/ENdlzG0SWo5nMLKJxS3tI5NOSdZndQ= github.com/prysmaticlabs/go-bitfield v0.0.0-20210607200045-4da71aaf6c2d/go.mod h1:hCwmef+4qXWjv0jLDbQdWnL0Ol7cS7/lCSS26WR+u6s= -github.com/prysmaticlabs/grpc-gateway/v2 v2.3.1-0.20210604200058-f148bcf3f503 h1:QzTDCXA7FV2tIJ7TGHfEsYfa8QaAeMB1F4B5jAsGQNg= -github.com/prysmaticlabs/grpc-gateway/v2 v2.3.1-0.20210604200058-f148bcf3f503/go.mod h1:IOyTYjcIO0rkmnGBfJTL0NJ11exy/Tc2QEuv7hCXp24= +github.com/prysmaticlabs/grpc-gateway/v2 v2.3.1-0.20210622145107-ca3041e1b380 h1:KzQOksIZB8poBiMk8h5Txzbp/OoBLFhS3H20ZN06hWg= +github.com/prysmaticlabs/grpc-gateway/v2 v2.3.1-0.20210622145107-ca3041e1b380/go.mod h1:IOyTYjcIO0rkmnGBfJTL0NJ11exy/Tc2QEuv7hCXp24= github.com/prysmaticlabs/prombbolt v0.0.0-20210126082820-9b7adba6db7c h1:9PHRCuO/VN0s9k+RmLykho7AjDxblNYI5bYKed16NPU= github.com/prysmaticlabs/prombbolt v0.0.0-20210126082820-9b7adba6db7c/go.mod h1:ZRws458tYHS/Zs936OQ6oCrL+Ict5O4Xpwve1UQ6C9M= github.com/prysmaticlabs/protoc-gen-go-cast v0.0.0-20210504233148-1e141af6a0a1 h1:k7CCMwN7VooQ7GhfySnaVyI4/9+QbhJTdasoC6VOZOI= github.com/prysmaticlabs/protoc-gen-go-cast v0.0.0-20210504233148-1e141af6a0a1/go.mod h1:au9l1XcWNEKixIlSRzEe54fYGhyELWgJJIxKu8W75Mc= +github.com/r3labs/sse v0.0.0-20210224172625-26fe804710bc h1:zAsgcP8MhzAbhMnB1QQ2O7ZhWYVGYSR2iVcjzQuPV+o= +github.com/r3labs/sse v0.0.0-20210224172625-26fe804710bc/go.mod h1:S8xSOnV3CgpNrWd0GQ/OoQfMtlg2uPRSuTzcSGrzwK8= github.com/rauljordan/fastssz v0.0.0-20210622230010-a131010e198f h1:CoD/RaM9s8qfHA7jAqntW3jv+z9zPBXaxCaCByrKOmg= github.com/rauljordan/fastssz v0.0.0-20210622230010-a131010e198f/go.mod h1:DyEu2iuLBnb/T51BlsiO3yLYdJC6UbGMrIkqK1KmQxM= github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= @@ -1353,6 +1355,7 @@ golang.org/x/net v0.0.0-20190724013045-ca1201d0de80/go.mod h1:z5CRVTTTmAJ677TzLL golang.org/x/net v0.0.0-20190813141303-74dc4d7220e7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190923162816-aa69164e4478/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20191004110552-13f9640d40b9/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20191116160921-f9c825593386/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20191209160850-c0dbc17a3553/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= @@ -1682,6 +1685,8 @@ google.golang.org/protobuf v1.26.0 h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/l google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/bsm/ratelimit.v1 v1.0.0-20160220154919-db14e161995a/go.mod h1:KF9sEfUPAXdG8Oev9e99iLGnl2uJMjc5B+4y3O7x610= +gopkg.in/cenkalti/backoff.v1 v1.1.0 h1:Arh75ttbsvlpVA7WtVpH4u9h6Zl46xuptxqLxPiSo4Y= +gopkg.in/cenkalti/backoff.v1 v1.1.0/go.mod h1:J6Vskwqd+OMVJl8C33mmtxTBs2gyzfv7UDAkHu8BrjI= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo=