Skip to content

Commit

Permalink
Merge pull request #4400 from stellar/horizon-db-optimizations
Browse files Browse the repository at this point in the history
horizon: Merge horizon-db-optimizations into master
  • Loading branch information
2opremio authored May 24, 2022
2 parents 5fa5149 + e0b4d32 commit 8212530
Show file tree
Hide file tree
Showing 17 changed files with 361 additions and 63 deletions.
7 changes: 7 additions & 0 deletions services/horizon/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,13 @@
All notable changes to this project will be documented in this
file. This project adheres to [Semantic Versioning](http://semver.org/).

## Unreleased

- Querying claimable balances has been optimized ([4385](https://github.com/stellar/go/pull/4385)).
- Querying trade aggregations has been optimized ([4389](https://github.com/stellar/go/pull/4389)).
- Postgres connections for non ingesting Horizon instances are now configured to timeout on long running queries / transactions ([4390](https://github.com/stellar/go/pull/4390)).
- Added `disable-path-finding` Horizon flag to disable the path finding endpoints. This flag should be enabled on ingesting Horizon instances which do not serve HTTP traffic ([4399](https://github.com/stellar/go/pull/4399)).

## V2.17.0

This is the final release after the [release candidate](v2.17.0-release-candidate), including some small additional changes:
Expand Down
4 changes: 3 additions & 1 deletion services/horizon/internal/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,9 @@ func (a *App) Serve() error {
}

go a.run()
go a.orderBookStream.Run(a.ctx)
if !a.config.DisablePathFinding {
go a.orderBookStream.Run(a.ctx)
}

// WaitGroup for all go routines. Makes sure that DB is closed when
// all services gracefully shutdown.
Expand Down
2 changes: 2 additions & 0 deletions services/horizon/internal/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ type Config struct {
// DisablePoolPathFinding configures horizon to run path finding without including liquidity pools
// in the path finding search.
DisablePoolPathFinding bool
// DisablePathFinding configures horizon without the path finding endpoint.
DisablePathFinding bool
// MaxPathFindingRequests is the maximum number of path finding requests horizon will allow
// in a 1-second period. A value of 0 disables the limit.
MaxPathFindingRequests uint
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ func (q *Q) GetClaimableBalances(ctx context.Context, query ClaimableBalancesQue
sql = sql.
Prefix("WITH cb AS (").
Suffix(
") select "+claimableBalancesSelectStatement+" from cb LIMIT ?",
"LIMIT ?) select "+claimableBalancesSelectStatement+" from cb",
query.PageQuery.Limit,
)

Expand Down
41 changes: 34 additions & 7 deletions services/horizon/internal/db2/history/offers.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,16 @@ package history

import (
"context"
"database/sql"

sq "github.com/Masterminds/squirrel"
"github.com/jmoiron/sqlx"

"github.com/stellar/go/support/errors"
)

const offersBatchSize = 50000

// QOffers defines offer related queries.
type QOffers interface {
StreamAllOffers(ctx context.Context, callback func(Offer) error) error
Expand Down Expand Up @@ -83,28 +86,52 @@ func (q *Q) GetOffers(ctx context.Context, query OffersQuery) ([]Offer, error) {

// StreamAllOffers loads all non deleted offers
func (q *Q) StreamAllOffers(ctx context.Context, callback func(Offer) error) error {
if tx := q.GetTx(); tx == nil {
return errors.New("cannot be called outside of a transaction")
}
if opts := q.GetTxOptions(); opts == nil || !opts.ReadOnly || opts.Isolation != sql.LevelRepeatableRead {
return errors.New("should only be called in a repeatable read transaction")
}

lastID := int64(0)
for {
nextID, err := q.streamAllOffersBatch(ctx, lastID, offersBatchSize, callback)
if err != nil {
return err
}
if lastID == nextID {
return nil
}
lastID = nextID
}
}

func (q *Q) streamAllOffersBatch(ctx context.Context, lastId int64, limit uint64, callback func(Offer) error) (int64, error) {
var rows *sqlx.Rows
var err error

if rows, err = q.Query(ctx, selectOffers.Where("deleted = ?", false)); err != nil {
return errors.Wrap(err, "could not run all offers select query")
rows, err = q.Query(ctx, selectOffers.
Where("deleted = ?", false).
Where("offer_id > ? ", lastId).
OrderBy("offer_id asc").Limit(limit))
if err != nil {
return 0, errors.Wrap(err, "could not run all offers select query")
}

defer rows.Close()

for rows.Next() {
offer := Offer{}
if err = rows.StructScan(&offer); err != nil {
return errors.Wrap(err, "could not scan row into offer struct")
return 0, errors.Wrap(err, "could not scan row into offer struct")
}

if err = callback(offer); err != nil {
return err
return 0, err
}
lastId = offer.OfferID
}

return rows.Err()

return lastId, rows.Err()
}

// GetUpdatedOffers returns all offers created, updated, or deleted after the given ledger sequence.
Expand Down
45 changes: 38 additions & 7 deletions services/horizon/internal/db2/history/offers_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package history

import (
"context"
"database/sql"
"github.com/stretchr/testify/assert"
"strconv"
"testing"

Expand Down Expand Up @@ -105,14 +108,42 @@ func TestGetNonExistentOfferByID(t *testing.T) {
tt.Assert.True(q.NoRows(err))
}

func streamAllOffersInTx(q *Q, ctx context.Context, f func(offer Offer) error) error {
err := q.BeginTx(&sql.TxOptions{ReadOnly: true, Isolation: sql.LevelRepeatableRead})
if err != nil {
return err
}
defer q.Rollback()
return q.StreamAllOffers(ctx, f)
}

func TestStreamAllOffersRequiresTx(t *testing.T) {
tt := test.Start(t)
defer tt.Finish()
test.ResetHorizonDB(t, tt.HorizonDB)
q := &Q{tt.HorizonSession()}

err := q.StreamAllOffers(tt.Ctx, func(offer Offer) error {
return nil
})
assert.EqualError(t, err, "cannot be called outside of a transaction")

assert.NoError(t, q.Begin())
defer q.Rollback()
err = q.StreamAllOffers(tt.Ctx, func(offer Offer) error {
return nil
})
assert.EqualError(t, err, "should only be called in a repeatable read transaction")
}

func TestQueryEmptyOffers(t *testing.T) {
tt := test.Start(t)
defer tt.Finish()
test.ResetHorizonDB(t, tt.HorizonDB)
q := &Q{tt.HorizonSession()}

var offers []Offer
err := q.StreamAllOffers(tt.Ctx, func(offer Offer) error {
err := streamAllOffersInTx(q, tt.Ctx, func(offer Offer) error {
offers = append(offers, offer)
return nil
})
Expand Down Expand Up @@ -150,7 +181,7 @@ func TestInsertOffers(t *testing.T) {
tt.Assert.NoError(err)

var offers []Offer
err = q.StreamAllOffers(tt.Ctx, func(offer Offer) error {
err = streamAllOffersInTx(q, tt.Ctx, func(offer Offer) error {
offers = append(offers, offer)
return nil
})
Expand Down Expand Up @@ -183,7 +214,7 @@ func TestInsertOffers(t *testing.T) {
tt.Assert.Equal(3, afterCompactionCount)

var afterCompactionOffers []Offer
err = q.StreamAllOffers(tt.Ctx, func(offer Offer) error {
err = streamAllOffersInTx(q, tt.Ctx, func(offer Offer) error {
afterCompactionOffers = append(afterCompactionOffers, offer)
return nil
})
Expand All @@ -201,7 +232,7 @@ func TestUpdateOffer(t *testing.T) {
tt.Assert.NoError(err)

var offers []Offer
err = q.StreamAllOffers(tt.Ctx, func(offer Offer) error {
err = streamAllOffersInTx(q, tt.Ctx, func(offer Offer) error {
offers = append(offers, offer)
return nil
})
Expand Down Expand Up @@ -229,7 +260,7 @@ func TestUpdateOffer(t *testing.T) {
tt.Assert.NoError(err)

offers = nil
err = q.StreamAllOffers(tt.Ctx, func(offer Offer) error {
err = streamAllOffersInTx(q, tt.Ctx, func(offer Offer) error {
offers = append(offers, offer)
return nil
})
Expand All @@ -256,7 +287,7 @@ func TestRemoveOffer(t *testing.T) {
err := insertOffer(tt, q, eurOffer)
tt.Assert.NoError(err)
var offers []Offer
err = q.StreamAllOffers(tt.Ctx, func(offer Offer) error {
err = streamAllOffersInTx(q, tt.Ctx, func(offer Offer) error {
offers = append(offers, offer)
return nil
})
Expand All @@ -274,7 +305,7 @@ func TestRemoveOffer(t *testing.T) {
expectedUpdates[0].Deleted = true

offers = nil
err = q.StreamAllOffers(tt.Ctx, func(offer Offer) error {
err = streamAllOffersInTx(q, tt.Ctx, func(offer Offer) error {
offers = append(offers, offer)
return nil
})
Expand Down
Loading

0 comments on commit 8212530

Please sign in to comment.