diff --git a/exp/services/captivecore/internal/api.go b/exp/services/captivecore/internal/api.go index 9b70c19188..9e666e565c 100644 --- a/exp/services/captivecore/internal/api.go +++ b/exp/services/captivecore/internal/api.go @@ -25,7 +25,7 @@ type rangeRequest struct { readyDuration int valid bool ready bool - sync.RWMutex + sync.Mutex } // CaptiveCoreAPI manages a shared captive core subprocess and exposes an API for @@ -141,8 +141,8 @@ func (c *CaptiveCoreAPI) PrepareRange(ledgerRange ledgerbackend.Range) (ledgerba // GetLatestLedgerSequence determines the latest ledger sequence available on the captive core instance. func (c *CaptiveCoreAPI) GetLatestLedgerSequence() (ledgerbackend.LatestLedgerSequenceResponse, error) { - c.activeRequest.RLock() - defer c.activeRequest.RUnlock() + c.activeRequest.Lock() + defer c.activeRequest.Unlock() if !c.activeRequest.valid { return ledgerbackend.LatestLedgerSequenceResponse{}, ErrMissingPrepareRange @@ -152,13 +152,16 @@ func (c *CaptiveCoreAPI) GetLatestLedgerSequence() (ledgerbackend.LatestLedgerSe } seq, err := c.core.GetLatestLedgerSequence() + if err != nil { + c.activeRequest.valid = false + } return ledgerbackend.LatestLedgerSequenceResponse{Sequence: seq}, err } // GetLedger fetches the ledger with the given sequence number from the captive core instance. func (c *CaptiveCoreAPI) GetLedger(sequence uint32) (ledgerbackend.LedgerResponse, error) { - c.activeRequest.RLock() - defer c.activeRequest.RUnlock() + c.activeRequest.Lock() + defer c.activeRequest.Unlock() if !c.activeRequest.valid { return ledgerbackend.LedgerResponse{}, ErrMissingPrepareRange @@ -168,6 +171,9 @@ func (c *CaptiveCoreAPI) GetLedger(sequence uint32) (ledgerbackend.LedgerRespons } present, ledger, err := c.core.GetLedger(sequence) + if err != nil { + c.activeRequest.valid = false + } return ledgerbackend.LedgerResponse{ Present: present, Ledger: ledgerbackend.Base64Ledger(ledger), diff --git a/exp/services/captivecore/main.go b/exp/services/captivecore/main.go index d4f02f0545..ef7055da18 100644 --- a/exp/services/captivecore/main.go +++ b/exp/services/captivecore/main.go @@ -101,7 +101,7 @@ func main() { Run: func(_ *cobra.Command, _ []string) { configOpts.Require() configOpts.SetValues() - logger.Level = logLevel + logger.SetLevel(logLevel) captiveConfig := ledgerbackend.CaptiveCoreConfig{ StellarCoreBinaryPath: binaryPath, @@ -124,7 +124,8 @@ func main() { if err != nil { logger.WithError(err).Fatal("Could not create captive core instance") } - api := internal.NewCaptiveCoreAPI(core, logger) + core.SetStellarCoreLogger(logger.WithField("subservice", "stellar-core")) + api := internal.NewCaptiveCoreAPI(core, logger.WithField("subservice", "api")) supporthttp.Run(supporthttp.Config{ ListenAddr: fmt.Sprintf(":%d", port), diff --git a/ingest/ledgerbackend/captive_core_backend.go b/ingest/ledgerbackend/captive_core_backend.go index a69a222eb5..7fdade1632 100644 --- a/ingest/ledgerbackend/captive_core_backend.go +++ b/ingest/ledgerbackend/captive_core_backend.go @@ -326,9 +326,9 @@ func (c *CaptiveStellarCore) runFromParams(from uint32) (runFrom uint32, ledgerH runFrom = from - 1 if c.ledgerHashStore != nil { var exists bool - ledgerHash, exists, err = c.ledgerHashStore.GetLedgerHash(from) + ledgerHash, exists, err = c.ledgerHashStore.GetLedgerHash(runFrom) if err != nil { - err = errors.Wrapf(err, "error trying to read ledger hash %d", from) + err = errors.Wrapf(err, "error trying to read ledger hash %d", runFrom) return } if exists { diff --git a/ingest/ledgerbackend/captive_core_backend_test.go b/ingest/ledgerbackend/captive_core_backend_test.go index 98f6fe72b8..549ce25c8f 100644 --- a/ingest/ledgerbackend/captive_core_backend_test.go +++ b/ingest/ledgerbackend/captive_core_backend_test.go @@ -962,13 +962,13 @@ func TestCaptiveUseOfLedgerHashStore(t *testing.T) { }, nil) mockLedgerHashStore := &MockLedgerHashStore{} - mockLedgerHashStore.On("GetLedgerHash", uint32(1023)). + mockLedgerHashStore.On("GetLedgerHash", uint32(1022)). Return("", false, fmt.Errorf("transient error")).Once() - mockLedgerHashStore.On("GetLedgerHash", uint32(255)). + mockLedgerHashStore.On("GetLedgerHash", uint32(254)). Return("", false, nil).Once() - mockLedgerHashStore.On("GetLedgerHash", uint32(63)). + mockLedgerHashStore.On("GetLedgerHash", uint32(62)). Return("cde", true, nil).Once() - mockLedgerHashStore.On("GetLedgerHash", uint32(127)). + mockLedgerHashStore.On("GetLedgerHash", uint32(126)). Return("ghi", true, nil).Once() captiveBackend := CaptiveStellarCore{ @@ -997,7 +997,7 @@ func TestCaptiveUseOfLedgerHashStore(t *testing.T) { assert.Equal(t, uint32(64), nextLedger) runFrom, ledgerHash, nextLedger, err = captiveBackend.runFromParams(1050) - assert.EqualError(t, err, "error trying to read ledger hash 1023: transient error") + assert.EqualError(t, err, "error trying to read ledger hash 1022: transient error") runFrom, ledgerHash, nextLedger, err = captiveBackend.runFromParams(300) assert.NoError(t, err) diff --git a/services/horizon/CHANGELOG.md b/services/horizon/CHANGELOG.md index 582c2194c3..a3e9b5c722 100644 --- a/services/horizon/CHANGELOG.md +++ b/services/horizon/CHANGELOG.md @@ -1,10 +1,15 @@ # Changelog All notable changes to this project will be documented in this -file. This project adheres to [Semantic Versioning](http://semver.org/).x +file. This project adheres to [Semantic Versioning](http://semver.org/). ## Unreleased +## v1.12.0 + +* Add Prometheus metrics for the duration of ingestion processors ([#3224](https://github.com/stellar/go/pull/3224)) +* Many Captive Core improvements and fixes ([#3232](https://github.com/stellar/go/pull/3232), [#3223](https://github.com/stellar/go/pull/3223), [#3226](https://github.com/stellar/go/pull/3226), [#3203](https://github.com/stellar/go/pull/3203), [#3189](https://github.com/stellar/go/pull/3189), [#3187](https://github.com/stellar/go/pull/3187)) + ## v1.11.1 * Fix bug in parsing `db-url` parameter in `horizon db migrate` and `horizon db init` commands ([#3192](https://github.com/stellar/go/pull/3192)).