From e8701abc90e0ce8774f5efab0b4e7dce2573fb1e Mon Sep 17 00:00:00 2001 From: Bartek Nowotarski Date: Thu, 22 Apr 2021 22:50:22 +0200 Subject: [PATCH] Perf fix --- .../internal/db2/history/transaction.go | 42 +++++++++---------- .../horizon/internal/txsub/helpers_test.go | 9 +++- .../horizon/internal/txsub/results_test.go | 2 +- services/horizon/internal/txsub/system.go | 11 ++++- .../horizon/internal/txsub/system_test.go | 8 ++-- 5 files changed, 42 insertions(+), 30 deletions(-) diff --git a/services/horizon/internal/db2/history/transaction.go b/services/horizon/internal/db2/history/transaction.go index 94ee381351..8da962c59f 100644 --- a/services/horizon/internal/db2/history/transaction.go +++ b/services/horizon/internal/db2/history/transaction.go @@ -1,8 +1,6 @@ package history import ( - "database/sql" - sq "github.com/Masterminds/squirrel" "github.com/stellar/go/services/horizon/internal/db2" "github.com/stellar/go/services/horizon/internal/toid" @@ -10,14 +8,13 @@ import ( "github.com/stellar/go/xdr" ) -// TransactionsByHashes fetches transactions from the `history_transactions` table -// which match the given hash. -// Note that -func (q *Q) TransactionsByHashes(dest interface{}, hashes []string) error { +// TransactionByHash is a query that loads a single row from the +// `history_transactions` table based upon the provided hash. +func (q *Q) TransactionByHash(dest interface{}, hash string) error { byHash := selectTransaction. - Where(map[string]interface{}{"ht.transaction_hash": hashes}) + Where("ht.transaction_hash = ?", hash) byInnerHash := selectTransaction. - Where(map[string]interface{}{"ht.inner_transaction_hash": hashes}) + Where("ht.inner_transaction_hash = ?", hash) byInnerHashString, args, err := byInnerHash.ToSql() if err != nil { @@ -25,25 +22,26 @@ func (q *Q) TransactionsByHashes(dest interface{}, hashes []string) error { } union := byHash.Suffix("UNION ALL "+byInnerHashString, args...) - return q.Select(dest, union) + return q.Get(dest, union) } -// TransactionByHash is a query that loads a single row from the -// `history_transactions` table based upon the provided hash. -func (q *Q) TransactionByHash(dest interface{}, hash string) error { - var txs []Transaction - err := q.TransactionsByHashes(&txs, []string{hash}) +// TransactionsByHashesSinceLedger fetches transactions from the `history_transactions` +// table which match the given hash since the given ledger sequence (for perf reasons). +func (q *Q) TransactionsByHashesSinceLedger(dest interface{}, hashes []string, sinceLedgerSeq uint32) error { + byHash := selectTransaction. + Where(map[string]interface{}{"ht.transaction_hash": hashes}). + Where(sq.GtOrEq{"ht.ledger_sequence": sinceLedgerSeq}) + byInnerHash := selectTransaction. + Where(map[string]interface{}{"ht.inner_transaction_hash": hashes}). + Where(sq.GtOrEq{"ht.ledger_sequence": sinceLedgerSeq}) + + byInnerHashString, args, err := byInnerHash.ToSql() if err != nil { - return err + return errors.Wrap(err, "could not get string for inner hash sql query") } + union := byHash.Suffix("UNION ALL "+byInnerHashString, args...) - if len(txs) > 0 { - ptr := dest.(*Transaction) - *ptr = txs[0] - return nil - } else { - return sql.ErrNoRows - } + return q.Select(dest, union) } // TransactionsByIDs fetches transactions from the `history_transactions` table diff --git a/services/horizon/internal/txsub/helpers_test.go b/services/horizon/internal/txsub/helpers_test.go index c3df0d22d0..f20104ab6b 100644 --- a/services/horizon/internal/txsub/helpers_test.go +++ b/services/horizon/internal/txsub/helpers_test.go @@ -44,13 +44,18 @@ func (m *mockDBQ) NoRows(err error) bool { return args.Bool(0) } +func (m *mockDBQ) GetLatestHistoryLedger() (uint32, error) { + args := m.Called() + return args.Get(0).(uint32), args.Error(1) +} + func (m *mockDBQ) GetSequenceNumbers(addresses []string) (map[string]uint64, error) { args := m.Called(addresses) return args.Get(0).(map[string]uint64), args.Error(1) } -func (m *mockDBQ) TransactionsByHashes(dest interface{}, hashes []string) error { - args := m.Called(dest, hashes) +func (m *mockDBQ) TransactionsByHashesSinceLedger(dest interface{}, hashes []string, sinceLedgerSeq uint32) error { + args := m.Called(dest, hashes, sinceLedgerSeq) return args.Error(0) } diff --git a/services/horizon/internal/txsub/results_test.go b/services/horizon/internal/txsub/results_test.go index 839e6c3057..95b7a52096 100644 --- a/services/horizon/internal/txsub/results_test.go +++ b/services/horizon/internal/txsub/results_test.go @@ -25,7 +25,7 @@ func TestGetIngestedTxHashes(t *testing.T) { q := &history.Q{Session: tt.HorizonSession()} hashes := []string{"2374e99349b9ef7dba9a5db3339b78fda8f34777b1af33ba468ad5c0df946d4d"} var txs []history.Transaction - err := q.TransactionsByHashes(&txs, hashes) + err := q.TransactionsByHashesSinceLedger(&txs, hashes, 0) tt.Assert.NoError(err) tt.Assert.Equal(hashes[0], txs[0].TransactionHash) } diff --git a/services/horizon/internal/txsub/system.go b/services/horizon/internal/txsub/system.go index ccb4873af6..3cdd3bc817 100644 --- a/services/horizon/internal/txsub/system.go +++ b/services/horizon/internal/txsub/system.go @@ -15,8 +15,9 @@ import ( ) type HorizonDB interface { + GetLatestHistoryLedger() (uint32, error) TransactionByHash(dest interface{}, hash string) error - TransactionsByHashes(dest interface{}, hashes []string) error + TransactionsByHashesSinceLedger(dest interface{}, hashes []string, sinceLedgerSeq uint32) error GetSequenceNumbers(addresses []string) (map[string]uint64, error) BeginTx(*sql.TxOptions) error Rollback() error @@ -309,8 +310,14 @@ func (sys *System) Tick(ctx context.Context) { pending := sys.Pending.Pending(ctx) if len(pending) > 0 { + latestLedger, err := db.GetLatestHistoryLedger() + if err != nil { + logger.WithError(err).Error("error getting latest history ledger") + return + } + var txs []history.Transaction - err := db.TransactionsByHashes(&txs, pending) + err = db.TransactionsByHashesSinceLedger(&txs, pending, latestLedger-100) if err != nil && !db.NoRows(err) { logger.WithError(err).Error("error getting transactions by hashes") return diff --git a/services/horizon/internal/txsub/system_test.go b/services/horizon/internal/txsub/system_test.go index d49dbf5ae2..468910d146 100644 --- a/services/horizon/internal/txsub/system_test.go +++ b/services/horizon/internal/txsub/system_test.go @@ -113,6 +113,8 @@ func (suite *SystemTestSuite) SetupTest() { suite.badSeq = SubmissionResult{ Err: ErrBadSequence, } + + suite.db.On("GetLatestHistoryLedger").Return(uint32(1000), nil).Maybe() } func (suite *SystemTestSuite) TearDownTest() { @@ -327,7 +329,7 @@ func (suite *SystemTestSuite) TestTick_FinishesTransactions() { ReadOnly: true, }).Return(nil).Once() suite.db.On("Rollback").Return(nil).Once() - suite.db.On("TransactionsByHashes", mock.Anything, []string{suite.successTx.Transaction.TransactionHash}). + suite.db.On("TransactionsByHashesSinceLedger", mock.Anything, []string{suite.successTx.Transaction.TransactionHash}, uint32(900)). Return(sql.ErrNoRows).Once() suite.db.On("NoRows", sql.ErrNoRows).Return(true).Once() @@ -341,7 +343,7 @@ func (suite *SystemTestSuite) TestTick_FinishesTransactions() { ReadOnly: true, }).Return(nil).Once() suite.db.On("Rollback").Return(nil).Once() - suite.db.On("TransactionsByHashes", mock.Anything, []string{suite.successTx.Transaction.TransactionHash}). + suite.db.On("TransactionsByHashesSinceLedger", mock.Anything, []string{suite.successTx.Transaction.TransactionHash}, uint32(900)). Run(func(args mock.Arguments) { ptr := args.Get(0).(*[]history.Transaction) *ptr = []history.Transaction{suite.successTx.Transaction} @@ -425,7 +427,7 @@ func (suite *SystemTestSuite) TestTick_RemovesStaleSubmissions() { ReadOnly: true, }).Return(nil).Once() suite.db.On("Rollback").Return(nil).Once() - suite.db.On("TransactionsByHashes", mock.Anything, []string{suite.successTx.Transaction.TransactionHash}). + suite.db.On("TransactionsByHashesSinceLedger", mock.Anything, []string{suite.successTx.Transaction.TransactionHash}, uint32(900)). Return(sql.ErrNoRows).Once() suite.db.On("NoRows", sql.ErrNoRows).Return(true).Once()