Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ingest/ledgerbackend: Implement db backed ledger store for the captive core backend #3203

Merged
merged 4 commits into from
Nov 13, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions exp/services/captivecore/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ Usage:
captivecore [flags]

Flags:
--db-url Horizon Postgres URL (optional) used to lookup the ledger hash for sequence numbers
--stellar-core-binary-path Path to stellar core binary
--stellar-core-config-path Path to stellar core config file
--history-archive-urls Comma-separated list of stellar history archives to connect with
Expand Down
33 changes: 31 additions & 2 deletions exp/services/captivecore/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,14 @@ import (
"github.com/stellar/go/ingest/ledgerbackend"
"github.com/stellar/go/network"
"github.com/stellar/go/support/config"
"github.com/stellar/go/support/db"
supporthttp "github.com/stellar/go/support/http"
supportlog "github.com/stellar/go/support/log"
)

func main() {
var port int
var networkPassphrase, binaryPath, configPath string
var networkPassphrase, binaryPath, configPath, dbURL string
var historyArchiveURLs []string
var logLevel logrus.Level
logger := supportlog.New()
Expand Down Expand Up @@ -85,6 +86,14 @@ func main() {
},
Usage: "minimum log severity (debug, info, warn, error) to log",
},
&config.ConfigOption{
Name: "db-url",
EnvVar: "DATABASE_URL",
ConfigKey: &dbURL,
OptType: types.String,
Required: false,
Usage: "horizon postgres database to connect with",
},
}
cmd := &cobra.Command{
Use: "captivecore",
Expand All @@ -94,7 +103,24 @@ func main() {
configOpts.SetValues()
logger.Level = logLevel

core, err := ledgerbackend.NewCaptive(binaryPath, configPath, networkPassphrase, historyArchiveURLs)
captiveConfig := ledgerbackend.CaptiveCoreConfig{
StellarCoreBinaryPath: binaryPath,
StellarCoreConfigPath: configPath,
NetworkPassphrase: networkPassphrase,
HistoryArchiveURLs: historyArchiveURLs,
}

var dbConn *db.Session
if len(dbURL) > 0 {
var err error
dbConn, err = db.Open("postgres", dbURL)
if err != nil {
logger.WithError(err).Fatal("Could not create db connection instance")
}
captiveConfig.LedgerHashStore = ledgerbackend.NewHorizonDBLedgerHashStore(dbConn)
}

core, err := ledgerbackend.NewCaptive(captiveConfig)
if err != nil {
logger.WithError(err).Fatal("Could not create captive core instance")
}
Expand All @@ -108,6 +134,9 @@ func main() {
},
OnStopping: func() {
api.Shutdown()
if dbConn != nil {
dbConn.Close()
}
},
})
},
Expand Down
10 changes: 6 additions & 4 deletions exp/tools/captive-core-start-tester/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,12 @@ func main() {

func check(ledger uint32) bool {
c, err := ledgerbackend.NewCaptive(
"stellar-core",
"stellar-core-standalone2.cfg",
"Standalone Network ; February 2017",
[]string{"http://localhost:1570"},
ledgerbackend.CaptiveCoreConfig{
StellarCoreBinaryPath: "stellar-core",
StellarCoreConfigPath: "stellar-core-standalone2.cfg",
NetworkPassphrase: "Standalone Network ; February 2017",
HistoryArchiveURLs: []string{"http://localhost:1570"},
},
)
if err != nil {
panic(err)
Expand Down
10 changes: 6 additions & 4 deletions ingest/doc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,10 +103,12 @@ func Example_changes() {

// Requires Stellar-Core 13.2.0+
backend, err := ledgerbackend.NewCaptive(
"/bin/stellar-core",
"/opt/stellar-core.cfg",
networkPassphrase,
[]string{archiveURL},
ledgerbackend.CaptiveCoreConfig{
StellarCoreBinaryPath: "/bin/stellar-core",
StellarCoreConfigPath: "/opt/stellar-core.cfg",
NetworkPassphrase: networkPassphrase,
HistoryArchiveURLs: []string{archiveURL},
},
)
if err != nil {
panic(err)
Expand Down
49 changes: 41 additions & 8 deletions ingest/ledgerbackend/captive_core_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ type CaptiveStellarCore struct {
networkPassphrase string
historyURLs []string
archive historyarchive.ArchiveInterface
ledgerHashStore TrustedLedgerHashStore

ledgerBuffer bufferedLedgerMetaReader

Expand Down Expand Up @@ -94,15 +95,29 @@ type CaptiveStellarCore struct {
log *log.Entry
}

// CaptiveCoreConfig contains all the parameters required to create a CaptiveStellarCore instance
type CaptiveCoreConfig struct {
// StellarCoreBinaryPath is the file path to the Stellar Core binary
StellarCoreBinaryPath string
// StellarCoreConfigPath is the file path to the Stellar Core configuration file used by captive core
StellarCoreConfigPath string
// NetworkPassphrase is the Stellar network passphrase used by captive core when connecting to the Stellar network
NetworkPassphrase string
// HistoryArchiveURLs are a list of history archive urls
HistoryArchiveURLs []string
// LedgerHashStore is an optional store used to obtain hashes for ledger sequences from a trusted source
LedgerHashStore TrustedLedgerHashStore
}

// NewCaptive returns a new CaptiveStellarCore.
//
// All parameters are required, except configPath which is not required when
// working with BoundedRanges only.
func NewCaptive(executablePath, configPath, networkPassphrase string, historyURLs []string) (*CaptiveStellarCore, error) {
func NewCaptive(config CaptiveCoreConfig) (*CaptiveStellarCore, error) {
archive, err := historyarchive.Connect(
historyURLs[0],
config.HistoryArchiveURLs[0],
historyarchive.ConnectOptions{
NetworkPassphrase: networkPassphrase,
NetworkPassphrase: config.NetworkPassphrase,
},
)
if err != nil {
Expand All @@ -111,14 +126,20 @@ func NewCaptive(executablePath, configPath, networkPassphrase string, historyURL

c := &CaptiveStellarCore{
archive: archive,
executablePath: executablePath,
configPath: configPath,
historyURLs: historyURLs,
networkPassphrase: networkPassphrase,
executablePath: config.StellarCoreBinaryPath,
configPath: config.StellarCoreConfigPath,
historyURLs: config.HistoryArchiveURLs,
networkPassphrase: config.NetworkPassphrase,
waitIntervalPrepareRange: time.Second,
ledgerHashStore: config.LedgerHashStore,
}
c.stellarCoreRunnerFactory = func(configPath2 string) (stellarCoreRunnerInterface, error) {
runner, innerErr := newStellarCoreRunner(executablePath, configPath2, networkPassphrase, historyURLs)
runner, innerErr := newStellarCoreRunner(
config.StellarCoreBinaryPath,
configPath2,
config.NetworkPassphrase,
config.HistoryArchiveURLs,
)
if innerErr != nil {
return runner, innerErr
}
Expand Down Expand Up @@ -296,6 +317,18 @@ func (c *CaptiveStellarCore) runFromParams(from uint32) (runFrom uint32, ledgerH
}

runFrom = from - 1
if c.ledgerHashStore != nil {
var exists bool
ledgerHash, exists, err = c.ledgerHashStore.GetLedgerHash(from)
if err != nil {
err = errors.Wrapf(err, "error trying to read ledger hash %d", from)
return
}
if exists {
return
}
}

ledgerHeader, err2 := c.archive.GetLedgerHeader(from)
if err2 != nil {
err = errors.Wrapf(err2, "error trying to read ledger header %d from HAS", from)
Expand Down
69 changes: 65 additions & 4 deletions ingest/ledgerbackend/captive_core_backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,10 +122,12 @@ func TestCaptiveNew(t *testing.T) {
historyURLs := []string{"http://history.stellar.org/prd/core-live/core_live_001"}

captiveStellarCore, err := NewCaptive(
executablePath,
configPath,
networkPassphrase,
historyURLs,
CaptiveCoreConfig{
StellarCoreBinaryPath: executablePath,
StellarCoreConfigPath: configPath,
NetworkPassphrase: networkPassphrase,
HistoryArchiveURLs: historyURLs,
},
)

assert.NoError(t, err)
Expand Down Expand Up @@ -836,6 +838,65 @@ func TestCaptiveGetLedgerTerminated(t *testing.T) {
assert.EqualError(t, err, "stellar-core process exited unexpectedly without an error")
}

func TestCaptiveUseOfLedgerHashStore(t *testing.T) {
mockRunner := &stellarCoreRunnerMock{}
mockArchive := &historyarchive.MockArchive{}
mockArchive.
On("GetLedgerHeader", uint32(255)).
Return(xdr.LedgerHeaderHistoryEntry{
Header: xdr.LedgerHeader{
PreviousLedgerHash: xdr.Hash{1, 1, 1, 1},
},
}, nil)

mockLedgerHashStore := &MockLedgerHashStore{}
mockLedgerHashStore.On("GetLedgerHash", uint32(1023)).
Return("", false, fmt.Errorf("transient error")).Once()
mockLedgerHashStore.On("GetLedgerHash", uint32(255)).
Return("", false, nil).Once()
mockLedgerHashStore.On("GetLedgerHash", uint32(63)).
Return("cde", true, nil).Once()
mockLedgerHashStore.On("GetLedgerHash", uint32(127)).
Return("ghi", true, nil).Once()

captiveBackend := CaptiveStellarCore{
archive: mockArchive,
networkPassphrase: network.PublicNetworkPassphrase,
stellarCoreRunner: mockRunner,
ledgerHashStore: mockLedgerHashStore,
}

runFrom, ledgerHash, nextLedger, err := captiveBackend.runFromParams(24)
assert.NoError(t, err)
assert.Equal(t, uint32(2), runFrom)
assert.Equal(t, "", ledgerHash)
assert.Equal(t, uint32(2), nextLedger)

runFrom, ledgerHash, nextLedger, err = captiveBackend.runFromParams(86)
assert.NoError(t, err)
assert.Equal(t, uint32(62), runFrom)
assert.Equal(t, "cde", ledgerHash)
assert.Equal(t, uint32(2), nextLedger)

runFrom, ledgerHash, nextLedger, err = captiveBackend.runFromParams(128)
assert.NoError(t, err)
assert.Equal(t, uint32(126), runFrom)
assert.Equal(t, "ghi", ledgerHash)
assert.Equal(t, uint32(64), nextLedger)

runFrom, ledgerHash, nextLedger, err = captiveBackend.runFromParams(1050)
assert.EqualError(t, err, "error trying to read ledger hash 1023: transient error")

runFrom, ledgerHash, nextLedger, err = captiveBackend.runFromParams(300)
assert.NoError(t, err)
assert.Equal(t, uint32(254), runFrom, "runFrom")
assert.Equal(t, "0101010100000000000000000000000000000000000000000000000000000000", ledgerHash)
assert.Equal(t, uint32(192), nextLedger, "nextLedger")

mockLedgerHashStore.AssertExpectations(t)
mockArchive.AssertExpectations(t)
}

func TestCaptiveRunFromParams(t *testing.T) {
var tests = []struct {
from uint32
Expand Down
50 changes: 50 additions & 0 deletions ingest/ledgerbackend/ledger_hash_store.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package ledgerbackend

import (
sq "github.com/Masterminds/squirrel"
"github.com/stretchr/testify/mock"

"github.com/stellar/go/support/db"
)

// TrustedLedgerHashStore is used to query ledger data from a trusted source.
// The store should contain ledgers verified by Stellar-Core, do not use untrusted
// source like history archives.
type TrustedLedgerHashStore interface {
// GetLedgerHash returns the ledger hash for the given sequence number
GetLedgerHash(seq uint32) (string, bool, error)
}

// HorizonDBLedgerHashStore is a TrustedLedgerHashStore which uses horizon's db to look up ledger hashes
type HorizonDBLedgerHashStore struct {
session *db.Session
}

// NewHorizonDBLedgerHashStore constructs a new TrustedLedgerHashStore backed by the horizon db
func NewHorizonDBLedgerHashStore(session *db.Session) TrustedLedgerHashStore {
return HorizonDBLedgerHashStore{session: session}
}

// GetLedgerHash returns the ledger hash for the given sequence number
func (h HorizonDBLedgerHashStore) GetLedgerHash(seq uint32) (string, bool, error) {
sql := sq.Select("hl.ledger_hash").From("history_ledgers hl").
Limit(1).Where("sequence = ?", seq)

var hash string
err := h.session.Get(&hash, sql)
if h.session.NoRows(err) {
return hash, false, nil
}
return hash, true, err
}

// MockLedgerHashStore is a mock implementation of TrustedLedgerHashStore
type MockLedgerHashStore struct {
mock.Mock
}

// GetLedgerHash returns the ledger hash for the given sequence number
func (m *MockLedgerHashStore) GetLedgerHash(seq uint32) (string, bool, error) {
args := m.Called(seq)
return args.Get(0).(string), args.Get(1).(bool), args.Error(2)
}
11 changes: 11 additions & 0 deletions services/horizon/internal/db2/history/ledger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"github.com/guregu/null"
"github.com/stellar/go/ingest/ledgerbackend"
"github.com/stellar/go/services/horizon/internal/test"
"github.com/stellar/go/services/horizon/internal/toid"
"github.com/stellar/go/xdr"
Expand Down Expand Up @@ -57,6 +58,11 @@ func TestInsertLedger(t *testing.T) {
test.ResetHorizonDB(t, tt.HorizonDB)
q := &Q{tt.HorizonSession()}

ledgerHashStore := ledgerbackend.NewHorizonDBLedgerHashStore(tt.HorizonSession())
_, exists, err := ledgerHashStore.GetLedgerHash(100)
tt.Assert.NoError(err)
tt.Assert.False(exists)

expectedLedger := Ledger{
Sequence: 69859,
LedgerHash: "4db1e4f145e9ee75162040d26284795e0697e2e84084624e7c6c723ebbf80118",
Expand Down Expand Up @@ -136,4 +142,9 @@ func TestInsertLedger(t *testing.T) {
expectedLedger.ClosedAt = ledgerFromDB.ClosedAt

tt.Assert.Equal(expectedLedger, ledgerFromDB)

hash, exists, err := ledgerHashStore.GetLedgerHash(uint32(expectedLedger.Sequence))
tt.Assert.NoError(err)
tt.Assert.True(exists)
tt.Assert.Equal(expectedLedger.LedgerHash, hash)
}
11 changes: 7 additions & 4 deletions services/horizon/internal/ingest/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,10 +173,13 @@ func NewSystem(config Config) (System, error) {
} else {
var captiveCoreBackend *ledgerbackend.CaptiveStellarCore
captiveCoreBackend, err = ledgerbackend.NewCaptive(
config.StellarCoreBinaryPath,
config.StellarCoreConfigPath,
config.NetworkPassphrase,
[]string{config.HistoryArchiveURL},
ledgerbackend.CaptiveCoreConfig{
StellarCoreBinaryPath: config.StellarCoreBinaryPath,
StellarCoreConfigPath: config.StellarCoreConfigPath,
NetworkPassphrase: config.NetworkPassphrase,
HistoryArchiveURLs: []string{config.HistoryArchiveURL},
LedgerHashStore: ledgerbackend.NewHorizonDBLedgerHashStore(config.HistorySession),
},
)
if err != nil {
cancel()
Expand Down