Skip to content

Commit

Permalink
Merge pull request #1946 from stellar/release-horizon-v0.23.0
Browse files Browse the repository at this point in the history
Merge release-horizon v0.23.0 into master
  • Loading branch information
bartekn authored Nov 18, 2019
2 parents 2d0dc2e + 921e8ec commit 325abb3
Show file tree
Hide file tree
Showing 154 changed files with 14,013 additions and 1,490 deletions.
67 changes: 45 additions & 22 deletions exp/ingest/io/ledger_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,18 @@ import (
)

// DBLedgerReader is a database-backed implementation of the io.LedgerReader interface.
// Use NewDBLedgerReader to create a new instance.
type DBLedgerReader struct {
sequence uint32
backend ledgerbackend.LedgerBackend
header xdr.LedgerHeaderHistoryEntry
transactions []LedgerTransaction
readIdx int
initOnce sync.Once
readMutex sync.Mutex
sequence uint32
backend ledgerbackend.LedgerBackend
header xdr.LedgerHeaderHistoryEntry
transactions []LedgerTransaction
upgradeChanges []Change
readMutex sync.Mutex
readIdx int
upgradeReadIdx int
readUpgradeChangeCalled bool
ignoreUpgradeChanges bool
}

// Ensure DBLedgerReader implements LedgerReader
Expand All @@ -30,8 +34,7 @@ func NewDBLedgerReader(sequence uint32, backend ledgerbackend.LedgerBackend) (*D
backend: backend,
}

