diff --git a/exp/ingest/io/change.go b/exp/ingest/io/change.go index 5a87301a4f..6d65cd7e74 100644 --- a/exp/ingest/io/change.go +++ b/exp/ingest/io/change.go @@ -20,6 +20,54 @@ type Change struct { Post *xdr.LedgerEntry } +// GetChangesFromLedgerEntryChanges transforms LedgerEntryChanges to []Change. +// Each `update` and `removed` is preceded with `state` and `create` changes +// are alone, without `state`. The transformation we're doing is to move each +// change (state/update, state/removed or create) to an array of pre/post pairs. +// Then: +// - for create, pre is null and post is a new entry, +// - for update, pre is previous state and post is the current state, +// - for removed, pre is previous state and post is null. +// +// stellar-core source: +// https://github.com/stellar/stellar-core/blob/e584b43/src/ledger/LedgerTxn.cpp#L582 +func GetChangesFromLedgerEntryChanges(ledgerEntryChanges xdr.LedgerEntryChanges) []Change { + changes := []Change{} + + for i, entryChange := range ledgerEntryChanges { + switch entryChange.Type { + case xdr.LedgerEntryChangeTypeLedgerEntryCreated: + created := entryChange.MustCreated() + changes = append(changes, Change{ + Type: created.Data.Type, + Pre: nil, + Post: &created, + }) + case xdr.LedgerEntryChangeTypeLedgerEntryUpdated: + state := ledgerEntryChanges[i-1].MustState() + updated := entryChange.MustUpdated() + changes = append(changes, Change{ + Type: state.Data.Type, + Pre: &state, + Post: &updated, + }) + case xdr.LedgerEntryChangeTypeLedgerEntryRemoved: + state := ledgerEntryChanges[i-1].MustState() + changes = append(changes, Change{ + Type: state.Data.Type, + Pre: &state, + Post: nil, + }) + case xdr.LedgerEntryChangeTypeLedgerEntryState: + continue + default: + panic("Invalid LedgerEntryChangeType") + } + } + + return changes +} + // LedgerEntryChangeType returns type in terms of LedgerEntryChangeType. func (c *Change) LedgerEntryChangeType() xdr.LedgerEntryChangeType { switch { diff --git a/exp/ingest/io/change_reader.go b/exp/ingest/io/change_reader.go deleted file mode 100644 index 065df2989e..0000000000 --- a/exp/ingest/io/change_reader.go +++ /dev/null @@ -1,165 +0,0 @@ -package io - -import ( - "context" - "io" - - "github.com/stellar/go/exp/ingest/ledgerbackend" - "github.com/stellar/go/xdr" -) - -// ChangeReader provides convenient, streaming access to a sequence of Changes -type ChangeReader interface { - // Read should return the next `Change` in the leader. If there are no more - // changes left it should return an `io.EOF` error. - Read() (Change, error) - // Close should be called when reading is finished. This is especially - // helpful when there are still some changes available so reader can stop - // streaming them. - Close() error -} - -// LedgerChangeReader is a ChangeReader which returns Changes from Stellar Core -// for a single ledger -type LedgerChangeReader struct { - dbReader DBLedgerReader - streamedFeeChanges bool - streamedMetaChanges bool - streamedUpgradeChanges bool - pending []Change - pendingIndex int -} - -// Ensure LedgerChangeReader implements ChangeReader -var _ ChangeReader = (*LedgerChangeReader)(nil) - -// NewLedgerChangeReader constructs a new LedgerChangeReader instance bound to the given ledger. -// Note that the returned LedgerChangeReader is not thread safe and should not be shared -// by multiple goroutines. -func NewLedgerChangeReader( - ctx context.Context, sequence uint32, backend ledgerbackend.LedgerBackend, -) (*LedgerChangeReader, error) { - reader, err := NewDBLedgerReader(ctx, sequence, backend) - if err != nil { - return nil, err - } - - return &LedgerChangeReader{dbReader: *reader}, nil -} - -// GetHeader returns the ledger header for the reader -func (r *LedgerChangeReader) GetHeader() xdr.LedgerHeaderHistoryEntry { - return r.dbReader.GetHeader() -} - -func (r *LedgerChangeReader) getNextFeeChange() (Change, error) { - if r.streamedFeeChanges { - return Change{}, io.EOF - } - - // Remember that it's possible that transaction can remove a preauth - // tx signer even when it's a failed transaction so we need to check - // failed transactions too. - for { - transaction, err := r.dbReader.Read() - if err != nil { - if err == io.EOF { - r.dbReader.rewind() - r.streamedFeeChanges = true - return Change{}, io.EOF - } else { - return Change{}, err - } - } - - changes := transaction.GetFeeChanges() - if len(changes) >= 1 { - r.pending = append(r.pending, changes[1:]...) - return changes[0], nil - } - } -} - -func (r *LedgerChangeReader) getNextMetaChange() (Change, error) { - if r.streamedMetaChanges { - return Change{}, io.EOF - } - - for { - transaction, err := r.dbReader.Read() - if err != nil { - if err == io.EOF { - r.streamedMetaChanges = true - return Change{}, io.EOF - } else { - return Change{}, err - } - } - - changes, err := transaction.GetChanges() - if err != nil { - return Change{}, err - } - if len(changes) >= 1 { - r.pending = append(r.pending, changes[1:]...) - return changes[0], nil - } - } -} - -func (r *LedgerChangeReader) getNextUpgradeChange() (Change, error) { - if r.streamedUpgradeChanges { - return Change{}, io.EOF - } - - change, err := r.dbReader.readUpgradeChange() - if err != nil { - if err == io.EOF { - r.streamedUpgradeChanges = true - return Change{}, io.EOF - } else { - return Change{}, err - } - } - - return change, nil -} - -// Read returns the next change in the stream. -// If there are no changes remaining io.EOF is returned -// as an error. -func (r *LedgerChangeReader) Read() (Change, error) { - if err := r.dbReader.ctx.Err(); err != nil { - return Change{}, err - } - - if r.pendingIndex < len(r.pending) { - next := r.pending[r.pendingIndex] - r.pendingIndex++ - if r.pendingIndex == len(r.pending) { - r.pendingIndex = 0 - r.pending = r.pending[:0] - } - return next, nil - } - - change, err := r.getNextFeeChange() - if err == nil || err != io.EOF { - return change, err - } - - change, err = r.getNextMetaChange() - if err == nil || err != io.EOF { - return change, err - } - - return r.getNextUpgradeChange() -} - -func (r *LedgerChangeReader) Close() error { - r.pending = nil - r.streamedFeeChanges = true - r.streamedMetaChanges = true - r.streamedUpgradeChanges = true - return r.dbReader.Close() -} diff --git a/exp/ingest/io/ledger_change_reader.go b/exp/ingest/io/ledger_change_reader.go new file mode 100644 index 0000000000..3879efaebe --- /dev/null +++ b/exp/ingest/io/ledger_change_reader.go @@ -0,0 +1,127 @@ +package io + +import ( + "io" + + "github.com/stellar/go/exp/ingest/ledgerbackend" +) + +// ChangeReader provides convenient, streaming access to a sequence of Changes. +type ChangeReader interface { + // Read should return the next `Change` in the leader. If there are no more + // changes left it should return an `io.EOF` error. + Read() (Change, error) + // Close should be called when reading is finished. This is especially + // helpful when there are still some changes available so reader can stop + // streaming them. + Close() error +} + +// ledgerChangeReaderState defines possible states of LedgerChangeReader. +type ledgerChangeReaderState int + +const ( + // feeChangesState is active when LedgerChangeReader is reading fee changes. + feeChangesState ledgerChangeReaderState = iota + // feeChangesState is active when LedgerChangeReader is reading transaction meta changes. + metaChangesState + // feeChangesState is active when LedgerChangeReader is reading upgrade changes. + upgradeChangesState +) + +// LedgerChangeReader is a ChangeReader which returns Changes from Stellar Core +// for a single ledger +type LedgerChangeReader struct { + *LedgerTransactionReader + state ledgerChangeReaderState + pending []Change + pendingIndex int + upgradeIndex int +} + +// Ensure LedgerChangeReader implements ChangeReader +var _ ChangeReader = (*LedgerChangeReader)(nil) + +// NewLedgerChangeReader constructs a new LedgerChangeReader instance bound to the given ledger. +// Note that the returned LedgerChangeReader is not thread safe and should not be shared +// by multiple goroutines. +func NewLedgerChangeReader(backend ledgerbackend.LedgerBackend, sequence uint32) (*LedgerChangeReader, error) { + transactionReader, err := NewLedgerTransactionReader(backend, sequence) + if err != nil { + return nil, err + } + + return &LedgerChangeReader{ + LedgerTransactionReader: transactionReader, + state: feeChangesState, + }, nil +} + +// Read returns the next change in the stream. +// If there are no changes remaining io.EOF is returned as an error. +func (r *LedgerChangeReader) Read() (Change, error) { + // Changes within a ledger should be read in the following order: + // - fee changes of all transactions, + // - transaction meta changes of all transactions, + // - upgrade changes. + // Because a single transaction can introduce many changes we read all the + // changes from a single transaction and save them in r.pending. + // When Read() is called we stream pending changes first. We also call Read() + // recursively after adding some changes (what will return them from r.pending) + // to not duplicate the code. + if r.pendingIndex < len(r.pending) { + next := r.pending[r.pendingIndex] + r.pendingIndex++ + if r.pendingIndex == len(r.pending) { + r.pendingIndex = 0 + r.pending = r.pending[:0] + } + return next, nil + } + + switch r.state { + case feeChangesState, metaChangesState: + tx, err := r.LedgerTransactionReader.Read() + if err != nil { + if err == io.EOF { + // If done streaming fee changes rewind to stream meta changes + if r.state == feeChangesState { + r.LedgerTransactionReader.Rewind() + } + r.state++ + return r.Read() + } + return Change{}, err + } + + switch r.state { + case feeChangesState: + r.pending = append(r.pending, tx.GetFeeChanges()...) + case metaChangesState: + metaChanges, err := tx.GetChanges() + if err != nil { + return Change{}, err + } + r.pending = append(r.pending, metaChanges...) + } + return r.Read() + case upgradeChangesState: + // Get upgrade changes + if r.upgradeIndex < len(r.LedgerTransactionReader.ledgerCloseMeta.UpgradesMeta) { + changes := GetChangesFromLedgerEntryChanges( + r.LedgerTransactionReader.ledgerCloseMeta.UpgradesMeta[r.upgradeIndex], + ) + r.pending = append(r.pending, changes...) + r.upgradeIndex++ + return r.Read() + } + } + + return Change{}, io.EOF +} + +// Close should be called when reading is finished. +func (r *LedgerChangeReader) Close() error { + r.pending = nil + return r.LedgerTransactionReader.Close() +} diff --git a/exp/ingest/io/ledger_change_reader_test.go b/exp/ingest/io/ledger_change_reader_test.go index 85cf27ee30..4fc58d6180 100644 --- a/exp/ingest/io/ledger_change_reader_test.go +++ b/exp/ingest/io/ledger_change_reader_test.go @@ -1,7 +1,6 @@ package io import ( - "context" "fmt" "io" "testing" @@ -26,11 +25,11 @@ func TestNewLedgerChangeReaderFails(t *testing.T) { ledgerbackend.LedgerCloseMeta{}, fmt.Errorf("ledger error"), ).Once() - _, err := NewLedgerChangeReader(context.Background(), seq, mock) + _, err := NewLedgerChangeReader(mock, seq) assert.EqualError( t, err, - "error reading ledger from backend: ledger error", + "error getting ledger from the backend: ledger error", ) } @@ -42,7 +41,7 @@ func TestNewLedgerChangeReaderLedgerDoesNotExist(t *testing.T) { ledgerbackend.LedgerCloseMeta{}, nil, ).Once() - _, err := NewLedgerChangeReader(context.Background(), seq, mock) + _, err := NewLedgerChangeReader(mock, seq) assert.Equal( t, err, @@ -69,7 +68,7 @@ func TestNewLedgerChangeReaderSucceeds(t *testing.T) { nil, ).Once() - reader, err := NewLedgerChangeReader(context.Background(), seq, mock) + reader, err := NewLedgerChangeReader(mock, seq) assert.NoError(t, err) assert.Equal(t, reader.GetHeader(), header) @@ -108,7 +107,7 @@ func assertChangesEqual( backend ledgerbackend.LedgerBackend, expected []balanceEntry, ) { - reader, err := NewLedgerChangeReader(context.Background(), sequence, backend) + reader, err := NewLedgerChangeReader(backend, sequence) assert.NoError(t, err) changes := []balanceEntry{} @@ -266,73 +265,3 @@ func TestLedgerChangeReaderOrder(t *testing.T) { assertChangesEqual(t, seq, mock, []balanceEntry{}) mock.AssertExpectations(t) } - -func TestLedgerChangeReaderContext(t *testing.T) { - mock := &ledgerbackend.MockDatabaseBackend{} - seq := uint32(123) - - ledger := ledgerbackend.LedgerCloseMeta{ - TransactionResult: []xdr.TransactionResultPair{ - xdr.TransactionResultPair{}, - xdr.TransactionResultPair{}, - }, - TransactionEnvelope: []xdr.TransactionEnvelope{ - xdr.TransactionEnvelope{}, - xdr.TransactionEnvelope{}, - }, - TransactionMeta: []xdr.TransactionMeta{ - xdr.TransactionMeta{ - V: 1, - V1: &xdr.TransactionMetaV1{ - Operations: []xdr.OperationMeta{}, - }, - }, - xdr.TransactionMeta{ - V: 1, - V1: &xdr.TransactionMetaV1{ - Operations: []xdr.OperationMeta{}, - }, - }, - }, - TransactionFeeChanges: []xdr.LedgerEntryChanges{ - xdr.LedgerEntryChanges{ - buildChange(feeAddress, 100), - }, - xdr.LedgerEntryChanges{ - buildChange(feeAddress, 300), - }, - }, - UpgradesMeta: []xdr.LedgerEntryChanges{ - xdr.LedgerEntryChanges{ - buildChange(upgradeAddress, 2), - }, - xdr.LedgerEntryChanges{ - buildChange(upgradeAddress, 3), - }, - }, - } - - mock.On("GetLedger", seq).Return(true, ledger, nil).Once() - ctx, cancel := context.WithCancel(context.Background()) - reader, err := NewLedgerChangeReader(ctx, seq, mock) - mock.AssertExpectations(t) - assert.NoError(t, err) - - cancel() - _, err = reader.Read() - assert.Equal(t, context.Canceled, err) - - mock.On("GetLedger", seq).Return(true, ledger, nil).Once() - ctx, cancel = context.WithCancel(context.Background()) - reader, err = NewLedgerChangeReader(ctx, seq, mock) - mock.AssertExpectations(t) - assert.NoError(t, err) - - change, err := reader.Read() - assert.Equal(t, balanceEntry{feeAddress, 100}, parseChange(change)) - assert.NoError(t, err) - - cancel() - _, err = reader.Read() - assert.Equal(t, context.Canceled, err) -} diff --git a/exp/ingest/io/ledger_reader.go b/exp/ingest/io/ledger_reader.go deleted file mode 100644 index 63bc271cd8..0000000000 --- a/exp/ingest/io/ledger_reader.go +++ /dev/null @@ -1,144 +0,0 @@ -package io - -import ( - "context" - "io" - - "github.com/stellar/go/exp/ingest/ledgerbackend" - "github.com/stellar/go/support/errors" - "github.com/stellar/go/xdr" -) - -// LedgerReader provides convenient, streaming access to the transactions within a ledger. -type LedgerReader interface { - GetSequence() uint32 - GetHeader() xdr.LedgerHeaderHistoryEntry - // Read should return the next transaction. If there are no more - // transactions it should return `io.EOF` error. - Read() (LedgerTransaction, error) - // Close should be called when reading is finished. This is especially - // helpful when there are still some transactions available so reader can stop - // streaming them. - Close() error -} - -// DBLedgerReader is a database-backed implementation of the io.LedgerReader interface. -// Use NewDBLedgerReader to create a new instance. -type DBLedgerReader struct { - ctx context.Context - sequence uint32 - backend ledgerbackend.LedgerBackend - header xdr.LedgerHeaderHistoryEntry - transactions []LedgerTransaction - upgradeChanges []Change - readIdx int - upgradeReadIdx int -} - -// Ensure DBLedgerReader implements LedgerReader -var _ LedgerReader = (*DBLedgerReader)(nil) - -// NewDBLedgerReader creates a new DBLedgerReader instance. -// Note that DBLedgerReader is not thread safe and should not be shared by multiple goroutines -func NewDBLedgerReader( - ctx context.Context, sequence uint32, backend ledgerbackend.LedgerBackend, -) (*DBLedgerReader, error) { - reader := &DBLedgerReader{ - ctx: ctx, - sequence: sequence, - backend: backend, - } - - err := reader.init() - if err != nil { - return nil, err - } - - return reader, nil -} - -// GetSequence returns the sequence number of the ledger data stored by this object. -func (dblrc *DBLedgerReader) GetSequence() uint32 { - return dblrc.sequence -} - -// GetHeader returns the XDR Header data associated with the stored ledger. -func (dblrc *DBLedgerReader) GetHeader() xdr.LedgerHeaderHistoryEntry { - return dblrc.header -} - -// Read returns the next transaction in the ledger, ordered by tx number, each time it is called. When there -// are no more transactions to return, an EOF error is returned. -func (dblrc *DBLedgerReader) Read() (LedgerTransaction, error) { - if err := dblrc.ctx.Err(); err != nil { - return LedgerTransaction{}, err - } - - if dblrc.readIdx < len(dblrc.transactions) { - dblrc.readIdx++ - return dblrc.transactions[dblrc.readIdx-1], nil - } - return LedgerTransaction{}, io.EOF -} - -// readUpgradeChange returns the next upgrade change in the ledger, each time it -// is called. When there are no more upgrades to return, an EOF error is returned. -func (dblrc *DBLedgerReader) readUpgradeChange() (Change, error) { - if err := dblrc.ctx.Err(); err != nil { - return Change{}, err - } - - if dblrc.upgradeReadIdx < len(dblrc.upgradeChanges) { - dblrc.upgradeReadIdx++ - return dblrc.upgradeChanges[dblrc.upgradeReadIdx-1], nil - } - return Change{}, io.EOF -} - -// Rewind resets the reader back to the first transaction in the ledger -func (dblrc *DBLedgerReader) rewind() { - dblrc.readIdx = 0 -} - -// Init pulls data from the backend to set this object up for use. -func (dblrc *DBLedgerReader) init() error { - exists, ledgerCloseMeta, err := dblrc.backend.GetLedger(dblrc.sequence) - - if err != nil { - return errors.Wrap(err, "error reading ledger from backend") - } - if !exists { - return ErrNotFound - } - - dblrc.header = ledgerCloseMeta.LedgerHeader - - dblrc.storeTransactions(ledgerCloseMeta) - - for _, upgradeChanges := range ledgerCloseMeta.UpgradesMeta { - changes := getChangesFromLedgerEntryChanges(upgradeChanges) - dblrc.upgradeChanges = append(dblrc.upgradeChanges, changes...) - } - - return nil -} - -// storeTransactions maps the close meta data into a slice of LedgerTransaction structs, to provide -// a per-transaction view of the data when Read() is called. -func (dblrc *DBLedgerReader) storeTransactions(lcm ledgerbackend.LedgerCloseMeta) { - for i := range lcm.TransactionEnvelope { - dblrc.transactions = append(dblrc.transactions, LedgerTransaction{ - Index: uint32(i + 1), // Transactions start at '1' - Envelope: lcm.TransactionEnvelope[i], - Result: lcm.TransactionResult[i], - Meta: lcm.TransactionMeta[i], - FeeChanges: lcm.TransactionFeeChanges[i], - }) - } -} - -func (dblrc *DBLedgerReader) Close() error { - dblrc.transactions = nil - dblrc.upgradeChanges = nil - return nil -} diff --git a/exp/ingest/io/ledger_transaction.go b/exp/ingest/io/ledger_transaction.go index 0f222f4f8c..766d2afe67 100644 --- a/exp/ingest/io/ledger_transaction.go +++ b/exp/ingest/io/ledger_transaction.go @@ -24,7 +24,7 @@ func (t *LedgerTransaction) txInternalError() bool { // GetFeeChanges returns a developer friendly representation of LedgerEntryChanges // connected to fees. func (t *LedgerTransaction) GetFeeChanges() []Change { - return getChangesFromLedgerEntryChanges(t.FeeChanges) + return GetChangesFromLedgerEntryChanges(t.FeeChanges) } // GetChanges returns a developer friendly representation of LedgerEntryChanges. @@ -40,7 +40,7 @@ func (t *LedgerTransaction) GetChanges() ([]Change, error) { return changes, errors.New("TransactionMeta.V=0 not supported") case 1: v1Meta := t.Meta.MustV1() - txChanges := getChangesFromLedgerEntryChanges(v1Meta.TxChanges) + txChanges := GetChangesFromLedgerEntryChanges(v1Meta.TxChanges) changes = append(changes, txChanges...) // Ignore operations meta if txInternalError https://github.com/stellar/go/issues/2111 @@ -49,7 +49,7 @@ func (t *LedgerTransaction) GetChanges() ([]Change, error) { } for _, operationMeta := range v1Meta.Operations { - opChanges := getChangesFromLedgerEntryChanges( + opChanges := GetChangesFromLedgerEntryChanges( operationMeta.Changes, ) changes = append(changes, opChanges...) @@ -57,7 +57,7 @@ func (t *LedgerTransaction) GetChanges() ([]Change, error) { case 2: v2Meta := t.Meta.MustV2() - txChangesBefore := getChangesFromLedgerEntryChanges(v2Meta.TxChangesBefore) + txChangesBefore := GetChangesFromLedgerEntryChanges(v2Meta.TxChangesBefore) changes = append(changes, txChangesBefore...) // Ignore operations meta and txChangesAfter if txInternalError @@ -67,13 +67,13 @@ func (t *LedgerTransaction) GetChanges() ([]Change, error) { } for _, operationMeta := range v2Meta.Operations { - opChanges := getChangesFromLedgerEntryChanges( + opChanges := GetChangesFromLedgerEntryChanges( operationMeta.Changes, ) changes = append(changes, opChanges...) } - txChangesAfter := getChangesFromLedgerEntryChanges(v2Meta.TxChangesAfter) + txChangesAfter := GetChangesFromLedgerEntryChanges(v2Meta.TxChangesAfter) changes = append(changes, txChangesAfter...) default: return changes, errors.New("Unsupported TransactionMeta version") @@ -120,55 +120,7 @@ func operationChanges(ops []xdr.OperationMeta, index uint32) []Change { } operationMeta := ops[index] - return getChangesFromLedgerEntryChanges( + return GetChangesFromLedgerEntryChanges( operationMeta.Changes, ) } - -// getChangesFromLedgerEntryChanges transforms LedgerEntryChanges to []Change. -// Each `update` and `removed` is preceded with `state` and `create` changes -// are alone, without `state`. The transformation we're doing is to move each -// change (state/update, state/removed or create) to an array of pre/post pairs. -// Then: -// - for create, pre is null and post is a new entry, -// - for update, pre is previous state and post is the current state, -// - for removed, pre is previous state and post is null. -// -// stellar-core source: -// https://github.com/stellar/stellar-core/blob/e584b43/src/ledger/LedgerTxn.cpp#L582 -func getChangesFromLedgerEntryChanges(ledgerEntryChanges xdr.LedgerEntryChanges) []Change { - changes := []Change{} - - for i, entryChange := range ledgerEntryChanges { - switch entryChange.Type { - case xdr.LedgerEntryChangeTypeLedgerEntryCreated: - created := entryChange.MustCreated() - changes = append(changes, Change{ - Type: created.Data.Type, - Pre: nil, - Post: &created, - }) - case xdr.LedgerEntryChangeTypeLedgerEntryUpdated: - state := ledgerEntryChanges[i-1].MustState() - updated := entryChange.MustUpdated() - changes = append(changes, Change{ - Type: state.Data.Type, - Pre: &state, - Post: &updated, - }) - case xdr.LedgerEntryChangeTypeLedgerEntryRemoved: - state := ledgerEntryChanges[i-1].MustState() - changes = append(changes, Change{ - Type: state.Data.Type, - Pre: &state, - Post: nil, - }) - case xdr.LedgerEntryChangeTypeLedgerEntryState: - continue - default: - panic("Invalid LedgerEntryChangeType") - } - } - - return changes -} diff --git a/exp/ingest/io/ledger_transaction_reader.go b/exp/ingest/io/ledger_transaction_reader.go new file mode 100644 index 0000000000..534ca86a72 --- /dev/null +++ b/exp/ingest/io/ledger_transaction_reader.go @@ -0,0 +1,81 @@ +package io + +import ( + "io" + + "github.com/stellar/go/exp/ingest/ledgerbackend" + "github.com/stellar/go/support/errors" + "github.com/stellar/go/xdr" +) + +// LedgerTransactionReader reads transactions for a given ledger sequence from a backend. +// Use NewTransactionReader to create a new instance. +type LedgerTransactionReader struct { + ledgerCloseMeta ledgerbackend.LedgerCloseMeta + transactions []LedgerTransaction + readIdx int +} + +// NewLedgerTransactionReader creates a new TransactionReader instance. +// Note that TransactionReader is not thread safe and should not be shared by multiple goroutines +func NewLedgerTransactionReader(backend ledgerbackend.LedgerBackend, sequence uint32) (*LedgerTransactionReader, error) { + exists, ledgerCloseMeta, err := backend.GetLedger(sequence) + if err != nil { + return nil, errors.Wrap(err, "error getting ledger from the backend") + } + + if !exists { + return nil, ErrNotFound + } + + reader := &LedgerTransactionReader{ledgerCloseMeta: ledgerCloseMeta} + reader.storeTransactions(ledgerCloseMeta) + return reader, nil +} + +// GetSequence returns the sequence number of the ledger data stored by this object. +func (reader *LedgerTransactionReader) GetSequence() uint32 { + return uint32(reader.ledgerCloseMeta.LedgerHeader.Header.LedgerSeq) +} + +// GetHeader returns the XDR Header data associated with the stored ledger. +func (reader *LedgerTransactionReader) GetHeader() xdr.LedgerHeaderHistoryEntry { + return reader.ledgerCloseMeta.LedgerHeader +} + +// Read returns the next transaction in the ledger, ordered by tx number, each time +// it is called. When there are no more transactions to return, an EOF error is returned. +func (reader *LedgerTransactionReader) Read() (LedgerTransaction, error) { + if reader.readIdx < len(reader.transactions) { + reader.readIdx++ + return reader.transactions[reader.readIdx-1], nil + } + return LedgerTransaction{}, io.EOF +} + +// Rewind resets the reader back to the first transaction in the ledger +func (reader *LedgerTransactionReader) Rewind() { + reader.readIdx = 0 +} + +// storeTransactions maps the close meta data into a slice of LedgerTransaction structs, to provide +// a per-transaction view of the data when Read() is called. +func (reader *LedgerTransactionReader) storeTransactions(lcm ledgerbackend.LedgerCloseMeta) { + for i := range lcm.TransactionEnvelope { + reader.transactions = append(reader.transactions, LedgerTransaction{ + Index: uint32(i + 1), // Transactions start at '1' + Envelope: lcm.TransactionEnvelope[i], + Result: lcm.TransactionResult[i], + Meta: lcm.TransactionMeta[i], + FeeChanges: lcm.TransactionFeeChanges[i], + }) + } +} + +// Close should be called when reading is finished. This is especially +// helpful when there are still some transactions available so reader can stop +// streaming them. +func (reader *LedgerTransactionReader) Close() error { + reader.transactions = nil + return nil +} diff --git a/exp/ingest/io/mock_ledger_reader.go b/exp/ingest/io/mock_ledger_reader.go deleted file mode 100644 index aa05bde538..0000000000 --- a/exp/ingest/io/mock_ledger_reader.go +++ /dev/null @@ -1,46 +0,0 @@ -package io - -import ( - "github.com/stellar/go/xdr" - "github.com/stretchr/testify/mock" -) - -var _ LedgerReader = (*MockLedgerReader)(nil) - -type MockLedgerReader struct { - mock.Mock -} - -func (m *MockLedgerReader) GetSequence() uint32 { - args := m.Called() - return args.Get(0).(uint32) -} - -func (m *MockLedgerReader) GetHeader() xdr.LedgerHeaderHistoryEntry { - args := m.Called() - return args.Get(0).(xdr.LedgerHeaderHistoryEntry) -} - -func (m *MockLedgerReader) Read() (LedgerTransaction, error) { - args := m.Called() - return args.Get(0).(LedgerTransaction), args.Error(1) -} - -func (m *MockLedgerReader) ReadUpgradeChange() (Change, error) { - args := m.Called() - return args.Get(0).(Change), args.Error(1) -} - -func (m *MockLedgerReader) GetUpgradeChanges() []Change { - args := m.Called() - return args.Get(0).([]Change) -} - -func (m *MockLedgerReader) IgnoreUpgradeChanges() { - m.Called() -} - -func (m *MockLedgerReader) Close() error { - args := m.Called() - return args.Error(0) -} diff --git a/exp/ingest/io/mock_ledger_transaction_processor.go b/exp/ingest/io/mock_ledger_transaction_processor.go deleted file mode 100644 index 4834682cc5..0000000000 --- a/exp/ingest/io/mock_ledger_transaction_processor.go +++ /dev/null @@ -1,14 +0,0 @@ -package io - -import "github.com/stretchr/testify/mock" - -var _ LedgerTransactionProcessor = (*MockLedgerTransactionProcessor)(nil) - -type MockLedgerTransactionProcessor struct { - mock.Mock -} - -func (m *MockLedgerTransactionProcessor) ProcessTransaction(transaction LedgerTransaction) error { - args := m.Called(transaction) - return args.Error(0) -} diff --git a/exp/ingest/io/processors.go b/exp/ingest/io/processors.go index 6f5271370e..ee4f2e81e1 100644 --- a/exp/ingest/io/processors.go +++ b/exp/ingest/io/processors.go @@ -16,7 +16,7 @@ type LedgerTransactionProcessor interface { func StreamLedgerTransactions( txProcessor LedgerTransactionProcessor, - reader LedgerReader, + reader *LedgerTransactionReader, ) error { for { tx, err := reader.Read() diff --git a/exp/ingest/io/single_ledger_state_reader_test.go b/exp/ingest/io/single_ledger_state_reader_test.go index 3ca82be580..25f0209883 100644 --- a/exp/ingest/io/single_ledger_state_reader_test.go +++ b/exp/ingest/io/single_ledger_state_reader_test.go @@ -607,7 +607,7 @@ func (s *ReadBucketEntryTestSuite) TestReadEntryRetrySucceedsWithDiscard() { secondEntry := metaEntry(2) b := &bytes.Buffer{} - s.Require().NoError(historyarchive.WriteFramedXdr(b, firstEntry)) + s.Require().NoError(xdr.MarshalFramed(b, firstEntry)) writeInvalidFrame(b) s.mockArchive. @@ -643,7 +643,7 @@ func (s *ReadBucketEntryTestSuite) TestReadEntryRetryFailsWithDiscardError() { firstEntry := metaEntry(1) b := &bytes.Buffer{} - s.Require().NoError(historyarchive.WriteFramedXdr(b, firstEntry)) + s.Require().NoError(xdr.MarshalFramed(b, firstEntry)) writeInvalidFrame(b) s.mockArchive. @@ -679,7 +679,7 @@ func (s *ReadBucketEntryTestSuite) TestReadEntryRetrySucceedsAfterDiscardError() secondEntry := metaEntry(2) b := &bytes.Buffer{} - s.Require().NoError(historyarchive.WriteFramedXdr(b, firstEntry)) + s.Require().NoError(xdr.MarshalFramed(b, firstEntry)) writeInvalidFrame(b) s.mockArchive. @@ -765,7 +765,7 @@ func createInvalidXdrStream(closeError error) *historyarchive.XdrStream { func writeInvalidFrame(b *bytes.Buffer) { bufferSize := b.Len() - err := historyarchive.WriteFramedXdr(b, metaEntry(1)) + err := xdr.MarshalFramed(b, metaEntry(1)) if err != nil { panic(err) } @@ -776,7 +776,7 @@ func writeInvalidFrame(b *bytes.Buffer) { func createXdrStream(entries ...xdr.BucketEntry) *historyarchive.XdrStream { b := &bytes.Buffer{} for _, e := range entries { - err := historyarchive.WriteFramedXdr(b, e) + err := xdr.MarshalFramed(b, e) if err != nil { panic(err) } diff --git a/exp/ingest/ledgerbackend/captive_core_backend.go b/exp/ingest/ledgerbackend/captive_core_backend.go new file mode 100644 index 0000000000..9a9a8ff620 --- /dev/null +++ b/exp/ingest/ledgerbackend/captive_core_backend.go @@ -0,0 +1,395 @@ +package ledgerbackend + +import ( + "io" + "sync" + "time" + + "github.com/pkg/errors" + + "github.com/stellar/go/network" + "github.com/stellar/go/support/historyarchive" + "github.com/stellar/go/support/log" + "github.com/stellar/go/xdr" +) + +// Ensure captiveStellarCore implements LedgerBackend +var _ LedgerBackend = (*captiveStellarCore)(nil) + +// This is a not-very-complete or well-organized sketch of code be used to +// stream LedgerCloseMeta data from a "captive" stellar-core: one running as a +// subprocess and replaying portions of history against an in-memory ledger. +// +// A captive stellar-core still needs (and allocates, in os.TempDir()) a +// temporary directory to run in: one in which its config file is stored, along +// with temporary files it downloads and decompresses, and its bucket +// state. Only the ledger will be in-memory (and we might even switch this to +// SQLite + large buffers in the future if the in-memory ledger gets too big.) +// +// Feel free to reorganize this to fit better. It's preliminary! + +// TODO: switch from history URLs to history archive interface provided from support package, to permit mocking + +const ( + // In this (crude, initial) sketch, we replay ledgers in blocks of 17,280 + // which is 24 hours worth of ledgers at 5 second intervals. + ledgersPerProcess = 17280 + ledgersPerCheckpoint = 64 + + // The number of checkpoints we're willing to scan over and ignore, without + // restarting a subprocess. + numCheckpointsLeeway = 10 + + readAheadBufferSize = 2 +) + +func roundDownToFirstReplayAfterCheckpointStart(ledger uint32) uint32 { + v := (ledger / ledgersPerCheckpoint) * ledgersPerCheckpoint + if v == 0 { + return 1 + } + // All other checkpoints start at the next multiple of 64 + return v +} + +type metaResult struct { + *xdr.LedgerCloseMeta + err error +} + +type captiveStellarCore struct { + networkPassphrase string + historyURLs []string + lastLedger *uint32 // end of current segment if offline, nil if online + + // read-ahead buffer + stop chan struct{} + wait sync.WaitGroup + metaC chan metaResult + + stellarCoreRunner stellarCoreRunnerInterface + + nextLedgerMutex sync.Mutex + nextLedger uint32 // next ledger expected, error w/ restart if not seen +} + +// NewCaptive returns a new captiveStellarCore that is not running. Will lazily start a subprocess +// to feed it a block of streaming metadata when user calls .GetLedger(), and will kill +// and restart the subprocess if subsequent calls to .GetLedger() are discontiguous. +// +// Platform-specific pipe setup logic is in the .start() methods. +func NewCaptive(executablePath, networkPassphrase string, historyURLs []string) *captiveStellarCore { + return &captiveStellarCore{ + networkPassphrase: networkPassphrase, + historyURLs: historyURLs, + nextLedger: 0, + stellarCoreRunner: &stellarCoreRunner{ + executablePath: executablePath, + networkPassphrase: networkPassphrase, + historyURLs: historyURLs, + }, + } +} + +// Each captiveStellarCore is either doing bulk offline replay or tracking +// a network as it closes ledgers online. These cases are differentiated +// by the lastLedger field of captiveStellarCore, which is nil in the online +// case (indicating there's no end to the subprocess) and non-nil in the +// offline case (indicating that the subprocess will be closed after it yields +// the last ledger in the segment). +func (c *captiveStellarCore) IsInOfflineReplayMode() bool { + return c.lastLedger != nil +} + +func (c *captiveStellarCore) IsInOnlineTrackingMode() bool { + return c.lastLedger == nil +} + +// Returns the sequence number of an LCM, returning an error if the LCM is of +// an unknown version. +func peekLedgerSequence(xlcm *xdr.LedgerCloseMeta) (uint32, error) { + v0, ok := xlcm.GetV0() + if !ok { + err := errors.New("unexpected XDR LedgerCloseMeta version") + return 0, err + } + return uint32(v0.LedgerHeader.Header.LedgerSeq), nil +} + +// Note: the xdr.LedgerCloseMeta structure is _not_ the same as +// the ledgerbackend.LedgerCloseMeta structure; the latter should +// probably migrate to the former eventually. +func (c *captiveStellarCore) copyLedgerCloseMeta(xlcm *xdr.LedgerCloseMeta, lcm *LedgerCloseMeta) error { + v0, ok := xlcm.GetV0() + if !ok { + return errors.New("unexpected XDR LedgerCloseMeta version") + } + lcm.LedgerHeader = v0.LedgerHeader + envelopes := make(map[xdr.Hash]xdr.TransactionEnvelope) + for _, tx := range v0.TxSet.Txs { + hash, e := network.HashTransactionInEnvelope(tx, c.networkPassphrase) + if e != nil { + return errors.Wrap(e, "error hashing tx in LedgerCloseMeta") + } + envelopes[hash] = tx + } + for _, trm := range v0.TxProcessing { + txe, ok := envelopes[trm.Result.TransactionHash] + if !ok { + return errors.New("unknown tx hash in LedgerCloseMeta") + } + lcm.TransactionEnvelope = append(lcm.TransactionEnvelope, txe) + lcm.TransactionResult = append(lcm.TransactionResult, trm.Result) + lcm.TransactionMeta = append(lcm.TransactionMeta, trm.TxApplyProcessing) + lcm.TransactionFeeChanges = append(lcm.TransactionFeeChanges, trm.FeeProcessing) + } + for _, urm := range v0.UpgradesProcessing { + lcm.UpgradesMeta = append(lcm.UpgradesMeta, urm.Changes) + } + return nil +} + +func (c *captiveStellarCore) openOfflineReplaySubprocess(nextLedger, lastLedger uint32) error { + c.Close() + maxLedger, e := c.GetLatestLedgerSequence() + if e != nil { + return errors.Wrap(e, "getting latest ledger sequence") + } + if nextLedger > maxLedger { + err := errors.Errorf("sequence %d greater than max available %d", + nextLedger, maxLedger) + return err + } + if lastLedger > maxLedger { + lastLedger = maxLedger + } + + err := c.stellarCoreRunner.run(nextLedger, lastLedger) + if err != nil { + return errors.Wrap(err, "error running stellar-core") + } + + // The next ledger should be the first ledger of the checkpoint containing + // the requested ledger + c.nextLedgerMutex.Lock() + c.nextLedger = roundDownToFirstReplayAfterCheckpointStart(nextLedger) + c.nextLedgerMutex.Unlock() + c.lastLedger = &lastLedger + + // read-ahead buffer + c.metaC = make(chan metaResult, readAheadBufferSize) + c.stop = make(chan struct{}) + c.wait.Add(1) + go c.sendLedgerMeta(lastLedger) + return nil +} + +// sendLedgerMeta reads from the captive core pipe, decodes the ledger metadata +// and sends it to the metadata buffered channel +func (c *captiveStellarCore) sendLedgerMeta(untilSequence uint32) { + defer c.wait.Done() + printBufferOccupation := time.NewTicker(5 * time.Second) + defer printBufferOccupation.Stop() + for { + select { + case <-c.stop: + return + case <-printBufferOccupation.C: + log.Debug("captive core read-ahead buffer occupation:", len(c.metaC)) + default: + } + meta, err := c.readLedgerMetaFromPipe() + if err != nil { + select { + case <-c.stop: + case c.metaC <- metaResult{nil, err}: + } + return + } + select { + case <-c.stop: + return + case c.metaC <- metaResult{meta, nil}: + } + seq, err := peekLedgerSequence(meta) + if err != nil { + select { + case <-c.stop: + case c.metaC <- metaResult{nil, err}: + } + return + } + if seq >= untilSequence { + // we are done + return + } + } +} + +func (c *captiveStellarCore) readLedgerMetaFromPipe() (*xdr.LedgerCloseMeta, error) { + metaPipe := c.stellarCoreRunner.getMetaPipe() + if metaPipe == nil { + return nil, errors.New("missing metadata pipe") + } + var xlcm xdr.LedgerCloseMeta + _, e0 := xdr.UnmarshalFramed(metaPipe, &xlcm) + if e0 != nil { + if e0 == io.EOF { + return nil, errors.Wrap(e0, "got EOF from subprocess") + } else { + return nil, errors.Wrap(e0, "unmarshalling framed LedgerCloseMeta") + } + } + return &xlcm, nil +} + +func (c *captiveStellarCore) PrepareRange(from uint32, to uint32) error { + // `from-1` here because being able to read ledger `from-1` is a confirmation + // that the range is ready. This effectively makes getting ledger #1 impossible. + // TODO: should be replaced with by a tee reader with buffer or similar in the + // later stage of development. + if e := c.openOfflineReplaySubprocess(from-1, to); e != nil { + return errors.Wrap(e, "opening subprocess") + } + + if c.stellarCoreRunner.getMetaPipe() == nil { + return errors.New("missing metadata pipe") + } + + _, _, err := c.GetLedger(from - 1) + if err != nil { + return errors.Wrap(err, "opening getting ledger `from-1`") + } + + return nil +} + +// We assume that we'll be called repeatedly asking for ledgers in ascending +// order, so when asked for ledger 23 we start a subprocess doing catchup +// "100023/100000", which should replay 23, 24, 25, ... 100023. The wrinkle in +// this is that core will actually replay from the _checkpoint before_ +// the implicit start ledger, so we might need to skip a few ledgers until +// we hit the one requested (this routine does so transparently if needed). +func (c *captiveStellarCore) GetLedger(sequence uint32) (bool, LedgerCloseMeta, error) { + // First, if we're open but out of range for the request, close. + if !c.IsClosed() && !c.LedgerWithinCheckpoints(sequence, numCheckpointsLeeway) { + c.Close() + } + + // Next, if we're closed, open. + if c.IsClosed() { + if e := c.openOfflineReplaySubprocess(sequence, sequence+ledgersPerProcess); e != nil { + return false, LedgerCloseMeta{}, errors.Wrap(e, "opening subprocess") + } + } + + // Check that we're where we expect to be: in range ... + if !c.LedgerWithinCheckpoints(sequence, 1) { + return false, LedgerCloseMeta{}, errors.New("unexpected subprocess next-ledger") + } + + // Now loop along the range until we find the ledger we want. + var errOut error +loop: + for { + metaResult := <-c.metaC + if metaResult.err != nil { + errOut = metaResult.err + break loop + } + + seq, e1 := peekLedgerSequence(metaResult.LedgerCloseMeta) + if e1 != nil { + errOut = e1 + break + } + c.nextLedgerMutex.Lock() + if seq != c.nextLedger { + // We got something unexpected; close and reset + errOut = errors.Errorf("unexpected ledger (expected=%d actual=%d)", c.nextLedger, seq) + c.nextLedgerMutex.Unlock() + break + } + c.nextLedger++ + c.nextLedgerMutex.Unlock() + if seq == sequence { + // Found the requested seq + var lcm LedgerCloseMeta + e2 := c.copyLedgerCloseMeta(metaResult.LedgerCloseMeta, &lcm) + if e2 != nil { + errOut = e2 + break + } + // If we got the _last_ ledger in a segment, close before returning. + if c.lastLedger != nil && *c.lastLedger == seq { + c.Close() + } + return true, lcm, nil + } + } + // All paths above that break out of the loop (instead of return) + // set e to non-nil: there was an error and we should close and + // reset state before retuning an error to our caller. + c.Close() + return false, LedgerCloseMeta{}, errOut +} + +func (c *captiveStellarCore) GetLatestLedgerSequence() (uint32, error) { + archive, e := historyarchive.Connect( + c.historyURLs[0], + historyarchive.ConnectOptions{}, + ) + if e != nil { + return 0, e + } + has, e := archive.GetRootHAS() + if e != nil { + return 0, e + } + return has.CurrentLedger, nil +} + +// LedgerWithinCheckpoints returns true if a given ledger is after the next ledger to be read +// from a given subprocess (so ledger will be read eventually) and no more +// than numCheckpoints checkpoints ahead of the next ledger to be read +// (so it will not be too long before ledger is read). +func (c *captiveStellarCore) LedgerWithinCheckpoints(ledger uint32, numCheckpoints uint32) bool { + return ((c.nextLedger <= ledger) && + (ledger <= (c.nextLedger + (numCheckpoints * ledgersPerCheckpoint)))) +} + +func (c *captiveStellarCore) IsClosed() bool { + c.nextLedgerMutex.Lock() + defer c.nextLedgerMutex.Unlock() + return c.nextLedger == 0 +} + +func (c *captiveStellarCore) Close() error { + if c.IsClosed() { + return nil + } + c.nextLedgerMutex.Lock() + c.nextLedger = 0 + c.nextLedgerMutex.Unlock() + + if c.stop != nil { + close(c.stop) + // discard pending data in case the goroutine is blocked writing to the channel + select { + case <-c.metaC: + default: + } + // Do not close the communication channel until we know + // the goroutine is done + c.wait.Wait() + close(c.metaC) + } + + c.lastLedger = nil + + err := c.stellarCoreRunner.close() + if err != nil { + return errors.Wrap(err, "error closing stellar-core subprocess") + } + return nil +} diff --git a/exp/ingest/ledgerbackend/captive_core_backend_test.go b/exp/ingest/ledgerbackend/captive_core_backend_test.go new file mode 100644 index 0000000000..5e66b09ed7 --- /dev/null +++ b/exp/ingest/ledgerbackend/captive_core_backend_test.go @@ -0,0 +1,138 @@ +package ledgerbackend + +import ( + "bytes" + "encoding/hex" + "io" + "testing" + + "github.com/stellar/go/network" + "github.com/stellar/go/xdr" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" +) + +// TODO: test frame decoding +// TODO: test from static base64-encoded data + +type stellarCoreRunnerMock struct { + mock.Mock +} + +func (m *stellarCoreRunnerMock) run(from, to uint32) error { + a := m.Called(from, to) + return a.Error(0) +} + +func (m *stellarCoreRunnerMock) getMetaPipe() io.Reader { + a := m.Called() + return a.Get(0).(io.Reader) +} + +func (m *stellarCoreRunnerMock) close() error { + a := m.Called() + return a.Error(0) +} + +func writeLedgerHeader(w io.Writer, sequence uint32) error { + opResults := []xdr.OperationResult{} + opMeta := []xdr.OperationMeta{} + + tmpHash, _ := hex.DecodeString("cde54da3901f5b9c0331d24fbb06ac9c5c5de76de9fb2d4a7b86c09e46f11d8c") + var hash [32]byte + copy(hash[:], tmpHash) + + ledgerCloseMeta := xdr.LedgerCloseMeta{ + V: 0, + V0: &xdr.LedgerCloseMetaV0{ + LedgerHeader: xdr.LedgerHeaderHistoryEntry{ + Header: xdr.LedgerHeader{ + LedgerSeq: xdr.Uint32(sequence), + }, + }, + TxSet: xdr.TransactionSet{ + Txs: []xdr.TransactionEnvelope{ + { + Type: xdr.EnvelopeTypeEnvelopeTypeTx, + V1: &xdr.TransactionV1Envelope{ + Tx: xdr.Transaction{ + SourceAccount: xdr.MustMuxedAccountAddress("GAEJJMDDCRYF752PKIJICUVL7MROJBNXDV2ZB455T7BAFHU2LCLSE2LW"), + Fee: xdr.Uint32(sequence), + }, + }, + }, + }, + }, + TxProcessing: []xdr.TransactionResultMeta{ + { + Result: xdr.TransactionResultPair{ + TransactionHash: xdr.Hash(hash), + Result: xdr.TransactionResult{ + FeeCharged: xdr.Int64(sequence), + Result: xdr.TransactionResultResult{ + Code: xdr.TransactionResultCodeTxSuccess, + Results: &opResults, + }, + }, + }, + TxApplyProcessing: xdr.TransactionMeta{ + Operations: &opMeta, + }, + }, + }, + }, + } + + return xdr.MarshalFramed(w, ledgerCloseMeta) +} + +func TestCaptiveNew(t *testing.T) { + executablePath := "/etc/stellar-core" + networkPassphrase := network.PublicNetworkPassphrase + historyURLs := []string{"http://history.stellar.org/prd/core-live/core_live_001"} + + captiveStellarCore := NewCaptive( + executablePath, + networkPassphrase, + historyURLs, + ) + + assert.Equal(t, networkPassphrase, captiveStellarCore.networkPassphrase) + assert.Equal(t, historyURLs, captiveStellarCore.historyURLs) + assert.Equal(t, uint32(0), captiveStellarCore.nextLedger) + + assert.Equal(t, executablePath, captiveStellarCore.stellarCoreRunner.(*stellarCoreRunner).executablePath) + assert.Equal(t, networkPassphrase, captiveStellarCore.stellarCoreRunner.(*stellarCoreRunner).networkPassphrase) + assert.Equal(t, historyURLs, captiveStellarCore.stellarCoreRunner.(*stellarCoreRunner).historyURLs) +} + +func TestCaptivePrepareRange(t *testing.T) { + var buf bytes.Buffer + + // Core will actually start with the last checkpoint before the from ledger + // and then rewind to the `from` ledger. + for i := 64; i <= 99; i++ { + err := writeLedgerHeader(&buf, uint32(i)) + require.NoError(t, err) + } + + mockRunner := &stellarCoreRunnerMock{} + // We prepare [from-1, to] range because it's not possible to rewind the reader + // and there is no other way to check if stellar-core has built the state without + // reading actual ledger. + mockRunner.On("run", uint32(99), uint32(200)).Return(nil).Once() + mockRunner.On("getMetaPipe").Return(&buf) + mockRunner.On("close").Return(nil).Once() + + captiveBackend := captiveStellarCore{ + networkPassphrase: network.PublicNetworkPassphrase, + historyURLs: []string{"http://history.stellar.org/prd/core-live/core_live_001"}, + stellarCoreRunner: mockRunner, + } + + err := captiveBackend.PrepareRange(100, 200) + assert.NoError(t, err) + err = captiveBackend.Close() + assert.NoError(t, err) +} diff --git a/exp/ingest/ledgerbackend/database_backend.go b/exp/ingest/ledgerbackend/database_backend.go index 40f8d3620f..ac5059d70d 100644 --- a/exp/ingest/ledgerbackend/database_backend.go +++ b/exp/ingest/ledgerbackend/database_backend.go @@ -39,6 +39,28 @@ func NewDatabaseBackendFromSession(session *db.Session) (*DatabaseBackend, error return &DatabaseBackend{session: session}, nil } +func (dbb *DatabaseBackend) PrepareRange(from uint32, to uint32) error { + fromExists, _, err := dbb.GetLedger(from) + if err != nil { + return errors.Wrap(err, "error getting ledger") + } + + if !fromExists { + return errors.New("`from` ledger does not exist") + } + + toExists, _, err := dbb.GetLedger(to) + if err != nil { + return errors.Wrap(err, "error getting ledger") + } + + if !toExists { + return errors.New("`to` ledger does not exist") + } + + return nil +} + // GetLatestLedgerSequence returns the most recent ledger sequence number present in the database. func (dbb *DatabaseBackend) GetLatestLedgerSequence() (uint32, error) { var ledger []ledgerHeader diff --git a/exp/ingest/ledgerbackend/ledger_backend.go b/exp/ingest/ledgerbackend/ledger_backend.go index 22faf4be61..03092b0f2d 100644 --- a/exp/ingest/ledgerbackend/ledger_backend.go +++ b/exp/ingest/ledgerbackend/ledger_backend.go @@ -1,12 +1,18 @@ package ledgerbackend -import "github.com/stellar/go/xdr" +import ( + "github.com/stellar/go/xdr" +) // LedgerBackend represents the interface to a ledger data store. type LedgerBackend interface { GetLatestLedgerSequence() (sequence uint32, err error) // The first returned value is false when the ledger does not exist in a backend. GetLedger(sequence uint32) (bool, LedgerCloseMeta, error) + // Prepares the given range (including from and to) to be loaded. Some backends + // (like captive stellar-core) need to process data before being able to stream + // ledgers. + PrepareRange(from uint32, to uint32) error Close() error } diff --git a/exp/ingest/ledgerbackend/mock_database_backend.go b/exp/ingest/ledgerbackend/mock_database_backend.go index 9a370cc972..5edd0f5b4f 100644 --- a/exp/ingest/ledgerbackend/mock_database_backend.go +++ b/exp/ingest/ledgerbackend/mock_database_backend.go @@ -15,6 +15,11 @@ func (m *MockDatabaseBackend) GetLatestLedgerSequence() (uint32, error) { return args.Get(0).(uint32), args.Error(1) } +func (m *MockDatabaseBackend) PrepareRange(from uint32, to uint32) error { + args := m.Called(from, to) + return args.Error(1) +} + func (m *MockDatabaseBackend) GetLedger(sequence uint32) (bool, LedgerCloseMeta, error) { args := m.Called(sequence) return args.Bool(0), args.Get(1).(LedgerCloseMeta), args.Error(2) diff --git a/exp/ingest/ledgerbackend/stellar_core_runner.go b/exp/ingest/ledgerbackend/stellar_core_runner.go new file mode 100644 index 0000000000..0fc407e11d --- /dev/null +++ b/exp/ingest/ledgerbackend/stellar_core_runner.go @@ -0,0 +1,143 @@ +package ledgerbackend + +import ( + "bufio" + "fmt" + "io" + "io/ioutil" + "math/rand" + "os" + "os/exec" + "path/filepath" + "regexp" + "strings" + "time" + + "github.com/pkg/errors" +) + +type stellarCoreRunnerInterface interface { + run(from, to uint32) error + getMetaPipe() io.Reader + close() error +} + +type stellarCoreRunner struct { + executablePath string + networkPassphrase string + historyURLs []string + + cmd *exec.Cmd + metaPipe io.Reader + tempDir string +} + +func (r *stellarCoreRunner) getConf() string { + lines := []string{ + "# Generated file -- do not edit", + "RUN_STANDALONE=true", + "NODE_IS_VALIDATOR=false", + "DISABLE_XDR_FSYNC=true", + "UNSAFE_QUORUM=true", + fmt.Sprintf(`NETWORK_PASSPHRASE="%s"`, r.networkPassphrase), + fmt.Sprintf(`BUCKET_DIR_PATH="%s"`, filepath.Join(r.getTmpDir(), "buckets")), + fmt.Sprintf(`METADATA_OUTPUT_STREAM="%s"`, r.getPipeName()), + } + for i, val := range r.historyURLs { + lines = append(lines, fmt.Sprintf("[HISTORY.h%d]", i)) + lines = append(lines, fmt.Sprintf(`get="curl -sf %s/{0} -o {1}"`, val)) + } + // Add a fictional quorum -- necessary to convince core to start up; + // but not used at all for our purposes. Pubkey here is just random. + lines = append(lines, + "[QUORUM_SET]", + "THRESHOLD_PERCENT=100", + `VALIDATORS=["GCZBOIAY4HLKAJVNJORXZOZRAY2BJDBZHKPBHZCRAIUR5IHC2UHBGCQR"]`) + return strings.ReplaceAll(strings.Join(lines, "\n"), "\\", "\\\\") +} + +func (r *stellarCoreRunner) getConfFileName() string { + return filepath.Join(r.getTmpDir(), "stellar-core.conf") +} + +func (*stellarCoreRunner) GetLogLineWriter() io.Writer { + r, w := io.Pipe() + br := bufio.NewReader(r) + // Strip timestamps from log lines from captive stellar-core. We emit our own. + dateRx := regexp.MustCompile(`^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3} `) + go func() { + for { + line, e := br.ReadString('\n') + if e != nil { + break + } + line = dateRx.ReplaceAllString(line, "") + fmt.Print(line) + } + }() + return w +} + +func (r *stellarCoreRunner) getTmpDir() string { + if r.tempDir != "" { + return r.tempDir + } + random := rand.New(rand.NewSource(time.Now().UnixNano())) + r.tempDir = filepath.Join(os.TempDir(), fmt.Sprintf("captive-stellar-core-%x", random.Uint64())) + return r.tempDir +} + +// Makes the temp directory and writes the config file to it; called by the +// platform-specific captiveStellarCore.Start() methods. +func (r *stellarCoreRunner) writeConf() error { + dir := r.getTmpDir() + e := os.MkdirAll(dir, 0755) + if e != nil { + return errors.Wrap(e, "error creating subprocess tmpdir") + } + conf := r.getConf() + return ioutil.WriteFile(r.getConfFileName(), []byte(conf), 0644) +} + +func (r *stellarCoreRunner) run(from, to uint32) error { + err := r.writeConf() + if err != nil { + return errors.Wrap(err, "error writing configuration") + } + + rangeArg := fmt.Sprintf("%d/%d", to, to-from+1) + args := []string{"--conf", r.getConfFileName(), "catchup", rangeArg, "--replay-in-memory"} + cmd := exec.Command(r.executablePath, args...) + cmd.Dir = r.getTmpDir() + // In order to get the full stellar core logs: + // cmd.Stdout = r.GetLogLineWriter() + cmd.Stderr = cmd.Stdout + r.cmd = cmd + err = r.start() + if err != nil { + return errors.Wrap(err, "error starting stellar-core subprocess") + } + return nil +} + +func (r *stellarCoreRunner) getMetaPipe() io.Reader { + return r.metaPipe +} + +func (r *stellarCoreRunner) close() error { + var err1, err2 error + + if r.processIsAlive() { + err1 = r.cmd.Process.Kill() + r.cmd.Wait() + r.cmd = nil + } + err2 = os.RemoveAll(r.getTmpDir()) + if err1 != nil { + return errors.Wrap(err1, "error killing subprocess") + } + if err2 != nil { + return errors.Wrap(err2, "error removing subprocess tmpdir") + } + return nil +} diff --git a/exp/ingest/ledgerbackend/stellar_core_runner_posix.go b/exp/ingest/ledgerbackend/stellar_core_runner_posix.go new file mode 100644 index 0000000000..f0f80e8859 --- /dev/null +++ b/exp/ingest/ledgerbackend/stellar_core_runner_posix.go @@ -0,0 +1,64 @@ +// +build !windows + +package ledgerbackend + +import ( + "os" + "syscall" + + "github.com/pkg/errors" +) + +// Posix-specific methods for the stellarCoreRunner type. + +func (c *stellarCoreRunner) getPipeName() string { + // The exec.Cmd.ExtraFiles field carries *io.File values that are assigned + // to child process fds counting from 3, and we'll be passing exactly one + // fd: the write end of the anonymous pipe below. + return "fd:3" +} + +// Starts the subprocess and sets the c.metaPipe field +func (c *stellarCoreRunner) start() error { + // First make an anonymous pipe. + // Note io.File objects close-on-finalization. + readFile, writeFile, e := os.Pipe() + if e != nil { + return errors.Wrap(e, "error making a pipe") + } + + defer writeFile.Close() + + // Then write config file pointing to it. + e = c.writeConf() + if e != nil { + return errors.Wrap(e, "error writing conf") + } + + // Add the write-end to the set of inherited file handles. This is defined + // to be fd 3 on posix platforms. + c.cmd.ExtraFiles = []*os.File{writeFile} + e = c.cmd.Start() + if e != nil { + return errors.Wrap(e, "error starting stellar-core") + } + + // Launch a goroutine to reap immediately on exit (I think this is right, + // as we do not want zombies and we might abruptly forget / kill / close + // the process, but I'm not certain). + cmd := c.cmd + go cmd.Wait() + + c.metaPipe = readFile + return nil +} + +func (c *stellarCoreRunner) processIsAlive() bool { + if c.cmd == nil { + return false + } + if c.cmd.Process == nil { + return false + } + return c.cmd.Process.Signal(syscall.Signal(0)) == nil +} diff --git a/exp/ingest/ledgerbackend/stellar_core_runner_windows.go b/exp/ingest/ledgerbackend/stellar_core_runner_windows.go new file mode 100644 index 0000000000..8e950d67a3 --- /dev/null +++ b/exp/ingest/ledgerbackend/stellar_core_runner_windows.go @@ -0,0 +1,68 @@ +// +build windows + +package ledgerbackend + +import ( + "bufio" + "fmt" + "os" + + "github.com/Microsoft/go-winio" +) + +// Windows-specific methods for the stellarCoreRunner type. + +func (c *stellarCoreRunner) getPipeName() string { + return fmt.Sprintf(`\\.\pipe\%s`, c.nonce) +} + +func (c *stellarCoreRunner) start() error { + // First set up the server pipe. + listener, e := winio.ListenPipe(c.getPipeName(), nil) + if e != nil { + return e + } + + // Then write config file pointing to it. + e = c.writeConf() + if e != nil { + return e + } + + // Then start the process. + e = c.cmd.Start() + if e != nil { + return e + } + + // Launch a goroutine to reap immediately on exit (I think this is right, + // as we do not want zombies and we might abruptly forget / kill / close + // the process, but I'm not certain). + cmd := c.cmd + go func() { + cmd.Wait() + }() + + // Then accept on the server end. + connection, e := listener.Accept() + if e != nil { + return e + } + + c.metaPipe = bufio.NewReaderSize(connection, 1024*1024) + return nil +} + +func (c *captiveStellarCore) processIsAlive() bool { + if c.cmd == nil { + return false + } + if c.cmd.Process == nil { + return false + } + p, e := os.FindProcess(c.cmd.Process.Pid) + if e != nil || p == nil { + return false + } + return true +} diff --git a/go.list b/go.list index c9db790501..8778b5bac4 100644 --- a/go.list +++ b/go.list @@ -4,6 +4,7 @@ cloud.google.com/go v0.34.0 firebase.google.com/go v3.12.0+incompatible github.com/BurntSushi/toml v0.3.1 github.com/Masterminds/squirrel v0.0.0-20161115235646-20f192218cf5 +github.com/Microsoft/go-winio v0.4.14 github.com/Shopify/sarama v1.19.0 github.com/Shopify/toxiproxy v2.1.4+incompatible github.com/ajg/form v0.0.0-20160822230020-523a5da1a92f @@ -98,7 +99,7 @@ github.com/sebest/xff v0.0.0-20150611211316-7a36e3a787b5 github.com/segmentio/go-loggly v0.5.1-0.20171222203950-eb91657e62b2 github.com/sergi/go-diff v0.0.0-20161205080420-83532ca1c1ca github.com/shurcooL/httpfs v0.0.0-20190707220628-8d4bc4ba7749 -github.com/sirupsen/logrus v1.2.0 +github.com/sirupsen/logrus v1.4.1 github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d github.com/smartystreets/goconvey v0.0.0-20190731233626-505e41936337 github.com/spf13/cast v0.0.0-20150508191742-4d07383ffe94 diff --git a/go.mod b/go.mod index 0715db44f5..b1efe28d7d 100644 --- a/go.mod +++ b/go.mod @@ -7,6 +7,7 @@ require ( firebase.google.com/go v3.12.0+incompatible github.com/BurntSushi/toml v0.3.1 github.com/Masterminds/squirrel v0.0.0-20161115235646-20f192218cf5 + github.com/Microsoft/go-winio v0.4.14 github.com/ajg/form v0.0.0-20160822230020-523a5da1a92f // indirect github.com/asaskevich/govalidator v0.0.0-20180319081651-7d2e70ef918f github.com/aws/aws-sdk-go v1.25.25 @@ -58,7 +59,7 @@ require ( github.com/segmentio/go-loggly v0.5.1-0.20171222203950-eb91657e62b2 github.com/sergi/go-diff v0.0.0-20161205080420-83532ca1c1ca // indirect github.com/shurcooL/httpfs v0.0.0-20190707220628-8d4bc4ba7749 - github.com/sirupsen/logrus v1.2.0 + github.com/sirupsen/logrus v1.4.1 github.com/smartystreets/goconvey v0.0.0-20190731233626-505e41936337 // indirect github.com/spf13/cast v0.0.0-20150508191742-4d07383ffe94 // indirect github.com/spf13/cobra v0.0.0-20160830174925-9c28e4bbd74e diff --git a/go.sum b/go.sum index 09dc406dd6..76b0f86559 100644 --- a/go.sum +++ b/go.sum @@ -9,6 +9,8 @@ github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/Masterminds/squirrel v0.0.0-20161115235646-20f192218cf5 h1:PPfYWScYacO3Q6JMCLkyh6Ea2Q/REDTMgmiTAeiV8Jg= github.com/Masterminds/squirrel v0.0.0-20161115235646-20f192218cf5/go.mod h1:xnKTFzjGUiZtiOagBsfnvomW+nJg2usB1ZpordQWqNM= +github.com/Microsoft/go-winio v0.4.14 h1:+hMXMk01us9KgxGb7ftKQt2Xpf5hH/yky+TDA+qxleU= +github.com/Microsoft/go-winio v0.4.14/go.mod h1:qXqCSQ3Xa7+6tgxaGTIe4Kpcdsi+P8jBhyzoq1bpyYA= github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo= github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI= github.com/ajg/form v0.0.0-20160822230020-523a5da1a92f h1:zvClvFQwU++UpIUBGC8YmDlfhUrweEy1R1Fj1gu5iIM= @@ -188,6 +190,8 @@ github.com/shurcooL/httpfs v0.0.0-20190707220628-8d4bc4ba7749 h1:bUGsEnyNbVPw06B github.com/shurcooL/httpfs v0.0.0-20190707220628-8d4bc4ba7749/go.mod h1:ZY1cvUeJuFPAdZ/B6v7RHavJWZn2YPVFQ1OSXhCGOkg= github.com/sirupsen/logrus v1.2.0 h1:juTguoYk5qI21pwyTXY3B3Y5cOTH3ZUyZCg1v/mihuo= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= +github.com/sirupsen/logrus v1.4.1 h1:GL2rEmy6nsikmW0r8opw9JIRScdMF5hA8cOYLH7In1k= +github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMBDgk/93Q= github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d h1:zE9ykElWQ6/NYmHa3jpm/yHnI4xSofP+UP6SpjHcSeM= github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc= github.com/smartystreets/goconvey v0.0.0-20190731233626-505e41936337 h1:WN9BUFbdyOsSH/XohnWpXOlq9NBD5sGAB2FciQMUEe8= @@ -276,6 +280,7 @@ golang.org/x/sys v0.0.0-20181122145206-62eef0e2fa9b/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190507160741-ecd444e8653b/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190606165138-5da285871e9c h1:+EXw7AwNOKzPFXMZ1yNjO40aWCh3PIquJB2fYlv9wcs= golang.org/x/sys v0.0.0-20190606165138-5da285871e9c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= diff --git a/services/horizon/cmd/db.go b/services/horizon/cmd/db.go index eb658645b7..55dd716db1 100644 --- a/services/horizon/cmd/db.go +++ b/services/horizon/cmd/db.go @@ -179,6 +179,9 @@ var dbReingestRangeCmd = &cobra.Command{ HistoryArchiveURL: config.HistoryArchiveURLs[0], IngestFailedTransactions: config.IngestFailedTransactions, } + if config.EnableCaptiveCoreIngestion { + ingestConfig.StellarCorePath = config.StellarCoreBinaryPath + } system, err := expingest.NewSystem(ingestConfig) if err != nil { diff --git a/services/horizon/cmd/ingest.go b/services/horizon/cmd/ingest.go index 2d6597e434..b04e789809 100644 --- a/services/horizon/cmd/ingest.go +++ b/services/horizon/cmd/ingest.go @@ -108,6 +108,9 @@ var ingestVerifyRangeCmd = &cobra.Command{ HistoryArchiveURL: config.HistoryArchiveURLs[0], IngestFailedTransactions: config.IngestFailedTransactions, } + if config.EnableCaptiveCoreIngestion { + ingestConfig.StellarCorePath = config.StellarCoreBinaryPath + } system, err := expingest.NewSystem(ingestConfig) if err != nil { @@ -185,6 +188,9 @@ var ingestStressTestCmd = &cobra.Command{ HistoryArchiveURL: config.HistoryArchiveURLs[0], IngestFailedTransactions: config.IngestFailedTransactions, } + if config.EnableCaptiveCoreIngestion { + ingestConfig.StellarCorePath = config.StellarCoreBinaryPath + } system, err := expingest.NewSystem(ingestConfig) if err != nil { diff --git a/services/horizon/cmd/root.go b/services/horizon/cmd/root.go index d30b3c5b7d..12ac89236f 100644 --- a/services/horizon/cmd/root.go +++ b/services/horizon/cmd/root.go @@ -108,6 +108,24 @@ var dbURLConfigOption = &support.ConfigOption{ // Add a new entry here to connect a new field in the horizon.Config struct var configOpts = support.ConfigOptions{ dbURLConfigOption, + &support.ConfigOption{ + Name: "stellar-core-binary-path", + EnvVar: "STELLAR_CORE_BINARY_PATH", + OptType: types.String, + FlagDefault: "", + Required: false, + Usage: "path to stellar core binary", + ConfigKey: &config.StellarCoreBinaryPath, + }, + &support.ConfigOption{ + Name: "enable-captive-core-ingestion", + EnvVar: "ENABLE_CAPTIVE_CORE_INGESTION", + OptType: types.Bool, + FlagDefault: false, + Required: false, + Usage: "[experimental flag!] causes Horizon to ingest from a Stellar Core subprocess instead of a persistent Stellar Core database", + ConfigKey: &config.EnableCaptiveCoreIngestion, + }, &support.ConfigOption{ Name: "stellar-core-db-url", EnvVar: "STELLAR_CORE_DATABASE_URL", @@ -397,6 +415,10 @@ func initRootConfig() { stdLog.Fatalf("--history-archive-urls must be set when --ingest is set") } + if config.EnableCaptiveCoreIngestion && config.StellarCoreBinaryPath == "" { + stdLog.Fatalf("--stellar-core-binary-path must be set when --enable-captive-core-ingestion is set") + } + // Configure log file if config.LogFile != "" { logFile, err := os.OpenFile(config.LogFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) diff --git a/services/horizon/docker/Dockerfile b/services/horizon/docker/Dockerfile index 192fcd8d86..7e5e56318d 100644 --- a/services/horizon/docker/Dockerfile +++ b/services/horizon/docker/Dockerfile @@ -6,10 +6,21 @@ RUN go mod download COPY . ./ RUN go install github.com/stellar/go/services/horizon -FROM ubuntu:16.04 +FROM ubuntu:18.04 + +ENV STELLAR_CORE_VERSION 13.0.0-1220-9ed3da29 +ENV STELLAR_CORE_BINARY_PATH /usr/local/bin/stellar-core # ca-certificates are required to make tls connections -RUN apt-get update && apt-get install -y --no-install-recommends ca-certificates +RUN apt-get update && apt-get install -y --no-install-recommends ca-certificates wget + + +RUN apt-get install -y --no-install-recommends libpqxx-4.0v5 curl +RUN wget -O stellar-core.deb https://s3.amazonaws.com/stellar.org/releases/stellar-core/stellar-core-${STELLAR_CORE_VERSION}_amd64.deb +RUN dpkg -i stellar-core.deb +RUN rm stellar-core.deb + +RUN apt-get clean COPY --from=builder /go/bin/horizon ./ diff --git a/services/horizon/internal/config.go b/services/horizon/internal/config.go index 882313a16a..e559872312 100644 --- a/services/horizon/internal/config.go +++ b/services/horizon/internal/config.go @@ -11,12 +11,14 @@ import ( // Config is the configuration for horizon. It gets populated by the // app's main function and is provided to NewApp. type Config struct { - DatabaseURL string - StellarCoreDatabaseURL string - StellarCoreURL string - HistoryArchiveURLs []string - Port uint - AdminPort uint + DatabaseURL string + StellarCoreBinaryPath string + StellarCoreDatabaseURL string + StellarCoreURL string + EnableCaptiveCoreIngestion bool + HistoryArchiveURLs []string + Port uint + AdminPort uint // MaxDBConnections has a priority over all 4 values below. MaxDBConnections int diff --git a/services/horizon/internal/expingest/fake_ledger_backend.go b/services/horizon/internal/expingest/fake_ledger_backend.go index 0001036558..2f9ce1c404 100644 --- a/services/horizon/internal/expingest/fake_ledger_backend.go +++ b/services/horizon/internal/expingest/fake_ledger_backend.go @@ -16,6 +16,10 @@ func (fakeLedgerBackend) GetLatestLedgerSequence() (uint32, error) { return 1, nil } +func (fakeLedgerBackend) PrepareRange(from uint32, to uint32) error { + return nil +} + func fakeAccount() xdr.LedgerEntryChange { account := keypair.MustRandom().Address() return xdr.LedgerEntryChange{ diff --git a/services/horizon/internal/expingest/fsm.go b/services/horizon/internal/expingest/fsm.go index ee9e4f1771..b2dc95b579 100644 --- a/services/horizon/internal/expingest/fsm.go +++ b/services/horizon/internal/expingest/fsm.go @@ -555,6 +555,23 @@ func (h reingestHistoryRangeState) run(s *System) (transition, error) { return stop(), errors.Errorf("invalid range: [%d, %d]", h.fromLedger, h.toLedger) } + log.WithFields(logpkg.F{ + "from": h.fromLedger, + "to": h.toLedger, + }).Info("Preparing ledger backend to retrieve range") + startTime := time.Now() + + err := s.ledgerBackend.PrepareRange(h.fromLedger, h.toLedger) + if err != nil { + return stop(), errors.Wrap(err, "error preparing range") + } + + log.WithFields(logpkg.F{ + "from": h.fromLedger, + "to": h.toLedger, + "duration": time.Since(startTime).Seconds(), + }).Info("Range ready") + if h.force { if err := s.historyQ.Begin(); err != nil { return stop(), errors.Wrap(err, "Error starting a transaction") diff --git a/services/horizon/internal/expingest/ingest_history_range_state_test.go b/services/horizon/internal/expingest/ingest_history_range_state_test.go index 504867a574..d9cbc7e5ab 100644 --- a/services/horizon/internal/expingest/ingest_history_range_state_test.go +++ b/services/horizon/internal/expingest/ingest_history_range_state_test.go @@ -166,6 +166,7 @@ type ReingestHistoryRangeStateTestSuite struct { suite.Suite historyQ *mockDBQ historyAdapter *adapters.MockHistoryArchiveAdapter + ledgerBackend *mockLedgerBackend runner *mockProcessorsRunner system *System } @@ -173,17 +174,21 @@ type ReingestHistoryRangeStateTestSuite struct { func (s *ReingestHistoryRangeStateTestSuite) SetupTest() { s.historyQ = &mockDBQ{} s.historyAdapter = &adapters.MockHistoryArchiveAdapter{} + s.ledgerBackend = &mockLedgerBackend{} s.runner = &mockProcessorsRunner{} s.system = &System{ ctx: context.Background(), historyQ: s.historyQ, historyAdapter: s.historyAdapter, + ledgerBackend: s.ledgerBackend, runner: s.runner, } s.historyQ.On("GetTx").Return(nil).Once() s.historyQ.On("Rollback").Return(nil).Once() s.historyQ.On("Begin").Return(nil).Once() + + s.ledgerBackend.On("PrepareRange", uint32(100), uint32(200)).Return(nil).Once() } func (s *ReingestHistoryRangeStateTestSuite) TearDownTest() { @@ -342,7 +347,6 @@ func (s *ReingestHistoryRangeStateTestSuite) TestSuccess() { func (s *ReingestHistoryRangeStateTestSuite) TestSuccessOneLedger() { s.historyQ.On("GetLastLedgerExpIngestNonBlocking").Return(uint32(0), nil).Once() - s.historyQ.On("GetTx").Return(&sqlx.Tx{}).Once() toidFrom := toid.New(100, 0, 0) @@ -354,6 +358,10 @@ func (s *ReingestHistoryRangeStateTestSuite) TestSuccessOneLedger() { s.runner.On("RunTransactionProcessorsOnLedger", uint32(100)).Return(io.StatsLedgerTransactionProcessorResults{}, nil).Once() s.historyQ.On("Commit").Return(nil).Once() + // Recreate mock in this single test to remove previous assertion. + *s.ledgerBackend = mockLedgerBackend{} + s.ledgerBackend.On("PrepareRange", uint32(100), uint32(100)).Return(nil).Once() + err := s.system.ReingestRange(100, 100, false) s.Assert().NoError(err) } diff --git a/services/horizon/internal/expingest/main.go b/services/horizon/internal/expingest/main.go index 5762a515b1..cca155edbf 100644 --- a/services/horizon/internal/expingest/main.go +++ b/services/horizon/internal/expingest/main.go @@ -55,6 +55,7 @@ type Config struct { CoreSession *db.Session StellarCoreURL string StellarCoreCursor string + StellarCorePath string NetworkPassphrase string HistorySession *db.Session @@ -138,10 +139,19 @@ func NewSystem(config Config) (*System, error) { coreSession := config.CoreSession.Clone() coreSession.Ctx = ctx - ledgerBackend, err := ledgerbackend.NewDatabaseBackendFromSession(coreSession) - if err != nil { - cancel() - return nil, errors.Wrap(err, "error creating ledger backend") + var ledgerBackend ledgerbackend.LedgerBackend + if len(config.StellarCorePath) > 0 { + ledgerBackend = ledgerbackend.NewCaptive( + config.StellarCorePath, + config.NetworkPassphrase, + []string{config.HistoryArchiveURL}, + ) + } else { + ledgerBackend, err = ledgerbackend.NewDatabaseBackendFromSession(coreSession) + if err != nil { + cancel() + return nil, errors.Wrap(err, "error creating ledger backend") + } } historyQ := &history.Q{config.HistorySession.Clone()} diff --git a/services/horizon/internal/expingest/main_test.go b/services/horizon/internal/expingest/main_test.go index d3bbb95cc4..5b9226b85a 100644 --- a/services/horizon/internal/expingest/main_test.go +++ b/services/horizon/internal/expingest/main_test.go @@ -356,6 +356,30 @@ func (m *mockDBQ) CreateAssets(assets []xdr.Asset, batchSize int) (map[string]hi return args.Get(0).(map[string]history.Asset), args.Error(1) } +type mockLedgerBackend struct { + mock.Mock +} + +func (m *mockLedgerBackend) GetLatestLedgerSequence() (sequence uint32, err error) { + args := m.Called() + return args.Get(0).(uint32), args.Error(1) +} + +func (m *mockLedgerBackend) GetLedger(sequence uint32) (bool, ledgerbackend.LedgerCloseMeta, error) { + args := m.Called(sequence) + return args.Get(0).(bool), args.Get(1).(ledgerbackend.LedgerCloseMeta), args.Error(2) +} + +func (m *mockLedgerBackend) PrepareRange(from uint32, to uint32) error { + args := m.Called(from, to) + return args.Error(0) +} + +func (m *mockLedgerBackend) Close() error { + args := m.Called() + return args.Error(0) +} + type mockProcessorsRunner struct { mock.Mock } diff --git a/services/horizon/internal/expingest/processor_runner.go b/services/horizon/internal/expingest/processor_runner.go index 71c87570a5..214f2683b7 100644 --- a/services/horizon/internal/expingest/processor_runner.go +++ b/services/horizon/internal/expingest/processor_runner.go @@ -158,20 +158,19 @@ func (s *ProcessorRunner) validateBucketList(ledgerSequence uint32) error { return errors.Wrap(err, "Error getting bucket list hash") } - ledgerReader, err := io.NewDBLedgerReader(s.ctx, ledgerSequence, s.ledgerBackend) + exists, ledgerCloseMeta, err := s.ledgerBackend.GetLedger(ledgerSequence) if err != nil { - if err == io.ErrNotFound { - return fmt.Errorf( - "cannot validate bucket hash list. Checkpoint ledger (%d) must exist in Stellar-Core database.", - ledgerSequence, - ) - } else { - return errors.Wrap(err, "Error getting ledger") - } + return errors.Wrap(err, "Error getting ledger") + } + + if !exists { + return fmt.Errorf( + "cannot validate bucket hash list. Checkpoint ledger (%d) must exist in Stellar-Core database.", + ledgerSequence, + ) } - ledgerHeader := ledgerReader.GetHeader() - ledgerBucketHashList := ledgerHeader.Header.BucketListHash + ledgerBucketHashList := ledgerCloseMeta.LedgerHeader.Header.BucketListHash if !bytes.Equal(historyBucketListHash[:], ledgerBucketHashList[:]) { return fmt.Errorf( @@ -238,7 +237,7 @@ func (s *ProcessorRunner) runChangeProcessorOnLedger( ) error { var changeReader io.ChangeReader var err error - changeReader, err = io.NewLedgerChangeReader(s.ctx, ledger, s.ledgerBackend) + changeReader, err = io.NewLedgerChangeReader(s.ledgerBackend, ledger) if err != nil { return errors.Wrap(err, "Error creating ledger change reader") } @@ -264,13 +263,13 @@ func (s *ProcessorRunner) runChangeProcessorOnLedger( func (s *ProcessorRunner) RunTransactionProcessorsOnLedger(ledger uint32) (io.StatsLedgerTransactionProcessorResults, error) { ledgerTransactionStats := io.StatsLedgerTransactionProcessor{} - ledgerReader, err := io.NewDBLedgerReader(s.ctx, ledger, s.ledgerBackend) + transactionReader, err := io.NewLedgerTransactionReader(s.ledgerBackend, ledger) if err != nil { return ledgerTransactionStats.GetResults(), errors.Wrap(err, "Error creating ledger reader") } - txProcessor := s.buildTransactionProcessor(&ledgerTransactionStats, ledgerReader.GetHeader()) - err = io.StreamLedgerTransactions(txProcessor, ledgerReader) + txProcessor := s.buildTransactionProcessor(&ledgerTransactionStats, transactionReader.GetHeader()) + err = io.StreamLedgerTransactions(txProcessor, transactionReader) if err != nil { return ledgerTransactionStats.GetResults(), errors.Wrap(err, "Error streaming changes from ledger") } diff --git a/support/historyarchive/verify.go b/support/historyarchive/verify.go index 56f35d4779..378fbf4b6c 100644 --- a/support/historyarchive/verify.go +++ b/support/historyarchive/verify.go @@ -228,7 +228,7 @@ func (arch *Archive) VerifyBucketEntries(h Hash) error { var entry xdr.BucketEntry err = rdr.ReadOne(&entry) if err == nil { - err2 := WriteFramedXdr(hsh, &entry) + err2 := xdr.MarshalFramed(hsh, &entry) if err2 != nil { return err2 } diff --git a/support/historyarchive/xdrstream.go b/support/historyarchive/xdrstream.go index ed1b2bf815..60647c9c8b 100644 --- a/support/historyarchive/xdrstream.go +++ b/support/historyarchive/xdrstream.go @@ -189,23 +189,3 @@ func (x *XdrStream) BytesRead() int64 { func (x *XdrStream) Discard(n int64) (int64, error) { return io.CopyN(ioutil.Discard, x.rdr, n) } - -func WriteFramedXdr(out io.Writer, in interface{}) error { - var tmp bytes.Buffer - n, err := xdr.Marshal(&tmp, in) - if err != nil { - return err - } - un := uint32(n) - if un > 0x7fffffff { - return fmt.Errorf("Overlong write: %d bytes", n) - } - - un = un | 0x80000000 - binary.Write(out, binary.BigEndian, &un) - k, err := tmp.WriteTo(out) - if int64(n) != k { - return fmt.Errorf("Mismatched write length: %d vs. %d", n, k) - } - return err -} diff --git a/support/historyarchive/xdrstream_test.go b/support/historyarchive/xdrstream_test.go index 4edea22143..8347d56a69 100644 --- a/support/historyarchive/xdrstream_test.go +++ b/support/historyarchive/xdrstream_test.go @@ -35,7 +35,7 @@ func TestXdrStreamHash(t *testing.T) { // - uint32 representing the number of bytes of a structure, // - xdr-encoded `BucketEntry` above. b := &bytes.Buffer{} - err := WriteFramedXdr(b, bucketEntry) + err := xdr.MarshalFramed(b, bucketEntry) require.NoError(t, err) expectedHash := sha256.Sum256(b.Bytes()) @@ -82,8 +82,8 @@ func TestXdrStreamDiscard(t *testing.T) { fullStream := createXdrStream(firstEntry, secondEntry) b := &bytes.Buffer{} - require.NoError(t, WriteFramedXdr(b, firstEntry)) - require.NoError(t, WriteFramedXdr(b, secondEntry)) + require.NoError(t, xdr.MarshalFramed(b, firstEntry)) + require.NoError(t, xdr.MarshalFramed(b, secondEntry)) expectedHash := sha256.Sum256(b.Bytes()) fullStream.SetExpectedHash(expectedHash) @@ -121,7 +121,7 @@ func TestXdrStreamDiscard(t *testing.T) { func createXdrStream(entries ...xdr.BucketEntry) *XdrStream { b := &bytes.Buffer{} for _, e := range entries { - err := WriteFramedXdr(b, e) + err := xdr.MarshalFramed(b, e) if err != nil { panic(err) } diff --git a/xdr/main.go b/xdr/main.go index c236854110..ac099d16ec 100644 --- a/xdr/main.go +++ b/xdr/main.go @@ -5,9 +5,13 @@ package xdr import ( "bytes" "encoding/base64" + "encoding/binary" "fmt" "io" "strings" + + xdr "github.com/stellar/go-xdr/xdr3" + "github.com/stellar/go/support/errors" ) // Keyer represents a type that can be converted into a LedgerKey @@ -69,6 +73,55 @@ func MarshalBase64(v interface{}) (string, error) { return base64.StdEncoding.EncodeToString(raw.Bytes()), nil } +func MarshalFramed(w io.Writer, v interface{}) error { + var tmp bytes.Buffer + n, err := Marshal(&tmp, v) + if err != nil { + return err + } + un := uint32(n) + if un > 0x7fffffff { + return fmt.Errorf("Overlong write: %d bytes", n) + } + + un = un | 0x80000000 + err = binary.Write(w, binary.BigEndian, &un) + if err != nil { + return errors.Wrap(err, "error in binary.Write") + } + k, err := tmp.WriteTo(w) + if int64(n) != k { + return fmt.Errorf("Mismatched write length: %d vs. %d", n, k) + } + return err +} + +// XDR and RPC define a (minimal) framing format which our metadata arrives in: a 4-byte +// big-endian length header that has the high bit set, followed by that length worth of +// XDR data. Decoding this involves just a little more work than xdr.Unmarshal. +func UnmarshalFramed(r io.Reader, v interface{}) (int, error) { + var frameLen uint32 + n, e := Unmarshal(r, &frameLen) + if e != nil { + return n, errors.Wrap(e, "unmarshalling XDR frame header") + } + if n != 4 { + return n, errors.New("bad length of XDR frame header") + } + if (frameLen & 0x80000000) != 0x80000000 { + return n, errors.New("malformed XDR frame header") + } + frameLen &= 0x7fffffff + m, err := xdr.Unmarshal(r, v) + if err != nil { + return n + m, errors.Wrap(err, "unmarshalling framed XDR") + } + if int64(m) != int64(frameLen) { + return n + m, errors.New("bad length of XDR frame body") + } + return m + n, nil +} + type countWriter struct { Count int }