Skip to content

Commit

Permalink
Perf fix
Browse files Browse the repository at this point in the history
  • Loading branch information
bartekn committed Apr 22, 2021
1 parent c3f9569 commit e8701ab
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 30 deletions.
42 changes: 20 additions & 22 deletions services/horizon/internal/db2/history/transaction.go
Original file line number Diff line number Diff line change
@@ -1,49 +1,47 @@
package history

import (
"database/sql"

sq "github.com/Masterminds/squirrel"
"github.com/stellar/go/services/horizon/internal/db2"
"github.com/stellar/go/services/horizon/internal/toid"
"github.com/stellar/go/support/errors"
"github.com/stellar/go/xdr"
)

// TransactionsByHashes fetches transactions from the `history_transactions` table
// which match the given hash.
// Note that
func (q *Q) TransactionsByHashes(dest interface{}, hashes []string) error {
// TransactionByHash is a query that loads a single row from the
// `history_transactions` table based upon the provided hash.
func (q *Q) TransactionByHash(dest interface{}, hash string) error {
byHash := selectTransaction.
Where(map[string]interface{}{"ht.transaction_hash": hashes})
Where("ht.transaction_hash = ?", hash)
byInnerHash := selectTransaction.
Where(map[string]interface{}{"ht.inner_transaction_hash": hashes})
Where("ht.inner_transaction_hash = ?", hash)

byInnerHashString, args, err := byInnerHash.ToSql()
if err != nil {
return errors.Wrap(err, "could not get string for inner hash sql query")
}
union := byHash.Suffix("UNION ALL "+byInnerHashString, args...)

return q.Select(dest, union)
return q.Get(dest, union)
}

// TransactionByHash is a query that loads a single row from the
// `history_transactions` table based upon the provided hash.
func (q *Q) TransactionByHash(dest interface{}, hash string) error {
var txs []Transaction
err := q.TransactionsByHashes(&txs, []string{hash})
// TransactionsByHashesSinceLedger fetches transactions from the `history_transactions`
// table which match the given hash since the given ledger sequence (for perf reasons).
func (q *Q) TransactionsByHashesSinceLedger(dest interface{}, hashes []string, sinceLedgerSeq uint32) error {
byHash := selectTransaction.
Where(map[string]interface{}{"ht.transaction_hash": hashes}).
Where(sq.GtOrEq{"ht.ledger_sequence": sinceLedgerSeq})
byInnerHash := selectTransaction.
Where(map[string]interface{}{"ht.inner_transaction_hash": hashes}).
Where(sq.GtOrEq{"ht.ledger_sequence": sinceLedgerSeq})

byInnerHashString, args, err := byInnerHash.ToSql()
if err != nil {
return err
return errors.Wrap(err, "could not get string for inner hash sql query")
}
union := byHash.Suffix("UNION ALL "+byInnerHashString, args...)

if len(txs) > 0 {
ptr := dest.(*Transaction)
*ptr = txs[0]
return nil
} else {
return sql.ErrNoRows
}
return q.Select(dest, union)
}

// TransactionsByIDs fetches transactions from the `history_transactions` table
Expand Down
9 changes: 7 additions & 2 deletions services/horizon/internal/txsub/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,18 @@ func (m *mockDBQ) NoRows(err error) bool {
return args.Bool(0)
}

func (m *mockDBQ) GetLatestHistoryLedger() (uint32, error) {
args := m.Called()
return args.Get(0).(uint32), args.Error(1)
}

func (m *mockDBQ) GetSequenceNumbers(addresses []string) (map[string]uint64, error) {
args := m.Called(addresses)
return args.Get(0).(map[string]uint64), args.Error(1)
}

func (m *mockDBQ) TransactionsByHashes(dest interface{}, hashes []string) error {
args := m.Called(dest, hashes)
func (m *mockDBQ) TransactionsByHashesSinceLedger(dest interface{}, hashes []string, sinceLedgerSeq uint32) error {
args := m.Called(dest, hashes, sinceLedgerSeq)
return args.Error(0)
}

Expand Down
2 changes: 1 addition & 1 deletion services/horizon/internal/txsub/results_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func TestGetIngestedTxHashes(t *testing.T) {
q := &history.Q{Session: tt.HorizonSession()}
hashes := []string{"2374e99349b9ef7dba9a5db3339b78fda8f34777b1af33ba468ad5c0df946d4d"}
var txs []history.Transaction
err := q.TransactionsByHashes(&txs, hashes)
err := q.TransactionsByHashesSinceLedger(&txs, hashes, 0)
tt.Assert.NoError(err)
tt.Assert.Equal(hashes[0], txs[0].TransactionHash)
}
Expand Down
11 changes: 9 additions & 2 deletions services/horizon/internal/txsub/system.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@ import (
)

type HorizonDB interface {
GetLatestHistoryLedger() (uint32, error)
TransactionByHash(dest interface{}, hash string) error
TransactionsByHashes(dest interface{}, hashes []string) error
TransactionsByHashesSinceLedger(dest interface{}, hashes []string, sinceLedgerSeq uint32) error
GetSequenceNumbers(addresses []string) (map[string]uint64, error)
BeginTx(*sql.TxOptions) error
Rollback() error
Expand Down Expand Up @@ -309,8 +310,14 @@ func (sys *System) Tick(ctx context.Context) {
pending := sys.Pending.Pending(ctx)

if len(pending) > 0 {
latestLedger, err := db.GetLatestHistoryLedger()
if err != nil {
logger.WithError(err).Error("error getting latest history ledger")
return
}

var txs []history.Transaction
err := db.TransactionsByHashes(&txs, pending)
err = db.TransactionsByHashesSinceLedger(&txs, pending, latestLedger-100)
if err != nil && !db.NoRows(err) {
logger.WithError(err).Error("error getting transactions by hashes")
return
Expand Down
8 changes: 5 additions & 3 deletions services/horizon/internal/txsub/system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ func (suite *SystemTestSuite) SetupTest() {
suite.badSeq = SubmissionResult{
Err: ErrBadSequence,
}

suite.db.On("GetLatestHistoryLedger").Return(uint32(1000), nil).Maybe()
}

func (suite *SystemTestSuite) TearDownTest() {
Expand Down Expand Up @@ -327,7 +329,7 @@ func (suite *SystemTestSuite) TestTick_FinishesTransactions() {
ReadOnly: true,
}).Return(nil).Once()
suite.db.On("Rollback").Return(nil).Once()
suite.db.On("TransactionsByHashes", mock.Anything, []string{suite.successTx.Transaction.TransactionHash}).
suite.db.On("TransactionsByHashesSinceLedger", mock.Anything, []string{suite.successTx.Transaction.TransactionHash}, uint32(900)).
Return(sql.ErrNoRows).Once()
suite.db.On("NoRows", sql.ErrNoRows).Return(true).Once()

Expand All @@ -341,7 +343,7 @@ func (suite *SystemTestSuite) TestTick_FinishesTransactions() {
ReadOnly: true,
}).Return(nil).Once()
suite.db.On("Rollback").Return(nil).Once()
suite.db.On("TransactionsByHashes", mock.Anything, []string{suite.successTx.Transaction.TransactionHash}).
suite.db.On("TransactionsByHashesSinceLedger", mock.Anything, []string{suite.successTx.Transaction.TransactionHash}, uint32(900)).
Run(func(args mock.Arguments) {
ptr := args.Get(0).(*[]history.Transaction)
*ptr = []history.Transaction{suite.successTx.Transaction}
Expand Down Expand Up @@ -425,7 +427,7 @@ func (suite *SystemTestSuite) TestTick_RemovesStaleSubmissions() {
ReadOnly: true,
}).Return(nil).Once()
suite.db.On("Rollback").Return(nil).Once()
suite.db.On("TransactionsByHashes", mock.Anything, []string{suite.successTx.Transaction.TransactionHash}).
suite.db.On("TransactionsByHashesSinceLedger", mock.Anything, []string{suite.successTx.Transaction.TransactionHash}, uint32(900)).
Return(sql.ErrNoRows).Once()
suite.db.On("NoRows", sql.ErrNoRows).Return(true).Once()

Expand Down

0 comments on commit e8701ab

Please sign in to comment.