Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

services/horizon: Allow captive core to run with sqlite database #4092

Merged
merged 31 commits into from
Feb 17, 2022
Merged
Show file tree
Hide file tree
Changes from 27 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
03dce0f
Skip in-memory flag for catchup with captive core
Nov 29, 2021
0426a49
run a catchup before run, intead of runFrom
Jan 18, 2022
baa254b
remove in-memory from core run
Jan 18, 2022
0f274b0
Remove now-unused ledger hash store
Jan 18, 2022
031cda3
fix govet
Jan 18, 2022
a3496b5
Can't catchup to ledger 1
Jan 19, 2022
f468799
catchup from x-1, not from x
Jan 25, 2022
f1789e8
Add database= flag to the captive core config file
Jan 26, 2022
760ba2f
Have to run stellar-core new-db to initialize the new db
Jan 26, 2022
2053b97
Experiment with pushing the catchup into stellar_core_runner to use t…
Jan 28, 2022
094468a
#4038: fixed unit test in ingest/ledgerbackend to mock for new captiv…
sreuland Jan 31, 2022
e30afe8
Merge remote-tracking branch 'origin/master' into 4038/captive-core-s…
sreuland Jan 31, 2022
ac70788
#4038: fixed unit test after merge conflict
sreuland Jan 31, 2022
2a9f950
Merge pull request #4207 from sreuland/4038/captive-core-sqlite
sreuland Feb 1, 2022
f16f43c
#4038: fixed integration test to not use GenesisState
sreuland Feb 1, 2022
21224eb
Merge pull request #4209 from sreuland/4038/captive-core-sqlite
sreuland Feb 2, 2022
c0968b3
Merge remote-tracking branch 'origin/master' into 4038/captive-core-s…
sreuland Feb 2, 2022
ff41661
Merge pull request #4214 from sreuland/4038/captive-core-sqlite
sreuland Feb 2, 2022
49040a2
Merge remote-tracking branch 'upstream/master' into 4038/captive-core…
sreuland Feb 7, 2022
805ba5f
Merge pull request, merge from master
sreuland Feb 7, 2022
b75964d
Merge remote-tracking branch 'origin/master' into 4038/captive-core-s…
sreuland Feb 8, 2022
31f8e3a
Merge remote-tracking branch 'upstream/4038/captive-core-sqlite' into…
sreuland Feb 8, 2022
c797d23
Merge pull request #4219, updated latest from master
sreuland Feb 8, 2022
a98d1ab
Merge remote-tracking branch 'upstream/master' into 4038/captive-core…
sreuland Feb 10, 2022
c1f4692
Merge pull request #4225 from sreuland/4038/captive-core-sqlite, port…
sreuland Feb 10, 2022
a0ce745
#4038: use --in-memory for cc ingest by default
sreuland Feb 16, 2022
21ca9cf
4038: incorporate PR feedback updates
sreuland Feb 16, 2022
5185fff
#4038: incorporate PR feedback, rename the sqlite db parameter to UseDB
sreuland Feb 17, 2022
f686301
#4038: incorporated UseDB in CaptiveCoreTomlParams and other PR feedback
sreuland Feb 17, 2022
df69623
Merge remote-tracking branch 'upstream/master' into 4038/captive-core…
sreuland Feb 17, 2022
4f825ff
Merge pull request #4235, port latest from master
sreuland Feb 17, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,9 @@ jobs:
enable-captive-core:
type: boolean
default: false
enable-captive-core-remote-storage:
type: boolean
default: false
working_directory: ~/go/src/github.com/stellar/go
machine:
image: ubuntu-2004:202010-01
Expand All @@ -452,6 +455,12 @@ jobs:
- run:
name: Setting Captive Core env variables
command: echo "export HORIZON_INTEGRATION_ENABLE_CAPTIVE_CORE=true" >> $BASH_ENV
- when:
condition: <<parameters.enable-captive-core-remote-storage>>
steps:
- run:
name: Setting Captive Core Remote Storage env variable
command: echo "export HORIZON_INTEGRATION_ENABLE_CAPTIVE_CORE_REMOTE_STORAGE=true" >> $BASH_ENV
- run:
name: Run Horizon integration tests <<#parameters.enable-captive-core>>(With captive core)<</parameters.enable-captive-core>>
# Currently all integration tests are in a single directory.
Expand Down Expand Up @@ -480,6 +489,10 @@ workflows:
- test_horizon_integration:
name: test_horizon_integration_with_captive_core
enable-captive-core: true
- test_horizon_integration:
name: test_horizon_integration_with_captive_core_remote_storage
enable-captive-core: true
enable-captive-core-remote-storage: true
- test_verify_range_docker_image:
filters:
# we use test_verify_range_docker_image with publish in master
Expand Down
33 changes: 23 additions & 10 deletions ingest/ledgerbackend/captive_core_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,12 @@ type CaptiveCoreConfig struct {
// stored. We always append /captive-core to this directory, since we clean
// it up entirely on shutdown.
StoragePath string

// UseExternalStorageLedger, when true, instructs the core invocation to use an external db url
sreuland marked this conversation as resolved.
Show resolved Hide resolved
// for ledger states rather than in memory(RAM). The external db url is determined by the presence
// of DATABASE parameter in the captive-core-config-path or if absent, the db will default to sqlite
// and the db file will be stored at location derived from StoragePath parameter.
UseExternalStorageLedger bool
sreuland marked this conversation as resolved.
Show resolved Hide resolved
}

