Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Integrate Engine Proxy into E2E #10808

Merged
merged 8 commits into from
Jun 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 14 additions & 8 deletions network/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,7 @@ func (e Endpoint) HttpClient() *http.Client {
if e.Auth.Method != authorization.Bearer {
return http.DefaultClient
}
authTransport := &jwtTransport{
underlyingTransport: http.DefaultTransport,
jwtSecret: []byte(e.Auth.Value),
}
return &http.Client{
Timeout: DefaultRPCHTTPTimeout,
Transport: authTransport,
}
return NewHttpClientWithSecret(e.Auth.Value)
}

// Equals compares two authorization data objects for equality.
Expand Down Expand Up @@ -70,3 +63,16 @@ func Method(auth string) authorization.AuthorizationMethod {
}
return authorization.None
}

// NewHttpClientWithSecret returns a http client that utilizes
// jwt authentication.
func NewHttpClientWithSecret(secret string) *http.Client {
authTransport := &jwtTransport{
underlyingTransport: http.DefaultTransport,
jwtSecret: []byte(secret),
}
return &http.Client{
Timeout: DefaultRPCHTTPTimeout,
Transport: authTransport,
}
}
20 changes: 17 additions & 3 deletions testing/endtoend/component_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type componentHandler struct {
web3Signer e2etypes.ComponentRunner
bootnode e2etypes.ComponentRunner
eth1Miner e2etypes.ComponentRunner
eth1Proxy e2etypes.ComponentRunner
eth1Nodes e2etypes.MultipleComponentRunners
beaconNodes e2etypes.MultipleComponentRunners
validatorNodes e2etypes.MultipleComponentRunners
Expand Down Expand Up @@ -132,10 +133,23 @@ func (c *componentHandler) setup() {
if config.TestCheckpointSync {
appendDebugEndpoints(config)
}
// Proxies
proxies := eth1.NewProxySet()
g.Go(func() error {
if err := helpers.ComponentsStarted(ctx, []e2etypes.ComponentRunner{eth1Nodes}); err != nil {
return errors.Wrap(err, "beacon nodes require ETH1 and boot node to run")
}
if err := proxies.Start(ctx); err != nil {
return errors.Wrap(err, "failed to start proxies")
}
return nil
})
c.eth1Proxy = proxies

// Beacon nodes.
beaconNodes := components.NewBeaconNodes(config)
g.Go(func() error {
if err := helpers.ComponentsStarted(ctx, []e2etypes.ComponentRunner{eth1Nodes, bootNode}); err != nil {
if err := helpers.ComponentsStarted(ctx, []e2etypes.ComponentRunner{eth1Nodes, proxies, bootNode}); err != nil {
return errors.Wrap(err, "beacon nodes require ETH1 and boot node to run")
}
beaconNodes.SetENR(bootNode.ENR())
Expand All @@ -149,7 +163,7 @@ func (c *componentHandler) setup() {
if multiClientActive {
lighthouseNodes = components.NewLighthouseBeaconNodes(config)
g.Go(func() error {
if err := helpers.ComponentsStarted(ctx, []e2etypes.ComponentRunner{eth1Nodes, bootNode, beaconNodes}); err != nil {
if err := helpers.ComponentsStarted(ctx, []e2etypes.ComponentRunner{eth1Nodes, proxies, bootNode, beaconNodes}); err != nil {
return errors.Wrap(err, "lighthouse beacon nodes require ETH1 and boot node to run")
}
lighthouseNodes.SetENR(bootNode.ENR())
Expand Down Expand Up @@ -197,7 +211,7 @@ func (c *componentHandler) setup() {
func (c *componentHandler) required() []e2etypes.ComponentRunner {
multiClientActive := e2e.TestParams.LighthouseBeaconNodeCount > 0
requiredComponents := []e2etypes.ComponentRunner{
c.tracingSink, c.eth1Nodes, c.bootnode, c.beaconNodes, c.validatorNodes,
c.tracingSink, c.eth1Nodes, c.bootnode, c.beaconNodes, c.validatorNodes, c.eth1Proxy,
}
if multiClientActive {
requiredComponents = append(requiredComponents, []e2etypes.ComponentRunner{c.keygen, c.lighthouseBeaconNodes, c.lighthouseValidatorNodes}...)
Expand Down
2 changes: 1 addition & 1 deletion testing/endtoend/components/beacon_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ func (node *BeaconNode) Start(ctx context.Context) error {
fmt.Sprintf("--%s=%s", cmdshared.LogFileName.Name, stdOutFile.Name()),
fmt.Sprintf("--%s=%s", flags.DepositContractFlag.Name, e2e.TestParams.ContractAddress.Hex()),
fmt.Sprintf("--%s=%d", flags.RPCPort.Name, e2e.TestParams.Ports.PrysmBeaconNodeRPCPort+index),
fmt.Sprintf("--%s=http://127.0.0.1:%d", flags.HTTPWeb3ProviderFlag.Name, e2e.TestParams.Ports.Eth1AuthRPCPort+index),
fmt.Sprintf("--%s=http://127.0.0.1:%d", flags.HTTPWeb3ProviderFlag.Name, e2e.TestParams.Ports.Eth1ProxyPort+index),
fmt.Sprintf("--%s=%s", flags.ExecutionJWTSecretFlag.Name, jwtPath),
fmt.Sprintf("--%s=%d", flags.MinSyncPeers.Name, 1),
fmt.Sprintf("--%s=%d", cmdshared.P2PUDPPort.Name, e2e.TestParams.Ports.PrysmBeaconNodeUDPPort+index),
Expand Down
2 changes: 2 additions & 0 deletions testing/endtoend/components/eth1/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ go_library(
"miner.go",
"node.go",
"node_set.go",
"proxy.go",
"transactions.go",
],
importpath = "github.com/prysmaticlabs/prysm/testing/endtoend/components/eth1",
Expand All @@ -20,6 +21,7 @@ go_library(
"//testing/endtoend/helpers:go_default_library",
"//testing/endtoend/params:go_default_library",
"//testing/endtoend/types:go_default_library",
"//testing/middleware/engine-api-proxy:go_default_library",
"@com_github_ethereum_go_ethereum//accounts/abi/bind:go_default_library",
"@com_github_ethereum_go_ethereum//accounts/keystore:go_default_library",
"@com_github_ethereum_go_ethereum//common:go_default_library",
Expand Down
2 changes: 2 additions & 0 deletions testing/endtoend/components/eth1/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@ const timeGapPerMiningTX = 250 * time.Millisecond

var _ e2etypes.ComponentRunner = (*NodeSet)(nil)
var _ e2etypes.MultipleComponentRunners = (*NodeSet)(nil)
var _ e2etypes.MultipleComponentRunners = (*ProxySet)(nil)
var _ e2etypes.ComponentRunner = (*Miner)(nil)
var _ e2etypes.ComponentRunner = (*Node)(nil)
var _ e2etypes.ComponentRunner = (*Proxy)(nil)

// WaitForBlocks waits for a certain amount of blocks to be mined by the ETH1 chain before returning.
func WaitForBlocks(web3 *ethclient.Client, keystore *keystore.Key, blocksToWait uint64) error {
Expand Down
205 changes: 205 additions & 0 deletions testing/endtoend/components/eth1/proxy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
package eth1

import (
"context"
"encoding/hex"
"fmt"
"os"
"path"
"strconv"
"strings"

"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/io/file"
"github.com/prysmaticlabs/prysm/testing/endtoend/helpers"
e2e "github.com/prysmaticlabs/prysm/testing/endtoend/params"
e2etypes "github.com/prysmaticlabs/prysm/testing/endtoend/types"
proxy "github.com/prysmaticlabs/prysm/testing/middleware/engine-api-proxy"
log "github.com/sirupsen/logrus"
)

// ProxySet represents a set of proxies for the engine-api.
type ProxySet struct {
e2etypes.ComponentRunner
started chan struct{}
proxies []e2etypes.ComponentRunner
}

// NewProxySet creates and returns a set of engine-api proxies.
func NewProxySet() *ProxySet {
return &ProxySet{
started: make(chan struct{}, 1),
}
}

// Start starts all the proxies in set.
func (s *ProxySet) Start(ctx context.Context) error {

totalNodeCount := e2e.TestParams.BeaconNodeCount + e2e.TestParams.LighthouseBeaconNodeCount
nodes := make([]e2etypes.ComponentRunner, totalNodeCount)
for i := 0; i < totalNodeCount; i++ {
// We start indexing nodes from 1 because the miner has an implicit 0 index.
nodes[i] = NewProxy(i)
}
s.proxies = nodes

// Wait for all nodes to finish their job (blocking).
// Once nodes are ready passed in handler function will be called.
return helpers.WaitOnNodes(ctx, nodes, func() {
// All nodes started, close channel, so that all services waiting on a set, can proceed.
close(s.started)
})
}

// Started checks whether proxy set is started and all proxies are ready to be queried.
func (s *ProxySet) Started() <-chan struct{} {
return s.started
}

// Pause pauses the component and its underlying process.
func (s *ProxySet) Pause() error {
for _, n := range s.proxies {
if err := n.Pause(); err != nil {
return err
}
}
return nil
}

// Resume resumes the component and its underlying process.
func (s *ProxySet) Resume() error {
for _, n := range s.proxies {
if err := n.Resume(); err != nil {
return err
}
}
return nil
}

// Stop stops the component and its underlying process.
func (s *ProxySet) Stop() error {
for _, n := range s.proxies {
if err := n.Stop(); err != nil {
return err
}
}
return nil
}

// PauseAtIndex pauses the component and its underlying process at the desired index.
func (s *ProxySet) PauseAtIndex(i int) error {
if i >= len(s.proxies) {
return errors.Errorf("provided index exceeds slice size: %d >= %d", i, len(s.proxies))
}
return s.proxies[i].Pause()
}

// ResumeAtIndex resumes the component and its underlying process at the desired index.
func (s *ProxySet) ResumeAtIndex(i int) error {
if i >= len(s.proxies) {
return errors.Errorf("provided index exceeds slice size: %d >= %d", i, len(s.proxies))
}
return s.proxies[i].Resume()
}

// StopAtIndex stops the component and its underlying process at the desired index.
func (s *ProxySet) StopAtIndex(i int) error {
if i >= len(s.proxies) {
return errors.Errorf("provided index exceeds slice size: %d >= %d", i, len(s.proxies))
}
return s.proxies[i].Stop()
}

// Proxy represents an engine-api proxy.
type Proxy struct {
e2etypes.ComponentRunner
started chan struct{}
index int
cancel func()
}

// NewProxy creates and returns an engine-api proxy.
func NewProxy(index int) *Proxy {
return &Proxy{
started: make(chan struct{}, 1),
index: index,
}
}

// Start runs a proxy.
func (node *Proxy) Start(ctx context.Context) error {
file, err := os.Create(path.Join(e2e.TestParams.LogPath, "eth1_proxy_"+strconv.Itoa(node.index)+".log"))
if err != nil {
return err
}
jwtPath := path.Join(e2e.TestParams.TestPath, "eth1data/"+strconv.Itoa(node.index)+"/")
if node.index == 0 {
jwtPath = path.Join(e2e.TestParams.TestPath, "eth1data/miner/")
}
jwtPath = path.Join(jwtPath, "geth/jwtsecret")
secret, err := parseJWTSecretFromFile(jwtPath)
if err != nil {
return err
}
opts := []proxy.Option{
proxy.WithDestinationAddress(fmt.Sprintf("http://127.0.0.1:%d", e2e.TestParams.Ports.Eth1AuthRPCPort+node.index)),
proxy.WithPort(e2e.TestParams.Ports.Eth1ProxyPort + node.index),
proxy.WithLogger(log.New()),
proxy.WithLogFile(file),
proxy.WithJwtSecret(string(secret)),
}
nProxy, err := proxy.New(opts...)
if err != nil {
return err
}
log.Infof("Starting eth1 proxy %d with port: %d and file %s", node.index, e2e.TestParams.Ports.Eth1ProxyPort+node.index, file.Name())

// Set cancel into context.
ctx, cancel := context.WithCancel(ctx)
node.cancel = cancel
// Mark node as ready.
close(node.started)
return nProxy.Start(ctx)
}

// Started checks whether the eth1 proxy is started and ready to be queried.
func (node *Proxy) Started() <-chan struct{} {
return node.started
}

// Pause pauses the component and its underlying process.
func (node *Proxy) Pause() error {
// no-op
return nil
}

// Resume resumes the component and its underlying process.
func (node *Proxy) Resume() error {
// no-op
return nil
}

// Stop kills the component and its underlying process.
func (node *Proxy) Stop() error {
node.cancel()
return nil
}

func parseJWTSecretFromFile(jwtSecretFile string) ([]byte, error) {
enc, err := file.ReadFileAsBytes(jwtSecretFile)
if err != nil {
return nil, err
}
strData := strings.TrimSpace(string(enc))
if len(strData) == 0 {
return nil, fmt.Errorf("provided JWT secret in file %s cannot be empty", jwtSecretFile)
}
secret, err := hex.DecodeString(strings.TrimPrefix(strData, "0x"))
if err != nil {
return nil, err
}
if len(secret) < 32 {
return nil, errors.New("provided JWT secret should be a hex string of at least 32 bytes")
}
return secret, nil
}
2 changes: 1 addition & 1 deletion testing/endtoend/components/lighthouse_beacon.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func (node *LighthouseBeaconNode) Start(ctx context.Context) error {
fmt.Sprintf("--http-port=%d", e2e.TestParams.Ports.LighthouseBeaconNodeHTTPPort+index),
fmt.Sprintf("--target-peers=%d", 10),
fmt.Sprintf("--eth1-endpoints=http://127.0.0.1:%d", e2e.TestParams.Ports.Eth1RPCPort+prysmNodeCount+index),
fmt.Sprintf("--execution-endpoints=http://127.0.0.1:%d", e2e.TestParams.Ports.Eth1AuthRPCPort+prysmNodeCount+index),
fmt.Sprintf("--execution-endpoints=http://127.0.0.1:%d", e2e.TestParams.Ports.Eth1ProxyPort+prysmNodeCount+index),
fmt.Sprintf("--jwt-secrets=%s", jwtPath),
fmt.Sprintf("--boot-nodes=%s", node.enr),
fmt.Sprintf("--metrics-port=%d", e2e.TestParams.Ports.LighthouseBeaconNodeMetricsPort+index),
Expand Down
14 changes: 14 additions & 0 deletions testing/endtoend/endtoend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,13 @@ func (r *testRunner) testCheckpointSync(ctx context.Context, g *errgroup.Group,
if err := helpers.ComponentsStarted(ctx, []e2etypes.ComponentRunner{ethNode}); err != nil {
return fmt.Errorf("sync beacon node not ready: %w", err)
}
proxyNode := eth1.NewProxy(i)
g.Go(func() error {
return proxyNode.Start(ctx)
})
if err := helpers.ComponentsStarted(ctx, []e2etypes.ComponentRunner{proxyNode}); err != nil {
return fmt.Errorf("sync beacon node not ready: %w", err)
}

client, err := beacon.NewClient(bnAPI)
if err != nil {
Expand Down Expand Up @@ -323,6 +330,13 @@ func (r *testRunner) testBeaconChainSync(ctx context.Context, g *errgroup.Group,
if err := helpers.ComponentsStarted(ctx, []e2etypes.ComponentRunner{ethNode}); err != nil {
return fmt.Errorf("sync beacon node not ready: %w", err)
}
proxyNode := eth1.NewProxy(index)
g.Go(func() error {
return proxyNode.Start(ctx)
})
if err := helpers.ComponentsStarted(ctx, []e2etypes.ComponentRunner{proxyNode}); err != nil {
return fmt.Errorf("sync beacon node not ready: %w", err)
}
syncBeaconNode := components.NewBeaconNode(config, index, bootnodeEnr)
g.Go(func() error {
return syncBeaconNode.Start(ctx)
Expand Down
7 changes: 7 additions & 0 deletions testing/endtoend/params/params.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type ports struct {
Eth1RPCPort int
Eth1AuthRPCPort int
Eth1WSPort int
Eth1ProxyPort int
PrysmBeaconNodeRPCPort int
PrysmBeaconNodeUDPPort int
PrysmBeaconNodeTCPPort int
Expand Down Expand Up @@ -89,6 +90,7 @@ const (
Eth1RPCPort = Eth1Port + portSpan
Eth1WSPort = Eth1Port + 2*portSpan
Eth1AuthRPCPort = Eth1Port + 3*portSpan
Eth1ProxyPort = Eth1Port + 4*portSpan

PrysmBeaconNodeRPCPort = 4150
PrysmBeaconNodeUDPPort = PrysmBeaconNodeRPCPort + portSpan
Expand Down Expand Up @@ -244,6 +246,10 @@ func initializeStandardPorts(shardCount, shardIndex int, ports *ports, existingR
if err != nil {
return err
}
eth1ProxyPort, err := port(Eth1ProxyPort, shardCount, shardIndex, existingRegistrations)
if err != nil {
return err
}
beaconNodeRPCPort, err := port(PrysmBeaconNodeRPCPort, shardCount, shardIndex, existingRegistrations)
if err != nil {
return err
Expand Down Expand Up @@ -286,6 +292,7 @@ func initializeStandardPorts(shardCount, shardIndex int, ports *ports, existingR
ports.Eth1RPCPort = eth1RPCPort
ports.Eth1AuthRPCPort = eth1AuthPort
ports.Eth1WSPort = eth1WSPort
ports.Eth1ProxyPort = eth1ProxyPort
ports.PrysmBeaconNodeRPCPort = beaconNodeRPCPort
ports.PrysmBeaconNodeUDPPort = beaconNodeUDPPort
ports.PrysmBeaconNodeTCPPort = beaconNodeTCPPort
Expand Down
Loading