Skip to content

Commit

Permalink
xdr and horizon: Optimize compress-marshaling of ledger keys (#4071)
Browse files Browse the repository at this point in the history
  • Loading branch information
2opremio authored Nov 16, 2021
1 parent d9d4986 commit 2fcd985
Show file tree
Hide file tree
Showing 10 changed files with 190 additions and 174 deletions.
54 changes: 54 additions & 0 deletions benchmarks/xdr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,3 +132,57 @@ func BenchmarkXDRUnsafeMarshalBase64WithEncodingBuffer(b *testing.B) {
_, _ = e.UnsafeMarshalBase64(xdrInput)
}
}

// ledgerKeys holds one sample xdr.LedgerKey per ledger entry type listed
// below: account, trustline, offer, data, claimable balance and liquidity
// pool. It is the fixed input iterated over by BenchmarkXDRMarshalCompress.
var ledgerKeys = []xdr.LedgerKey{
{
Type: xdr.LedgerEntryTypeAccount,
Account: &xdr.LedgerKeyAccount{
AccountId: xdr.MustAddress("GAOQJGUAB7NI7K7I62ORBXMN3J4SSWQUQ7FOEPSDJ322W2HMCNWPHXFB"),
},
},
{
Type: xdr.LedgerEntryTypeTrustline,
TrustLine: &xdr.LedgerKeyTrustLine{
AccountId: xdr.MustAddress("GAOQJGUAB7NI7K7I62ORBXMN3J4SSWQUQ7FOEPSDJ322W2HMCNWPHXFB"),
// The same address is used as both the trustline owner and the asset issuer.
Asset: xdr.MustNewCreditAsset("EUR", "GAOQJGUAB7NI7K7I62ORBXMN3J4SSWQUQ7FOEPSDJ322W2HMCNWPHXFB").ToTrustLineAsset(),
},
},
{
Type: xdr.LedgerEntryTypeOffer,
Offer: &xdr.LedgerKeyOffer{
SellerId: xdr.MustAddress("GAOQJGUAB7NI7K7I62ORBXMN3J4SSWQUQ7FOEPSDJ322W2HMCNWPHXFB"),
OfferId: xdr.Int64(3),
},
},
{
Type: xdr.LedgerEntryTypeData,
Data: &xdr.LedgerKeyData{
AccountId: xdr.MustAddress("GAOQJGUAB7NI7K7I62ORBXMN3J4SSWQUQ7FOEPSDJ322W2HMCNWPHXFB"),
DataName: "foobar",
},
},
{
Type: xdr.LedgerEntryTypeClaimableBalance,
ClaimableBalance: &xdr.LedgerKeyClaimableBalance{
BalanceId: xdr.ClaimableBalanceId{
// NOTE(review): 0 is presumably the V0 balance ID type, since V0 is the
// arm populated below; confirm against the xdr constants.
Type: 0,
// Only the first four bytes are set; the rest of the hash stays zero.
V0: &xdr.Hash{0xca, 0xfe, 0xba, 0xbe},
},
},
},
{
Type: xdr.LedgerEntryTypeLiquidityPool,
LiquidityPool: &xdr.LedgerKeyLiquidityPool{
// Only the first four bytes are set; the rest of the pool ID stays zero.
LiquidityPoolId: xdr.PoolId{0xca, 0xfe, 0xba, 0xbe},
},
},
}

// BenchmarkXDRMarshalCompress measures compress-marshaling of every key in
// ledgerKeys through a single xdr.EncodingBuffer that is created once and
// reused for all iterations.
func BenchmarkXDRMarshalCompress(b *testing.B) {
	buf := xdr.NewEncodingBuffer()
	for n := 0; n < b.N; n++ {
		for idx := range ledgerKeys {
			// The encoded bytes and error are discarded on purpose: only the
			// cost of the marshaling call itself is being measured.
			_, _ = buf.LedgerKeyUnsafeMarshalBinaryCompress(ledgerKeys[idx])
		}
	}
}
29 changes: 17 additions & 12 deletions ingest/checkpoint_change_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ type CheckpointChangeReader struct {
totalRead int64
totalSize int64

encodingBuffer *xdr.EncodingBuffer

// This should be set to true in tests only
disableBucketListHashValidation bool
sleep func(time.Duration)
Expand Down Expand Up @@ -109,16 +111,17 @@ func NewCheckpointChangeReader(
}

return &CheckpointChangeReader{
ctx: ctx,
has: &has,
archive: archive,
tempStore: tempStore,
sequence: sequence,
readChan: make(chan readResult, msrBufferSize),
streamOnce: sync.Once{},
closeOnce: sync.Once{},
done: make(chan bool),
sleep: time.Sleep,
ctx: ctx,
has: &has,
archive: archive,
tempStore: tempStore,
sequence: sequence,
readChan: make(chan readResult, msrBufferSize),
streamOnce: sync.Once{},
closeOnce: sync.Once{},
done: make(chan bool),
encodingBuffer: xdr.NewEncodingBuffer(),
sleep: time.Sleep,
}, nil
}

