Skip to content

Commit

Permalink
Change []beacon.Committee to an interface to reduce copying on the ho…
Browse files Browse the repository at this point in the history
…t path

Allow buffered decoding in the BN std-http-client
  • Loading branch information
jshufro committed May 9, 2023
1 parent 7b0c980 commit 4f40e1c
Show file tree
Hide file tree
Showing 8 changed files with 101 additions and 56 deletions.
23 changes: 18 additions & 5 deletions shared/services/beacon/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,23 @@ type BeaconBlock struct {
ExecutionBlockNumber uint64
}

type Committee struct {
Index uint64
Slot uint64
Validators []string
// Committees in an interface as an optimization- since committees responses
// are quite large, there's a decent cpu/memory improvement to removing the
// translation to an intermediate storage class.
//
// Instead, the interface provides the access pattern that smartnode (or more
// specifically, tree-gen) wants, and the underlying format is just the format
// of the Beacon Node response.
type Committees interface {
// Index returns the index of the committee at the provided offset
Index(int) uint64
// Slot returns the slot of the committee at the provided offset
Slot(int) uint64
// Validators returns the list of validatorsof the committee at the
// provided offset
Validators(int) []string
// Count returns the number of committees in the response
Count() int
}

type AttestationInfo struct {
Expand Down Expand Up @@ -128,6 +141,6 @@ type Client interface {
ExitValidator(validatorIndex string, epoch uint64, signature types.ValidatorSignature) error
Close() error
GetEth1DataForEth2Block(blockId string) (Eth1Data, bool, error)
GetCommitteesForEpoch(epoch *uint64) ([]Committee, error)
GetCommitteesForEpoch(epoch *uint64) (Committees, error)
ChangeWithdrawalCredentials(validatorIndex string, fromBlsPubkey types.ValidatorPubkey, toExecutionAddress common.Address, signature types.ValidatorSignature) error
}
58 changes: 37 additions & 21 deletions shared/services/beacon/client/std-http-client.go
Original file line number Diff line number Diff line change
Expand Up @@ -526,22 +526,13 @@ func (c *StandardHttpClient) GetBeaconBlock(blockId string) (beacon.BeaconBlock,
}

// Get the attestation committees for the given epoch, or the current epoch if nil
func (c *StandardHttpClient) GetCommitteesForEpoch(epoch *uint64) ([]beacon.Committee, error) {
func (c *StandardHttpClient) GetCommitteesForEpoch(epoch *uint64) (beacon.Committees, error) {
response, err := c.getCommittees("head", epoch)
if err != nil {
return nil, err
}

committees := []beacon.Committee{}
for _, committee := range response.Data {
committees = append(committees, beacon.Committee{
Index: uint64(committee.Index),
Slot: uint64(committee.Slot),
Validators: committee.Validators,
})
}

return committees, nil
return &response, nil
}

// Perform a withdrawal credentials change on a validator
Expand Down Expand Up @@ -791,21 +782,35 @@ func (c *StandardHttpClient) getBeaconBlock(blockId string) (BeaconBlockResponse

// Get the committees for the epoch
func (c *StandardHttpClient) getCommittees(stateId string, epoch *uint64) (CommitteesResponse, error) {
var committees CommitteesResponse

query := ""
if epoch != nil {
query = fmt.Sprintf("?epoch=%d", *epoch)
}
responseBody, status, err := c.getRequest(fmt.Sprintf(RequestCommitteePath, stateId) + query)

// Committees responses are large, so let the json decoder read it in a buffered fashion
reader, status, err := c.getRequestReader(fmt.Sprintf(RequestCommitteePath, stateId) + query)
if err != nil {
return CommitteesResponse{}, fmt.Errorf("Could not get committees: %w", err)
}
defer func() {
_ = reader.Close()
}()

if status != http.StatusOK {
return CommitteesResponse{}, fmt.Errorf("Could not get committees: HTTP status %d; response body: '%s'", status, string(responseBody))
body, _ := io.ReadAll(reader)
return CommitteesResponse{}, fmt.Errorf("Could not get committees: HTTP status %d; response body: '%s'", status, string(body))
}
var committees CommitteesResponse
if err := json.Unmarshal(responseBody, &committees); err != nil {

// Pass the reader off to the decoder
decoder := json.NewDecoder(reader)

// Begin decoding
if err := decoder.Decode(&committees); err != nil {
return CommitteesResponse{}, fmt.Errorf("Could not decode committees: %w", err)
}

return committees, nil
}

Expand All @@ -822,27 +827,38 @@ func (c *StandardHttpClient) postWithdrawalCredentialsChange(request BLSToExecut
return nil
}

// Make a GET request to the beacon node
func (c *StandardHttpClient) getRequest(requestPath string) ([]byte, int, error) {
// Make a GET request but do not read its body yet (allows buffered decoding)
func (c *StandardHttpClient) getRequestReader(requestPath string) (io.ReadCloser, int, error) {

// Send request
response, err := http.Get(fmt.Sprintf(RequestUrlFormat, c.providerAddress, requestPath))
if err != nil {
return nil, 0, err
}

return response.Body, response.StatusCode, nil
}

// Make a GET request to the beacon node and read the body of the response
func (c *StandardHttpClient) getRequest(requestPath string) ([]byte, int, error) {

// Send request
reader, status, err := c.getRequestReader(requestPath)
if err != nil {
return []byte{}, 0, err
}
defer func() {
_ = response.Body.Close()
_ = reader.Close()
}()

// Get response
body, err := io.ReadAll(response.Body)
body, err := io.ReadAll(reader)
if err != nil {
return []byte{}, 0, err
}

// Return
return body, response.StatusCode, nil

return body, status, nil
}

// Make a POST request to the beacon node
Expand Down
16 changes: 16 additions & 0 deletions shared/services/beacon/client/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,22 @@ type Committee struct {
Validators []string `json:"validators"`
}

func (c *CommitteesResponse) Count() int {
return len(c.Data)
}

func (c *CommitteesResponse) Index(idx int) uint64 {
return uint64(c.Data[idx].Index)
}

func (c *CommitteesResponse) Slot(idx int) uint64 {
return uint64(c.Data[idx].Slot)
}

func (c *CommitteesResponse) Validators(idx int) []string {
return c.Data[idx].Validators
}

type Attestation struct {
AggregationBits string `json:"aggregation_bits"`
Data struct {
Expand Down
12 changes: 6 additions & 6 deletions shared/services/rewards/generator-impl-v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -850,7 +850,7 @@ func (r *treeGeneratorImpl_v1) processAttestationsForInterval() error {
func (r *treeGeneratorImpl_v1) processEpoch(getDuties bool, epoch uint64) error {

// Get the committee info and attestation records for this epoch
var committeeData []beacon.Committee
var committeeData beacon.Committees
attestationsPerSlot := make([][]beacon.AttestationInfo, r.slotsPerEpoch)
var wg errgroup.Group

Expand Down Expand Up @@ -939,20 +939,20 @@ func (r *treeGeneratorImpl_v1) checkDutiesForSlot(attestations []beacon.Attestat
}

// Maps out the attestaion duties for the given epoch
func (r *treeGeneratorImpl_v1) getDutiesForEpoch(committees []beacon.Committee) error {
func (r *treeGeneratorImpl_v1) getDutiesForEpoch(committees beacon.Committees) error {

// Crawl the committees
for _, committee := range committees {
slotIndex := committee.Slot
for idx := 0; idx < committees.Count(); idx++ {
slotIndex := committees.Slot(idx)
if slotIndex < r.rewardsFile.ConsensusStartBlock || slotIndex > r.rewardsFile.ConsensusEndBlock {
// Ignore slots that are out of bounds
continue
}
committeeIndex := committee.Index
committeeIndex := committees.Index(idx)

// Check if there are any RP validators in this committee
rpValidators := map[int]*MinipoolInfo{}
for position, validator := range committee.Validators {
for position, validator := range committees.Validators(idx) {
minipoolInfo, exists := r.validatorIndexMap[validator]
if exists {
rpValidators[position] = minipoolInfo
Expand Down
12 changes: 6 additions & 6 deletions shared/services/rewards/generator-impl-v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -869,7 +869,7 @@ func (r *treeGeneratorImpl_v2) processAttestationsForInterval() error {
func (r *treeGeneratorImpl_v2) processEpoch(getDuties bool, epoch uint64) error {

// Get the committee info and attestation records for this epoch
var committeeData []beacon.Committee
var committeeData beacon.Committees
attestationsPerSlot := make([][]beacon.AttestationInfo, r.slotsPerEpoch)
var wg errgroup.Group

Expand Down Expand Up @@ -958,20 +958,20 @@ func (r *treeGeneratorImpl_v2) checkDutiesForSlot(attestations []beacon.Attestat
}

// Maps out the attestaion duties for the given epoch
func (r *treeGeneratorImpl_v2) getDutiesForEpoch(committees []beacon.Committee) error {
func (r *treeGeneratorImpl_v2) getDutiesForEpoch(committees beacon.Committees) error {

// Crawl the committees
for _, committee := range committees {
slotIndex := committee.Slot
for idx := 0; idx < committees.Count(); idx++ {
slotIndex := committees.Slot(idx)
if slotIndex < r.rewardsFile.ConsensusStartBlock || slotIndex > r.rewardsFile.ConsensusEndBlock {
// Ignore slots that are out of bounds
continue
}
committeeIndex := committee.Index
committeeIndex := committees.Index(idx)

// Check if there are any RP validators in this committee
rpValidators := map[int]*MinipoolInfo{}
for position, validator := range committee.Validators {
for position, validator := range committees.Validators(idx) {
minipoolInfo, exists := r.validatorIndexMap[validator]
if exists {
rpValidators[position] = minipoolInfo
Expand Down
12 changes: 6 additions & 6 deletions shared/services/rewards/generator-impl-v3.go
Original file line number Diff line number Diff line change
Expand Up @@ -865,7 +865,7 @@ func (r *treeGeneratorImpl_v3) processAttestationsForInterval() error {
func (r *treeGeneratorImpl_v3) processEpoch(getDuties bool, epoch uint64) error {

// Get the committee info and attestation records for this epoch
var committeeData []beacon.Committee
var committeeData beacon.Committees
attestationsPerSlot := make([][]beacon.AttestationInfo, r.slotsPerEpoch)
var wg errgroup.Group

Expand Down Expand Up @@ -954,20 +954,20 @@ func (r *treeGeneratorImpl_v3) checkDutiesForSlot(attestations []beacon.Attestat
}

// Maps out the attestaion duties for the given epoch
func (r *treeGeneratorImpl_v3) getDutiesForEpoch(committees []beacon.Committee) error {
func (r *treeGeneratorImpl_v3) getDutiesForEpoch(committees beacon.Committees) error {

// Crawl the committees
for _, committee := range committees {
slotIndex := committee.Slot
for idx := 0; idx < committees.Count(); idx++ {
slotIndex := committees.Slot(idx)
if slotIndex < r.rewardsFile.ConsensusStartBlock || slotIndex > r.rewardsFile.ConsensusEndBlock {
// Ignore slots that are out of bounds
continue
}
committeeIndex := committee.Index
committeeIndex := committees.Index(idx)

// Check if there are any RP validators in this committee
rpValidators := map[int]*MinipoolInfo{}
for position, validator := range committee.Validators {
for position, validator := range committees.Validators(idx) {
minipoolInfo, exists := r.validatorIndexMap[validator]
if exists {
rpValidators[position] = minipoolInfo
Expand Down
12 changes: 6 additions & 6 deletions shared/services/rewards/generator-impl-v4.go
Original file line number Diff line number Diff line change
Expand Up @@ -922,7 +922,7 @@ func (r *treeGeneratorImpl_v4) processAttestationsForInterval() error {
func (r *treeGeneratorImpl_v4) processEpoch(getDuties bool, epoch uint64) error {

// Get the committee info and attestation records for this epoch
var committeeData []beacon.Committee
var committeeData beacon.Committees
attestationsPerSlot := make([][]beacon.AttestationInfo, r.slotsPerEpoch)
var wg errgroup.Group

Expand Down Expand Up @@ -1011,20 +1011,20 @@ func (r *treeGeneratorImpl_v4) checkDutiesForSlot(attestations []beacon.Attestat
}

// Maps out the attestaion duties for the given epoch
func (r *treeGeneratorImpl_v4) getDutiesForEpoch(committees []beacon.Committee) error {
func (r *treeGeneratorImpl_v4) getDutiesForEpoch(committees beacon.Committees) error {

// Crawl the committees
for _, committee := range committees {
slotIndex := committee.Slot
for idx := 0; idx < committees.Count(); idx++ {
slotIndex := committees.Slot(idx)
if slotIndex < r.rewardsFile.ConsensusStartBlock || slotIndex > r.rewardsFile.ConsensusEndBlock {
// Ignore slots that are out of bounds
continue
}
committeeIndex := committee.Index
committeeIndex := committees.Index(idx)

// Check if there are any RP validators in this committee
rpValidators := map[int]*MinipoolInfo{}
for position, validator := range committee.Validators {
for position, validator := range committees.Validators(idx) {
minipoolInfo, exists := r.validatorIndexMap[validator]
if exists {
rpValidators[position] = minipoolInfo
Expand Down
12 changes: 6 additions & 6 deletions shared/services/rewards/generator-impl-v5.go
Original file line number Diff line number Diff line change
Expand Up @@ -775,7 +775,7 @@ func (r *treeGeneratorImpl_v5) processAttestationsForInterval() error {
func (r *treeGeneratorImpl_v5) processEpoch(getDuties bool, epoch uint64) error {

// Get the committee info and attestation records for this epoch
var committeeData []beacon.Committee
var committeeData beacon.Committees
attestationsPerSlot := make([][]beacon.AttestationInfo, r.slotsPerEpoch)
var wg errgroup.Group

Expand Down Expand Up @@ -889,20 +889,20 @@ func (r *treeGeneratorImpl_v5) checkDutiesForSlot(attestations []beacon.Attestat
}

// Maps out the attestaion duties for the given epoch
func (r *treeGeneratorImpl_v5) getDutiesForEpoch(committees []beacon.Committee) error {
func (r *treeGeneratorImpl_v5) getDutiesForEpoch(committees beacon.Committees) error {

// Crawl the committees
for _, committee := range committees {
slotIndex := committee.Slot
for idx := 0; idx < committees.Count(); idx++ {
slotIndex := committees.Slot(idx)
if slotIndex < r.rewardsFile.ConsensusStartBlock || slotIndex > r.rewardsFile.ConsensusEndBlock {
// Ignore slots that are out of bounds
continue
}
committeeIndex := committee.Index
committeeIndex := committees.Index(idx)

// Check if there are any RP validators in this committee
rpValidators := map[int]*MinipoolInfo{}
for position, validator := range committee.Validators {
for position, validator := range committees.Validators(idx) {
minipoolInfo, exists := r.validatorIndexMap[validator]
if exists {
rpValidators[position] = minipoolInfo
Expand Down

0 comments on commit 4f40e1c

Please sign in to comment.