Skip to content

Commit

Permalink
services/horizon: Suppress Core timeout error when ingestion state ma…
Browse files Browse the repository at this point in the history
…chine is in build state.
  • Loading branch information
urvisavla committed May 2, 2023
1 parent d795eee commit b6e878b
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 0 deletions.
3 changes: 3 additions & 0 deletions services/horizon/internal/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,9 @@ func (a *App) UpdateCoreLedgerState(ctx context.Context) {

coreInfo, err := coreClient.Info(ctx)
if err != nil {
if a.ingester != nil && a.ingester.GetCurrentState() == ingest.Build {
return
}
logErr(err, "failed to load the stellar-core info")
return
}
Expand Down
52 changes: 52 additions & 0 deletions services/horizon/internal/ingest/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,25 @@ func (e ErrReingestRangeConflict) Error() string {
return fmt.Sprintf("reingest range overlaps with horizon ingestion, supplied range shouldn't contain ledger %d", e.maximumLedgerSequence)
}

type StateName int

const (
None StateName = iota
Start
Stop
Build
Resume
WaitForCheckpoint
StressTest
VerifyRange
HistoryRange
ReingestHistoryRange
)

type stateMachineNode interface {
run(*system) (transition, error)
String() string
GetName() StateName
}

type transition struct {
Expand Down Expand Up @@ -105,6 +121,10 @@ func (stopState) String() string {
return "stop"
}

func (stopState) GetName() StateName {
return Stop
}

func (stopState) run(s *system) (transition, error) {
return stop(), errors.New("Cannot run terminal state")
}
Expand All @@ -117,6 +137,10 @@ func (startState) String() string {
return "start"
}

func (startState) GetName() StateName {
return Start
}

func (state startState) run(s *system) (transition, error) {
if err := s.historyQ.Begin(); err != nil {
return start(), errors.Wrap(err, "Error starting a transaction")
Expand Down Expand Up @@ -234,6 +258,10 @@ func (b buildState) String() string {
return fmt.Sprintf("buildFromCheckpoint(checkpointLedger=%d, skipChecks=%t)", b.checkpointLedger, b.skipChecks)
}

func (buildState) GetName() StateName {
return Build
}

func (b buildState) run(s *system) (transition, error) {
var nextFailState = start()
if b.stop {
Expand Down Expand Up @@ -377,6 +405,10 @@ func (r resumeState) String() string {
return fmt.Sprintf("resume(latestSuccessfullyProcessedLedger=%d)", r.latestSuccessfullyProcessedLedger)
}

func (resumeState) GetName() StateName {
return Resume
}

func (r resumeState) run(s *system) (transition, error) {
if r.latestSuccessfullyProcessedLedger == 0 {
return start(), errors.New("unexpected latestSuccessfullyProcessedLedger value")
Expand Down Expand Up @@ -566,6 +598,10 @@ func (h historyRangeState) String() string {
)
}

func (historyRangeState) GetName() StateName {
return HistoryRange
}

// historyRangeState is used when catching up history data
func (h historyRangeState) run(s *system) (transition, error) {
if h.fromLedger == 0 || h.toLedger == 0 ||
Expand Down Expand Up @@ -679,6 +715,10 @@ func (h reingestHistoryRangeState) String() string {
)
}

func (reingestHistoryRangeState) GetName() StateName {
return ReingestHistoryRange
}

func (h reingestHistoryRangeState) ingestRange(s *system, fromLedger, toLedger uint32) error {
if s.historyQ.GetTx() == nil {
return errors.New("expected transaction to be present")
Expand Down Expand Up @@ -833,6 +873,10 @@ func (waitForCheckpointState) String() string {
return "waitForCheckpoint"
}

func (waitForCheckpointState) GetName() StateName {
return WaitForCheckpoint
}

func (waitForCheckpointState) run(*system) (transition, error) {
log.Info("Waiting for the next checkpoint...")
time.Sleep(10 * time.Second)
Expand All @@ -854,6 +898,10 @@ func (v verifyRangeState) String() string {
)
}

func (verifyRangeState) GetName() StateName {
return VerifyRange
}

func (v verifyRangeState) run(s *system) (transition, error) {
if v.fromLedger == 0 || v.toLedger == 0 ||
v.fromLedger > v.toLedger {
Expand Down Expand Up @@ -985,6 +1033,10 @@ func (stressTestState) String() string {
return "stressTest"
}

func (stressTestState) GetName() StateName {
return StressTest
}

func (stressTestState) run(s *system) (transition, error) {
if err := s.historyQ.Begin(); err != nil {
err = errors.Wrap(err, "Error starting a transaction")
Expand Down
9 changes: 9 additions & 0 deletions services/horizon/internal/ingest/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ type System interface {
ReingestRange(ledgerRanges []history.LedgerRange, force bool) error
BuildGenesisState() error
Shutdown()
GetCurrentState() StateName
}

type system struct {
Expand Down Expand Up @@ -228,6 +229,8 @@ type system struct {
runStateVerificationOnLedger func(uint32) bool

reapOffsets map[string]int64

currentState StateName
}

func NewSystem(config Config) (System, error) {
Expand Down Expand Up @@ -292,6 +295,7 @@ func NewSystem(config Config) (System, error) {
cancel: cancel,
config: config,
ctx: ctx,
currentState: None,
disableStateVerification: config.DisableStateVerification,
historyAdapter: historyAdapter,
historyQ: historyQ,
Expand Down Expand Up @@ -479,6 +483,10 @@ func (s *system) initMetrics() {
)
}

func (s *system) GetCurrentState() StateName {
return s.currentState
}

func (s *system) Metrics() Metrics {
return s.metrics
}
Expand Down Expand Up @@ -644,6 +652,7 @@ func (s *system) runStateMachine(cur stateMachineNode) error {
panic("unexpected transaction")
}

s.currentState = cur.GetName()
next, err := cur.run(s)
if err != nil {
logger := log.WithFields(logpkg.F{
Expand Down

0 comments on commit b6e878b

Please sign in to comment.