Skip to content

Commit

Permalink
ingest/ledgerbackend: Implement db backed ledger store for the captiv…
Browse files Browse the repository at this point in the history
…e core backend (#3203)

The history archives cannot be fully trusted because there's always the possibility that someone can compromise the history archives either by compromising the S3 bucket which stores the archives or compromising one of the layers above the s3 bucket. When we provide a corrupt ledger hash to stellar core as --start-at-hash parameter, stellar core will download the ledger chain and validate all the hashes from consensus back to the start hash. If the start hash is not valid, stellar-core will exit with an error before streaming any ledgers. This means that ingestion will be blocked until the history archives are fixed.

Since stellar core will always verify the ledger chain before emitting any ledgers, we can assume that any ledger hashes ingested into the horizon database are correct. So, in the scenario where the history archives are corrupt, we can avoid blocking ingestion by using ledger hashes found in the Horizon database.
  • Loading branch information
tamirms authored Nov 13, 2020
1 parent cc08ac6 commit 21e076a
Show file tree
Hide file tree
Showing 9 changed files with 218 additions and 26 deletions.
1 change: 1 addition & 0 deletions exp/services/captivecore/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ Usage:
captivecore [flags]
Flags:
--db-url Horizon Postgres URL (optional) used to lookup the ledger hash for sequence numbers
--stellar-core-binary-path Path to stellar core binary
--stellar-core-config-path Path to stellar core config file
--history-archive-urls Comma-separated list of stellar history archives to connect with
Expand Down
33 changes: 31 additions & 2 deletions exp/services/captivecore/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,14 @@ import (
"github.com/stellar/go/ingest/ledgerbackend"
"github.com/stellar/go/network"
"github.com/stellar/go/support/config"
"github.com/stellar/go/support/db"
supporthttp "github.com/stellar/go/support/http"
supportlog "github.com/stellar/go/support/log"
)

func main() {
var port int
var networkPassphrase, binaryPath, configPath string
var networkPassphrase, binaryPath, configPath, dbURL string
var historyArchiveURLs []string
var logLevel logrus.Level
logger := supportlog.New()
Expand Down Expand Up @@ -85,6 +86,14 @@ func main() {
},
Usage: "minimum log severity (debug, info, warn, error) to log",
},
&config.ConfigOption{
Name: "db-url",
EnvVar: "DATABASE_URL",
ConfigKey: &dbURL,
OptType: types.String,
Required: false,
Usage: "horizon postgres database to connect with",
},
}
cmd := &cobra.Command{
Use: "captivecore",
Expand All @@ -94,7 +103,24 @@ func main() {
configOpts.SetValues()
logger.Level = logLevel

core, err := ledgerbackend.NewCaptive(binaryPath, configPath, networkPassphrase, historyArchiveURLs)
captiveConfig := ledgerbackend.CaptiveCoreConfig{
StellarCoreBinaryPath: binaryPath,
StellarCoreConfigPath: configPath,
NetworkPassphrase: networkPassphrase,
HistoryArchiveURLs: historyArchiveURLs,
}

var dbConn *db.Session
if len(dbURL) > 0 {
var err error
dbConn, err = db.Open("postgres", dbURL)
if err != nil {
logger.WithError(err).Fatal("Could not create db connection instance")
}
captiveConfig.LedgerHashStore = ledgerbackend.NewHorizonDBLedgerHashStore(dbConn)
}

core, err := ledgerbackend.NewCaptive(captiveConfig)
if err != nil {
logger.WithError(err).Fatal("Could not create captive core instance")
}
Expand All @@ -108,6 +134,9 @@ func main() {
},
OnStopping: func() {
api.Shutdown()
if dbConn != nil {
dbConn.Close()
}
},
})
},
Expand Down
10 changes: 6 additions & 4 deletions exp/tools/captive-core-start-tester/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,12 @@ func main() {

func check(ledger uint32) bool {
c, err := ledgerbackend.NewCaptive(
"stellar-core",
"stellar-core-standalone2.cfg",
"Standalone Network ; February 2017",
[]string{"http://localhost:1570"},
ledgerbackend.CaptiveCoreConfig{
StellarCoreBinaryPath: "stellar-core",
StellarCoreConfigPath: "stellar-core-standalone2.cfg",
NetworkPassphrase: "Standalone Network ; February 2017",
HistoryArchiveURLs: []string{"http://localhost:1570"},
},
)
if err != nil {
panic(err)
Expand Down
10 changes: 6 additions & 4 deletions ingest/doc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,10 +103,12 @@ func Example_changes() {

// Requires Stellar-Core 13.2.0+
backend, err := ledgerbackend.NewCaptive(
"/bin/stellar-core",
"/opt/stellar-core.cfg",
networkPassphrase,
[]string{archiveURL},
ledgerbackend.CaptiveCoreConfig{
StellarCoreBinaryPath: "/bin/stellar-core",
StellarCoreConfigPath: "/opt/stellar-core.cfg",
NetworkPassphrase: networkPassphrase,
HistoryArchiveURLs: []string{archiveURL},
},
)
if err != nil {
panic(err)
Expand Down
49 changes: 41 additions & 8 deletions ingest/ledgerbackend/captive_core_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ type CaptiveStellarCore struct {
networkPassphrase string
historyURLs []string
archive historyarchive.ArchiveInterface
ledgerHashStore TrustedLedgerHashStore

ledgerBuffer bufferedLedgerMetaReader

Expand Down Expand Up @@ -94,15 +95,29 @@ type CaptiveStellarCore struct {
log *log.Entry
}

// CaptiveCoreConfig contains all the parameters required to create a CaptiveStellarCore instance
type CaptiveCoreConfig struct {
// StellarCoreBinaryPath is the file path to the Stellar Core binary
StellarCoreBinaryPath string
// StellarCoreConfigPath is the file path to the Stellar Core configuration file used by captive core
StellarCoreConfigPath string
// NetworkPassphrase is the Stellar network passphrase used by captive core when connecting to the Stellar network
NetworkPassphrase string
// HistoryArchiveURLs are a list of history archive urls
HistoryArchiveURLs []string
// LedgerHashStore is an optional store used to obtain hashes for ledger sequences from a trusted source
LedgerHashStore TrustedLedgerHashStore
}

// NewCaptive returns a new CaptiveStellarCore.
//
// All parameters are required, except configPath which is not required when
// working with BoundedRanges only.
func NewCaptive(executablePath, configPath, networkPassphrase string, historyURLs []string) (*CaptiveStellarCore, error) {
func NewCaptive(config CaptiveCoreConfig) (*CaptiveStellarCore, error) {
archive, err := historyarchive.Connect(
historyURLs[0],
config.HistoryArchiveURLs[0],
historyarchive.ConnectOptions{
NetworkPassphrase: networkPassphrase,
NetworkPassphrase: config.NetworkPassphrase,
},
)
if err != nil {
Expand All @@ -111,14 +126,20 @@ func NewCaptive(executablePath, configPath, networkPassphrase string, historyURL

c := &CaptiveStellarCore{
archive: archive,
executablePath: executablePath,
configPath: configPath,
historyURLs: historyURLs,
networkPassphrase: networkPassphrase,
executablePath: config.StellarCoreBinaryPath,
configPath: config.StellarCoreConfigPath,
historyURLs: config.HistoryArchiveURLs,
networkPassphrase: config.NetworkPassphrase,
waitIntervalPrepareRange: time.Second,
ledgerHashStore: config.LedgerHashStore,
}
c.stellarCoreRunnerFactory = func(configPath2 string) (stellarCoreRunnerInterface, error) {
runner, innerErr := newStellarCoreRunner(executablePath, configPath2, networkPassphrase, historyURLs)
runner, innerErr := newStellarCoreRunner(
config.StellarCoreBinaryPath,
configPath2,
config.NetworkPassphrase,
config.HistoryArchiveURLs,
)
if innerErr != nil {
return runner, innerErr
}
Expand Down Expand Up @@ -296,6 +317,18 @@ func (c *CaptiveStellarCore) runFromParams(from uint32) (runFrom uint32, ledgerH
}

runFrom = from - 1
if c.ledgerHashStore != nil {
var exists bool
ledgerHash, exists, err = c.ledgerHashStore.GetLedgerHash(from)
if err != nil {
err = errors.Wrapf(err, "error trying to read ledger hash %d", from)
return
}
if exists {
return
}
}

ledgerHeader, err2 := c.archive.GetLedgerHeader(from)
if err2 != nil {
err = errors.Wrapf(err2, "error trying to read ledger header %d from HAS", from)
Expand Down
69 changes: 65 additions & 4 deletions ingest/ledgerbackend/captive_core_backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,10 +122,12 @@ func TestCaptiveNew(t *testing.T) {
historyURLs := []string{"http://history.stellar.org/prd/core-live/core_live_001"}

captiveStellarCore, err := NewCaptive(
executablePath,
configPath,
networkPassphrase,
historyURLs,
CaptiveCoreConfig{
StellarCoreBinaryPath: executablePath,
StellarCoreConfigPath: configPath,
NetworkPassphrase: networkPassphrase,
HistoryArchiveURLs: historyURLs,
},
)

assert.NoError(t, err)
Expand Down Expand Up @@ -836,6 +838,65 @@ func TestCaptiveGetLedgerTerminated(t *testing.T) {
assert.EqualError(t, err, "stellar-core process exited unexpectedly without an error")
}

func TestCaptiveUseOfLedgerHashStore(t *testing.T) {
mockRunner := &stellarCoreRunnerMock{}
mockArchive := &historyarchive.MockArchive{}
mockArchive.
On("GetLedgerHeader", uint32(255)).
Return(xdr.LedgerHeaderHistoryEntry{
Header: xdr.LedgerHeader{
PreviousLedgerHash: xdr.Hash{1, 1, 1, 1},
},
}, nil)

mockLedgerHashStore := &MockLedgerHashStore{}
mockLedgerHashStore.On("GetLedgerHash", uint32(1023)).
Return("", false, fmt.Errorf("transient error")).Once()
mockLedgerHashStore.On("GetLedgerHash", uint32(255)).
Return("", false, nil).Once()
mockLedgerHashStore.On("GetLedgerHash", uint32(63)).
Return("cde", true, nil).Once()
mockLedgerHashStore.On("GetLedgerHash", uint32(127)).
Return("ghi", true, nil).Once()

captiveBackend := CaptiveStellarCore{
archive: mockArchive,
networkPassphrase: network.PublicNetworkPassphrase,
stellarCoreRunner: mockRunner,
ledgerHashStore: mockLedgerHashStore,
}

runFrom, ledgerHash, nextLedger, err := captiveBackend.runFromParams(24)
assert.NoError(t, err)
assert.Equal(t, uint32(2), runFrom)
assert.Equal(t, "", ledgerHash)
assert.Equal(t, uint32(2), nextLedger)

runFrom, ledgerHash, nextLedger, err = captiveBackend.runFromParams(86)
assert.NoError(t, err)
assert.Equal(t, uint32(62), runFrom)
assert.Equal(t, "cde", ledgerHash)
assert.Equal(t, uint32(2), nextLedger)

runFrom, ledgerHash, nextLedger, err = captiveBackend.runFromParams(128)
assert.NoError(t, err)
assert.Equal(t, uint32(126), runFrom)
assert.Equal(t, "ghi", ledgerHash)
assert.Equal(t, uint32(64), nextLedger)

runFrom, ledgerHash, nextLedger, err = captiveBackend.runFromParams(1050)
assert.EqualError(t, err, "error trying to read ledger hash 1023: transient error")

runFrom, ledgerHash, nextLedger, err = captiveBackend.runFromParams(300)
assert.NoError(t, err)
assert.Equal(t, uint32(254), runFrom, "runFrom")
assert.Equal(t, "0101010100000000000000000000000000000000000000000000000000000000", ledgerHash)
assert.Equal(t, uint32(192), nextLedger, "nextLedger")

mockLedgerHashStore.AssertExpectations(t)
mockArchive.AssertExpectations(t)
}

func TestCaptiveRunFromParams(t *testing.T) {
var tests = []struct {
from uint32
Expand Down
50 changes: 50 additions & 0 deletions ingest/ledgerbackend/ledger_hash_store.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package ledgerbackend

import (
sq "github.com/Masterminds/squirrel"
"github.com/stretchr/testify/mock"

"github.com/stellar/go/support/db"
)

// TrustedLedgerHashStore is used to query ledger data from a trusted source.
// The store should contain ledgers verified by Stellar-Core, do not use untrusted
// source like history archives.
type TrustedLedgerHashStore interface {
// GetLedgerHash returns the ledger hash for the given sequence number
GetLedgerHash(seq uint32) (string, bool, error)
}

// HorizonDBLedgerHashStore is a TrustedLedgerHashStore which uses horizon's db to look up ledger hashes
type HorizonDBLedgerHashStore struct {
session *db.Session
}

// NewHorizonDBLedgerHashStore constructs a new TrustedLedgerHashStore backed by the horizon db
func NewHorizonDBLedgerHashStore(session *db.Session) TrustedLedgerHashStore {
return HorizonDBLedgerHashStore{session: session}
}

// GetLedgerHash returns the ledger hash for the given sequence number
func (h HorizonDBLedgerHashStore) GetLedgerHash(seq uint32) (string, bool, error) {
sql := sq.Select("hl.ledger_hash").From("history_ledgers hl").
Limit(1).Where("sequence = ?", seq)

var hash string
err := h.session.Get(&hash, sql)
if h.session.NoRows(err) {
return hash, false, nil
}
return hash, true, err
}

// MockLedgerHashStore is a mock implementation of TrustedLedgerHashStore
type MockLedgerHashStore struct {
mock.Mock
}

// GetLedgerHash returns the ledger hash for the given sequence number
func (m *MockLedgerHashStore) GetLedgerHash(seq uint32) (string, bool, error) {
args := m.Called(seq)
return args.Get(0).(string), args.Get(1).(bool), args.Error(2)
}
11 changes: 11 additions & 0 deletions services/horizon/internal/db2/history/ledger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"github.com/guregu/null"
"github.com/stellar/go/ingest/ledgerbackend"
"github.com/stellar/go/services/horizon/internal/test"
"github.com/stellar/go/services/horizon/internal/toid"
"github.com/stellar/go/xdr"
Expand Down Expand Up @@ -57,6 +58,11 @@ func TestInsertLedger(t *testing.T) {
test.ResetHorizonDB(t, tt.HorizonDB)
q := &Q{tt.HorizonSession()}

ledgerHashStore := ledgerbackend.NewHorizonDBLedgerHashStore(tt.HorizonSession())
_, exists, err := ledgerHashStore.GetLedgerHash(100)
tt.Assert.NoError(err)
tt.Assert.False(exists)

expectedLedger := Ledger{
Sequence: 69859,
LedgerHash: "4db1e4f145e9ee75162040d26284795e0697e2e84084624e7c6c723ebbf80118",
Expand Down Expand Up @@ -136,4 +142,9 @@ func TestInsertLedger(t *testing.T) {
expectedLedger.ClosedAt = ledgerFromDB.ClosedAt

tt.Assert.Equal(expectedLedger, ledgerFromDB)

hash, exists, err := ledgerHashStore.GetLedgerHash(uint32(expectedLedger.Sequence))
tt.Assert.NoError(err)
tt.Assert.True(exists)
tt.Assert.Equal(expectedLedger.LedgerHash, hash)
}
11 changes: 7 additions & 4 deletions services/horizon/internal/ingest/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,10 +173,13 @@ func NewSystem(config Config) (System, error) {
} else {
var captiveCoreBackend *ledgerbackend.CaptiveStellarCore
captiveCoreBackend, err = ledgerbackend.NewCaptive(
config.StellarCoreBinaryPath,
config.StellarCoreConfigPath,
config.NetworkPassphrase,
[]string{config.HistoryArchiveURL},
ledgerbackend.CaptiveCoreConfig{
StellarCoreBinaryPath: config.StellarCoreBinaryPath,
StellarCoreConfigPath: config.StellarCoreConfigPath,
NetworkPassphrase: config.NetworkPassphrase,
HistoryArchiveURLs: []string{config.HistoryArchiveURL},
LedgerHashStore: ledgerbackend.NewHorizonDBLedgerHashStore(config.HistorySession),
},
)
if err != nil {
cancel()
Expand Down

0 comments on commit 21e076a

Please sign in to comment.