Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

services/horizon: Allow captive core to run with sqlite database #4092

Merged
merged 31 commits into from
Feb 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
03dce0f
Skip in-memory flag for catchup with captive core
Nov 29, 2021
0426a49
run a catchup before run, intead of runFrom
Jan 18, 2022
baa254b
remove in-memory from core run
Jan 18, 2022
0f274b0
Remove now-unused ledger hash store
Jan 18, 2022
031cda3
fix govet
Jan 18, 2022
a3496b5
Can't catchup to ledger 1
Jan 19, 2022
f468799
catchup from x-1, not from x
Jan 25, 2022
f1789e8
Add database= flag to the captive core config file
Jan 26, 2022
760ba2f
Have to run stellar-core new-db to initialize the new db
Jan 26, 2022
2053b97
Experiment with pushing the catchup into stellar_core_runner to use t…
Jan 28, 2022
094468a
#4038: fixed unit test in ingest/ledgerbackend to mock for new captiv…
sreuland Jan 31, 2022
e30afe8
Merge remote-tracking branch 'origin/master' into 4038/captive-core-s…
sreuland Jan 31, 2022
ac70788
#4038: fixed unit test after merge conflict
sreuland Jan 31, 2022
2a9f950
Merge pull request #4207 from sreuland/4038/captive-core-sqlite
sreuland Feb 1, 2022
f16f43c
#4038: fixed integration test to not use GenesisState
sreuland Feb 1, 2022
21224eb
Merge pull request #4209 from sreuland/4038/captive-core-sqlite
sreuland Feb 2, 2022
c0968b3
Merge remote-tracking branch 'origin/master' into 4038/captive-core-s…
sreuland Feb 2, 2022
ff41661
Merge pull request #4214 from sreuland/4038/captive-core-sqlite
sreuland Feb 2, 2022
49040a2
Merge remote-tracking branch 'upstream/master' into 4038/captive-core…
sreuland Feb 7, 2022
805ba5f
Merge pull request, merge from master
sreuland Feb 7, 2022
b75964d
Merge remote-tracking branch 'origin/master' into 4038/captive-core-s…
sreuland Feb 8, 2022
31f8e3a
Merge remote-tracking branch 'upstream/4038/captive-core-sqlite' into…
sreuland Feb 8, 2022
c797d23
Merge pull request #4219, updated latest from master
sreuland Feb 8, 2022
a98d1ab
Merge remote-tracking branch 'upstream/master' into 4038/captive-core…
sreuland Feb 10, 2022
c1f4692
Merge pull request #4225 from sreuland/4038/captive-core-sqlite, port…
sreuland Feb 10, 2022
a0ce745
#4038: use --in-memory for cc ingest by default
sreuland Feb 16, 2022
21ca9cf
4038: incorporate PR feedback updates
sreuland Feb 16, 2022
5185fff
#4038: incorporate PR feedback, rename the sqlite db parameter to UseDB
sreuland Feb 17, 2022
f686301
#4038: incorporated UseDB in CaptiveCoreTomlParams and other PR feedback
sreuland Feb 17, 2022
df69623
Merge remote-tracking branch 'upstream/master' into 4038/captive-core…
sreuland Feb 17, 2022
4f825ff
Merge pull request #4235, port latest from master
sreuland Feb 17, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,9 @@ jobs:
enable-captive-core:
type: boolean
default: false
enable-captive-core-remote-storage:
type: boolean
default: false
working_directory: ~/go/src/github.com/stellar/go
machine:
image: ubuntu-2004:202010-01
Expand All @@ -452,6 +455,12 @@ jobs:
- run:
name: Setting Captive Core env variables
command: echo "export HORIZON_INTEGRATION_ENABLE_CAPTIVE_CORE=true" >> $BASH_ENV
- when:
condition: <<parameters.enable-captive-core-remote-storage>>
steps:
- run:
name: Setting Captive Core Remote Storage env variable
command: echo "export HORIZON_INTEGRATION_ENABLE_CAPTIVE_CORE_USE_DB=true" >> $BASH_ENV
- run:
name: Run Horizon integration tests <<#parameters.enable-captive-core>>(With captive core)<</parameters.enable-captive-core>>
# Currently all integration tests are in a single directory.
Expand Down Expand Up @@ -480,6 +489,10 @@ workflows:
- test_horizon_integration:
name: test_horizon_integration_with_captive_core
enable-captive-core: true
- test_horizon_integration:
name: test_horizon_integration_with_captive_core_remote_storage
enable-captive-core: true
enable-captive-core-remote-storage: true
- test_verify_range_docker_image:
filters:
# we use test_verify_range_docker_image with publish in master
Expand Down
42 changes: 25 additions & 17 deletions ingest/ledgerbackend/captive_core_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,12 @@ type CaptiveCoreConfig struct {
// stored. We always append /captive-core to this directory, since we clean
// it up entirely on shutdown.
StoragePath string

// UseDB, when true, instructs the core invocation to use an external db url
// for ledger states rather than in memory(RAM). The external db url is determined by the presence
// of DATABASE parameter in the captive-core-config-path or if absent, the db will default to sqlite
// and the db file will be stored at location derived from StoragePath parameter.
UseDB bool
}

