Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

services/horizon/txsub: Batch status check queries for pending transactions in txsub #3563

Merged
merged 13 commits into from
Jun 28, 2021
19 changes: 19 additions & 0 deletions services/horizon/internal/db2/history/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,25 @@ func (q *Q) TransactionByHash(ctx context.Context, dest interface{}, hash string
return q.Get(ctx, dest, union)
}

// TransactionsByHashesSinceLedger fetches transactions from the `history_transactions`
// table which match the given hash since the given ledger sequence (for perf reasons).
func (q *Q) TransactionsByHashesSinceLedger(ctx context.Context, dest interface{}, hashes []string, sinceLedgerSeq uint32) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is it necessary to add the ht.ledger_sequence >= sinceLedgerSeq condition? I thought the transaction hash is globally unique?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately the query without the ledger_sequence limit is extremely slow. I realized it after to.mer tested it and added a fix in e8701ab. Technically the 100 ledgers is too much because TransactionsByHashesSinceLedger is used only when checking the status of transactions in Tick so this could be 12 (one minute of ledgers).

bartekn marked this conversation as resolved.
Show resolved Hide resolved
bartekn marked this conversation as resolved.
Show resolved Hide resolved
byHash := selectTransaction.
Where(map[string]interface{}{"ht.transaction_hash": hashes}).
Where(sq.GtOrEq{"ht.ledger_sequence": sinceLedgerSeq})
byInnerHash := selectTransaction.
Where(map[string]interface{}{"ht.inner_transaction_hash": hashes}).
Where(sq.GtOrEq{"ht.ledger_sequence": sinceLedgerSeq})

byInnerHashString, args, err := byInnerHash.ToSql()
if err != nil {
return errors.Wrap(err, "could not get string for inner hash sql query")
}
union := byHash.Suffix("UNION ALL "+byInnerHashString, args...)

return q.Select(ctx, dest, union)
}

