exp/ingest/io: Add LedgerEntryChangeCache to improve performance of tx meta (#2004)

This commit adds `exp/ingest/io.LedgerEntryChangeCache`, which squashes
all the ledger entry changes of a ledger. It can later be used to decrease
the number of DB queries needed to apply them. See #2003.

Some ledgers contain a lot of changes connected to a small set of
entries. They cause performance issues because every ledger entry
change is applied to the DB. `LedgerEntryChangeCache` solves this problem
because it holds only the final version of each ledger entry after all the
changes.

Before this fix, an extreme case in which two accounts send payments to
each other 1000 times in a single ledger required 3000 DB updates (2000
account changes due to payments and 500 fee meta changes per account).
After the fix, it requires just 2 DB updates.
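
The arithmetic above can be checked with a minimal sketch. The `change` type and the helper functions are hypothetical and only track which entry a change touches. They are not part of `exp/ingest/io`.

```go
package main

import "fmt"

// change is a hypothetical stand-in for a ledger entry change. It only
// records the key of the entry that was touched.
type change struct {
	key string
}

// buildChanges reproduces the scenario from the commit message: 1000
// payments between two accounts, each touching both accounts, plus 500 fee
// meta changes per account.
func buildChanges() []change {
	accounts := []string{"A", "B"}
	var changes []change
	for i := 0; i < 1000; i++ {
		for _, acc := range accounts {
			changes = append(changes, change{key: acc})
		}
	}
	for _, acc := range accounts {
		for i := 0; i < 500; i++ {
			changes = append(changes, change{key: acc})
		}
	}
	return changes
}

// naiveUpdates counts one DB update per change.
func naiveUpdates(changes []change) int {
	return len(changes)
}

// squashedUpdates counts one DB update per distinct entry, which is what
// remains after squashing.
func squashedUpdates(changes []change) int {
	seen := make(map[string]struct{})
	for _, c := range changes {
		seen[c.key] = struct{}{}
	}
	return len(seen)
}

func main() {
	changes := buildChanges()
	fmt.Println(naiveUpdates(changes), squashedUpdates(changes)) // prints "3000 2"
}
```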

The algorithm used in `LedgerEntryChangeCache` is explained below:

1. If the change is CREATED it checks if any change connected to the given
   entry is already in the cache. If not, it adds the CREATED change.
   Otherwise, if the existing change is:
   a. CREATED it returns an error because we can't add an entry that already
      exists.
   b. UPDATED it returns an error because we can't add an entry that already
      exists.
   c. REMOVED it means that due to previous transitions we want to remove
      this entry from the DB, which means that it already exists in the DB,
      so we need to update the type of change to UPDATED.
2. If the change is UPDATED it checks if any change connected to the given
   entry is already in the cache. If not, it adds the UPDATED change.
   Otherwise, if the existing change is:
   a. CREATED it means that due to previous transitions we want to create
      this entry in the DB, which means that it doesn't exist in the DB yet,
      so we need to update the entry but keep the CREATED type.
   b. UPDATED we simply update it with the new value.
   c. REMOVED it means that at this point in the ledger the entry is removed
      so updating it returns an error.
3. If the change is REMOVED it checks if any change connected to the given
   entry is already in the cache. If not, it adds the REMOVED change.
   Otherwise, if the existing change is:
   a. CREATED it means that due to previous transitions we want to create
      this entry in the DB, which means that it doesn't exist in the DB yet.
      If it was created and removed in the same ledger it's a noop so we
      remove the entry from the cache.
   b. UPDATED we simply update it to be a REMOVED change because the UPDATED
      change means the entry exists in the DB.
   c. REMOVED it returns an error because we can't remove an entry that was
      already removed.
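
The transitions listed above can be condensed into a small, self-contained sketch. It is not the code from this commit: `changeType`, `change`, `cache`, `errState` and `str` are hypothetical simplifications in which an entry state is a `*string` and the ledger key is a plain string.

```go
package main

import (
	"errors"
	"fmt"
)

// changeType is a hypothetical, simplified change classification.
type changeType int

const (
	created changeType = iota
	updated
	removed
)

// change holds the entry state before and after; nil means "does not exist".
type change struct {
	pre, post *string
}

// typ derives the change type from pre/post.
func (c change) typ() changeType {
	switch {
	case c.pre == nil:
		return created
	case c.post != nil:
		return updated
	default:
		return removed
	}
}

// errState is returned for transitions the algorithm rejects.
var errState = errors.New("invalid state transition")

// cache maps an entry key to its single, squashed change.
type cache map[string]change

// add squashes next into the change already stored under key, if any.
func (c cache) add(key string, next change) error {
	if next.pre == nil && next.post == nil {
		return errors.New("unknown entry change state")
	}
	prev, ok := c[key]
	if !ok {
		c[key] = next
		return nil
	}
	switch p, n := prev.typ(), next.typ(); {
	case n == created && p == removed: // 1c: entry exists in the DB, becomes UPDATED
		c[key] = change{pre: prev.pre, post: next.post}
	case n == updated && p != removed: // 2a, 2b: keep the oldest pre, take the newest post
		c[key] = change{pre: prev.pre, post: next.post}
	case n == removed && p == created: // 3a: created and removed in one ledger, noop
		delete(c, key)
	case n == removed && p == updated: // 3b: becomes REMOVED
		c[key] = change{pre: prev.pre}
	default: // 1a, 1b, 2c, 3c
		return errState
	}
	return nil
}

// str returns a pointer to s.
func str(s string) *string { return &s }

func main() {
	c := cache{}
	_ = c.add("acc", change{post: str("v1")})                 // CREATED
	_ = c.add("acc", change{pre: str("v1"), post: str("v2")}) // stays CREATED
	_ = c.add("acc", change{pre: str("v2")})                  // noop, entry dropped
	fmt.Println(len(c))
}
```

Keeping the oldest `pre` and the newest `post` is what lets the derived type stay correct: a change whose `pre` is nil remains CREATED no matter how many updates follow.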
bartekn authored Dec 12, 2019
1 parent e486e1c commit 6398a58
Showing 17 changed files with 845 additions and 242 deletions.
13 changes: 13 additions & 0 deletions exp/ingest/errors/main.go
@@ -0,0 +1,13 @@
package errors

// StateError is an error indicating invalid state. The type is used to
// differentiate between network, i/o, marshaling, bad usage etc. errors and
// actual state errors. You can use a type assertion or a type switch to check
// for the type.
type StateError struct {
	error
}

// NewStateError creates a new StateError.
func NewStateError(err error) StateError {
	return StateError{err}
}
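
A caller might distinguish state errors from other errors roughly as follows. This is a sketch under assumptions: `StateError` and `NewStateError` are copied locally so the example compiles on its own, while `isStateError` and `doWork` are hypothetical helpers that are not part of this commit.

```go
package main

import (
	"errors"
	"fmt"
)

// StateError is a local copy of the type added in exp/ingest/errors.
type StateError struct {
	error
}

// NewStateError creates a new StateError.
func NewStateError(err error) StateError {
	return StateError{err}
}

// isStateError (hypothetical) reports whether err is a StateError, using a
// type switch as the doc comment suggests.
func isStateError(err error) bool {
	switch err.(type) {
	case StateError:
		return true
	default:
		return false
	}
}

// doWork (hypothetical) returns a different kind of error per scenario.
func doWork(scenario string) error {
	switch scenario {
	case "state":
		return NewStateError(errors.New("can't remove an entry that was previously removed"))
	case "io":
		return errors.New("connection reset")
	default:
		return nil
	}
}

func main() {
	for _, s := range []string{"state", "io", "ok"} {
		fmt.Println(s, isStateError(doWork(s)))
	}
}
```

A plain type switch only matches when the `StateError` is returned directly. If a caller wraps it in another error first, the wrapper has to be unwrapped before the check.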
217 changes: 217 additions & 0 deletions exp/ingest/io/ledger_entry_change_cache.go
@@ -0,0 +1,217 @@
package io

import (
	"sync"

	ingesterrors "github.com/stellar/go/exp/ingest/errors"
	"github.com/stellar/go/support/errors"
	"github.com/stellar/go/xdr"
)

// LedgerEntryChangeCache is a cache of ledger entry changes that squashes all
// changes within a single ledger. By doing this, it decreases the number of
// queries sent to a DB to update the current state of the ledger.
// It has integrity checks built in, so e.g. removing an account that was
// previously removed returns an error. In such a case a StateError (from the
// exp/ingest/errors package) is returned.
//
// It applies changes to the cache using the following algorithm:
//
// 1. If the change is CREATED it checks if any change connected to the given
//    entry is already in the cache. If not, it adds the CREATED change.
//    Otherwise, if the existing change is:
//    a. CREATED it returns an error because we can't add an entry that already
//       exists.
//    b. UPDATED it returns an error because we can't add an entry that already
//       exists.
//    c. REMOVED it means that due to previous transitions we want to remove
//       this entry from the DB, which means that it already exists in the DB,
//       so we need to update the type of change to UPDATED.
// 2. If the change is UPDATED it checks if any change connected to the given
//    entry is already in the cache. If not, it adds the UPDATED change.
//    Otherwise, if the existing change is:
//    a. CREATED it means that due to previous transitions we want to create
//       this entry in the DB, which means that it doesn't exist in the DB yet,
//       so we need to update the entry but keep the CREATED type.
//    b. UPDATED we simply update it with the new value.
//    c. REMOVED it means that at this point in the ledger the entry is removed
//       so updating it returns an error.
// 3. If the change is REMOVED it checks if any change connected to the given
//    entry is already in the cache. If not, it adds the REMOVED change.
//    Otherwise, if the existing change is:
//    a. CREATED it means that due to previous transitions we want to create
//       this entry in the DB, which means that it doesn't exist in the DB yet.
//       If it was created and removed in the same ledger it's a noop so we
//       remove the entry from the cache.
//    b. UPDATED we simply update it to be a REMOVED change because the UPDATED
//       change means the entry exists in the DB.
//    c. REMOVED it returns an error because we can't remove an entry that was
//       already removed.
type LedgerEntryChangeCache struct {
	// ledger key => Change
	cache map[string]Change
	mutex sync.Mutex
}

// NewLedgerEntryChangeCache returns a new LedgerEntryChangeCache.
func NewLedgerEntryChangeCache() *LedgerEntryChangeCache {
	return &LedgerEntryChangeCache{
		cache: make(map[string]Change),
	}
}

// AddChange adds a change to LedgerEntryChangeCache. All changes are stored
// in memory. To get the final, squashed changes call GetChanges.
//
// Please note that the current ledger capacity in pubnet (max 1000 ops/ledger)
// makes LedgerEntryChangeCache safe to use in terms of memory usage. If the
// cache takes too much memory, you can apply the changes returned by GetChanges
// and create a new LedgerEntryChangeCache object to continue ingestion.
func (c *LedgerEntryChangeCache) AddChange(change Change) error {
	c.mutex.Lock()
	defer c.mutex.Unlock()

	switch {
	case change.Pre == nil && change.Post != nil:
		return c.addCreatedChange(change)
	case change.Pre != nil && change.Post != nil:
		return c.addUpdatedChange(change)
	case change.Pre != nil && change.Post == nil:
		return c.addRemovedChange(change)
	default:
		return errors.New("Unknown entry change state")
	}
}

// addCreatedChange adds a change to the cache, but returns an error if create
// change is unexpected.
func (c *LedgerEntryChangeCache) addCreatedChange(change Change) error {
	ledgerKeyString, err := change.Post.LedgerKey().MarshalBinaryBase64()
	if err != nil {
		return errors.Wrap(err, "Error MarshalBinaryBase64")
	}

	existingChange, exist := c.cache[ledgerKeyString]
	if !exist {
		c.cache[ledgerKeyString] = change
		return nil
	}

	switch existingChange.LedgerEntryChangeType() {
	case xdr.LedgerEntryChangeTypeLedgerEntryCreated:
		return ingesterrors.NewStateError(errors.Errorf(
			"can't create an entry that already exists (ledger key = %s)",
			ledgerKeyString,
		))
	case xdr.LedgerEntryChangeTypeLedgerEntryUpdated:
		return ingesterrors.NewStateError(errors.Errorf(
			"can't create an entry that already exists (ledger key = %s)",
			ledgerKeyString,
		))
	case xdr.LedgerEntryChangeTypeLedgerEntryRemoved:
		// If existing type is removed it means that this entry does exist
		// in a DB so we update entry change.
		c.cache[ledgerKeyString] = Change{
			Type: change.Post.LedgerKey().Type,
			Pre:  existingChange.Pre,
			Post: change.Post,
		}
	default:
		return errors.Errorf("Unknown LedgerEntryChangeType: %d", existingChange.LedgerEntryChangeType())
	}

	return nil
}

// addUpdatedChange adds a change to the cache, but returns an error if update
// change is unexpected.
func (c *LedgerEntryChangeCache) addUpdatedChange(change Change) error {
	ledgerKeyString, err := change.Post.LedgerKey().MarshalBinaryBase64()
	if err != nil {
		return errors.Wrap(err, "Error MarshalBinaryBase64")
	}

	existingChange, exist := c.cache[ledgerKeyString]
	if !exist {
		c.cache[ledgerKeyString] = change
		return nil
	}

	switch existingChange.LedgerEntryChangeType() {
	case xdr.LedgerEntryChangeTypeLedgerEntryCreated:
		// If existing type is created it means that this entry does not
		// exist in a DB so we update entry change.
		c.cache[ledgerKeyString] = Change{
			Type: change.Post.LedgerKey().Type,
			Pre:  existingChange.Pre, // = nil
			Post: change.Post,
		}
	case xdr.LedgerEntryChangeTypeLedgerEntryUpdated:
		c.cache[ledgerKeyString] = Change{
			Type: change.Post.LedgerKey().Type,
			Pre:  existingChange.Pre,
			Post: change.Post,
		}
	case xdr.LedgerEntryChangeTypeLedgerEntryRemoved:
		return ingesterrors.NewStateError(errors.Errorf(
			"can't update an entry that was previously removed (ledger key = %s)",
			ledgerKeyString,
		))
	default:
		// Report the change type (not the entry type) to match the message.
		return errors.Errorf("Unknown LedgerEntryChangeType: %d", existingChange.LedgerEntryChangeType())
	}

	return nil
}

// addRemovedChange adds a change to the cache, but returns an error if remove
// change is unexpected.
func (c *LedgerEntryChangeCache) addRemovedChange(change Change) error {
	ledgerKeyString, err := change.Pre.LedgerKey().MarshalBinaryBase64()
	if err != nil {
		return errors.Wrap(err, "Error MarshalBinaryBase64")
	}

	existingChange, exist := c.cache[ledgerKeyString]
	if !exist {
		c.cache[ledgerKeyString] = change
		return nil
	}

	switch existingChange.LedgerEntryChangeType() {
	case xdr.LedgerEntryChangeTypeLedgerEntryCreated:
		// If existing type is created it means that this will be no op.
		// Entry was created and is now removed in a single ledger.
		delete(c.cache, ledgerKeyString)
	case xdr.LedgerEntryChangeTypeLedgerEntryUpdated:
		c.cache[ledgerKeyString] = Change{
			Type: change.Pre.LedgerKey().Type,
			Pre:  existingChange.Pre,
			Post: nil,
		}
	case xdr.LedgerEntryChangeTypeLedgerEntryRemoved:
		return ingesterrors.NewStateError(errors.Errorf(
			"can't remove an entry that was previously removed (ledger key = %s)",
			ledgerKeyString,
		))
	default:
		// Report the change type (not the entry type) to match the message.
		return errors.Errorf("Unknown LedgerEntryChangeType: %d", existingChange.LedgerEntryChangeType())
	}

	return nil
}

// GetChanges returns a slice of Changes in the cache. The order of changes is
// random but each change is connected to a separate entry.
func (c *LedgerEntryChangeCache) GetChanges() []Change {
	c.mutex.Lock()
	defer c.mutex.Unlock()

	changes := make([]Change, 0, len(c.cache))

	for _, entryChange := range c.cache {
		changes = append(changes, entryChange)
	}

	return changes
}