Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Run text/event-stream values returned from grpc-gateway through API Middleware #9080

Merged
merged 15 commits into from
Jun 24, 2021
2 changes: 1 addition & 1 deletion beacon-chain/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -665,13 +665,13 @@ func (b *BeaconNode) registerGRPCGateway() error {
ethpb.RegisterNodeHandler,
ethpb.RegisterBeaconChainHandler,
ethpb.RegisterBeaconNodeValidatorHandler,
ethpbv1.RegisterEventsHandler,
pbrpc.RegisterHealthHandler,
}
v1Registrations := []gateway.PbHandlerRegistration{
ethpbv1.RegisterBeaconNodeHandler,
ethpbv1.RegisterBeaconChainHandler,
ethpbv1.RegisterBeaconValidatorHandler,
ethpbv1.RegisterEventsHandler,
}
if enableDebugRPCEndpoints {
v1Alpha1Registrations = append(v1Alpha1Registrations, pbrpc.RegisterDebugHandler)
Expand Down
4 changes: 4 additions & 0 deletions beacon-chain/rpc/apimiddleware/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@ go_library(
importpath = "github.com/prysmaticlabs/prysm/beacon-chain/rpc/apimiddleware",
visibility = ["//beacon-chain:__subpackages__"],
deps = [
"//beacon-chain/rpc/eventsv1:go_default_library",
"//shared/bytesutil:go_default_library",
"//shared/gateway:go_default_library",
"//shared/grpcutils:go_default_library",
"@com_github_ethereum_go_ethereum//common/hexutil:go_default_library",
"@com_github_pkg_errors//:go_default_library",
"@com_github_r3labs_sse//:go_default_library",
],
)

Expand All @@ -28,10 +30,12 @@ go_test(
],
embed = [":go_default_library"],
deps = [
"//beacon-chain/rpc/eventsv1:go_default_library",
"//shared/bytesutil:go_default_library",
"//shared/gateway:go_default_library",
"//shared/grpcutils:go_default_library",
"//shared/testutil/assert:go_default_library",
"//shared/testutil/require:go_default_library",
"@com_github_r3labs_sse//:go_default_library",
],
)
121 changes: 121 additions & 0 deletions beacon-chain/rpc/apimiddleware/custom_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,19 @@ package apimiddleware
import (
"bytes"
"encoding/base64"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"strconv"
"strings"

"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/beacon-chain/rpc/eventsv1"
"github.com/prysmaticlabs/prysm/shared/gateway"
"github.com/prysmaticlabs/prysm/shared/grpcutils"
"github.com/r3labs/sse"
)

type sszConfig struct {
Expand Down Expand Up @@ -157,3 +161,120 @@ func writeSSZResponseHeaderAndBody(grpcResp *http.Response, w http.ResponseWrite
}
return nil
}

func handleEvents(m *gateway.ApiProxyMiddleware, _ gateway.Endpoint, w http.ResponseWriter, req *http.Request) (handled bool) {
sseClient := sse.NewClient("http://" + m.GatewayAddress + req.URL.RequestURI())
eventChan := make(chan *sse.Event)

// We use grpc-gateway as the server side of events, not the sse library.
// Because of this subscribing to streams doesn't work as intended, resulting in each event being handled by all subscriptions.
// To handle events properly, we subscribe just once using a placeholder value ('events') and handle all topics inside this subscription.
if err := sseClient.SubscribeChan("events", eventChan); err != nil {
gateway.WriteError(w, &gateway.DefaultErrorJson{Message: err.Error(), Code: http.StatusInternalServerError}, nil)
sseClient.Unsubscribe(eventChan)
return
}

errJson := receiveEvents(eventChan, w, req)
if errJson != nil {
gateway.WriteError(w, errJson, nil)
}

sseClient.Unsubscribe(eventChan)
return true
}

