diff --git a/beacon-chain/blockchain/service.go b/beacon-chain/blockchain/service.go index 32e312b0093d..46bc8a868b60 100644 --- a/beacon-chain/blockchain/service.go +++ b/beacon-chain/blockchain/service.go @@ -52,7 +52,7 @@ type Service struct { depositCache *depositcache.DepositCache chainStartFetcher powchain.ChainStartFetcher attPool attestations.Pool - slashingPool *slashings.Pool + slashingPool slashings.PoolManager exitPool *voluntaryexits.Pool genesisTime time.Time p2p p2p.Broadcaster @@ -89,7 +89,7 @@ type Config struct { DepositCache *depositcache.DepositCache AttPool attestations.Pool ExitPool *voluntaryexits.Pool - SlashingPool *slashings.Pool + SlashingPool slashings.PoolManager P2p p2p.Broadcaster MaxRoutines int StateNotifier statefeed.Notifier diff --git a/beacon-chain/db/kv/state_test.go b/beacon-chain/db/kv/state_test.go index e4a6d4f7b215..f7c1872013da 100644 --- a/beacon-chain/db/kv/state_test.go +++ b/beacon-chain/db/kv/state_test.go @@ -5,7 +5,6 @@ import ( "reflect" "testing" - "github.com/gogo/protobuf/proto" types "github.com/prysmaticlabs/eth2-types" ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" "github.com/prysmaticlabs/prysm/beacon-chain/state" @@ -57,7 +56,7 @@ func TestGenesisState_CanSaveRetrieve(t *testing.T) { savedGenesisS, err := db.GenesisState(context.Background()) require.NoError(t, err) - assert.DeepEqual(t, st.InnerStateUnsafe(), savedGenesisS.InnerStateUnsafe(), "Did not retrieve saved state") + assert.DeepSSZEqual(t, st.InnerStateUnsafe(), savedGenesisS.InnerStateUnsafe(), "Did not retrieve saved state") require.NoError(t, db.SaveGenesisBlockRoot(context.Background(), [32]byte{'C'})) } @@ -197,15 +196,15 @@ func TestStore_SaveDeleteState_CanGetHighestBelow(t *testing.T) { highest, err := db.HighestSlotStatesBelow(context.Background(), 2) require.NoError(t, err) - assert.Equal(t, true, proto.Equal(highest[0].InnerStateUnsafe(), s0), "Did not retrieve saved state: %v != %v", highest, s0) + assert.DeepSSZEqual(t, highest[0].InnerStateUnsafe(), s0) highest, err = db.HighestSlotStatesBelow(context.Background(), 101) require.NoError(t, err) - assert.Equal(t, true, proto.Equal(highest[0].InnerStateUnsafe(), s1), "Did not retrieve saved state: %v != %v", highest, s1) + assert.DeepSSZEqual(t, highest[0].InnerStateUnsafe(), s1) highest, err = db.HighestSlotStatesBelow(context.Background(), 1001) require.NoError(t, err) - assert.Equal(t, true, proto.Equal(highest[0].InnerStateUnsafe(), s2), "Did not retrieve saved state: %v != %v", highest, s2) + assert.DeepSSZEqual(t, highest[0].InnerStateUnsafe(), s2) } func TestStore_GenesisState_CanGetHighestBelow(t *testing.T) { @@ -230,14 +229,14 @@ func TestStore_GenesisState_CanGetHighestBelow(t *testing.T) { highest, err := db.HighestSlotStatesBelow(context.Background(), 2) require.NoError(t, err) - assert.Equal(t, true, proto.Equal(highest[0].InnerStateUnsafe(), st.InnerStateUnsafe())) + assert.DeepSSZEqual(t, highest[0].InnerStateUnsafe(), st.InnerStateUnsafe()) highest, err = db.HighestSlotStatesBelow(context.Background(), 1) require.NoError(t, err) - assert.Equal(t, true, proto.Equal(highest[0].InnerStateUnsafe(), genesisState.InnerStateUnsafe())) + assert.DeepSSZEqual(t, highest[0].InnerStateUnsafe(), genesisState.InnerStateUnsafe()) highest, err = db.HighestSlotStatesBelow(context.Background(), 0) require.NoError(t, err) - assert.Equal(t, true, proto.Equal(highest[0].InnerStateUnsafe(), genesisState.InnerStateUnsafe())) + assert.DeepSSZEqual(t, highest[0].InnerStateUnsafe(), genesisState.InnerStateUnsafe()) } func TestStore_CleanUpDirtyStates_AboveThreshold(t *testing.T) { diff --git a/beacon-chain/db/restore_test.go b/beacon-chain/db/restore_test.go index b4a9a03b8afb..879df02d56b6 100644 --- a/beacon-chain/db/restore_test.go +++ b/beacon-chain/db/restore_test.go @@ -8,6 +8,7 @@ import ( "path" "testing" + types "github.com/prysmaticlabs/eth2-types" "github.com/prysmaticlabs/prysm/beacon-chain/db/kv" "github.com/prysmaticlabs/prysm/shared/cmd" "github.com/prysmaticlabs/prysm/shared/testutil" @@ -65,7 +66,7 @@ func TestRestore(t *testing.T) { require.NoError(t, err) headBlock, err := restoredDb.HeadBlock(ctx) require.NoError(t, err) - assert.Equal(t, uint64(5000), headBlock.Block.Slot, "Restored database has incorrect data") + assert.Equal(t, types.Slot(5000), headBlock.Block.Slot, "Restored database has incorrect data") assert.LogsContain(t, logHook, "Restore completed successfully") } diff --git a/beacon-chain/node/node.go b/beacon-chain/node/node.go index dab52bc36589..be1e984a7954 100644 --- a/beacon-chain/node/node.go +++ b/beacon-chain/node/node.go @@ -68,7 +68,7 @@ type BeaconNode struct { db db.Database attestationPool attestations.Pool exitPool *voluntaryexits.Pool - slashingsPool *slashings.Pool + slashingsPool slashings.PoolManager depositCache *depositcache.DepositCache stateFeed *event.Feed blockFeed *event.Feed diff --git a/beacon-chain/operations/attestations/kv/aggregated_test.go b/beacon-chain/operations/attestations/kv/aggregated_test.go index f8f480423b5d..efba144a8e18 100644 --- a/beacon-chain/operations/attestations/kv/aggregated_test.go +++ b/beacon-chain/operations/attestations/kv/aggregated_test.go @@ -512,6 +512,6 @@ func TestKV_Aggregated_DuplicateAggregatedAttestations(t *testing.T) { returned := cache.AggregatedAttestations() // It should have only returned att2. - assert.DeepEqual(t, att2, returned[0], "Did not receive correct aggregated atts") + assert.DeepSSZEqual(t, att2, returned[0], "Did not receive correct aggregated atts") assert.Equal(t, 1, len(returned), "Did not receive correct aggregated atts") } diff --git a/beacon-chain/operations/slashings/BUILD.bazel b/beacon-chain/operations/slashings/BUILD.bazel index 07ab86affac5..e3eab4cf8722 100644 --- a/beacon-chain/operations/slashings/BUILD.bazel +++ b/beacon-chain/operations/slashings/BUILD.bazel @@ -7,6 +7,7 @@ go_library( "doc.go", "log.go", "metrics.go", + "mock.go", "service.go", "types.go", ], diff --git a/beacon-chain/operations/slashings/mock.go b/beacon-chain/operations/slashings/mock.go new file mode 100644 index 000000000000..3929ac04eece --- /dev/null +++ b/beacon-chain/operations/slashings/mock.go @@ -0,0 +1,43 @@ +package slashings + +import ( + "context" + + ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" + "github.com/prysmaticlabs/prysm/beacon-chain/state" +) + +// PoolMock is a fake implementation of PoolManager. +type PoolMock struct { + PendingAttSlashings []*ethpb.AttesterSlashing +} + +// PendingAttesterSlashings -- +func (m *PoolMock) PendingAttesterSlashings(ctx context.Context, state *state.BeaconState, noLimit bool) []*ethpb.AttesterSlashing { + return m.PendingAttSlashings +} + +// PendingProposerSlashings -- +func (m *PoolMock) PendingProposerSlashings(ctx context.Context, state *state.BeaconState, noLimit bool) []*ethpb.ProposerSlashing { + panic("implement me") +} + +// InsertAttesterSlashing -- +func (m *PoolMock) InsertAttesterSlashing(ctx context.Context, state *state.BeaconState, slashing *ethpb.AttesterSlashing) error { + panic("implement me") +} + +// InsertProposerSlashing -- +func (m *PoolMock) InsertProposerSlashing(ctx context.Context, state *state.BeaconState, slashing *ethpb.ProposerSlashing) error { + panic("implement me") +} + +// MarkIncludedAttesterSlashing -- +func (m *PoolMock) MarkIncludedAttesterSlashing(as *ethpb.AttesterSlashing) { + panic("implement me") +} + +// MarkIncludedProposerSlashing -- +func (m *PoolMock) MarkIncludedProposerSlashing(ps *ethpb.ProposerSlashing) { + panic("implement me") +} diff --git a/beacon-chain/operations/slashings/types.go b/beacon-chain/operations/slashings/types.go index 893de20fbf31..f367424a18cb 100644 --- a/beacon-chain/operations/slashings/types.go +++ b/beacon-chain/operations/slashings/types.go @@ -1,14 +1,34 @@ package slashings import ( + "context" "sync" types "github.com/prysmaticlabs/eth2-types" ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" + "github.com/prysmaticlabs/prysm/beacon-chain/state" ) -// Pool implements a struct to maintain pending and recently included attester and -// proposer slashings. This pool is used by proposers to insert into new blocks. +// PoolManager maintains a pool of pending and recently included attester and proposer slashings. +// This pool is used by proposers to insert data into new blocks. +type PoolManager interface { + PendingAttesterSlashings(ctx context.Context, state *state.BeaconState, noLimit bool) []*ethpb.AttesterSlashing + PendingProposerSlashings(ctx context.Context, state *state.BeaconState, noLimit bool) []*ethpb.ProposerSlashing + InsertAttesterSlashing( + ctx context.Context, + state *state.BeaconState, + slashing *ethpb.AttesterSlashing, + ) error + InsertProposerSlashing( + ctx context.Context, + state *state.BeaconState, + slashing *ethpb.ProposerSlashing, + ) error + MarkIncludedAttesterSlashing(as *ethpb.AttesterSlashing) + MarkIncludedProposerSlashing(ps *ethpb.ProposerSlashing) +} + +// Pool is a concrete implementation of PoolManager. type Pool struct { lock sync.RWMutex pendingProposerSlashing []*ethpb.ProposerSlashing diff --git a/beacon-chain/rpc/beacon/blocks_test.go b/beacon-chain/rpc/beacon/blocks_test.go index c9bad7197134..71d7c4494873 100644 --- a/beacon-chain/rpc/beacon/blocks_test.go +++ b/beacon-chain/rpc/beacon/blocks_test.go @@ -152,6 +152,9 @@ func TestServer_ListBlocks_Genesis_MultiBlocks(t *testing.T) { } func TestServer_ListBlocks_Pagination(t *testing.T) { + params.UseMinimalConfig() + defer params.UseMainnetConfig() + db := dbTest.SetupDB(t) chain := &chainMock.ChainService{ CanonicalRoots: map[[32]byte]bool{}, @@ -357,8 +360,10 @@ func TestServer_GetChainHead_NoHeadBlock(t *testing.T) { } func TestServer_GetChainHead(t *testing.T) { - db := dbTest.SetupDB(t) + params.UseMinimalConfig() + defer params.UseMainnetConfig() + db := dbTest.SetupDB(t) genBlock := testutil.NewBeaconBlock() genBlock.Block.ParentRoot = bytesutil.PadTo([]byte{'G'}, 32) require.NoError(t, db.SaveBlock(context.Background(), genBlock)) diff --git a/beacon-chain/rpc/beacon/server.go b/beacon-chain/rpc/beacon/server.go index 8e01d9fd2394..9ca92baad56f 100644 --- a/beacon-chain/rpc/beacon/server.go +++ b/beacon-chain/rpc/beacon/server.go @@ -41,7 +41,7 @@ type Server struct { AttestationNotifier operation.Notifier Broadcaster p2p.Broadcaster AttestationsPool attestations.Pool - SlashingsPool *slashings.Pool + SlashingsPool slashings.PoolManager CanonicalStateChan chan *pbp2p.BeaconState ChainStartChan chan time.Time ReceivedAttestationsBuffer chan *ethpb.Attestation diff --git a/beacon-chain/rpc/beacon/validators_test.go b/beacon-chain/rpc/beacon/validators_test.go index 92b1cf3c236e..751629a54599 100644 --- a/beacon-chain/rpc/beacon/validators_test.go +++ b/beacon-chain/rpc/beacon/validators_test.go @@ -1556,8 +1556,10 @@ func TestServer_GetValidatorParticipation_CurrentAndPrevEpoch(t *testing.T) { } func TestServer_GetValidatorParticipation_OrphanedUntilGenesis(t *testing.T) { - beaconDB := dbTest.SetupDB(t) + helpers.ClearCache() + params.UseMainnetConfig() + beaconDB := dbTest.SetupDB(t) ctx := context.Background() validatorCount := uint64(100) @@ -1644,6 +1646,10 @@ func TestGetValidatorPerformance_Syncing(t *testing.T) { } func TestGetValidatorPerformance_OK(t *testing.T) { + helpers.ClearCache() + params.UseMinimalConfig() + defer params.UseMainnetConfig() + ctx := context.Background() epoch := types.Epoch(1) headState, err := testutil.NewBeaconState() @@ -1916,9 +1922,9 @@ func TestServer_GetIndividualVotes_RequestFutureSlot(t *testing.T) { } func TestServer_GetIndividualVotes_ValidatorsDontExist(t *testing.T) { - params.UseMinimalConfig() defer params.UseMainnetConfig() + beaconDB := dbTest.SetupDB(t) ctx := context.Background() diff --git a/beacon-chain/rpc/beaconv1/BUILD.bazel b/beacon-chain/rpc/beaconv1/BUILD.bazel index 8043b6b50665..e5c59dbb1ae3 100644 --- a/beacon-chain/rpc/beaconv1/BUILD.bazel +++ b/beacon-chain/rpc/beaconv1/BUILD.bazel @@ -53,6 +53,7 @@ go_test( srcs = [ "blocks_test.go", "config_test.go", + "pool_test.go", "server_test.go", "state_test.go", ], @@ -61,6 +62,7 @@ go_test( "//beacon-chain/blockchain/testing:go_default_library", "//beacon-chain/db:go_default_library", "//beacon-chain/db/testing:go_default_library", + "//beacon-chain/operations/slashings:go_default_library", "//beacon-chain/p2p/testing:go_default_library", "//beacon-chain/powchain/testing:go_default_library", "//beacon-chain/state/stategen:go_default_library", diff --git a/beacon-chain/rpc/beaconv1/pool.go b/beacon-chain/rpc/beaconv1/pool.go index 1efbfed08dfa..94a5db6731ff 100644 --- a/beacon-chain/rpc/beaconv1/pool.go +++ b/beacon-chain/rpc/beaconv1/pool.go @@ -5,8 +5,11 @@ import ( "errors" ptypes "github.com/gogo/protobuf/types" - ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1" + "github.com/prysmaticlabs/prysm/proto/migration" + "go.opencensus.io/trace" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) // ListPoolAttestations retrieves attestations known by the node but @@ -24,7 +27,23 @@ func (bs *Server) SubmitAttestation(ctx context.Context, req *ethpb.Attestation) // ListPoolAttesterSlashings retrieves attester slashings known by the node but // not necessarily incorporated into any block. func (bs *Server) ListPoolAttesterSlashings(ctx context.Context, req *ptypes.Empty) (*ethpb.AttesterSlashingsPoolResponse, error) { - return nil, errors.New("unimplemented") + ctx, span := trace.StartSpan(ctx, "beaconv1.ListPoolAttesterSlashings") + defer span.End() + + headState, err := bs.ChainInfoFetcher.HeadState(ctx) + if err != nil { + return nil, status.Errorf(codes.Internal, "Could not get head state: %v", err) + } + sourceSlashings := bs.SlashingsPool.PendingAttesterSlashings(ctx, headState, true) + + slashings := make([]*ethpb.AttesterSlashing, len(sourceSlashings)) + for i, s := range sourceSlashings { + slashings[i] = migration.V1Alpha1AttSlashingToV1(s) + } + + return ðpb.AttesterSlashingsPoolResponse{ + Data: slashings, + }, nil } // SubmitAttesterSlashing submits AttesterSlashing object to node's pool and diff --git a/beacon-chain/rpc/beaconv1/pool_test.go b/beacon-chain/rpc/beaconv1/pool_test.go new file mode 100644 index 000000000000..71a1da393777 --- /dev/null +++ b/beacon-chain/rpc/beaconv1/pool_test.go @@ -0,0 +1,104 @@ +package beaconv1 + +import ( + "context" + "testing" + + "github.com/gogo/protobuf/types" + eth "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" + chainMock "github.com/prysmaticlabs/prysm/beacon-chain/blockchain/testing" + "github.com/prysmaticlabs/prysm/beacon-chain/operations/slashings" + "github.com/prysmaticlabs/prysm/proto/migration" + "github.com/prysmaticlabs/prysm/shared/bytesutil" + "github.com/prysmaticlabs/prysm/shared/testutil" + "github.com/prysmaticlabs/prysm/shared/testutil/assert" + "github.com/prysmaticlabs/prysm/shared/testutil/require" +) + +func TestListPoolAttesterSlashings(t *testing.T) { + state, err := testutil.NewBeaconState() + require.NoError(t, err) + slashing1 := ð.AttesterSlashing{ + Attestation_1: ð.IndexedAttestation{ + AttestingIndices: []uint64{1, 10}, + Data: ð.AttestationData{ + Slot: 1, + CommitteeIndex: 1, + BeaconBlockRoot: bytesutil.PadTo([]byte("blockroot1"), 32), + Source: ð.Checkpoint{ + Epoch: 1, + Root: bytesutil.PadTo([]byte("sourceroot1"), 32), + }, + Target: ð.Checkpoint{ + Epoch: 10, + Root: bytesutil.PadTo([]byte("targetroot1"), 32), + }, + }, + Signature: bytesutil.PadTo([]byte("signature1"), 96), + }, + Attestation_2: ð.IndexedAttestation{ + AttestingIndices: []uint64{2, 20}, + Data: ð.AttestationData{ + Slot: 2, + CommitteeIndex: 2, + BeaconBlockRoot: bytesutil.PadTo([]byte("blockroot2"), 32), + Source: ð.Checkpoint{ + Epoch: 2, + Root: bytesutil.PadTo([]byte("sourceroot2"), 32), + }, + Target: ð.Checkpoint{ + Epoch: 20, + Root: bytesutil.PadTo([]byte("targetroot2"), 32), + }, + }, + Signature: bytesutil.PadTo([]byte("signature2"), 96), + }, + } + slashing2 := ð.AttesterSlashing{ + Attestation_1: ð.IndexedAttestation{ + AttestingIndices: []uint64{3, 30}, + Data: ð.AttestationData{ + Slot: 3, + CommitteeIndex: 3, + BeaconBlockRoot: bytesutil.PadTo([]byte("blockroot3"), 32), + Source: ð.Checkpoint{ + Epoch: 3, + Root: bytesutil.PadTo([]byte("sourceroot3"), 32), + }, + Target: ð.Checkpoint{ + Epoch: 30, + Root: bytesutil.PadTo([]byte("targetroot3"), 32), + }, + }, + Signature: bytesutil.PadTo([]byte("signature3"), 96), + }, + Attestation_2: ð.IndexedAttestation{ + AttestingIndices: []uint64{4, 40}, + Data: ð.AttestationData{ + Slot: 4, + CommitteeIndex: 4, + BeaconBlockRoot: bytesutil.PadTo([]byte("blockroot4"), 32), + Source: ð.Checkpoint{ + Epoch: 4, + Root: bytesutil.PadTo([]byte("sourceroot4"), 32), + }, + Target: ð.Checkpoint{ + Epoch: 40, + Root: bytesutil.PadTo([]byte("targetroot4"), 32), + }, + }, + Signature: bytesutil.PadTo([]byte("signature4"), 96), + }, + } + + s := &Server{ + ChainInfoFetcher: &chainMock.ChainService{State: state}, + SlashingsPool: &slashings.PoolMock{PendingAttSlashings: []*eth.AttesterSlashing{slashing1, slashing2}}, + } + + resp, err := s.ListPoolAttesterSlashings(context.Background(), &types.Empty{}) + require.NoError(t, err) + require.Equal(t, 2, len(resp.Data)) + assert.DeepEqual(t, migration.V1Alpha1AttSlashingToV1(slashing1), resp.Data[0]) + assert.DeepEqual(t, migration.V1Alpha1AttSlashingToV1(slashing2), resp.Data[1]) +} diff --git a/beacon-chain/rpc/beaconv1/server.go b/beacon-chain/rpc/beaconv1/server.go index dc9563f75b8b..ff220591c189 100644 --- a/beacon-chain/rpc/beaconv1/server.go +++ b/beacon-chain/rpc/beaconv1/server.go @@ -39,7 +39,7 @@ type Server struct { AttestationNotifier operation.Notifier Broadcaster p2p.Broadcaster AttestationsPool attestations.Pool - SlashingsPool *slashings.Pool + SlashingsPool slashings.PoolManager CanonicalStateChan chan *pbp2p.BeaconState ChainStartChan chan time.Time StateGenService stategen.StateManager diff --git a/beacon-chain/rpc/node/server.go b/beacon-chain/rpc/node/server.go index 2c8c13227abf..86544978af37 100644 --- a/beacon-chain/rpc/node/server.go +++ b/beacon-chain/rpc/node/server.go @@ -224,9 +224,11 @@ func (ns *Server) ListPeers(ctx context.Context, _ *ptypes.Empty) (*ethpb.Peers, // StreamBeaconLogs from the beacon node via a gRPC server-side stream. func (ns *Server) StreamBeaconLogs(_ *ptypes.Empty, stream pb.Health_StreamBeaconLogsServer) error { ch := make(chan []byte, ns.StreamLogsBufferSize) - defer close(ch) sub := ns.LogsStreamer.LogsFeed().Subscribe(ch) - defer sub.Unsubscribe() + defer func() { + sub.Unsubscribe() + close(ch) + }() recentLogs := ns.LogsStreamer.GetLastFewLogs() logStrings := make([]string, len(recentLogs)) @@ -247,6 +249,8 @@ func (ns *Server) StreamBeaconLogs(_ *ptypes.Empty, stream pb.Health_StreamBeaco if err := stream.Send(resp); err != nil { return status.Errorf(codes.Unavailable, "Could not send over stream: %v", err) } + case err := <-sub.Err(): + return status.Errorf(codes.Canceled, "Subscriber error, closing: %v", err) case <-stream.Context().Done(): return status.Error(codes.Canceled, "Context canceled") } diff --git a/beacon-chain/rpc/service.go b/beacon-chain/rpc/service.go index bf4e3ecee03b..6043d4893cef 100644 --- a/beacon-chain/rpc/service.go +++ b/beacon-chain/rpc/service.go @@ -71,7 +71,7 @@ type Service struct { enableDebugRPCEndpoints bool attestationsPool attestations.Pool exitPool *voluntaryexits.Pool - slashingsPool *slashings.Pool + slashingsPool slashings.PoolManager syncService chainSync.Checker host string port string @@ -123,7 +123,7 @@ type Config struct { MockEth1Votes bool AttestationsPool attestations.Pool ExitPool *voluntaryexits.Pool - SlashingsPool *slashings.Pool + SlashingsPool slashings.PoolManager SyncService chainSync.Checker Broadcaster p2p.Broadcaster PeersFetcher p2p.PeersProvider @@ -259,7 +259,7 @@ func (s *Service) Start() { } nodeServer := &node.Server{ LogsStreamer: logutil.NewStreamServer(), - StreamLogsBufferSize: 100, // Enough to handle bursts of beacon node logs for gRPC streaming. + StreamLogsBufferSize: 1000, // Enough to handle bursts of beacon node logs for gRPC streaming. BeaconDB: s.beaconDB, Server: s.grpcServer, SyncChecker: s.syncService, diff --git a/beacon-chain/rpc/validator/aggregator_test.go b/beacon-chain/rpc/validator/aggregator_test.go index a476632d802e..e4e9d3ef2676 100644 --- a/beacon-chain/rpc/validator/aggregator_test.go +++ b/beacon-chain/rpc/validator/aggregator_test.go @@ -350,7 +350,7 @@ func TestSubmitAggregateAndProof_PreferOwnAttestation(t *testing.T) { res, err := aggregatorServer.SubmitAggregateSelectionProof(ctx, req) require.NoError(t, err) - assert.DeepEqual(t, att1, res.AggregateAndProof.Aggregate, "Did not receive wanted attestation") + assert.DeepSSZEqual(t, att1, res.AggregateAndProof.Aggregate, "Did not receive wanted attestation") } func TestSubmitAggregateAndProof_SelectsMostBitsWhenOwnAttestationNotPresent(t *testing.T) { @@ -401,7 +401,7 @@ func TestSubmitAggregateAndProof_SelectsMostBitsWhenOwnAttestationNotPresent(t * res, err := aggregatorServer.SubmitAggregateSelectionProof(ctx, req) require.NoError(t, err) - assert.DeepEqual(t, att1, res.AggregateAndProof.Aggregate, "Did not receive wanted attestation") + assert.DeepSSZEqual(t, att1, res.AggregateAndProof.Aggregate, "Did not receive wanted attestation") } func TestSubmitSignedAggregateSelectionProof_ZeroHashesSignatures(t *testing.T) { diff --git a/beacon-chain/rpc/validator/server.go b/beacon-chain/rpc/validator/server.go index 06b45e6307dd..0ecb74cce62a 100644 --- a/beacon-chain/rpc/validator/server.go +++ b/beacon-chain/rpc/validator/server.go @@ -55,7 +55,7 @@ type Server struct { BlockNotifier blockfeed.Notifier P2P p2p.Broadcaster AttPool attestations.Pool - SlashingsPool *slashings.Pool + SlashingsPool slashings.PoolManager ExitPool *voluntaryexits.Pool BlockReceiver blockchain.BlockReceiver MockEth1Votes bool diff --git a/beacon-chain/state/stategen/epoch_boundary_state_cache_test.go b/beacon-chain/state/stategen/epoch_boundary_state_cache_test.go index 822d2b8a7fc3..a88d3f1d30b2 100644 --- a/beacon-chain/state/stategen/epoch_boundary_state_cache_test.go +++ b/beacon-chain/state/stategen/epoch_boundary_state_cache_test.go @@ -35,7 +35,7 @@ func TestEpochBoundaryStateCache_CanSave(t *testing.T) { got, exists, err = e.getByRoot([32]byte{'a'}) require.NoError(t, err) assert.Equal(t, true, exists, "Should exist") - assert.DeepEqual(t, s.InnerStateUnsafe(), got.state.InnerStateUnsafe(), "Should have the same state") + assert.DeepSSZEqual(t, s.InnerStateUnsafe(), got.state.InnerStateUnsafe(), "Should have the same state") got, exists, err = e.getBySlot(2) require.NoError(t, err) @@ -45,7 +45,7 @@ func TestEpochBoundaryStateCache_CanSave(t *testing.T) { got, exists, err = e.getBySlot(1) require.NoError(t, err) assert.Equal(t, true, exists, "Should exist") - assert.DeepEqual(t, s.InnerStateUnsafe(), got.state.InnerStateUnsafe(), "Should have the same state") + assert.DeepSSZEqual(t, s.InnerStateUnsafe(), got.state.InnerStateUnsafe(), "Should have the same state") } func TestEpochBoundaryStateCache_CanTrim(t *testing.T) { diff --git a/beacon-chain/state/stategen/getter_test.go b/beacon-chain/state/stategen/getter_test.go index b58b2f434304..b6e4a50931f6 100644 --- a/beacon-chain/state/stategen/getter_test.go +++ b/beacon-chain/state/stategen/getter_test.go @@ -4,8 +4,8 @@ import ( "context" "testing" - "github.com/gogo/protobuf/proto" types "github.com/prysmaticlabs/eth2-types" + "github.com/prysmaticlabs/prysm/beacon-chain/core/blocks" testDB "github.com/prysmaticlabs/prysm/beacon-chain/db/testing" pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1" @@ -36,9 +36,7 @@ func TestStateByRoot_ColdState(t *testing.T) { require.NoError(t, service.beaconDB.SaveGenesisBlockRoot(ctx, bRoot)) loadedState, err := service.StateByRoot(ctx, bRoot) require.NoError(t, err) - if !proto.Equal(loadedState.InnerStateUnsafe(), beaconState.InnerStateUnsafe()) { - t.Error("Did not correctly save state") - } + require.DeepSSZEqual(t, loadedState.InnerStateUnsafe(), beaconState.InnerStateUnsafe()) } func TestStateByRoot_HotStateUsingEpochBoundaryCacheNoReplay(t *testing.T) { @@ -97,9 +95,7 @@ func TestStateByRoot_HotStateCached(t *testing.T) { loadedState, err := service.StateByRoot(ctx, r) require.NoError(t, err) - if !proto.Equal(loadedState.InnerStateUnsafe(), beaconState.InnerStateUnsafe()) { - t.Error("Did not correctly cache state") - } + require.DeepSSZEqual(t, loadedState.InnerStateUnsafe(), beaconState.InnerStateUnsafe()) } func TestStateByRootInitialSync_UseEpochStateCache(t *testing.T) { @@ -133,9 +129,7 @@ func TestStateByRootInitialSync_UseCache(t *testing.T) { loadedState, err := service.StateByRootInitialSync(ctx, r) require.NoError(t, err) - if !proto.Equal(loadedState.InnerStateUnsafe(), beaconState.InnerStateUnsafe()) { - t.Error("Did not correctly cache state") - } + require.DeepSSZEqual(t, loadedState.InnerStateUnsafe(), beaconState.InnerStateUnsafe()) if service.hotStateCache.has(r) { t.Error("Hot state cache was not invalidated") } @@ -234,10 +228,7 @@ func TestLoadeStateByRoot_Cached(t *testing.T) { // This tests where hot state was already cached. loadedState, err := service.loadStateByRoot(ctx, r) require.NoError(t, err) - - if !proto.Equal(loadedState.InnerStateUnsafe(), beaconState.InnerStateUnsafe()) { - t.Error("Did not correctly cache state") - } + require.DeepSSZEqual(t, loadedState.InnerStateUnsafe(), beaconState.InnerStateUnsafe()) } func TestLoadeStateByRoot_FinalizedState(t *testing.T) { @@ -261,10 +252,7 @@ func TestLoadeStateByRoot_FinalizedState(t *testing.T) { // This tests where hot state was already cached. loadedState, err := service.loadStateByRoot(ctx, gRoot) require.NoError(t, err) - - if !proto.Equal(loadedState.InnerStateUnsafe(), beaconState.InnerStateUnsafe()) { - t.Error("Did not correctly retrieve finalized state") - } + require.DeepSSZEqual(t, loadedState.InnerStateUnsafe(), beaconState.InnerStateUnsafe()) } func TestLoadeStateByRoot_EpochBoundaryStateCanProcess(t *testing.T) { diff --git a/beacon-chain/state/stategen/migrate_test.go b/beacon-chain/state/stategen/migrate_test.go index 079d58c78e40..808f9ec84155 100644 --- a/beacon-chain/state/stategen/migrate_test.go +++ b/beacon-chain/state/stategen/migrate_test.go @@ -51,7 +51,7 @@ func TestMigrateToCold_HappyPath(t *testing.T) { gotState, err := service.beaconDB.State(ctx, fRoot) require.NoError(t, err) - assert.DeepEqual(t, beaconState.InnerStateUnsafe(), gotState.InnerStateUnsafe(), "Did not save state") + assert.DeepSSZEqual(t, beaconState.InnerStateUnsafe(), gotState.InnerStateUnsafe(), "Did not save state") gotRoot := service.beaconDB.ArchivedPointRoot(ctx, stateSlot/service.slotsPerArchivedPoint) assert.Equal(t, fRoot, gotRoot, "Did not save archived root") lastIndex, err := service.beaconDB.LastArchivedSlot(ctx) diff --git a/beacon-chain/state/stategen/replay_test.go b/beacon-chain/state/stategen/replay_test.go index b9cee22bc0c4..befbb0ae6ef3 100644 --- a/beacon-chain/state/stategen/replay_test.go +++ b/beacon-chain/state/stategen/replay_test.go @@ -364,7 +364,7 @@ func TestLastSavedState_Genesis(t *testing.T) { savedState, err := s.lastSavedState(ctx, 0) require.NoError(t, err) - require.DeepEqual(t, gState.InnerStateUnsafe(), savedState.InnerStateUnsafe()) + require.DeepSSZEqual(t, gState.InnerStateUnsafe(), savedState.InnerStateUnsafe()) } func TestLastSavedState_CanGet(t *testing.T) { @@ -394,9 +394,7 @@ func TestLastSavedState_CanGet(t *testing.T) { savedState, err := s.lastSavedState(ctx, s.finalizedInfo.slot+100) require.NoError(t, err) - if !proto.Equal(st.InnerStateUnsafe(), savedState.InnerStateUnsafe()) { - t.Error("Did not save correct root") - } + require.DeepSSZEqual(t, st.InnerStateUnsafe(), savedState.InnerStateUnsafe()) } func TestLastSavedState_NoSavedBlockState(t *testing.T) { diff --git a/beacon-chain/state/stategen/service_test.go b/beacon-chain/state/stategen/service_test.go index 1a09d008d265..755b765d5118 100644 --- a/beacon-chain/state/stategen/service_test.go +++ b/beacon-chain/state/stategen/service_test.go @@ -4,7 +4,6 @@ import ( "context" "testing" - "github.com/gogo/protobuf/proto" ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" testDB "github.com/prysmaticlabs/prysm/beacon-chain/db/testing" "github.com/prysmaticlabs/prysm/shared/params" @@ -30,10 +29,7 @@ func TestResume(t *testing.T) { resumeState, err := service.Resume(ctx) require.NoError(t, err) - - if !proto.Equal(beaconState.InnerStateUnsafe(), resumeState.InnerStateUnsafe()) { - t.Error("Diff saved state") - } + require.DeepSSZEqual(t, beaconState.InnerStateUnsafe(), resumeState.InnerStateUnsafe()) assert.Equal(t, params.BeaconConfig().SlotsPerEpoch, service.finalizedInfo.slot, "Did not get watned slot") assert.Equal(t, service.finalizedInfo.root, root, "Did not get wanted root") assert.NotNil(t, service.finalizedState(), "Wanted a non nil finalized state") diff --git a/beacon-chain/state/types_test.go b/beacon-chain/state/types_test.go index 05dfffdb7b16..02d92349d548 100644 --- a/beacon-chain/state/types_test.go +++ b/beacon-chain/state/types_test.go @@ -27,9 +27,7 @@ func TestBeaconState_ProtoBeaconStateCompatibility(t *testing.T) { cloned, ok := proto.Clone(genesis).(*pb.BeaconState) assert.Equal(t, true, ok, "Object is not of type *pb.BeaconState") custom := customState.CloneInnerState() - if !proto.Equal(cloned, custom) { - t.Fatal("Cloned states did not match") - } + assert.DeepSSZEqual(t, cloned, custom) r1, err := customState.HashTreeRoot(ctx) require.NoError(t, err) diff --git a/beacon-chain/sync/service.go b/beacon-chain/sync/service.go index c512226fda91..6b35841c4e50 100644 --- a/beacon-chain/sync/service.go +++ b/beacon-chain/sync/service.go @@ -54,7 +54,7 @@ type Config struct { DB db.NoHeadAccessDatabase AttPool attestations.Pool ExitPool *voluntaryexits.Pool - SlashingPool *slashings.Pool + SlashingPool slashings.PoolManager Chain blockchainService InitialSync Checker StateNotifier statefeed.Notifier @@ -84,7 +84,7 @@ type Service struct { db db.NoHeadAccessDatabase attPool attestations.Pool exitPool *voluntaryexits.Pool - slashingPool *slashings.Pool + slashingPool slashings.PoolManager chain blockchainService slotToPendingBlocks *gcache.Cache seenPendingBlocks map[[32]byte]bool diff --git a/proto/migration/migration.go b/proto/migration/migration.go index 17555cdfdb86..7876ae970ef3 100644 --- a/proto/migration/migration.go +++ b/proto/migration/migration.go @@ -50,3 +50,46 @@ func V1ToV1Alpha1Block(alphaBlk *ethpb.SignedBeaconBlock) (*ethpb_alpha.SignedBe } return v1alpha1Block, nil } + +// V1Alpha1IndexedAttToV1 converts a v1alpha1 indexed attestation to v1. +func V1Alpha1IndexedAttToV1(v1alpha1Att *ethpb_alpha.IndexedAttestation) *ethpb.IndexedAttestation { + if v1alpha1Att == nil { + return ðpb.IndexedAttestation{} + } + return ðpb.IndexedAttestation{ + AttestingIndices: v1alpha1Att.AttestingIndices, + Data: V1Alpha1AttDataToV1(v1alpha1Att.Data), + Signature: v1alpha1Att.Signature, + } +} + +// V1Alpha1AttDataToV1 converts a v1alpha1 attestation data to v1. +func V1Alpha1AttDataToV1(v1alpha1AttData *ethpb_alpha.AttestationData) *ethpb.AttestationData { + if v1alpha1AttData == nil || v1alpha1AttData.Source == nil || v1alpha1AttData.Target == nil { + return ðpb.AttestationData{} + } + return ðpb.AttestationData{ + Slot: v1alpha1AttData.Slot, + CommitteeIndex: v1alpha1AttData.CommitteeIndex, + BeaconBlockRoot: v1alpha1AttData.BeaconBlockRoot, + Source: ðpb.Checkpoint{ + Root: v1alpha1AttData.Source.Root, + Epoch: v1alpha1AttData.Source.Epoch, + }, + Target: ðpb.Checkpoint{ + Root: v1alpha1AttData.Target.Root, + Epoch: v1alpha1AttData.Target.Epoch, + }, + } +} + +// V1Alpha1AttSlashingToV1 converts a v1alpha1 attester slashing to v1. +func V1Alpha1AttSlashingToV1(v1alpha1Slashing *ethpb_alpha.AttesterSlashing) *ethpb.AttesterSlashing { + if v1alpha1Slashing == nil { + return ðpb.AttesterSlashing{} + } + return ðpb.AttesterSlashing{ + Attestation_1: V1Alpha1IndexedAttToV1(v1alpha1Slashing.Attestation_1), + Attestation_2: V1Alpha1IndexedAttToV1(v1alpha1Slashing.Attestation_2), + } +} diff --git a/validator/rpc/health.go b/validator/rpc/health.go index 8c9d02854a6b..6d0e0725bfdb 100644 --- a/validator/rpc/health.go +++ b/validator/rpc/health.go @@ -82,9 +82,11 @@ func (s *Server) StreamBeaconLogs(req *ptypes.Empty, stream pb.Health_StreamBeac // StreamValidatorLogs from the validator client via a gRPC server-side stream. func (s *Server) StreamValidatorLogs(_ *ptypes.Empty, stream pb.Health_StreamValidatorLogsServer) error { ch := make(chan []byte, s.streamLogsBufferSize) - defer close(ch) sub := s.logsStreamer.LogsFeed().Subscribe(ch) - defer sub.Unsubscribe() + defer func() { + sub.Unsubscribe() + defer close(ch) + }() recentLogs := s.logsStreamer.GetLastFewLogs() logStrings := make([]string, len(recentLogs)) @@ -107,6 +109,8 @@ func (s *Server) StreamValidatorLogs(_ *ptypes.Empty, stream pb.Health_StreamVal } case <-s.ctx.Done(): return status.Error(codes.Canceled, "Context canceled") + case err := <-sub.Err(): + return status.Errorf(codes.Canceled, "Subscriber error, closing: %v", err) case <-stream.Context().Done(): return status.Error(codes.Canceled, "Context canceled") }