Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

historyarchive: Introduce a History Archive pool that's selected from for all calls #3402

Merged
merged 10 commits into from
Feb 16, 2021
62 changes: 62 additions & 0 deletions historyarchive/pool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright 2021 Stellar Development Foundation and contributors. Licensed
// under the Apache License, Version 2.0. See the COPYING file at the root
// of this distribution or at http://www.apache.org/licenses/LICENSE-2.0

package historyarchive

import (
"math/rand"

"github.com/stellar/go/support/errors"
)

// A PooledArchive is just a collection of `ArchiveInterface`s so that we can
// distribute requests fairly throughout the pool.
type PooledArchive []ArchiveInterface

// CreatePool tries connecting to each of the provided history archive URLs,
// returning a pool of valid archives.
//
// If none of the archives work, this returns the error message of the last
// failed archive. Note that the errors for each individual archive are hard to
// track if there's success overall.
//
// Possible FIXME for the above limitation: return []error instead? but then
// users need to check `len(pool) > 0` instead of `err == nil`.
func CreatePool(archiveURLs []string, config ConnectOptions) (*PooledArchive, error) {
Shaptic marked this conversation as resolved.
Show resolved Hide resolved
if len(archiveURLs) <= 0 {
return nil, errors.New("No history archives provided")
}

var lastErr error = nil

// Try connecting to all of the listed archives, but only store valid ones.
var validArchives PooledArchive
for _, url := range archiveURLs {
archive, err := Connect(
url,
ConnectOptions{
NetworkPassphrase: config.NetworkPassphrase,
CheckpointFrequency: config.CheckpointFrequency,
Context: config.Context,
},
)

if err != nil {
lastErr = errors.Wrapf(err, "Error connecting to history archive (%s)", url)
continue
}

validArchives = append(validArchives, archive)
}

if len(validArchives) == 0 {
return nil, lastErr
}

return &validArchives, nil
}

func GetRandomArchive(pool PooledArchive) ArchiveInterface {
return pool[rand.Intn(len(pool))]
}
9 changes: 5 additions & 4 deletions ingest/ledgerbackend/captive_core_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,21 +146,22 @@ func NewCaptive(config CaptiveCoreConfig) (*CaptiveStellarCore, error) {
var cancel context.CancelFunc
config.Context, cancel = context.WithCancel(parentCtx)

archive, err := historyarchive.Connect(
config.HistoryArchiveURLs[0],
archivePool, err := historyarchive.CreatePool(
config.HistoryArchiveURLs,
historyarchive.ConnectOptions{
NetworkPassphrase: config.NetworkPassphrase,
CheckpointFrequency: config.CheckpointFrequency,
Context: config.Context,
},
)

if err != nil {
cancel()
return nil, errors.Wrap(err, "error connecting to history archive")
return nil, errors.Wrap(err, "Error connecting to ALL history archives.")
}

c := &CaptiveStellarCore{
archive: archive,
archive: historyarchive.GetRandomArchive(*archivePool),
ledgerHashStore: config.LedgerHashStore,
cancel: cancel,
checkpointManager: historyarchive.NewCheckpointManager(config.CheckpointFrequency),
Expand Down