Skip to content

Commit

Permalink
Merge branch 'develop' into proposer-sync-aggregate
Browse files Browse the repository at this point in the history
  • Loading branch information
terencechain authored Jun 28, 2021
2 parents b209718 + 0f2e6fb commit 2c35cc8
Show file tree
Hide file tree
Showing 24 changed files with 255 additions and 1,295 deletions.
4 changes: 2 additions & 2 deletions beacon-chain/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -645,12 +645,12 @@ func (b *BeaconNode) registerGRPCGateway() error {
return nil
}
gatewayPort := b.cliCtx.Int(flags.GRPCGatewayPort.Name)
apiMiddlewarePort := b.cliCtx.Int(flags.ApiMiddlewarePort.Name)
ethApiPort := b.cliCtx.Int(flags.EthApiPort.Name)
gatewayHost := b.cliCtx.String(flags.GRPCGatewayHost.Name)
rpcHost := b.cliCtx.String(flags.RPCHost.Name)
selfAddress := fmt.Sprintf("%s:%d", rpcHost, b.cliCtx.Int(flags.RPCPort.Name))
gatewayAddress := fmt.Sprintf("%s:%d", gatewayHost, gatewayPort)
apiMiddlewareAddress := fmt.Sprintf("%s:%d", gatewayHost, apiMiddlewarePort)
apiMiddlewareAddress := fmt.Sprintf("%s:%d", gatewayHost, ethApiPort)
allowedOrigins := strings.Split(b.cliCtx.String(flags.GPRCGatewayCorsDomain.Name), ",")
enableDebugRPCEndpoints := b.cliCtx.Bool(flags.EnableDebugRPCEndpoints.Name)
selfCert := b.cliCtx.String(flags.CertFlag.Name)
Expand Down
28 changes: 12 additions & 16 deletions beacon-chain/rpc/apimiddleware/custom_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"strconv"
"strings"

