Skip to content

Commit

Permalink
Remove job size (the larger, the faster it is)
Browse files Browse the repository at this point in the history
  • Loading branch information
2opremio committed Jun 23, 2020
1 parent a50fd42 commit 2cd8e4e
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 10 deletions.
11 changes: 1 addition & 10 deletions services/horizon/cmd/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,6 @@ var dbReingestCmd = &cobra.Command{
var (
reingestForce bool
parallelWorkers uint
parallelJobSize uint32
)
var reingestRangeCmdOpts = []*support.ConfigOption{
{
Expand All @@ -146,14 +145,6 @@ var reingestRangeCmdOpts = []*support.ConfigOption{
FlagDefault: uint(1),
Usage: "[optional] if this flag is set to > 1, horizon will parallelize reingestion using the supplied number of workers",
},
{
Name: "parallel-job-size",
ConfigKey: &parallelJobSize,
OptType: types.Uint32,
Required: false,
FlagDefault: uint32(256),
Usage: "[optional] parallel workers will run jobs processing ledger batches of the supplied size",
},
}

var dbReingestRangeCmd = &cobra.Command{
Expand Down Expand Up @@ -227,7 +218,7 @@ var dbReingestRangeCmd = &cobra.Command{
err = system.ReingestRange(
argsInt32[0],
argsInt32[1],
parallelJobSize,
argsInt32[1]-argsInt32[0],
)
}

Expand Down
44 changes: 44 additions & 0 deletions services/horizon/internal/expingest/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,50 @@ func (s sorteableRanges) Less(i, j int) bool { return s[i].from < s[j].from }
func (s sorteableRanges) Swap(i, j int) { s[i], s[j] = s[j], s[i] }

func TestParallelReingestRange(t *testing.T) {
config := Config{}
var (
rangesCalled sorteableRanges
shutdowns int
m sync.Mutex
)
factory := func(c Config) (System, error) {
result := &mockSystem{}
result.On("ReingestRange", mock.AnythingOfType("uint32"), mock.AnythingOfType("uint32"), mock.AnythingOfType("bool")).Run(
func(args mock.Arguments) {
r := ledgerRange{
from: args.Get(0).(uint32),
to: args.Get(1).(uint32),
}
m.Lock()
defer m.Unlock()
rangesCalled = append(rangesCalled, r)
// simulate call
time.Sleep(time.Millisecond * time.Duration(10+rand.Int31n(50)))
}).Return(error(nil))
result.On("Shutdown").Run(func(mock.Arguments) {
m.Lock()
defer m.Unlock()
shutdowns++
})
return result, nil
}
system, err := newParallelSystems(config, 3, factory)
assert.NoError(t, err)
err = system.ReingestRange(0, 2050, 258)
assert.NoError(t, err)

sort.Sort(rangesCalled)
expected := sorteableRanges{
{from: 0, to: 256}, {from: 256, to: 512}, {from: 512, to: 768}, {from: 768, to: 1024}, {from: 1024, to: 1280},
{from: 1280, to: 1536}, {from: 1536, to: 1792}, {from: 1792, to: 2048}, {from: 2048, to: 2050},
}
assert.Equal(t, expected, rangesCalled)
system.Shutdown()
assert.Equal(t, 3, shutdowns)

}

func TestParallelReingestRangeError(t *testing.T) {
config := Config{}
var (
systems []*mockSystem
Expand Down

0 comments on commit 2cd8e4e

Please sign in to comment.