Skip to content

Commit

Permalink
Merge pull request #2695 from stellar/captive-core
Browse files Browse the repository at this point in the history
Merge Captive core into 1.5.0 release branch
  • Loading branch information
bartekn authored Jun 16, 2020
2 parents dce848d + 936bc3a commit 68d0a2f
Show file tree
Hide file tree
Showing 37 changed files with 1,317 additions and 562 deletions.
48 changes: 48 additions & 0 deletions exp/ingest/io/change.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,54 @@ type Change struct {
Post *xdr.LedgerEntry
}

// GetChangesFromLedgerEntryChanges transforms LedgerEntryChanges to []Change.
// Each `update` and `removed` is preceded with `state` and `create` changes
// are alone, without `state`. The transformation we're doing is to move each
// change (state/update, state/removed or create) to an array of pre/post pairs.
// Then:
// - for create, pre is null and post is a new entry,
// - for update, pre is previous state and post is the current state,
// - for removed, pre is previous state and post is null.
//
// stellar-core source:
// https://github.com/stellar/stellar-core/blob/e584b43/src/ledger/LedgerTxn.cpp#L582
func GetChangesFromLedgerEntryChanges(ledgerEntryChanges xdr.LedgerEntryChanges) []Change {
changes := []Change{}

for i, entryChange := range ledgerEntryChanges {
switch entryChange.Type {
case xdr.LedgerEntryChangeTypeLedgerEntryCreated:
created := entryChange.MustCreated()
changes = append(changes, Change{
Type: created.Data.Type,
Pre: nil,
Post: &created,
})
case xdr.LedgerEntryChangeTypeLedgerEntryUpdated:
state := ledgerEntryChanges[i-1].MustState()
updated := entryChange.MustUpdated()
changes = append(changes, Change{
Type: state.Data.Type,
Pre: &state,
Post: &updated,
})
case xdr.LedgerEntryChangeTypeLedgerEntryRemoved:
state := ledgerEntryChanges[i-1].MustState()
changes = append(changes, Change{
Type: state.Data.Type,
Pre: &state,
Post: nil,
})
case xdr.LedgerEntryChangeTypeLedgerEntryState:
continue
default:
panic("Invalid LedgerEntryChangeType")
}
}

return changes
}

// LedgerEntryChangeType returns type in terms of LedgerEntryChangeType.
func (c *Change) LedgerEntryChangeType() xdr.LedgerEntryChangeType {
switch {
Expand Down
165 changes: 0 additions & 165 deletions exp/ingest/io/change_reader.go

This file was deleted.

127 changes: 127 additions & 0 deletions exp/ingest/io/ledger_change_reader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
package io

import (
"io"

"github.com/stellar/go/exp/ingest/ledgerbackend"
)

// ChangeReader provides convenient, streaming access to a sequence of Changes.
type ChangeReader interface {
// Read should return the next `Change` in the leader. If there are no more
// changes left it should return an `io.EOF` error.
Read() (Change, error)
// Close should be called when reading is finished. This is especially
// helpful when there are still some changes available so reader can stop
// streaming them.
Close() error
}

// ledgerChangeReaderState defines possible states of LedgerChangeReader.
type ledgerChangeReaderState int

const (
// feeChangesState is active when LedgerChangeReader is reading fee changes.
feeChangesState ledgerChangeReaderState = iota
// feeChangesState is active when LedgerChangeReader is reading transaction meta changes.
metaChangesState
// feeChangesState is active when LedgerChangeReader is reading upgrade changes.
upgradeChangesState
)

// LedgerChangeReader is a ChangeReader which returns Changes from Stellar Core
// for a single ledger
type LedgerChangeReader struct {
*LedgerTransactionReader
state ledgerChangeReaderState
pending []Change
pendingIndex int
upgradeIndex int
}

// Ensure LedgerChangeReader implements ChangeReader
var _ ChangeReader = (*LedgerChangeReader)(nil)

// NewLedgerChangeReader constructs a new LedgerChangeReader instance bound to the given ledger.
// Note that the returned LedgerChangeReader is not thread safe and should not be shared
// by multiple goroutines.
func NewLedgerChangeReader(backend ledgerbackend.LedgerBackend, sequence uint32) (*LedgerChangeReader, error) {
transactionReader, err := NewLedgerTransactionReader(backend, sequence)
if err != nil {
return nil, err
}

return &LedgerChangeReader{
LedgerTransactionReader: transactionReader,
state: feeChangesState,
}, nil
}

// Read returns the next change in the stream.
// If there are no changes remaining io.EOF is returned as an error.
func (r *LedgerChangeReader) Read() (Change, error) {
// Changes within a ledger should be read in the following order:
// - fee changes of all transactions,
// - transaction meta changes of all transactions,
// - upgrade changes.
// Because a single transaction can introduce many changes we read all the
// changes from a single transaction and save them in r.pending.
// When Read() is called we stream pending changes first. We also call Read()
// recursively after adding some changes (what will return them from r.pending)
// to not duplicate the code.
if r.pendingIndex < len(r.pending) {
next := r.pending[r.pendingIndex]
r.pendingIndex++
if r.pendingIndex == len(r.pending) {
r.pendingIndex = 0
r.pending = r.pending[:0]
}
return next, nil
}

switch r.state {
case feeChangesState, metaChangesState:
tx, err := r.LedgerTransactionReader.Read()
if err != nil {
if err == io.EOF {
// If done streaming fee changes rewind to stream meta changes
if r.state == feeChangesState {
r.LedgerTransactionReader.Rewind()
}
r.state++
return r.Read()
}
return Change{}, err
}

switch r.state {
case feeChangesState:
r.pending = append(r.pending, tx.GetFeeChanges()...)
case metaChangesState:
metaChanges, err := tx.GetChanges()
if err != nil {
return Change{}, err
}
r.pending = append(r.pending, metaChanges...)
}
return r.Read()
case upgradeChangesState:
// Get upgrade changes
if r.upgradeIndex < len(r.LedgerTransactionReader.ledgerCloseMeta.UpgradesMeta) {
changes := GetChangesFromLedgerEntryChanges(
r.LedgerTransactionReader.ledgerCloseMeta.UpgradesMeta[r.upgradeIndex],
)
r.pending = append(r.pending, changes...)
r.upgradeIndex++
return r.Read()
}
}

return Change{}, io.EOF
}

// Close should be called when reading is finished.
func (r *LedgerChangeReader) Close() error {
r.pending = nil
return r.LedgerTransactionReader.Close()
}
Loading

0 comments on commit 68d0a2f

Please sign in to comment.