diff --git a/cmd/soroban-rpc/internal/daemon/daemon.go b/cmd/soroban-rpc/internal/daemon/daemon.go index 68efba972..ad9460087 100644 --- a/cmd/soroban-rpc/internal/daemon/daemon.go +++ b/cmd/soroban-rpc/internal/daemon/daemon.go @@ -19,6 +19,7 @@ import ( "github.com/stellar/go/ingest/ledgerbackend" supporthttp "github.com/stellar/go/support/http" supportlog "github.com/stellar/go/support/log" + "github.com/stellar/go/xdr" "github.com/stellar/soroban-tools/cmd/soroban-rpc/internal" "github.com/stellar/soroban-tools/cmd/soroban-rpc/internal/config" @@ -182,35 +183,34 @@ func MustNew(cfg *config.Config) *Daemon { cfg.TransactionLedgerRetentionWindow, ) - maxRetentionWindow := cfg.EventLedgerRetentionWindow - if cfg.TransactionLedgerRetentionWindow > maxRetentionWindow { - maxRetentionWindow = cfg.TransactionLedgerRetentionWindow - } else if cfg.EventLedgerRetentionWindow == 0 && cfg.TransactionLedgerRetentionWindow > ledgerbucketwindow.DefaultEventLedgerRetentionWindow { - maxRetentionWindow = ledgerbucketwindow.DefaultEventLedgerRetentionWindow - } - // initialize the stores using what was on the DB readTxMetaCtx, cancelReadTxMeta := context.WithTimeout(context.Background(), cfg.IngestionTimeout) defer cancelReadTxMeta() - txmetas, err := db.NewLedgerReader(dbConn).GetAllLedgers(readTxMetaCtx) - if err != nil { - logger.WithError(err).Fatal("could not obtain txmeta cache from the database") - } - for _, txmeta := range txmetas { - // NOTE: We could optimize this to avoid unnecessary ingestion calls - // (len(txmetas) can be larger than the store retention windows) - // but it's probably not worth the pain. + // NOTE: We could optimize this to avoid unnecessary ingestion calls + // (the range of txmetads can be larger than the store retention windows) + // but it's probably not worth the pain. + err = db.NewLedgerReader(dbConn).StreamAllLedgers(readTxMetaCtx, func(txmeta xdr.LedgerCloseMeta) error { if err := eventStore.IngestEvents(txmeta); err != nil { - logger.WithError(err).Fatal("could initialize event memory store") + logger.WithError(err).Fatal("could not initialize event memory store") } if err := transactionStore.IngestTransactions(txmeta); err != nil { - logger.WithError(err).Fatal("could initialize transaction memory store") + logger.WithError(err).Fatal("could not initialize transaction memory store") } + return nil + }) + if err != nil { + logger.WithError(err).Fatal("could not obtain txmeta cache from the database") } onIngestionRetry := func(err error, dur time.Duration) { logger.WithError(err).Error("could not run ingestion. Retrying") } + maxRetentionWindow := cfg.EventLedgerRetentionWindow + if cfg.TransactionLedgerRetentionWindow > maxRetentionWindow { + maxRetentionWindow = cfg.TransactionLedgerRetentionWindow + } else if cfg.EventLedgerRetentionWindow == 0 && cfg.TransactionLedgerRetentionWindow > ledgerbucketwindow.DefaultEventLedgerRetentionWindow { + maxRetentionWindow = ledgerbucketwindow.DefaultEventLedgerRetentionWindow + } ingestService := ingest.NewService(ingest.Config{ Logger: logger, DB: db.NewReadWriter(dbConn, maxLedgerEntryWriteBatchSize, maxRetentionWindow), diff --git a/cmd/soroban-rpc/internal/db/ledger.go b/cmd/soroban-rpc/internal/db/ledger.go index 2c40ff093..1b4b0aa24 100644 --- a/cmd/soroban-rpc/internal/db/ledger.go +++ b/cmd/soroban-rpc/internal/db/ledger.go @@ -13,9 +13,11 @@ const ( ledgerCloseMetaTableName = "ledger_close_meta" ) +type StreamLedgerFn func(xdr.LedgerCloseMeta) error + type LedgerReader interface { GetLedger(ctx context.Context, sequence uint32) (xdr.LedgerCloseMeta, bool, error) - GetAllLedgers(ctx context.Context) ([]xdr.LedgerCloseMeta, error) + StreamAllLedgers(ctx context.Context, f StreamLedgerFn) error } type LedgerWriter interface { @@ -30,12 +32,24 @@ func NewLedgerReader(db *DB) LedgerReader { return ledgerReader{db: db} } -// GetAllLedgers returns all ledgers in the database. -func (r ledgerReader) GetAllLedgers(ctx context.Context) ([]xdr.LedgerCloseMeta, error) { - var results []xdr.LedgerCloseMeta +// StreamAllLedgers runs f over all the ledgers in the database (until f errors or signals it's done). +func (r ledgerReader) StreamAllLedgers(ctx context.Context, f StreamLedgerFn) error { sql := sq.Select("meta").From(ledgerCloseMetaTableName).OrderBy("sequence asc") - err := r.db.Select(ctx, &results, sql) - return results, err + q, err := r.db.Query(ctx, sql) + if err != nil { + return err + } + defer q.Close() + for q.Next() { + var closeMeta xdr.LedgerCloseMeta + if err = q.Scan(&closeMeta); err != nil { + return err + } + if err = f(closeMeta); err != nil { + return err + } + } + return nil } // GetLedger fetches a single ledger from the db. diff --git a/cmd/soroban-rpc/internal/db/ledger_test.go b/cmd/soroban-rpc/internal/db/ledger_test.go index 906ad9ff1..362f33653 100644 --- a/cmd/soroban-rpc/internal/db/ledger_test.go +++ b/cmd/soroban-rpc/internal/db/ledger_test.go @@ -28,7 +28,11 @@ func createLedger(ledgerSequence uint32) xdr.LedgerCloseMeta { } func assertLedgerRange(t *testing.T, reader LedgerReader, start, end uint32) { - allLedgers, err := reader.GetAllLedgers(context.Background()) + var allLedgers []xdr.LedgerCloseMeta + err := reader.StreamAllLedgers(context.Background(), func(txmeta xdr.LedgerCloseMeta) error { + allLedgers = append(allLedgers, txmeta) + return nil + }) assert.NoError(t, err) for i := start - 1; i <= end+1; i++ { ledger, exists, err := reader.GetLedger(context.Background(), i) diff --git a/cmd/soroban-rpc/internal/db/ledgerentry.go b/cmd/soroban-rpc/internal/db/ledgerentry.go index 0c650a561..6363e2518 100644 --- a/cmd/soroban-rpc/internal/db/ledgerentry.go +++ b/cmd/soroban-rpc/internal/db/ledgerentry.go @@ -233,6 +233,7 @@ func (l *ledgerEntryReadTx) getRawLedgerEntries(keys ...string) (map[string]stri if err != nil { return nil, err } + defer q.Close() for q.Next() { var key, entry string if err = q.Scan(&key, &entry); err != nil { diff --git a/cmd/soroban-rpc/internal/ledgerbucketwindow/ledgerbucketwindow.go b/cmd/soroban-rpc/internal/ledgerbucketwindow/ledgerbucketwindow.go index 08b9dc9d1..0d447e716 100644 --- a/cmd/soroban-rpc/internal/ledgerbucketwindow/ledgerbucketwindow.go +++ b/cmd/soroban-rpc/internal/ledgerbucketwindow/ledgerbucketwindow.go @@ -7,7 +7,7 @@ import ( // LedgerBucketWindow is a sequence of buckets associated to a ledger window. type LedgerBucketWindow[T any] struct { // buckets is a circular buffer where each cell represents - // all events occurring within a specific ledger. + // the content stored for a specific ledger. buckets []LedgerBucket[T] // start is the index of the head in the circular buffer. start uint32 diff --git a/cmd/soroban-rpc/internal/methods/get_latest_ledger_test.go b/cmd/soroban-rpc/internal/methods/get_latest_ledger_test.go index a57ad4fec..1beb15b0a 100644 --- a/cmd/soroban-rpc/internal/methods/get_latest_ledger_test.go +++ b/cmd/soroban-rpc/internal/methods/get_latest_ledger_test.go @@ -54,8 +54,8 @@ func (ledgerReader *ConstantLedgerReader) GetLedger(ctx context.Context, sequenc return createLedger(sequence, expectedLatestLedgerProtocolVersion, expectedLatestLedgerHashBytes), true, nil } -func (ledgerReader *ConstantLedgerReader) GetAllLedgers(ctx context.Context) ([]xdr.LedgerCloseMeta, error) { - return []xdr.LedgerCloseMeta{createLedger(expectedLatestLedgerSequence, expectedLatestLedgerProtocolVersion, expectedLatestLedgerHashBytes)}, nil +func (ledgerReader *ConstantLedgerReader) StreamAllLedgers(ctx context.Context, f db.StreamLedgerFn) error { + return nil } func createLedger(ledgerSequence uint32, protocolVersion uint32, hash byte) xdr.LedgerCloseMeta {