// NewCaptive returns a new CaptiveStellarCore instance.
Expand All @@ -142,6 +148,7 @@ func NewCaptive(config CaptiveCoreConfig) (*CaptiveStellarCore, error) {
if parentCtx == nil {
parentCtx = context.Background()
}

var cancel context.CancelFunc
config.Context, cancel = context.WithCancel(parentCtx)

Expand Down Expand Up @@ -250,11 +257,8 @@ func (c *CaptiveStellarCore) openOnlineReplaySubprocess(ctx context.Context, fro
var runner stellarCoreRunnerInterface
if runner, err = c.stellarCoreRunnerFactory(stellarCoreRunnerModeOnline); err != nil {
return errors.Wrap(err, "error creating stellar-core runner")
} else {
// only assign c.stellarCoreRunner if runner is not nil to avoid nil interface check
// see https://golang.org/doc/faq#nil_error
c.stellarCoreRunner = runner
}
c.stellarCoreRunner = runner

runFrom, ledgerHash, err := c.runFromParams(ctx, from)
if err != nil {
Expand All @@ -279,14 +283,15 @@ func (c *CaptiveStellarCore) openOnlineReplaySubprocess(ctx context.Context, fro
}

// runFromParams receives a ledger sequence and calculates the required values to call stellar-core run with --start-ledger and --start-hash
func (c *CaptiveStellarCore) runFromParams(ctx context.Context, from uint32) (runFrom uint32, ledgerHash string, err error) {
func (c *CaptiveStellarCore) runFromParams(ctx context.Context, from uint32) (uint32, string, error) {

if from == 1 {
// Trying to start-from 1 results in an error from Stellar-Core:
// Target ledger 1 is not newer than last closed ledger 1 - nothing to do
// TODO maybe we can fix it by generating 1st ledger meta
// like GenesisLedgerStateReader?
err = errors.New("CaptiveCore is unable to start from ledger 1, start from ledger 2")
return
err := errors.New("CaptiveCore is unable to start from ledger 1, start from ledger 2")
return 0, "", err
}

if from <= 63 {
Expand All @@ -298,26 +303,25 @@ func (c *CaptiveStellarCore) runFromParams(ctx context.Context, from uint32) (ru
from = 3
}

runFrom = from - 1
runFrom := from - 1
if c.ledgerHashStore != nil {
var exists bool
ledgerHash, exists, err = c.ledgerHashStore.GetLedgerHash(ctx, runFrom)
ledgerHash, exists, err := c.ledgerHashStore.GetLedgerHash(ctx, runFrom)
if err != nil {
err = errors.Wrapf(err, "error trying to read ledger hash %d", runFrom)
return
return 0, "", err
}
if exists {
return
return runFrom, ledgerHash, nil
}
}

ledgerHeader, err2 := c.archive.GetLedgerHeader(from)
if err2 != nil {
err = errors.Wrapf(err2, "error trying to read ledger header %d from HAS", from)
return
ledgerHeader, err := c.archive.GetLedgerHeader(from)
if err != nil {
return 0, "", errors.Wrapf(err, "error trying to read ledger header %d from HAS", from)
}
ledgerHash = hex.EncodeToString(ledgerHeader.Header.PreviousLedgerHash[:])
return
ledgerHash := hex.EncodeToString(ledgerHeader.Header.PreviousLedgerHash[:])
return runFrom, ledgerHash, nil
}

// nextExpectedSequence returns nextLedger (if currently set) or start of
Expand Down Expand Up @@ -406,6 +410,10 @@ func (c *CaptiveStellarCore) isPrepared(ledgerRange Range) bool {
return false
}

if exited, _ := c.stellarCoreRunner.getProcessExitError(); exited {
return false
}

lastLedger := uint32(0)
if c.lastLedger != nil {
lastLedger = *c.lastLedger
Expand Down
7 changes: 6 additions & 1 deletion ingest/ledgerbackend/captive_core_backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,7 @@ func TestCaptivePrepareRange_ErrClosingSession(t *testing.T) {
ctx := context.Background()
mockRunner := &stellarCoreRunnerMock{}
mockRunner.On("close").Return(fmt.Errorf("transient error"))
mockRunner.On("getProcessExitError").Return(false, nil)
mockRunner.On("context").Return(ctx)

captiveBackend := CaptiveStellarCore{
Expand Down Expand Up @@ -490,13 +491,15 @@ func TestCaptivePrepareRangeUnboundedRange_ReuseSession(t *testing.T) {
mockRunner.On("runFrom", uint32(64), "0000000000000000000000000000000000000000000000000000000000000000").Return(nil).Once()
mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan))
mockRunner.On("context").Return(ctx)
mockRunner.On("getProcessExitError").Return(false, nil)

mockArchive := &historyarchive.MockArchive{}
mockArchive.
On("GetRootHAS").
Return(historyarchive.HistoryArchiveState{
CurrentLedger: uint32(129),
}, nil)

mockArchive.
On("GetLedgerHeader", uint32(65)).
Return(xdr.LedgerHeaderHistoryEntry{}, nil)
Expand Down Expand Up @@ -585,6 +588,7 @@ func TestCaptiveGetLedger(t *testing.T) {
mockRunner.On("catchup", uint32(65), uint32(66)).Return(nil)
mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan))
mockRunner.On("context").Return(ctx)
mockRunner.On("getProcessExitError").Return(false, nil)

mockArchive := &historyarchive.MockArchive{}
mockArchive.
Expand Down Expand Up @@ -1288,6 +1292,7 @@ func TestCaptiveRunFromParams(t *testing.T) {
func TestCaptiveIsPrepared(t *testing.T) {
mockRunner := &stellarCoreRunnerMock{}
mockRunner.On("context").Return(context.Background()).Maybe()
mockRunner.On("getProcessExitError").Return(false, nil)

// c.prepared == nil
captiveBackend := CaptiveStellarCore{
Expand Down Expand Up @@ -1351,6 +1356,7 @@ func TestCaptiveIsPreparedCoreContextCancelled(t *testing.T) {
mockRunner := &stellarCoreRunnerMock{}
ctx, cancel := context.WithCancel(context.Background())
mockRunner.On("context").Return(ctx).Maybe()
mockRunner.On("getProcessExitError").Return(false, nil)

rang := UnboundedRange(100)
captiveBackend := CaptiveStellarCore{
Expand Down Expand Up @@ -1447,5 +1453,4 @@ func TestCaptivePreviousLedgerCheck(t *testing.T) {

mockRunner.AssertExpectations(t)
mockArchive.AssertExpectations(t)
mockLedgerHashStore.AssertExpectations(t)
}
58 changes: 46 additions & 12 deletions ingest/ledgerbackend/stellar_core_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ type stellarCoreRunner struct {
processExitError error

storagePath string
useDB bool
nonce string

log *log.Entry
Expand Down Expand Up @@ -122,6 +123,7 @@ func newStellarCoreRunner(config CaptiveCoreConfig, mode stellarCoreRunnerMode)
ctx: ctx,
cancel: cancel,
storagePath: fullStoragePath,
useDB: config.UseDB,
mode: mode,
nonce: fmt.Sprintf(
"captive-stellar-core-%x",
Expand Down Expand Up @@ -261,11 +263,22 @@ func (r *stellarCoreRunner) catchup(from, to uint32) error {
}

rangeArg := fmt.Sprintf("%d/%d", to, to-from+1)
r.cmd = r.createCmd(
"catchup", rangeArg,
"--metadata-output-stream", r.getPipeName(),
"--in-memory",
)
params := []string{"catchup", rangeArg, "--metadata-output-stream", r.getPipeName()}

// horizon operator has specified to use external storage for captive core ledger state
// instruct captive core invocation to not use memory, and in that case
// cc will look at DATABASE property in cfg toml for the external storage source to use.
// when using external storage of ledgers, use new-db to first set the state of
// remote db storage to genesis to purge any prior state and reset.
if r.useDB {
if err := r.createCmd("new-db").Run(); err != nil {
return errors.Wrap(err, "error initializing core db")
}
} else {
params = append(params, "--in-memory")
}

r.cmd = r.createCmd(params...)

var err error
r.pipe, err = r.start(r.cmd)
Expand Down Expand Up @@ -304,13 +317,34 @@ func (r *stellarCoreRunner) runFrom(from uint32, hash string) error {
return errors.New("runner already started")
}

r.cmd = r.createCmd(
"run",
"--in-memory",
"--start-at-ledger", fmt.Sprintf("%d", from),
"--start-at-hash", hash,
"--metadata-output-stream", r.getPipeName(),
)
if r.useDB {
if err := r.createCmd("new-db").Run(); err != nil {
return errors.Wrap(err, "error initializing core db")
}
// Do a quick catch-up to set the LCL in core to be our expected starting
// point.
if from > 2 {
if err := r.createCmd("catchup", fmt.Sprintf("%d/0", from-1)).Run(); err != nil {
return errors.Wrap(err, "error runing stellar-core catchup")
}
} else if err := r.createCmd("catchup", "2/0").Run(); err != nil {
return errors.Wrap(err, "error runing stellar-core catchup")
}

r.cmd = r.createCmd(
"run",
"--metadata-output-stream",
r.getPipeName(),
)
} else {
r.cmd = r.createCmd(
"run",
"--in-memory",
"--start-at-ledger", fmt.Sprintf("%d", from),
"--start-at-hash", hash,
"--metadata-output-stream", r.getPipeName(),
)
}

var err error
r.pipe, err = r.start(r.cmd)
Expand Down
1 change: 1 addition & 0 deletions ingest/ledgerbackend/testdata/expected-offline-core.cfg
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# Generated file, do not edit
DATABASE = "sqlite3://stellar.db"
FAILURE_SAFETY = 0
HTTP_PORT = 0
LOG_FILE_PATH = ""
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# DATABASE limited to only be sqlite:// protocol
DATABASE="postgres://mydb"

[[HOME_DOMAINS]]
HOME_DOMAIN="testnet.stellar.org"
QUALITY="MEDIUM"

[[VALIDATORS]]
NAME="sdf_testnet_1"
HOME_DOMAIN="testnet.stellar.org"
PUBLIC_KEY="GDKXE2OZMJIPOSLNA6N6F2BVCI3O777I2OOC4BV7VOYUEHYX7RTRYA7Y"
ADDRESS="localhost:123"
12 changes: 12 additions & 0 deletions ingest/ledgerbackend/toml.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ type QuorumSet struct {
}

type captiveCoreTomlValues struct {
Database string `toml:"DATABASE,omitempty"`
// we cannot omitempty because the empty string is a valid configuration for LOG_FILE_PATH
// and the default is stellar-core.log
LogFilePath string `toml:"LOG_FILE_PATH"`
Expand Down Expand Up @@ -312,6 +313,8 @@ type CaptiveCoreTomlParams struct {
LogPath *string
// Strict is a flag which, if enabled, rejects Stellar Core toml fields which are not supported by captive core.
Strict bool
// If true, specifies that captive core should be invoked with on-disk rather than in-memory option for ledger state
UseDB bool
}

// NewCaptiveCoreTomlFromFile constructs a new CaptiveCoreToml instance by merging configuration
Expand Down Expand Up @@ -405,6 +408,11 @@ func (c *CaptiveCoreToml) CatchupToml() (*CaptiveCoreToml, error) {
}

func (c *CaptiveCoreToml) setDefaults(params CaptiveCoreTomlParams) {

if params.UseDB && !c.tree.Has("DATABASE") {
c.Database = "sqlite3://stellar.db"
}

if !c.tree.Has("NETWORK_PASSPHRASE") {
c.NetworkPassphrase = params.NetworkPassphrase
}
Expand Down Expand Up @@ -549,5 +557,9 @@ func (c *CaptiveCoreToml) validate(params CaptiveCoreTomlParams) error {
names[v.Name] = true
}

if len(c.Database) > 0 && !strings.HasPrefix(c.Database, "sqlite3://") {
return fmt.Errorf("invalid DATABASE parameter: %s, for captive core config, must be valid sqlite3 db url", c.Database)
}

return nil
}
Loading