Skip to content

Commit

Permalink
soroban-rpc: Stream ledgers on initialization (#904)
Browse files Browse the repository at this point in the history
This halves the startup ledger memory requirements (since we don't no longer
need to store all the ledgers in memory before initializing the stores).
  • Loading branch information
2opremio authored Aug 28, 2023
1 parent e4885d7 commit 0d3fbcb
Show file tree
Hide file tree
Showing 6 changed files with 46 additions and 27 deletions.
34 changes: 17 additions & 17 deletions cmd/soroban-rpc/internal/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/stellar/go/ingest/ledgerbackend"
supporthttp "github.com/stellar/go/support/http"
supportlog "github.com/stellar/go/support/log"
"github.com/stellar/go/xdr"

"github.com/stellar/soroban-tools/cmd/soroban-rpc/internal"
"github.com/stellar/soroban-tools/cmd/soroban-rpc/internal/config"
Expand Down Expand Up @@ -182,35 +183,34 @@ func MustNew(cfg *config.Config) *Daemon {
cfg.TransactionLedgerRetentionWindow,
)

maxRetentionWindow := cfg.EventLedgerRetentionWindow
if cfg.TransactionLedgerRetentionWindow > maxRetentionWindow {
maxRetentionWindow = cfg.TransactionLedgerRetentionWindow
} else if cfg.EventLedgerRetentionWindow == 0 && cfg.TransactionLedgerRetentionWindow > ledgerbucketwindow.DefaultEventLedgerRetentionWindow {
maxRetentionWindow = ledgerbucketwindow.DefaultEventLedgerRetentionWindow
}

// initialize the stores using what was on the DB
readTxMetaCtx, cancelReadTxMeta := context.WithTimeout(context.Background(), cfg.IngestionTimeout)
defer cancelReadTxMeta()
txmetas, err := db.NewLedgerReader(dbConn).GetAllLedgers(readTxMetaCtx)
if err != nil {
logger.WithError(err).Fatal("could not obtain txmeta cache from the database")
}
for _, txmeta := range txmetas {
// NOTE: We could optimize this to avoid unnecessary ingestion calls
// (len(txmetas) can be larger than the store retention windows)
// but it's probably not worth the pain.
// NOTE: We could optimize this to avoid unnecessary ingestion calls
// (the range of txmetads can be larger than the store retention windows)
// but it's probably not worth the pain.
err = db.NewLedgerReader(dbConn).StreamAllLedgers(readTxMetaCtx, func(txmeta xdr.LedgerCloseMeta) error {
if err := eventStore.IngestEvents(txmeta); err != nil {
logger.WithError(err).Fatal("could initialize event memory store")
logger.WithError(err).Fatal("could not initialize event memory store")
}
if err := transactionStore.IngestTransactions(txmeta); err != nil {
logger.WithError(err).Fatal("could initialize transaction memory store")
logger.WithError(err).Fatal("could not initialize transaction memory store")
}
return nil
})
if err != nil {
logger.WithError(err).Fatal("could not obtain txmeta cache from the database")
}

onIngestionRetry := func(err error, dur time.Duration) {
logger.WithError(err).Error("could not run ingestion. Retrying")
}
maxRetentionWindow := cfg.EventLedgerRetentionWindow
if cfg.TransactionLedgerRetentionWindow > maxRetentionWindow {
maxRetentionWindow = cfg.TransactionLedgerRetentionWindow
} else if cfg.EventLedgerRetentionWindow == 0 && cfg.TransactionLedgerRetentionWindow > ledgerbucketwindow.DefaultEventLedgerRetentionWindow {
maxRetentionWindow = ledgerbucketwindow.DefaultEventLedgerRetentionWindow
}
ingestService := ingest.NewService(ingest.Config{
Logger: logger,
DB: db.NewReadWriter(dbConn, maxLedgerEntryWriteBatchSize, maxRetentionWindow),
Expand Down
26 changes: 20 additions & 6 deletions cmd/soroban-rpc/internal/db/ledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@ const (
ledgerCloseMetaTableName = "ledger_close_meta"
)

type StreamLedgerFn func(xdr.LedgerCloseMeta) error

type LedgerReader interface {
GetLedger(ctx context.Context, sequence uint32) (xdr.LedgerCloseMeta, bool, error)
GetAllLedgers(ctx context.Context) ([]xdr.LedgerCloseMeta, error)
StreamAllLedgers(ctx context.Context, f StreamLedgerFn) error
}

type LedgerWriter interface {
Expand All @@ -30,12 +32,24 @@ func NewLedgerReader(db *DB) LedgerReader {
return ledgerReader{db: db}
}

// GetAllLedgers returns all ledgers in the database.
func (r ledgerReader) GetAllLedgers(ctx context.Context) ([]xdr.LedgerCloseMeta, error) {
var results []xdr.LedgerCloseMeta
// StreamAllLedgers runs f over all the ledgers in the database (until f errors or signals it's done).
func (r ledgerReader) StreamAllLedgers(ctx context.Context, f StreamLedgerFn) error {
sql := sq.Select("meta").From(ledgerCloseMetaTableName).OrderBy("sequence asc")
err := r.db.Select(ctx, &results, sql)
return results, err
q, err := r.db.Query(ctx, sql)
if err != nil {
return err
}
defer q.Close()
for q.Next() {
var closeMeta xdr.LedgerCloseMeta
if err = q.Scan(&closeMeta); err != nil {
return err
}
if err = f(closeMeta); err != nil {
return err
}
}
return nil
}

// GetLedger fetches a single ledger from the db.
Expand Down
6 changes: 5 additions & 1 deletion cmd/soroban-rpc/internal/db/ledger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,11 @@ func createLedger(ledgerSequence uint32) xdr.LedgerCloseMeta {
}

func assertLedgerRange(t *testing.T, reader LedgerReader, start, end uint32) {
allLedgers, err := reader.GetAllLedgers(context.Background())
var allLedgers []xdr.LedgerCloseMeta
err := reader.StreamAllLedgers(context.Background(), func(txmeta xdr.LedgerCloseMeta) error {
allLedgers = append(allLedgers, txmeta)
return nil
})
assert.NoError(t, err)
for i := start - 1; i <= end+1; i++ {
ledger, exists, err := reader.GetLedger(context.Background(), i)
Expand Down
1 change: 1 addition & 0 deletions cmd/soroban-rpc/internal/db/ledgerentry.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ func (l *ledgerEntryReadTx) getRawLedgerEntries(keys ...string) (map[string]stri
if err != nil {
return nil, err
}
defer q.Close()
for q.Next() {
var key, entry string
if err = q.Scan(&key, &entry); err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
// LedgerBucketWindow is a sequence of buckets associated to a ledger window.
type LedgerBucketWindow[T any] struct {
// buckets is a circular buffer where each cell represents
// all events occurring within a specific ledger.
// the content stored for a specific ledger.
buckets []LedgerBucket[T]
// start is the index of the head in the circular buffer.
start uint32
Expand Down
4 changes: 2 additions & 2 deletions cmd/soroban-rpc/internal/methods/get_latest_ledger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ func (ledgerReader *ConstantLedgerReader) GetLedger(ctx context.Context, sequenc
return createLedger(sequence, expectedLatestLedgerProtocolVersion, expectedLatestLedgerHashBytes), true, nil
}

func (ledgerReader *ConstantLedgerReader) GetAllLedgers(ctx context.Context) ([]xdr.LedgerCloseMeta, error) {
return []xdr.LedgerCloseMeta{createLedger(expectedLatestLedgerSequence, expectedLatestLedgerProtocolVersion, expectedLatestLedgerHashBytes)}, nil
func (ledgerReader *ConstantLedgerReader) StreamAllLedgers(ctx context.Context, f db.StreamLedgerFn) error {
return nil
}

func createLedger(ledgerSequence uint32, protocolVersion uint32, hash byte) xdr.LedgerCloseMeta {
Expand Down

0 comments on commit 0d3fbcb

Please sign in to comment.