// NewCaptive returns a new CaptiveStellarCore instance.
Expand All @@ -142,6 +148,7 @@ func NewCaptive(config CaptiveCoreConfig) (*CaptiveStellarCore, error) {
if parentCtx == nil {
parentCtx = context.Background()
}

var cancel context.CancelFunc
config.Context, cancel = context.WithCancel(parentCtx)

Expand Down Expand Up @@ -250,11 +257,8 @@ func (c *CaptiveStellarCore) openOnlineReplaySubprocess(ctx context.Context, fro
var runner stellarCoreRunnerInterface
if runner, err = c.stellarCoreRunnerFactory(stellarCoreRunnerModeOnline); err != nil {
return errors.Wrap(err, "error creating stellar-core runner")
} else {
// only assign c.stellarCoreRunner if runner is not nil to avoid nil interface check
// see https://golang.org/doc/faq#nil_error
c.stellarCoreRunner = runner
}
c.stellarCoreRunner = runner

runFrom, ledgerHash, err := c.runFromParams(ctx, from)
if err != nil {
Expand All @@ -279,14 +283,19 @@ func (c *CaptiveStellarCore) openOnlineReplaySubprocess(ctx context.Context, fro
}

// runFromParams receives a ledger sequence and calculates the required values to call stellar-core run with --start-ledger and --start-hash
func (c *CaptiveStellarCore) runFromParams(ctx context.Context, from uint32) (runFrom uint32, ledgerHash string, err error) {
func (c *CaptiveStellarCore) runFromParams(ctx context.Context, from uint32) (uint32, string, error) {

var runFrom uint32
sreuland marked this conversation as resolved.
Show resolved Hide resolved
var ledgerHash string
var err error

if from == 1 {
// Trying to start-from 1 results in an error from Stellar-Core:
// Target ledger 1 is not newer than last closed ledger 1 - nothing to do
// TODO maybe we can fix it by generating 1st ledger meta
// like GenesisLedgerStateReader?
err = errors.New("CaptiveCore is unable to start from ledger 1, start from ledger 2")
return
return 0, "", err
}

if from <= 63 {
Expand All @@ -304,20 +313,20 @@ func (c *CaptiveStellarCore) runFromParams(ctx context.Context, from uint32) (ru
ledgerHash, exists, err = c.ledgerHashStore.GetLedgerHash(ctx, runFrom)
if err != nil {
err = errors.Wrapf(err, "error trying to read ledger hash %d", runFrom)
return
return 0, "", err
}
if exists {
return
return runFrom, ledgerHash, nil
}
}

ledgerHeader, err2 := c.archive.GetLedgerHeader(from)
if err2 != nil {
err = errors.Wrapf(err2, "error trying to read ledger header %d from HAS", from)
return
return 0, "", err
}
ledgerHash = hex.EncodeToString(ledgerHeader.Header.PreviousLedgerHash[:])
return
return runFrom, ledgerHash, nil
}

// nextExpectedSequence returns nextLedger (if currently set) or start of
Expand Down Expand Up @@ -406,6 +415,10 @@ func (c *CaptiveStellarCore) isPrepared(ledgerRange Range) bool {
return false
}

if exited, _ := c.stellarCoreRunner.getProcessExitError(); exited {
return false
}

lastLedger := uint32(0)
if c.lastLedger != nil {
lastLedger = *c.lastLedger
Expand Down
7 changes: 6 additions & 1 deletion ingest/ledgerbackend/captive_core_backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,7 @@ func TestCaptivePrepareRange_ErrClosingSession(t *testing.T) {
ctx := context.Background()
mockRunner := &stellarCoreRunnerMock{}
mockRunner.On("close").Return(fmt.Errorf("transient error"))
mockRunner.On("getProcessExitError").Return(false, nil)
mockRunner.On("context").Return(ctx)

captiveBackend := CaptiveStellarCore{
Expand Down Expand Up @@ -490,13 +491,15 @@ func TestCaptivePrepareRangeUnboundedRange_ReuseSession(t *testing.T) {
mockRunner.On("runFrom", uint32(64), "0000000000000000000000000000000000000000000000000000000000000000").Return(nil).Once()
mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan))
mockRunner.On("context").Return(ctx)
mockRunner.On("getProcessExitError").Return(false, nil)

mockArchive := &historyarchive.MockArchive{}
mockArchive.
On("GetRootHAS").
Return(historyarchive.HistoryArchiveState{
CurrentLedger: uint32(129),
}, nil)

mockArchive.
On("GetLedgerHeader", uint32(65)).
Return(xdr.LedgerHeaderHistoryEntry{}, nil)
Expand Down Expand Up @@ -585,6 +588,7 @@ func TestCaptiveGetLedger(t *testing.T) {
mockRunner.On("catchup", uint32(65), uint32(66)).Return(nil)
mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan))
mockRunner.On("context").Return(ctx)
mockRunner.On("getProcessExitError").Return(false, nil)

mockArchive := &historyarchive.MockArchive{}
mockArchive.
Expand Down Expand Up @@ -1288,6 +1292,7 @@ func TestCaptiveRunFromParams(t *testing.T) {
func TestCaptiveIsPrepared(t *testing.T) {
mockRunner := &stellarCoreRunnerMock{}
mockRunner.On("context").Return(context.Background()).Maybe()
mockRunner.On("getProcessExitError").Return(false, nil)

// c.prepared == nil
captiveBackend := CaptiveStellarCore{
Expand Down Expand Up @@ -1351,6 +1356,7 @@ func TestCaptiveIsPreparedCoreContextCancelled(t *testing.T) {
mockRunner := &stellarCoreRunnerMock{}
ctx, cancel := context.WithCancel(context.Background())
mockRunner.On("context").Return(ctx).Maybe()
mockRunner.On("getProcessExitError").Return(false, nil)

rang := UnboundedRange(100)
captiveBackend := CaptiveStellarCore{
Expand Down Expand Up @@ -1447,5 +1453,4 @@ func TestCaptivePreviousLedgerCheck(t *testing.T) {

mockRunner.AssertExpectations(t)
mockArchive.AssertExpectations(t)
mockLedgerHashStore.AssertExpectations(t)
}
85 changes: 65 additions & 20 deletions ingest/ledgerbackend/stellar_core_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,9 @@ type stellarCoreRunner struct {
processExited bool
processExitError error

storagePath string
nonce string
storagePath string
useExternalStorageLedger bool
nonce string

log *log.Entry
}
Expand Down Expand Up @@ -118,19 +119,20 @@ func newStellarCoreRunner(config CaptiveCoreConfig, mode stellarCoreRunnerMode)
ctx, cancel := context.WithCancel(config.Context)

runner := &stellarCoreRunner{
executablePath: config.BinaryPath,
ctx: ctx,
cancel: cancel,
storagePath: fullStoragePath,
mode: mode,
executablePath: config.BinaryPath,
ctx: ctx,
cancel: cancel,
storagePath: fullStoragePath,
useExternalStorageLedger: config.UseExternalStorageLedger,
mode: mode,
nonce: fmt.Sprintf(
"captive-stellar-core-%x",
rand.New(rand.NewSource(time.Now().UnixNano())).Uint64(),
),
log: config.Log,
}

if conf, err := writeConf(config.Toml, mode, runner.getConfFileName()); err != nil {
if conf, err := writeConf(config.Toml, mode, config, fullStoragePath, runner.getConfFileName()); err != nil {
return nil, errors.Wrap(err, "error writing configuration")
} else {
runner.log.Debugf("captive core config file contents:\n%s", conf)
Expand All @@ -139,18 +141,18 @@ func newStellarCoreRunner(config CaptiveCoreConfig, mode stellarCoreRunnerMode)
return runner, nil
}

func writeConf(captiveCoreToml *CaptiveCoreToml, mode stellarCoreRunnerMode, location string) (string, error) {
text, err := generateConfig(captiveCoreToml, mode)
func writeConf(captiveCoreToml *CaptiveCoreToml, mode stellarCoreRunnerMode, config CaptiveCoreConfig, fullStoragePath string, location string) (string, error) {
text, err := generateConfig(captiveCoreToml, mode, config, fullStoragePath)
if err != nil {
return "", err
}

return string(text), ioutil.WriteFile(location, text, 0644)
}

func generateConfig(captiveCoreToml *CaptiveCoreToml, mode stellarCoreRunnerMode) ([]byte, error) {
func generateConfig(captiveCoreToml *CaptiveCoreToml, mode stellarCoreRunnerMode, config CaptiveCoreConfig, fullStoragePath string) ([]byte, error) {
var err error
sreuland marked this conversation as resolved.
Show resolved Hide resolved
if mode == stellarCoreRunnerModeOffline {
var err error
captiveCoreToml, err = captiveCoreToml.CatchupToml()
if err != nil {
return nil, errors.Wrap(err, "could not generate catch up config")
Expand All @@ -161,6 +163,12 @@ func generateConfig(captiveCoreToml *CaptiveCoreToml, mode stellarCoreRunnerMode
return nil, errors.New("captive-core config file does not define any quorum set")
}

if config.UseExternalStorageLedger {
if captiveCoreToml, err = captiveCoreToml.ExternalLedgerStorageToml(fullStoragePath); err != nil {
return nil, errors.Wrap(err, "could not generate catch up config")
}
}

text, err := captiveCoreToml.Marshal()
if err != nil {
return nil, errors.Wrap(err, "could not marshal captive core config")
Expand Down Expand Up @@ -261,10 +269,24 @@ func (r *stellarCoreRunner) catchup(from, to uint32) error {
}

rangeArg := fmt.Sprintf("%d/%d", to, to-from+1)
inMemory := "--in-memory"
sreuland marked this conversation as resolved.
Show resolved Hide resolved

// horizon operator has specified to use external storage for captive core ledger state
// instruct captive core invocation to not use memory, and in that case
// cc will look at DATABASE property in cfg toml for the external storage source to use.
// when using external storage of ledgers, use new-db to first set the state of
// remote db storage to genesis to purge any prior state and reset.
if r.useExternalStorageLedger {
if err := r.createCmd("new-db").Run(); err != nil {
return errors.Wrap(err, "error initializing core db")
}
inMemory = ""
}

r.cmd = r.createCmd(
"catchup", rangeArg,
"--metadata-output-stream", r.getPipeName(),
"--in-memory",
inMemory,
)

var err error
Expand Down Expand Up @@ -304,13 +326,36 @@ func (r *stellarCoreRunner) runFrom(from uint32, hash string) error {
return errors.New("runner already started")
}

r.cmd = r.createCmd(
"run",
"--in-memory",
"--start-at-ledger", fmt.Sprintf("%d", from),
"--start-at-hash", hash,
"--metadata-output-stream", r.getPipeName(),
)
if r.useExternalStorageLedger {
if err := r.createCmd("new-db").Run(); err != nil {
return errors.Wrap(err, "error initializing core db")
}
// Do a quick catch-up to set the LCL in core to be our expected starting
// point.
if from > 2 {
// Can't catch up to the genesis ledger.
if err := r.createCmd("catchup", fmt.Sprintf("%d/0", from-1)).Run(); err != nil {
return errors.Wrap(err, "error runing stellar-core catchup")
}
} else {
sreuland marked this conversation as resolved.
Show resolved Hide resolved
if err := r.createCmd("catchup", "2/0").Run(); err != nil {
return errors.Wrap(err, "error runing stellar-core catchup")
}
}
r.cmd = r.createCmd(
"run",
"--metadata-output-stream",
r.getPipeName(),
)
} else {
r.cmd = r.createCmd(
"run",
"--in-memory",
"--start-at-ledger", fmt.Sprintf("%d", from),
"--start-at-hash", hash,
"--metadata-output-stream", r.getPipeName(),
)
}

var err error
r.pipe, err = r.start(r.cmd)
Expand Down
17 changes: 17 additions & 0 deletions ingest/ledgerbackend/toml.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ var validQuality = map[string]bool{
"LOW": true,
}

var sqliteUrlRegEx = regexp.MustCompile(`^sqlite3://\S+`)
sreuland marked this conversation as resolved.
Show resolved Hide resolved

// Validator represents a [[VALIDATORS]] entry in the captive core toml file.
type Validator struct {
Name string `toml:"NAME"`
Expand Down Expand Up @@ -61,6 +63,7 @@ type QuorumSet struct {
}

type captiveCoreTomlValues struct {
Database string `toml:"DATABASE,omitempty"`
// we cannot omitempty because the empty string is a valid configuration for LOG_FILE_PATH
// and the default is stellar-core.log
LogFilePath string `toml:"LOG_FILE_PATH"`
Expand Down Expand Up @@ -404,6 +407,20 @@ func (c *CaptiveCoreToml) CatchupToml() (*CaptiveCoreToml, error) {
return offline, nil
}

func (c *CaptiveCoreToml) ExternalLedgerStorageToml(fullStoragePath string) (*CaptiveCoreToml, error) {
sreuland marked this conversation as resolved.
Show resolved Hide resolved
newToml, err := c.clone()
if err != nil {
return nil, errors.Wrap(err, "could not clone toml")
}

if len(newToml.Database) == 0 {
newToml.Database = fmt.Sprintf("sqlite3://%v/stellar.db", fullStoragePath)
sreuland marked this conversation as resolved.
Show resolved Hide resolved
} else if !sqliteUrlRegEx.MatchString(newToml.Database) {
sreuland marked this conversation as resolved.
Show resolved Hide resolved
return nil, errors.Errorf("Invalid DATABASE parameter for captive core config, must be valid sqlite3 db url")
}
return newToml, nil
}

func (c *CaptiveCoreToml) setDefaults(params CaptiveCoreTomlParams) {
if !c.tree.Has("NETWORK_PASSPHRASE") {
c.NetworkPassphrase = params.NetworkPassphrase
Expand Down
Loading