Skip to content

Commit

Permalink
services/horizon: Parallelize db reingest range (#2724)
Browse files Browse the repository at this point in the history
This change breaks down the ledger range to reingest in subranges
which are submitted to a pre-defined number of workers, processing the
subranges in parallel.

For now, the workers are simply Go routines using their own `System`
(with their own DB connections etc ...).

In the future workers could be fully fledged Horizon instances running
in multiple machines (e.g. orchestrated through Kubernetes Jobs or AWS
Batch Jobs).

New flags:
--parallel-workers: [optional] if this flag is set to > 1, horizon will parallelize reingestion using the supplied number of workers
--parallel-job-size: [optional] parallel workers will run jobs processing ledger batches of the supplied size
  • Loading branch information
2opremio authored Jun 29, 2020
1 parent 41f89ec commit 4419539
Show file tree
Hide file tree
Showing 17 changed files with 500 additions and 106 deletions.
1 change: 1 addition & 0 deletions services/horizon/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ file. This project adheres to [Semantic Versioning](http://semver.org/).x

## Unreleased

* Add `--parallel-workers` and `--parallel-job-size` to `horizon db reingest range`. `--parallel-workers` will parallelize reingestion using the supplied number of workers.
* Add transaction set operation count to `history_ledger`([#2690](https://github.com/stellar/go/pull/2690)).
Extend ingestion to store the total number of operations in the transaction set and expose it in the ledger resource via `tx_set_operation_count`. This feature allow you to assess the used capacity of a transaction set.
* Remove `--ingest-failed-transactions` flag. From now on Horizon will always ingest failed transactions. WARNING: if your application is using Horizon DB directly (not recommended!) remember that now it will also contain failed txs. ([#2702](https://github.com/stellar/go/pull/2702)).
Expand Down
88 changes: 73 additions & 15 deletions services/horizon/cmd/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/spf13/cobra"
"github.com/spf13/viper"

"github.com/stellar/go/services/horizon/internal/db2/schema"
"github.com/stellar/go/services/horizon/internal/expingest"
support "github.com/stellar/go/support/config"
Expand Down Expand Up @@ -122,16 +123,54 @@ var dbReingestCmd = &cobra.Command{
},
}

var reingestForce bool
var (
reingestForce bool
parallelWorkers uint
parallelJobSize uint32
retries uint
retryBackoffSeconds uint
)
var reingestRangeCmdOpts = []*support.ConfigOption{
&support.ConfigOption{
{
Name: "force",
ConfigKey: &reingestForce,
OptType: types.Bool,
Required: false,
FlagDefault: false,
Usage: "[optional] if this flag is set, horizon will be blocked " +
"from ingesting until the reingestion command completes",
"from ingesting until the reingestion command completes (incompatible with --parallel-workers > 1)",
},
{
Name: "parallel-workers",
ConfigKey: &parallelWorkers,
OptType: types.Uint,
Required: false,
FlagDefault: uint(1),
Usage: "[optional] if this flag is set to > 1, horizon will parallelize reingestion using the supplied number of workers",
},
{
Name: "parallel-job-size",
ConfigKey: &parallelJobSize,
OptType: types.Uint32,
Required: false,
FlagDefault: uint32(100000),
Usage: "[optional] parallel workers will run jobs processing ledger batches of the supplied size",
},
{
Name: "retries",
ConfigKey: &retries,
OptType: types.Uint,
Required: false,
FlagDefault: uint(0),
Usage: "[optional] number of reingest retries",
},
{
Name: "retry-backoff-seconds",
ConfigKey: &retryBackoffSeconds,
OptType: types.Uint,
Required: false,
FlagDefault: uint(5),
Usage: "[optional] backoff seconds between reingest retries",
},
}

Expand All @@ -144,6 +183,9 @@ var dbReingestRangeCmd = &cobra.Command{
co.Require()
co.SetValue()
}
if reingestForce && parallelWorkers > 1 {
log.Fatal("--force is incompatible with --parallel-workers > 1")
}

if len(args) != 2 {
cmd.Usage()
Expand Down Expand Up @@ -173,25 +215,41 @@ var dbReingestRangeCmd = &cobra.Command{
}

ingestConfig := expingest.Config{
CoreSession: coreSession,
NetworkPassphrase: config.NetworkPassphrase,
HistorySession: horizonSession,
HistoryArchiveURL: config.HistoryArchiveURLs[0],
CoreSession: coreSession,
NetworkPassphrase: config.NetworkPassphrase,
HistorySession: horizonSession,
HistoryArchiveURL: config.HistoryArchiveURLs[0],
MaxReingestRetries: int(retries),
ReingesRetryBackoffSeconds: int(retryBackoffSeconds),
}
if config.EnableCaptiveCoreIngestion {
ingestConfig.StellarCorePath = config.StellarCoreBinaryPath
}

system, err := expingest.NewSystem(ingestConfig)
if err != nil {
log.Fatal(err)
if parallelWorkers < 2 {
system, systemErr := expingest.NewSystem(ingestConfig)
if err != nil {
log.Fatal(systemErr)
}

err = system.ReingestRange(
argsInt32[0],
argsInt32[1],
reingestForce,
)
} else {
system, systemErr := expingest.NewParallelSystems(ingestConfig, parallelWorkers)
if err != nil {
log.Fatal(systemErr)
}

err = system.ReingestRange(
argsInt32[0],
argsInt32[1],
parallelJobSize,
)
}

err = system.ReingestRange(
argsInt32[0],
argsInt32[1],
reingestForce,
)
if err == nil {
hlog.Info("Range run successfully!")
return
Expand Down
2 changes: 1 addition & 1 deletion services/horizon/internal/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ type App struct {
orderBookStream *expingest.OrderBookStream
submitter *txsub.System
paths paths.Finder
expingester *expingest.System
expingester expingest.System
reaper *reap.System
ticks *time.Ticker

Expand Down
4 changes: 2 additions & 2 deletions services/horizon/internal/expingest/build_state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ type BuildStateTestSuite struct {
suite.Suite
historyQ *mockDBQ
historyAdapter *adapters.MockHistoryArchiveAdapter
system *System
system *system
runner *mockProcessorsRunner
stellarCoreClient *mockStellarCoreClient
checkpointLedger uint32
Expand All @@ -33,7 +33,7 @@ func (s *BuildStateTestSuite) SetupTest() {
s.stellarCoreClient = &mockStellarCoreClient{}
s.checkpointLedger = uint32(63)
s.lastLedger = 0
s.system = &System{
s.system = &system{
ctx: context.Background(),
historyQ: s.historyQ,
historyAdapter: s.historyAdapter,
Expand Down
5 changes: 3 additions & 2 deletions services/horizon/internal/expingest/db_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ type DBTestSuite struct {
sequence uint32
ledgerBackend *ledgerbackend.MockDatabaseBackend
historyAdapter *adapters.MockHistoryArchiveAdapter
system *System
system *system
tt *test.T
}

Expand All @@ -77,14 +77,15 @@ func (s *DBTestSuite) SetupTest() {
s.ledgerBackend = &ledgerbackend.MockDatabaseBackend{}
s.historyAdapter = &adapters.MockHistoryArchiveAdapter{}
var err error
s.system, err = NewSystem(Config{
sIface, err := NewSystem(Config{
CoreSession: s.tt.CoreSession(),
HistorySession: s.tt.HorizonSession(),
HistoryArchiveURL: "http://ignore.test",
MaxStreamRetries: 3,
DisableStateVerification: false,
})
s.Assert().NoError(err)
s.system = sIface.(*system)

s.sequence = uint32(28660351)
s.setupMocksForBuildState()
Expand Down
28 changes: 14 additions & 14 deletions services/horizon/internal/expingest/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ var (
)

type stateMachineNode interface {
run(*System) (transition, error)
run(*system) (transition, error)
String() string
}

Expand Down Expand Up @@ -92,7 +92,7 @@ func (stopState) String() string {
return "stop"
}

func (stopState) run(s *System) (transition, error) {
func (stopState) run(s *system) (transition, error) {
return stop(), errors.New("Cannot run terminal state")
}

Expand All @@ -102,7 +102,7 @@ func (startState) String() string {
return "start"
}

func (startState) run(s *System) (transition, error) {
func (startState) run(s *system) (transition, error) {
if err := s.historyQ.Begin(); err != nil {
return start(), errors.Wrap(err, "Error starting a transaction")
}
Expand Down Expand Up @@ -213,7 +213,7 @@ func (b buildState) String() string {
return fmt.Sprintf("buildFromCheckpoint(checkpointLedger=%d)", b.checkpointLedger)
}

func (b buildState) run(s *System) (transition, error) {
func (b buildState) run(s *system) (transition, error) {
if b.checkpointLedger == 0 {
return start(), errors.New("unexpected checkpointLedger value")
}
Expand Down Expand Up @@ -307,7 +307,7 @@ func (r resumeState) String() string {
return fmt.Sprintf("resume(latestSuccessfullyProcessedLedger=%d)", r.latestSuccessfullyProcessedLedger)
}

func (r resumeState) run(s *System) (transition, error) {
func (r resumeState) run(s *system) (transition, error) {
if r.latestSuccessfullyProcessedLedger == 0 {
return start(), errors.New("unexpected latestSuccessfullyProcessedLedger value")
}
Expand Down Expand Up @@ -404,7 +404,7 @@ func (r resumeState) run(s *System) (transition, error) {
}

duration := time.Since(startTime)
s.Metrics.LedgerIngestionTimer.Update(duration)
s.Metrics().LedgerIngestionTimer.Update(duration)
log.
WithFields(changeStats.Map()).
WithFields(ledgerTransactionStats.Map()).
Expand Down Expand Up @@ -436,7 +436,7 @@ func (h historyRangeState) String() string {
}

// historyRangeState is used when catching up history data
func (h historyRangeState) run(s *System) (transition, error) {
func (h historyRangeState) run(s *system) (transition, error) {
if h.fromLedger == 0 || h.toLedger == 0 ||
h.fromLedger > h.toLedger {
return start(), errors.Errorf("invalid range: [%d, %d]", h.fromLedger, h.toLedger)
Expand Down Expand Up @@ -478,7 +478,7 @@ func (h historyRangeState) run(s *System) (transition, error) {
return start(), nil
}

func runTransactionProcessorsOnLedger(s *System, ledger uint32) error {
func runTransactionProcessorsOnLedger(s *system, ledger uint32) error {
log.WithFields(logpkg.F{
"sequence": ledger,
"state": false,
Expand Down Expand Up @@ -520,7 +520,7 @@ func (h reingestHistoryRangeState) String() string {
)
}

func (h reingestHistoryRangeState) ingestRange(s *System, fromLedger, toLedger uint32) error {
func (h reingestHistoryRangeState) ingestRange(s *system, fromLedger, toLedger uint32) error {
if s.historyQ.GetTx() == nil {
return errors.New("expected transaction to be present")
}
Expand Down Expand Up @@ -549,7 +549,7 @@ func (h reingestHistoryRangeState) ingestRange(s *System, fromLedger, toLedger u
}

// reingestHistoryRangeState is used as a command to reingest historical data
func (h reingestHistoryRangeState) run(s *System) (transition, error) {
func (h reingestHistoryRangeState) run(s *system) (transition, error) {
if h.fromLedger == 0 || h.toLedger == 0 ||
h.fromLedger > h.toLedger {
return stop(), errors.Errorf("invalid range: [%d, %d]", h.fromLedger, h.toLedger)
Expand Down Expand Up @@ -634,7 +634,7 @@ func (waitForCheckpointState) String() string {
return "waitForCheckpoint"
}

func (waitForCheckpointState) run(*System) (transition, error) {
func (waitForCheckpointState) run(*system) (transition, error) {
log.Info("Waiting for the next checkpoint...")
time.Sleep(10 * time.Second)
return start(), nil
Expand All @@ -655,7 +655,7 @@ func (v verifyRangeState) String() string {
)
}

func (v verifyRangeState) run(s *System) (transition, error) {
func (v verifyRangeState) run(s *system) (transition, error) {
if v.fromLedger == 0 || v.toLedger == 0 ||
v.fromLedger > v.toLedger {
return stop(), errors.Errorf("invalid range: [%d, %d]", v.fromLedger, v.toLedger)
Expand Down Expand Up @@ -754,7 +754,7 @@ func (stressTestState) String() string {
return "stressTest"
}

func (stressTestState) run(s *System) (transition, error) {
func (stressTestState) run(s *system) (transition, error) {
if err := s.historyQ.Begin(); err != nil {
err = errors.Wrap(err, "Error starting a transaction")
return stop(), err
Expand Down Expand Up @@ -813,7 +813,7 @@ func (stressTestState) run(s *System) (transition, error) {
return stop(), nil
}

func (s *System) completeIngestion(ledger uint32) error {
func (s *system) completeIngestion(ledger uint32) error {
if ledger == 0 {
return errors.New("ledger must be positive")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@ type IngestHistoryRangeStateTestSuite struct {
historyQ *mockDBQ
historyAdapter *adapters.MockHistoryArchiveAdapter
runner *mockProcessorsRunner
system *System
system *system
}

func (s *IngestHistoryRangeStateTestSuite) SetupTest() {
s.historyQ = &mockDBQ{}
s.historyAdapter = &adapters.MockHistoryArchiveAdapter{}
s.runner = &mockProcessorsRunner{}
s.system = &System{
s.system = &system{
ctx: context.Background(),
historyQ: s.historyQ,
historyAdapter: s.historyAdapter,
Expand Down Expand Up @@ -168,15 +168,15 @@ type ReingestHistoryRangeStateTestSuite struct {
historyAdapter *adapters.MockHistoryArchiveAdapter
ledgerBackend *mockLedgerBackend
runner *mockProcessorsRunner
system *System
system *system
}

func (s *ReingestHistoryRangeStateTestSuite) SetupTest() {
s.historyQ = &mockDBQ{}
s.historyAdapter = &adapters.MockHistoryArchiveAdapter{}
s.ledgerBackend = &mockLedgerBackend{}
s.runner = &mockProcessorsRunner{}
s.system = &System{
s.system = &system{
ctx: context.Background(),
historyQ: s.historyQ,
historyAdapter: s.historyAdapter,
Expand Down
4 changes: 2 additions & 2 deletions services/horizon/internal/expingest/init_state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@ type InitStateTestSuite struct {
suite.Suite
historyQ *mockDBQ
historyAdapter *adapters.MockHistoryArchiveAdapter
system *System
system *system
}

func (s *InitStateTestSuite) SetupTest() {
s.historyQ = &mockDBQ{}
s.historyAdapter = &adapters.MockHistoryArchiveAdapter{}
s.system = &System{
s.system = &system{
ctx: context.Background(),
historyQ: s.historyQ,
historyAdapter: s.historyAdapter,
Expand Down
Loading

0 comments on commit 4419539

Please sign in to comment.