Actually add & use parallel downloads, preparing checkpoint chunks
Shaptic committed Sep 12, 2022
1 parent 4d82993 commit b6028d9
Showing 7 changed files with 256 additions and 56 deletions.
47 changes: 2 additions & 45 deletions exp/lighthorizon/ingester/ingester.go
@@ -2,14 +2,9 @@ package ingester

import (
"context"
"fmt"
"net/url"

"github.com/stellar/go/ingest"
"github.com/stellar/go/metaarchive"
"github.com/stellar/go/support/errors"
"github.com/stellar/go/support/log"
"github.com/stellar/go/support/storage"

"github.com/stellar/go/historyarchive"
"github.com/stellar/go/xdr"
@@ -30,43 +25,8 @@ type liteIngester struct {
networkPassphrase string
}

func NewIngester(config IngesterConfig) (Ingester, error) {
if config.CacheSize <= 0 {
return nil, fmt.Errorf("invalid cache size: %d", config.CacheSize)
}

// Now, set up a simple filesystem-like access to the backend and wrap it in
// a local on-disk LRU cache if we can.
source, err := historyarchive.ConnectBackend(
config.SourceUrl,
storage.ConnectOptions{Context: context.Background()},
)
if err != nil {
return nil, errors.Wrapf(err, "failed to connect to %s", config.SourceUrl)
}

parsed, err := url.Parse(config.SourceUrl)
if err != nil {
return nil, errors.Wrapf(err, "%s is not a valid URL", config.SourceUrl)
}

if parsed.Scheme != "file" { // otherwise, already on-disk
cache, errr := storage.MakeOnDiskCache(source, config.CacheDir, uint(config.CacheSize))

if errr != nil { // non-fatal: warn but continue w/o cache
log.WithField("path", config.CacheDir).WithError(errr).
Warnf("Failed to create cached ledger backend")
} else {
log.WithField("path", config.CacheDir).
Infof("On-disk cache configured")
source = cache
}
}

return &liteIngester{
MetaArchive: metaarchive.NewMetaArchive(source),
networkPassphrase: config.NetworkPassphrase,
}, nil
func (i *liteIngester) PrepareRange(ctx context.Context, r historyarchive.Range) error {
return nil
}

func (i *liteIngester) NewLedgerTransactionReader(
@@ -75,9 +35,6 @@ func (i *liteIngester) NewLedgerTransactionReader(
reader, err := ingest.NewLedgerTransactionReaderFromLedgerCloseMeta(
i.networkPassphrase,
ledgerCloseMeta.MustV0())
if err != nil {
return nil, err
}

return &liteLedgerTransactionReader{reader}, err
}
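Since PrepareRange is now a deliberate no-op for the lite ingester, call sites can request prefetching unconditionally and only the parallel implementation acts on it. A minimal conformance sketch, assumed to live inside the ingester package (not shown in this diff):

// Both implementations satisfy the widened Ingester interface; for the lite
// ingester, PrepareRange returns nil immediately and every ledger is fetched
// on demand by GetLedger.
var _ Ingester = (*liteIngester)(nil)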
56 changes: 56 additions & 0 deletions exp/lighthorizon/ingester/main.go
@@ -1,8 +1,16 @@
package ingester

import (
"context"
"fmt"
"net/url"

"github.com/stellar/go/historyarchive"
"github.com/stellar/go/ingest"
"github.com/stellar/go/metaarchive"
"github.com/stellar/go/support/errors"
"github.com/stellar/go/support/log"
"github.com/stellar/go/support/storage"
"github.com/stellar/go/xdr"
)

@@ -15,6 +23,7 @@ import (
type Ingester interface {
metaarchive.MetaArchive

PrepareRange(ctx context.Context, r historyarchive.Range) error
NewLedgerTransactionReader(
ledgerCloseMeta xdr.SerializedLedgerCloseMeta,
) (LedgerTransactionReader, error)
@@ -29,3 +38,50 @@ type LedgerTransaction struct {
type LedgerTransactionReader interface {
Read() (LedgerTransaction, error)
}

func NewIngester(config IngesterConfig) (Ingester, error) {
if config.CacheSize <= 0 {
return nil, fmt.Errorf("invalid cache size: %d", config.CacheSize)
}

// Now, set up a simple filesystem-like access to the backend and wrap it in
// a local on-disk LRU cache if we can.
source, err := historyarchive.ConnectBackend(
config.SourceUrl,
storage.ConnectOptions{Context: context.Background()},
)
if err != nil {
return nil, errors.Wrapf(err, "failed to connect to %s", config.SourceUrl)
}

parsed, err := url.Parse(config.SourceUrl)
if err != nil {
return nil, errors.Wrapf(err, "%s is not a valid URL", config.SourceUrl)
}

if parsed.Scheme != "file" { // otherwise, already on-disk
cache, errr := storage.MakeOnDiskCache(source, config.CacheDir, uint(config.CacheSize))

if errr != nil { // non-fatal: warn but continue w/o cache
log.WithField("path", config.CacheDir).WithError(errr).
Warnf("Failed to create cached ledger backend")
} else {
log.WithField("path", config.CacheDir).
Infof("On-disk cache configured")
source = cache
}
}

if config.ParallelDownloads > 1 {
log.Infof("Enabling parallel ledger fetches with %d workers", config.ParallelDownloads)
return NewParallelIngester(
source,
config.NetworkPassphrase,
config.ParallelDownloads), nil
}

return &liteIngester{
MetaArchive: metaarchive.NewMetaArchive(source),
networkPassphrase: config.NetworkPassphrase,
}, nil
}
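For reference, a construction sketch (not part of this commit) showing how the new ParallelDownloads knob selects the parallel implementation; the helper, archive URL, and sizes below are illustrative assumptions:

package example // illustration only

import (
	"log"

	"github.com/stellar/go/exp/lighthorizon/ingester"
)

func buildIngester() ingester.Ingester {
	ing, err := ingester.NewIngester(ingester.IngesterConfig{
		SourceUrl:         "s3://history-archive",              // placeholder archive URL
		NetworkPassphrase: "Test SDF Network ; September 2015", // testnet passphrase
		CacheDir:          "/tmp/ledger-cache",                 // placeholder
		CacheSize:         1024,
		ParallelDownloads: 4, // >1 routes construction to NewParallelIngester
	})
	if err != nil {
		log.Fatal(err)
	}
	return ing
}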
151 changes: 151 additions & 0 deletions exp/lighthorizon/ingester/parallel_ingester.go
@@ -0,0 +1,151 @@
package ingester

import (
"context"
"sync"
"time"

"github.com/stellar/go/historyarchive"
"github.com/stellar/go/metaarchive"
"github.com/stellar/go/support/collections/set"
"github.com/stellar/go/support/log"
"github.com/stellar/go/support/storage"
"github.com/stellar/go/xdr"
)

type parallelIngester struct {
liteIngester

ledgerFeedLock sync.RWMutex
ledgerFeed map[uint32]downloadState
ledgerQueue set.Set[uint32]

workQueue chan uint32
signalChan chan error
}

type downloadState struct {
ledger xdr.SerializedLedgerCloseMeta
err error
}

// NewParallelIngester creates an ingester on the given `ledgerSource` using the
// given `networkPassphrase` that can download ledgers in parallel with
// `workerCount` workers when `PrepareRange()` is used.
func NewParallelIngester(
ledgerSource storage.Storage,
networkPassphrase string,
workerCount uint,
) *parallelIngester {
self := &parallelIngester{
liteIngester: liteIngester{
MetaArchive: metaarchive.NewMetaArchive(ledgerSource),
networkPassphrase: networkPassphrase,
},
ledgerFeedLock: sync.RWMutex{},
ledgerFeed: make(map[uint32]downloadState, 64),
ledgerQueue: set.NewSet[uint32](64),
workQueue: make(chan uint32, workerCount),
signalChan: make(chan error),
}

// These are the workers that download & store ledgers in memory.
for j := uint(0); j < workerCount; j++ {
go func(jj uint) {
for ledgerSeq := range self.workQueue {
start := time.Now()
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
txmeta, err := self.liteIngester.GetLedger(ctx, ledgerSeq)
cancel()

log.WithField("duration", time.Since(start)).
WithField("worker", jj).WithError(err).
Debugf("Downloaded ledger %d", ledgerSeq)

self.ledgerFeedLock.Lock()
self.ledgerFeed[ledgerSeq] = downloadState{txmeta, err}
self.ledgerFeedLock.Unlock()
self.signalChan <- err
}
}(j)
}

return self
}

// PrepareRange queues the given range for the parallel download workers, which
// store each ledger in an in-memory feed as it completes. Use this to download
// ledgers in parallel rather than fetching them one at a time via `GetLedger()`.
//
// Note: The passed in range `r` is inclusive of the boundaries.
func (i *parallelIngester) PrepareRange(ctx context.Context, r historyarchive.Range) error {
// The taskmaster adds ledger sequence numbers to the work queue.
go func() {
start := time.Now()
defer func() {
log.WithField("duration", time.Since(start)).
WithError(ctx.Err()).
Infof("Download of ledger range: [%d, %d] (%d ledgers) complete",
r.Low, r.High, r.Size())
}()

for seq := r.Low; seq <= r.High; seq++ {
if ctx.Err() != nil {
log.Warnf("Cancelling remaining downloads ([%d, %d]): %v",
seq, r.High, ctx.Err())
break
}

// Adding this to the set of ledgers being downloaded in parallel means
// GetLedger() will wait on the worker feed for it. Ledgers in the range
// that haven't been queued yet aren't blocked behind the workers: they
// still fall back to the normal, direct download path.
i.ledgerQueue.Add(seq)

i.workQueue <- seq // blocks until there's an available worker

// We don't remove from the queue here, preferring to remove when
// it's actually pulled from the worker. Removing here would mean
// you could have multiple instances of a ledger download happening.
}
}()

return nil
}

func (i *parallelIngester) GetLedger(
ctx context.Context, ledgerSeq uint32,
) (xdr.SerializedLedgerCloseMeta, error) {
// If the requested ledger is out of the queued up ranges, we can fall back
// to the default non-parallel download method.
if !i.ledgerQueue.Contains(ledgerSeq) {
return i.liteIngester.GetLedger(ctx, ledgerSeq)
}

// If the ledger isn't available yet, wait for the download worker.
var err error
for err == nil {
i.ledgerFeedLock.RLock()
if state, ok := i.ledgerFeed[ledgerSeq]; ok {
i.ledgerFeedLock.RUnlock() // release the read lock so we can re-lock as a writer
i.ledgerFeedLock.Lock()
delete(i.ledgerFeed, ledgerSeq)
i.ledgerQueue.Remove(ledgerSeq)
i.ledgerFeedLock.Unlock()

return state.ledger, state.err
}
i.ledgerFeedLock.RUnlock()

select {
case err = <-i.signalChan: // blocks until another ledger downloads
case <-ctx.Done():
err = ctx.Err()
}
}

return xdr.SerializedLedgerCloseMeta{}, err
}

var _ Ingester = (*parallelIngester)(nil) // ensure conformity to the interface
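A hedged usage sketch (not from the commit) of the intended flow: queue a range up front, then read ledgers sequentially. Ledgers inside the prepared range resolve from the in-memory feed as workers finish, while anything outside it falls back to the direct, single-ledger download path. The helper name and package layout are assumptions:

package example // illustration only

import (
	"context"

	"github.com/stellar/go/exp/lighthorizon/ingester"
	"github.com/stellar/go/historyarchive"
)

// driveRange prefetches the range and then consumes it in order.
func driveRange(ctx context.Context, ing ingester.Ingester, r historyarchive.Range) error {
	if err := ing.PrepareRange(ctx, r); err != nil {
		return err
	}
	for seq := r.Low; seq <= r.High; seq++ {
		// Blocks until a worker has delivered this ledger (or ctx is done).
		ledger, err := ing.GetLedger(ctx, seq)
		if err != nil {
			return err
		}
		_ = ledger // hand the xdr.SerializedLedgerCloseMeta to a transaction reader here
	}
	return nil
}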
4 changes: 4 additions & 0 deletions exp/lighthorizon/main.go
@@ -62,6 +62,7 @@ break down accounts by active ledgers.`,
cacheDir, _ := cmd.Flags().GetString("ledger-cache")
cacheSize, _ := cmd.Flags().GetUint("ledger-cache-size")
logLevelParam, _ := cmd.Flags().GetString("log-level")
downloadCount, _ := cmd.Flags().GetUint("parallel-downloads")

L := log.WithField("service", "horizon-lite")
logLevel, err := logrus.ParseLevel(logLevelParam)
@@ -88,6 +89,7 @@ break down accounts by active ledgers.`,
NetworkPassphrase: networkPassphrase,
CacheDir: cacheDir,
CacheSize: int(cacheSize),
ParallelDownloads: downloadCount,
})
if err != nil {
log.Fatal(err)
@@ -129,6 +131,8 @@ break down accounts by active ledgers.`,
"if left empty, uses a temporary directory")
serve.Flags().Uint("ledger-cache-size", defaultCacheSize,
"number of ledgers to store in the cache")
serve.Flags().Uint("parallel-downloads", 1,
"how many workers should download ledgers in parallel?")

cmd.AddCommand(serve)
tools.AddCacheCommands(cmd)
30 changes: 23 additions & 7 deletions exp/lighthorizon/services/main.go
@@ -116,13 +116,28 @@ func searchAccountTransactions(ctx context.Context,
Infof("Fulfilled request for account %s at cursor %d", accountId, cursor)
}()

checkpointMgr := historyarchive.NewCheckpointManager(0)

for {
if checkpointMgr.IsCheckpoint(nextLedger) {
r := historyarchive.Range{
Low: nextLedger,
High: checkpointMgr.NextCheckpoint(nextLedger + 1),
}
log.Infof("prepare range %d, %d", r.Low, r.High)
if innerErr := config.Ingester.PrepareRange(ctx, r); innerErr != nil {
log.Errorf("failed to prepare ledger range [%d, %d]: %v",
r.Low, r.High, innerErr)
}
}

start := time.Now()
ledger, ledgerErr := config.Ingester.GetLedger(ctx, nextLedger)
if ledgerErr != nil {
return errors.Wrapf(ledgerErr,
"ledger export state is out of sync at ledger %d", nextLedger)
ledger, innerErr := config.Ingester.GetLedger(ctx, nextLedger)
if innerErr != nil {
return errors.Wrapf(innerErr,
"failed to retrieve ledger %d from archive", nextLedger)
}
count++
thisFetchDuration := time.Since(start)
if thisFetchDuration > slowFetchDurationThreshold {
log.WithField("duration", thisFetchDuration).
@@ -131,9 +146,10 @@
fetchDuration += thisFetchDuration

start = time.Now()
reader, readerErr := config.Ingester.NewLedgerTransactionReader(ledger)
if readerErr != nil {
return readerErr
reader, innerErr := config.Ingester.NewLedgerTransactionReader(ledger)
if innerErr != nil {
return errors.Wrapf(innerErr,
"failed to read ledger %d", nextLedger)
}

for {
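For concreteness, a sketch of the chunk boundaries the block above produces, assuming the default 64-ledger checkpoint frequency (the diff passes 0 to NewCheckpointManager, which selects the network default); the specific numbers are illustrative:

package example // illustration only

import "github.com/stellar/go/historyarchive"

// nextChunk mirrors the prefetch condition in searchAccountTransactions:
// checkpoints close at ledgers 63, 127, 191, ..., so reaching ledger 127
// prepares roughly [127, 191], the boundary plus the next checkpoint's
// worth of ledgers.
func nextChunk(nextLedger uint32) (historyarchive.Range, bool) {
	mgr := historyarchive.NewCheckpointManager(0)
	if !mgr.IsCheckpoint(nextLedger) {
		return historyarchive.Range{}, false
	}
	return historyarchive.Range{
		Low:  nextLedger,
		High: mgr.NextCheckpoint(nextLedger + 1),
	}, true
}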
6 changes: 5 additions & 1 deletion support/collections/set/set_test.go
@@ -7,10 +7,14 @@ import (
)

func TestSet(t *testing.T) {
s := Set[string]{}
s := NewSet[string](10)
s.Add("sanity")
require.True(t, s.Contains("sanity"))
require.False(t, s.Contains("check"))

s.AddSlice([]string{"a", "b", "c"})
require.True(t, s.Contains("b"))
require.ElementsMatch(t, []string{"sanity", "a", "b", "c"}, s.Slice()) // Slice() returns elements in unspecified order
}

func TestSafeSet(t *testing.T) {