Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Beds 151/add rp mode test #1167

Open
wants to merge 5 commits into
base: staging
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions backend/pkg/api/data_access/mobile.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,12 @@ func (d *DataAccessService) GetValidatorDashboardMobileWidget(ctx context.Contex
data.NetworkEfficiency = utils.CalculateTotalEfficiency(
efficiency.AttestationEfficiency[enums.AllTime], efficiency.ProposalEfficiency[enums.AllTime], efficiency.SyncEfficiency[enums.AllTime])

protocolModes := t.VDBProtocolModes{RocketPool: true}
rpInfos, err := d.getRocketPoolInfos(ctx, wrappedDashboardId, t.AllGroups)
if err != nil {
return nil, fmt.Errorf("error retrieving rocketpool infos: %w", err)
}

// Validator status
eg.Go(func() error {
validatorMapping, err := d.services.GetCurrentValidatorMapping()
Expand Down Expand Up @@ -262,7 +268,7 @@ func (d *DataAccessService) GetValidatorDashboardMobileWidget(ctx context.Contex

retrieveApr := func(hours int, apr *float64) {
eg.Go(func() error {
_, elApr, _, clApr, err := d.internal_getElClAPR(ctx, wrappedDashboardId, -1, hours)
_, elApr, _, clApr, err := d.internal_getElClAPR(ctx, wrappedDashboardId, t.AllGroups, protocolModes, rpInfos, hours)
if err != nil {
return err
}
Expand All @@ -273,7 +279,7 @@ func (d *DataAccessService) GetValidatorDashboardMobileWidget(ctx context.Contex

retrieveRewards := func(hours int, rewards *decimal.Decimal) {
eg.Go(func() error {
clRewards, _, elRewards, _, err := d.internal_getElClAPR(ctx, wrappedDashboardId, -1, hours)
clRewards, _, elRewards, _, err := d.internal_getElClAPR(ctx, wrappedDashboardId, t.AllGroups, protocolModes, rpInfos, hours)
if err != nil {
return err
}
Expand Down
56 changes: 42 additions & 14 deletions backend/pkg/api/data_access/vdb_blocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ import (
)

func (d *DataAccessService) GetValidatorDashboardBlocks(ctx context.Context, dashboardId t.VDBId, cursor string, colSort t.Sort[enums.VDBBlocksColumn], search string, limit uint64, protocolModes t.VDBProtocolModes) ([]t.VDBBlocksTableRow, *t.Paging, error) {
// @DATA-ACCESS incorporate protocolModes

// -------------------------------------
// Setup
var err error
Expand All @@ -42,6 +40,15 @@ func (d *DataAccessService) GetValidatorDashboardBlocks(ctx context.Context, das
}
}

// Get the rocketpool minipool infos
var rpInfos *t.RPInfo
if protocolModes.RocketPool {
rpInfos, err = d.getRocketPoolInfos(ctx, dashboardId, t.AllGroups)
if err != nil {
return nil, nil, err
}
}

searchPubkey := regexp.MustCompile(`^0x[0-9a-fA-F]{96}$`).MatchString(search)
searchGroup := regexp.MustCompile(`^[a-zA-Z0-9_\-.\ ]+$`).MatchString(search)
searchIndex := regexp.MustCompile(`^[0-9]+$`).MatchString(search)
Expand Down Expand Up @@ -123,7 +130,7 @@ func (d *DataAccessService) GetValidatorDashboardBlocks(ctx context.Context, das
}

// -------------------------------------
// Constuct final query
// Construct final query
var blocksDs *goqu.SelectDataset

// 1. Tables
Expand Down Expand Up @@ -175,6 +182,11 @@ func (d *DataAccessService) GetValidatorDashboardBlocks(ctx context.Context, das
goqu.COALESCE(goqu.L("rb.value / 1e18"), goqu.I("ep.fee_recipient_reward")).As("el_reward"),
)

if rpInfos != nil && protocolModes.RocketPool {
blocksDs = blocksDs.
SelectAppend(goqu.L("(blocks.exec_fee_recipient = ? AND (rb.proposer_fee_recipient IS NULL OR rb.proposer_fee_recipient = ?)) AS is_smoothing_pool", rpInfos.SmoothingPoolAddress, rpInfos.SmoothingPoolAddress))
}

// 3. Sorting and pagination
defaultColumns := []t.SortColumn{
{Column: enums.VDBBlocksColumns.Slot.ToExpr(), Desc: true, Offset: currentCursor.Slot},
Expand Down Expand Up @@ -272,6 +284,11 @@ func (d *DataAccessService) GetValidatorDashboardBlocks(ctx context.Context, das
goqu.L("NULL::NUMERIC").As("el_reward"),
)

if rpInfos != nil && protocolModes.RocketPool {
scheduledDs = scheduledDs.
SelectAppend(goqu.L("NULL::BOOL").As("is_smoothing_pool"))
}

// We don't have access to exec_block_number and status for a WHERE without wrapping the query so if we sort by those get all the data
if colSort.Column == enums.VDBBlocksColumns.Proposer || colSort.Column == enums.VDBBlocksColumns.Slot {
scheduledDs = scheduledDs.
Expand Down Expand Up @@ -307,16 +324,17 @@ func (d *DataAccessService) GetValidatorDashboardBlocks(ctx context.Context, das
// -------------------------------------
// Execute query
var proposals []struct {
Proposer t.VDBValidator `db:"validator_index"`
Group uint64 `db:"group_id"`
Epoch uint64 `db:"epoch"`
Slot uint64 `db:"slot"`
Status uint64 `db:"status"`
Block sql.NullInt64 `db:"exec_block_number"`
FeeRecipient []byte `db:"fee_recipient"`
ElReward decimal.NullDecimal `db:"el_reward"`
ClReward decimal.NullDecimal `db:"cl_reward"`
GraffitiText sql.NullString `db:"graffiti_text"`
Proposer t.VDBValidator `db:"validator_index"`
Group uint64 `db:"group_id"`
Epoch uint64 `db:"epoch"`
Slot uint64 `db:"slot"`
Status uint64 `db:"status"`
Block sql.NullInt64 `db:"exec_block_number"`
FeeRecipient []byte `db:"fee_recipient"`
ElReward decimal.NullDecimal `db:"el_reward"`
ClReward decimal.NullDecimal `db:"cl_reward"`
GraffitiText sql.NullString `db:"graffiti_text"`
IsSmoothingPool sql.NullBool `db:"is_smoothing_pool"`

// for cursor only
Reward decimal.Decimal
Expand Down Expand Up @@ -351,7 +369,7 @@ func (d *DataAccessService) GetValidatorDashboardBlocks(ctx context.Context, das
slots[i] = proposal.Slot
}

// retrieve the cl rewards, source it from clickhouse for mainnet and from postgres for holsky
// retrieve the cl rewards, source it from clickhouse for mainnet and from postgres for holesky
// TODO: harmonize this @invis
clRewardsData := []struct {
Slot uint64 `db:"slot"`
Expand Down Expand Up @@ -444,9 +462,19 @@ func (d *DataAccessService) GetValidatorDashboardBlocks(ctx context.Context, das
TraceIdx: -1,
})
reward.El = proposal.ElReward.Decimal.Mul(decimal.NewFromInt(1e18))
if rpInfos != nil && protocolModes.RocketPool && !(proposal.IsSmoothingPool.Valid && proposal.IsSmoothingPool.Bool) {
if rpValidator, ok := rpInfos.Minipool[proposal.Proposer]; ok {
reward.El = reward.El.Mul(d.getRocketPoolOperatorFactor(rpValidator))
}
}
}
if clReward, ok := clRewards[proposal.Slot]; ok && clReward.Valid {
reward.Cl = clReward.Decimal.Mul(decimal.NewFromInt(1e18))
if rpInfos != nil && protocolModes.RocketPool {
if rpValidator, ok := rpInfos.Minipool[proposal.Proposer]; ok {
reward.Cl = reward.Cl.Mul(d.getRocketPoolOperatorFactor(rpValidator))
}
}
}
proposals[i].Reward = proposal.ElReward.Decimal.Add(proposal.ClReward.Decimal)
data[i].Reward = &reward
Expand Down
136 changes: 136 additions & 0 deletions backend/pkg/api/data_access/vdb_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,15 @@ import (
"time"

"github.com/doug-martin/goqu/v9"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/gobitfly/beaconchain/pkg/api/enums"
t "github.com/gobitfly/beaconchain/pkg/api/types"
"github.com/gobitfly/beaconchain/pkg/commons/cache"
"github.com/gobitfly/beaconchain/pkg/commons/utils"
"github.com/lib/pq"
"github.com/pkg/errors"
"github.com/shopspring/decimal"
"golang.org/x/sync/errgroup"
)

//////////////////// Helper functions (must be used by more than one VDB endpoint!)
Expand Down Expand Up @@ -130,3 +133,136 @@ func (d *DataAccessService) getTimeToNextWithdrawal(distance uint64) time.Time {

return timeToWithdrawal
}

func (d *DataAccessService) getRocketPoolInfos(ctx context.Context, dashboardId t.VDBId, groupId int64) (*t.RPInfo, error) {
var rpInfo t.RPInfo
wg := errgroup.Group{}

queryResult := []struct {
ValidatorIndex uint64 `db:"validator_index"`
NodeAddress []byte `db:"node_address"`
NodeFee float64 `db:"node_fee"`
NodeDepositBalance decimal.Decimal `db:"node_deposit_balance"`
UserDepositBalance decimal.Decimal `db:"user_deposit_balance"`
EndTime sql.NullTime `db:"end_time"`
SmoothingPoolEth *decimal.Decimal `db:"smoothing_pool_eth"`
}{}

wg.Go(func() error {
ds := goqu.Dialect("postgres").
Select(
goqu.L("rplm.validator_index"),
goqu.L("rplm.node_address"),
goqu.L("rplm.node_fee"),
goqu.L("rplm.node_deposit_balance"),
goqu.L("rplm.user_deposit_balance"),
goqu.L("rplrs.end_time"),
goqu.L("rplrs.smoothing_pool_eth"),
).
From(goqu.L("rocketpool_minipools AS rplm")).
LeftJoin(goqu.L("rocketpool_rewards_summary AS rplrs"), goqu.On(goqu.L("rplm.node_address = rplrs.node_address"))).
Where(goqu.L("rplm.node_deposit_balance IS NOT NULL")).
Where(goqu.L("rplm.user_deposit_balance IS NOT NULL"))

if len(dashboardId.Validators) == 0 {
ds = ds.
LeftJoin(goqu.L("users_val_dashboards_validators uvdv"), goqu.On(goqu.L("uvdv.validator_index = rplm.validator_index"))).
Where(goqu.L("uvdv.dashboard_id = ?", dashboardId.Id))

if groupId != t.AllGroups {
ds = ds.
Where(goqu.L("uvdv.group_id = ?", groupId))
}
} else {
ds = ds.
Where(goqu.L("rplm.validator_index = ANY(?)", pq.Array(dashboardId.Validators)))
}

query, args, err := ds.Prepared(true).ToSQL()
if err != nil {
return fmt.Errorf("error preparing query: %w", err)
}

err = d.alloyReader.SelectContext(ctx, &queryResult, query, args...)
if err != nil {
return fmt.Errorf("error retrieving rocketpool validators data: %w", err)
}

return nil
})

nodeMinipoolCount := make(map[string]uint64)
wg.Go(func() error {
queryResult := []struct {
NodeAddress []byte `db:"node_address"`
MinipoolCount uint64 `db:"minipool_count"`
SmoothingPoolAddress []byte `db:"smoothing_pool_address"`
}{}

err := d.alloyReader.SelectContext(ctx, &queryResult, `
SELECT
rplm.node_address,
COUNT(rplm.node_address) AS minipool_count,
rploc.smoothing_pool_address
FROM rocketpool_minipools AS rplm
LEFT JOIN rocketpool_onchain_configs AS rploc ON rplm.rocketpool_storage_address = rploc.rocketpool_storage_address
GROUP BY node_address, smoothing_pool_address`)
if err != nil {
return fmt.Errorf("error retrieving rocketpool node deposits data: %w", err)
}

for _, res := range queryResult {
node := hexutil.Encode(res.NodeAddress)
nodeMinipoolCount[node] = res.MinipoolCount
}
if len(queryResult) > 0 {
// Smoothing pool address is the same for all nodes on the network so take the first result
rpInfo.SmoothingPoolAddress = queryResult[0].SmoothingPoolAddress
}

return nil
})

err := wg.Wait()
if err != nil {
return nil, err
}

if len(queryResult) == 0 {
return nil, nil
}

rpInfo.Minipool = make(map[uint64]t.RPMinipoolInfo)

for _, res := range queryResult {
if _, ok := rpInfo.Minipool[res.ValidatorIndex]; !ok {
rpInfo.Minipool[res.ValidatorIndex] = t.RPMinipoolInfo{
NodeFee: res.NodeFee,
NodeDepositBalance: res.NodeDepositBalance,
UserDepositBalance: res.UserDepositBalance,
SmoothingPoolRewards: make(map[uint64]decimal.Decimal),
}
}

node := hexutil.Encode(res.NodeAddress)
if res.EndTime.Valid && res.SmoothingPoolEth != nil {
epoch := uint64(utils.TimeToEpoch(res.EndTime.Time))
splitReward := res.SmoothingPoolEth.Div(decimal.NewFromUint64(nodeMinipoolCount[node]))
rpInfo.Minipool[res.ValidatorIndex].SmoothingPoolRewards[epoch] =
rpInfo.Minipool[res.ValidatorIndex].SmoothingPoolRewards[epoch].Add(splitReward)
}
Comment on lines +238 to +253
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks quite complicated, I don't think we need to handle multiple result rows with the same validator index, do we? Should be unique (unless we're planning for reusable indices maybe, but then the math would be wrong).

But I could be missing something, so as it's also not wrong I'll keep it as is

}

return &rpInfo, nil
}

func (d *DataAccessService) getRocketPoolOperatorFactor(minipool t.RPMinipoolInfo) decimal.Decimal {
fullDeposit := minipool.UserDepositBalance.Add(minipool.NodeDepositBalance)
operatorShare := minipool.NodeDepositBalance.Div(fullDeposit)
invOperatorShare := decimal.NewFromInt(1).Sub(operatorShare)

commissionReward := invOperatorShare.Mul(decimal.NewFromFloat(minipool.NodeFee))
operatorFactor := operatorShare.Add(commissionReward)

return operatorFactor
}
Loading
Loading