func receiveEvents(eventChan <-chan *sse.Event, w http.ResponseWriter, req *http.Request) gateway.ErrorJson {
for {
select {
case msg := <-eventChan:
var data interface{}

switch strings.TrimSpace(string(msg.Event)) {
case eventsv1.HeadTopic:
data = &eventHeadJson{}
case eventsv1.BlockTopic:
data = &receivedBlockDataJson{}
case eventsv1.AttestationTopic:
data = &attestationJson{}

// Data received in the event does not fit the expected event stream output.
// We extract the underlying attestation from event data
// and assign the attestation back to event data for further processing.
eventData := &aggregatedAttReceivedDataJson{}
if err := json.Unmarshal(msg.Data, eventData); err != nil {
return &gateway.DefaultErrorJson{Message: err.Error(), Code: http.StatusInternalServerError}
}
attData, err := json.Marshal(eventData.Aggregate)
if err != nil {
return &gateway.DefaultErrorJson{Message: err.Error(), Code: http.StatusInternalServerError}
}
msg.Data = attData
case eventsv1.VoluntaryExitTopic:
data = &signedVoluntaryExitJson{}
case eventsv1.FinalizedCheckpointTopic:
data = &eventFinalizedCheckpointJson{}
case eventsv1.ChainReorgTopic:
data = &eventChainReorgJson{}
case "error":
data = &eventErrorJson{}
default:
return &gateway.DefaultErrorJson{
Message: fmt.Sprintf("Event type '%s' not supported", strings.TrimSpace(string(msg.Event))),
Code: http.StatusInternalServerError,
}
}

if errJson := writeEvent(msg, w, data); errJson != nil {
return errJson
}
if errJson := flushEvent(w); errJson != nil {
return errJson
}
case <-req.Context().Done():
return nil
}
}
}

func writeEvent(msg *sse.Event, w http.ResponseWriter, data interface{}) gateway.ErrorJson {
if err := json.Unmarshal(msg.Data, data); err != nil {
return &gateway.DefaultErrorJson{Message: err.Error(), Code: http.StatusInternalServerError}
}
if errJson := gateway.ProcessMiddlewareResponseFields(data); errJson != nil {
return errJson
}
dataJson, errJson := gateway.SerializeMiddlewareResponseIntoJson(data)
if errJson != nil {
return errJson
}

w.Header().Set("Content-Type", "text/event-stream")

if _, err := w.Write([]byte("event: ")); err != nil {
return &gateway.DefaultErrorJson{Message: err.Error(), Code: http.StatusInternalServerError}
}
if _, err := w.Write(msg.Event); err != nil {
return &gateway.DefaultErrorJson{Message: err.Error(), Code: http.StatusInternalServerError}
}
if _, err := w.Write([]byte("\ndata: ")); err != nil {
return &gateway.DefaultErrorJson{Message: err.Error(), Code: http.StatusInternalServerError}
}
if _, err := w.Write(dataJson); err != nil {
return &gateway.DefaultErrorJson{Message: err.Error(), Code: http.StatusInternalServerError}
}
if _, err := w.Write([]byte("\n\n")); err != nil {
return &gateway.DefaultErrorJson{Message: err.Error(), Code: http.StatusInternalServerError}
}

return nil
}

func flushEvent(w http.ResponseWriter) gateway.ErrorJson {
flusher, ok := w.(http.Flusher)
if !ok {
return &gateway.DefaultErrorJson{Message: fmt.Sprintf("Flush not supported in %T", w), Code: http.StatusInternalServerError}
}
flusher.Flush()
return nil
}
76 changes: 76 additions & 0 deletions beacon-chain/rpc/apimiddleware/custom_handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,20 @@ package apimiddleware

import (
"bytes"
"context"
"encoding/json"
"net/http"
"net/http/httptest"
"strings"
"testing"
"time"

"github.com/prysmaticlabs/prysm/beacon-chain/rpc/eventsv1"
"github.com/prysmaticlabs/prysm/shared/gateway"
"github.com/prysmaticlabs/prysm/shared/grpcutils"
"github.com/prysmaticlabs/prysm/shared/testutil/assert"
"github.com/prysmaticlabs/prysm/shared/testutil/require"
"github.com/r3labs/sse"
)

func TestSSZRequested(t *testing.T) {
Expand Down Expand Up @@ -136,3 +141,74 @@ func TestWriteSSZResponseHeaderAndBody(t *testing.T) {
assert.Equal(t, http.StatusInternalServerError, errJson.StatusCode())
})
}

func TestReceiveEvents(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
ch := make(chan *sse.Event)
w := httptest.NewRecorder()
w.Body = &bytes.Buffer{}
req := httptest.NewRequest("GET", "http://foo.example", &bytes.Buffer{})
req = req.WithContext(ctx)

go func() {
base64Val := "Zm9v"
data := &eventFinalizedCheckpointJson{
Block: base64Val,
State: base64Val,
Epoch: "1",
}
bData, err := json.Marshal(data)
require.NoError(t, err)
msg := &sse.Event{
Data: bData,
Event: []byte(eventsv1.FinalizedCheckpointTopic),
}
ch <- msg
time.Sleep(time.Second)
cancel()
}()

errJson := receiveEvents(ch, w, req)
assert.Equal(t, true, errJson == nil)
}

