Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

horizon: Merge horizon-db-optimizations into master #4400

Merged
merged 7 commits into from
May 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions services/horizon/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,13 @@
All notable changes to this project will be documented in this
file. This project adheres to [Semantic Versioning](http://semver.org/).

## Unreleased

- Querying claimable balances has been optimized ([4385](https://github.com/stellar/go/pull/4385)).
- Querying trade aggregations has been optimized ([4389](https://github.com/stellar/go/pull/4389)).
- Postgres connections for non ingesting Horizon instances are now configured to timeout on long running queries / transactions ([4390](https://github.com/stellar/go/pull/4390)).
- Added `disable-path-finding` Horizon flag to disable the path finding endpoints. This flag should be enabled on ingesting Horizon instances which do not serve HTTP traffic ([4399](https://github.com/stellar/go/pull/4399)).

## V2.17.0

This is the final release after the [release candidate](v2.17.0-release-candidate), including some small additional changes:
Expand Down
4 changes: 3 additions & 1 deletion services/horizon/internal/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,9 @@ func (a *App) Serve() error {
}

go a.run()
go a.orderBookStream.Run(a.ctx)
if !a.config.DisablePathFinding {
go a.orderBookStream.Run(a.ctx)
}

// WaitGroup for all go routines. Makes sure that DB is closed when
// all services gracefully shutdown.
Expand Down
2 changes: 2 additions & 0 deletions services/horizon/internal/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ type Config struct {
// DisablePoolPathFinding configures horizon to run path finding without including liquidity pools
// in the path finding search.
DisablePoolPathFinding bool
// DisablePathFinding configures horizon without the path finding endpoint.
DisablePathFinding bool
// MaxPathFindingRequests is the maximum number of path finding requests horizon will allow
// in a 1-second period. A value of 0 disables the limit.
MaxPathFindingRequests uint
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ func (q *Q) GetClaimableBalances(ctx context.Context, query ClaimableBalancesQue
sql = sql.
Prefix("WITH cb AS (").
Suffix(
") select "+claimableBalancesSelectStatement+" from cb LIMIT ?",
"LIMIT ?) select "+claimableBalancesSelectStatement+" from cb",
query.PageQuery.Limit,
)

Expand Down
41 changes: 34 additions & 7 deletions services/horizon/internal/db2/history/offers.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,16 @@ package history

import (
"context"
"database/sql"

sq "github.com/Masterminds/squirrel"
"github.com/jmoiron/sqlx"

"github.com/stellar/go/support/errors"
)

const offersBatchSize = 50000

// QOffers defines offer related queries.
type QOffers interface {
StreamAllOffers(ctx context.Context, callback func(Offer) error) error
Expand Down Expand Up @@ -83,28 +86,52 @@ func (q *Q) GetOffers(ctx context.Context, query OffersQuery) ([]Offer, error) {

// StreamAllOffers loads all non deleted offers
func (q *Q) StreamAllOffers(ctx context.Context, callback func(Offer) error) error {
if tx := q.GetTx(); tx == nil {
return errors.New("cannot be called outside of a transaction")
}
if opts := q.GetTxOptions(); opts == nil || !opts.ReadOnly || opts.Isolation != sql.LevelRepeatableRead {
return errors.New("should only be called in a repeatable read transaction")
}

lastID := int64(0)
for {
nextID, err := q.streamAllOffersBatch(ctx, lastID, offersBatchSize, callback)
if err != nil {
return err
}
if lastID == nextID {
return nil
}
lastID = nextID
}
}

func (q *Q) streamAllOffersBatch(ctx context.Context, lastId int64, limit uint64, callback func(Offer) error) (int64, error) {
var rows *sqlx.Rows
var err error

if rows, err = q.Query(ctx, selectOffers.Where("deleted = ?", false)); err != nil {
return errors.Wrap(err, "could not run all offers select query")
rows, err = q.Query(ctx, selectOffers.
Where("deleted = ?", false).
Where("offer_id > ? ", lastId).
OrderBy("offer_id asc").Limit(limit))
if err != nil {
return 0, errors.Wrap(err, "could not run all offers select query")
}

defer rows.Close()

for rows.Next() {
offer := Offer{}
if err = rows.StructScan(&offer); err != nil {
return errors.Wrap(err, "could not scan row into offer struct")
return 0, errors.Wrap(err, "could not scan row into offer struct")
}

if err = callback(offer); err != nil {
return err
return 0, err
}
lastId = offer.OfferID
}

return rows.Err()

return lastId, rows.Err()
}

// GetUpdatedOffers returns all offers created, updated, or deleted after the given ledger sequence.
Expand Down
45 changes: 38 additions & 7 deletions services/horizon/internal/db2/history/offers_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package history

import (
"context"
"database/sql"
"github.com/stretchr/testify/assert"
"strconv"
"testing"

Expand Down Expand Up @@ -105,14 +108,42 @@ func TestGetNonExistentOfferByID(t *testing.T) {
tt.Assert.True(q.NoRows(err))
}

func streamAllOffersInTx(q *Q, ctx context.Context, f func(offer Offer) error) error {
err := q.BeginTx(&sql.TxOptions{ReadOnly: true, Isolation: sql.LevelRepeatableRead})
if err != nil {
return err
}
defer q.Rollback()
return q.StreamAllOffers(ctx, f)
}

func TestStreamAllOffersRequiresTx(t *testing.T) {
tt := test.Start(t)
defer tt.Finish()
test.ResetHorizonDB(t, tt.HorizonDB)
q := &Q{tt.HorizonSession()}

err := q.StreamAllOffers(tt.Ctx, func(offer Offer) error {
return nil
})
assert.EqualError(t, err, "cannot be called outside of a transaction")

assert.NoError(t, q.Begin())
defer q.Rollback()
err = q.StreamAllOffers(tt.Ctx, func(offer Offer) error {
return nil
})
assert.EqualError(t, err, "should only be called in a repeatable read transaction")
}

func TestQueryEmptyOffers(t *testing.T) {
tt := test.Start(t)
defer tt.Finish()
test.ResetHorizonDB(t, tt.HorizonDB)
q := &Q{tt.HorizonSession()}

var offers []Offer
err := q.StreamAllOffers(tt.Ctx, func(offer Offer) error {
err := streamAllOffersInTx(q, tt.Ctx, func(offer Offer) error {
offers = append(offers, offer)
return nil
})
Expand Down Expand Up @@ -150,7 +181,7 @@ func TestInsertOffers(t *testing.T) {
tt.Assert.NoError(err)

var offers []Offer
err = q.StreamAllOffers(tt.Ctx, func(offer Offer) error {
err = streamAllOffersInTx(q, tt.Ctx, func(offer Offer) error {
offers = append(offers, offer)
return nil
})
Expand Down Expand Up @@ -183,7 +214,7 @@ func TestInsertOffers(t *testing.T) {
tt.Assert.Equal(3, afterCompactionCount)

var afterCompactionOffers []Offer
err = q.StreamAllOffers(tt.Ctx, func(offer Offer) error {
err = streamAllOffersInTx(q, tt.Ctx, func(offer Offer) error {
afterCompactionOffers = append(afterCompactionOffers, offer)
return nil
})
Expand All @@ -201,7 +232,7 @@ func TestUpdateOffer(t *testing.T) {
tt.Assert.NoError(err)

var offers []Offer
err = q.StreamAllOffers(tt.Ctx, func(offer Offer) error {
err = streamAllOffersInTx(q, tt.Ctx, func(offer Offer) error {
offers = append(offers, offer)
return nil
})
Expand Down Expand Up @@ -229,7 +260,7 @@ func TestUpdateOffer(t *testing.T) {
tt.Assert.NoError(err)

offers = nil
err = q.StreamAllOffers(tt.Ctx, func(offer Offer) error {
err = streamAllOffersInTx(q, tt.Ctx, func(offer Offer) error {
offers = append(offers, offer)
return nil
})
Expand All @@ -256,7 +287,7 @@ func TestRemoveOffer(t *testing.T) {
err := insertOffer(tt, q, eurOffer)
tt.Assert.NoError(err)
var offers []Offer
err = q.StreamAllOffers(tt.Ctx, func(offer Offer) error {
err = streamAllOffersInTx(q, tt.Ctx, func(offer Offer) error {
offers = append(offers, offer)
return nil
})
Expand All @@ -274,7 +305,7 @@ func TestRemoveOffer(t *testing.T) {
expectedUpdates[0].Deleted = true

offers = nil
err = q.StreamAllOffers(tt.Ctx, func(offer Offer) error {
err = streamAllOffersInTx(q, tt.Ctx, func(offer Offer) error {
offers = append(offers, offer)
return nil
})
Expand Down
Loading