Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

soroban-rpc: getLedgerEntries: query multiple ledger entries at once #896

Merged
merged 8 commits into from
Aug 25, 2023
Merged
160 changes: 102 additions & 58 deletions cmd/soroban-rpc/internal/db/ledgerentry.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"database/sql"
"encoding/base64"
"encoding/hex"
"fmt"

sq "github.com/Masterminds/squirrel"
Expand All @@ -24,9 +23,14 @@ type LedgerEntryReader interface {
NewCachedTx(ctx context.Context) (LedgerEntryReadTx, error)
}

type LedgerKeyAndEntry struct {
Key xdr.LedgerKey
Entry xdr.LedgerEntry
}

type LedgerEntryReadTx interface {
GetLatestLedgerSequence() (uint32, error)
GetLedgerEntry(key xdr.LedgerKey, includeExpired bool) (bool, xdr.LedgerEntry, error)
GetLedgerEntries(includeExpired bool, keys ...xdr.LedgerKey) ([]LedgerKeyAndEntry, error)
Done() error
}

Expand Down Expand Up @@ -196,84 +200,124 @@ func (l *ledgerEntryReadTx) GetLatestLedgerSequence() (uint32, error) {
return latestLedgerSeq, err
}

func (l *ledgerEntryReadTx) getBinaryLedgerEntry(key xdr.LedgerKey) (bool, string, error) {
encodedKey, err := encodeLedgerKey(l.buffer, key)
if err != nil {
return false, "", err
}

// From compressed XDR keys to XDR entries (i.e. using the DB's representation)
func (l *ledgerEntryReadTx) getLedgerRawLedgerEntries(keys ...string) (map[string]string, error) {
2opremio marked this conversation as resolved.
Show resolved Hide resolved
2opremio marked this conversation as resolved.
Show resolved Hide resolved
result := make(map[string]string, len(keys))
2opremio marked this conversation as resolved.
Show resolved Hide resolved
keysToQueryInDB := keys
if l.ledgerEntryCacheReadTx != nil {
entry, ok := l.ledgerEntryCacheReadTx.get(encodedKey)
if ok {
if entry != nil {
return true, *entry, nil
keysToQueryInDB = make([]string, 0, len(keys))
for _, k := range keys {
entry, ok := l.ledgerEntryCacheReadTx.get(k)
if !ok || entry == nil {
keysToQueryInDB = append(keysToQueryInDB, k)
2opremio marked this conversation as resolved.
Show resolved Hide resolved
} else {
return false, "", nil
result[k] = *entry
}
}
}

sql := sq.Select("entry").From(ledgerEntriesTableName).Where(sq.Eq{"key": encodedKey})
var results []string
if err = l.tx.Select(context.Background(), &results, sql); err != nil {
return false, "", err
if len(keysToQueryInDB) == 0 {
return result, nil
}
switch len(results) {
case 0:

sql := sq.Select("key", "entry").From(ledgerEntriesTableName).Where(sq.Eq{"key": keysToQueryInDB})
type dbLedgerKeyEntry struct {
Key string `db:"key"`
Entry string `db:"entry"`
}
var dbResults []dbLedgerKeyEntry
if err := l.tx.Select(context.Background(), &dbResults, sql); err != nil {
return result, err
}

for _, r := range dbResults {
result[r.Key] = r.Entry
if l.ledgerEntryCacheReadTx != nil {
l.ledgerEntryCacheReadTx.upsert(encodedKey, nil)
entry := r.Entry
l.ledgerEntryCacheReadTx.upsert(r.Key, &entry)

// Add missing config setting entries to the top cache.
// Otherwise, the write-through cache won't get updated on restarts
// (after which we don't process past config setting updates)
keyType, err := xdr.GetBinaryCompressedLedgerKeyType([]byte(r.Key))
if err != nil {
return nil, err
}
if keyType == xdr.LedgerEntryTypeConfigSetting {
l.db.ledgerEntryCacheMutex.Lock()
// Only udpate the cache if the entry is missing, otherwise
// we may end up overwriting the entry with an older version
if _, ok := l.db.ledgerEntryCache.entries[r.Key]; !ok {
l.db.ledgerEntryCache.entries[r.Key] = r.Entry
}
defer l.db.ledgerEntryCacheMutex.Unlock()
}
}
return false, "", nil
}
return result, nil
}

func GetLedgerEntry(tx LedgerEntryReadTx, includeExpired bool, key xdr.LedgerKey) (bool, xdr.LedgerEntry, error) {
2opremio marked this conversation as resolved.
Show resolved Hide resolved
keyEntries, err := tx.GetLedgerEntries(includeExpired, key)
if err != nil {
return false, xdr.LedgerEntry{}, nil
}
switch len(keyEntries) {
case 0:
return false, xdr.LedgerEntry{}, nil
case 1:
// expected length
result := results[0]
if l.ledgerEntryCacheReadTx != nil {
l.ledgerEntryCacheReadTx.upsert(encodedKey, &result)
}
// Add missing config setting entries to the top cache.
// Otherwise, the write-through cache won't get updated on restarts
// (after which don't process past config setting updates)
if key.Type == xdr.LedgerEntryTypeConfigSetting {
l.db.ledgerEntryCacheMutex.Lock()
// Only udpate the cache if the entry is missing, otherwise
// we may end up overwriting the entry with an older version
if _, ok := l.db.ledgerEntryCache.entries[encodedKey]; !ok {
l.db.ledgerEntryCache.entries[encodedKey] = result
}
defer l.db.ledgerEntryCacheMutex.Unlock()
}
return true, result, nil
return true, keyEntries[0].Entry, nil
default:
return false, "", fmt.Errorf("multiple entries (%d) for key %q in table %q", len(results), hex.EncodeToString([]byte(encodedKey)), ledgerEntriesTableName)
return false, xdr.LedgerEntry{}, fmt.Errorf("multiple entries (%d) for key %v", len(keyEntries), key)
}
}

func (l *ledgerEntryReadTx) GetLedgerEntry(key xdr.LedgerKey, includeExpired bool) (bool, xdr.LedgerEntry, error) {
found, ledgerEntryBin, err := l.getBinaryLedgerEntry(key)
if err != nil || !found {
return found, xdr.LedgerEntry{}, err
func (l *ledgerEntryReadTx) GetLedgerEntries(includeExpired bool, keys ...xdr.LedgerKey) ([]LedgerKeyAndEntry, error) {
encodedKeys := make([]string, len(keys))
encodedKeyToKey := make(map[string]xdr.LedgerKey, len(keys))
for i, k := range keys {
encodedKey, err := encodeLedgerKey(l.buffer, k)
if err != nil {
return nil, err
}
encodedKeys[i] = encodedKey
encodedKeyToKey[encodedKey] = k
}
var result xdr.LedgerEntry
if err := xdr.SafeUnmarshal([]byte(ledgerEntryBin), &result); err != nil {
return false, xdr.LedgerEntry{}, err

rawResult, err := l.getLedgerRawLedgerEntries(encodedKeys...)
if err != nil {
return nil, err
}

// Disallow access to entries that have expired. Expiration excludes the
// "current" ledger, which we are building.
if !includeExpired {
if expirationLedgerSeq, ok := result.Data.ExpirationLedgerSeq(); ok {
latestClosedLedger, err := l.GetLatestLedgerSequence()
if err != nil {
return false, xdr.LedgerEntry{}, err
}
currentLedger := latestClosedLedger + 1
if expirationLedgerSeq < xdr.Uint32(currentLedger) {
return false, xdr.LedgerEntry{}, nil
result := make([]LedgerKeyAndEntry, 0, len(rawResult))
for encodedKey, key := range encodedKeyToKey {
encodedEntry, ok := rawResult[encodedKey]
if !ok {
continue
}
var entry xdr.LedgerEntry
if err := xdr.SafeUnmarshal([]byte(encodedEntry), &entry); err != nil {
return nil, err
2opremio marked this conversation as resolved.
Show resolved Hide resolved
}
if !includeExpired {
// Disallow access to entries that have expired. Expiration excludes the
// "current" ledger, which we are building.
if expirationLedgerSeq, ok := entry.Data.ExpirationLedgerSeq(); ok {
latestClosedLedger, err := l.GetLatestLedgerSequence()
if err != nil {
return nil, err
}
currentLedger := latestClosedLedger + 1
if expirationLedgerSeq < xdr.Uint32(currentLedger) {
continue
}
}
}
result = append(result, LedgerKeyAndEntry{key, entry})
}

return true, result, nil
return result, nil
}

func (l ledgerEntryReadTx) Done() error {
Expand Down
73 changes: 64 additions & 9 deletions cmd/soroban-rpc/internal/db/ledgerentry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func getLedgerEntryAndLatestLedgerSequenceWithErr(db *DB, key xdr.LedgerKey) (bo
return false, xdr.LedgerEntry{}, 0, err
}

present, entry, err := tx.GetLedgerEntry(key, false)
present, entry, err := GetLedgerEntry(tx, false, key)
if err != nil {
return false, xdr.LedgerEntry{}, 0, err
}
Expand Down Expand Up @@ -504,14 +504,14 @@ func TestReadTxsDuringWriteTx(t *testing.T) {

_, err = readTx1.GetLatestLedgerSequence()
assert.Equal(t, ErrEmptyDB, err)
present, _, err := readTx1.GetLedgerEntry(key, false)
present, _, err := GetLedgerEntry(readTx1, false, key)
assert.NoError(t, err)
assert.False(t, present)
assert.NoError(t, readTx1.Done())

_, err = readTx2.GetLatestLedgerSequence()
assert.Equal(t, ErrEmptyDB, err)
present, _, err = readTx2.GetLedgerEntry(key, false)
present, _, err = GetLedgerEntry(readTx2, false, key)
assert.NoError(t, err)
assert.False(t, present)
assert.NoError(t, readTx2.Done())
Expand Down Expand Up @@ -588,7 +588,7 @@ func TestWriteTxsDuringReadTxs(t *testing.T) {
for _, readTx := range []LedgerEntryReadTx{readTx1, readTx2, readTx3} {
_, err = readTx.GetLatestLedgerSequence()
assert.Equal(t, ErrEmptyDB, err)
present, _, err := readTx.GetLedgerEntry(key, false)
present, _, err := GetLedgerEntry(readTx, false, key)
assert.NoError(t, err)
assert.False(t, present)
}
Expand All @@ -600,7 +600,7 @@ func TestWriteTxsDuringReadTxs(t *testing.T) {
for _, readTx := range []LedgerEntryReadTx{readTx1, readTx2, readTx3} {
_, err = readTx.GetLatestLedgerSequence()
assert.Equal(t, ErrEmptyDB, err)
present, _, err := readTx.GetLedgerEntry(key, false)
present, _, err := GetLedgerEntry(readTx, false, key)
assert.NoError(t, err)
assert.False(t, present)
}
Expand Down Expand Up @@ -736,6 +736,65 @@ forloop:

}

func benchmarkLedgerEntry(b *testing.B, cached bool, includeExpired bool) {
db := NewTestDB(b)
keyUint32 := xdr.Uint32(0)
data := xdr.ContractDataEntry{
Contract: xdr.ScAddress{
Type: xdr.ScAddressTypeScAddressTypeContract,
ContractId: &xdr.Hash{0xca, 0xfe},
},
Key: xdr.ScVal{
Type: xdr.ScValTypeScvU32,
U32: &keyUint32,
},
Durability: xdr.ContractDataDurabilityPersistent,
Body: xdr.ContractDataEntryBody{
BodyType: xdr.ContractEntryBodyTypeDataEntry,
Data: &xdr.ContractDataEntryData{
Val: xdr.ScVal{
Type: xdr.ScValTypeScvU32,
U32: &keyUint32,
},
},
},
ExpirationLedgerSeq: math.MaxUint32,
}
key, entry := getContractDataLedgerEntry(b, data)
tx, err := NewReadWriter(db, 150, 15).NewTx(context.Background())
assert.NoError(b, err)
assert.NoError(b, tx.LedgerEntryWriter().UpsertLedgerEntry(entry))
assert.NoError(b, tx.Commit(2))
reader := NewLedgerEntryReader(db)
const numQueriesPerOp = 15
b.ResetTimer()
b.StopTimer()
for i := 0; i < b.N; i++ {
var readTx LedgerEntryReadTx
var err error
if cached {
readTx, err = reader.NewCachedTx(context.Background())
} else {
readTx, err = reader.NewTx(context.Background())
}
assert.NoError(b, err)
for i := 0; i < numQueriesPerOp; i++ {
b.StartTimer()
found, _, err := GetLedgerEntry(readTx, includeExpired, key)
b.StopTimer()
assert.NoError(b, err)
assert.True(b, found)
}
assert.NoError(b, readTx.Done())
}
}

func BenchmarkGetLedgerEntry(b *testing.B) {
b.Run("With cache, include expired", func(b *testing.B) { benchmarkLedgerEntry(b, true, true) })
b.Run("With cache, exclude expired", func(b *testing.B) { benchmarkLedgerEntry(b, true, false) })
b.Run("Without cache, exclude expired", func(b *testing.B) { benchmarkLedgerEntry(b, false, false) })
}

func BenchmarkLedgerUpdate(b *testing.B) {
db := NewTestDB(b)
keyUint32 := xdr.Uint32(0)
Expand Down Expand Up @@ -773,7 +832,6 @@ func BenchmarkLedgerUpdate(b *testing.B) {
}
assert.NoError(b, tx.Commit(uint32(i+1)))
}
b.StopTimer()
}

func NewTestDB(tb testing.TB) *DB {
Expand All @@ -783,9 +841,6 @@ func NewTestDB(tb testing.TB) *DB {
if err != nil {
assert.NoError(tb, db.Close())
}
var ver []string
assert.NoError(tb, db.SelectRaw(context.Background(), &ver, "SELECT sqlite_version()"))
tb.Logf("using sqlite version: %v", ver)
tb.Cleanup(func() {
assert.NoError(tb, db.Close())
})
Expand Down
4 changes: 2 additions & 2 deletions cmd/soroban-rpc/internal/methods/get_latest_ledger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ func (entryReaderTx ConstantLedgerEntryReaderTx) GetLatestLedgerSequence() (uint
return expectedLatestLedgerSequence, nil
}

func (entryReaderTx ConstantLedgerEntryReaderTx) GetLedgerEntry(key xdr.LedgerKey, includeExpired bool) (bool, xdr.LedgerEntry, error) {
return false, xdr.LedgerEntry{}, nil
func (entryReaderTx ConstantLedgerEntryReaderTx) GetLedgerEntries(includeExpired bool, keys ...xdr.LedgerKey) ([]db.LedgerKeyAndEntry, error) {
return nil, nil
}

func (entryReaderTx ConstantLedgerEntryReaderTx) Done() error {
Expand Down
32 changes: 14 additions & 18 deletions cmd/soroban-rpc/internal/methods/get_ledger_entries.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,36 +69,32 @@ func NewGetLedgerEntriesHandler(logger *log.Entry, ledgerEntryReader db.LedgerEn
}
}

var ledgerEntryResults []LedgerEntryResult
for i, ledgerKey := range ledgerKeys {
present, ledgerEntry, err := tx.GetLedgerEntry(ledgerKey, false)
if err != nil {
logger.WithError(err).WithField("request", request).
Infof("could not obtain ledger entry %v at index %d from storage", ledgerKey, i)
return GetLedgerEntriesResponse{}, &jrpc2.Error{
Code: jrpc2.InternalError,
Message: fmt.Sprintf("could not obtain ledger entry %v at index %d from storage", ledgerKey, i),
}
}

if !present {
continue
ledgerEntryResults := make([]LedgerEntryResult, 0, len(ledgerKeys))
2opremio marked this conversation as resolved.
Show resolved Hide resolved
ledgerKeysAndEntries, err := tx.GetLedgerEntries(false, ledgerKeys...)
if err != nil {
logger.WithError(err).WithField("request", request).
Info("could not obtain ledger entryies from storage")
return GetLedgerEntriesResponse{}, &jrpc2.Error{
Code: jrpc2.InternalError,
Message: "could not obtain ledger entryies from storage",
}
}

ledgerXDR, err := xdr.MarshalBase64(ledgerEntry.Data)
for i, ledgerKeyAndEntry := range ledgerKeysAndEntries {
ledgerXDR, err := xdr.MarshalBase64(ledgerKeyAndEntry.Entry.Data)
if err != nil {
logger.WithError(err).WithField("request", request).
Infof("could not serialize ledger entry data for ledger %v at index %d", ledgerEntry, i)
Infof("could not serialize ledger entry data for ledger entry %v", ledgerKeyAndEntry.Entry)
return GetLedgerEntriesResponse{}, &jrpc2.Error{
Code: jrpc2.InternalError,
Message: fmt.Sprintf("could not serialize ledger entry data for ledger %v at index %d", ledgerEntry, i),
Message: fmt.Sprintf("could not serialize ledger entry data for ledger entry %v", ledgerKeyAndEntry.Entry),
}
}

ledgerEntryResults = append(ledgerEntryResults, LedgerEntryResult{
Key: request.Keys[i],
XDR: ledgerXDR,
LastModifiedLedger: int64(ledgerEntry.LastModifiedLedgerSeq),
LastModifiedLedger: int64(ledgerKeyAndEntry.Entry.LastModifiedLedgerSeq),
})
}

Expand Down
Loading