Skip to content

Commit

Permalink
services/horizon: Suppress Core timeout error (#4860)
Browse files Browse the repository at this point in the history
Suppress Core timeout error when ingestion state machine is in build state.
  • Loading branch information
urvisavla authored May 19, 2023
1 parent c05dcf6 commit 148bf79
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 0 deletions.
8 changes: 8 additions & 0 deletions services/horizon/internal/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,14 @@ func (a *App) Paths() paths.Finder {
func (a *App) UpdateCoreLedgerState(ctx context.Context) {
var next ledger.CoreStatus

// #4446 If the ingestion state machine is in the build state, the query can time out
// because the captive-core buffer may be full. In this case, skip the check.
if a.config.CaptiveCoreToml != nil &&
a.config.StellarCoreURL == fmt.Sprintf("http://localhost:%d", a.config.CaptiveCoreToml.HTTPPort) &&
a.ingester != nil && a.ingester.GetCurrentState() == ingest.Build {
return
}

logErr := func(err error, msg string) {
log.WithStack(err).WithField("err", err.Error()).Error(msg)
}
Expand Down
52 changes: 52 additions & 0 deletions services/horizon/internal/ingest/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,25 @@ func (e ErrReingestRangeConflict) Error() string {
return fmt.Sprintf("reingest range overlaps with horizon ingestion, supplied range shouldn't contain ledger %d", e.maximumLedgerSequence)
}

type State int

const (
None State = iota
Start
Stop
Build
Resume
WaitForCheckpoint
StressTest
VerifyRange
HistoryRange
ReingestHistoryRange
)

type stateMachineNode interface {
run(*system) (transition, error)
String() string
GetState() State
}

type transition struct {
Expand Down Expand Up @@ -105,6 +121,10 @@ func (stopState) String() string {
return "stop"
}

func (stopState) GetState() State {
return Stop
}

func (stopState) run(s *system) (transition, error) {
return stop(), errors.New("Cannot run terminal state")
}
Expand All @@ -117,6 +137,10 @@ func (startState) String() string {
return "start"
}

func (startState) GetState() State {
return Start
}

func (state startState) run(s *system) (transition, error) {
if err := s.historyQ.Begin(); err != nil {
return start(), errors.Wrap(err, "Error starting a transaction")
Expand Down Expand Up @@ -234,6 +258,10 @@ func (b buildState) String() string {
return fmt.Sprintf("buildFromCheckpoint(checkpointLedger=%d, skipChecks=%t)", b.checkpointLedger, b.skipChecks)
}

func (buildState) GetState() State {
return Build
}

func (b buildState) run(s *system) (transition, error) {
var nextFailState = start()
if b.stop {
Expand Down Expand Up @@ -377,6 +405,10 @@ func (r resumeState) String() string {
return fmt.Sprintf("resume(latestSuccessfullyProcessedLedger=%d)", r.latestSuccessfullyProcessedLedger)
}

func (resumeState) GetState() State {
return Resume
}

func (r resumeState) run(s *system) (transition, error) {
if r.latestSuccessfullyProcessedLedger == 0 {
return start(), errors.New("unexpected latestSuccessfullyProcessedLedger value")
Expand Down Expand Up @@ -566,6 +598,10 @@ func (h historyRangeState) String() string {
)
}

func (historyRangeState) GetState() State {
return HistoryRange
}

// historyRangeState is used when catching up history data
func (h historyRangeState) run(s *system) (transition, error) {
if h.fromLedger == 0 || h.toLedger == 0 ||
Expand Down Expand Up @@ -679,6 +715,10 @@ func (h reingestHistoryRangeState) String() string {
)
}

func (reingestHistoryRangeState) GetState() State {
return ReingestHistoryRange
}

func (h reingestHistoryRangeState) ingestRange(s *system, fromLedger, toLedger uint32) error {
if s.historyQ.GetTx() == nil {
return errors.New("expected transaction to be present")
Expand Down Expand Up @@ -833,6 +873,10 @@ func (waitForCheckpointState) String() string {
return "waitForCheckpoint"
}

func (waitForCheckpointState) GetState() State {
return WaitForCheckpoint
}

func (waitForCheckpointState) run(*system) (transition, error) {
log.Info("Waiting for the next checkpoint...")
time.Sleep(10 * time.Second)
Expand All @@ -854,6 +898,10 @@ func (v verifyRangeState) String() string {
)
}

func (verifyRangeState) GetState() State {
return VerifyRange
}

func (v verifyRangeState) run(s *system) (transition, error) {
if v.fromLedger == 0 || v.toLedger == 0 ||
v.fromLedger > v.toLedger {
Expand Down Expand Up @@ -985,6 +1033,10 @@ func (stressTestState) String() string {
return "stressTest"
}

func (stressTestState) GetState() State {
return StressTest
}

func (stressTestState) run(s *system) (transition, error) {
if err := s.historyQ.Begin(); err != nil {
err = errors.Wrap(err, "Error starting a transaction")
Expand Down
9 changes: 9 additions & 0 deletions services/horizon/internal/ingest/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ type System interface {
ReingestRange(ledgerRanges []history.LedgerRange, force bool) error
BuildGenesisState() error
Shutdown()
GetCurrentState() State
}

type system struct {
Expand Down Expand Up @@ -228,6 +229,8 @@ type system struct {
runStateVerificationOnLedger func(uint32) bool

reapOffsets map[string]int64

currentState State
}

func NewSystem(config Config) (System, error) {
Expand Down Expand Up @@ -292,6 +295,7 @@ func NewSystem(config Config) (System, error) {
cancel: cancel,
config: config,
ctx: ctx,
currentState: None,
disableStateVerification: config.DisableStateVerification,
historyAdapter: historyAdapter,
historyQ: historyQ,
Expand Down Expand Up @@ -479,6 +483,10 @@ func (s *system) initMetrics() {
)
}

func (s *system) GetCurrentState() State {
return s.currentState
}

func (s *system) Metrics() Metrics {
return s.metrics
}
Expand Down Expand Up @@ -644,6 +652,7 @@ func (s *system) runStateMachine(cur stateMachineNode) error {
panic("unexpected transaction")
}

s.currentState = cur.GetState()
next, err := cur.run(s)
if err != nil {
logger := log.WithFields(logpkg.F{
Expand Down
5 changes: 5 additions & 0 deletions services/horizon/internal/ingest/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -556,6 +556,11 @@ func (m *mockSystem) BuildGenesisState() error {
return args.Error(0)
}

func (m *mockSystem) GetCurrentState() State {
args := m.Called()
return args.Get(0).(State)
}

func (m *mockSystem) Shutdown() {
m.Called()
}
Expand Down

0 comments on commit 148bf79

Please sign in to comment.