diff --git a/network/endpoint.go b/network/endpoint.go index 6047fff923fe..eb001b7016d9 100644 --- a/network/endpoint.go +++ b/network/endpoint.go @@ -31,14 +31,7 @@ func (e Endpoint) HttpClient() *http.Client { if e.Auth.Method != authorization.Bearer { return http.DefaultClient } - authTransport := &jwtTransport{ - underlyingTransport: http.DefaultTransport, - jwtSecret: []byte(e.Auth.Value), - } - return &http.Client{ - Timeout: DefaultRPCHTTPTimeout, - Transport: authTransport, - } + return NewHttpClientWithSecret(e.Auth.Value) } // Equals compares two authorization data objects for equality. @@ -70,3 +63,16 @@ func Method(auth string) authorization.AuthorizationMethod { } return authorization.None } + +// NewHttpClientWithSecret returns a http client that utilizes +// jwt authentication. +func NewHttpClientWithSecret(secret string) *http.Client { + authTransport := &jwtTransport{ + underlyingTransport: http.DefaultTransport, + jwtSecret: []byte(secret), + } + return &http.Client{ + Timeout: DefaultRPCHTTPTimeout, + Transport: authTransport, + } +} diff --git a/testing/endtoend/component_handler_test.go b/testing/endtoend/component_handler_test.go index 3603b2f50bab..0a7cf5dd1309 100644 --- a/testing/endtoend/component_handler_test.go +++ b/testing/endtoend/component_handler_test.go @@ -26,6 +26,7 @@ type componentHandler struct { web3Signer e2etypes.ComponentRunner bootnode e2etypes.ComponentRunner eth1Miner e2etypes.ComponentRunner + eth1Proxy e2etypes.ComponentRunner eth1Nodes e2etypes.MultipleComponentRunners beaconNodes e2etypes.MultipleComponentRunners validatorNodes e2etypes.MultipleComponentRunners @@ -132,10 +133,23 @@ func (c *componentHandler) setup() { if config.TestCheckpointSync { appendDebugEndpoints(config) } + // Proxies + proxies := eth1.NewProxySet() + g.Go(func() error { + if err := helpers.ComponentsStarted(ctx, []e2etypes.ComponentRunner{eth1Nodes}); err != nil { + return errors.Wrap(err, "beacon nodes require ETH1 and boot node to run") + } + if err := proxies.Start(ctx); err != nil { + return errors.Wrap(err, "failed to start proxies") + } + return nil + }) + c.eth1Proxy = proxies + // Beacon nodes. beaconNodes := components.NewBeaconNodes(config) g.Go(func() error { - if err := helpers.ComponentsStarted(ctx, []e2etypes.ComponentRunner{eth1Nodes, bootNode}); err != nil { + if err := helpers.ComponentsStarted(ctx, []e2etypes.ComponentRunner{eth1Nodes, proxies, bootNode}); err != nil { return errors.Wrap(err, "beacon nodes require ETH1 and boot node to run") } beaconNodes.SetENR(bootNode.ENR()) @@ -149,7 +163,7 @@ func (c *componentHandler) setup() { if multiClientActive { lighthouseNodes = components.NewLighthouseBeaconNodes(config) g.Go(func() error { - if err := helpers.ComponentsStarted(ctx, []e2etypes.ComponentRunner{eth1Nodes, bootNode, beaconNodes}); err != nil { + if err := helpers.ComponentsStarted(ctx, []e2etypes.ComponentRunner{eth1Nodes, proxies, bootNode, beaconNodes}); err != nil { return errors.Wrap(err, "lighthouse beacon nodes require ETH1 and boot node to run") } lighthouseNodes.SetENR(bootNode.ENR()) @@ -197,7 +211,7 @@ func (c *componentHandler) setup() { func (c *componentHandler) required() []e2etypes.ComponentRunner { multiClientActive := e2e.TestParams.LighthouseBeaconNodeCount > 0 requiredComponents := []e2etypes.ComponentRunner{ - c.tracingSink, c.eth1Nodes, c.bootnode, c.beaconNodes, c.validatorNodes, + c.tracingSink, c.eth1Nodes, c.bootnode, c.beaconNodes, c.validatorNodes, c.eth1Proxy, } if multiClientActive { requiredComponents = append(requiredComponents, []e2etypes.ComponentRunner{c.keygen, c.lighthouseBeaconNodes, c.lighthouseValidatorNodes}...) diff --git a/testing/endtoend/components/beacon_node.go b/testing/endtoend/components/beacon_node.go index e38ae94dc085..ca7333325322 100644 --- a/testing/endtoend/components/beacon_node.go +++ b/testing/endtoend/components/beacon_node.go @@ -188,7 +188,7 @@ func (node *BeaconNode) Start(ctx context.Context) error { fmt.Sprintf("--%s=%s", cmdshared.LogFileName.Name, stdOutFile.Name()), fmt.Sprintf("--%s=%s", flags.DepositContractFlag.Name, e2e.TestParams.ContractAddress.Hex()), fmt.Sprintf("--%s=%d", flags.RPCPort.Name, e2e.TestParams.Ports.PrysmBeaconNodeRPCPort+index), - fmt.Sprintf("--%s=http://127.0.0.1:%d", flags.HTTPWeb3ProviderFlag.Name, e2e.TestParams.Ports.Eth1AuthRPCPort+index), + fmt.Sprintf("--%s=http://127.0.0.1:%d", flags.HTTPWeb3ProviderFlag.Name, e2e.TestParams.Ports.Eth1ProxyPort+index), fmt.Sprintf("--%s=%s", flags.ExecutionJWTSecretFlag.Name, jwtPath), fmt.Sprintf("--%s=%d", flags.MinSyncPeers.Name, 1), fmt.Sprintf("--%s=%d", cmdshared.P2PUDPPort.Name, e2e.TestParams.Ports.PrysmBeaconNodeUDPPort+index), diff --git a/testing/endtoend/components/eth1/BUILD.bazel b/testing/endtoend/components/eth1/BUILD.bazel index a277808bb865..aa4a1c4750fc 100644 --- a/testing/endtoend/components/eth1/BUILD.bazel +++ b/testing/endtoend/components/eth1/BUILD.bazel @@ -8,6 +8,7 @@ go_library( "miner.go", "node.go", "node_set.go", + "proxy.go", "transactions.go", ], importpath = "github.com/prysmaticlabs/prysm/testing/endtoend/components/eth1", @@ -20,6 +21,7 @@ go_library( "//testing/endtoend/helpers:go_default_library", "//testing/endtoend/params:go_default_library", "//testing/endtoend/types:go_default_library", + "//testing/middleware/engine-api-proxy:go_default_library", "@com_github_ethereum_go_ethereum//accounts/abi/bind:go_default_library", "@com_github_ethereum_go_ethereum//accounts/keystore:go_default_library", "@com_github_ethereum_go_ethereum//common:go_default_library", diff --git a/testing/endtoend/components/eth1/helpers.go b/testing/endtoend/components/eth1/helpers.go index 761cd6abf183..e6dce3b3edc8 100644 --- a/testing/endtoend/components/eth1/helpers.go +++ b/testing/endtoend/components/eth1/helpers.go @@ -25,8 +25,10 @@ const timeGapPerMiningTX = 250 * time.Millisecond var _ e2etypes.ComponentRunner = (*NodeSet)(nil) var _ e2etypes.MultipleComponentRunners = (*NodeSet)(nil) +var _ e2etypes.MultipleComponentRunners = (*ProxySet)(nil) var _ e2etypes.ComponentRunner = (*Miner)(nil) var _ e2etypes.ComponentRunner = (*Node)(nil) +var _ e2etypes.ComponentRunner = (*Proxy)(nil) // WaitForBlocks waits for a certain amount of blocks to be mined by the ETH1 chain before returning. func WaitForBlocks(web3 *ethclient.Client, keystore *keystore.Key, blocksToWait uint64) error { diff --git a/testing/endtoend/components/eth1/proxy.go b/testing/endtoend/components/eth1/proxy.go new file mode 100644 index 000000000000..54685e2538e0 --- /dev/null +++ b/testing/endtoend/components/eth1/proxy.go @@ -0,0 +1,205 @@ +package eth1 + +import ( + "context" + "encoding/hex" + "fmt" + "os" + "path" + "strconv" + "strings" + + "github.com/pkg/errors" + "github.com/prysmaticlabs/prysm/io/file" + "github.com/prysmaticlabs/prysm/testing/endtoend/helpers" + e2e "github.com/prysmaticlabs/prysm/testing/endtoend/params" + e2etypes "github.com/prysmaticlabs/prysm/testing/endtoend/types" + proxy "github.com/prysmaticlabs/prysm/testing/middleware/engine-api-proxy" + log "github.com/sirupsen/logrus" +) + +// ProxySet represents a set of proxies for the engine-api. +type ProxySet struct { + e2etypes.ComponentRunner + started chan struct{} + proxies []e2etypes.ComponentRunner +} + +// NewProxySet creates and returns a set of engine-api proxies. +func NewProxySet() *ProxySet { + return &ProxySet{ + started: make(chan struct{}, 1), + } +} + +// Start starts all the proxies in set. +func (s *ProxySet) Start(ctx context.Context) error { + + totalNodeCount := e2e.TestParams.BeaconNodeCount + e2e.TestParams.LighthouseBeaconNodeCount + nodes := make([]e2etypes.ComponentRunner, totalNodeCount) + for i := 0; i < totalNodeCount; i++ { + // We start indexing nodes from 1 because the miner has an implicit 0 index. + nodes[i] = NewProxy(i) + } + s.proxies = nodes + + // Wait for all nodes to finish their job (blocking). + // Once nodes are ready passed in handler function will be called. + return helpers.WaitOnNodes(ctx, nodes, func() { + // All nodes started, close channel, so that all services waiting on a set, can proceed. + close(s.started) + }) +} + +// Started checks whether proxy set is started and all proxies are ready to be queried. +func (s *ProxySet) Started() <-chan struct{} { + return s.started +} + +// Pause pauses the component and its underlying process. +func (s *ProxySet) Pause() error { + for _, n := range s.proxies { + if err := n.Pause(); err != nil { + return err + } + } + return nil +} + +// Resume resumes the component and its underlying process. +func (s *ProxySet) Resume() error { + for _, n := range s.proxies { + if err := n.Resume(); err != nil { + return err + } + } + return nil +} + +// Stop stops the component and its underlying process. +func (s *ProxySet) Stop() error { + for _, n := range s.proxies { + if err := n.Stop(); err != nil { + return err + } + } + return nil +} + +// PauseAtIndex pauses the component and its underlying process at the desired index. +func (s *ProxySet) PauseAtIndex(i int) error { + if i >= len(s.proxies) { + return errors.Errorf("provided index exceeds slice size: %d >= %d", i, len(s.proxies)) + } + return s.proxies[i].Pause() +} + +// ResumeAtIndex resumes the component and its underlying process at the desired index. +func (s *ProxySet) ResumeAtIndex(i int) error { + if i >= len(s.proxies) { + return errors.Errorf("provided index exceeds slice size: %d >= %d", i, len(s.proxies)) + } + return s.proxies[i].Resume() +} + +// StopAtIndex stops the component and its underlying process at the desired index. +func (s *ProxySet) StopAtIndex(i int) error { + if i >= len(s.proxies) { + return errors.Errorf("provided index exceeds slice size: %d >= %d", i, len(s.proxies)) + } + return s.proxies[i].Stop() +} + +// Proxy represents an engine-api proxy. +type Proxy struct { + e2etypes.ComponentRunner + started chan struct{} + index int + cancel func() +} + +// NewProxy creates and returns an engine-api proxy. +func NewProxy(index int) *Proxy { + return &Proxy{ + started: make(chan struct{}, 1), + index: index, + } +} + +// Start runs a proxy. +func (node *Proxy) Start(ctx context.Context) error { + file, err := os.Create(path.Join(e2e.TestParams.LogPath, "eth1_proxy_"+strconv.Itoa(node.index)+".log")) + if err != nil { + return err + } + jwtPath := path.Join(e2e.TestParams.TestPath, "eth1data/"+strconv.Itoa(node.index)+"/") + if node.index == 0 { + jwtPath = path.Join(e2e.TestParams.TestPath, "eth1data/miner/") + } + jwtPath = path.Join(jwtPath, "geth/jwtsecret") + secret, err := parseJWTSecretFromFile(jwtPath) + if err != nil { + return err + } + opts := []proxy.Option{ + proxy.WithDestinationAddress(fmt.Sprintf("http://127.0.0.1:%d", e2e.TestParams.Ports.Eth1AuthRPCPort+node.index)), + proxy.WithPort(e2e.TestParams.Ports.Eth1ProxyPort + node.index), + proxy.WithLogger(log.New()), + proxy.WithLogFile(file), + proxy.WithJwtSecret(string(secret)), + } + nProxy, err := proxy.New(opts...) + if err != nil { + return err + } + log.Infof("Starting eth1 proxy %d with port: %d and file %s", node.index, e2e.TestParams.Ports.Eth1ProxyPort+node.index, file.Name()) + + // Set cancel into context. + ctx, cancel := context.WithCancel(ctx) + node.cancel = cancel + // Mark node as ready. + close(node.started) + return nProxy.Start(ctx) +} + +// Started checks whether the eth1 proxy is started and ready to be queried. +func (node *Proxy) Started() <-chan struct{} { + return node.started +} + +// Pause pauses the component and its underlying process. +func (node *Proxy) Pause() error { + // no-op + return nil +} + +// Resume resumes the component and its underlying process. +func (node *Proxy) Resume() error { + // no-op + return nil +} + +// Stop kills the component and its underlying process. +func (node *Proxy) Stop() error { + node.cancel() + return nil +} + +func parseJWTSecretFromFile(jwtSecretFile string) ([]byte, error) { + enc, err := file.ReadFileAsBytes(jwtSecretFile) + if err != nil { + return nil, err + } + strData := strings.TrimSpace(string(enc)) + if len(strData) == 0 { + return nil, fmt.Errorf("provided JWT secret in file %s cannot be empty", jwtSecretFile) + } + secret, err := hex.DecodeString(strings.TrimPrefix(strData, "0x")) + if err != nil { + return nil, err + } + if len(secret) < 32 { + return nil, errors.New("provided JWT secret should be a hex string of at least 32 bytes") + } + return secret, nil +} diff --git a/testing/endtoend/components/lighthouse_beacon.go b/testing/endtoend/components/lighthouse_beacon.go index d172ebbbcfd6..51b42e46dd25 100644 --- a/testing/endtoend/components/lighthouse_beacon.go +++ b/testing/endtoend/components/lighthouse_beacon.go @@ -176,7 +176,7 @@ func (node *LighthouseBeaconNode) Start(ctx context.Context) error { fmt.Sprintf("--http-port=%d", e2e.TestParams.Ports.LighthouseBeaconNodeHTTPPort+index), fmt.Sprintf("--target-peers=%d", 10), fmt.Sprintf("--eth1-endpoints=http://127.0.0.1:%d", e2e.TestParams.Ports.Eth1RPCPort+prysmNodeCount+index), - fmt.Sprintf("--execution-endpoints=http://127.0.0.1:%d", e2e.TestParams.Ports.Eth1AuthRPCPort+prysmNodeCount+index), + fmt.Sprintf("--execution-endpoints=http://127.0.0.1:%d", e2e.TestParams.Ports.Eth1ProxyPort+prysmNodeCount+index), fmt.Sprintf("--jwt-secrets=%s", jwtPath), fmt.Sprintf("--boot-nodes=%s", node.enr), fmt.Sprintf("--metrics-port=%d", e2e.TestParams.Ports.LighthouseBeaconNodeMetricsPort+index), diff --git a/testing/endtoend/endtoend_test.go b/testing/endtoend/endtoend_test.go index 527fe95be35d..2c49ed04f90e 100644 --- a/testing/endtoend/endtoend_test.go +++ b/testing/endtoend/endtoend_test.go @@ -263,6 +263,13 @@ func (r *testRunner) testCheckpointSync(ctx context.Context, g *errgroup.Group, if err := helpers.ComponentsStarted(ctx, []e2etypes.ComponentRunner{ethNode}); err != nil { return fmt.Errorf("sync beacon node not ready: %w", err) } + proxyNode := eth1.NewProxy(i) + g.Go(func() error { + return proxyNode.Start(ctx) + }) + if err := helpers.ComponentsStarted(ctx, []e2etypes.ComponentRunner{proxyNode}); err != nil { + return fmt.Errorf("sync beacon node not ready: %w", err) + } client, err := beacon.NewClient(bnAPI) if err != nil { @@ -323,6 +330,13 @@ func (r *testRunner) testBeaconChainSync(ctx context.Context, g *errgroup.Group, if err := helpers.ComponentsStarted(ctx, []e2etypes.ComponentRunner{ethNode}); err != nil { return fmt.Errorf("sync beacon node not ready: %w", err) } + proxyNode := eth1.NewProxy(index) + g.Go(func() error { + return proxyNode.Start(ctx) + }) + if err := helpers.ComponentsStarted(ctx, []e2etypes.ComponentRunner{proxyNode}); err != nil { + return fmt.Errorf("sync beacon node not ready: %w", err) + } syncBeaconNode := components.NewBeaconNode(config, index, bootnodeEnr) g.Go(func() error { return syncBeaconNode.Start(ctx) diff --git a/testing/endtoend/params/params.go b/testing/endtoend/params/params.go index 36fdf36370ba..0d9bc2e74875 100644 --- a/testing/endtoend/params/params.go +++ b/testing/endtoend/params/params.go @@ -34,6 +34,7 @@ type ports struct { Eth1RPCPort int Eth1AuthRPCPort int Eth1WSPort int + Eth1ProxyPort int PrysmBeaconNodeRPCPort int PrysmBeaconNodeUDPPort int PrysmBeaconNodeTCPPort int @@ -89,6 +90,7 @@ const ( Eth1RPCPort = Eth1Port + portSpan Eth1WSPort = Eth1Port + 2*portSpan Eth1AuthRPCPort = Eth1Port + 3*portSpan + Eth1ProxyPort = Eth1Port + 4*portSpan PrysmBeaconNodeRPCPort = 4150 PrysmBeaconNodeUDPPort = PrysmBeaconNodeRPCPort + portSpan @@ -244,6 +246,10 @@ func initializeStandardPorts(shardCount, shardIndex int, ports *ports, existingR if err != nil { return err } + eth1ProxyPort, err := port(Eth1ProxyPort, shardCount, shardIndex, existingRegistrations) + if err != nil { + return err + } beaconNodeRPCPort, err := port(PrysmBeaconNodeRPCPort, shardCount, shardIndex, existingRegistrations) if err != nil { return err @@ -286,6 +292,7 @@ func initializeStandardPorts(shardCount, shardIndex int, ports *ports, existingR ports.Eth1RPCPort = eth1RPCPort ports.Eth1AuthRPCPort = eth1AuthPort ports.Eth1WSPort = eth1WSPort + ports.Eth1ProxyPort = eth1ProxyPort ports.PrysmBeaconNodeRPCPort = beaconNodeRPCPort ports.PrysmBeaconNodeUDPPort = beaconNodeUDPPort ports.PrysmBeaconNodeTCPPort = beaconNodeTCPPort diff --git a/testing/endtoend/params/params_test.go b/testing/endtoend/params/params_test.go index 9e16748c417e..afb26b107f8f 100644 --- a/testing/endtoend/params/params_test.go +++ b/testing/endtoend/params/params_test.go @@ -30,7 +30,7 @@ func TestStandardPorts(t *testing.T) { var existingRegistrations []int testPorts := &ports{} assert.NoError(t, initializeStandardPorts(2, 0, testPorts, &existingRegistrations)) - assert.Equal(t, 15, len(existingRegistrations)) + assert.Equal(t, 16, len(existingRegistrations)) assert.NotEqual(t, 0, testPorts.PrysmBeaconNodeGatewayPort) assert.NotEqual(t, 0, testPorts.PrysmBeaconNodeTCPPort) assert.NotEqual(t, 0, testPorts.JaegerTracingPort) diff --git a/testing/middleware/engine-api-proxy/BUILD.bazel b/testing/middleware/engine-api-proxy/BUILD.bazel index 4021d20e10e8..b719433dfa12 100644 --- a/testing/middleware/engine-api-proxy/BUILD.bazel +++ b/testing/middleware/engine-api-proxy/BUILD.bazel @@ -9,6 +9,7 @@ go_library( importpath = "github.com/prysmaticlabs/prysm/testing/middleware/engine-api-proxy", visibility = ["//visibility:public"], deps = [ + "//network:go_default_library", "@com_github_pkg_errors//:go_default_library", "@com_github_sirupsen_logrus//:go_default_library", ], diff --git a/testing/middleware/engine-api-proxy/options.go b/testing/middleware/engine-api-proxy/options.go index 29c901e85191..6ffa97cecc61 100644 --- a/testing/middleware/engine-api-proxy/options.go +++ b/testing/middleware/engine-api-proxy/options.go @@ -2,6 +2,7 @@ package proxy import ( "net/url" + "os" "github.com/pkg/errors" "github.com/sirupsen/logrus" @@ -12,6 +13,7 @@ type config struct { proxyHost string destinationUrl *url.URL logger *logrus.Logger + secret string } type Option func(p *Proxy) error @@ -54,3 +56,24 @@ func WithLogger(l *logrus.Logger) Option { return nil } } + +// WithLogFile specifies a log file to write +// the proxies output to. +func WithLogFile(f *os.File) Option { + return func(p *Proxy) error { + if p.cfg.logger == nil { + return errors.New("nil logger provided") + } + p.cfg.logger.SetOutput(f) + return nil + } +} + +// WithJwtSecret adds in support for jwt authenticated +// connections for our proxy. +func WithJwtSecret(secret string) Option { + return func(p *Proxy) error { + p.cfg.secret = secret + return nil + } +} diff --git a/testing/middleware/engine-api-proxy/proxy.go b/testing/middleware/engine-api-proxy/proxy.go index fb98f45948ee..47153715464d 100644 --- a/testing/middleware/engine-api-proxy/proxy.go +++ b/testing/middleware/engine-api-proxy/proxy.go @@ -16,6 +16,7 @@ import ( "sync" "github.com/pkg/errors" + "github.com/prysmaticlabs/prysm/network" "github.com/sirupsen/logrus" ) @@ -128,6 +129,7 @@ func (p *Proxy) ServeHTTP(w http.ResponseWriter, r *http.Request) { func (p *Proxy) AddRequestInterceptor(rpcMethodName string, response interface{}, trigger func() bool) { p.lock.Lock() defer p.lock.Unlock() + p.cfg.logger.Infof("Adding in interceptor for method %s", rpcMethodName) p.interceptors[rpcMethodName] = &interceptorConfig{ response, trigger, @@ -168,6 +170,13 @@ func (p *Proxy) interceptIfNeeded(requestBytes []byte, w http.ResponseWriter) (h // Create a new proxy request to the execution client. func (p *Proxy) proxyRequest(requestBytes []byte, w http.ResponseWriter, r *http.Request) { + jreq, err := unmarshalRPCObject(requestBytes) + if err != nil { + p.cfg.logger.WithError(err).Error("Could not unmarshal request") + // Continue and mark it as unknown. + jreq = &jsonRPCObject{Method: "unknown"} + } + p.cfg.logger.Infof("Forwarding %s request for method %s to %s", r.Method, jreq.Method, p.cfg.destinationUrl.String()) proxyReq, err := http.NewRequest(r.Method, p.cfg.destinationUrl.String(), r.Body) if err != nil { p.cfg.logger.WithError(err).Error("Could create new request") @@ -183,11 +192,16 @@ func (p *Proxy) proxyRequest(requestBytes []byte, w http.ResponseWriter, r *http proxyReq.Header.Set("Content-Type", "application/json") client := &http.Client{} + if p.cfg.secret != "" { + client = network.NewHttpClientWithSecret(p.cfg.secret) + } proxyRes, err := client.Do(proxyReq) if err != nil { p.cfg.logger.WithError(err).Error("Could not forward request to destination server") return } + p.cfg.logger.Infof("Received response for %s request with method %s from %s", r.Method, jreq.Method, p.cfg.destinationUrl.String()) + defer func() { if err = proxyRes.Body.Close(); err != nil { p.cfg.logger.WithError(err).Error("Could not do close proxy response body")