Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Eth1 connections #10073

Merged
merged 11 commits into from
Jan 14, 2022
Merged
1 change: 1 addition & 0 deletions beacon-chain/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -752,6 +752,7 @@ func (b *BeaconNode) registerRPCService() error {
SlashingChecker: slasherService,
SyncCommitteeObjectPool: b.syncCommitteePool,
POWChainService: web3Service,
POWChainInfoFetcher: web3Service,
ChainStartFetcher: chainStartFetcher,
MockEth1Votes: mockEth1DataVotes,
SyncService: syncService,
Expand Down
35 changes: 35 additions & 0 deletions beacon-chain/powchain/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,10 @@ type ChainStartFetcher interface {
type ChainInfoFetcher interface {
Eth2GenesisPowchainInfo() (uint64, *big.Int)
IsConnectedToETH1() bool
CurrentETH1Endpoint() string
CurrentETH1ConnectionError() error
ETH1Endpoints() []string
ETH1ConnectionErrors() []error
}

// POWBlockFetcher defines a struct that can retrieve mainchain blocks.
Expand Down Expand Up @@ -324,6 +328,37 @@ func (s *Service) IsConnectedToETH1() bool {
return s.connectedETH1
}

// CurrentETH1Endpoint returns the URL of the current ETH1 endpoint.
func (s *Service) CurrentETH1Endpoint() string {
return s.cfg.currHttpEndpoint.Url
}

// CurrentETH1ConnectionError returns the error (if any) of the current connection.
func (s *Service) CurrentETH1ConnectionError() error {
_, _, err := s.dialETH1Nodes(s.cfg.currHttpEndpoint)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The clients should be closed after we dial them otherwise there is a resource leak and the connection is
kept active.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sorry, totally missed this. thanks for addressing!

return err
}

// ETH1Endpoints returns the slice of HTTP endpoint URLs (default is 0th element).
func (s *Service) ETH1Endpoints() []string {
var eps []string
for _, ep := range s.cfg.httpEndpoints {
eps = append(eps, ep.Url)
}
return eps
}

// ETH1ConnectionErrors returns a slice of errors for each HTTP endpoint. An error
// of nil means the connection was successful.
func (s *Service) ETH1ConnectionErrors() []error {
var errs []error
for _, ep := range s.cfg.httpEndpoints {
_, _, err := s.dialETH1Nodes(ep)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same thing applies here

errs = append(errs, err)
}
return errs
}

// DepositRoot returns the Merkle root of the latest deposit trie
// from the ETH1.0 deposit contract.
func (s *Service) DepositRoot() [32]byte {
Expand Down
26 changes: 26 additions & 0 deletions beacon-chain/powchain/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -822,3 +822,29 @@ func TestTimestampIsChecked(t *testing.T) {
timestamp = uint64(time.Now().Add(-eth1Threshold).Add(-1 * time.Minute).Unix())
assert.Equal(t, true, eth1HeadIsBehind(timestamp))
}

func TestETH1Endpoints(t *testing.T) {
firstEndpoint := "A"
secondEndpoint := "B"
endpoints := []string{firstEndpoint, secondEndpoint}

testAcc, err := contracts.Setup()
require.NoError(t, err, "Unable to set up simulated backend")
beaconDB := dbutil.SetupDB(t)

mbs := &mockBSUpdater{}
s1, err := NewService(context.Background(),
WithHttpEndpoints(endpoints),
WithDepositContractAddress(testAcc.ContractAddr),
WithDatabase(beaconDB),
WithBeaconNodeStatsUpdater(mbs),
)
s1.cfg.beaconNodeStatsUpdater = mbs
require.NoError(t, err)

// Check default endpoint is set to current.
assert.Equal(t, firstEndpoint, s1.CurrentETH1Endpoint(), "Unexpected http endpoint")

// Check endpoints are all present.
assert.DeepSSZEqual(t, endpoints, s1.ETH1Endpoints(), "Unexpected http endpoint slice")
}
20 changes: 20 additions & 0 deletions beacon-chain/powchain/testing/mock_powchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ type POWChain struct {
Eth1Data *ethpb.Eth1Data
GenesisEth1Block *big.Int
GenesisState state.BeaconState
CurrEndpoint string
CurrError error
Endpoints []string
Errors []error
}

// GenesisTime represents a static past date - JAN 01 2000.
Expand Down Expand Up @@ -134,6 +138,22 @@ func (_ *POWChain) IsConnectedToETH1() bool {
return true
}

func (m *POWChain) CurrentETH1Endpoint() string {
return m.CurrEndpoint
}

func (m *POWChain) CurrentETH1ConnectionError() error {
return m.CurrError
}

func (m *POWChain) ETH1Endpoints() []string {
return m.Endpoints
}

func (m *POWChain) ETH1ConnectionErrors() []error {
return m.Errors
}

// RPCClient defines the mock rpc client.
type RPCClient struct {
Backend *backends.SimulatedBackend
Expand Down
2 changes: 2 additions & 0 deletions beacon-chain/rpc/prysm/v1alpha1/node/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ go_library(
"//beacon-chain/blockchain:go_default_library",
"//beacon-chain/db:go_default_library",
"//beacon-chain/p2p:go_default_library",
"//beacon-chain/powchain:go_default_library",
"//beacon-chain/sync:go_default_library",
"//io/logs:go_default_library",
"//proto/prysm/v1alpha1:go_default_library",
Expand All @@ -33,6 +34,7 @@ go_test(
"//beacon-chain/db/testing:go_default_library",
"//beacon-chain/p2p:go_default_library",
"//beacon-chain/p2p/testing:go_default_library",
"//beacon-chain/rpc/testutil:go_default_library",
"//beacon-chain/sync/initial-sync/testing:go_default_library",
"//encoding/bytesutil:go_default_library",
"//proto/prysm/v1alpha1:go_default_library",
Expand Down
18 changes: 18 additions & 0 deletions beacon-chain/rpc/prysm/v1alpha1/node/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/prysmaticlabs/prysm/beacon-chain/blockchain"
"github.com/prysmaticlabs/prysm/beacon-chain/db"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p"
"github.com/prysmaticlabs/prysm/beacon-chain/powchain"
"github.com/prysmaticlabs/prysm/beacon-chain/sync"
"github.com/prysmaticlabs/prysm/io/logs"
ethpb "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1"
Expand All @@ -39,6 +40,7 @@ type Server struct {
PeerManager p2p.PeerManager
GenesisTimeFetcher blockchain.TimeFetcher
GenesisFetcher blockchain.GenesisFetcher
POWChainInfoFetcher powchain.ChainInfoFetcher
BeaconMonitoringHost string
BeaconMonitoringPort int
}
Expand Down Expand Up @@ -219,6 +221,22 @@ func (ns *Server) ListPeers(ctx context.Context, _ *empty.Empty) (*ethpb.Peers,
}, nil
}

// GetETH1ConnectionStatus gets data about the ETH1 endpoints.
func (ns *Server) GetETH1ConnectionStatus(ctx context.Context, _ *empty.Empty) (*ethpb.ETH1ConnectionStatus, error) {
var errStrs []string
errs := ns.POWChainInfoFetcher.ETH1ConnectionErrors()
// Extract string version of the errors.
for _, err := range errs {
errStrs = append(errStrs, err.Error())
}
return &ethpb.ETH1ConnectionStatus{
CurrentAddress: ns.POWChainInfoFetcher.CurrentETH1Endpoint(),
CurrentConnectionError: ns.POWChainInfoFetcher.CurrentETH1ConnectionError().Error(),
Addresses: ns.POWChainInfoFetcher.ETH1Endpoints(),
ConnectionErrors: errStrs,
}, nil
}

// StreamBeaconLogs from the beacon node via a gRPC server-side stream.
func (ns *Server) StreamBeaconLogs(_ *empty.Empty, stream ethpb.Health_StreamBeaconLogsServer) error {
ch := make(chan []byte, ns.StreamLogsBufferSize)
Expand Down
27 changes: 27 additions & 0 deletions beacon-chain/rpc/prysm/v1alpha1/node/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package node

import (
"context"
"fmt"
"testing"
"time"

Expand All @@ -12,6 +13,7 @@ import (
dbutil "github.com/prysmaticlabs/prysm/beacon-chain/db/testing"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p"
mockP2p "github.com/prysmaticlabs/prysm/beacon-chain/p2p/testing"
"github.com/prysmaticlabs/prysm/beacon-chain/rpc/testutil"
mockSync "github.com/prysmaticlabs/prysm/beacon-chain/sync/initial-sync/testing"
"github.com/prysmaticlabs/prysm/encoding/bytesutil"
ethpb "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1"
Expand Down Expand Up @@ -147,3 +149,28 @@ func TestNodeServer_ListPeers(t *testing.T) {
assert.Equal(t, int(ethpb.PeerDirection_INBOUND), int(res.Peers[0].Direction))
assert.Equal(t, ethpb.PeerDirection_OUTBOUND, res.Peers[1].Direction)
}

func TestNodeServer_GetETH1ConnectionStatus(t *testing.T) {
server := grpc.NewServer()
eps := []string{"foo", "bar"}
errs := []error{fmt.Errorf("error 1"), fmt.Errorf("error 2")}
errStrs := []string{"error 1", "error 2"}
mockFetcher := &testutil.MockPOWChainInfoFetcher{
CurrEndpoint: eps[0],
CurrError: errs[0],
Endpoints: eps,
Errors: errs,
}
ns := &Server{
POWChainInfoFetcher: mockFetcher,
}
ethpb.RegisterNodeServer(server, ns)
reflection.Register(server)

res, err := ns.GetETH1ConnectionStatus(context.Background(), &emptypb.Empty{})
require.NoError(t, err)
assert.Equal(t, eps[0], res.CurrentAddress)
assert.Equal(t, errStrs[0], res.CurrentConnectionError)
assert.DeepSSZEqual(t, eps, res.Addresses)
assert.DeepSSZEqual(t, errStrs, res.ConnectionErrors)
}
2 changes: 2 additions & 0 deletions beacon-chain/rpc/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ type Config struct {
BlockReceiver blockchain.BlockReceiver
POWChainService powchain.Chain
ChainStartFetcher powchain.ChainStartFetcher
POWChainInfoFetcher powchain.ChainInfoFetcher
GenesisTimeFetcher blockchain.TimeFetcher
GenesisFetcher blockchain.GenesisFetcher
EnableDebugRPCEndpoints bool
Expand Down Expand Up @@ -222,6 +223,7 @@ func (s *Service) Start() {
PeersFetcher: s.cfg.PeersFetcher,
PeerManager: s.cfg.PeerManager,
GenesisFetcher: s.cfg.GenesisFetcher,
POWChainInfoFetcher: s.cfg.POWChainInfoFetcher,
BeaconMonitoringHost: s.cfg.BeaconMonitoringHost,
BeaconMonitoringPort: s.cfg.BeaconMonitoringPort,
}
Expand Down
1 change: 1 addition & 0 deletions beacon-chain/rpc/testutil/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go_library(
testonly = True,
srcs = [
"mock_genesis_timefetcher.go",
"mock_powchain_info_fetcher.go",
"mock_state_fetcher.go",
],
importpath = "github.com/prysmaticlabs/prysm/beacon-chain/rpc/testutil",
Expand Down
37 changes: 37 additions & 0 deletions beacon-chain/rpc/testutil/mock_powchain_info_fetcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package testutil

import (
"math/big"
)

// MockGenesisTimeFetcher is a fake implementation of the powchain.ChainInfoFetcher
type MockPOWChainInfoFetcher struct {
CurrEndpoint string
CurrError error
Endpoints []string
Errors []error
}

func (m *MockPOWChainInfoFetcher) Eth2GenesisPowchainInfo() (uint64, *big.Int) {
return uint64(0), &big.Int{}
}

func (m *MockPOWChainInfoFetcher) IsConnectedToETH1() bool {
return true
}

func (m *MockPOWChainInfoFetcher) CurrentETH1Endpoint() string {
return m.CurrEndpoint
}

func (m *MockPOWChainInfoFetcher) CurrentETH1ConnectionError() error {
return m.CurrError
}

func (m *MockPOWChainInfoFetcher) ETH1Endpoints() []string {
return m.Endpoints
}

func (m *MockPOWChainInfoFetcher) ETH1ConnectionErrors() []error {
return m.Errors
}
Loading