Skip to content

Commit

Permalink
Merge branch 'main' into remove-set-from-spec
Browse files Browse the repository at this point in the history
  • Loading branch information
2opremio authored Aug 29, 2023
2 parents 98375e7 + f7de99a commit 71df30d
Show file tree
Hide file tree
Showing 7 changed files with 54 additions and 27 deletions.
34 changes: 17 additions & 17 deletions cmd/soroban-rpc/internal/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/stellar/go/ingest/ledgerbackend"
supporthttp "github.com/stellar/go/support/http"
supportlog "github.com/stellar/go/support/log"
"github.com/stellar/go/xdr"

"github.com/stellar/soroban-tools/cmd/soroban-rpc/internal"
"github.com/stellar/soroban-tools/cmd/soroban-rpc/internal/config"
Expand Down Expand Up @@ -182,35 +183,34 @@ func MustNew(cfg *config.Config) *Daemon {
cfg.TransactionLedgerRetentionWindow,
)

maxRetentionWindow := cfg.EventLedgerRetentionWindow
if cfg.TransactionLedgerRetentionWindow > maxRetentionWindow {
maxRetentionWindow = cfg.TransactionLedgerRetentionWindow
} else if cfg.EventLedgerRetentionWindow == 0 && cfg.TransactionLedgerRetentionWindow > ledgerbucketwindow.DefaultEventLedgerRetentionWindow {
maxRetentionWindow = ledgerbucketwindow.DefaultEventLedgerRetentionWindow
}

