diff --git a/beacon-chain/rpc/beacon/BUILD.bazel b/beacon-chain/rpc/beacon/BUILD.bazel index ff5e3190c648..fac5ff90e6f3 100644 --- a/beacon-chain/rpc/beacon/BUILD.bazel +++ b/beacon-chain/rpc/beacon/BUILD.bazel @@ -29,6 +29,7 @@ go_library( "//shared/hashutil:go_default_library", "//shared/pagination:go_default_library", "//shared/params:go_default_library", + "//shared/slotutil:go_default_library", "@com_github_gogo_protobuf//types:go_default_library", "@com_github_pkg_errors//:go_default_library", "@com_github_prysmaticlabs_ethereumapis//eth/v1alpha1:go_default_library", @@ -56,6 +57,7 @@ go_test( "//beacon-chain/core/helpers:go_default_library", "//beacon-chain/db:go_default_library", "//beacon-chain/db/testing:go_default_library", + "//beacon-chain/operations/attestations:go_default_library", "//beacon-chain/rpc/testing:go_default_library", "//proto/beacon/p2p/v1:go_default_library", "//shared/params:go_default_library", diff --git a/beacon-chain/rpc/beacon/attestations.go b/beacon-chain/rpc/beacon/attestations.go index 685850162ebf..12758c3ba0b7 100644 --- a/beacon-chain/rpc/beacon/attestations.go +++ b/beacon-chain/rpc/beacon/attestations.go @@ -11,6 +11,8 @@ import ( "github.com/prysmaticlabs/prysm/beacon-chain/db/filters" "github.com/prysmaticlabs/prysm/shared/pagination" "github.com/prysmaticlabs/prysm/shared/params" + "github.com/prysmaticlabs/prysm/shared/slotutil" + "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) @@ -113,12 +115,29 @@ func (bs *Server) ListAttestations( }, nil } -// StreamAttestations to clients every single time a new attestation is received. -// TODO(#4184): Implement. +// StreamAttestations to clients at the end of every slot. This method retrieves the +// aggregated attestations currently in the pool at the start of a slot and sends +// them over a gRPC stream. func (bs *Server) StreamAttestations( - _ *ptypes.Empty, _ ethpb.BeaconChain_StreamAttestationsServer, + _ *ptypes.Empty, stream ethpb.BeaconChain_StreamAttestationsServer, ) error { - return status.Error(codes.Unimplemented, "Not yet implemented") + genesisTime := bs.GenesisTimeFetcher.GenesisTime() + st := slotutil.GetSlotTicker(genesisTime, params.BeaconConfig().SecondsPerSlot) + for { + select { + case <-st.C(): + atts := bs.Pool.AggregatedAttestations() + for i := 0; i < len(atts); i++ { + if err := stream.Send(atts[i]); err != nil { + return status.Errorf(codes.Unavailable, "Could not send over stream: %v", err) + } + } + case <-bs.Ctx.Done(): + return status.Error(codes.Canceled, "Context canceled") + case <-stream.Context().Done(): + return status.Error(codes.Canceled, "Context canceled") + } + } } // AttestationPool retrieves pending attestations. diff --git a/beacon-chain/rpc/beacon/attestations_test.go b/beacon-chain/rpc/beacon/attestations_test.go index a40c7992e848..71ea428dca88 100644 --- a/beacon-chain/rpc/beacon/attestations_test.go +++ b/beacon-chain/rpc/beacon/attestations_test.go @@ -7,13 +7,18 @@ import ( "strconv" "strings" "testing" + "time" "github.com/gogo/protobuf/proto" + ptypes "github.com/gogo/protobuf/types" + "github.com/golang/mock/gomock" ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" "github.com/prysmaticlabs/go-bitfield" "github.com/prysmaticlabs/go-ssz" mock "github.com/prysmaticlabs/prysm/beacon-chain/blockchain/testing" dbTest "github.com/prysmaticlabs/prysm/beacon-chain/db/testing" + "github.com/prysmaticlabs/prysm/beacon-chain/operations/attestations" + mockRPC "github.com/prysmaticlabs/prysm/beacon-chain/rpc/testing" pbp2p "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1" "github.com/prysmaticlabs/prysm/shared/params" ) @@ -513,3 +518,78 @@ func TestServer_ListAttestations_Pagination_DefaultPageSize(t *testing.T) { t.Error("Incorrect attestations response") } } + +func TestServer_StreamAttestations_ContextCanceled(t *testing.T) { + db := dbTest.SetupDB(t) + defer dbTest.TeardownDB(t, db) + ctx := context.Background() + + ctx, cancel := context.WithCancel(ctx) + chainService := &mock.ChainService{ + Genesis: time.Now(), + } + server := &Server{ + Ctx: ctx, + GenesisTimeFetcher: chainService, + } + + exitRoutine := make(chan bool) + ctrl := gomock.NewController(t) + defer ctrl.Finish() + mockStream := mockRPC.NewMockBeaconChain_StreamAttestationsServer(ctrl) + mockStream.EXPECT().Context().Return(ctx) + go func(tt *testing.T) { + if err := server.StreamAttestations( + &ptypes.Empty{}, + mockStream, + ); !strings.Contains(err.Error(), "Context canceled") { + tt.Errorf("Expected context canceled error got: %v", err) + } + <-exitRoutine + }(t) + cancel() + exitRoutine <- true +} + +func TestServer_StreamAttestations_OnSlotTick(t *testing.T) { + db := dbTest.SetupDB(t) + defer dbTest.TeardownDB(t, db) + exitRoutine := make(chan bool) + ctrl := gomock.NewController(t) + defer ctrl.Finish() + ctx := context.Background() + secondsPerSlot := time.Second * time.Duration(params.BeaconConfig().SecondsPerSlot) + chainService := &mock.ChainService{ + Genesis: time.Now().Add(-secondsPerSlot), + } + server := &Server{ + Ctx: ctx, + GenesisTimeFetcher: chainService, + Pool: attestations.NewPool(), + } + + atts := []*ethpb.Attestation{ + {Data: ðpb.AttestationData{Slot: 1}, AggregationBits: bitfield.Bitlist{0b1101}}, + {Data: ðpb.AttestationData{Slot: 2}, AggregationBits: bitfield.Bitlist{0b1101}}, + {Data: ðpb.AttestationData{Slot: 3}, AggregationBits: bitfield.Bitlist{0b1101}}, + } + if err := server.Pool.SaveAggregatedAttestations(atts); err != nil { + t.Fatal(err) + } + + mockStream := mockRPC.NewMockBeaconChain_StreamAttestationsServer(ctrl) + mockStream.EXPECT().Send(atts[0]) + mockStream.EXPECT().Send(atts[1]) + mockStream.EXPECT().Send(atts[2]).Do(func(arg0 interface{}) { + exitRoutine <- true + }) + mockStream.EXPECT().Context().Return(ctx).AnyTimes() + + go func(tt *testing.T) { + if err := server.StreamAttestations(&ptypes.Empty{}, mockStream); err != nil { + tt.Errorf("Could not call RPC method: %v", err) + } + }(t) + + <-exitRoutine +} diff --git a/beacon-chain/rpc/beacon/server.go b/beacon-chain/rpc/beacon/server.go index 419f0ebd1b4b..2952b6a963f7 100644 --- a/beacon-chain/rpc/beacon/server.go +++ b/beacon-chain/rpc/beacon/server.go @@ -27,4 +27,5 @@ type Server struct { IncomingAttestation chan *ethpb.Attestation CanonicalStateChan chan *pbp2p.BeaconState ChainStartChan chan time.Time + GenesisTimeFetcher blockchain.GenesisTimeFetcher } diff --git a/beacon-chain/rpc/service.go b/beacon-chain/rpc/service.go index 74d78dc4175d..29edb5c6e61b 100644 --- a/beacon-chain/rpc/service.go +++ b/beacon-chain/rpc/service.go @@ -213,6 +213,7 @@ func (s *Service) Start() { ChainStartFetcher: s.chainStartFetcher, CanonicalStateChan: s.canonicalStateChan, StateNotifier: s.stateNotifier, + GenesisTimeFetcher: s.genesisTimeFetcher, } aggregatorServer := &aggregator.Server{ BeaconDB: s.beaconDB, diff --git a/beacon-chain/rpc/testing/beacon_chain_service_mock.go b/beacon-chain/rpc/testing/beacon_chain_service_mock.go index b12f6ad139b2..71873b89e475 100644 --- a/beacon-chain/rpc/testing/beacon_chain_service_mock.go +++ b/beacon-chain/rpc/testing/beacon_chain_service_mock.go @@ -1,16 +1,15 @@ // Code generated by MockGen. DO NOT EDIT. -// Source: github.com/prysmaticlabs/ethereumapis/eth/v1alpha1 (interfaces: BeaconChain_StreamChainHeadServer) +// Source: github.com/prysmaticlabs/ethereumapis/eth/v1alpha1 (interfaces: BeaconChain_StreamChainHeadServer,BeaconChain_StreamAttestationsServer) // Package testing is a generated GoMock package. package testing import ( context "context" - reflect "reflect" - gomock "github.com/golang/mock/gomock" v1alpha1 "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" metadata "google.golang.org/grpc/metadata" + reflect "reflect" ) // MockBeaconChain_StreamChainHeadServer is a mock of BeaconChain_StreamChainHeadServer interface @@ -131,3 +130,122 @@ func (mr *MockBeaconChain_StreamChainHeadServerMockRecorder) SetTrailer(arg0 int mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetTrailer", reflect.TypeOf((*MockBeaconChain_StreamChainHeadServer)(nil).SetTrailer), arg0) } + +// MockBeaconChain_StreamAttestationsServer is a mock of BeaconChain_StreamAttestationsServer interface +type MockBeaconChain_StreamAttestationsServer struct { + ctrl *gomock.Controller + recorder *MockBeaconChain_StreamAttestationsServerMockRecorder +} + +// MockBeaconChain_StreamAttestationsServerMockRecorder is the mock recorder for MockBeaconChain_StreamAttestationsServer +type MockBeaconChain_StreamAttestationsServerMockRecorder struct { + mock *MockBeaconChain_StreamAttestationsServer +} + +// NewMockBeaconChain_StreamAttestationsServer creates a new mock instance +func NewMockBeaconChain_StreamAttestationsServer(ctrl *gomock.Controller) *MockBeaconChain_StreamAttestationsServer { + mock := &MockBeaconChain_StreamAttestationsServer{ctrl: ctrl} + mock.recorder = &MockBeaconChain_StreamAttestationsServerMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use +func (m *MockBeaconChain_StreamAttestationsServer) EXPECT() *MockBeaconChain_StreamAttestationsServerMockRecorder { + return m.recorder +} + +// Context mocks base method +func (m *MockBeaconChain_StreamAttestationsServer) Context() context.Context { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Context") + ret0, _ := ret[0].(context.Context) + return ret0 +} + +// Context indicates an expected call of Context +func (mr *MockBeaconChain_StreamAttestationsServerMockRecorder) Context() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Context", reflect.TypeOf((*MockBeaconChain_StreamAttestationsServer)(nil).Context)) +} + +// RecvMsg mocks base method +func (m *MockBeaconChain_StreamAttestationsServer) RecvMsg(arg0 interface{}) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "RecvMsg", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// RecvMsg indicates an expected call of RecvMsg +func (mr *MockBeaconChain_StreamAttestationsServerMockRecorder) RecvMsg(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RecvMsg", reflect.TypeOf((*MockBeaconChain_StreamAttestationsServer)(nil).RecvMsg), arg0) +} + +// Send mocks base method +func (m *MockBeaconChain_StreamAttestationsServer) Send(arg0 *v1alpha1.Attestation) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Send", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// Send indicates an expected call of Send +func (mr *MockBeaconChain_StreamAttestationsServerMockRecorder) Send(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Send", reflect.TypeOf((*MockBeaconChain_StreamAttestationsServer)(nil).Send), arg0) +} + +// SendHeader mocks base method +func (m *MockBeaconChain_StreamAttestationsServer) SendHeader(arg0 metadata.MD) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SendHeader", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// SendHeader indicates an expected call of SendHeader +func (mr *MockBeaconChain_StreamAttestationsServerMockRecorder) SendHeader(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendHeader", reflect.TypeOf((*MockBeaconChain_StreamAttestationsServer)(nil).SendHeader), arg0) +} + +// SendMsg mocks base method +func (m *MockBeaconChain_StreamAttestationsServer) SendMsg(arg0 interface{}) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SendMsg", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// SendMsg indicates an expected call of SendMsg +func (mr *MockBeaconChain_StreamAttestationsServerMockRecorder) SendMsg(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendMsg", reflect.TypeOf((*MockBeaconChain_StreamAttestationsServer)(nil).SendMsg), arg0) +} + +// SetHeader mocks base method +func (m *MockBeaconChain_StreamAttestationsServer) SetHeader(arg0 metadata.MD) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SetHeader", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// SetHeader indicates an expected call of SetHeader +func (mr *MockBeaconChain_StreamAttestationsServerMockRecorder) SetHeader(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetHeader", reflect.TypeOf((*MockBeaconChain_StreamAttestationsServer)(nil).SetHeader), arg0) +} + +// SetTrailer mocks base method +func (m *MockBeaconChain_StreamAttestationsServer) SetTrailer(arg0 metadata.MD) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "SetTrailer", arg0) +} + +// SetTrailer indicates an expected call of SetTrailer +func (mr *MockBeaconChain_StreamAttestationsServerMockRecorder) SetTrailer(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetTrailer", reflect.TypeOf((*MockBeaconChain_StreamAttestationsServer)(nil).SetTrailer), arg0) +}