From 2cd8e4ee41623150b3a631c2f3df766df1428bd2 Mon Sep 17 00:00:00 2001 From: Alfonso Acosta Date: Mon, 22 Jun 2020 17:24:51 +0200 Subject: [PATCH] Remove job size (the larger, the faster it is) --- services/horizon/cmd/db.go | 11 +---- .../horizon/internal/expingest/main_test.go | 44 +++++++++++++++++++ 2 files changed, 45 insertions(+), 10 deletions(-) diff --git a/services/horizon/cmd/db.go b/services/horizon/cmd/db.go index 591619b28b..94c91f1b94 100644 --- a/services/horizon/cmd/db.go +++ b/services/horizon/cmd/db.go @@ -126,7 +126,6 @@ var dbReingestCmd = &cobra.Command{ var ( reingestForce bool parallelWorkers uint - parallelJobSize uint32 ) var reingestRangeCmdOpts = []*support.ConfigOption{ { @@ -146,14 +145,6 @@ var reingestRangeCmdOpts = []*support.ConfigOption{ FlagDefault: uint(1), Usage: "[optional] if this flag is set to > 1, horizon will parallelize reingestion using the supplied number of workers", }, - { - Name: "parallel-job-size", - ConfigKey: ¶llelJobSize, - OptType: types.Uint32, - Required: false, - FlagDefault: uint32(256), - Usage: "[optional] parallel workers will run jobs processing ledger batches of the supplied size", - }, } var dbReingestRangeCmd = &cobra.Command{ @@ -227,7 +218,7 @@ var dbReingestRangeCmd = &cobra.Command{ err = system.ReingestRange( argsInt32[0], argsInt32[1], - parallelJobSize, + argsInt32[1]-argsInt32[0], ) } diff --git a/services/horizon/internal/expingest/main_test.go b/services/horizon/internal/expingest/main_test.go index 120e085be8..132a637b0c 100644 --- a/services/horizon/internal/expingest/main_test.go +++ b/services/horizon/internal/expingest/main_test.go @@ -112,6 +112,50 @@ func (s sorteableRanges) Less(i, j int) bool { return s[i].from < s[j].from } func (s sorteableRanges) Swap(i, j int) { s[i], s[j] = s[j], s[i] } func TestParallelReingestRange(t *testing.T) { + config := Config{} + var ( + rangesCalled sorteableRanges + shutdowns int + m sync.Mutex + ) + factory := func(c Config) (System, error) { + result := &mockSystem{} + result.On("ReingestRange", mock.AnythingOfType("uint32"), mock.AnythingOfType("uint32"), mock.AnythingOfType("bool")).Run( + func(args mock.Arguments) { + r := ledgerRange{ + from: args.Get(0).(uint32), + to: args.Get(1).(uint32), + } + m.Lock() + defer m.Unlock() + rangesCalled = append(rangesCalled, r) + // simulate call + time.Sleep(time.Millisecond * time.Duration(10+rand.Int31n(50))) + }).Return(error(nil)) + result.On("Shutdown").Run(func(mock.Arguments) { + m.Lock() + defer m.Unlock() + shutdowns++ + }) + return result, nil + } + system, err := newParallelSystems(config, 3, factory) + assert.NoError(t, err) + err = system.ReingestRange(0, 2050, 258) + assert.NoError(t, err) + + sort.Sort(rangesCalled) + expected := sorteableRanges{ + {from: 0, to: 256}, {from: 256, to: 512}, {from: 512, to: 768}, {from: 768, to: 1024}, {from: 1024, to: 1280}, + {from: 1280, to: 1536}, {from: 1536, to: 1792}, {from: 1792, to: 2048}, {from: 2048, to: 2050}, + } + assert.Equal(t, expected, rangesCalled) + system.Shutdown() + assert.Equal(t, 3, shutdowns) + +} + +func TestParallelReingestRangeError(t *testing.T) { config := Config{} var ( systems []*mockSystem