func TestReceiveEvents_EventNotSupported(t *testing.T) {
ch := make(chan *sse.Event)
w := httptest.NewRecorder()
w.Body = &bytes.Buffer{}
req := httptest.NewRequest("GET", "http://foo.example", &bytes.Buffer{})

go func() {
msg := &sse.Event{
Data: []byte("foo"),
Event: []byte("not_supported"),
}
ch <- msg
}()

errJson := receiveEvents(ch, w, req)
require.NotNil(t, errJson)
assert.Equal(t, "Event type 'not_supported' not supported", errJson.Msg())
}

func TestWriteEvent(t *testing.T) {
base64Val := "Zm9v"
data := &eventFinalizedCheckpointJson{
Block: base64Val,
State: base64Val,
Epoch: "1",
}
bData, err := json.Marshal(data)
require.NoError(t, err)
msg := &sse.Event{
Data: bData,
Event: []byte("test_event"),
}
w := httptest.NewRecorder()
w.Body = &bytes.Buffer{}

errJson := writeEvent(msg, w, &eventFinalizedCheckpointJson{})
require.Equal(t, true, errJson == nil)
written := w.Body.String()
assert.Equal(t, "event: test_event\ndata: {\"block\":\"0x666f6f\",\"state\":\"0x666f6f\",\"epoch\":\"1\"}\n\n", written)
}
8 changes: 8 additions & 0 deletions beacon-chain/rpc/apimiddleware/endpoint_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ func (f *BeaconEndpointFactory) Paths() []string {
"/eth/v1/config/fork_schedule",
"/eth/v1/config/deposit_contract",
"/eth/v1/config/spec",
"/eth/v1/events",
}
}

Expand Down Expand Up @@ -217,6 +218,13 @@ func (f *BeaconEndpointFactory) Create(path string) (*gateway.Endpoint, error) {
GetResponse: &specResponseJson{},
Err: &gateway.DefaultErrorJson{},
}
case "/eth/v1/events":
endpoint = gateway.Endpoint{
Err: &gateway.DefaultErrorJson{},
Hooks: gateway.HookCollection{
CustomHandlers: []gateway.CustomHandler{handleEvents},
},
}
default:
return nil, errors.New("invalid path")
}
Expand Down
44 changes: 44 additions & 0 deletions beacon-chain/rpc/apimiddleware/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,45 @@ func (ssz *beaconStateSSZResponseJson) SSZData() string {
return ssz.Data
}

// TODO: Documentation
// ---------------
// Events.
// ---------------

type eventHeadJson struct {
Slot string `json:"slot"`
Block string `json:"block" hex:"true"`
State string `json:"state" hex:"true"`
EpochTransition bool `json:"epoch_transition"`
PreviousDutyDependentRoot string `json:"previous_duty_dependent_root" hex:"true"`
CurrentDutyDependentRoot string `json:"current_duty_dependent_root" hex:"true"`
}

type receivedBlockDataJson struct {
Slot string `json:"slot"`
Block string `json:"block" hex:"true"`
}

type aggregatedAttReceivedDataJson struct {
Aggregate *attestationJson `json:"aggregate"`
}

type eventFinalizedCheckpointJson struct {
Block string `json:"block" hex:"true"`
State string `json:"state" hex:"true"`
Epoch string `json:"epoch"`
}

type eventChainReorgJson struct {
Slot string `json:"slot"`
Depth string `json:"depth"`
OldHeadBlock string `json:"old_head_block" hex:"true"`
NewHeadBlock string `json:"old_head_state" hex:"true"`
OldHeadState string `json:"new_head_block" hex:"true"`
NewHeadState string `json:"new_head_state" hex:"true"`
Epoch string `json:"epoch"`
}

// ---------------
// Error handling.
// ---------------
Expand All @@ -470,3 +509,8 @@ type singleAttestationVerificationFailureJson struct {
Index int `json:"index"`
Message string `json:"message"`
}

type eventErrorJson struct {
StatusCode int `json:"status_code"`
Message string `json:"message"`
}
1 change: 1 addition & 0 deletions beacon-chain/rpc/eventsv1/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ go_library(
"//proto/eth/v1:go_default_library",
"//proto/migration:go_default_library",
"@com_github_grpc_ecosystem_grpc_gateway_v2//proto/gateway:go_default_library",
"@com_github_pkg_errors//:go_default_library",
"@org_golang_google_grpc//codes:go_default_library",
"@org_golang_google_grpc//status:go_default_library",
"@org_golang_google_protobuf//proto:go_default_library",
Expand Down
Loading