Expand Down Expand Up @@ -367,7 +370,8 @@ LoopBucketEntry:
}

// We're using compressed keys here
keyBytes, e := key.MarshalBinaryCompress()
// Safe, since we are converting to string right away
keyBytes, e := r.encodingBuffer.LedgerKeyUnsafeMarshalBinaryCompress(key)
if e != nil {
r.readChan <- r.error(
errors.Wrapf(e, "Error marshaling XDR record %d of hash '%s'", n, hash.String()),
Expand Down Expand Up @@ -421,7 +425,8 @@ LoopBucketEntry:
}

// We're using compressed keys here
keyBytes, e := key.MarshalBinaryCompress()
// Safe, since we are converting to string right away
keyBytes, e := r.encodingBuffer.LedgerKeyUnsafeMarshalBinaryCompress(key)
if e != nil {
r.readChan <- r.error(
errors.Wrapf(
Expand Down
23 changes: 6 additions & 17 deletions xdr/account_id.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,28 +64,17 @@ func (aid *AccountId) LedgerKey() (ret LedgerKey) {
return
}

// MarshalBinaryCompress marshals AccountId to []byte but unlike
// MarshalBinary() it removes all unnecessary bytes, exploiting the fact
// that XDR is padding data to 4 bytes in union discriminants etc.
// Its primary use is in ingest/io.StateReader that keeps LedgerKeys in
// memory so this function decreases memory requirements.
//
// Warning, do not use UnmarshalBinary() on data encoded using this method!
func (aid AccountId) MarshalBinaryCompress() ([]byte, error) {
m := []byte{byte(aid.Type)}

func (e *EncodingBuffer) accountIdCompressEncodeTo(aid AccountId) error {
if err := e.xdrEncoderBuf.WriteByte(byte(aid.Type)); err != nil {
return err
}
switch aid.Type {
case PublicKeyTypePublicKeyTypeEd25519:
pk, err := aid.Ed25519.MarshalBinary()
if err != nil {
return nil, err
}
m = append(m, pk...)
_, err := e.xdrEncoderBuf.Write(aid.Ed25519[:])
return err
default:
panic("Unknown type")
}

return m, nil
}

func MustAddress(address string) AccountId {
Expand Down
38 changes: 0 additions & 38 deletions xdr/asset.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,44 +268,6 @@ func (a Asset) StringCanonical() string {
return fmt.Sprintf("%s:%s", c, i)
}

// MarshalBinaryCompress marshals Asset to []byte but, unlike
// MarshalBinary(), it removes all unnecessary bytes, exploiting the fact
// that XDR is padding data to 4 bytes in union discriminants etc.
// Its primary use is in ingest/io.StateReader that keeps LedgerKeys in
// memory so this function decreases memory requirements.
//
// The output is one byte holding the asset type. For credit assets it is
// followed by the asset code with trailing zero bytes removed and then the
// issuer as produced by its MarshalBinary(). A native asset is encoded as
// the type byte alone.
//
// An error from marshaling the issuer is returned to the caller for both
// credit asset types. The function panics if the asset type is unknown.
//
// Warning, do not use UnmarshalBinary() on data encoded using this method!
func (a Asset) MarshalBinaryCompress() ([]byte, error) {
	m := []byte{byte(a.Type)}

	var err error
	var code []byte
	var issuer []byte

	switch a.Type {
	case AssetTypeAssetTypeNative:
		return m, nil
	case AssetTypeAssetTypeCreditAlphanum4:
		code = []byte(strings.TrimRight(string(a.AlphaNum4.AssetCode[:]), "\x00"))
		issuer, err = a.AlphaNum4.Issuer.MarshalBinary()
	case AssetTypeAssetTypeCreditAlphanum12:
		code = []byte(strings.TrimRight(string(a.AlphaNum12.AssetCode[:]), "\x00"))
		issuer, err = a.AlphaNum12.Issuer.MarshalBinary()
	default:
		panic(fmt.Errorf("Unknown asset type: %v", a.Type))
	}

	// Both credit asset types report issuer marshaling failures the same way:
	// as a returned error, never as a panic.
	if err != nil {
		return nil, err
	}

	m = append(m, code...)
	m = append(m, issuer...)
	return m, nil
}

// Equals returns true if `other` is equivalent to `a`
func (a Asset) Equals(other Asset) bool {
if a.Type != other.Type {
Expand Down
4 changes: 2 additions & 2 deletions xdr/asset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@ package xdr_test
import (
"testing"

. "github.com/stellar/go/xdr"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

. "github.com/stellar/go/xdr"
)

var _ = Describe("xdr.Asset#Extract()", func() {
Expand Down
25 changes: 6 additions & 19 deletions xdr/claimable_balance_id.go
Original file line number Diff line number Diff line change
@@ -1,27 +1,14 @@
package xdr

// MarshalBinaryCompress marshals ClaimableBalanceId to []byte but unlike
// MarshalBinary() it removes all unnecessary bytes, exploiting the fact
// that XDR is padding data to 4 bytes in union discriminants etc.
// Its primary use is in ingest/io.StateReader that keeps LedgerKeys in
// memory so this function decreases memory requirements.
//
// Warning, do not use UnmarshalBinary() on data encoded using this method!
func (cb ClaimableBalanceId) MarshalBinaryCompress() ([]byte, error) {
m := []byte{byte(cb.Type)}

func (e *EncodingBuffer) claimableBalanceCompressEncodeTo(cb ClaimableBalanceId) error {
if err := e.xdrEncoderBuf.WriteByte(byte(cb.Type)); err != nil {
return err
}
switch cb.Type {
case ClaimableBalanceIdTypeClaimableBalanceIdTypeV0:
hash, err := cb.V0.MarshalBinary()
if err != nil {
return nil, err
}
m = append(m, hash...)
// TODO fix before Protocol 18
// case ClaimableBalanceIdTypeClaimableBalanceIdTypeFromPoolRevoke:
_, err := e.xdrEncoderBuf.Write(cb.V0[:])
return err
default:
panic("Unknown type")
}

return m, nil
}
76 changes: 20 additions & 56 deletions xdr/ledger_key.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package xdr
import (
"encoding/base64"
"fmt"
"strings"
)

// LedgerKey implements the `Keyer` interface
Expand Down Expand Up @@ -107,7 +106,7 @@ func (key *LedgerKey) SetClaimableBalance(balanceID ClaimableBalanceId) error {
return nil
}

// SetL LquidityPool mutates `key` such that it represents the identity of a
// SetLiquidityPool mutates `key` such that it represents the identity of a
// liquidity pool.
func (key *LedgerKey) SetLiquidityPool(poolID PoolId) error {
data := LedgerKeyLiquidityPool{poolID}
Expand All @@ -120,74 +119,39 @@ func (key *LedgerKey) SetLiquidityPool(poolID PoolId) error {
return nil
}

// MarshalBinaryCompress marshals LedgerKey to []byte but unlike
// MarshalBinary() it removes all unnecessary bytes, exploiting the fact
// that XDR is padding data to 4 bytes in union discriminants etc.
// Its primary use is in ingest/io.StateReader that keeps LedgerKeys in
// memory so this function decreases memory requirements.
//
// Warning, do not use UnmarshalBinary() on data encoded using this method!
//
// Optimizations:
// - Writes a single byte for union discriminants vs 4 bytes.
// - Removes type and code padding for Asset.
func (key LedgerKey) MarshalBinaryCompress() ([]byte, error) {
m := []byte{byte(key.Type)}
func (e *EncodingBuffer) ledgerKeyCompressEncodeTo(key LedgerKey) error {
if err := e.xdrEncoderBuf.WriteByte(byte(key.Type)); err != nil {
return err
}

switch key.Type {
case LedgerEntryTypeAccount:
account, err := key.Account.AccountId.MarshalBinaryCompress()
if err != nil {
return nil, err
}
m = append(m, account...)
return e.accountIdCompressEncodeTo(key.Account.AccountId)
case LedgerEntryTypeTrustline:
account, err := key.TrustLine.AccountId.MarshalBinaryCompress()
if err != nil {
return nil, err
if err := e.accountIdCompressEncodeTo(key.TrustLine.AccountId); err != nil {
return err
}
m = append(m, account...)
asset, err := key.TrustLine.Asset.MarshalBinaryCompress()
if err != nil {
return nil, err
}
m = append(m, asset...)
return e.assetTrustlineCompressEncodeTo(key.TrustLine.Asset)
case LedgerEntryTypeOffer:
seller, err := key.Offer.SellerId.MarshalBinaryCompress()
if err != nil {
return nil, err
}
m = append(m, seller...)
offer, err := key.Offer.OfferId.MarshalBinary()
if err != nil {
return nil, err
}
m = append(m, offer...)
// We intentionally don't encode the SellerID since the OfferID is enough
// (it's unique to the network)
return key.Offer.OfferId.EncodeTo(e.encoder)
case LedgerEntryTypeData:
account, err := key.Data.AccountId.MarshalBinaryCompress()
if err != nil {
return nil, err
if err := e.accountIdCompressEncodeTo(key.Data.AccountId); err != nil {
return err
}
m = append(m, account...)
dataName := []byte(strings.TrimRight(string(key.Data.DataName), "\x00"))
m = append(m, dataName...)
dataName := trimRightZeros([]byte(key.Data.DataName))
_, err := e.xdrEncoderBuf.Write(dataName)
return err
case LedgerEntryTypeClaimableBalance:
cBalance, err := key.ClaimableBalance.BalanceId.MarshalBinaryCompress()
if err != nil {
return nil, err
}
m = append(m, cBalance...)
return e.claimableBalanceCompressEncodeTo(key.ClaimableBalance.BalanceId)
case LedgerEntryTypeLiquidityPool:
cBalance, err := key.LiquidityPool.LiquidityPoolId.MarshalBinary()
if err != nil {
return nil, err
}
m = append(m, cBalance...)
_, err := e.xdrEncoderBuf.Write(key.LiquidityPool.LiquidityPoolId[:])
return err
default:
panic("Unknown type")
}

return m, nil
}

// MarshalBinaryBase64 marshals XDR into a binary form and then encodes it
Expand Down
26 changes: 21 additions & 5 deletions xdr/ledger_key_test.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
package xdr_test
package xdr

import (
"encoding/base64"
"testing"

"github.com/stellar/go/xdr"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestLedgerKeyTrustLineBinaryMaxLength(t *testing.T) {
key := &xdr.LedgerKey{}
key := &LedgerKey{}
err := key.SetTrustline(
xdr.MustAddress("GBFLTCDLOE6YQ74B66RH3S2UW5I2MKZ5VLTM75F4YMIWUIXRIFVNRNIF"),
xdr.MustNewCreditAsset("123456789012", "GBFLTCDLOE6YQ74B66RH3S2UW5I2MKZ5VLTM75F4YMIWUIXRIFVNRNIF").ToTrustLineAsset(),
MustAddress("GBFLTCDLOE6YQ74B66RH3S2UW5I2MKZ5VLTM75F4YMIWUIXRIFVNRNIF"),
MustNewCreditAsset("123456789012", "GBFLTCDLOE6YQ74B66RH3S2UW5I2MKZ5VLTM75F4YMIWUIXRIFVNRNIF").ToTrustLineAsset(),
)
assert.NoError(t, err)

Expand All @@ -22,3 +22,19 @@ func TestLedgerKeyTrustLineBinaryMaxLength(t *testing.T) {
bcompressed := base64.StdEncoding.EncodeToString(compressed)
assert.Equal(t, len(bcompressed), 124)
}

// TestTrimRightZeros checks trimRightZeros against a table of inputs: nil and
// empty slices, all-zero slices, slices with one to three trailing zero
// bytes, and slices whose leading or interior zero bytes must be preserved.
func TestTrimRightZeros(t *testing.T) {
	cases := []struct {
		input    []byte
		expected []byte
	}{
		{input: nil, expected: nil},
		{input: []byte{}, expected: []byte{}},
		{input: []byte{0x0}, expected: []byte{}},
		{input: []byte{0x0, 0x0}, expected: []byte{}},
		{input: []byte{0x1}, expected: []byte{0x1}},
		{input: []byte{0x1, 0x0}, expected: []byte{0x1}},
		{input: []byte{0x1, 0x0, 0x0}, expected: []byte{0x1}},
		{input: []byte{0x1, 0x0, 0x0, 0x0}, expected: []byte{0x1}},
		{input: []byte{0x1, 0x2}, expected: []byte{0x1, 0x2}},
		{input: []byte{0x1, 0x2, 0x0}, expected: []byte{0x1, 0x2}},
		{input: []byte{0x1, 0x2, 0x0, 0x0}, expected: []byte{0x1, 0x2}},
		// Zero bytes that are not at the tail must survive.
		{input: []byte{0x0, 0x2, 0x0, 0x0}, expected: []byte{0x0, 0x2}},
		{input: []byte{0x0, 0x2, 0x0, 0x1, 0x0}, expected: []byte{0x0, 0x2, 0x0, 0x1}},
	}

	// Cases run in the original order and stop at the first mismatch.
	for _, tc := range cases {
		require.Equal(t, tc.expected, trimRightZeros(tc.input))
	}
}
Loading

0 comments on commit 2fcd985

Please sign in to comment.