From 1aa506ef8e2435f73fd40e9edefdb495e017d411 Mon Sep 17 00:00:00 2001 From: Alfonso Acosta Date: Tue, 21 Sep 2021 19:22:06 +0200 Subject: [PATCH] services/horizon: Batch liquidity pool updates/removals (#3944) --- .../horizon/internal/actions/account_test.go | 7 +- .../internal/actions/liquidity_pool_test.go | 18 ++-- .../internal/db2/history/liquidity_pools.go | 97 ++++++------------- ...quidity_pools_batch_insert_builder_test.go | 51 ---------- .../db2/history/liquidity_pools_test.go | 64 ++++-------- .../db2/history/mock_q_liquidity_pools.go | 30 +----- .../horizon/internal/db2/history/offers.go | 2 +- .../internal/ingest/processor_runner_test.go | 23 ----- .../liquidity_pools_change_processor.go | 56 +++-------- .../liquidity_pools_change_processor_test.go | 59 +++-------- 10 files changed, 87 insertions(+), 320 deletions(-) delete mode 100644 services/horizon/internal/db2/history/liquidity_pools_batch_insert_builder_test.go diff --git a/services/horizon/internal/actions/account_test.go b/services/horizon/internal/actions/account_test.go index edeba85d05..bd4ef85c5d 100644 --- a/services/horizon/internal/actions/account_test.go +++ b/services/horizon/internal/actions/account_test.go @@ -554,12 +554,7 @@ func createLP(tt *test.T, q *history.Q) history.LiquidityPool { LastModifiedLedger: 123, } - builder := q.NewLiquidityPoolsBatchInsertBuilder(2) - - err := builder.Add(tt.Ctx, lp) - tt.Assert.NoError(err) - - err = builder.Exec(tt.Ctx) + err := q.UpsertLiquidityPools(tt.Ctx, []history.LiquidityPool{lp}) tt.Assert.NoError(err) return lp } diff --git a/services/horizon/internal/actions/liquidity_pool_test.go b/services/horizon/internal/actions/liquidity_pool_test.go index 75ac5a4314..bf645d408a 100644 --- a/services/horizon/internal/actions/liquidity_pool_test.go +++ b/services/horizon/internal/actions/liquidity_pool_test.go @@ -37,10 +37,7 @@ func TestGetLiquidityPoolByID(t *testing.T) { LastModifiedLedger: 100, } - builder := q.NewLiquidityPoolsBatchInsertBuilder(2) - err := builder.Add(tt.Ctx, lp) - tt.Assert.NoError(err) - err = builder.Exec(tt.Ctx) + err := q.UpsertLiquidityPools(tt.Ctx, []history.LiquidityPool{lp}) tt.Assert.NoError(err) handler := GetLiquidityPoolByIDHandler{} @@ -95,8 +92,7 @@ func TestGetLiquidityPools(t *testing.T) { test.ResetHorizonDB(t, tt.HorizonDB) q := &history.Q{tt.HorizonSession()} - builder := q.NewLiquidityPoolsBatchInsertBuilder(2) - err := builder.Add(tt.Ctx, history.LiquidityPool{ + lp1 := history.LiquidityPool{ PoolID: "ba7816bf8f01cfea414140de5dae2223b00361a396177a9cb410ff61f20015ad", Type: xdr.LiquidityPoolTypeLiquidityPoolConstantProduct, Fee: 30, @@ -113,9 +109,8 @@ func TestGetLiquidityPools(t *testing.T) { }, }, LastModifiedLedger: 100, - }) - tt.Assert.NoError(err) - err = builder.Add(tt.Ctx, history.LiquidityPool{ + } + lp2 := history.LiquidityPool{ PoolID: "d827bf10a721d217de3cd9ab3f10198a54de558c093a511ec426028618df2633", Type: xdr.LiquidityPoolTypeLiquidityPoolConstantProduct, Fee: 30, @@ -132,9 +127,8 @@ func TestGetLiquidityPools(t *testing.T) { }, }, LastModifiedLedger: 100, - }) - tt.Assert.NoError(err) - err = builder.Exec(tt.Ctx) + } + err := q.UpsertLiquidityPools(tt.Ctx, []history.LiquidityPool{lp1, lp2}) tt.Assert.NoError(err) handler := GetLiquidityPoolsHandler{} diff --git a/services/horizon/internal/db2/history/liquidity_pools.go b/services/horizon/internal/db2/history/liquidity_pools.go index 3627060c3c..c50201f993 100644 --- a/services/horizon/internal/db2/history/liquidity_pools.go +++ b/services/horizon/internal/db2/history/liquidity_pools.go @@ -4,12 +4,9 @@ import ( "context" "database/sql/driver" "encoding/json" - "fmt" - "strings" sq "github.com/Masterminds/squirrel" "github.com/stellar/go/services/horizon/internal/db2" - "github.com/stellar/go/support/db" "github.com/stellar/go/support/errors" "github.com/stellar/go/xdr" ) @@ -80,16 +77,9 @@ func (lpar *LiquidityPoolAssetReserve) UnmarshalJSON(data []byte) error { return nil } -type LiquidityPoolsBatchInsertBuilder interface { - Add(ctx context.Context, lp LiquidityPool) error - Exec(ctx context.Context) error -} - // QLiquidityPools defines liquidity-pool-related queries. type QLiquidityPools interface { - NewLiquidityPoolsBatchInsertBuilder(maxBatchSize int) LiquidityPoolsBatchInsertBuilder - UpdateLiquidityPool(ctx context.Context, lp LiquidityPool) (int64, error) - RemoveLiquidityPool(ctx context.Context, liquidityPoolID string, lastModifiedLedger uint32) (int64, error) + UpsertLiquidityPools(ctx context.Context, lps []LiquidityPool) error GetLiquidityPoolsByID(ctx context.Context, poolIDs []string) ([]LiquidityPool, error) GetAllLiquidityPools(ctx context.Context) ([]LiquidityPool, error) CountLiquidityPools(ctx context.Context) (int, error) @@ -98,25 +88,37 @@ type QLiquidityPools interface { CompactLiquidityPools(ctx context.Context, cutOffSequence uint32) (int64, error) } -// NewLiquidityPoolsBatchInsertBuilder constructs a new LiquidityPoolsBatchInsertBuilder instance -func (q *Q) NewLiquidityPoolsBatchInsertBuilder(maxBatchSize int) LiquidityPoolsBatchInsertBuilder { - cols := db.ColumnsForStruct(LiquidityPool{}) - excludedCols := make([]string, len(cols)) - for i, col := range cols { - excludedCols[i] = "EXCLUDED." + col +// UpsertLiquidityPools upserts a batch of liquidity pools in the liquidity_pools table. +// There's currently no limit of the number of liquidity pools this method can +// accept other than 2GB limit of the query string length what should be enough +// for each ledger with the current limits. +func (q *Q) UpsertLiquidityPools(ctx context.Context, lps []LiquidityPool) error { + var poolID, typ, fee, shareCount, trustlineCount, + assetReserves, lastModifiedLedger, deleted []interface{} + + for _, lp := range lps { + poolID = append(poolID, lp.PoolID) + typ = append(typ, lp.Type) + fee = append(fee, lp.Fee) + trustlineCount = append(trustlineCount, lp.TrustlineCount) + shareCount = append(shareCount, lp.ShareCount) + assetReserves = append(assetReserves, lp.AssetReserves) + lastModifiedLedger = append(lastModifiedLedger, lp.LastModifiedLedger) + deleted = append(deleted, lp.Deleted) } - suffix := fmt.Sprintf( - "ON CONFLICT (id) DO UPDATE SET (%s) = (%s)", - strings.Join(cols, ", "), - strings.Join(excludedCols, ", "), - ) - return &liquidityPoolsBatchInsertBuilder{ - builder: db.BatchInsertBuilder{ - Table: q.GetTable("liquidity_pools"), - MaxBatchSize: maxBatchSize, - Suffix: suffix, - }, + + upsertFields := []upsertField{ + {"id", "text", poolID}, + {"type", "smallint", typ}, + {"fee", "integer", fee}, + {"trustline_count", "bigint", trustlineCount}, + {"share_count", "bigint", shareCount}, + {"asset_reserves", "jsonb", assetReserves}, + {"last_modified_ledger", "integer", lastModifiedLedger}, + {"deleted", "boolean", deleted}, } + + return q.upsertRows(ctx, "liquidity_pools", "id", upsertFields) } // CountLiquidityPools returns the total number of liquidity pools in the DB @@ -140,33 +142,6 @@ func (q *Q) GetLiquidityPoolsByID(ctx context.Context, poolIDs []string) ([]Liqu return liquidityPools, err } -// UpdateLiquidityPool updates a row in the liquidity_pools table. -// Returns number of rows affected and error. -func (q *Q) UpdateLiquidityPool(ctx context.Context, lp LiquidityPool) (int64, error) { - updateBuilder := q.GetTable("liquidity_pools").Update() - result, err := updateBuilder.SetStruct(lp, []string{}).Where("id = ?", lp.PoolID).Exec(ctx) - if err != nil { - return 0, err - } - - return result.RowsAffected() -} - -// RemoveLiquidityPool marks the given liquidity pool as deleted. -// Returns number of rows affected and error. -func (q *Q) RemoveLiquidityPool(ctx context.Context, liquidityPoolID string, lastModifiedLedger uint32) (int64, error) { - sql := sq.Update("liquidity_pools"). - Set("deleted", true). - Set("last_modified_ledger", lastModifiedLedger). - Where(sq.Eq{"id": liquidityPoolID}) - result, err := q.Exec(ctx, sql) - if err != nil { - return 0, err - } - - return result.RowsAffected() -} - // FindLiquidityPoolByID returns a liquidity pool. func (q *Q) FindLiquidityPoolByID(ctx context.Context, liquidityPoolID string) (LiquidityPool, error) { var lp LiquidityPool @@ -234,18 +209,6 @@ func (q *Q) CompactLiquidityPools(ctx context.Context, cutOffSequence uint32) (i return result.RowsAffected() } -type liquidityPoolsBatchInsertBuilder struct { - builder db.BatchInsertBuilder -} - -func (i *liquidityPoolsBatchInsertBuilder) Add(ctx context.Context, lp LiquidityPool) error { - return i.builder.RowStruct(ctx, lp) -} - -func (i *liquidityPoolsBatchInsertBuilder) Exec(ctx context.Context) error { - return i.builder.Exec(ctx) -} - var liquidityPoolsSelectStatement = "lp.id, " + "lp.type, " + "lp.fee, " + diff --git a/services/horizon/internal/db2/history/liquidity_pools_batch_insert_builder_test.go b/services/horizon/internal/db2/history/liquidity_pools_batch_insert_builder_test.go deleted file mode 100644 index ebe6c314c9..0000000000 --- a/services/horizon/internal/db2/history/liquidity_pools_batch_insert_builder_test.go +++ /dev/null @@ -1,51 +0,0 @@ -package history - -import ( - "testing" - - "github.com/stellar/go/services/horizon/internal/test" - "github.com/stellar/go/xdr" -) - -func TestAddLiquidityPool(t *testing.T) { - tt := test.Start(t) - defer tt.Finish() - test.ResetHorizonDB(t, tt.HorizonDB) - q := &Q{tt.HorizonSession()} - - lp := LiquidityPool{ - PoolID: "cafebabedeadbeef000000000000000000000000000000000000000000000000", - Type: xdr.LiquidityPoolTypeLiquidityPoolConstantProduct, - Fee: 34, - TrustlineCount: 52115, - ShareCount: 412241, - AssetReserves: []LiquidityPoolAssetReserve{ - { - Asset: xdr.MustNewCreditAsset("USD", "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), - Reserve: 450, - }, - { - Asset: xdr.MustNewNativeAsset(), - Reserve: 450, - }, - }, - LastModifiedLedger: 123, - } - - builder := q.NewLiquidityPoolsBatchInsertBuilder(2) - - err := builder.Add(tt.Ctx, lp) - tt.Assert.NoError(err) - - err = builder.Exec(tt.Ctx) - tt.Assert.NoError(err) - - lps := []LiquidityPool{} - err = q.Select(tt.Ctx, &lps, selectLiquidityPools) - - if tt.Assert.NoError(err) { - tt.Assert.Len(lps, 1) - lpObtained := lps[0] - tt.Assert.Equal(lp, lpObtained) - } -} diff --git a/services/horizon/internal/db2/history/liquidity_pools_test.go b/services/horizon/internal/db2/history/liquidity_pools_test.go index 06d3ab12a4..d6e7a3aa1d 100644 --- a/services/horizon/internal/db2/history/liquidity_pools_test.go +++ b/services/horizon/internal/db2/history/liquidity_pools_test.go @@ -33,19 +33,23 @@ func TestFindLiquidityPool(t *testing.T) { LastModifiedLedger: 123, } - builder := q.NewLiquidityPoolsBatchInsertBuilder(2) - - err := builder.Add(tt.Ctx, lp) + err := q.UpsertLiquidityPools(tt.Ctx, []LiquidityPool{lp}) tt.Assert.NoError(err) - err = builder.Exec(tt.Ctx) - lpObtained, err := q.FindLiquidityPoolByID(tt.Ctx, lp.PoolID) tt.Assert.NoError(err) tt.Assert.Equal(lp, lpObtained) } +func removeLiquidityPool(t *test.T, q *Q, lp LiquidityPool, sequence uint32) { + removed := lp + removed.Deleted = true + removed.LastModifiedLedger = sequence + err := q.UpsertLiquidityPools(t.Ctx, []LiquidityPool{removed}) + t.Assert.NoError(err) +} + func TestRemoveLiquidityPool(t *testing.T) { tt := test.Start(t) defer tt.Finish() @@ -71,13 +75,9 @@ func TestRemoveLiquidityPool(t *testing.T) { LastModifiedLedger: 123, } - builder := q.NewLiquidityPoolsBatchInsertBuilder(2) - - err := builder.Add(tt.Ctx, lp) + err := q.UpsertLiquidityPools(tt.Ctx, []LiquidityPool{lp}) tt.Assert.NoError(err) - err = builder.Exec(tt.Ctx) - count, err := q.CountLiquidityPools(tt.Ctx) tt.Assert.NoError(err) tt.Assert.Equal(1, count) @@ -86,9 +86,7 @@ func TestRemoveLiquidityPool(t *testing.T) { tt.Assert.NoError(err) tt.Assert.NotNil(lpObtained) - removed, err := q.RemoveLiquidityPool(tt.Ctx, lp.PoolID, 200) - tt.Assert.NoError(err) - tt.Assert.Equal(int64(1), removed) + removeLiquidityPool(tt, q, lp, 200) _, err = q.FindLiquidityPoolByID(tt.Ctx, lp.PoolID) tt.Assert.EqualError(err, "sql: no rows in result set") @@ -111,9 +109,8 @@ func TestRemoveLiquidityPool(t *testing.T) { lp.Deleted = false lp.ShareCount = 1 lp.TrustlineCount = 2 - err = builder.Add(tt.Ctx, lp) + err = q.UpsertLiquidityPools(tt.Ctx, []LiquidityPool{lp}) tt.Assert.NoError(err) - err = builder.Exec(tt.Ctx) tt.Assert.NoError(err) lpObtained, err = q.FindLiquidityPoolByID(tt.Ctx, lp.PoolID) @@ -148,13 +145,9 @@ func TestFindLiquidityPoolsByAssets(t *testing.T) { LastModifiedLedger: 123, } - builder := q.NewLiquidityPoolsBatchInsertBuilder(2) - - err := builder.Add(tt.Ctx, lp) + err := q.UpsertLiquidityPools(tt.Ctx, []LiquidityPool{lp}) tt.Assert.NoError(err) - err = builder.Exec(tt.Ctx) - // query by no asset query := LiquidityPoolsQuery{ PageQuery: db2.MustPageQuery("", false, "", 10), @@ -203,9 +196,7 @@ func TestFindLiquidityPoolsByAssets(t *testing.T) { tt.Assert.NoError(err) tt.Assert.Len(lps, 0) - removed, err := q.RemoveLiquidityPool(tt.Ctx, lp.PoolID, 200) - tt.Assert.NoError(err) - tt.Assert.Equal(int64(1), removed) + removeLiquidityPool(tt, q, lp, 200) query = LiquidityPoolsQuery{ PageQuery: db2.MustPageQuery("", false, "", 10), @@ -251,13 +242,9 @@ func TestLiquidityPoolCompaction(t *testing.T) { LastModifiedLedger: 123, } - builder := q.NewLiquidityPoolsBatchInsertBuilder(2) - - err := builder.Add(tt.Ctx, lp) + err := q.UpsertLiquidityPools(tt.Ctx, []LiquidityPool{lp}) tt.Assert.NoError(err) - err = builder.Exec(tt.Ctx) - compationSequence, err := q.GetLiquidityPoolCompactionSequence(tt.Ctx) tt.Assert.NoError(err) tt.Assert.Equal(uint32(0), compationSequence) @@ -279,7 +266,7 @@ func TestLiquidityPoolCompaction(t *testing.T) { tt.Assert.NoError(err) tt.Assert.Len(lps, 1) - q.RemoveLiquidityPool(tt.Ctx, lp.PoolID, 200) + removeLiquidityPool(tt, q, lp, 200) lps, err = q.GetLiquidityPools(tt.Ctx, query) tt.Assert.NoError(err) @@ -333,13 +320,9 @@ func TestUpdateLiquidityPool(t *testing.T) { LastModifiedLedger: 123, } - builder := q.NewLiquidityPoolsBatchInsertBuilder(2) - - err := builder.Add(tt.Ctx, initialLP) + err := q.UpsertLiquidityPools(tt.Ctx, []LiquidityPool{initialLP}) tt.Assert.NoError(err) - err = builder.Exec(tt.Ctx) - updatedLP := LiquidityPool{ PoolID: "cafebabedeadbeef000000000000000000000000000000000000000000000000", Type: xdr.LiquidityPoolTypeLiquidityPoolConstantProduct, @@ -359,9 +342,8 @@ func TestUpdateLiquidityPool(t *testing.T) { LastModifiedLedger: 124, } - updated, err := q.UpdateLiquidityPool(tt.Ctx, updatedLP) + err = q.UpsertLiquidityPools(tt.Ctx, []LiquidityPool{updatedLP}) tt.Assert.NoError(err) - tt.Assert.Equal(int64(1), updated) lps := []LiquidityPool{} err = q.Select(tt.Ctx, &lps, selectLiquidityPools) @@ -396,20 +378,14 @@ func TestGetLiquidityPoolsByID(t *testing.T) { LastModifiedLedger: 123, } - builder := q.NewLiquidityPoolsBatchInsertBuilder(2) - - err := builder.Add(tt.Ctx, lp) + err := q.UpsertLiquidityPools(tt.Ctx, []LiquidityPool{lp}) tt.Assert.NoError(err) - err = builder.Exec(tt.Ctx) - r, err := q.GetLiquidityPoolsByID(tt.Ctx, []string{lp.PoolID}) tt.Assert.NoError(err) tt.Assert.Len(r, 1) - removed, err := q.RemoveLiquidityPool(tt.Ctx, lp.PoolID, 200) - tt.Assert.NoError(err) - tt.Assert.Equal(int64(1), removed) + removeLiquidityPool(tt, q, lp, 200) r, err = q.GetLiquidityPoolsByID(tt.Ctx, []string{lp.PoolID}) tt.Assert.NoError(err) diff --git a/services/horizon/internal/db2/history/mock_q_liquidity_pools.go b/services/horizon/internal/db2/history/mock_q_liquidity_pools.go index dec955be27..e42122865d 100644 --- a/services/horizon/internal/db2/history/mock_q_liquidity_pools.go +++ b/services/horizon/internal/db2/history/mock_q_liquidity_pools.go @@ -11,9 +11,9 @@ type MockQLiquidityPools struct { mock.Mock } -func (m *MockQLiquidityPools) NewLiquidityPoolsBatchInsertBuilder(maxBatchSize int) LiquidityPoolsBatchInsertBuilder { - a := m.Called(maxBatchSize) - return a.Get(0).(LiquidityPoolsBatchInsertBuilder) +func (m *MockQLiquidityPools) UpsertLiquidityPools(ctx context.Context, lps []LiquidityPool) error { + a := m.Called(ctx, lps) + return a.Error(0) } func (m *MockQLiquidityPools) CountLiquidityPools(ctx context.Context) (int, error) { @@ -26,16 +26,6 @@ func (m *MockQLiquidityPools) GetLiquidityPoolsByID(ctx context.Context, poolIDs return a.Get(0).([]LiquidityPool), a.Error(1) } -func (m *MockQLiquidityPools) UpdateLiquidityPool(ctx context.Context, lp LiquidityPool) (int64, error) { - a := m.Called(ctx, lp) - return a.Get(0).(int64), a.Error(1) -} - -func (m *MockQLiquidityPools) RemoveLiquidityPool(ctx context.Context, liquidityPoolID string, sequence uint32) (int64, error) { - a := m.Called(ctx, liquidityPoolID, sequence) - return a.Get(0).(int64), a.Error(1) -} - func (m *MockQLiquidityPools) FindLiquidityPoolByID(ctx context.Context, liquidityPoolID string) (LiquidityPool, error) { a := m.Called(ctx, liquidityPoolID) return a.Get(0).(LiquidityPool), a.Error(1) @@ -55,17 +45,3 @@ func (m *MockQLiquidityPools) CompactLiquidityPools(ctx context.Context, cutOffS a := m.Called(ctx, cutOffSequence) return a.Get(0).(int64), a.Error(1) } - -type MockLiquidityPoolsBatchInsertBuilder struct { - mock.Mock -} - -func (m *MockLiquidityPoolsBatchInsertBuilder) Add(ctx context.Context, lp LiquidityPool) error { - a := m.Called(ctx, lp) - return a.Error(0) -} - -func (m *MockLiquidityPoolsBatchInsertBuilder) Exec(ctx context.Context) error { - a := m.Called(ctx) - return a.Error(0) -} diff --git a/services/horizon/internal/db2/history/offers.go b/services/horizon/internal/db2/history/offers.go index 5b875b72cf..b6d3f0686b 100644 --- a/services/horizon/internal/db2/history/offers.go +++ b/services/horizon/internal/db2/history/offers.go @@ -95,7 +95,7 @@ func (q *Q) GetUpdatedOffers(ctx context.Context, newerThanSequence uint32) ([]O return offers, err } -// UpsertOffers upserts a batch of offers in the offerss table. +// UpsertOffers upserts a batch of offers in the offers table. // There's currently no limit of the number of offers this method can // accept other than 2GB limit of the query string length what should be enough // for each ledger with the current limits. diff --git a/services/horizon/internal/ingest/processor_runner_test.go b/services/horizon/internal/ingest/processor_runner_test.go index 549732caca..05f3c03247 100644 --- a/services/horizon/internal/ingest/processor_runner_test.go +++ b/services/horizon/internal/ingest/processor_runner_test.go @@ -36,12 +36,6 @@ func TestProcessorRunnerRunHistoryArchiveIngestionGenesis(t *testing.T) { q.MockQClaimableBalances.On("NewClaimableBalancesBatchInsertBuilder", maxBatchSize). Return(mockClaimableBalancesBatchInsertBuilder).Once() - mockLiquidityPoolsBatchInsertBuilder := &history.MockLiquidityPoolsBatchInsertBuilder{} - defer mock.AssertExpectationsForObjects(t, mockLiquidityPoolsBatchInsertBuilder) - mockLiquidityPoolsBatchInsertBuilder.On("Exec", ctx).Return(nil).Once() - q.MockQLiquidityPools.On("NewLiquidityPoolsBatchInsertBuilder", maxBatchSize). - Return(mockLiquidityPoolsBatchInsertBuilder).Once() - q.MockQAccounts.On("UpsertAccounts", ctx, []xdr.LedgerEntry{ { LastModifiedLedgerSeq: 1, @@ -125,12 +119,6 @@ func TestProcessorRunnerRunHistoryArchiveIngestionHistoryArchive(t *testing.T) { q.MockQClaimableBalances.On("NewClaimableBalancesBatchInsertBuilder", maxBatchSize). Return(mockClaimableBalancesBatchInsertBuilder).Once() - mockLiquidityPoolsBatchInsertBuilder := &history.MockLiquidityPoolsBatchInsertBuilder{} - defer mock.AssertExpectationsForObjects(t, mockLiquidityPoolsBatchInsertBuilder) - mockLiquidityPoolsBatchInsertBuilder.On("Exec", ctx).Return(nil).Once() - q.MockQLiquidityPools.On("NewLiquidityPoolsBatchInsertBuilder", maxBatchSize). - Return(mockLiquidityPoolsBatchInsertBuilder).Once() - q.MockQAccounts.On("UpsertAccounts", ctx, []xdr.LedgerEntry{ { LastModifiedLedgerSeq: 1, @@ -352,12 +340,6 @@ func TestProcessorRunnerRunAllProcessorsOnLedger(t *testing.T) { q.MockQClaimableBalances.On("NewClaimableBalancesBatchInsertBuilder", maxBatchSize). Return(mockClaimableBalancesBatchInsertBuilder).Once() - mockLiquidityPoolsBatchInsertBuilder := &history.MockLiquidityPoolsBatchInsertBuilder{} - defer mock.AssertExpectationsForObjects(t, mockLiquidityPoolsBatchInsertBuilder) - mockLiquidityPoolsBatchInsertBuilder.On("Exec", ctx).Return(nil).Once() - q.MockQLiquidityPools.On("NewLiquidityPoolsBatchInsertBuilder", maxBatchSize). - Return(mockLiquidityPoolsBatchInsertBuilder).Once() - q.MockQLedgers.On("InsertLedger", ctx, ledger.V0.LedgerHeader, 0, 0, 0, 0, CurrentVersion). Return(int64(1), nil).Once() @@ -418,11 +400,6 @@ func TestProcessorRunnerRunAllProcessorsOnLedgerProtocolVersionNotSupported(t *t q.MockQClaimableBalances.On("NewClaimableBalancesBatchInsertBuilder", maxBatchSize). Return(mockClaimableBalancesBatchInsertBuilder).Once() - mockLiquidityPoolsBatchInsertBuilder := &history.MockLiquidityPoolsBatchInsertBuilder{} - defer mock.AssertExpectationsForObjects(t, mockLiquidityPoolsBatchInsertBuilder) - q.MockQLiquidityPools.On("NewLiquidityPoolsBatchInsertBuilder", maxBatchSize). - Return(mockLiquidityPoolsBatchInsertBuilder).Once() - runner := ProcessorRunner{ ctx: ctx, config: config, diff --git a/services/horizon/internal/ingest/processors/liquidity_pools_change_processor.go b/services/horizon/internal/ingest/processors/liquidity_pools_change_processor.go index b1e163a446..c5e5252280 100644 --- a/services/horizon/internal/ingest/processors/liquidity_pools_change_processor.go +++ b/services/horizon/internal/ingest/processors/liquidity_pools_change_processor.go @@ -50,64 +50,30 @@ func (p *LiquidityPoolsChangeProcessor) ProcessChange(ctx context.Context, chang } func (p *LiquidityPoolsChangeProcessor) Commit(ctx context.Context) error { - batch := p.qLiquidityPools.NewLiquidityPoolsBatchInsertBuilder(maxBatchSize) changes := p.cache.GetChanges() + var lps []history.LiquidityPool for _, change := range changes { - var err error - var rowsAffected int64 - var action string - var ledgerKey xdr.LedgerKey - switch { case change.Pre == nil && change.Post != nil: // Created - action = "inserting" - err = batch.Add(ctx, p.ledgerEntryToRow(change.Post)) - rowsAffected = 1 + lps = append(lps, p.ledgerEntryToRow(change.Post)) case change.Pre != nil && change.Post == nil: // Removed - action = "removing" - lPool := change.Pre.Data.MustLiquidityPool() - err = ledgerKey.SetLiquidityPool(lPool.LiquidityPoolId) - if err != nil { - return errors.Wrap(err, "Error creating ledger key") - } - rowsAffected, err = p.qLiquidityPools.RemoveLiquidityPool( - ctx, PoolIDToString(lPool.LiquidityPoolId), p.sequence, - ) + lp := p.ledgerEntryToRow(change.Pre) + lp.Deleted = true + lp.LastModifiedLedger = p.sequence + lps = append(lps, lp) default: // Updated - action = "updating" - cBalance := change.Post.Data.MustLiquidityPool() - err = ledgerKey.SetLiquidityPool(cBalance.LiquidityPoolId) - if err != nil { - return errors.Wrap(err, "Error creating ledger key") - } - rowsAffected, err = p.qLiquidityPools.UpdateLiquidityPool(ctx, p.ledgerEntryToRow(change.Post)) - } - - if err != nil { - return err - } - - if rowsAffected != 1 { - ledgerKeyString, err := ledgerKey.MarshalBinaryBase64() - if err != nil { - return errors.Wrap(err, "Error marshalling ledger key") - } - return ingest.NewStateError(errors.Errorf( - "%d rows affected when %s liquidity pool: %s", - rowsAffected, - action, - ledgerKeyString, - )) + lps = append(lps, p.ledgerEntryToRow(change.Post)) } } - err := batch.Exec(ctx) - if err != nil { - return errors.Wrap(err, "error executing batch") + if len(lps) > 0 { + if err := p.qLiquidityPools.UpsertLiquidityPools(ctx, lps); err != nil { + return errors.Wrap(err, "error upserting liquidity pools") + } } if p.sequence > compactionWindow { diff --git a/services/horizon/internal/ingest/processors/liquidity_pools_change_processor_test.go b/services/horizon/internal/ingest/processors/liquidity_pools_change_processor_test.go index 0c23d4e6a0..4e7383b1fe 100644 --- a/services/horizon/internal/ingest/processors/liquidity_pools_change_processor_test.go +++ b/services/horizon/internal/ingest/processors/liquidity_pools_change_processor_test.go @@ -19,31 +19,23 @@ func TestLiquidityPoolsChangeProcessorTestSuiteState(t *testing.T) { type LiquidityPoolsChangeProcessorTestSuiteState struct { suite.Suite - ctx context.Context - processor *LiquidityPoolsChangeProcessor - mockQ *history.MockQLiquidityPools - mockBatchInsertBuilder *history.MockLiquidityPoolsBatchInsertBuilder - sequence uint32 + ctx context.Context + processor *LiquidityPoolsChangeProcessor + mockQ *history.MockQLiquidityPools + sequence uint32 } func (s *LiquidityPoolsChangeProcessorTestSuiteState) SetupTest() { s.ctx = context.Background() s.mockQ = &history.MockQLiquidityPools{} - s.mockBatchInsertBuilder = &history.MockLiquidityPoolsBatchInsertBuilder{} - - s.mockQ. - On("NewLiquidityPoolsBatchInsertBuilder", maxBatchSize). - Return(s.mockBatchInsertBuilder) s.sequence = 456 s.processor = NewLiquidityPoolsChangeProcessor(s.mockQ, s.sequence) } func (s *LiquidityPoolsChangeProcessorTestSuiteState) TearDownTest() { - s.mockBatchInsertBuilder.On("Exec", s.ctx).Return(nil).Once() s.Assert().NoError(s.processor.Commit(s.ctx)) s.mockQ.AssertExpectations(s.T()) - s.mockBatchInsertBuilder.AssertExpectations(s.T()) } func (s *LiquidityPoolsChangeProcessorTestSuiteState) TestNoEntries() { @@ -93,8 +85,8 @@ func (s *LiquidityPoolsChangeProcessorTestSuiteState) TestCreatesLiquidityPools( }, LastModifiedLedger: 123, } + s.mockQ.On("UpsertLiquidityPools", s.ctx, []history.LiquidityPool{lp}).Return(nil).Once() - s.mockBatchInsertBuilder.On("Add", s.ctx, lp).Return(nil).Once() s.mockQ.On("CompactLiquidityPools", s.ctx, s.sequence-100).Return(int64(0), nil).Once() err := s.processor.ProcessChange(s.ctx, ingest.Change{ @@ -117,28 +109,21 @@ func TestLiquidityPoolsChangeProcessorTestSuiteLedger(t *testing.T) { type LiquidityPoolsChangeProcessorTestSuiteLedger struct { suite.Suite - ctx context.Context - processor *LiquidityPoolsChangeProcessor - mockQ *history.MockQLiquidityPools - mockBatchInsertBuilder *history.MockLiquidityPoolsBatchInsertBuilder - sequence uint32 + ctx context.Context + processor *LiquidityPoolsChangeProcessor + mockQ *history.MockQLiquidityPools + sequence uint32 } func (s *LiquidityPoolsChangeProcessorTestSuiteLedger) SetupTest() { s.ctx = context.Background() s.mockQ = &history.MockQLiquidityPools{} - s.mockBatchInsertBuilder = &history.MockLiquidityPoolsBatchInsertBuilder{} - - s.mockQ. - On("NewLiquidityPoolsBatchInsertBuilder", maxBatchSize). - Return(s.mockBatchInsertBuilder) s.sequence = 456 s.processor = NewLiquidityPoolsChangeProcessor(s.mockQ, s.sequence) } func (s *LiquidityPoolsChangeProcessorTestSuiteLedger) TearDownTest() { - s.mockBatchInsertBuilder.On("Exec", s.ctx).Return(nil).Once() s.Assert().NoError(s.processor.Commit(s.ctx)) s.mockQ.AssertExpectations(s.T()) } @@ -233,13 +218,7 @@ func (s *LiquidityPoolsChangeProcessorTestSuiteLedger) TestNewLiquidityPool() { }, LastModifiedLedger: 123, } - // We use LedgerEntryChangesCache so all changes are squashed - s.mockBatchInsertBuilder.On( - "Add", - s.ctx, - postLP, - ).Return(nil).Once() - + s.mockQ.On("UpsertLiquidityPools", s.ctx, []history.LiquidityPool{postLP}).Return(nil).Once() s.mockQ.On("CompactLiquidityPools", s.ctx, s.sequence-100).Return(int64(0), nil).Once() } @@ -318,12 +297,7 @@ func (s *LiquidityPoolsChangeProcessorTestSuiteLedger) TestUpdateLiquidityPool() LastModifiedLedger: 123, } - s.mockQ.On( - "UpdateLiquidityPool", - s.ctx, - postLP, - ).Return(int64(1), nil).Once() - + s.mockQ.On("UpsertLiquidityPools", s.ctx, []history.LiquidityPool{postLP}).Return(nil).Once() s.mockQ.On("CompactLiquidityPools", s.ctx, s.sequence-100).Return(int64(0), nil).Once() } @@ -366,12 +340,9 @@ func (s *LiquidityPoolsChangeProcessorTestSuiteLedger) TestRemoveLiquidityPool() }) s.Assert().NoError(err) - s.mockQ.On( - "RemoveLiquidityPool", - s.ctx, - "cafebabedeadbeef000000000000000000000000000000000000000000000000", - s.sequence, - ).Return(int64(1), nil).Once() - + deleted := s.processor.ledgerEntryToRow(&pre) + deleted.Deleted = true + deleted.LastModifiedLedger = s.processor.sequence + s.mockQ.On("UpsertLiquidityPools", s.ctx, []history.LiquidityPool{deleted}).Return(nil).Once() s.mockQ.On("CompactLiquidityPools", s.ctx, s.sequence-100).Return(int64(0), nil).Once() }