From 54f405d8ca597770188444ed6274821566858738 Mon Sep 17 00:00:00 2001 From: tamirms Date: Thu, 8 Apr 2021 20:45:39 +0200 Subject: [PATCH 1/6] Add retries in historyRangeState when ledger is not available --- services/horizon/internal/ingest/fsm.go | 23 ++++++++- .../ingest/ingest_history_range_state_test.go | 49 +++++++++++++++++++ 2 files changed, 70 insertions(+), 2 deletions(-) diff --git a/services/horizon/internal/ingest/fsm.go b/services/horizon/internal/ingest/fsm.go index 329e7bbca3..16cc98cdf9 100644 --- a/services/horizon/internal/ingest/fsm.go +++ b/services/horizon/internal/ingest/fsm.go @@ -534,6 +534,11 @@ func (h historyRangeState) String() string { ) } +var ( + maxledgerNotFoundRetries = 4 + ledgerNotFoundRetryBackOff = 3 * time.Second +) + // historyRangeState is used when catching up history data func (h historyRangeState) run(s *system) (transition, error) { if h.fromLedger == 0 || h.toLedger == 0 || @@ -570,8 +575,22 @@ func (h historyRangeState) run(s *system) (transition, error) { } for cur := h.fromLedger; cur <= h.toLedger; cur++ { - if err = runTransactionProcessorsOnLedger(s, cur); err != nil { - return start(), err + for attempt := 0; attempt < maxledgerNotFoundRetries; attempt++ { + if err = runTransactionProcessorsOnLedger(s, cur); err == nil { + break + } else if errors.Cause(err) == ingest.ErrNotFound { + if attempt == maxledgerNotFoundRetries-1 { + return start(), errors.Wrapf( + err, + "Could not obtain ledger %v after %v attempts", + cur, + maxledgerNotFoundRetries, + ) + } + time.Sleep(ledgerNotFoundRetryBackOff) + } else { + return start(), err + } } } diff --git a/services/horizon/internal/ingest/ingest_history_range_state_test.go b/services/horizon/internal/ingest/ingest_history_range_state_test.go index b3e031eeb2..b64e58303c 100644 --- a/services/horizon/internal/ingest/ingest_history_range_state_test.go +++ b/services/horizon/internal/ingest/ingest_history_range_state_test.go @@ -9,6 +9,7 @@ import ( "github.com/jmoiron/sqlx" "github.com/stretchr/testify/suite" + "github.com/stellar/go/ingest" "github.com/stellar/go/ingest/ledgerbackend" "github.com/stellar/go/services/horizon/internal/ingest/processors" "github.com/stellar/go/services/horizon/internal/toid" @@ -42,6 +43,9 @@ func (s *IngestHistoryRangeStateTestSuite) SetupTest() { } s.system.initMetrics() + // configure back off to 0 so we can speed up tests + ledgerNotFoundRetryBackOff = 0 + s.historyQ.On("Rollback").Return(nil).Once() } @@ -140,6 +144,51 @@ func (s *IngestHistoryRangeStateTestSuite) TestRunTransactionProcessorsOnLedgerR s.Assert().Equal(transition{node: startState{}, sleepDuration: defaultSleep}, next) } +func (s *IngestHistoryRangeStateTestSuite) TestRunTransactionProcessorsRetrySucceeds() { + s.historyQ.On("Begin").Return(nil).Once() + s.historyQ.On("GetLastLedgerIngest").Return(uint32(0), nil).Once() + s.historyQ.On("GetLatestLedger").Return(uint32(99), nil).Once() + + s.ledgerBackend.On("IsPrepared", ledgerbackend.UnboundedRange(100)).Return(true, nil).Once() + s.system.maxReingestRetries = 2 + s.runner.On("RunTransactionProcessorsOnLedger", uint32(100)).Return( + processors.StatsLedgerTransactionProcessorResults{}, + processorsRunDurations{}, + errors.Wrap(errors.Wrap(ingest.ErrNotFound, "layer1"), "layer2"), + ).Once() + + s.runner.On("RunTransactionProcessorsOnLedger", uint32(100)).Return( + processors.StatsLedgerTransactionProcessorResults{}, + processorsRunDurations{}, + nil, + ).Once() + + s.historyQ.On("Commit").Return(nil).Once() + + next, err := historyRangeState{fromLedger: 100, toLedger: 100}.run(s.system) + s.Assert().NoError(err) + s.Assert().Equal(transition{node: startState{}, sleepDuration: defaultSleep}, next) +} + +func (s *IngestHistoryRangeStateTestSuite) TestRunTransactionProcessorsRetryFails() { + s.historyQ.On("Begin").Return(nil).Once() + s.historyQ.On("GetLastLedgerIngest").Return(uint32(0), nil).Once() + s.historyQ.On("GetLatestLedger").Return(uint32(99), nil).Once() + + s.ledgerBackend.On("IsPrepared", ledgerbackend.UnboundedRange(100)).Return(true, nil).Once() + s.system.maxReingestRetries = 2 + s.runner.On("RunTransactionProcessorsOnLedger", uint32(100)).Return( + processors.StatsLedgerTransactionProcessorResults{}, + processorsRunDurations{}, + errors.Wrap(errors.Wrap(ingest.ErrNotFound, "layer1"), "layer2"), + ).Times(4) + + next, err := historyRangeState{fromLedger: 100, toLedger: 200}.run(s.system) + s.Assert().Error(err) + s.Assert().EqualError(err, "Could not obtain ledger 100 after 4 attempts: error processing ledger sequence=100: layer2: layer1: ledger not found") + s.Assert().Equal(transition{node: startState{}, sleepDuration: defaultSleep}, next) +} + func (s *IngestHistoryRangeStateTestSuite) TestRangeNotPreparedFailPrepare() { s.historyQ.On("Begin").Return(nil).Once() s.historyQ.On("GetLastLedgerIngest").Return(uint32(0), nil).Once() From 6de05fbc3a24a1846e8b16e6d7a17fc9137112c4 Mon Sep 17 00:00:00 2001 From: tamirms Date: Thu, 8 Apr 2021 21:11:55 +0200 Subject: [PATCH 2/6] Update changelog --- services/horizon/CHANGELOG.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/services/horizon/CHANGELOG.md b/services/horizon/CHANGELOG.md index a568d19ff7..a2de05b03f 100644 --- a/services/horizon/CHANGELOG.md +++ b/services/horizon/CHANGELOG.md @@ -3,6 +3,10 @@ All notable changes to this project will be documented in this file. This project adheres to [Semantic Versioning](http://semver.org/). +## v2.1.1 + +* When ingesting a backlog of ledgers, Horizon sometimes consumes ledgers faster than the rate which Captive Core emits them. Previously this scenario caused failures in the ingestion system. That is now fixed in ([3531](https://github.com/stellar/go/pull/3531)). + ## v2.1.0 ### DB State Migration From b35694b1c24443bdbd2fcd516d5b811af484c76f Mon Sep 17 00:00:00 2001 From: tamirms Date: Thu, 8 Apr 2021 21:17:03 +0200 Subject: [PATCH 3/6] Code review feedback --- services/horizon/internal/ingest/fsm.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/services/horizon/internal/ingest/fsm.go b/services/horizon/internal/ingest/fsm.go index 16cc98cdf9..1561bc0edd 100644 --- a/services/horizon/internal/ingest/fsm.go +++ b/services/horizon/internal/ingest/fsm.go @@ -575,11 +575,11 @@ func (h historyRangeState) run(s *system) (transition, error) { } for cur := h.fromLedger; cur <= h.toLedger; cur++ { - for attempt := 0; attempt < maxledgerNotFoundRetries; attempt++ { + for attempt := 1; attempt <= maxledgerNotFoundRetries; attempt++ { if err = runTransactionProcessorsOnLedger(s, cur); err == nil { break } else if errors.Cause(err) == ingest.ErrNotFound { - if attempt == maxledgerNotFoundRetries-1 { + if attempt == maxledgerNotFoundRetries { return start(), errors.Wrapf( err, "Could not obtain ledger %v after %v attempts", From f129e0b820785515ae87b3ec7aa6a7485425688f Mon Sep 17 00:00:00 2001 From: tamirms Date: Thu, 8 Apr 2021 21:17:42 +0200 Subject: [PATCH 4/6] Update services/horizon/CHANGELOG.md Co-authored-by: Eric Saunders --- services/horizon/CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/horizon/CHANGELOG.md b/services/horizon/CHANGELOG.md index a2de05b03f..d43b58a98d 100644 --- a/services/horizon/CHANGELOG.md +++ b/services/horizon/CHANGELOG.md @@ -5,7 +5,7 @@ file. This project adheres to [Semantic Versioning](http://semver.org/). ## v2.1.1 -* When ingesting a backlog of ledgers, Horizon sometimes consumes ledgers faster than the rate which Captive Core emits them. Previously this scenario caused failures in the ingestion system. That is now fixed in ([3531](https://github.com/stellar/go/pull/3531)). +* When ingesting a backlog of ledgers, Horizon sometimes consumes ledgers faster than the rate at which Captive Core emits them. Previously this scenario caused failures in the ingestion system. That is now fixed in ([3531](https://github.com/stellar/go/pull/3531)). ## v2.1.0 From 073aa5ab521af7933f461da7683756a10b142686 Mon Sep 17 00:00:00 2001 From: tamirms Date: Thu, 8 Apr 2021 22:50:59 +0200 Subject: [PATCH 5/6] increase retry limit --- services/horizon/internal/ingest/fsm.go | 2 +- .../horizon/internal/ingest/ingest_history_range_state_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/services/horizon/internal/ingest/fsm.go b/services/horizon/internal/ingest/fsm.go index 1561bc0edd..6120350bce 100644 --- a/services/horizon/internal/ingest/fsm.go +++ b/services/horizon/internal/ingest/fsm.go @@ -535,7 +535,7 @@ func (h historyRangeState) String() string { } var ( - maxledgerNotFoundRetries = 4 + maxledgerNotFoundRetries = 5 ledgerNotFoundRetryBackOff = 3 * time.Second ) diff --git a/services/horizon/internal/ingest/ingest_history_range_state_test.go b/services/horizon/internal/ingest/ingest_history_range_state_test.go index b64e58303c..3d93f72c81 100644 --- a/services/horizon/internal/ingest/ingest_history_range_state_test.go +++ b/services/horizon/internal/ingest/ingest_history_range_state_test.go @@ -181,7 +181,7 @@ func (s *IngestHistoryRangeStateTestSuite) TestRunTransactionProcessorsRetryFail processors.StatsLedgerTransactionProcessorResults{}, processorsRunDurations{}, errors.Wrap(errors.Wrap(ingest.ErrNotFound, "layer1"), "layer2"), - ).Times(4) + ).Times(maxledgerNotFoundRetries) next, err := historyRangeState{fromLedger: 100, toLedger: 200}.run(s.system) s.Assert().Error(err) From 54e41cee7b41337de1b4f4e21d633c20ebd4b551 Mon Sep 17 00:00:00 2001 From: tamirms Date: Thu, 8 Apr 2021 22:56:10 +0200 Subject: [PATCH 6/6] Fix test --- .../horizon/internal/ingest/ingest_history_range_state_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/horizon/internal/ingest/ingest_history_range_state_test.go b/services/horizon/internal/ingest/ingest_history_range_state_test.go index 3d93f72c81..e2d12f3539 100644 --- a/services/horizon/internal/ingest/ingest_history_range_state_test.go +++ b/services/horizon/internal/ingest/ingest_history_range_state_test.go @@ -185,7 +185,7 @@ func (s *IngestHistoryRangeStateTestSuite) TestRunTransactionProcessorsRetryFail next, err := historyRangeState{fromLedger: 100, toLedger: 200}.run(s.system) s.Assert().Error(err) - s.Assert().EqualError(err, "Could not obtain ledger 100 after 4 attempts: error processing ledger sequence=100: layer2: layer1: ledger not found") + s.Assert().EqualError(err, "Could not obtain ledger 100 after 5 attempts: error processing ledger sequence=100: layer2: layer1: ledger not found") s.Assert().Equal(transition{node: startState{}, sleepDuration: defaultSleep}, next) }