var err error
reader.initOnce.Do(func() { err = reader.init() })
err := reader.init()
if err != nil {
return nil, err
}
Expand All @@ -46,25 +49,12 @@ func (dblrc *DBLedgerReader) GetSequence() uint32 {

// GetHeader returns the XDR Header data associated with the stored ledger.
func (dblrc *DBLedgerReader) GetHeader() xdr.LedgerHeaderHistoryEntry {
var err error
dblrc.initOnce.Do(func() { err = dblrc.init() })
if err != nil {
// TODO, object should be initialized in constructor.
// Not returning error here, makes this much simpler.
panic(err)
}
return dblrc.header
}

// Read returns the next transaction in the ledger, ordered by tx number, each time it is called. When there
// are no more transactions to return, an EOF error is returned.
func (dblrc *DBLedgerReader) Read() (LedgerTransaction, error) {
var err error
dblrc.initOnce.Do(func() { err = dblrc.init() })
if err != nil {
return LedgerTransaction{}, err
}

// Protect all accesses to dblrc.readIdx
dblrc.readMutex.Lock()
defer dblrc.readMutex.Unlock()
Expand All @@ -76,10 +66,38 @@ func (dblrc *DBLedgerReader) Read() (LedgerTransaction, error) {
return LedgerTransaction{}, io.EOF
}

// ReadUpgradeChange returns the next upgrade change in the ledger, each time it
// is called. When there are no more upgrades to return, an EOF error is returned.
func (dblrc *DBLedgerReader) ReadUpgradeChange() (Change, error) {
// Protect all accesses to dblrc.upgradeReadIdx
dblrc.readMutex.Lock()
defer dblrc.readMutex.Unlock()
dblrc.readUpgradeChangeCalled = true

if dblrc.upgradeReadIdx < len(dblrc.upgradeChanges) {
dblrc.upgradeReadIdx++
return dblrc.upgradeChanges[dblrc.upgradeReadIdx-1], nil
}
return Change{}, io.EOF
}

// GetUpgradeChanges returns all ledger upgrade changes.
func (dblrc *DBLedgerReader) GetUpgradeChanges() []Change {
return dblrc.upgradeChanges
}

func (dblrc *DBLedgerReader) IgnoreUpgradeChanges() {
dblrc.ignoreUpgradeChanges = true
}

// Close moves the read pointer so that subsequent calls to Read() will return EOF.
func (dblrc *DBLedgerReader) Close() error {
dblrc.readMutex.Lock()
dblrc.readIdx = len(dblrc.transactions)
if !dblrc.ignoreUpgradeChanges &&
(!dblrc.readUpgradeChangeCalled || dblrc.upgradeReadIdx != len(dblrc.upgradeChanges)) {
return errors.New("Ledger upgrade changes not read! Use ReadUpgradeChange() method.")
}
dblrc.readMutex.Unlock()

return nil
Expand All @@ -100,6 +118,11 @@ func (dblrc *DBLedgerReader) init() error {

dblrc.storeTransactions(ledgerCloseMeta)

for _, upgradeChanges := range ledgerCloseMeta.UpgradesMeta {
changes := getChangesFromLedgerEntryChanges(upgradeChanges)
dblrc.upgradeChanges = append(dblrc.upgradeChanges, changes...)
}

return nil
}

Expand Down
115 changes: 98 additions & 17 deletions exp/ingest/io/ledger_transaction.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package io

import (
"bytes"

"github.com/stellar/go/support/errors"
"github.com/stellar/go/xdr"
)

Expand All @@ -13,8 +16,77 @@ import (
// If an entry is removed: Pre is not nil and Post is nil.
type Change struct {
Type xdr.LedgerEntryType
Pre *xdr.LedgerEntryData
Post *xdr.LedgerEntryData
Pre *xdr.LedgerEntry
Post *xdr.LedgerEntry
}

// AccountChangedExceptSigners returns true if account has changed WITHOUT
// checking the signers (except master key weight!). In other words, if the only
// change is connected to signers, this function will return false.
func (c *Change) AccountChangedExceptSigners() (bool, error) {
if c.Type != xdr.LedgerEntryTypeAccount {
panic("This should not be called on changes other than Account changes")
}

// New account
if c.Pre == nil {
return true, nil
}

// Account merged
// c.Pre != nil at this point.
if c.Post == nil {
return true, nil
}

// c.Pre != nil && c.Post != nil at this point.
if c.Pre.LastModifiedLedgerSeq != c.Post.LastModifiedLedgerSeq {
return true, nil
}

// Don't use short assignment statement (:=) to ensure variables below
// are not pointers (if `xdr` package changes in the future)!
var preAccountEntry, postAccountEntry xdr.AccountEntry
preAccountEntry = c.Pre.Data.MustAccount()
postAccountEntry = c.Post.Data.MustAccount()

// preAccountEntry and postAccountEntry are copies so it's fine to
// modify them here, EXCEPT pointers inside them!
if preAccountEntry.Ext.V == 0 {
preAccountEntry.Ext.V = 1
preAccountEntry.Ext.V1 = &xdr.AccountEntryV1{
Liabilities: xdr.Liabilities{
Buying: 0,
Selling: 0,
},
}
}

preAccountEntry.Signers = nil

if postAccountEntry.Ext.V == 0 {
postAccountEntry.Ext.V = 1
postAccountEntry.Ext.V1 = &xdr.AccountEntryV1{
Liabilities: xdr.Liabilities{
Buying: 0,
Selling: 0,
},
}
}

postAccountEntry.Signers = nil

preBinary, err := preAccountEntry.MarshalBinary()
if err != nil {
return false, errors.Wrap(err, "Error running preAccountEntry.MarshalBinary")
}

postBinary, err := postAccountEntry.MarshalBinary()
if err != nil {
return false, errors.Wrap(err, "Error running postAccountEntry.MarshalBinary")
}

return !bytes.Equal(preBinary, postBinary), nil
}

// AccountSignersChanged returns true if account signers have changed.
Expand All @@ -36,8 +108,8 @@ func (c *Change) AccountSignersChanged() bool {
}

// c.Pre != nil && c.Post != nil at this point.
preAccountEntry := c.Pre.MustAccount()
postAccountEntry := c.Post.MustAccount()
preAccountEntry := c.Pre.Data.MustAccount()
postAccountEntry := c.Post.Data.MustAccount()

preSigners := preAccountEntry.SignerSummary()
postSigners := postAccountEntry.SignerSummary()
Expand Down Expand Up @@ -68,18 +140,27 @@ func (t *LedgerTransaction) GetChanges() []Change {
changes := getChangesFromLedgerEntryChanges(t.FeeChanges)

// Transaction meta
v1Meta, ok := t.Meta.GetV1()
if ok {
switch t.Meta.V {
case 0:
for _, operationMeta := range *t.Meta.Operations {
opChanges := getChangesFromLedgerEntryChanges(
operationMeta.Changes,
)
changes = append(changes, opChanges...)
}
case 1:
v1Meta := t.Meta.MustV1()
txChanges := getChangesFromLedgerEntryChanges(v1Meta.TxChanges)
changes = append(changes, txChanges...)
}

// Operation meta
for _, operationMeta := range t.Meta.OperationsMeta() {
ledgerEntryChanges := operationMeta.Changes
opChanges := getChangesFromLedgerEntryChanges(ledgerEntryChanges)

changes = append(changes, opChanges...)
for _, operationMeta := range v1Meta.Operations {
opChanges := getChangesFromLedgerEntryChanges(
operationMeta.Changes,
)
changes = append(changes, opChanges...)
}
default:
panic("Unkown TransactionMeta version")
}

return changes
Expand All @@ -106,21 +187,21 @@ func getChangesFromLedgerEntryChanges(ledgerEntryChanges xdr.LedgerEntryChanges)
changes = append(changes, Change{
Type: created.Data.Type,
Pre: nil,
Post: &created.Data,
Post: &created,
})
case xdr.LedgerEntryChangeTypeLedgerEntryUpdated:
state := ledgerEntryChanges[i-1].MustState()
updated := entryChange.MustUpdated()
changes = append(changes, Change{
Type: state.Data.Type,
Pre: &state.Data,
Post: &updated.Data,
Pre: &state,
Post: &updated,
})
case xdr.LedgerEntryChangeTypeLedgerEntryRemoved:
state := ledgerEntryChanges[i-1].MustState()
changes = append(changes, Change{
Type: state.Data.Type,
Pre: &state.Data,
Pre: &state,
Post: nil,
})
case xdr.LedgerEntryChangeTypeLedgerEntryState:
Expand Down
Loading

0 comments on commit 325abb3

Please sign in to comment.