From 895f45610a8b48186098a307380ebab0ca983265 Mon Sep 17 00:00:00 2001 From: George Kudrayvtsev Date: Thu, 28 Jan 2021 10:25:14 -0800 Subject: [PATCH 01/10] Add initial draft version of the pooled archive type --- historyarchive/pool.go | 97 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 97 insertions(+) create mode 100644 historyarchive/pool.go diff --git a/historyarchive/pool.go b/historyarchive/pool.go new file mode 100644 index 0000000000..6d55178793 --- /dev/null +++ b/historyarchive/pool.go @@ -0,0 +1,97 @@ +// Copyright 2016 Stellar Development Foundation and contributors. Licensed +// under the Apache License, Version 2.0. See the COPYING file at the root +// of this distribution or at http://www.apache.org/licenses/LICENSE-2.0 + +package historyarchive + +import ( + "math/rand" + + "github.com/stellar/go/support/errors" + "github.com/stellar/go/xdr" +) + +// Type PooledArchive forwards all API calls to a random ArchiveInterface within +// its internal pool. +type PooledArchive struct { + pool []ArchiveInterface +} + +var _ ArchiveInterface = &PooledArchive{} + +func CreatePool(archives []ArchiveInterface) (*PooledArchive, error) { + if len(archives) <= 0 { + return nil, errors.New("No history archives provided") + } + return &PooledArchive{pool: archives}, nil +} + +func (pa *PooledArchive) GetAnyArchive() ArchiveInterface { + return pa.pool[rand.Intn(len(pa.pool))] +} + +// Below are the ArchiveInterface method implementations. + +func (pa *PooledArchive) GetPathHAS(path string) (HistoryArchiveState, error) { + return pa.GetAnyArchive().GetPathHAS(path) +} + +func (pa *PooledArchive) PutPathHAS(path string, has HistoryArchiveState, opts *CommandOptions) error { + return pa.GetAnyArchive().PutPathHAS(path, has, opts) +} + +func (pa *PooledArchive) BucketExists(bucket Hash) (bool, error) { + return pa.GetAnyArchive().BucketExists(bucket) +} + +func (pa *PooledArchive) CategoryCheckpointExists(cat string, chk uint32) (bool, error) { + return pa.GetAnyArchive().CategoryCheckpointExists(cat, chk) +} + +func (pa *PooledArchive) GetLedgerHeader(chk uint32) (xdr.LedgerHeaderHistoryEntry, error) { + return pa.GetAnyArchive().GetLedgerHeader(chk) +} + +func (pa *PooledArchive) GetRootHAS() (HistoryArchiveState, error) { + return pa.GetAnyArchive().GetRootHAS() +} + +func (pa *PooledArchive) GetCheckpointHAS(chk uint32) (HistoryArchiveState, error) { + return pa.GetAnyArchive().GetCheckpointHAS(chk) +} + +func (pa *PooledArchive) PutCheckpointHAS(chk uint32, has HistoryArchiveState, opts *CommandOptions) error { + return pa.GetAnyArchive().PutCheckpointHAS(chk, has, opts) +} + +func (pa *PooledArchive) PutRootHAS(has HistoryArchiveState, opts *CommandOptions) error { + return pa.GetAnyArchive().PutRootHAS(has, opts) +} + +func (pa *PooledArchive) ListBucket(dp DirPrefix) (chan string, chan error) { + return pa.GetAnyArchive().ListBucket(dp) +} + +func (pa *PooledArchive) ListAllBuckets() (chan string, chan error) { + return pa.GetAnyArchive().ListAllBuckets() +} + +func (pa *PooledArchive) ListAllBucketHashes() (chan Hash, chan error) { + return pa.GetAnyArchive().ListAllBucketHashes() +} + +func (pa *PooledArchive) ListCategoryCheckpoints(cat string, pth string) (chan uint32, chan error) { + return pa.GetAnyArchive().ListCategoryCheckpoints(cat, pth) +} + +func (pa *PooledArchive) GetXdrStreamForHash(hash Hash) (*XdrStream, error) { + return pa.GetAnyArchive().GetXdrStreamForHash(hash) +} + +func (pa *PooledArchive) GetXdrStream(pth string) (*XdrStream, error) { + return pa.GetAnyArchive().GetXdrStream(pth) +} + +func (pa *PooledArchive) GetCheckpointManager() CheckpointManager { + return pa.GetAnyArchive().GetCheckpointManager() +} From b0a1bcf295fd7e52b72cceb4150cdc76c5328bb3 Mon Sep 17 00:00:00 2001 From: George Kudrayvtsev Date: Thu, 28 Jan 2021 13:46:16 -0800 Subject: [PATCH 02/10] Make Captive Core backend use the new pool --- historyarchive/pool.go | 2 +- ingest/ledgerbackend/captive_core_backend.go | 32 ++++++++++++++------ 2 files changed, 23 insertions(+), 11 deletions(-) diff --git a/historyarchive/pool.go b/historyarchive/pool.go index 6d55178793..88be2fe87d 100644 --- a/historyarchive/pool.go +++ b/historyarchive/pool.go @@ -19,7 +19,7 @@ type PooledArchive struct { var _ ArchiveInterface = &PooledArchive{} -func CreatePool(archives []ArchiveInterface) (*PooledArchive, error) { +func CreatePool(archives ...ArchiveInterface) (*PooledArchive, error) { if len(archives) <= 0 { return nil, errors.New("No history archives provided") } diff --git a/ingest/ledgerbackend/captive_core_backend.go b/ingest/ledgerbackend/captive_core_backend.go index 080167ecb3..3b5639fa89 100644 --- a/ingest/ledgerbackend/captive_core_backend.go +++ b/ingest/ledgerbackend/captive_core_backend.go @@ -146,21 +146,33 @@ func NewCaptive(config CaptiveCoreConfig) (*CaptiveStellarCore, error) { var cancel context.CancelFunc config.Context, cancel = context.WithCancel(parentCtx) - archive, err := historyarchive.Connect( - config.HistoryArchiveURLs[0], - historyarchive.ConnectOptions{ - NetworkPassphrase: config.NetworkPassphrase, - CheckpointFrequency: config.CheckpointFrequency, - Context: config.Context, - }, - ) + var validArchives []historyarchive.ArchiveInterface + + for _, url := range config.HistoryArchiveURLs { + archive, err := historyarchive.Connect( + url, + historyarchive.ConnectOptions{ + NetworkPassphrase: config.NetworkPassphrase, + CheckpointFrequency: config.CheckpointFrequency, + Context: config.Context, + }, + ) + + if err != nil { + continue + } + + validArchives = append(validArchives, archive) + } + + pool, err := historyarchive.CreatePool(validArchives...) if err != nil { cancel() - return nil, errors.Wrap(err, "error connecting to history archive") + return nil, err } c := &CaptiveStellarCore{ - archive: archive, + archive: pool, ledgerHashStore: config.LedgerHashStore, cancel: cancel, checkpointManager: historyarchive.NewCheckpointManager(config.CheckpointFrequency), From f023572a58c35455ad8baad867a5f2057bf69254 Mon Sep 17 00:00:00 2001 From: George Kudrayvtsev Date: Wed, 10 Feb 2021 15:29:14 -0800 Subject: [PATCH 03/10] Add logging messages --- ingest/ledgerbackend/captive_core_backend.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/ingest/ledgerbackend/captive_core_backend.go b/ingest/ledgerbackend/captive_core_backend.go index 3b5639fa89..f6deb1e14d 100644 --- a/ingest/ledgerbackend/captive_core_backend.go +++ b/ingest/ledgerbackend/captive_core_backend.go @@ -159,6 +159,7 @@ func NewCaptive(config CaptiveCoreConfig) (*CaptiveStellarCore, error) { ) if err != nil { + config.Log.Warnf("Error connecting to history archive (%s): %s", url, err) continue } @@ -168,6 +169,7 @@ func NewCaptive(config CaptiveCoreConfig) (*CaptiveStellarCore, error) { pool, err := historyarchive.CreatePool(validArchives...) if err != nil { cancel() + config.Log.Error("Error connecting to ALL history archives.") return nil, err } From 3da69c13550ea94691b13a4390c2374ee0e2eb7f Mon Sep 17 00:00:00 2001 From: George Kudrayvtsev Date: Thu, 11 Feb 2021 13:39:42 -0800 Subject: [PATCH 04/10] Convert pool object to just be a collection of archives --- historyarchive/pool.go | 121 +++++++------------ ingest/ledgerbackend/captive_core_backend.go | 33 ++--- 2 files changed, 53 insertions(+), 101 deletions(-) diff --git a/historyarchive/pool.go b/historyarchive/pool.go index 88be2fe87d..52756e8d51 100644 --- a/historyarchive/pool.go +++ b/historyarchive/pool.go @@ -1,4 +1,4 @@ -// Copyright 2016 Stellar Development Foundation and contributors. Licensed +// Copyright 2021 Stellar Development Foundation and contributors. Licensed // under the Apache License, Version 2.0. See the COPYING file at the root // of this distribution or at http://www.apache.org/licenses/LICENSE-2.0 @@ -8,90 +8,55 @@ import ( "math/rand" "github.com/stellar/go/support/errors" - "github.com/stellar/go/xdr" ) -// Type PooledArchive forwards all API calls to a random ArchiveInterface within -// its internal pool. -type PooledArchive struct { - pool []ArchiveInterface -} - -var _ ArchiveInterface = &PooledArchive{} - -func CreatePool(archives ...ArchiveInterface) (*PooledArchive, error) { - if len(archives) <= 0 { +// A PooledArchive is just a collection of `ArchiveInterface`s so that we can +// distribute requests fairly throughout the pool. +type PooledArchive []ArchiveInterface + +// CreatePool tries connecting to each of the provided history archive URLs, +// returning a pool of valid archives. +// +// If none of the archives work, this returns the error message of the last +// failed archive. Note that the errors for each individual archive are hard to +// track if there's success overall. +// +// Possible FIXME for the above limitation: return []error instead? but then +// users need to check `len(pool) > 0` instead of `err == nil`. +func CreatePool(archiveURLs []string, config ConnectOptions) (*PooledArchive, error) { + if len(archiveURLs) <= 0 { return nil, errors.New("No history archives provided") } - return &PooledArchive{pool: archives}, nil -} - -func (pa *PooledArchive) GetAnyArchive() ArchiveInterface { - return pa.pool[rand.Intn(len(pa.pool))] -} - -// Below are the ArchiveInterface method implementations. - -func (pa *PooledArchive) GetPathHAS(path string) (HistoryArchiveState, error) { - return pa.GetAnyArchive().GetPathHAS(path) -} -func (pa *PooledArchive) PutPathHAS(path string, has HistoryArchiveState, opts *CommandOptions) error { - return pa.GetAnyArchive().PutPathHAS(path, has, opts) -} - -func (pa *PooledArchive) BucketExists(bucket Hash) (bool, error) { - return pa.GetAnyArchive().BucketExists(bucket) -} - -func (pa *PooledArchive) CategoryCheckpointExists(cat string, chk uint32) (bool, error) { - return pa.GetAnyArchive().CategoryCheckpointExists(cat, chk) -} - -func (pa *PooledArchive) GetLedgerHeader(chk uint32) (xdr.LedgerHeaderHistoryEntry, error) { - return pa.GetAnyArchive().GetLedgerHeader(chk) -} - -func (pa *PooledArchive) GetRootHAS() (HistoryArchiveState, error) { - return pa.GetAnyArchive().GetRootHAS() -} - -func (pa *PooledArchive) GetCheckpointHAS(chk uint32) (HistoryArchiveState, error) { - return pa.GetAnyArchive().GetCheckpointHAS(chk) -} - -func (pa *PooledArchive) PutCheckpointHAS(chk uint32, has HistoryArchiveState, opts *CommandOptions) error { - return pa.GetAnyArchive().PutCheckpointHAS(chk, has, opts) -} - -func (pa *PooledArchive) PutRootHAS(has HistoryArchiveState, opts *CommandOptions) error { - return pa.GetAnyArchive().PutRootHAS(has, opts) -} - -func (pa *PooledArchive) ListBucket(dp DirPrefix) (chan string, chan error) { - return pa.GetAnyArchive().ListBucket(dp) -} - -func (pa *PooledArchive) ListAllBuckets() (chan string, chan error) { - return pa.GetAnyArchive().ListAllBuckets() -} - -func (pa *PooledArchive) ListAllBucketHashes() (chan Hash, chan error) { - return pa.GetAnyArchive().ListAllBucketHashes() -} - -func (pa *PooledArchive) ListCategoryCheckpoints(cat string, pth string) (chan uint32, chan error) { - return pa.GetAnyArchive().ListCategoryCheckpoints(cat, pth) -} + var lastErr error = nil + + // Try connecting to all of the listed archives, but only store valid ones. + var validArchives PooledArchive + for _, url := range archiveURLs { + archive, err := Connect( + url, + ConnectOptions{ + NetworkPassphrase: config.NetworkPassphrase, + CheckpointFrequency: config.CheckpointFrequency, + Context: config.Context, + }, + ) + + if err != nil { + lastErr = errors.Wrapf(err, "Error connecting to history archive (%s)", url) + continue + } + + validArchives = append(validArchives, archive) + } -func (pa *PooledArchive) GetXdrStreamForHash(hash Hash) (*XdrStream, error) { - return pa.GetAnyArchive().GetXdrStreamForHash(hash) -} + if len(validArchives) == 0 { + return nil, lastErr + } -func (pa *PooledArchive) GetXdrStream(pth string) (*XdrStream, error) { - return pa.GetAnyArchive().GetXdrStream(pth) + return &validArchives, nil } -func (pa *PooledArchive) GetCheckpointManager() CheckpointManager { - return pa.GetAnyArchive().GetCheckpointManager() +func GetRandomArchive(pool PooledArchive) ArchiveInterface { + return pool[rand.Intn(len(pool))] } diff --git a/ingest/ledgerbackend/captive_core_backend.go b/ingest/ledgerbackend/captive_core_backend.go index f6deb1e14d..16c16567f8 100644 --- a/ingest/ledgerbackend/captive_core_backend.go +++ b/ingest/ledgerbackend/captive_core_backend.go @@ -146,35 +146,22 @@ func NewCaptive(config CaptiveCoreConfig) (*CaptiveStellarCore, error) { var cancel context.CancelFunc config.Context, cancel = context.WithCancel(parentCtx) - var validArchives []historyarchive.ArchiveInterface - - for _, url := range config.HistoryArchiveURLs { - archive, err := historyarchive.Connect( - url, - historyarchive.ConnectOptions{ - NetworkPassphrase: config.NetworkPassphrase, - CheckpointFrequency: config.CheckpointFrequency, - Context: config.Context, - }, - ) - - if err != nil { - config.Log.Warnf("Error connecting to history archive (%s): %s", url, err) - continue - } - - validArchives = append(validArchives, archive) - } + archivePool, err := historyarchive.CreatePool( + config.HistoryArchiveURLs, + historyarchive.ConnectOptions{ + NetworkPassphrase: config.NetworkPassphrase, + CheckpointFrequency: config.CheckpointFrequency, + Context: config.Context, + }, + ) - pool, err := historyarchive.CreatePool(validArchives...) if err != nil { cancel() - config.Log.Error("Error connecting to ALL history archives.") - return nil, err + return nil, errors.Wrap(err, "Error connecting to ALL history archives.") } c := &CaptiveStellarCore{ - archive: pool, + archive: historyarchive.GetRandomArchive(*archivePool), ledgerHashStore: config.LedgerHashStore, cancel: cancel, checkpointManager: historyarchive.NewCheckpointManager(config.CheckpointFrequency), From c3429511f0d8dc6d9bf47f1ebcf16bd04205fa34 Mon Sep 17 00:00:00 2001 From: George Kudrayvtsev Date: Fri, 12 Feb 2021 15:01:55 -0800 Subject: [PATCH 05/10] Add ArchiveInterface-compatible methods back to PooledArchive --- historyarchive/pool.go | 82 +++++++++++++++++++++++++++++++++++++++--- 1 file changed, 78 insertions(+), 4 deletions(-) diff --git a/historyarchive/pool.go b/historyarchive/pool.go index 52756e8d51..dff584f666 100644 --- a/historyarchive/pool.go +++ b/historyarchive/pool.go @@ -8,6 +8,7 @@ import ( "math/rand" "github.com/stellar/go/support/errors" + "github.com/stellar/go/xdr" ) // A PooledArchive is just a collection of `ArchiveInterface`s so that we can @@ -23,7 +24,7 @@ type PooledArchive []ArchiveInterface // // Possible FIXME for the above limitation: return []error instead? but then // users need to check `len(pool) > 0` instead of `err == nil`. -func CreatePool(archiveURLs []string, config ConnectOptions) (*PooledArchive, error) { +func CreatePool(archiveURLs []string, config ConnectOptions) (PooledArchive, error) { if len(archiveURLs) <= 0 { return nil, errors.New("No history archives provided") } @@ -54,9 +55,82 @@ func CreatePool(archiveURLs []string, config ConnectOptions) (*PooledArchive, er return nil, lastErr } - return &validArchives, nil + return validArchives, nil } -func GetRandomArchive(pool PooledArchive) ArchiveInterface { - return pool[rand.Intn(len(pool))] +// Ensure the pool conforms to the ArchiveInterface +var _ ArchiveInterface = PooledArchive{} + +// Below are the ArchiveInterface method implementations. + +func (pa PooledArchive) GetAnyArchive() ArchiveInterface { + return pa[rand.Intn(len(pa))] +} + +func (pa PooledArchive) GetPathHAS(path string) (HistoryArchiveState, error) { + return pa.GetAnyArchive().GetPathHAS(path) +} + +func (pa PooledArchive) PutPathHAS(path string, has HistoryArchiveState, opts *CommandOptions) error { + return pa.GetAnyArchive().PutPathHAS(path, has, opts) +} + +func (pa PooledArchive) BucketExists(bucket Hash) (bool, error) { + return pa.GetAnyArchive().BucketExists(bucket) +} + +func (pa PooledArchive) CategoryCheckpointExists(cat string, chk uint32) (bool, error) { + return pa.GetAnyArchive().CategoryCheckpointExists(cat, chk) +} + +func (pa PooledArchive) GetLedgerHeader(chk uint32) (xdr.LedgerHeaderHistoryEntry, error) { + return pa.GetAnyArchive().GetLedgerHeader(chk) +} + +func (pa PooledArchive) GetRootHAS() (HistoryArchiveState, error) { + return pa.GetAnyArchive().GetRootHAS() +} + +func (pa PooledArchive) GetLedgers(start, end uint32) (map[uint32]*Ledger, error) { + return pa.GetAnyArchive().GetLedgers(start, end) +} + +func (pa PooledArchive) GetCheckpointHAS(chk uint32) (HistoryArchiveState, error) { + return pa.GetAnyArchive().GetCheckpointHAS(chk) +} + +func (pa PooledArchive) PutCheckpointHAS(chk uint32, has HistoryArchiveState, opts *CommandOptions) error { + return pa.GetAnyArchive().PutCheckpointHAS(chk, has, opts) +} + +func (pa PooledArchive) PutRootHAS(has HistoryArchiveState, opts *CommandOptions) error { + return pa.GetAnyArchive().PutRootHAS(has, opts) +} + +func (pa PooledArchive) ListBucket(dp DirPrefix) (chan string, chan error) { + return pa.GetAnyArchive().ListBucket(dp) +} + +func (pa PooledArchive) ListAllBuckets() (chan string, chan error) { + return pa.GetAnyArchive().ListAllBuckets() +} + +func (pa PooledArchive) ListAllBucketHashes() (chan Hash, chan error) { + return pa.GetAnyArchive().ListAllBucketHashes() +} + +func (pa PooledArchive) ListCategoryCheckpoints(cat string, pth string) (chan uint32, chan error) { + return pa.GetAnyArchive().ListCategoryCheckpoints(cat, pth) +} + +func (pa PooledArchive) GetXdrStreamForHash(hash Hash) (*XdrStream, error) { + return pa.GetAnyArchive().GetXdrStreamForHash(hash) +} + +func (pa PooledArchive) GetXdrStream(pth string) (*XdrStream, error) { + return pa.GetAnyArchive().GetXdrStream(pth) +} + +func (pa PooledArchive) GetCheckpointManager() CheckpointManager { + return pa.GetAnyArchive().GetCheckpointManager() } From 10b88f8ae1de72768910251b4b3b55036909174c Mon Sep 17 00:00:00 2001 From: George Kudrayvtsev Date: Fri, 12 Feb 2021 15:02:16 -0800 Subject: [PATCH 06/10] Use the new interface when running Captive Core --- ingest/ledgerbackend/captive_core_backend.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ingest/ledgerbackend/captive_core_backend.go b/ingest/ledgerbackend/captive_core_backend.go index 16c16567f8..4d645cf30b 100644 --- a/ingest/ledgerbackend/captive_core_backend.go +++ b/ingest/ledgerbackend/captive_core_backend.go @@ -161,7 +161,7 @@ func NewCaptive(config CaptiveCoreConfig) (*CaptiveStellarCore, error) { } c := &CaptiveStellarCore{ - archive: historyarchive.GetRandomArchive(*archivePool), + archive: archivePool, ledgerHashStore: config.LedgerHashStore, cancel: cancel, checkpointManager: historyarchive.NewCheckpointManager(config.CheckpointFrequency), From fdc2e20e5e346cdbca3f1cfd7a9a75b595b69cee Mon Sep 17 00:00:00 2001 From: George Kudrayvtsev Date: Fri, 12 Feb 2021 15:03:02 -0800 Subject: [PATCH 07/10] Rename object to ArchivePool --- historyarchive/pool.go | 46 +++++++++++++++++++++--------------------- 1 file changed, 23 insertions(+), 23 deletions(-) diff --git a/historyarchive/pool.go b/historyarchive/pool.go index dff584f666..229b350c6c 100644 --- a/historyarchive/pool.go +++ b/historyarchive/pool.go @@ -11,9 +11,9 @@ import ( "github.com/stellar/go/xdr" ) -// A PooledArchive is just a collection of `ArchiveInterface`s so that we can +// A ArchivePool is just a collection of `ArchiveInterface`s so that we can // distribute requests fairly throughout the pool. -type PooledArchive []ArchiveInterface +type ArchivePool []ArchiveInterface // CreatePool tries connecting to each of the provided history archive URLs, // returning a pool of valid archives. @@ -24,7 +24,7 @@ type PooledArchive []ArchiveInterface // // Possible FIXME for the above limitation: return []error instead? but then // users need to check `len(pool) > 0` instead of `err == nil`. -func CreatePool(archiveURLs []string, config ConnectOptions) (PooledArchive, error) { +func CreatePool(archiveURLs []string, config ConnectOptions) (ArchivePool, error) { if len(archiveURLs) <= 0 { return nil, errors.New("No history archives provided") } @@ -32,7 +32,7 @@ func CreatePool(archiveURLs []string, config ConnectOptions) (PooledArchive, err var lastErr error = nil // Try connecting to all of the listed archives, but only store valid ones. - var validArchives PooledArchive + var validArchives ArchivePool for _, url := range archiveURLs { archive, err := Connect( url, @@ -59,78 +59,78 @@ func CreatePool(archiveURLs []string, config ConnectOptions) (PooledArchive, err } // Ensure the pool conforms to the ArchiveInterface -var _ ArchiveInterface = PooledArchive{} +var _ ArchiveInterface = ArchivePool{} // Below are the ArchiveInterface method implementations. -func (pa PooledArchive) GetAnyArchive() ArchiveInterface { +func (pa ArchivePool) GetAnyArchive() ArchiveInterface { return pa[rand.Intn(len(pa))] } -func (pa PooledArchive) GetPathHAS(path string) (HistoryArchiveState, error) { +func (pa ArchivePool) GetPathHAS(path string) (HistoryArchiveState, error) { return pa.GetAnyArchive().GetPathHAS(path) } -func (pa PooledArchive) PutPathHAS(path string, has HistoryArchiveState, opts *CommandOptions) error { +func (pa ArchivePool) PutPathHAS(path string, has HistoryArchiveState, opts *CommandOptions) error { return pa.GetAnyArchive().PutPathHAS(path, has, opts) } -func (pa PooledArchive) BucketExists(bucket Hash) (bool, error) { +func (pa ArchivePool) BucketExists(bucket Hash) (bool, error) { return pa.GetAnyArchive().BucketExists(bucket) } -func (pa PooledArchive) CategoryCheckpointExists(cat string, chk uint32) (bool, error) { +func (pa ArchivePool) CategoryCheckpointExists(cat string, chk uint32) (bool, error) { return pa.GetAnyArchive().CategoryCheckpointExists(cat, chk) } -func (pa PooledArchive) GetLedgerHeader(chk uint32) (xdr.LedgerHeaderHistoryEntry, error) { +func (pa ArchivePool) GetLedgerHeader(chk uint32) (xdr.LedgerHeaderHistoryEntry, error) { return pa.GetAnyArchive().GetLedgerHeader(chk) } -func (pa PooledArchive) GetRootHAS() (HistoryArchiveState, error) { +func (pa ArchivePool) GetRootHAS() (HistoryArchiveState, error) { return pa.GetAnyArchive().GetRootHAS() } -func (pa PooledArchive) GetLedgers(start, end uint32) (map[uint32]*Ledger, error) { +func (pa ArchivePool) GetLedgers(start, end uint32) (map[uint32]*Ledger, error) { return pa.GetAnyArchive().GetLedgers(start, end) } -func (pa PooledArchive) GetCheckpointHAS(chk uint32) (HistoryArchiveState, error) { +func (pa ArchivePool) GetCheckpointHAS(chk uint32) (HistoryArchiveState, error) { return pa.GetAnyArchive().GetCheckpointHAS(chk) } -func (pa PooledArchive) PutCheckpointHAS(chk uint32, has HistoryArchiveState, opts *CommandOptions) error { +func (pa ArchivePool) PutCheckpointHAS(chk uint32, has HistoryArchiveState, opts *CommandOptions) error { return pa.GetAnyArchive().PutCheckpointHAS(chk, has, opts) } -func (pa PooledArchive) PutRootHAS(has HistoryArchiveState, opts *CommandOptions) error { +func (pa ArchivePool) PutRootHAS(has HistoryArchiveState, opts *CommandOptions) error { return pa.GetAnyArchive().PutRootHAS(has, opts) } -func (pa PooledArchive) ListBucket(dp DirPrefix) (chan string, chan error) { +func (pa ArchivePool) ListBucket(dp DirPrefix) (chan string, chan error) { return pa.GetAnyArchive().ListBucket(dp) } -func (pa PooledArchive) ListAllBuckets() (chan string, chan error) { +func (pa ArchivePool) ListAllBuckets() (chan string, chan error) { return pa.GetAnyArchive().ListAllBuckets() } -func (pa PooledArchive) ListAllBucketHashes() (chan Hash, chan error) { +func (pa ArchivePool) ListAllBucketHashes() (chan Hash, chan error) { return pa.GetAnyArchive().ListAllBucketHashes() } -func (pa PooledArchive) ListCategoryCheckpoints(cat string, pth string) (chan uint32, chan error) { +func (pa ArchivePool) ListCategoryCheckpoints(cat string, pth string) (chan uint32, chan error) { return pa.GetAnyArchive().ListCategoryCheckpoints(cat, pth) } -func (pa PooledArchive) GetXdrStreamForHash(hash Hash) (*XdrStream, error) { +func (pa ArchivePool) GetXdrStreamForHash(hash Hash) (*XdrStream, error) { return pa.GetAnyArchive().GetXdrStreamForHash(hash) } -func (pa PooledArchive) GetXdrStream(pth string) (*XdrStream, error) { +func (pa ArchivePool) GetXdrStream(pth string) (*XdrStream, error) { return pa.GetAnyArchive().GetXdrStream(pth) } -func (pa PooledArchive) GetCheckpointManager() CheckpointManager { +func (pa ArchivePool) GetCheckpointManager() CheckpointManager { return pa.GetAnyArchive().GetCheckpointManager() } From 8a80c1a66e69118e4e7c983d80f92bd050ae0894 Mon Sep 17 00:00:00 2001 From: George Kudrayvtsev Date: Fri, 12 Feb 2021 15:04:40 -0800 Subject: [PATCH 08/10] Allow the methods to work on pointers I don't understand Golang *super* well yet, but I think that methods on a *Type are way less restrictive than methods on a Type, right? --- historyarchive/pool.go | 36 ++++++++++++++++++------------------ 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/historyarchive/pool.go b/historyarchive/pool.go index 229b350c6c..b513c06076 100644 --- a/historyarchive/pool.go +++ b/historyarchive/pool.go @@ -59,7 +59,7 @@ func CreatePool(archiveURLs []string, config ConnectOptions) (ArchivePool, error } // Ensure the pool conforms to the ArchiveInterface -var _ ArchiveInterface = ArchivePool{} +var _ ArchiveInterface = &ArchivePool{} // Below are the ArchiveInterface method implementations. @@ -67,70 +67,70 @@ func (pa ArchivePool) GetAnyArchive() ArchiveInterface { return pa[rand.Intn(len(pa))] } -func (pa ArchivePool) GetPathHAS(path string) (HistoryArchiveState, error) { +func (pa *ArchivePool) GetPathHAS(path string) (HistoryArchiveState, error) { return pa.GetAnyArchive().GetPathHAS(path) } -func (pa ArchivePool) PutPathHAS(path string, has HistoryArchiveState, opts *CommandOptions) error { +func (pa *ArchivePool) PutPathHAS(path string, has HistoryArchiveState, opts *CommandOptions) error { return pa.GetAnyArchive().PutPathHAS(path, has, opts) } -func (pa ArchivePool) BucketExists(bucket Hash) (bool, error) { +func (pa *ArchivePool) BucketExists(bucket Hash) (bool, error) { return pa.GetAnyArchive().BucketExists(bucket) } -func (pa ArchivePool) CategoryCheckpointExists(cat string, chk uint32) (bool, error) { +func (pa *ArchivePool) CategoryCheckpointExists(cat string, chk uint32) (bool, error) { return pa.GetAnyArchive().CategoryCheckpointExists(cat, chk) } -func (pa ArchivePool) GetLedgerHeader(chk uint32) (xdr.LedgerHeaderHistoryEntry, error) { +func (pa *ArchivePool) GetLedgerHeader(chk uint32) (xdr.LedgerHeaderHistoryEntry, error) { return pa.GetAnyArchive().GetLedgerHeader(chk) } -func (pa ArchivePool) GetRootHAS() (HistoryArchiveState, error) { +func (pa *ArchivePool) GetRootHAS() (HistoryArchiveState, error) { return pa.GetAnyArchive().GetRootHAS() } -func (pa ArchivePool) GetLedgers(start, end uint32) (map[uint32]*Ledger, error) { +func (pa *ArchivePool) GetLedgers(start, end uint32) (map[uint32]*Ledger, error) { return pa.GetAnyArchive().GetLedgers(start, end) } -func (pa ArchivePool) GetCheckpointHAS(chk uint32) (HistoryArchiveState, error) { +func (pa *ArchivePool) GetCheckpointHAS(chk uint32) (HistoryArchiveState, error) { return pa.GetAnyArchive().GetCheckpointHAS(chk) } -func (pa ArchivePool) PutCheckpointHAS(chk uint32, has HistoryArchiveState, opts *CommandOptions) error { +func (pa *ArchivePool) PutCheckpointHAS(chk uint32, has HistoryArchiveState, opts *CommandOptions) error { return pa.GetAnyArchive().PutCheckpointHAS(chk, has, opts) } -func (pa ArchivePool) PutRootHAS(has HistoryArchiveState, opts *CommandOptions) error { +func (pa *ArchivePool) PutRootHAS(has HistoryArchiveState, opts *CommandOptions) error { return pa.GetAnyArchive().PutRootHAS(has, opts) } -func (pa ArchivePool) ListBucket(dp DirPrefix) (chan string, chan error) { +func (pa *ArchivePool) ListBucket(dp DirPrefix) (chan string, chan error) { return pa.GetAnyArchive().ListBucket(dp) } -func (pa ArchivePool) ListAllBuckets() (chan string, chan error) { +func (pa *ArchivePool) ListAllBuckets() (chan string, chan error) { return pa.GetAnyArchive().ListAllBuckets() } -func (pa ArchivePool) ListAllBucketHashes() (chan Hash, chan error) { +func (pa *ArchivePool) ListAllBucketHashes() (chan Hash, chan error) { return pa.GetAnyArchive().ListAllBucketHashes() } -func (pa ArchivePool) ListCategoryCheckpoints(cat string, pth string) (chan uint32, chan error) { +func (pa *ArchivePool) ListCategoryCheckpoints(cat string, pth string) (chan uint32, chan error) { return pa.GetAnyArchive().ListCategoryCheckpoints(cat, pth) } -func (pa ArchivePool) GetXdrStreamForHash(hash Hash) (*XdrStream, error) { +func (pa *ArchivePool) GetXdrStreamForHash(hash Hash) (*XdrStream, error) { return pa.GetAnyArchive().GetXdrStreamForHash(hash) } -func (pa ArchivePool) GetXdrStream(pth string) (*XdrStream, error) { +func (pa *ArchivePool) GetXdrStream(pth string) (*XdrStream, error) { return pa.GetAnyArchive().GetXdrStream(pth) } -func (pa ArchivePool) GetCheckpointManager() CheckpointManager { +func (pa *ArchivePool) GetCheckpointManager() CheckpointManager { return pa.GetAnyArchive().GetCheckpointManager() } From b819a5713171540a31d5533fe7e4bd5a0511396d Mon Sep 17 00:00:00 2001 From: George Kudrayvtsev Date: Fri, 12 Feb 2021 15:07:58 -0800 Subject: [PATCH 09/10] Fix pointer issue (thx go vet!) --- ingest/ledgerbackend/captive_core_backend.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ingest/ledgerbackend/captive_core_backend.go b/ingest/ledgerbackend/captive_core_backend.go index 4d645cf30b..7a0ae0225a 100644 --- a/ingest/ledgerbackend/captive_core_backend.go +++ b/ingest/ledgerbackend/captive_core_backend.go @@ -161,7 +161,7 @@ func NewCaptive(config CaptiveCoreConfig) (*CaptiveStellarCore, error) { } c := &CaptiveStellarCore{ - archive: archivePool, + archive: &archivePool, ledgerHashStore: config.LedgerHashStore, cancel: cancel, checkpointManager: historyarchive.NewCheckpointManager(config.CheckpointFrequency), From ccab114c20de93cf80984d16a6c7260ea98e4097 Mon Sep 17 00:00:00 2001 From: George Kudrayvtsev Date: Tue, 16 Feb 2021 10:09:37 -0800 Subject: [PATCH 10/10] Change filename and constructor name to match type --- historyarchive/{pool.go => archive_pool.go} | 7 ++----- ingest/ledgerbackend/captive_core_backend.go | 2 +- 2 files changed, 3 insertions(+), 6 deletions(-) rename historyarchive/{pool.go => archive_pool.go} (92%) diff --git a/historyarchive/pool.go b/historyarchive/archive_pool.go similarity index 92% rename from historyarchive/pool.go rename to historyarchive/archive_pool.go index b513c06076..b7db9d2b2b 100644 --- a/historyarchive/pool.go +++ b/historyarchive/archive_pool.go @@ -15,16 +15,13 @@ import ( // distribute requests fairly throughout the pool. type ArchivePool []ArchiveInterface -// CreatePool tries connecting to each of the provided history archive URLs, +// NewArchivePool tries connecting to each of the provided history archive URLs, // returning a pool of valid archives. // // If none of the archives work, this returns the error message of the last // failed archive. Note that the errors for each individual archive are hard to // track if there's success overall. -// -// Possible FIXME for the above limitation: return []error instead? but then -// users need to check `len(pool) > 0` instead of `err == nil`. -func CreatePool(archiveURLs []string, config ConnectOptions) (ArchivePool, error) { +func NewArchivePool(archiveURLs []string, config ConnectOptions) (ArchivePool, error) { if len(archiveURLs) <= 0 { return nil, errors.New("No history archives provided") } diff --git a/ingest/ledgerbackend/captive_core_backend.go b/ingest/ledgerbackend/captive_core_backend.go index 7a0ae0225a..10f7c29db5 100644 --- a/ingest/ledgerbackend/captive_core_backend.go +++ b/ingest/ledgerbackend/captive_core_backend.go @@ -146,7 +146,7 @@ func NewCaptive(config CaptiveCoreConfig) (*CaptiveStellarCore, error) { var cancel context.CancelFunc config.Context, cancel = context.WithCancel(parentCtx) - archivePool, err := historyarchive.CreatePool( + archivePool, err := historyarchive.NewArchivePool( config.HistoryArchiveURLs, historyarchive.ConnectOptions{ NetworkPassphrase: config.NetworkPassphrase,