Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

services/horizon/txsub: Batch status check queries for pending transactions in txsub #3563

Merged
merged 13 commits into from
Jun 28, 2021
25 changes: 25 additions & 0 deletions services/horizon/internal/db2/history/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,31 @@ func (q *Q) TransactionByHash(ctx context.Context, dest interface{}, hash string
return q.Get(ctx, dest, union)
}

// TransactionsByHashesSinceLedger fetches transactions from the `history_transactions`
// table which match the given hash since the given ledger sequence (for perf reasons).
func (q *Q) TransactionsByHashesSinceLedger(ctx context.Context, hashes []string, sinceLedgerSeq uint32) ([]Transaction, error) {
var dest []Transaction
byHash := selectTransaction.
Where(map[string]interface{}{"ht.transaction_hash": hashes}).
Where(sq.GtOrEq{"ht.ledger_sequence": sinceLedgerSeq})
byInnerHash := selectTransaction.
Where(map[string]interface{}{"ht.inner_transaction_hash": hashes}).
Where(sq.GtOrEq{"ht.ledger_sequence": sinceLedgerSeq})

byInnerHashString, args, err := byInnerHash.ToSql()
if err != nil {
return nil, errors.Wrap(err, "could not get string for inner hash sql query")
}
union := byHash.Suffix("UNION ALL "+byInnerHashString, args...)

err = q.Select(ctx, &dest, union)
if err != nil {
return nil, err
}

return dest, nil
}

// TransactionsByIDs fetches transactions from the `history_transactions` table
// which match the given ids
func (q *Q) TransactionsByIDs(ctx context.Context, ids ...int64) (map[int64]Transaction, error) {
Expand Down
14 changes: 14 additions & 0 deletions services/horizon/internal/txsub/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"context"
"database/sql"

"github.com/stellar/go/services/horizon/internal/db2/history"
"github.com/stretchr/testify/mock"
)

Expand Down Expand Up @@ -44,11 +45,24 @@ func (m *mockDBQ) NoRows(err error) bool {
return args.Bool(0)
}

func (m *mockDBQ) GetLatestHistoryLedger(ctx context.Context) (uint32, error) {
args := m.Called()
return args.Get(0).(uint32), args.Error(1)
}

func (m *mockDBQ) GetSequenceNumbers(ctx context.Context, addresses []string) (map[string]uint64, error) {
args := m.Called(ctx, addresses)
return args.Get(0).(map[string]uint64), args.Error(1)
}

func (m *mockDBQ) TransactionsByHashesSinceLedger(ctx context.Context, hashes []string, sinceLedgerSeq uint32) ([]history.Transaction, error) {
args := m.Called(ctx, hashes, sinceLedgerSeq)
if args.Get(0) == nil {
return nil, args.Error(1)
}
return args.Get(0).([]history.Transaction), args.Error(1)
}

