Skip to content

Commit

Permalink
refactor: operator profit -> operation pool
Browse files Browse the repository at this point in the history
  • Loading branch information
HenryQW committed May 1, 2024
1 parent f06e720 commit f9aed38
Show file tree
Hide file tree
Showing 10 changed files with 238 additions and 203 deletions.
4 changes: 2 additions & 2 deletions internal/database/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ type Client interface {
FindStakerProfitSnapshots(ctx context.Context, query schema.StakerProfitSnapshotsQuery) ([]*schema.StakerProfitSnapshot, error)
SaveStakerProfitSnapshots(ctx context.Context, stakerProfitSnapshots []*schema.StakerProfitSnapshot) error
FindStakerCountRecentEpochs(ctx context.Context, recentEpochs int) (map[common.Address]*schema.StakeRecentCount, error)
FindOperatorProfitSnapshots(ctx context.Context, query schema.OperatorProfitSnapshotsQuery) ([]*schema.OperatorProfitSnapshot, error)
SaveOperatorProfitSnapshots(ctx context.Context, operatorProfitSnapshots []*schema.OperatorProfitSnapshot) error
FindOperationPoolSnapshots(ctx context.Context, query schema.OperationPoolSnapshotsQuery) ([]*schema.OperationPoolSnapshot, error)
SaveOperationPoolSnapshots(ctx context.Context, operatorProfitSnapshots []*schema.OperationPoolSnapshot) error

FindBridgeTransaction(ctx context.Context, query schema.BridgeTransactionQuery) (*schema.BridgeTransaction, error)
FindBridgeTransactions(ctx context.Context, query schema.BridgeTransactionsQuery) ([]*schema.BridgeTransaction, error)
Expand Down
6 changes: 3 additions & 3 deletions internal/database/dialer/cockroachdb/client_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -553,8 +553,8 @@ func (c *client) SaveNodeMinTokensToStakeSnapshots(ctx context.Context, snapshot
return c.database.WithContext(ctx).Clauses(onConflict).Create(&value).Error
}

func (c *client) FindOperatorProfitSnapshots(ctx context.Context, query schema.OperatorProfitSnapshotsQuery) ([]*schema.OperatorProfitSnapshot, error) {
databaseClient := c.database.WithContext(ctx).Table((*table.OperatorProfitSnapshot).TableName(nil))
func (c *client) FindOperationPoolSnapshots(ctx context.Context, query schema.OperationPoolSnapshotsQuery) ([]*schema.OperationPoolSnapshot, error) {
databaseClient := c.database.WithContext(ctx).Table((*table.OperationPoolSnapshot).TableName(nil))

if query.Operator != nil {
databaseClient = databaseClient.Where("operator = ?", *query.Operator)
Expand Down Expand Up @@ -613,7 +613,7 @@ func (c *client) FindOperatorProfitSnapshots(ctx context.Context, query schema.O
return snapshots.Export()
}

func (c *client) SaveOperatorProfitSnapshots(ctx context.Context, snapshots []*schema.OperatorProfitSnapshot) error {
func (c *client) SaveOperationPoolSnapshots(ctx context.Context, snapshots []*schema.OperationPoolSnapshot) error {
var value table.OperatorProfitSnapshots

if err := value.Import(snapshots); err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"github.com/shopspring/decimal"
)

type OperatorProfitSnapshot struct {
type OperationPoolSnapshot struct {
ID uint64 `gorm:"column:id"`
Date time.Time `gorm:"column:date"`
EpochID uint64 `gorm:"column:epoch_id"`
Expand All @@ -18,11 +18,12 @@ type OperatorProfitSnapshot struct {
UpdatedAt time.Time `gorm:"column:updated_at"`
}

func (s *OperatorProfitSnapshot) TableName() string {
// FIXME: update table name to `node.operation_pool_snapshots`
func (s *OperationPoolSnapshot) TableName() string {
return "node.operator_profit_snapshots"
}

func (s *OperatorProfitSnapshot) Import(snapshot schema.OperatorProfitSnapshot) error {
func (s *OperationPoolSnapshot) Import(snapshot schema.OperationPoolSnapshot) error {
s.Date = snapshot.Date
s.EpochID = snapshot.EpochID
s.Operator = snapshot.Operator
Expand All @@ -33,8 +34,8 @@ func (s *OperatorProfitSnapshot) Import(snapshot schema.OperatorProfitSnapshot)
return nil
}

func (s *OperatorProfitSnapshot) Export() (*schema.OperatorProfitSnapshot, error) {
return &schema.OperatorProfitSnapshot{
func (s *OperationPoolSnapshot) Export() (*schema.OperationPoolSnapshot, error) {
return &schema.OperationPoolSnapshot{
ID: s.ID,
Date: s.Date,
EpochID: s.EpochID,
Expand All @@ -45,11 +46,11 @@ func (s *OperatorProfitSnapshot) Export() (*schema.OperatorProfitSnapshot, error
}, nil
}

type OperatorProfitSnapshots []OperatorProfitSnapshot
type OperatorProfitSnapshots []OperationPoolSnapshot

func (s *OperatorProfitSnapshots) Import(snapshots []*schema.OperatorProfitSnapshot) error {
func (s *OperatorProfitSnapshots) Import(snapshots []*schema.OperationPoolSnapshot) error {
for _, snapshot := range snapshots {
var imported OperatorProfitSnapshot
var imported OperationPoolSnapshot

if err := imported.Import(*snapshot); err != nil {
return err
Expand All @@ -61,8 +62,8 @@ func (s *OperatorProfitSnapshots) Import(snapshots []*schema.OperatorProfitSnaps
return nil
}

func (s *OperatorProfitSnapshots) Export() ([]*schema.OperatorProfitSnapshot, error) {
snapshots := make([]*schema.OperatorProfitSnapshot, 0)
func (s *OperatorProfitSnapshots) Export() ([]*schema.OperationPoolSnapshot, error) {
snapshots := make([]*schema.OperationPoolSnapshot, 0)

for _, snapshot := range *s {
exported, err := snapshot.Export()
Expand Down
4 changes: 2 additions & 2 deletions internal/service/hub/handler/nta/operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func (n *NTA) findOperatorHistoryProfitSnapshots(ctx context.Context, operator c
}

now := time.Now()
query := schema.OperatorProfitSnapshotsQuery{
query := schema.OperationPoolSnapshotsQuery{
Operator: lo.ToPtr(operator),
Dates: []time.Time{
now.Add(-24 * time.Hour), // 1 day
Expand All @@ -66,7 +66,7 @@ func (n *NTA) findOperatorHistoryProfitSnapshots(ctx context.Context, operator c
},
}

snapshots, err := n.databaseClient.FindOperatorProfitSnapshots(ctx, query)
snapshots, err := n.databaseClient.FindOperationPoolSnapshots(ctx, query)
if err != nil && !errors.Is(err, database.ErrorRowNotFound) {
return nil, fmt.Errorf("find operator profit snapshots: %w", err)
}
Expand Down
4 changes: 2 additions & 2 deletions internal/service/hub/handler/nta/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,15 +115,15 @@ func (n *NTA) GetOperatorProfitsSnapshots(c echo.Context) error {
return errorx.ValidateFailedError(c, fmt.Errorf("validate failed: %w", err))
}

query := schema.OperatorProfitSnapshotsQuery{
query := schema.OperationPoolSnapshotsQuery{
Operator: lo.ToPtr(request.Operator),
Limit: request.Limit,
Cursor: request.Cursor,
BeforeDate: request.BeforeDate,
AfterDate: request.AfterDate,
}

operatorProfitSnapshots, err := n.databaseClient.FindOperatorProfitSnapshots(c.Request().Context(), query)
operatorProfitSnapshots, err := n.databaseClient.FindOperationPoolSnapshots(c.Request().Context(), query)
if err != nil {
zap.L().Error("find operator profit snapshots", zap.Error(err))

Expand Down
2 changes: 1 addition & 1 deletion internal/service/hub/model/nta/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type BatchGetNodeMinTokensToStakeSnapshotsResponseData []*NodeMinTokensToStakeSn

type GetStakerProfitSnapshotsResponseData []*CountSnapshot

type GetOperatorProfitsSnapshotsResponseData []*schema.OperatorProfitSnapshot
type GetOperatorProfitsSnapshotsResponseData []*schema.OperationPoolSnapshot

type CountSnapshot struct {
Date string `json:"date"`
Expand Down
212 changes: 212 additions & 0 deletions internal/service/scheduler/snapshot/operation_pool/operation_pool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
package operationpool

import (
"context"
"errors"
"fmt"
"math/big"
"os"
"os/signal"
"syscall"
"time"

"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/redis/go-redis/v9"
"github.com/rss3-network/global-indexer/common/ethereum"
"github.com/rss3-network/global-indexer/contract/l2"
"github.com/rss3-network/global-indexer/internal/cronjob"
"github.com/rss3-network/global-indexer/internal/database"
"github.com/rss3-network/global-indexer/internal/service"
"github.com/rss3-network/global-indexer/schema"
"github.com/samber/lo"
"github.com/shopspring/decimal"
"github.com/sourcegraph/conc/pool"
"go.uber.org/zap"
)

var (
Name = "operator_profit"
Timeout = 3 * time.Minute
)

var _ service.Server = (*server)(nil)

type server struct {
cronJob *cronjob.CronJob
databaseClient database.Client
redisClient *redis.Client
stakingContract *l2.Staking
}

func (s *server) Name() string {
return Name
}

func (s *server) Spec() string {
return "0 */1 * * * *" // every minute
}

func (s *server) Run(ctx context.Context) error {
err := s.cronJob.AddFunc(ctx, s.Spec(), func() {
// Query the latest Operation Pool snapshot.
snapshot, err := s.databaseClient.FindOperationPoolSnapshots(ctx, schema.OperationPoolSnapshotsQuery{Limit: lo.ToPtr(1)})
if err != nil && !errors.Is(err, database.ErrorRowNotFound) {
zap.L().Error("find Operation Pool snapshot", zap.Error(err))

return
}

// Query the latest epoch.
epoch, err := s.databaseClient.FindEpochs(ctx, 1, nil)
if err != nil && !errors.Is(err, database.ErrorRowNotFound) {
zap.L().Error("find epoch", zap.Error(err))

return
}

// Assign Epoch Ids based on the retrieved snapshot and epoch.
var latestSnapshotEpochID, latestEpochID uint64

if len(snapshot) > 0 {
latestSnapshotEpochID = snapshot[0].EpochID
}

if len(epoch) > 0 {
latestEpochID = epoch[0].ID
}

// Only begin the snapshot process if the latest snapshot is behind the latest epoch.
if latestSnapshotEpochID < latestEpochID {
if err := s.beginSnapshot(ctx, latestSnapshotEpochID, latestEpochID); err != nil {
zap.L().Error("save Operation Pool snapshot", zap.Error(err))

return
}
}
})

if err != nil {
return fmt.Errorf("add Operation Pool snapshot cron job: %w", err)
}

s.cronJob.Start()
defer s.cronJob.Stop()

stopchan := make(chan os.Signal, 1)

signal.Notify(stopchan, syscall.SIGINT, syscall.SIGQUIT, syscall.SIGTERM)
<-stopchan

return nil
}

// beginSnapshot takes new snapshots of all Nodes' operation pool up to the latest epoch.
func (s *server) beginSnapshot(ctx context.Context, currentSnapshotEpochID, latestEpochID uint64) error {
// Query the array of Nodes.
nodes, err := s.getNodesFromDB(ctx)
if err != nil {
return fmt.Errorf("get nodes: %w", err)
}

// Iterate until the snapshot is up to date with the latest epoch.
// currentEpochID is the epoch being snapshotted.
for currentEpochID := currentSnapshotEpochID + 1; currentEpochID <= latestEpochID; currentEpochID++ {
// Fetch the epoch by currentEpochID.
epoch, err := s.databaseClient.FindEpochTransactions(ctx, currentEpochID, 1, nil)
if err != nil {
zap.L().Error("find epoch transactions", zap.Error(err))

continue
}

// If the epoch does not exist in the database, log an error and continue.
// This means epochs are not indexed up to the latest epoch.
if len(epoch) == 0 {
zap.L().Error("an epoch does not exist in database", zap.Any("epoch ID", currentEpochID))

continue
}

// If there are no nodes, continue to the next epoch.
if len(nodes) == 0 {
continue
}

var (
errorPool = pool.New().WithContext(ctx).WithMaxGoroutines(30).WithCancelOnError().WithFirstError()
result = make([]*schema.OperationPoolSnapshot, len(nodes))
)

for i, node := range nodes {
errorPool.Go(func(ctx context.Context) error {
// Query the Node info from the VSL.
nodeInfo, err := s.getNodeInfoFromVSL(ctx, epoch[0].BlockNumber, node.Address)
if err != nil {
return err
}

// should not include genesis account
if nodeInfo.Account == ethereum.AddressGenesis {
return nil
}

result[i] = &schema.OperationPoolSnapshot{
Date: time.Unix(epoch[0].BlockTimestamp, 0),
EpochID: currentEpochID,
Operator: nodeInfo.Account,
OperationPool: decimal.NewFromBigInt(nodeInfo.OperationPoolTokens, 0),
}

return nil
})
}

if err := errorPool.Wait(); err != nil {
return fmt.Errorf("fetch operator profit: %w", err)
}

// Filter out nil values in the result.
result = lo.FilterMap(result, func(snapshot *schema.OperationPoolSnapshot, _ int) (*schema.OperationPoolSnapshot, bool) {
return snapshot, snapshot != nil
})

// Save snapshots into the database.
if err := s.databaseClient.SaveOperationPoolSnapshots(ctx, result); err != nil {
return fmt.Errorf("save Operation Pool: %w", err)
}
}

return nil
}

func (s *server) getNodesFromDB(ctx context.Context) ([]*schema.Node, error) {
nodes, err := s.databaseClient.FindNodes(ctx, schema.FindNodesQuery{})

if err != nil {
return nil, fmt.Errorf("find nodes from DB: %w", err)
}

return nodes, nil
}

func (s *server) getNodeInfoFromVSL(ctx context.Context, blockNumber *big.Int, nodeAddress common.Address) (*l2.DataTypesNode, error) {
nodeInfo, err := s.stakingContract.GetNode(&bind.CallOpts{Context: ctx, BlockNumber: blockNumber}, nodeAddress)
if err != nil {
msg := "get node from VSL error"
zap.L().Error(msg, zap.Error(err))

return nil, fmt.Errorf("%s: %w", msg, err)
}

return &nodeInfo, nil
}

func New(databaseClient database.Client, redisClient *redis.Client, stakingContract *l2.Staking) service.Server {
return &server{
cronJob: cronjob.New(redisClient, Name, Timeout),
databaseClient: databaseClient,
redisClient: redisClient,
stakingContract: stakingContract,
}
}
Loading

0 comments on commit f9aed38

Please sign in to comment.