Skip to content

Commit

Permalink
Handle unaggregated attestation event (#11558)
Browse files Browse the repository at this point in the history
  • Loading branch information
rkapka authored Oct 18, 2022
1 parent df694aa commit 98b9c9e
Show file tree
Hide file tree
Showing 3 changed files with 118 additions and 6 deletions.
26 changes: 20 additions & 6 deletions beacon-chain/rpc/apimiddleware/custom_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,16 +369,30 @@ func receiveEvents(eventChan <-chan *sse.Event, w http.ResponseWriter, req *http
case events.AttestationTopic:
data = &AttestationJson{}

// Data received in the event does not fit the expected event stream output.
// Data received in the aggregated att event does not fit the expected event stream output.
// We extract the underlying attestation from event data
// and assign the attestation back to event data for further processing.
eventData := &AggregatedAttReceivedDataJson{}
if err := json.Unmarshal(msg.Data, eventData); err != nil {
aggEventData := &AggregatedAttReceivedDataJson{}
if err := json.Unmarshal(msg.Data, aggEventData); err != nil {
return apimiddleware.InternalServerError(err)
}
attData, err := json.Marshal(eventData.Aggregate)
if err != nil {
return apimiddleware.InternalServerError(err)
var attData []byte
var err error
// If true, then we have an unaggregated attestation
if aggEventData.Aggregate == nil {
unaggEventData := &UnaggregatedAttReceivedDataJson{}
if err := json.Unmarshal(msg.Data, unaggEventData); err != nil {
return apimiddleware.InternalServerError(err)
}
attData, err = json.Marshal(unaggEventData)
if err != nil {
return apimiddleware.InternalServerError(err)
}
} else {
attData, err = json.Marshal(aggEventData.Aggregate)
if err != nil {
return apimiddleware.InternalServerError(err)
}
}
msg.Data = attData
case events.VoluntaryExitTopic:
Expand Down
92 changes: 92 additions & 0 deletions beacon-chain/rpc/apimiddleware/custom_handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,98 @@ func TestReceiveEvents(t *testing.T) {

errJson := receiveEvents(ch, w, req)
assert.Equal(t, true, errJson == nil)

expectedEvent := `event: finalized_checkpoint
data: {"block":"0x666f6f","state":"0x666f6f","epoch":"1","execution_optimistic":false}
`
assert.DeepEqual(t, expectedEvent, w.Body.String())
}

func TestReceiveEvents_AggregatedAtt(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
ch := make(chan *sse.Event)
w := httptest.NewRecorder()
w.Body = &bytes.Buffer{}
req := httptest.NewRequest("GET", "http://foo.example", &bytes.Buffer{})
req = req.WithContext(ctx)

go func() {
base64Val := "Zm9v"
data := AggregatedAttReceivedDataJson{
Aggregate: &AttestationJson{
AggregationBits: base64Val,
Data: &AttestationDataJson{
Slot: "1",
CommitteeIndex: "1",
BeaconBlockRoot: base64Val,
Source: nil,
Target: nil,
},
Signature: base64Val,
},
}
bData, err := json.Marshal(data)
require.NoError(t, err)
msg := &sse.Event{
Data: bData,
Event: []byte(events.AttestationTopic),
}
ch <- msg
time.Sleep(time.Second)
cancel()
}()

errJson := receiveEvents(ch, w, req)
assert.Equal(t, true, errJson == nil)

expectedEvent := `event: attestation
data: {"aggregation_bits":"0x666f6f","data":{"slot":"1","index":"1","beacon_block_root":"0x666f6f","source":null,"target":null},"signature":"0x666f6f"}
`
assert.DeepEqual(t, expectedEvent, w.Body.String())
}

func TestReceiveEvents_UnaggregatedAtt(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
ch := make(chan *sse.Event)
w := httptest.NewRecorder()
w.Body = &bytes.Buffer{}
req := httptest.NewRequest("GET", "http://foo.example", &bytes.Buffer{})
req = req.WithContext(ctx)

go func() {
base64Val := "Zm9v"
data := UnaggregatedAttReceivedDataJson{
AggregationBits: base64Val,
Data: &AttestationDataJson{
Slot: "1",
CommitteeIndex: "1",
BeaconBlockRoot: base64Val,
Source: nil,
Target: nil,
},
Signature: base64Val,
}
bData, err := json.Marshal(data)
require.NoError(t, err)
msg := &sse.Event{
Data: bData,
Event: []byte(events.AttestationTopic),
}
ch <- msg
time.Sleep(time.Second)
cancel()
}()

errJson := receiveEvents(ch, w, req)
assert.Equal(t, true, errJson == nil)

expectedEvent := `event: attestation
data: {"aggregation_bits":"0x666f6f","data":{"slot":"1","index":"1","beacon_block_root":"0x666f6f","source":null,"target":null},"signature":"0x666f6f"}
`
assert.DeepEqual(t, expectedEvent, w.Body.String())
}

func TestReceiveEvents_EventNotSupported(t *testing.T) {
Expand Down
6 changes: 6 additions & 0 deletions beacon-chain/rpc/apimiddleware/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -879,6 +879,12 @@ type AggregatedAttReceivedDataJson struct {
Aggregate *AttestationJson `json:"aggregate"`
}

type UnaggregatedAttReceivedDataJson struct {
AggregationBits string `json:"aggregation_bits" hex:"true"`
Data *AttestationDataJson `json:"data"`
Signature string `json:"signature" hex:"true"`
}

type EventFinalizedCheckpointJson struct {
Block string `json:"block" hex:"true"`
State string `json:"state" hex:"true"`
Expand Down

0 comments on commit 98b9c9e

Please sign in to comment.