From a32f84aeae50846f9ebece9842206ffc88e9e76b Mon Sep 17 00:00:00 2001 From: tamirms Date: Mon, 23 May 2022 15:58:31 -0400 Subject: [PATCH 1/3] Implement StreamAllOffers with batches The query to get all rows in the offers table currently takes more than 10 seconds to execute. Given that the horizon request timeout is 10 seconds, we need to reimplement StreamAllOffers to use batching. --- .../horizon/internal/db2/history/offers.go | 34 +++++++++++++++---- services/horizon/internal/ingest/orderbook.go | 1 - 2 files changed, 27 insertions(+), 8 deletions(-) diff --git a/services/horizon/internal/db2/history/offers.go b/services/horizon/internal/db2/history/offers.go index 83bf3f8e17..1819e3e22c 100644 --- a/services/horizon/internal/db2/history/offers.go +++ b/services/horizon/internal/db2/history/offers.go @@ -2,6 +2,7 @@ package history import ( "context" + "math" sq "github.com/Masterminds/squirrel" "github.com/jmoiron/sqlx" @@ -9,6 +10,8 @@ import ( "github.com/stellar/go/support/errors" ) +const offersBatchSize = 50000 + // QOffers defines offer related queries. type QOffers interface { StreamAllOffers(ctx context.Context, callback func(Offer) error) error @@ -83,28 +86,45 @@ func (q *Q) GetOffers(ctx context.Context, query OffersQuery) ([]Offer, error) { // StreamAllOffers loads all non deleted offers func (q *Q) StreamAllOffers(ctx context.Context, callback func(Offer) error) error { + lastID := int64(math.MinInt64) + for { + nextID, err := q.streamAllOffersBatch(ctx, lastID, offersBatchSize, callback) + if err != nil { + return err + } + if lastID == nextID { + return nil + } + lastID = nextID + } +} + +func (q *Q) streamAllOffersBatch(ctx context.Context, lastId int64, limit uint64, callback func(Offer) error) (int64, error) { var rows *sqlx.Rows var err error - if rows, err = q.Query(ctx, selectOffers.Where("deleted = ?", false)); err != nil { - return errors.Wrap(err, "could not run all offers select query") + rows, err = q.Query(ctx, selectOffers. + Where("deleted = ?", false). + Where("offer_id > ? ", lastId). + OrderBy("offer_id asc").Limit(limit)) + if err != nil { + return 0, errors.Wrap(err, "could not run all offers select query") } defer rows.Close() - for rows.Next() { offer := Offer{} if err = rows.StructScan(&offer); err != nil { - return errors.Wrap(err, "could not scan row into offer struct") + return 0, errors.Wrap(err, "could not scan row into offer struct") } if err = callback(offer); err != nil { - return err + return 0, err } + lastId = offer.OfferID } - return rows.Err() - + return lastId, rows.Err() } // GetUpdatedOffers returns all offers created, updated, or deleted after the given ledger sequence. diff --git a/services/horizon/internal/ingest/orderbook.go b/services/horizon/internal/ingest/orderbook.go index 3fd421cde3..e62cc7a8b6 100644 --- a/services/horizon/internal/ingest/orderbook.go +++ b/services/horizon/internal/ingest/orderbook.go @@ -136,7 +136,6 @@ func (o *OrderBookStream) update(ctx context.Context, status ingestionStatus) (b o.graph.AddOffers(offerToXDR(offer)) return nil }) - if err != nil { return true, errors.Wrap(err, "Error loading offers into orderbook") } From 244bda94ae52cdbd94c102299781d3f054ea2fa3 Mon Sep 17 00:00:00 2001 From: tamirms Date: Mon, 23 May 2022 17:35:21 -0400 Subject: [PATCH 2/3] Update services/horizon/internal/db2/history/offers.go Co-authored-by: Bartek Nowotarski --- services/horizon/internal/db2/history/offers.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/horizon/internal/db2/history/offers.go b/services/horizon/internal/db2/history/offers.go index 1819e3e22c..b00c36dbb0 100644 --- a/services/horizon/internal/db2/history/offers.go +++ b/services/horizon/internal/db2/history/offers.go @@ -86,7 +86,7 @@ func (q *Q) GetOffers(ctx context.Context, query OffersQuery) ([]Offer, error) { // StreamAllOffers loads all non deleted offers func (q *Q) StreamAllOffers(ctx context.Context, callback func(Offer) error) error { - lastID := int64(math.MinInt64) + lastID := int64(0) for { nextID, err := q.streamAllOffersBatch(ctx, lastID, offersBatchSize, callback) if err != nil { From cc96ca3296876085bb9baa082bb98f8311acea1b Mon Sep 17 00:00:00 2001 From: tamirms Date: Mon, 23 May 2022 18:29:15 -0400 Subject: [PATCH 3/3] StreamAllOffers must be called within a repeatable read tx --- .../horizon/internal/db2/history/offers.go | 9 +++- .../internal/db2/history/offers_test.go | 45 ++++++++++++++++--- 2 files changed, 46 insertions(+), 8 deletions(-) diff --git a/services/horizon/internal/db2/history/offers.go b/services/horizon/internal/db2/history/offers.go index b00c36dbb0..c80d67c854 100644 --- a/services/horizon/internal/db2/history/offers.go +++ b/services/horizon/internal/db2/history/offers.go @@ -2,7 +2,7 @@ package history import ( "context" - "math" + "database/sql" sq "github.com/Masterminds/squirrel" "github.com/jmoiron/sqlx" @@ -86,6 +86,13 @@ func (q *Q) GetOffers(ctx context.Context, query OffersQuery) ([]Offer, error) { // StreamAllOffers loads all non deleted offers func (q *Q) StreamAllOffers(ctx context.Context, callback func(Offer) error) error { + if tx := q.GetTx(); tx == nil { + return errors.New("cannot be called outside of a transaction") + } + if opts := q.GetTxOptions(); opts == nil || !opts.ReadOnly || opts.Isolation != sql.LevelRepeatableRead { + return errors.New("should only be called in a repeatable read transaction") + } + lastID := int64(0) for { nextID, err := q.streamAllOffersBatch(ctx, lastID, offersBatchSize, callback) diff --git a/services/horizon/internal/db2/history/offers_test.go b/services/horizon/internal/db2/history/offers_test.go index 0aac2522f7..86d93ff958 100644 --- a/services/horizon/internal/db2/history/offers_test.go +++ b/services/horizon/internal/db2/history/offers_test.go @@ -1,6 +1,9 @@ package history import ( + "context" + "database/sql" + "github.com/stretchr/testify/assert" "strconv" "testing" @@ -105,6 +108,34 @@ func TestGetNonExistentOfferByID(t *testing.T) { tt.Assert.True(q.NoRows(err)) } +func streamAllOffersInTx(q *Q, ctx context.Context, f func(offer Offer) error) error { + err := q.BeginTx(&sql.TxOptions{ReadOnly: true, Isolation: sql.LevelRepeatableRead}) + if err != nil { + return err + } + defer q.Rollback() + return q.StreamAllOffers(ctx, f) +} + +func TestStreamAllOffersRequiresTx(t *testing.T) { + tt := test.Start(t) + defer tt.Finish() + test.ResetHorizonDB(t, tt.HorizonDB) + q := &Q{tt.HorizonSession()} + + err := q.StreamAllOffers(tt.Ctx, func(offer Offer) error { + return nil + }) + assert.EqualError(t, err, "cannot be called outside of a transaction") + + assert.NoError(t, q.Begin()) + defer q.Rollback() + err = q.StreamAllOffers(tt.Ctx, func(offer Offer) error { + return nil + }) + assert.EqualError(t, err, "should only be called in a repeatable read transaction") +} + func TestQueryEmptyOffers(t *testing.T) { tt := test.Start(t) defer tt.Finish() @@ -112,7 +143,7 @@ func TestQueryEmptyOffers(t *testing.T) { q := &Q{tt.HorizonSession()} var offers []Offer - err := q.StreamAllOffers(tt.Ctx, func(offer Offer) error { + err := streamAllOffersInTx(q, tt.Ctx, func(offer Offer) error { offers = append(offers, offer) return nil }) @@ -150,7 +181,7 @@ func TestInsertOffers(t *testing.T) { tt.Assert.NoError(err) var offers []Offer - err = q.StreamAllOffers(tt.Ctx, func(offer Offer) error { + err = streamAllOffersInTx(q, tt.Ctx, func(offer Offer) error { offers = append(offers, offer) return nil }) @@ -183,7 +214,7 @@ func TestInsertOffers(t *testing.T) { tt.Assert.Equal(3, afterCompactionCount) var afterCompactionOffers []Offer - err = q.StreamAllOffers(tt.Ctx, func(offer Offer) error { + err = streamAllOffersInTx(q, tt.Ctx, func(offer Offer) error { afterCompactionOffers = append(afterCompactionOffers, offer) return nil }) @@ -201,7 +232,7 @@ func TestUpdateOffer(t *testing.T) { tt.Assert.NoError(err) var offers []Offer - err = q.StreamAllOffers(tt.Ctx, func(offer Offer) error { + err = streamAllOffersInTx(q, tt.Ctx, func(offer Offer) error { offers = append(offers, offer) return nil }) @@ -229,7 +260,7 @@ func TestUpdateOffer(t *testing.T) { tt.Assert.NoError(err) offers = nil - err = q.StreamAllOffers(tt.Ctx, func(offer Offer) error { + err = streamAllOffersInTx(q, tt.Ctx, func(offer Offer) error { offers = append(offers, offer) return nil }) @@ -256,7 +287,7 @@ func TestRemoveOffer(t *testing.T) { err := insertOffer(tt, q, eurOffer) tt.Assert.NoError(err) var offers []Offer - err = q.StreamAllOffers(tt.Ctx, func(offer Offer) error { + err = streamAllOffersInTx(q, tt.Ctx, func(offer Offer) error { offers = append(offers, offer) return nil }) @@ -274,7 +305,7 @@ func TestRemoveOffer(t *testing.T) { expectedUpdates[0].Deleted = true offers = nil - err = q.StreamAllOffers(tt.Ctx, func(offer Offer) error { + err = streamAllOffersInTx(q, tt.Ctx, func(offer Offer) error { offers = append(offers, offer) return nil })