Skip to content

Commit

Permalink
SSE Payload Attributes subscription (#318)
Browse files Browse the repository at this point in the history
* add sse endpoint

* Add SSE subscription for payload attributes

* Update beaconclient/multi_beacon_client.go

Co-authored-by: Chris Hager <chris@linuxuser.at>

* Update services/api/service.go

Co-authored-by: Chris Hager <chris@linuxuser.at>

* Add SSE subscription for payload attributes

* pr comments

* renaming variables

* pr comments

* cleanup

---------

Co-authored-by: Chris Hager <chris@linuxuser.at>
  • Loading branch information
avalonche and metachris authored Mar 29, 2023
1 parent c56ed6e commit fc0c773
Show file tree
Hide file tree
Showing 4 changed files with 145 additions and 10 deletions.
2 changes: 2 additions & 0 deletions beaconclient/mock_beacon_instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ func (c *MockBeaconInstance) CurrentSlot() (uint64, error) {

func (c *MockBeaconInstance) SubscribeToHeadEvents(slotC chan HeadEventData) {}

func (c *MockBeaconInstance) SubscribeToPayloadAttributesEvents(slotC chan PayloadAttributesEvent) {}

func (c *MockBeaconInstance) GetProposerDuties(epoch uint64) (*ProposerDutiesResponse, error) {
c.addDelay()
return c.MockProposerDuties, c.MockProposerDutiesErr
Expand Down
9 changes: 9 additions & 0 deletions beaconclient/multi_beacon_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ var (
type IMultiBeaconClient interface {
BestSyncStatus() (*SyncStatusPayloadData, error)
SubscribeToHeadEvents(slotC chan HeadEventData)
// SubscribeToPayloadAttributesEvents subscribes to payload attributes events to validate fields such as prevrandao and withdrawals
SubscribeToPayloadAttributesEvents(payloadAttrC chan PayloadAttributesEvent)

// FetchValidators returns all active and pending validators from the beacon node
FetchValidators(headSlot uint64) (map[types.PubkeyHex]ValidatorResponseEntry, error)
Expand All @@ -41,6 +43,7 @@ type IBeaconInstance interface {
SyncStatus() (*SyncStatusPayloadData, error)
CurrentSlot() (uint64, error)
SubscribeToHeadEvents(slotC chan HeadEventData)
SubscribeToPayloadAttributesEvents(slotC chan PayloadAttributesEvent)
FetchValidators(headSlot uint64) (map[types.PubkeyHex]ValidatorResponseEntry, error)
GetProposerDuties(epoch uint64) (*ProposerDutiesResponse, error)
GetURI() string
Expand Down Expand Up @@ -139,6 +142,12 @@ func (c *MultiBeaconClient) SubscribeToHeadEvents(slotC chan HeadEventData) {
}
}

func (c *MultiBeaconClient) SubscribeToPayloadAttributesEvents(slotC chan PayloadAttributesEvent) {
for _, instance := range c.beaconInstances {
go instance.SubscribeToPayloadAttributesEvents(slotC)
}
}

func (c *MultiBeaconClient) FetchValidators(headSlot uint64) (map[types.PubkeyHex]ValidatorResponseEntry, error) {
// return the first successful beacon node response
clients := c.beaconInstancesByLastResponse()
Expand Down
51 changes: 50 additions & 1 deletion beaconclient/prod_beacon_instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,37 @@ type HeadEventData struct {
State string `json:"state"`
}

// PayloadAttributesEvent represents the data of a payload_attributes event
// {"version": "capella", "data": {"proposer_index": "123", "proposal_slot": "10", "parent_block_number": "9", "parent_block_root": "0xcf8e0d4e9587369b2301d0790347320302cc0943d5a1884560367e8208d920f2", "parent_block_hash": "0x9a2fefd2fdb57f74993c7780ea5b9030d2897b615b89f808011ca5aebed54eaf", "payload_attributes": {"timestamp": "123456", "prev_randao": "0xcf8e0d4e9587369b2301d0790347320302cc0943d5a1884560367e8208d920f2", "suggested_fee_recipient": "0x0000000000000000000000000000000000000000", "withdrawals": [{"index": "5", "validator_index": "10", "address": "0x0000000000000000000000000000000000000000", "amount": "15640"}]}}}
type PayloadAttributesEvent struct {
Version string `json:"version"`
Data PayloadAttributesEventData `json:"data"`
}

type PayloadAttributesEventData struct {
ProposerIndex uint64 `json:"proposer_index,string"`
ProposalSlot uint64 `json:"proposal_slot,string"`
ParentBlockNumber uint64 `json:"parent_block_number,string"`
ParentBlockRoot string `json:"parent_block_root"`
ParentBlockHash string `json:"parent_block_hash"`
PayloadAttributes PayloadAttributes `json:"payload_attributes"`
}

type PayloadAttributes struct {
Timestamp uint64 `json:"timestamp,string"`
PrevRandao string `json:"prev_randao"`
SuggestedFeeRecipient string `json:"suggested_fee_recipient"`
Withdrawals []*capella.Withdrawal `json:"withdrawals"`
}

func (c *ProdBeaconInstance) SubscribeToHeadEvents(slotC chan HeadEventData) {
eventsURL := fmt.Sprintf("%s/eth/v1/events?topics=head", c.beaconURI)
log := c.log.WithField("url", eventsURL)
log.Info("subscribing to head events")

client := sse.NewClient(eventsURL)

for {
client := sse.NewClient(eventsURL)
err := client.SubscribeRaw(func(msg *sse.Event) {
var data HeadEventData
err := json.Unmarshal(msg.Data, &data)
Expand All @@ -58,6 +82,31 @@ func (c *ProdBeaconInstance) SubscribeToHeadEvents(slotC chan HeadEventData) {
}
}

func (c *ProdBeaconInstance) SubscribeToPayloadAttributesEvents(payloadAttributesC chan PayloadAttributesEvent) {
eventsURL := fmt.Sprintf("%s/eth/v1/events?topics=payload_attributes", c.beaconURI)
log := c.log.WithField("url", eventsURL)
log.Info("subscribing to payload_attributes events")

client := sse.NewClient(eventsURL)

for {
err := client.SubscribeRaw(func(msg *sse.Event) {
var data PayloadAttributesEvent
err := json.Unmarshal(msg.Data, &data)
if err != nil {
log.WithError(err).Error("could not unmarshal payload_attributes event")
} else {
payloadAttributesC <- data
}
})
if err != nil {
log.WithError(err).Error("failed to subscribe to payload_attributes events")
time.Sleep(1 * time.Second)
}
c.log.Warn("beaconclient SubscribeRaw ended, reconnecting")
}
}

func (c *ProdBeaconInstance) FetchValidators(headSlot uint64) (map[types.PubkeyHex]ValidatorResponseEntry, error) {
vd, err := fetchAllValidators(c.beaconURI, headSlot)
if err != nil {
Expand Down
93 changes: 84 additions & 9 deletions services/api/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,10 +153,13 @@ type RelayAPI struct {
getPayloadCallsInFlight sync.WaitGroup

// Feature flags
ffForceGetHeader204 bool
ffDisableBlockPublishing bool
ffDisableLowPrioBuilders bool
ffDisablePayloadDBStorage bool // disable storing the execution payloads in the database
ffForceGetHeader204 bool
ffDisableBlockPublishing bool
ffDisableLowPrioBuilders bool
ffDisablePayloadDBStorage bool // disable storing the execution payloads in the database
ffEnableSSEPayloadAttributes bool // instead of polling withdrawals+prevRandao, use SSE event (requires Prysm v4+)

latestParentBlockHash uberatomic.String // used to cache the latest parent block hash, to avoid repetitive similar SSE events

expectedPrevRandao randaoHelper
expectedPrevRandaoLock sync.RWMutex
Expand Down Expand Up @@ -245,6 +248,12 @@ func NewRelayAPI(opts RelayAPIOpts) (api *RelayAPI, err error) {
api.log.Warn("env: DISABLE_PAYLOAD_DATABASE_STORAGE - disabling storing payloads in the database")
api.ffDisablePayloadDBStorage = true
}

if os.Getenv("ENABLE_SSE_PAYLOAD_ATTRIBUTES") == "1" {
api.log.Warn("env: ENABLE_SSE_PAYLOAD_ATTRIBUTES - enable SSE subscription for validating payload attributes")
api.ffEnableSSEPayloadAttributes = true
}

return api, nil
}

Expand Down Expand Up @@ -394,6 +403,19 @@ func (api *RelayAPI) StartServer() (err error) {
}
}()

// Start regular payload attributes updates only if builder-api is enabled
// and if using see subscriptions instead of querying for payload attributes
if api.opts.BlockBuilderAPI && api.ffEnableSSEPayloadAttributes {
go func() {
c := make(chan beaconclient.PayloadAttributesEvent)
api.beaconClient.SubscribeToPayloadAttributesEvents(c)
for {
payloadAttributes := <-c
api.processPayloadAttributes(payloadAttributes)
}
}()
}

api.srv = &http.Server{
Addr: api.opts.ListenAddr,
Handler: api.getRouter(),
Expand Down Expand Up @@ -457,6 +479,56 @@ func (api *RelayAPI) startValidatorRegistrationDBProcessor() {
}
}

func (api *RelayAPI) processPayloadAttributes(payloadAttributes beaconclient.PayloadAttributesEvent) {
apiHeadSlot := api.headSlot.Load()
proposalSlot := payloadAttributes.Data.ProposalSlot

// require proposal slot in the future
if proposalSlot <= apiHeadSlot {
return
}
log := api.log.WithFields(logrus.Fields{
"headSlot": apiHeadSlot,
"proposalSlot": proposalSlot,
})

// discard repetitive payload attributes (we receive them once from each beacon node)
latestParentBlockHash := api.latestParentBlockHash.Load()
if latestParentBlockHash == payloadAttributes.Data.ParentBlockHash {
return
}
api.latestParentBlockHash.Store(payloadAttributes.Data.ParentBlockHash)
log = log.WithField("parentBlockHash", payloadAttributes.Data.ParentBlockHash)

log.Info("updating payload attributes")
api.expectedPrevRandaoLock.Lock()
prevRandao := payloadAttributes.Data.PayloadAttributes.PrevRandao
api.expectedPrevRandao = randaoHelper{
slot: proposalSlot,
prevRandao: prevRandao,
}
api.expectedPrevRandaoLock.Unlock()
log.Infof("updated expected prev_randao to %s", prevRandao)

// Update withdrawals (in Capella only)
if api.isBellatrix(proposalSlot) {
return
}
log.Info("updating expected withdrawals")
withdrawalsRoot, err := ComputeWithdrawalsRoot(payloadAttributes.Data.PayloadAttributes.Withdrawals)
if err != nil {
log.WithError(err).Error("error computing withdrawals root")
return
}
api.expectedWithdrawalsLock.Lock()
api.expectedWithdrawalsRoot = withdrawalsHelper{
slot: proposalSlot,
root: withdrawalsRoot,
}
api.expectedWithdrawalsLock.Unlock()
log.Infof("updated expected withdrawals root to %s", withdrawalsRoot)
}

func (api *RelayAPI) processNewSlot(headSlot uint64) {
_apiHeadSlot := api.headSlot.Load()
if headSlot <= _apiHeadSlot {
Expand All @@ -474,11 +546,14 @@ func (api *RelayAPI) processNewSlot(headSlot uint64) {

// only for builder-api
if api.opts.BlockBuilderAPI {
// query the expected prev_randao field
go api.updatedExpectedRandao(headSlot)
// if not subscribed to payload attributes via sse, query beacon node endpoints
if !api.ffEnableSSEPayloadAttributes {
// query the expected prev_randao field
go api.updatedExpectedRandao(headSlot)

// query expected withdrawals root
go api.updatedExpectedWithdrawals(headSlot)
// query expected withdrawals root
go api.updatedExpectedWithdrawals(headSlot)
}

// update proposer duties in the background
go api.updateProposerDuties(headSlot)
Expand Down Expand Up @@ -999,7 +1074,7 @@ func (api *RelayAPI) updatedExpectedWithdrawals(slot uint64) {
}

log := api.log.WithField("slot", slot)
log.Infof("updating withdrawals root...")
log.Info("updating withdrawals root...")
api.expectedWithdrawalsLock.Lock()
latestKnownSlot := api.expectedWithdrawalsRoot.slot
if slot < latestKnownSlot || slot <= api.expectedWithdrawalsUpdating { // do nothing slot is already known or currently being updated
Expand Down

0 comments on commit fc0c773

Please sign in to comment.