"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/beacon-chain/rpc/eventsv1"
"github.com/prysmaticlabs/prysm/shared/gateway"
"github.com/prysmaticlabs/prysm/shared/grpcutils"
Expand Down Expand Up @@ -121,8 +120,7 @@ func serializeMiddlewareResponseIntoSSZ(data string) (sszResponse []byte, errJso
// Serialize the SSZ part of the deserialized value.
b, err := base64.StdEncoding.DecodeString(data)
if err != nil {
e := errors.Wrapf(err, "could not decode response body into base64")
return nil, &gateway.DefaultErrorJson{Message: e.Error(), Code: http.StatusInternalServerError}
return nil, gateway.InternalServerErrorWithMessage(err, "could not decode response body into base64")
}
return b, nil
}
Expand All @@ -144,8 +142,7 @@ func writeSSZResponseHeaderAndBody(grpcResp *http.Response, w http.ResponseWrite
if statusCodeHeader != "" {
code, err := strconv.Atoi(statusCodeHeader)
if err != nil {
e := errors.Wrapf(err, "could not parse status code")
return &gateway.DefaultErrorJson{Message: e.Error(), Code: http.StatusInternalServerError}
return gateway.InternalServerErrorWithMessage(err, "could not parse status code")
}
w.WriteHeader(code)
} else {
Expand All @@ -156,8 +153,7 @@ func writeSSZResponseHeaderAndBody(grpcResp *http.Response, w http.ResponseWrite
w.Header().Set("Content-Disposition", "attachment; filename="+fileName)
w.WriteHeader(grpcResp.StatusCode)
if _, err := io.Copy(w, ioutil.NopCloser(bytes.NewReader(responseSsz))); err != nil {
e := errors.Wrapf(err, "could not write response message")
return &gateway.DefaultErrorJson{Message: e.Error(), Code: http.StatusInternalServerError}
return gateway.InternalServerErrorWithMessage(err, "could not write response message")
}
return nil
}
Expand All @@ -170,7 +166,7 @@ func handleEvents(m *gateway.ApiProxyMiddleware, _ gateway.Endpoint, w http.Resp
// Because of this subscribing to streams doesn't work as intended, resulting in each event being handled by all subscriptions.
// To handle events properly, we subscribe just once using a placeholder value ('events') and handle all topics inside this subscription.
if err := sseClient.SubscribeChan("events", eventChan); err != nil {
gateway.WriteError(w, &gateway.DefaultErrorJson{Message: err.Error(), Code: http.StatusInternalServerError}, nil)
gateway.WriteError(w, gateway.InternalServerError(err), nil)
sseClient.Unsubscribe(eventChan)
return
}
Expand Down Expand Up @@ -203,11 +199,11 @@ func receiveEvents(eventChan <-chan *sse.Event, w http.ResponseWriter, req *http
// and assign the attestation back to event data for further processing.
eventData := &aggregatedAttReceivedDataJson{}
if err := json.Unmarshal(msg.Data, eventData); err != nil {
return &gateway.DefaultErrorJson{Message: err.Error(), Code: http.StatusInternalServerError}
return gateway.InternalServerError(err)
}
attData, err := json.Marshal(eventData.Aggregate)
if err != nil {
return &gateway.DefaultErrorJson{Message: err.Error(), Code: http.StatusInternalServerError}
return gateway.InternalServerError(err)
}
msg.Data = attData
case eventsv1.VoluntaryExitTopic:
Expand Down Expand Up @@ -239,7 +235,7 @@ func receiveEvents(eventChan <-chan *sse.Event, w http.ResponseWriter, req *http

func writeEvent(msg *sse.Event, w http.ResponseWriter, data interface{}) gateway.ErrorJson {
if err := json.Unmarshal(msg.Data, data); err != nil {
return &gateway.DefaultErrorJson{Message: err.Error(), Code: http.StatusInternalServerError}
return gateway.InternalServerError(err)
}
if errJson := gateway.ProcessMiddlewareResponseFields(data); errJson != nil {
return errJson
Expand All @@ -252,19 +248,19 @@ func writeEvent(msg *sse.Event, w http.ResponseWriter, data interface{}) gateway
w.Header().Set("Content-Type", "text/event-stream")

if _, err := w.Write([]byte("event: ")); err != nil {
return &gateway.DefaultErrorJson{Message: err.Error(), Code: http.StatusInternalServerError}
return gateway.InternalServerError(err)
}
if _, err := w.Write(msg.Event); err != nil {
return &gateway.DefaultErrorJson{Message: err.Error(), Code: http.StatusInternalServerError}
return gateway.InternalServerError(err)
}
if _, err := w.Write([]byte("\ndata: ")); err != nil {
return &gateway.DefaultErrorJson{Message: err.Error(), Code: http.StatusInternalServerError}
return gateway.InternalServerError(err)
}
if _, err := w.Write(dataJson); err != nil {
return &gateway.DefaultErrorJson{Message: err.Error(), Code: http.StatusInternalServerError}
return gateway.InternalServerError(err)
}
if _, err := w.Write([]byte("\n\n")); err != nil {
return &gateway.DefaultErrorJson{Message: err.Error(), Code: http.StatusInternalServerError}
return gateway.InternalServerError(err)
}

return nil
Expand Down
7 changes: 2 additions & 5 deletions beacon-chain/rpc/apimiddleware/custom_hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"net/http"

"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/prysmaticlabs/prysm/shared/gateway"
)
Expand All @@ -18,14 +17,12 @@ func wrapAttestationsArray(endpoint gateway.Endpoint, _ http.ResponseWriter, req
if _, ok := endpoint.PostRequest.(*submitAttestationRequestJson); ok {
atts := make([]*attestationJson, 0)
if err := json.NewDecoder(req.Body).Decode(&atts); err != nil {
e := errors.Wrapf(err, "could not decode attestations array")
return &gateway.DefaultErrorJson{Message: e.Error(), Code: http.StatusInternalServerError}
return gateway.InternalServerErrorWithMessage(err, "could not decode attestations array")
}
j := &submitAttestationRequestJson{Data: atts}
b, err := json.Marshal(j)
if err != nil {
e := errors.Wrapf(err, "could not marshal wrapped attestations array")
return &gateway.DefaultErrorJson{Message: e.Error(), Code: http.StatusInternalServerError}
return gateway.InternalServerErrorWithMessage(err, "could not marshal wrapped attestations array")
}
req.Body = ioutil.NopCloser(bytes.NewReader(b))
}
Expand Down
4 changes: 2 additions & 2 deletions beacon-chain/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
var (
beaconRPC = flag.String("beacon-rpc", "localhost:4000", "Beacon chain gRPC endpoint")
port = flag.Int("port", 8000, "Port to serve on")
apiMiddlewarePort = flag.Int("port", 8001, "Port to serve API middleware on")
EthApiPort = flag.Int("port", 8001, "Port to serve Ethereum API on")
host = flag.String("host", "127.0.0.1", "Host to serve on")
debug = flag.Bool("debug", false, "Enable debug logging")
allowedOrigins = flag.String("corsdomain", "localhost:4242", "A comma separated list of CORS domains to allow")
Expand Down Expand Up @@ -48,7 +48,7 @@ func main() {
fmt.Sprintf("%s:%d", *host, *port),
).WithAllowedOrigins(strings.Split(*allowedOrigins, ",")).
WithMaxCallRecvMsgSize(uint64(*grpcMaxMsgSize)).
WithApiMiddleware(fmt.Sprintf("%s:%d", *host, *apiMiddlewarePort), &apimiddleware.BeaconEndpointFactory{})
WithApiMiddleware(fmt.Sprintf("%s:%d", *host, *EthApiPort), &apimiddleware.BeaconEndpointFactory{})

mux := http.NewServeMux()
mux.HandleFunc("/swagger/", gateway.SwaggerServer())
Expand Down
10 changes: 5 additions & 5 deletions cmd/beacon-chain/flags/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,11 @@ var (
Usage: "The port on which the gateway server runs on",
Value: 3500,
}
// ApiMiddlewarePort specifies the port for an HTTP proxy server which acts as a middleware between Ethereum consensus API clients and Prysm's gRPC gateway.
// The middleware serves JSON values conforming to the specification: https://ethereum.github.io/eth2.0-APIs/
ApiMiddlewarePort = &cli.IntFlag{
Name: "api-middleware-port",
Usage: "The port on which the API middleware runs on",
// EthApiPort specifies the port which runs the official Ethereum REST API.
// Serves JSON values conforming to the specification: https://ethereum.github.io/eth2.0-APIs/
EthApiPort = &cli.IntFlag{
Name: "eth-api-port",
Usage: "The port which exposes a REST API conforming to the official Ethereum API specification.",
Value: 3501,
}
// GPRCGatewayCorsDomain serves preflight requests when serving gRPC JSON gateway.
Expand Down
2 changes: 1 addition & 1 deletion cmd/beacon-chain/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ var appFlags = []cli.Flag{
flags.DisableGRPCGateway,
flags.GRPCGatewayHost,
flags.GRPCGatewayPort,
flags.ApiMiddlewarePort,
flags.EthApiPort,
flags.GPRCGatewayCorsDomain,
flags.MinSyncPeers,
flags.ContractDeploymentBlock,
Expand Down
2 changes: 1 addition & 1 deletion cmd/beacon-chain/usage.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ var appHelpFlagGroups = []flagGroup{
flags.DisableGRPCGateway,
flags.GRPCGatewayHost,
flags.GRPCGatewayPort,
flags.ApiMiddlewarePort,
flags.EthApiPort,
flags.GPRCGatewayCorsDomain,
flags.HTTPWeb3ProviderFlag,
flags.FallbackWeb3ProviderFlag,
Expand Down
2 changes: 1 addition & 1 deletion endtoend/components/beacon_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func (node *BeaconNode) Start(ctx context.Context) error {
fmt.Sprintf("--%s=%d", cmdshared.P2PTCPPort.Name, e2e.TestParams.BeaconNodeRPCPort+index+20),
fmt.Sprintf("--%s=%d", flags.MonitoringPortFlag.Name, e2e.TestParams.BeaconNodeMetricsPort+index),
fmt.Sprintf("--%s=%d", flags.GRPCGatewayPort.Name, e2e.TestParams.BeaconNodeRPCPort+index+40),
fmt.Sprintf("--%s=%d", flags.ApiMiddlewarePort.Name, e2e.TestParams.BeaconNodeRPCPort+index+30),
fmt.Sprintf("--%s=%d", flags.EthApiPort.Name, e2e.TestParams.BeaconNodeRPCPort+index+30),
fmt.Sprintf("--%s=%d", flags.ContractDeploymentBlock.Name, 0),
fmt.Sprintf("--%s=%d", cmdshared.RPCMaxPageSizeFlag.Name, params.BeaconConfig().MinGenesisActiveValidatorCount),
fmt.Sprintf("--%s=%s", cmdshared.BootstrapNode.Name, enr),
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ require (
github.com/deckarep/golang-set v1.7.1 // indirect
github.com/dgraph-io/ristretto v0.0.4-0.20210318174700-74754f61e018
github.com/dlclark/regexp2 v1.4.0 // indirect
github.com/dustin/go-humanize v1.0.0
github.com/emicklei/dot v0.11.0
github.com/ethereum/go-ethereum v1.9.25
github.com/fatih/color v1.9.0 // indirect
Expand Down
Loading

0 comments on commit 2c35cc8

Please sign in to comment.