services/horizon: Parallelize db reingest range
#2724
Conversation
Force-pushed from 867cc8a to a56a33d
case <-ps.shutdown:
	return
case reingestRange := <-ps.reingestJobQueue:
	err := s.ReingestRange(reingestRange.from, reingestRange.to, false)
ReingestRange is blocking, so in case of the shutdown signal it will still need to finish its job. Is that by design?
What's the alternative?
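For context, one alternative (not what this PR implements; just a hedged sketch assuming the context package) would be a cancellation-aware worker loop, where a hypothetical context-taking variant of ReingestRange could abort between checkpoints instead of finishing the whole batch:

// Sketch only: runWorker and ReingestRangeCtx are assumptions for illustration;
// the ReingestRange in this PR is blocking and takes no context.
func runWorker(ctx context.Context, s *System, jobs <-chan ledgerRange) {
	for {
		select {
		case <-ctx.Done():
			return // shutdown takes effect immediately, even between sub-batches
		case r, ok := <-jobs:
			if !ok {
				return // job queue drained
			}
			// a context-aware variant could stop a long-running range early
			if err := s.ReingestRangeCtx(ctx, r.from, r.to, false); err != nil {
				return
			}
		}
	}
}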
	requestedRange ledgerRange
}

type ParallelSystems struct {
Can we move the code related to ParallelSystems to a separate file?
I did it, but it's odd that a separate file imports symbols from main.go
@bartekn PTAL. I addressed your comments, reincorporated …
Force-pushed from 07200dc to 64f44a6
Added a comment with a proposal to make it simpler.
const (
	historyCheckpointLedgerInterval = 64
	minBatchSize                    = historyCheckpointLedgerInterval
This is only needed in case of state ingestion (like in verify-range). It's totally fine to ingest ranges smaller than 64 ledgers. We should remove the len(range) >= 64 restriction.
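A minimal sketch of what dropping the minimum could look like (the function name and rounding policy here are assumptions for illustration, not the PR's code):

// splitBatchSize picks a per-job batch size for a range of the given length without
// enforcing the 64-ledger (checkpoint) minimum, since plain history reingestion is
// fine with ranges smaller than a checkpoint.
func splitBatchSize(rangeSize, suggestion, workers uint32) uint32 {
	if workers == 0 {
		workers = 1
	}
	batch := suggestion
	if batch == 0 {
		// default: spread the range evenly across the workers
		batch = (rangeSize + workers - 1) / workers
	}
	if batch > rangeSize {
		batch = rangeSize
	}
	if batch == 0 {
		batch = 1
	}
	return batch
}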
}

	return firstErr
}
I think we can simplify it more:

- A single shutdown signal should be enough to handle everything.
- We don't really need reingestJobResult; this can be handled inside a worker function.
- We don't need a global wait group. Shutdown should just send a shutdown signal and make the ReingestRange method wrap up.
- We don't need to start workers at creation time. Maybe we'll need workers in other methods than ReingestRange, but for now we don't.

After implementing the changes I have the following code:
func (ps *ParallelSystems) ReingestRange(fromLedger, toLedger uint32, batchSizeSuggestion uint32) error {
	// Note: workers, config, systemFactory and log below stand for the ParallelSystems
	// configuration (fields or package-level values) in the real code.
	var (
		reingestJobQueue = make(chan ledgerRange)
		wg               sync.WaitGroup
		erroredMutex     sync.Mutex
		errored          bool
	)
	markError := func() {
		erroredMutex.Lock()
		errored = true
		erroredMutex.Unlock()
	}
	batchSize := batchSizeSuggestion // this sketch simply uses the suggestion as-is

	wg.Add(1)
	// can be moved to a method
	go func() {
		defer wg.Done()
		for subRangeFrom := fromLedger; subRangeFrom < toLedger; {
			// job queuing
			subRangeTo := subRangeFrom + (batchSize - 1) // we subtract one because both from and to are part of the batch
			if subRangeTo > toLedger {
				subRangeTo = toLedger
			}
			select {
			case <-ps.shutdown:
				return
			case reingestJobQueue <- ledgerRange{subRangeFrom, subRangeTo}:
			}
			subRangeFrom = subRangeTo + 1
		}
	}()

	for i := 0; i < workers; i++ {
		wg.Add(1)
		// can be moved to a method
		go func() {
			defer wg.Done()
			s, err := systemFactory(config)
			if err != nil {
				log.Error("...")
				markError()
				return
			}
			for {
				select {
				case <-ps.shutdown:
					return
				case reingestRange := <-reingestJobQueue:
					err := s.ReingestRange(reingestRange.from, reingestRange.to, false)
					if err != nil {
						log.Error("...")
						s.Shutdown()
						markError()
						return
					}
				}
			}
		}()
	}

	wg.Wait()
	close(reingestJobQueue)

	if errored {
		return errors.New("one or more jobs failed")
	}
	return nil
}

func (ps *ParallelSystems) shutdown() {
	close(ps.shutdown)
}

func (ps *ParallelSystems) Shutdown() error {
	// sync.Once
	ps.shutdownOnce.Do(msr.shutdown)
	return nil
}
- A single shutdown signal should be enough to handle everything.

Not if you have persistent workers (read below).

- We don't really need reingestJobResult; this can be handled inside a worker function.

Your example has oversimplified error management. If you want to report up to what point things were ingested properly (which I will implement shortly after this PR is merged), you need to know which ranges were successful.

- We don't need a global wait group. Shutdown should just send a shutdown signal and make the ReingestRange method wrap up.

Not if you have persistent workers (read below).

- We don't need to start workers at creation time. Maybe we'll need workers in other methods than ReingestRange, but for now we don't.

I did it this way so that we can parallelize stress testing and normal history reingestion later on.

After implementing the changes I have the following code

I am not sure how msr or shutdownOnce are handled. Also, error management is oversimplified. As I mentioned above, you do need to consume the results to know what ranges were processed.
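To make the point concrete, here is a hedged sketch of the kind of result bookkeeping being argued for; the reingestJobResult shape and the collectResults helper are illustrations based on this discussion (assuming the fmt package), not the exact code in the PR:

type reingestJobResult struct {
	err            error
	requestedRange ledgerRange
}

// collectResults consumes one result per submitted job and keeps the failed range
// with the lowest starting ledger, so the caller can tell the operator where to
// restart reingestion without leaving gaps.
func collectResults(results <-chan reingestJobResult, jobs int) error {
	var lowestFailed *reingestJobResult
	for i := 0; i < jobs; i++ {
		r := <-results
		if r.err == nil {
			continue
		}
		if lowestFailed == nil || r.requestedRange.from < lowestFailed.requestedRange.from {
			res := r
			lowestFailed = &res
		}
	}
	if lowestFailed != nil {
		return fmt.Errorf("job for range [%d, %d] failed, restart from %d: %v",
			lowestFailed.requestedRange.from, lowestFailed.requestedRange.to,
			lowestFailed.requestedRange.from, lowestFailed.err)
	}
	return nil
}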
@bartekn and I agreed that he will commit his suggestion and we will take it from there.
OK, I pushed my code as discussed with @fons. I haven't updated tests, will do it after a 👍 from you. I also added code that returns a suggested range to restart a job in case of failure. There's a comment above …
}

func (ps *ParallelSystems) Shutdown() error {
I don't think it makes sense to have this anymore
In fact, having the ability to shut down an in-progress reingestion will break error reporting (there won't be any guarantee of the lowest failing ledger range being reported, since any remaining jobs can be aborted without reporting an error).
Closing the shutdown channel won't abort running jobs. If there's a running ReingestRange method in any worker, it will complete (it may succeed or fail, but it will complete). Also, because the queue channel is unbuffered, you won't have any remaining jobs in a buffer (the goroutine adding new jobs will return due to <-ps.shutdown). Even if that were the case, all jobs would complete because there's a wait group that blocks until all goroutines return.

Can you elaborate?
I think you are right, but I believe it's a bit brittle (prone to break if the code changes). It's probably a good idea to add a test to make it future-proof (a test in which there is an error and one of the pending jobs also errors). I am happy to add that.

Leaving that aside, I think we agree that Shutdown() is not needed anymore, since there is no need to cancel running operations or clean anything up.
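For what it's worth, a test along those lines could look like the following sketch; it exercises the collectResults helper sketched earlier rather than the PR's real types (assuming the errors, strings and testing packages), so names and assertions are illustrative only:

func TestLowestFailedRangeIsReported(t *testing.T) {
	results := make(chan reingestJobResult, 3)
	// two jobs fail and one succeeds; the error for the lowest range must win
	results <- reingestJobResult{err: errors.New("boom"), requestedRange: ledgerRange{from: 129, to: 192}}
	results <- reingestJobResult{requestedRange: ledgerRange{from: 65, to: 128}}
	results <- reingestJobResult{err: errors.New("boom"), requestedRange: ledgerRange{from: 1, to: 64}}

	err := collectResults(results, 3)
	if err == nil || !strings.Contains(err.Error(), "[1, 64]") {
		t.Fatalf("expected the lowest failing range [1, 64] to be reported, got: %v", err)
	}
}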
}
lowestRangeErrMutex.Unlock()
}
ps.Shutdown()
It's awkward that an operation shuts the whole ParallelSystems down. I would remove the global shutdown channel, create one here and pass it to the workers.
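A minimal sketch of that shape, with the stop channel scoped to the call and handed to the workers (names and the inlined worker body are illustrative; only the sync package is assumed, and runJob stands in for a worker's ReingestRange call):

// reingestInParallel creates its own stop channel instead of relying on a
// ParallelSystems-wide shutdown, so finishing or failing a range never requires
// shutting the whole object down.
func reingestInParallel(workers int, batches []ledgerRange, runJob func(ledgerRange) error) error {
	stop := make(chan struct{}) // scoped to this call only
	jobs := make(chan ledgerRange)

	var (
		wg       sync.WaitGroup
		once     sync.Once
		firstErr error
	)

	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				select {
				case <-stop:
					return
				case r, ok := <-jobs:
					if !ok {
						return // queue drained, nothing left to do
					}
					if err := runJob(r); err != nil {
						once.Do(func() {
							firstErr = err
							close(stop) // wind down the other workers and the producer
						})
						return
					}
				}
			}
		}()
	}

	// feed the queue from the calling goroutine (no extra producer goroutine needed)
	for _, b := range batches {
		select {
		case <-stop:
		case jobs <- b:
			continue
		}
		break
	}
	close(jobs)
	wg.Wait()
	return firstErr
}

Error handling here is deliberately reduced to the first error; the lowest-failed-range reporting discussed above would sit on top of this.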
	}, nil
}

func (ps *ParallelSystems) reingestWorker(reingestJobQueue <-chan ledgerRange) error {
I would use a verb for reingestWorker (e.g. runReingestWorker(), doReingestWork()).
)

wg.Add(1)
go func() {
I don't think there is a reason to do this in a separate goroutine. You could run the inner for loop after spawning the workers.
// Because of this when we reach `wg.Wait()` all jobs previously sent to a channel are processed (either success
// or failure). In case of a failure we save the range with the smallest sequence number because this is where
// the user needs to start again to prevent the gaps.
lowestRangeErr *rangeError
this doesn't need to be a pointer (no strong opinions)
If it weren't a pointer, we'd need another bool variable to determine whether there was an error or not.
You can use the error inside the rangeError (if it's nil then no error happened). But, as I said, no strong opinions here.
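As a small illustration of the value-based variant (field names are guesses based on this thread, and the fmt package is assumed):

// rangeError kept by value: the embedded err doubles as the "did we fail?" flag,
// so no pointer and no extra bool are needed.
type rangeError struct {
	err         error
	ledgerRange ledgerRange
}

func (e rangeError) Error() string {
	return fmt.Sprintf("error when processing [%d, %d] range: %v",
		e.ledgerRange.from, e.ledgerRange.to, e.err)
}

With this shape, the final check is simply whether lowestRangeErr.err is non-nil.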
@fons thanks for the quick review! I added the requested changes and fixed tests. It's great that we're on the same page when it comes to aborting jobs. I removed … I won't be available in the morning tomorrow, so please update the code if there's something blocking you from merging it.
Force-pushed from 5d41abd to db51fba
I think we are done. This PR targets the 1.5.0 branch, but my plan is to hold off from merging to avoid disrupting the 1.5.0 release, and just wait until 1.5.0 is merged into master. But if @ire-and-curses / @abuiles are OK with it, I will merge now.
Let's target this for the 1.6 release.
Force-pushed from 24f08a5 to 9713de2
This change breaks down the ledger range to reingest into subranges which are submitted to a pre-defined number of workers, processing the subranges in parallel. For now, the workers are simply Go routines using their own `System` (with their own DB connections etc ...). In the future workers could be fully fledged Horizon instances running on multiple machines (e.g. orchestrated through Kubernetes Jobs or AWS Batch Jobs).

New flags:
--parallel-workers: [optional] if this flag is set to > 1, horizon will parallelize reingestion using the supplied number of workers
--parallel-job-size: [optional] parallel workers will run jobs processing ledger batches of the supplied size
This reverts commit 2cd8e4e.
Force-pushed from 9713de2 to 526d03d
PR Checklist

PR Structure
- … otherwise).
- … services/friendbot, or all, or doc if the changes are broad or impact many packages.

Thoroughness
- … .md files, etc... affected by this change). Take a look in the docs folder for a given service, like this one.

Release planning
- … needed with deprecations, added features, breaking changes, and DB schema changes.
- … semver, or if it's mainly a patch change. The PR is targeted at the next release branch if it's not a patch change.
What
This change breaks down the ledger range to reingest into subranges which are submitted to a pre-defined number of workers, processing the subranges in parallel.
For now, the workers are simply Go routines using their own System (with their own DB connections etc ...). In the future, workers could be fully fledged Horizon instances running on multiple machines (e.g. orchestrated through Kubernetes Jobs or AWS Batch Jobs).
New flags:
--parallel-workers: [optional] if this flag is set to > 1, horizon will parallelize reingestion using the supplied number of workers
--parallel-job-size: [optional] parallel workers will run jobs processing ledger batches of the supplied size
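For example, a hypothetical invocation with the new flags (the ledger bounds and values here are illustrative only) could look like:

horizon db reingest range 1 10000000 --parallel-workers=4 --parallel-job-size=100000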
Why
We want reingestion to be faster. Addresses #2552
Known limitations
As I mentioned, this change only applies to a single machine. Running it on multiple machines will require further work.
This PR is targeting release-1.5.0 (since that's the base branch I used). We should retarget it to master (once release-1.5.0 is merged) before merging.