// TransactionsByIDs fetches transactions from the `history_transactions` table
// which match the given ids
func (q *Q) TransactionsByIDs(ctx context.Context, ids ...int64) (map[int64]Transaction, error) {
Expand Down
10 changes: 10 additions & 0 deletions services/horizon/internal/txsub/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,21 @@ func (m *mockDBQ) NoRows(err error) bool {
return args.Bool(0)
}

func (m *mockDBQ) GetLatestHistoryLedger(ctx context.Context) (uint32, error) {
args := m.Called()
return args.Get(0).(uint32), args.Error(1)
}

func (m *mockDBQ) GetSequenceNumbers(ctx context.Context, addresses []string) (map[string]uint64, error) {
args := m.Called(ctx, addresses)
return args.Get(0).(map[string]uint64), args.Error(1)
}

func (m *mockDBQ) TransactionsByHashesSinceLedger(ctx context.Context, dest interface{}, hashes []string, sinceLedgerSeq uint32) error {
args := m.Called(ctx, dest, hashes, sinceLedgerSeq)
return args.Error(0)
}

func (m *mockDBQ) TransactionByHash(ctx context.Context, dest interface{}, hash string) error {
args := m.Called(ctx, dest, hash)
return args.Error(0)
Expand Down
12 changes: 12 additions & 0 deletions services/horizon/internal/txsub/results_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,18 @@ func TestGetIngestedTx(t *testing.T) {
tt.Assert.Equal(hash, tx.TransactionHash)
}

func TestGetIngestedTxHashes(t *testing.T) {
tt := test.Start(t)
tt.Scenario("base")
defer tt.Finish()
q := &history.Q{SessionInterface: tt.HorizonSession()}
hashes := []string{"2374e99349b9ef7dba9a5db3339b78fda8f34777b1af33ba468ad5c0df946d4d"}
var txs []history.Transaction
err := q.TransactionsByHashesSinceLedger(tt.Ctx, &txs, hashes, 0)
tt.Assert.NoError(err)
tt.Assert.Equal(hashes[0], txs[0].TransactionHash)
}

func TestGetMissingTx(t *testing.T) {
tt := test.Start(t)
tt.Scenario("base")
Expand Down
62 changes: 50 additions & 12 deletions services/horizon/internal/txsub/system.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,16 @@ import (
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/stellar/go/services/horizon/internal/db2/history"
"github.com/stellar/go/services/horizon/internal/txsub/sequence"
"github.com/stellar/go/support/log"
"github.com/stellar/go/xdr"
)

type HorizonDB interface {
GetLatestHistoryLedger(ctx context.Context) (uint32, error)
TransactionByHash(ctx context.Context, dest interface{}, hash string) error
TransactionsByHashesSinceLedger(ctx context.Context, dest interface{}, hashes []string, sinceLedgerSeq uint32) error
GetSequenceNumbers(ctx context.Context, addresses []string) (map[string]uint64, error)
BeginTx(*sql.TxOptions) error
Rollback() error
Expand Down Expand Up @@ -304,23 +307,58 @@ func (sys *System) Tick(ctx context.Context) {
}
}

for _, hash := range sys.Pending.Pending(ctx) {
tx, err := txResultByHash(ctx, db, hash)
pending := sys.Pending.Pending(ctx)

if err == nil {
logger.WithField("hash", hash).Debug("finishing open submission")
sys.Pending.Finish(ctx, hash, Result{Transaction: tx})
continue
if len(pending) > 0 {
latestLedger, err := db.GetLatestHistoryLedger(ctx)
if err != nil {
logger.WithError(err).Error("error getting latest history ledger")
return
}

if _, ok := err.(*FailedTransactionError); ok {
logger.WithField("hash", hash).Debug("finishing open submission")
sys.Pending.Finish(ctx, hash, Result{Transaction: tx, Err: err})
continue
var sinceLedgerSeq int32 = int32(latestLedger)
bartekn marked this conversation as resolved.
Show resolved Hide resolved
sinceLedgerSeq -= 100
bartekn marked this conversation as resolved.
Show resolved Hide resolved
if sinceLedgerSeq < 0 {
sinceLedgerSeq = 0
}

if err != ErrNoResults {
logger.WithStack(err).Error(err)
var txs []history.Transaction
err = db.TransactionsByHashesSinceLedger(ctx, &txs, pending, uint32(sinceLedgerSeq))
if err != nil && !db.NoRows(err) {
logger.WithError(err).Error("error getting transactions by hashes")
return
}

txMap := make(map[string]history.Transaction, len(txs))
for _, tx := range txs {
txMap[tx.TransactionHash] = tx
if tx.InnerTransactionHash.Valid {
txMap[tx.InnerTransactionHash.String] = tx
}
}

for _, hash := range pending {
tx, found := txMap[hash]
if !found {
continue
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How come we can happily continue here?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Couldn't it be that the transaction is in the DB but isn't found by TransactionsByHashesSinceLedger() due to the sinceLedgerSeq passed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If txMap doesn't contain the hash it means that tx with hash has not been included in the ledger yet. So we continue to the next pending transaction.

}
_, err := txResultFromHistory(tx)

if err == nil {
logger.WithField("hash", hash).Debug("finishing open submission")
sys.Pending.Finish(ctx, hash, Result{Transaction: tx})
continue
}

if _, ok := err.(*FailedTransactionError); ok {
logger.WithField("hash", hash).Debug("finishing open submission")
sys.Pending.Finish(ctx, hash, Result{Transaction: tx, Err: err})
continue
}

if err != nil {
logger.WithStack(err).Error(err)
}
}
}

Expand Down
18 changes: 10 additions & 8 deletions services/horizon/internal/txsub/system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ func (suite *SystemTestSuite) SetupTest() {
suite.badSeq = SubmissionResult{
Err: ErrBadSequence,
}

suite.db.On("GetLatestHistoryLedger").Return(uint32(1000), nil).Maybe()
}

func (suite *SystemTestSuite) TearDownTest() {
Expand Down Expand Up @@ -327,7 +329,7 @@ func (suite *SystemTestSuite) TestTick_FinishesTransactions() {
ReadOnly: true,
}).Return(nil).Once()
suite.db.On("Rollback").Return(nil).Once()
suite.db.On("TransactionByHash", suite.ctx, mock.Anything, suite.successTx.Transaction.TransactionHash).
suite.db.On("TransactionsByHashesSinceLedger", suite.ctx, mock.Anything, []string{suite.successTx.Transaction.TransactionHash}, uint32(900)).
Return(sql.ErrNoRows).Once()
suite.db.On("NoRows", sql.ErrNoRows).Return(true).Once()

Expand All @@ -341,10 +343,10 @@ func (suite *SystemTestSuite) TestTick_FinishesTransactions() {
ReadOnly: true,
}).Return(nil).Once()
suite.db.On("Rollback").Return(nil).Once()
suite.db.On("TransactionByHash", suite.ctx, mock.Anything, suite.successTx.Transaction.TransactionHash).
suite.db.On("TransactionsByHashesSinceLedger", suite.ctx, mock.Anything, []string{suite.successTx.Transaction.TransactionHash}, uint32(900)).
Run(func(args mock.Arguments) {
ptr := args.Get(1).(*history.Transaction)
*ptr = suite.successTx.Transaction
ptr := args.Get(1).(*[]history.Transaction)
*ptr = []history.Transaction{suite.successTx.Transaction}
}).
Return(nil).Once()

Expand Down Expand Up @@ -395,10 +397,10 @@ func (suite *SystemTestSuite) TestTickFinishFeeBumpTransaction() {
ReadOnly: true,
}).Return(nil).Once()
suite.db.On("Rollback").Return(nil).Once()
suite.db.On("TransactionByHash", suite.ctx, mock.Anything, innerHash).
suite.db.On("TransactionsByHashesSinceLedger", suite.ctx, mock.Anything, []string{innerHash}, uint32(900)).
Run(func(args mock.Arguments) {
ptr := args.Get(1).(*history.Transaction)
*ptr = feeBumpTx.Transaction
ptr := args.Get(1).(*[]history.Transaction)
*ptr = []history.Transaction{feeBumpTx.Transaction}
}).
Return(nil).Once()

Expand All @@ -423,7 +425,7 @@ func (suite *SystemTestSuite) TestTick_RemovesStaleSubmissions() {
ReadOnly: true,
}).Return(nil).Once()
suite.db.On("Rollback").Return(nil).Once()
suite.db.On("TransactionByHash", suite.ctx, mock.Anything, suite.successTx.Transaction.TransactionHash).
suite.db.On("TransactionsByHashesSinceLedger", suite.ctx, mock.Anything, []string{suite.successTx.Transaction.TransactionHash}, uint32(900)).
Return(sql.ErrNoRows).Once()
suite.db.On("NoRows", sql.ErrNoRows).Return(true).Once()

Expand Down