diff --git a/services/horizon/internal/db2/history/transaction.go b/services/horizon/internal/db2/history/transaction.go index d538ad20ab..b1328343ca 100644 --- a/services/horizon/internal/db2/history/transaction.go +++ b/services/horizon/internal/db2/history/transaction.go @@ -28,6 +28,31 @@ func (q *Q) TransactionByHash(ctx context.Context, dest interface{}, hash string return q.Get(ctx, dest, union) } +// TransactionsByHashesSinceLedger fetches transactions from the `history_transactions` +// table which match the given hash since the given ledger sequence (for perf reasons). +func (q *Q) TransactionsByHashesSinceLedger(ctx context.Context, hashes []string, sinceLedgerSeq uint32) ([]Transaction, error) { + var dest []Transaction + byHash := selectTransaction. + Where(map[string]interface{}{"ht.transaction_hash": hashes}). + Where(sq.GtOrEq{"ht.ledger_sequence": sinceLedgerSeq}) + byInnerHash := selectTransaction. + Where(map[string]interface{}{"ht.inner_transaction_hash": hashes}). + Where(sq.GtOrEq{"ht.ledger_sequence": sinceLedgerSeq}) + + byInnerHashString, args, err := byInnerHash.ToSql() + if err != nil { + return nil, errors.Wrap(err, "could not get string for inner hash sql query") + } + union := byHash.Suffix("UNION ALL "+byInnerHashString, args...) + + err = q.Select(ctx, &dest, union) + if err != nil { + return nil, err + } + + return dest, nil +} + // TransactionsByIDs fetches transactions from the `history_transactions` table // which match the given ids func (q *Q) TransactionsByIDs(ctx context.Context, ids ...int64) (map[int64]Transaction, error) { diff --git a/services/horizon/internal/txsub/helpers_test.go b/services/horizon/internal/txsub/helpers_test.go index 5d3190937c..119515ebc2 100644 --- a/services/horizon/internal/txsub/helpers_test.go +++ b/services/horizon/internal/txsub/helpers_test.go @@ -10,6 +10,7 @@ import ( "context" "database/sql" + "github.com/stellar/go/services/horizon/internal/db2/history" "github.com/stretchr/testify/mock" ) @@ -44,11 +45,24 @@ func (m *mockDBQ) NoRows(err error) bool { return args.Bool(0) } +func (m *mockDBQ) GetLatestHistoryLedger(ctx context.Context) (uint32, error) { + args := m.Called() + return args.Get(0).(uint32), args.Error(1) +} + func (m *mockDBQ) GetSequenceNumbers(ctx context.Context, addresses []string) (map[string]uint64, error) { args := m.Called(ctx, addresses) return args.Get(0).(map[string]uint64), args.Error(1) } +func (m *mockDBQ) TransactionsByHashesSinceLedger(ctx context.Context, hashes []string, sinceLedgerSeq uint32) ([]history.Transaction, error) { + args := m.Called(ctx, hashes, sinceLedgerSeq) + if args.Get(0) == nil { + return nil, args.Error(1) + } + return args.Get(0).([]history.Transaction), args.Error(1) +} + func (m *mockDBQ) TransactionByHash(ctx context.Context, dest interface{}, hash string) error { args := m.Called(ctx, dest, hash) return args.Error(0) diff --git a/services/horizon/internal/txsub/results_test.go b/services/horizon/internal/txsub/results_test.go index a0695c22c2..2a04d7ecdd 100644 --- a/services/horizon/internal/txsub/results_test.go +++ b/services/horizon/internal/txsub/results_test.go @@ -18,6 +18,17 @@ func TestGetIngestedTx(t *testing.T) { tt.Assert.Equal(hash, tx.TransactionHash) } +func TestGetIngestedTxHashes(t *testing.T) { + tt := test.Start(t) + tt.Scenario("base") + defer tt.Finish() + q := &history.Q{SessionInterface: tt.HorizonSession()} + hashes := []string{"2374e99349b9ef7dba9a5db3339b78fda8f34777b1af33ba468ad5c0df946d4d"} + txs, err := q.TransactionsByHashesSinceLedger(tt.Ctx, hashes, 0) + tt.Assert.NoError(err) + tt.Assert.Equal(hashes[0], txs[0].TransactionHash) +} + func TestGetMissingTx(t *testing.T) { tt := test.Start(t) tt.Scenario("base") diff --git a/services/horizon/internal/txsub/system.go b/services/horizon/internal/txsub/system.go index 89a456344e..23d32a3549 100644 --- a/services/horizon/internal/txsub/system.go +++ b/services/horizon/internal/txsub/system.go @@ -8,13 +8,16 @@ import ( "time" "github.com/prometheus/client_golang/prometheus" + "github.com/stellar/go/services/horizon/internal/db2/history" "github.com/stellar/go/services/horizon/internal/txsub/sequence" "github.com/stellar/go/support/log" "github.com/stellar/go/xdr" ) type HorizonDB interface { + GetLatestHistoryLedger(ctx context.Context) (uint32, error) TransactionByHash(ctx context.Context, dest interface{}, hash string) error + TransactionsByHashesSinceLedger(ctx context.Context, hashes []string, sinceLedgerSeq uint32) ([]history.Transaction, error) GetSequenceNumbers(ctx context.Context, addresses []string) (map[string]uint64, error) BeginTx(*sql.TxOptions) error Rollback() error @@ -304,23 +307,58 @@ func (sys *System) Tick(ctx context.Context) { } } - for _, hash := range sys.Pending.Pending(ctx) { - tx, err := txResultByHash(ctx, db, hash) + pending := sys.Pending.Pending(ctx) - if err == nil { - logger.WithField("hash", hash).Debug("finishing open submission") - sys.Pending.Finish(ctx, hash, Result{Transaction: tx}) - continue + if len(pending) > 0 { + latestLedger, err := db.GetLatestHistoryLedger(ctx) + if err != nil { + logger.WithError(err).Error("error getting latest history ledger") + return } - if _, ok := err.(*FailedTransactionError); ok { - logger.WithField("hash", hash).Debug("finishing open submission") - sys.Pending.Finish(ctx, hash, Result{Transaction: tx, Err: err}) - continue + // In Tick we only check txs in a queue so those which did not have results before Tick + // so we check for them in the last 5 mins of ledgers: 60. + var sinceLedgerSeq int32 = int32(latestLedger) - 60 + if sinceLedgerSeq < 0 { + sinceLedgerSeq = 0 } - if err != ErrNoResults { - logger.WithStack(err).Error(err) + txs, err := db.TransactionsByHashesSinceLedger(ctx, pending, uint32(sinceLedgerSeq)) + if err != nil && !db.NoRows(err) { + logger.WithError(err).Error("error getting transactions by hashes") + return + } + + txMap := make(map[string]history.Transaction, len(txs)) + for _, tx := range txs { + txMap[tx.TransactionHash] = tx + if tx.InnerTransactionHash.Valid { + txMap[tx.InnerTransactionHash.String] = tx + } + } + + for _, hash := range pending { + tx, found := txMap[hash] + if !found { + continue + } + _, err := txResultFromHistory(tx) + + if err == nil { + logger.WithField("hash", hash).Debug("finishing open submission") + sys.Pending.Finish(ctx, hash, Result{Transaction: tx}) + continue + } + + if _, ok := err.(*FailedTransactionError); ok { + logger.WithField("hash", hash).Debug("finishing open submission") + sys.Pending.Finish(ctx, hash, Result{Transaction: tx, Err: err}) + continue + } + + if err != nil { + logger.WithStack(err).Error(err) + } } } diff --git a/services/horizon/internal/txsub/system_test.go b/services/horizon/internal/txsub/system_test.go index 166558d8c7..b6026b5adb 100644 --- a/services/horizon/internal/txsub/system_test.go +++ b/services/horizon/internal/txsub/system_test.go @@ -113,6 +113,8 @@ func (suite *SystemTestSuite) SetupTest() { suite.badSeq = SubmissionResult{ Err: ErrBadSequence, } + + suite.db.On("GetLatestHistoryLedger").Return(uint32(1000), nil).Maybe() } func (suite *SystemTestSuite) TearDownTest() { @@ -327,8 +329,8 @@ func (suite *SystemTestSuite) TestTick_FinishesTransactions() { ReadOnly: true, }).Return(nil).Once() suite.db.On("Rollback").Return(nil).Once() - suite.db.On("TransactionByHash", suite.ctx, mock.Anything, suite.successTx.Transaction.TransactionHash). - Return(sql.ErrNoRows).Once() + suite.db.On("TransactionsByHashesSinceLedger", suite.ctx, []string{suite.successTx.Transaction.TransactionHash}, uint32(940)). + Return(nil, sql.ErrNoRows).Once() suite.db.On("NoRows", sql.ErrNoRows).Return(true).Once() suite.system.Tick(suite.ctx) @@ -341,12 +343,8 @@ func (suite *SystemTestSuite) TestTick_FinishesTransactions() { ReadOnly: true, }).Return(nil).Once() suite.db.On("Rollback").Return(nil).Once() - suite.db.On("TransactionByHash", suite.ctx, mock.Anything, suite.successTx.Transaction.TransactionHash). - Run(func(args mock.Arguments) { - ptr := args.Get(1).(*history.Transaction) - *ptr = suite.successTx.Transaction - }). - Return(nil).Once() + suite.db.On("TransactionsByHashesSinceLedger", suite.ctx, []string{suite.successTx.Transaction.TransactionHash}, uint32(940)). + Return([]history.Transaction{suite.successTx.Transaction}, nil).Once() suite.system.Tick(suite.ctx) @@ -395,12 +393,8 @@ func (suite *SystemTestSuite) TestTickFinishFeeBumpTransaction() { ReadOnly: true, }).Return(nil).Once() suite.db.On("Rollback").Return(nil).Once() - suite.db.On("TransactionByHash", suite.ctx, mock.Anything, innerHash). - Run(func(args mock.Arguments) { - ptr := args.Get(1).(*history.Transaction) - *ptr = feeBumpTx.Transaction - }). - Return(nil).Once() + suite.db.On("TransactionsByHashesSinceLedger", suite.ctx, []string{innerHash}, uint32(940)). + Return([]history.Transaction{feeBumpTx.Transaction}, nil).Once() suite.system.Tick(suite.ctx) @@ -423,8 +417,8 @@ func (suite *SystemTestSuite) TestTick_RemovesStaleSubmissions() { ReadOnly: true, }).Return(nil).Once() suite.db.On("Rollback").Return(nil).Once() - suite.db.On("TransactionByHash", suite.ctx, mock.Anything, suite.successTx.Transaction.TransactionHash). - Return(sql.ErrNoRows).Once() + suite.db.On("TransactionsByHashesSinceLedger", suite.ctx, []string{suite.successTx.Transaction.TransactionHash}, uint32(940)). + Return(nil, sql.ErrNoRows).Once() suite.db.On("NoRows", sql.ErrNoRows).Return(true).Once() suite.system.Tick(suite.ctx)