Skip to content

Commit

Permalink
services/horizon: Batch liquidity pool updates/removals (#3944)
Browse files Browse the repository at this point in the history
  • Loading branch information
2opremio authored Sep 21, 2021
1 parent 46be761 commit 1aa506e
Show file tree
Hide file tree
Showing 10 changed files with 87 additions and 320 deletions.
7 changes: 1 addition & 6 deletions services/horizon/internal/actions/account_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -554,12 +554,7 @@ func createLP(tt *test.T, q *history.Q) history.LiquidityPool {
LastModifiedLedger: 123,
}

builder := q.NewLiquidityPoolsBatchInsertBuilder(2)

err := builder.Add(tt.Ctx, lp)
tt.Assert.NoError(err)

err = builder.Exec(tt.Ctx)
err := q.UpsertLiquidityPools(tt.Ctx, []history.LiquidityPool{lp})
tt.Assert.NoError(err)
return lp
}
Expand Down
18 changes: 6 additions & 12 deletions services/horizon/internal/actions/liquidity_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,7 @@ func TestGetLiquidityPoolByID(t *testing.T) {
LastModifiedLedger: 100,
}

builder := q.NewLiquidityPoolsBatchInsertBuilder(2)
err := builder.Add(tt.Ctx, lp)
tt.Assert.NoError(err)
err = builder.Exec(tt.Ctx)
err := q.UpsertLiquidityPools(tt.Ctx, []history.LiquidityPool{lp})
tt.Assert.NoError(err)

handler := GetLiquidityPoolByIDHandler{}
Expand Down Expand Up @@ -95,8 +92,7 @@ func TestGetLiquidityPools(t *testing.T) {
test.ResetHorizonDB(t, tt.HorizonDB)
q := &history.Q{tt.HorizonSession()}

builder := q.NewLiquidityPoolsBatchInsertBuilder(2)
err := builder.Add(tt.Ctx, history.LiquidityPool{
lp1 := history.LiquidityPool{
PoolID: "ba7816bf8f01cfea414140de5dae2223b00361a396177a9cb410ff61f20015ad",
Type: xdr.LiquidityPoolTypeLiquidityPoolConstantProduct,
Fee: 30,
Expand All @@ -113,9 +109,8 @@ func TestGetLiquidityPools(t *testing.T) {
},
},
LastModifiedLedger: 100,
})
tt.Assert.NoError(err)
err = builder.Add(tt.Ctx, history.LiquidityPool{
}
lp2 := history.LiquidityPool{
PoolID: "d827bf10a721d217de3cd9ab3f10198a54de558c093a511ec426028618df2633",
Type: xdr.LiquidityPoolTypeLiquidityPoolConstantProduct,
Fee: 30,
Expand All @@ -132,9 +127,8 @@ func TestGetLiquidityPools(t *testing.T) {
},
},
LastModifiedLedger: 100,
})
tt.Assert.NoError(err)
err = builder.Exec(tt.Ctx)
}
err := q.UpsertLiquidityPools(tt.Ctx, []history.LiquidityPool{lp1, lp2})
tt.Assert.NoError(err)

handler := GetLiquidityPoolsHandler{}
Expand Down
97 changes: 30 additions & 67 deletions services/horizon/internal/db2/history/liquidity_pools.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,9 @@ import (
"context"
"database/sql/driver"
"encoding/json"
"fmt"
"strings"

sq "github.com/Masterminds/squirrel"
"github.com/stellar/go/services/horizon/internal/db2"
"github.com/stellar/go/support/db"
"github.com/stellar/go/support/errors"
"github.com/stellar/go/xdr"
)
Expand Down Expand Up @@ -80,16 +77,9 @@ func (lpar *LiquidityPoolAssetReserve) UnmarshalJSON(data []byte) error {
return nil
}

type LiquidityPoolsBatchInsertBuilder interface {
Add(ctx context.Context, lp LiquidityPool) error
Exec(ctx context.Context) error
}