// initialize the stores using what was on the DB
readTxMetaCtx, cancelReadTxMeta := context.WithTimeout(context.Background(), cfg.IngestionTimeout)
defer cancelReadTxMeta()
txmetas, err := db.NewLedgerReader(dbConn).GetAllLedgers(readTxMetaCtx)
if err != nil {
logger.WithError(err).Fatal("could not obtain txmeta cache from the database")
}
for _, txmeta := range txmetas {
// NOTE: We could optimize this to avoid unnecessary ingestion calls
// (len(txmetas) can be larger than the store retention windows)
// but it's probably not worth the pain.
// NOTE: We could optimize this to avoid unnecessary ingestion calls
// (the range of txmetads can be larger than the store retention windows)
// but it's probably not worth the pain.
err = db.NewLedgerReader(dbConn).StreamAllLedgers(readTxMetaCtx, func(txmeta xdr.LedgerCloseMeta) error {
if err := eventStore.IngestEvents(txmeta); err != nil {
logger.WithError(err).Fatal("could initialize event memory store")
logger.WithError(err).Fatal("could not initialize event memory store")
}
if err := transactionStore.IngestTransactions(txmeta); err != nil {
logger.WithError(err).Fatal("could initialize transaction memory store")
logger.WithError(err).Fatal("could not initialize transaction memory store")
}
return nil
})
if err != nil {
logger.WithError(err).Fatal("could not obtain txmeta cache from the database")
}

onIngestionRetry := func(err error, dur time.Duration) {
logger.WithError(err).Error("could not run ingestion. Retrying")
}
maxRetentionWindow := cfg.EventLedgerRetentionWindow
if cfg.TransactionLedgerRetentionWindow > maxRetentionWindow {
maxRetentionWindow = cfg.TransactionLedgerRetentionWindow
} else if cfg.EventLedgerRetentionWindow == 0 && cfg.TransactionLedgerRetentionWindow > ledgerbucketwindow.DefaultEventLedgerRetentionWindow {
maxRetentionWindow = ledgerbucketwindow.DefaultEventLedgerRetentionWindow
}
ingestService := ingest.NewService(ingest.Config{
Logger: logger,
DB: db.NewReadWriter(dbConn, maxLedgerEntryWriteBatchSize, maxRetentionWindow),
Expand Down
26 changes: 20 additions & 6 deletions cmd/soroban-rpc/internal/db/ledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@ const (
ledgerCloseMetaTableName = "ledger_close_meta"
)

type StreamLedgerFn func(xdr.LedgerCloseMeta) error

type LedgerReader interface {
GetLedger(ctx context.Context, sequence uint32) (xdr.LedgerCloseMeta, bool, error)
GetAllLedgers(ctx context.Context) ([]xdr.LedgerCloseMeta, error)
StreamAllLedgers(ctx context.Context, f StreamLedgerFn) error
}

type LedgerWriter interface {
Expand All @@ -30,12 +32,24 @@ func NewLedgerReader(db *DB) LedgerReader {
return ledgerReader{db: db}
}

// GetAllLedgers returns all ledgers in the database.
func (r ledgerReader) GetAllLedgers(ctx context.Context) ([]xdr.LedgerCloseMeta, error) {
var results []xdr.LedgerCloseMeta
// StreamAllLedgers runs f over all the ledgers in the database (until f errors or signals it's done).
func (r ledgerReader) StreamAllLedgers(ctx context.Context, f StreamLedgerFn) error {
sql := sq.Select("meta").From(ledgerCloseMetaTableName).OrderBy("sequence asc")
err := r.db.Select(ctx, &results, sql)
return results, err
q, err := r.db.Query(ctx, sql)
if err != nil {
return err
}
defer q.Close()
for q.Next() {
var closeMeta xdr.LedgerCloseMeta
if err = q.Scan(&closeMeta); err != nil {
return err
}
if err = f(closeMeta); err != nil {
return err
}
}
return nil
}

// GetLedger fetches a single ledger from the db.
Expand Down
6 changes: 5 additions & 1 deletion cmd/soroban-rpc/internal/db/ledger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,11 @@ func createLedger(ledgerSequence uint32) xdr.LedgerCloseMeta {
}

func assertLedgerRange(t *testing.T, reader LedgerReader, start, end uint32) {
allLedgers, err := reader.GetAllLedgers(context.Background())
var allLedgers []xdr.LedgerCloseMeta
err := reader.StreamAllLedgers(context.Background(), func(txmeta xdr.LedgerCloseMeta) error {
allLedgers = append(allLedgers, txmeta)
return nil
})
assert.NoError(t, err)
for i := start - 1; i <= end+1; i++ {
ledger, exists, err := reader.GetLedger(context.Background(), i)
Expand Down
1 change: 1 addition & 0 deletions cmd/soroban-rpc/internal/db/ledgerentry.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ func (l *ledgerEntryReadTx) getRawLedgerEntries(keys ...string) (map[string]stri
if err != nil {
return nil, err
}
defer q.Close()
for q.Next() {
var key, entry string
if err = q.Scan(&key, &entry); err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
// LedgerBucketWindow is a sequence of buckets associated to a ledger window.
type LedgerBucketWindow[T any] struct {
// buckets is a circular buffer where each cell represents
// all events occurring within a specific ledger.
// the content stored for a specific ledger.
buckets []LedgerBucket[T]
// start is the index of the head in the circular buffer.
start uint32
Expand Down
4 changes: 2 additions & 2 deletions cmd/soroban-rpc/internal/methods/get_latest_ledger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ func (ledgerReader *ConstantLedgerReader) GetLedger(ctx context.Context, sequenc
return createLedger(sequence, expectedLatestLedgerProtocolVersion, expectedLatestLedgerHashBytes), true, nil
}

func (ledgerReader *ConstantLedgerReader) GetAllLedgers(ctx context.Context) ([]xdr.LedgerCloseMeta, error) {
return []xdr.LedgerCloseMeta{createLedger(expectedLatestLedgerSequence, expectedLatestLedgerProtocolVersion, expectedLatestLedgerHashBytes)}, nil
func (ledgerReader *ConstantLedgerReader) StreamAllLedgers(ctx context.Context, f db.StreamLedgerFn) error {
return nil
}

func createLedger(ledgerSequence uint32, protocolVersion uint32, hash byte) xdr.LedgerCloseMeta {
Expand Down
8 changes: 8 additions & 0 deletions cmd/soroban-rpc/internal/methods/get_ledger_entries.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,17 @@ type GetLedgerEntriesResponse struct {
LatestLedger int64 `json:"latestLedger,string"`
}

const getLedgerEntriesMaxKeys = 200

// NewGetLedgerEntriesHandler returns a JSON RPC handler to retrieve the specified ledger entries from Stellar Core.
func NewGetLedgerEntriesHandler(logger *log.Entry, ledgerEntryReader db.LedgerEntryReader) jrpc2.Handler {
return handler.New(func(ctx context.Context, request GetLedgerEntriesRequest) (GetLedgerEntriesResponse, error) {
if len(request.Keys) > getLedgerEntriesMaxKeys {
return GetLedgerEntriesResponse{}, &jrpc2.Error{
Code: jrpc2.InvalidParams,
Message: fmt.Sprintf("key count (%d) exceeds maximum supported (%d)", len(request.Keys), getLedgerEntriesMaxKeys),
}
}
var ledgerKeys []xdr.LedgerKey
for i, requestKey := range request.Keys {
var ledgerKey xdr.LedgerKey
Expand Down

0 comments on commit 71df30d

Please sign in to comment.