Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

/exp/services/ledgerexporter: resumable export, check data storage for optimal starting point #5264

Merged
merged 41 commits into from
Apr 30, 2024
Merged
Show file tree
Hide file tree
Changes from 36 commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
1c49dc1
#hubble-272: add resumability, adjusting the requested start ledger b…
sreuland Mar 28, 2024
b46722b
Merge remote-tracking branch 'upstream/master' into resume_exporter
sreuland Mar 28, 2024
98ecc59
#hubble-272: app will prevent export if resume determines range is al…
sreuland Mar 28, 2024
14c8252
#hubble-272: fix comments for gofmt
sreuland Mar 28, 2024
22ca900
#hubble-272: updated reference toml for datastore config params
sreuland Mar 28, 2024
58a292c
Merge remote-tracking branch 'upstream/master' into resume_exporter
sreuland Mar 28, 2024
823c10b
#hubble-272: fixed errors found with integration test on fake gcs
sreuland Mar 28, 2024
6e74e8a
#hubble-272: fixed fmt warning
sreuland Mar 28, 2024
2f11f78
#hubble-272: live testing, fixed gcsdatastore Exists() and app refere…
sreuland Mar 29, 2024
574bd88
Merge remote-tracking branch 'upstream/master' into resume_exporter
sreuland Mar 29, 2024
decdcd3
#hubble-272: process review feedback, renamed ExporterConfig to Ledge…
sreuland Mar 29, 2024
7689519
#hubble-272: process review feedback, removed whitespace
sreuland Mar 29, 2024
32302d4
#hubble-272: process review feedback, forgot to commit new files for …
sreuland Mar 29, 2024
b7acd76
Merge remote-tracking branch 'upstream/master' into resume_exporter
sreuland Mar 29, 2024
ebe8bc9
#hubble-272: process review feedback, renamed gcs destination_url to …
sreuland Mar 30, 2024
f8aae6f
#hubble-272: process review feedback, add 2 checkpoint padding on end…
sreuland Apr 1, 2024
4151d9c
#hubble-272: review feedback, use go's sort.Search built-in for binar…
sreuland Apr 8, 2024
540735e
review feedback, minimize gcs_datastore path parsing
sreuland Apr 22, 2024
6e8edb5
review feedback on logging level
sreuland Apr 22, 2024
4753320
#hubble-272: review feedback, simplify variable initialization in sea…
sreuland Apr 22, 2024
e001313
#hubble-272: review feedback, use logger fields
sreuland Apr 23, 2024
7302849
#hubble-272: added --resume flag
sreuland Apr 23, 2024
57e5cea
#hubble-272: validate ledgers per file on during resume from remote d…
sreuland Apr 24, 2024
ef4a400
#hubble-272: removed NetworkManager in favor of using ArchiveInterfac…
sreuland Apr 25, 2024
7ddc4a0
Merge remote-tracking branch 'upstream/master' into resume_exporter
sreuland Apr 25, 2024
d8cb7b2
#hubble-272: review feedback, config pointer, private scope on functions
sreuland Apr 26, 2024
afac63c
#hubble-272: review feedback, make ExportManager a struct only, no in…
sreuland Apr 26, 2024
582a32d
#hubble-272: tidy up go.sum
sreuland Apr 26, 2024
e4c873f
#hubble-272# removed usage of Config in ResumableManager, review feed…
sreuland Apr 26, 2024
7a1231b
#hubble-272: added error output on resumeability attempts, per review…
sreuland Apr 27, 2024
fb2e2cd
Merge remote-tracking branch 'upstream/master' into resume_exporter
sreuland Apr 27, 2024
4c84adf
Merge remote-tracking branch 'upstream/master' into resume_exporter
sreuland Apr 29, 2024
dbb5756
#hubble-272: updated FindStart method docs
sreuland Apr 29, 2024
dbb56b8
#hubble-272: use absentLedger terminology on FindStart
sreuland Apr 29, 2024
f26af67
#hubble-272: changed FindStart to return absentLedger=start when no r…
sreuland Apr 29, 2024
fb984e4
#hubble-272: fixed go fmt warning
sreuland Apr 29, 2024
4a55f54
Update exp/services/ledgerexporter/internal/queue_test.go
sreuland Apr 30, 2024
595fe73
Update exp/services/ledgerexporter/internal/resumablemanager.go
sreuland Apr 30, 2024
a60db19
#hubble-272: removed duplicate logic in FindStart, review feedback
sreuland Apr 30, 2024
e094a4f
#hubble-272: fix govet warning
sreuland Apr 30, 2024
f79c6b8
#hubble-272: update readme description of new resume feature, review …
sreuland Apr 30, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 11 additions & 8 deletions exp/services/ledgerexporter/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,15 @@ $(if $(STELLAR_CORE_VERSION), --build-arg STELLAR_CORE_VERSION=$(STELLAR_CORE_VE
-t $(DOCKER_IMAGE):$(VERSION) \
-t $(DOCKER_IMAGE):latest .

docker-test:
docker-clean:
sreuland marked this conversation as resolved.
Show resolved Hide resolved
$(SUDO) docker stop fake-gcs-server || true
$(SUDO) docker rm fake-gcs-server || true
$(SUDO) rm -rf ${PWD}/storage || true
$(SUDO) docker network rm test-network || true

docker-test: docker-clean
# Create temp storage dir
$(SUDO) mkdir -p ${PWD}/storage/exporter-test
$(SUDO) mkdir -p ${PWD}/storage/exporter-test

# Create test network for docker
$(SUDO) docker network create test-network
Expand All @@ -28,16 +34,13 @@ docker-test:
# Run the ledger-exporter
$(SUDO) docker run --platform linux/amd64 -t --network test-network\
-e NETWORK=pubnet \
-e ARCHIVE_TARGET=gcs://exporter-test \
-e ARCHIVE_TARGET=exporter-test/test-subpath \
-e START=1000 \
-e END=2000 \
-e STORAGE_EMULATOR_HOST=http://fake-gcs-server:4443 \
$(DOCKER_IMAGE):$(VERSION)

$(SUDO) docker stop fake-gcs-server
$(SUDO) docker rm fake-gcs-server
$(SUDO) rm -rf ${PWD}/storage
$(SUDO) docker network rm test-network

$(MAKE) docker-clean

docker-push:
$(SUDO) docker push $(DOCKER_IMAGE):$(VERSION)
Expand Down
17 changes: 12 additions & 5 deletions exp/services/ledgerexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,24 @@ Exports ledgers continuously starting from --start. In this mode, the end ledger
ledgerexporter --start <start_ledger> --config-file <config_file_path>
```

#### Resumability:
Exporting a ledger range can be optimized further by enabling resumability if the remote data store supports it.

Starts exporting from a specified number of ledgers before the latest ledger sequence number on the network.
```bash
ledgerexporter --from-last <number_of_ledgers> --config-file <config_file_path>
```
By default, resumability is disabled, `--resume false`

When enabled, `--resume true`, ledgerexporter will search the remote data store within the requested range, looking for the oldest absent ledger sequence number within range. If abscence is detected, the export range is narrowed to `--start <absent_ledger_sequence>`.
This feature requires any existing data within the requested range on remote data store to be in an ordered and consistent state, beginning at `--start` and populalated contiguously in increasing order of sequence numbers, without gaps, up to point of abscence or `--end <end>`, whichever comes first or if streaming with `--end 0`.
sreuland marked this conversation as resolved.
Show resolved Hide resolved

### Configuration (toml):

```toml
network = "testnet" # Options: `testnet` or `pubnet`
destination_url = "gcs://your-bucket-name"

[datastore_config]
type = "GCS"
sreuland marked this conversation as resolved.
Show resolved Hide resolved

[datastore_config.params]
destination_bucket_path = "your-bucket-name/<optional_subpaths>"

[exporter_config]
ledgers_per_file = 64
Expand Down
10 changes: 9 additions & 1 deletion exp/services/ledgerexporter/config.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,15 @@
network = "testnet"
destination_url = "gcs://exporter-test/ledgers"

[datastore_config]
type = "GCS"

[datastore_config.params]
destination_bucket_path = "exporter-test/ledgers"

[exporter_config]
ledgers_per_file = 1
files_per_partition = 64000

[stellar_core_config]
stellar_core_binary_path = "/usr/local/bin/stellar-core"

11 changes: 6 additions & 5 deletions exp/services/ledgerexporter/docker/start
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,12 @@ files_per_partition="${FILES_PER_PARTITION:-64000}"
# Generate TOML configuration
cat <<EOF > config.toml
network = "${NETWORK}"
destination_url = "${ARCHIVE_TARGET}"

[datastore_config]
type = "GCS"

[datastore_config.params]
destination_bucket_path = "${ARCHIVE_TARGET}"

[exporter_config]
ledgers_per_file = $ledgers_per_file
Expand All @@ -29,10 +34,6 @@ EOF
if [[ -n "$START" || -n "$END" ]]; then
echo "START: $START END: $END"
/usr/bin/ledgerexporter --config-file config.toml --start $START --end $END
# Check if FROM_LAST variable is set
sreuland marked this conversation as resolved.
Show resolved Hide resolved
elif [[ -n "$FROM_LAST" ]]; then
echo "FROM_LAST: $FROM_LAST"
/usr/bin/ledgerexporter --config-file config.toml --from-last $FROM_LAST
else
echo "Error: No ledger range provided."
exit 1
Expand Down
155 changes: 117 additions & 38 deletions exp/services/ledgerexporter/internal/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/collectors"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/stellar/go/historyarchive"

"github.com/stellar/go/ingest/ledgerbackend"
_ "github.com/stellar/go/network"
Expand All @@ -24,41 +25,118 @@ var (
logger = log.New().WithField("service", "ledger-exporter")
)

func NewDataAlreadyExportedError(Start uint32, End uint32) *DataAlreadyExportedError {
return &DataAlreadyExportedError{
Start: Start,
End: End,
}
}

type DataAlreadyExportedError struct {
Start uint32
End uint32
}

func (m DataAlreadyExportedError) Error() string {
return fmt.Sprintf("For export ledger range start=%d, end=%d, the remote storage has all the data, there is no need to continue export", m.Start, m.End)
}

func NewInvalidDataStoreError(LedgerSequence uint32, LedgersPerFile uint32) *InvalidDataStoreError {
return &InvalidDataStoreError{
LedgerSequence: LedgerSequence,
LedgersPerFile: LedgersPerFile,
}
}

type InvalidDataStoreError struct {
LedgerSequence uint32
LedgersPerFile uint32
}

func (m InvalidDataStoreError) Error() string {
return fmt.Sprintf("The remote data store has inconsistent data, "+
"a resumable starting ledger of %v was identified, "+
"but that is not aligned to expected ledgers-per-file of %v. use '--resume false' to bypass",
m.LedgerSequence, m.LedgersPerFile)
}

type App struct {
config Config
config *Config
ledgerBackend ledgerbackend.LedgerBackend
dataStore DataStore
exportManager *ExportManager
uploader Uploader
flags Flags
prometheusRegistry *prometheus.Registry
}

func NewApp() *App {
func NewApp(flags Flags) *App {
logger.SetLevel(log.DebugLevel)

config := Config{}
err := config.LoadConfig()
logFatalIf(err, "Could not load configuration")

app := &App{config: config, prometheusRegistry: prometheus.NewRegistry()}
app.prometheusRegistry.MustRegister(
registry := prometheus.NewRegistry()
registry.MustRegister(
collectors.NewProcessCollector(collectors.ProcessCollectorOpts{Namespace: "ledger_exporter"}),
collectors.NewGoCollector(),
)
app := &App{flags: flags, prometheusRegistry: registry}
return app
}

func (a *App) init(ctx context.Context) {
a.dataStore = mustNewDataStore(ctx, a.config)
a.ledgerBackend = mustNewLedgerBackend(ctx, a.config, a.prometheusRegistry)
func (a *App) init(ctx context.Context) error {
var err error
var archive historyarchive.ArchiveInterface

if a.config, err = NewConfig(ctx, a.flags); err != nil {
return errors.Wrap(err, "Could not load configuration")
}
if archive, err = createHistoryArchiveFromNetworkName(ctx, a.config.Network); err != nil {
return err
}
a.config.ValidateAndSetLedgerRange(ctx, archive)

if a.dataStore, err = NewDataStore(ctx, a.config.DataStoreConfig, a.config.Network); err != nil {
return errors.Wrap(err, "Could not connect to destination data store")
}
if a.config.Resume {
if err = a.applyResumability(ctx,
NewResumableManager(a.dataStore, a.config.Network, a.config.LedgerBatchConfig, archive)); err != nil {
return err
}
}

logger.Infof("Final computed ledger range for backend retrieval and export, start=%d, end=%d", a.config.StartLedger, a.config.EndLedger)

if a.ledgerBackend, err = newLedgerBackend(ctx, a.config, a.prometheusRegistry); err != nil {
return err
}

// TODO: make number of upload workers configurable instead of hard coding it to 1
queue := NewUploadQueue(1, a.prometheusRegistry)
a.exportManager = NewExportManager(a.config.ExporterConfig, a.ledgerBackend, queue, a.prometheusRegistry)
a.uploader = NewUploader(
a.dataStore,
queue,
a.prometheusRegistry,
)
if a.exportManager, err = NewExportManager(a.config.LedgerBatchConfig, a.ledgerBackend, queue, a.prometheusRegistry); err != nil {
return err
}
a.uploader = NewUploader(a.dataStore, queue, a.prometheusRegistry)

return nil
}

func (a *App) applyResumability(ctx context.Context, resumableManager ResumableManager) error {
absentLedger, ok, err := resumableManager.FindStart(ctx, a.config.StartLedger, a.config.EndLedger)
if err != nil {
return err
}
if !ok {
return NewDataAlreadyExportedError(a.config.StartLedger, a.config.EndLedger)
}

// TODO - evaluate a more robust validation of remote data for ledgers-per-file consistency
// this assumes ValidateAndSetLedgerRange() has conditioned the a.config.StartLedger to be at least > 1
if absentLedger > 2 && absentLedger != a.config.LedgerBatchConfig.GetSequenceNumberStartBoundary(absentLedger) {
return NewInvalidDataStoreError(absentLedger, a.config.LedgerBatchConfig.LedgersPerFile)
}
logger.Infof("For export ledger range start=%d, end=%d, the remote storage has some of this data already, will resume at later start ledger of %d", a.config.StartLedger, a.config.EndLedger, absentLedger)
a.config.StartLedger = absentLedger

return nil
}

func (a *App) close() {
Expand Down Expand Up @@ -92,7 +170,15 @@ func (a *App) Run() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

a.init(ctx)
if err := a.init(ctx); err != nil {
var dataAlreadyExported DataAlreadyExportedError
if errors.As(err, &dataAlreadyExported) {
logger.Info(err.Error())
logger.Info("Shutting down ledger-exporter")
return
}
logger.WithError(err).Fatal("Stopping ledger-exporter")
}
defer a.close()

var wg sync.WaitGroup
Expand Down Expand Up @@ -133,22 +219,20 @@ func (a *App) Run() {
logger.Info("Shutting down ledger-exporter")
}

func mustNewDataStore(ctx context.Context, config Config) DataStore {
dataStore, err := NewDataStore(ctx, fmt.Sprintf("%s/%s", config.DestinationURL, config.Network))
logFatalIf(err, "Could not connect to destination data store")
return dataStore
}

// mustNewLedgerBackend Creates and initializes captive core ledger backend
// newLedgerBackend Creates and initializes captive core ledger backend
// Currently, only supports captive-core as ledger backend
func mustNewLedgerBackend(ctx context.Context, config Config, prometheusRegistry *prometheus.Registry) ledgerbackend.LedgerBackend {
captiveConfig := config.GenerateCaptiveCoreConfig()
func newLedgerBackend(ctx context.Context, config *Config, prometheusRegistry *prometheus.Registry) (ledgerbackend.LedgerBackend, error) {
captiveConfig, err := config.GenerateCaptiveCoreConfig()
if err != nil {
return nil, err
}

var backend ledgerbackend.LedgerBackend
var err error
// Create a new captive core backend
backend, err = ledgerbackend.NewCaptive(captiveConfig)
logFatalIf(err, "Failed to create captive-core instance")
if err != nil {
return nil, errors.Wrap(err, "Failed to create captive-core instance")
}
backend = ledgerbackend.WithMetrics(backend, prometheusRegistry, "ledger_exporter")

var ledgerRange ledgerbackend.Range
Expand All @@ -158,13 +242,8 @@ func mustNewLedgerBackend(ctx context.Context, config Config, prometheusRegistry
ledgerRange = ledgerbackend.BoundedRange(config.StartLedger, config.EndLedger)
}

err = backend.PrepareRange(ctx, ledgerRange)
logFatalIf(err, "Could not prepare captive core ledger backend")
return backend
}

func logFatalIf(err error, message string, args ...interface{}) {
if err != nil {
logger.WithError(err).Fatalf(message, args...)
if err = backend.PrepareRange(ctx, ledgerRange); err != nil {
return nil, errors.Wrap(err, "Could not prepare captive core ledger backend")
}
return backend, nil
}
Loading
Loading