func (m *mockDBQ) TransactionByHash(ctx context.Context, dest interface{}, hash string) error {
args := m.Called(ctx, dest, hash)
return args.Error(0)
Expand Down
11 changes: 11 additions & 0 deletions services/horizon/internal/txsub/results_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,17 @@ func TestGetIngestedTx(t *testing.T) {
tt.Assert.Equal(hash, tx.TransactionHash)
}

func TestGetIngestedTxHashes(t *testing.T) {
tt := test.Start(t)
tt.Scenario("base")
defer tt.Finish()
q := &history.Q{SessionInterface: tt.HorizonSession()}
hashes := []string{"2374e99349b9ef7dba9a5db3339b78fda8f34777b1af33ba468ad5c0df946d4d"}
txs, err := q.TransactionsByHashesSinceLedger(tt.Ctx, hashes, 0)
tt.Assert.NoError(err)
tt.Assert.Equal(hashes[0], txs[0].TransactionHash)
}

func TestGetMissingTx(t *testing.T) {
tt := test.Start(t)
tt.Scenario("base")
Expand Down
62 changes: 50 additions & 12 deletions services/horizon/internal/txsub/system.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,16 @@ import (
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/stellar/go/services/horizon/internal/db2/history"
"github.com/stellar/go/services/horizon/internal/txsub/sequence"
"github.com/stellar/go/support/log"
"github.com/stellar/go/xdr"
)

type HorizonDB interface {
GetLatestHistoryLedger(ctx context.Context) (uint32, error)
TransactionByHash(ctx context.Context, dest interface{}, hash string) error
TransactionsByHashesSinceLedger(ctx context.Context, hashes []string, sinceLedgerSeq uint32) ([]history.Transaction, error)
GetSequenceNumbers(ctx context.Context, addresses []string) (map[string]uint64, error)
BeginTx(*sql.TxOptions) error
Rollback() error
Expand Down Expand Up @@ -304,23 +307,58 @@ func (sys *System) Tick(ctx context.Context) {
}
}

for _, hash := range sys.Pending.Pending(ctx) {
tx, err := txResultByHash(ctx, db, hash)
pending := sys.Pending.Pending(ctx)

if err == nil {
logger.WithField("hash", hash).Debug("finishing open submission")
sys.Pending.Finish(ctx, hash, Result{Transaction: tx})
continue
if len(pending) > 0 {
latestLedger, err := db.GetLatestHistoryLedger(ctx)
if err != nil {
logger.WithError(err).Error("error getting latest history ledger")
return
}

if _, ok := err.(*FailedTransactionError); ok {
logger.WithField("hash", hash).Debug("finishing open submission")
sys.Pending.Finish(ctx, hash, Result{Transaction: tx, Err: err})
continue
// In Tick we only check txs in a queue so those which did not have results before Tick
// so we check for them in the last 5 mins of ledgers: 60.
var sinceLedgerSeq int32 = int32(latestLedger) - 60
if sinceLedgerSeq < 0 {
sinceLedgerSeq = 0
}

if err != ErrNoResults {
logger.WithStack(err).Error(err)
txs, err := db.TransactionsByHashesSinceLedger(ctx, pending, uint32(sinceLedgerSeq))
if err != nil && !db.NoRows(err) {
logger.WithError(err).Error("error getting transactions by hashes")
return
}

txMap := make(map[string]history.Transaction, len(txs))
for _, tx := range txs {
txMap[tx.TransactionHash] = tx
if tx.InnerTransactionHash.Valid {
txMap[tx.InnerTransactionHash.String] = tx
}
}

for _, hash := range pending {
tx, found := txMap[hash]
if !found {
continue
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How come we can happily continue here?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Couldn't it be that the transaction is in the DB but isn't found by TransactionsByHashesSinceLedger() due to the sinceLedgerSeq passed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If txMap doesn't contain the hash it means that tx with hash has not been included in the ledger yet. So we continue to the next pending transaction.

}
_, err := txResultFromHistory(tx)

if err == nil {
logger.WithField("hash", hash).Debug("finishing open submission")
sys.Pending.Finish(ctx, hash, Result{Transaction: tx})
continue
}

if _, ok := err.(*FailedTransactionError); ok {
logger.WithField("hash", hash).Debug("finishing open submission")
sys.Pending.Finish(ctx, hash, Result{Transaction: tx, Err: err})
continue
}

if err != nil {
logger.WithStack(err).Error(err)
}
}
}

Expand Down
26 changes: 10 additions & 16 deletions services/horizon/internal/txsub/system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ func (suite *SystemTestSuite) SetupTest() {
suite.badSeq = SubmissionResult{
Err: ErrBadSequence,
}

suite.db.On("GetLatestHistoryLedger").Return(uint32(1000), nil).Maybe()
}

func (suite *SystemTestSuite) TearDownTest() {
Expand Down Expand Up @@ -327,8 +329,8 @@ func (suite *SystemTestSuite) TestTick_FinishesTransactions() {
ReadOnly: true,
}).Return(nil).Once()
suite.db.On("Rollback").Return(nil).Once()
suite.db.On("TransactionByHash", suite.ctx, mock.Anything, suite.successTx.Transaction.TransactionHash).
Return(sql.ErrNoRows).Once()
suite.db.On("TransactionsByHashesSinceLedger", suite.ctx, []string{suite.successTx.Transaction.TransactionHash}, uint32(940)).
Return(nil, sql.ErrNoRows).Once()
suite.db.On("NoRows", sql.ErrNoRows).Return(true).Once()

suite.system.Tick(suite.ctx)
Expand All @@ -341,12 +343,8 @@ func (suite *SystemTestSuite) TestTick_FinishesTransactions() {
ReadOnly: true,
}).Return(nil).Once()
suite.db.On("Rollback").Return(nil).Once()
suite.db.On("TransactionByHash", suite.ctx, mock.Anything, suite.successTx.Transaction.TransactionHash).
Run(func(args mock.Arguments) {
ptr := args.Get(1).(*history.Transaction)
*ptr = suite.successTx.Transaction
}).
Return(nil).Once()
suite.db.On("TransactionsByHashesSinceLedger", suite.ctx, []string{suite.successTx.Transaction.TransactionHash}, uint32(940)).
Return([]history.Transaction{suite.successTx.Transaction}, nil).Once()

suite.system.Tick(suite.ctx)

Expand Down Expand Up @@ -395,12 +393,8 @@ func (suite *SystemTestSuite) TestTickFinishFeeBumpTransaction() {
ReadOnly: true,
}).Return(nil).Once()
suite.db.On("Rollback").Return(nil).Once()
suite.db.On("TransactionByHash", suite.ctx, mock.Anything, innerHash).
Run(func(args mock.Arguments) {
ptr := args.Get(1).(*history.Transaction)
*ptr = feeBumpTx.Transaction
}).
Return(nil).Once()
suite.db.On("TransactionsByHashesSinceLedger", suite.ctx, []string{innerHash}, uint32(940)).
Return([]history.Transaction{feeBumpTx.Transaction}, nil).Once()

suite.system.Tick(suite.ctx)

Expand All @@ -423,8 +417,8 @@ func (suite *SystemTestSuite) TestTick_RemovesStaleSubmissions() {
ReadOnly: true,
}).Return(nil).Once()
suite.db.On("Rollback").Return(nil).Once()
suite.db.On("TransactionByHash", suite.ctx, mock.Anything, suite.successTx.Transaction.TransactionHash).
Return(sql.ErrNoRows).Once()
suite.db.On("TransactionsByHashesSinceLedger", suite.ctx, []string{suite.successTx.Transaction.TransactionHash}, uint32(940)).
Return(nil, sql.ErrNoRows).Once()
suite.db.On("NoRows", sql.ErrNoRows).Return(true).Once()

suite.system.Tick(suite.ctx)
Expand Down