// QLiquidityPools defines liquidity-pool-related queries.
type QLiquidityPools interface {
NewLiquidityPoolsBatchInsertBuilder(maxBatchSize int) LiquidityPoolsBatchInsertBuilder
UpdateLiquidityPool(ctx context.Context, lp LiquidityPool) (int64, error)
RemoveLiquidityPool(ctx context.Context, liquidityPoolID string, lastModifiedLedger uint32) (int64, error)
UpsertLiquidityPools(ctx context.Context, lps []LiquidityPool) error
GetLiquidityPoolsByID(ctx context.Context, poolIDs []string) ([]LiquidityPool, error)
GetAllLiquidityPools(ctx context.Context) ([]LiquidityPool, error)
CountLiquidityPools(ctx context.Context) (int, error)
Expand All @@ -98,25 +88,37 @@ type QLiquidityPools interface {
CompactLiquidityPools(ctx context.Context, cutOffSequence uint32) (int64, error)
}

// NewLiquidityPoolsBatchInsertBuilder constructs a new LiquidityPoolsBatchInsertBuilder instance
func (q *Q) NewLiquidityPoolsBatchInsertBuilder(maxBatchSize int) LiquidityPoolsBatchInsertBuilder {
cols := db.ColumnsForStruct(LiquidityPool{})
excludedCols := make([]string, len(cols))
for i, col := range cols {
excludedCols[i] = "EXCLUDED." + col
// UpsertLiquidityPools upserts a batch of liquidity pools in the liquidity_pools table.
// There's currently no limit of the number of liquidity pools this method can
// accept other than 2GB limit of the query string length what should be enough
// for each ledger with the current limits.
func (q *Q) UpsertLiquidityPools(ctx context.Context, lps []LiquidityPool) error {
var poolID, typ, fee, shareCount, trustlineCount,
assetReserves, lastModifiedLedger, deleted []interface{}

for _, lp := range lps {
poolID = append(poolID, lp.PoolID)
typ = append(typ, lp.Type)
fee = append(fee, lp.Fee)
trustlineCount = append(trustlineCount, lp.TrustlineCount)
shareCount = append(shareCount, lp.ShareCount)
assetReserves = append(assetReserves, lp.AssetReserves)
lastModifiedLedger = append(lastModifiedLedger, lp.LastModifiedLedger)
deleted = append(deleted, lp.Deleted)
}
suffix := fmt.Sprintf(
"ON CONFLICT (id) DO UPDATE SET (%s) = (%s)",
strings.Join(cols, ", "),
strings.Join(excludedCols, ", "),
)
return &liquidityPoolsBatchInsertBuilder{
builder: db.BatchInsertBuilder{
Table: q.GetTable("liquidity_pools"),
MaxBatchSize: maxBatchSize,
Suffix: suffix,
},

upsertFields := []upsertField{
{"id", "text", poolID},
{"type", "smallint", typ},
{"fee", "integer", fee},
{"trustline_count", "bigint", trustlineCount},
{"share_count", "bigint", shareCount},
{"asset_reserves", "jsonb", assetReserves},
{"last_modified_ledger", "integer", lastModifiedLedger},
{"deleted", "boolean", deleted},
}

return q.upsertRows(ctx, "liquidity_pools", "id", upsertFields)
}

// CountLiquidityPools returns the total number of liquidity pools in the DB
Expand All @@ -140,33 +142,6 @@ func (q *Q) GetLiquidityPoolsByID(ctx context.Context, poolIDs []string) ([]Liqu
return liquidityPools, err
}

// UpdateLiquidityPool updates a row in the liquidity_pools table.
// Returns number of rows affected and error.
func (q *Q) UpdateLiquidityPool(ctx context.Context, lp LiquidityPool) (int64, error) {
updateBuilder := q.GetTable("liquidity_pools").Update()
result, err := updateBuilder.SetStruct(lp, []string{}).Where("id = ?", lp.PoolID).Exec(ctx)
if err != nil {
return 0, err
}

return result.RowsAffected()
}

// RemoveLiquidityPool marks the given liquidity pool as deleted.
// Returns number of rows affected and error.
func (q *Q) RemoveLiquidityPool(ctx context.Context, liquidityPoolID string, lastModifiedLedger uint32) (int64, error) {
sql := sq.Update("liquidity_pools").
Set("deleted", true).
Set("last_modified_ledger", lastModifiedLedger).
Where(sq.Eq{"id": liquidityPoolID})
result, err := q.Exec(ctx, sql)
if err != nil {
return 0, err
}

return result.RowsAffected()
}

// FindLiquidityPoolByID returns a liquidity pool.
func (q *Q) FindLiquidityPoolByID(ctx context.Context, liquidityPoolID string) (LiquidityPool, error) {
var lp LiquidityPool
Expand Down Expand Up @@ -234,18 +209,6 @@ func (q *Q) CompactLiquidityPools(ctx context.Context, cutOffSequence uint32) (i
return result.RowsAffected()
}

type liquidityPoolsBatchInsertBuilder struct {
builder db.BatchInsertBuilder
}

func (i *liquidityPoolsBatchInsertBuilder) Add(ctx context.Context, lp LiquidityPool) error {
return i.builder.RowStruct(ctx, lp)
}

func (i *liquidityPoolsBatchInsertBuilder) Exec(ctx context.Context) error {
return i.builder.Exec(ctx)
}

var liquidityPoolsSelectStatement = "lp.id, " +
"lp.type, " +
"lp.fee, " +
Expand Down

This file was deleted.

Loading

0 comments on commit 1aa506e

Please sign in to comment.