Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

soroban-rpc: Stream ledgers on initialization #904

Merged
merged 4 commits into from
Aug 28, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 17 additions & 17 deletions cmd/soroban-rpc/internal/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/stellar/go/ingest/ledgerbackend"
supporthttp "github.com/stellar/go/support/http"
supportlog "github.com/stellar/go/support/log"
"github.com/stellar/go/xdr"

"github.com/stellar/soroban-tools/cmd/soroban-rpc/internal"
"github.com/stellar/soroban-tools/cmd/soroban-rpc/internal/config"
Expand Down Expand Up @@ -182,35 +183,34 @@ func MustNew(cfg *config.Config) *Daemon {
cfg.TransactionLedgerRetentionWindow,
)

maxRetentionWindow := cfg.EventLedgerRetentionWindow
if cfg.TransactionLedgerRetentionWindow > maxRetentionWindow {
maxRetentionWindow = cfg.TransactionLedgerRetentionWindow
} else if cfg.EventLedgerRetentionWindow == 0 && cfg.TransactionLedgerRetentionWindow > ledgerbucketwindow.DefaultEventLedgerRetentionWindow {
maxRetentionWindow = ledgerbucketwindow.DefaultEventLedgerRetentionWindow
}

// initialize the stores using what was on the DB
readTxMetaCtx, cancelReadTxMeta := context.WithTimeout(context.Background(), cfg.IngestionTimeout)
defer cancelReadTxMeta()
txmetas, err := db.NewLedgerReader(dbConn).GetAllLedgers(readTxMetaCtx)
if err != nil {
logger.WithError(err).Fatal("could not obtain txmeta cache from the database")
}
for _, txmeta := range txmetas {
// NOTE: We could optimize this to avoid unnecessary ingestion calls
// (len(txmetas) can be larger than the store retention windows)
// but it's probably not worth the pain.
// NOTE: We could optimize this to avoid unnecessary ingestion calls
// (the range of txmetads can be larger than the store retention windows)
// but it's probably not worth the pain.
err = db.NewLedgerReader(dbConn).StreamAllLedgers(readTxMetaCtx, func(txmeta xdr.LedgerCloseMeta) (bool, error) {
tsachiherman marked this conversation as resolved.
Show resolved Hide resolved
if err := eventStore.IngestEvents(txmeta); err != nil {
logger.WithError(err).Fatal("could initialize event memory store")
logger.WithError(err).Fatal("could not initialize event memory store")
}
if err := transactionStore.IngestTransactions(txmeta); err != nil {
logger.WithError(err).Fatal("could initialize transaction memory store")
logger.WithError(err).Fatal("could not initialize transaction memory store")
}
return false, nil
})
if err != nil {
logger.WithError(err).Fatal("could not obtain txmeta cache from the database")
}

onIngestionRetry := func(err error, dur time.Duration) {
logger.WithError(err).Error("could not run ingestion. Retrying")
}
maxRetentionWindow := cfg.EventLedgerRetentionWindow
if cfg.TransactionLedgerRetentionWindow > maxRetentionWindow {
maxRetentionWindow = cfg.TransactionLedgerRetentionWindow
} else if cfg.EventLedgerRetentionWindow == 0 && cfg.TransactionLedgerRetentionWindow > ledgerbucketwindow.DefaultEventLedgerRetentionWindow {
maxRetentionWindow = ledgerbucketwindow.DefaultEventLedgerRetentionWindow
}
ingestService := ingest.NewService(ingest.Config{
Logger: logger,
DB: db.NewReadWriter(dbConn, maxLedgerEntryWriteBatchSize, maxRetentionWindow),
Expand Down
29 changes: 23 additions & 6 deletions cmd/soroban-rpc/internal/db/ledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@ const (
ledgerCloseMetaTableName = "ledger_close_meta"
)

type StreamLedgerFn func(xdr.LedgerCloseMeta) (done bool, err error)

type LedgerReader interface {
GetLedger(ctx context.Context, sequence uint32) (xdr.LedgerCloseMeta, bool, error)
GetAllLedgers(ctx context.Context) ([]xdr.LedgerCloseMeta, error)
StreamAllLedgers(ctx context.Context, f StreamLedgerFn) error
}

type LedgerWriter interface {
Expand All @@ -30,12 +32,27 @@ func NewLedgerReader(db *DB) LedgerReader {
return ledgerReader{db: db}
}

// GetAllLedgers returns all ledgers in the database.
func (r ledgerReader) GetAllLedgers(ctx context.Context) ([]xdr.LedgerCloseMeta, error) {
var results []xdr.LedgerCloseMeta
// StreamAllLedgers runs f over all the ledgers in the database (until f errors or signals it's done).
func (r ledgerReader) StreamAllLedgers(ctx context.Context, f StreamLedgerFn) error {
2opremio marked this conversation as resolved.
Show resolved Hide resolved
sql := sq.Select("meta").From(ledgerCloseMetaTableName).OrderBy("sequence asc")
err := r.db.Select(ctx, &results, sql)
return results, err
q, err := r.db.Query(ctx, sql)
if err != nil {
return err
}
var closeMeta xdr.LedgerCloseMeta
tsachiherman marked this conversation as resolved.
Show resolved Hide resolved
for q.Next() {
if err = q.Scan(&closeMeta); err != nil {
return err
}
done, err := f(closeMeta)
if err != nil {
return err
}
if done {
2opremio marked this conversation as resolved.
Show resolved Hide resolved
return nil
}
}
return nil
}

// GetLedger fetches a single ledger from the db.
Expand Down
6 changes: 5 additions & 1 deletion cmd/soroban-rpc/internal/db/ledger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,11 @@ func createLedger(ledgerSequence uint32) xdr.LedgerCloseMeta {
}

func assertLedgerRange(t *testing.T, reader LedgerReader, start, end uint32) {
allLedgers, err := reader.GetAllLedgers(context.Background())
var allLedgers []xdr.LedgerCloseMeta
err := reader.StreamAllLedgers(context.Background(), func(txmeta xdr.LedgerCloseMeta) (done bool, err error) {
allLedgers = append(allLedgers, txmeta)
return false, nil
})
assert.NoError(t, err)
for i := start - 1; i <= end+1; i++ {
ledger, exists, err := reader.GetLedger(context.Background(), i)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
// LedgerBucketWindow is a sequence of buckets associated to a ledger window.
type LedgerBucketWindow[T any] struct {
// buckets is a circular buffer where each cell represents
// all events occurring within a specific ledger.
// the content stored for a specific ledger.
buckets []LedgerBucket[T]
// start is the index of the head in the circular buffer.
start uint32
Expand Down
4 changes: 2 additions & 2 deletions cmd/soroban-rpc/internal/methods/get_latest_ledger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ func (ledgerReader *ConstantLedgerReader) GetLedger(ctx context.Context, sequenc
return createLedger(sequence, expectedLatestLedgerProtocolVersion, expectedLatestLedgerHashBytes), true, nil
}

func (ledgerReader *ConstantLedgerReader) GetAllLedgers(ctx context.Context) ([]xdr.LedgerCloseMeta, error) {
return []xdr.LedgerCloseMeta{createLedger(expectedLatestLedgerSequence, expectedLatestLedgerProtocolVersion, expectedLatestLedgerHashBytes)}, nil
func (ledgerReader *ConstantLedgerReader) StreamAllLedgers(ctx context.Context, f db.StreamLedgerFn) error {
return nil
}

func createLedger(ledgerSequence uint32, protocolVersion uint32, hash byte) xdr.LedgerCloseMeta {
Expand Down
Loading