-
Notifications
You must be signed in to change notification settings - Fork 114
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(dot/sync): improve worker pool #4258
base: development
Are you sure you want to change the base?
Conversation
Created as a draft for two reasons:
|
4c0d5cb
to
c875d08
Compare
dot/sync/service.go
Outdated
workerPool: NewWorkerPool(WorkerPoolConfig{ | ||
MaxRetries: 5, | ||
// TODO: This should depend on the actual configuration of the currently used sync strategy. | ||
Capacity: defaultNumOfTasks * 10, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why times 10?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just an arbitrary value that "should be enough" while I was testing the branch on the Westend genesis instance. Apart from wanting to do this testing, this TODO and the other one about back pressure are the reasons why this is still a draft.
Do you think it makes sense to use the actual value for number of tasks configured in the sync strategy? And should we add some extra space here or not?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok got it, we can move it to a constant to self explain the usage.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd like to try and make this depend on the configuration of the strategy. But if that's too invasive, I'll move it to a constant.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've done this in b6cf4d7. It required extending the Strategy
interface and delaying initialization of the worker pool.
}, | ||
}) | ||
} | ||
task, ok := result.Task.(*syncTask) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What if we define result.Task
over a generic? so we can skip this casting?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I considered making this generic and decided against it, but I can't remember why. Will give it a try.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just tried and the reason I didn't do this is because SyncService
is supposed to maintain the worker pool and work with any kind of Strategy
. This won't work if the worker pool is a generic type instantiated with *syncTask
.
With the current implementation, only the strategies themselves know the type of the tasks processed by the worker pool.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point, that makes me think that the strategies should be responsible for creating their own workerpool maybe
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm still in favor of having the service manage one pool that is used for all strategies. I think it's easier to manage in terms of concurrency and good separation of concern. Is it worth giving that up to avoid a few casts? The strategies definitely know what concrete type they use as sync tasks.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
100% agree that the service should be responsible for the working pool, but I'm still trying to find a way to improve the types. We can keep the casts in the meanwhile
0588ed0
to
a828b43
Compare
I've addressed the TODO in f4a9ccc. Not sure if this is the right approach, but I think it is better than always adding new tasks, risking a blocked goroutine. |
5b1f455
to
03f66ec
Compare
e10cc50
to
b64d894
Compare
The main difference in the worker pool API is that SubmitBatch() does not block until the whole batch has been processed. Instead, it returns an ID which can be used to retrieve the current state of the batch. In addition, Results() returns a channel over which task results are sent as they become available. The main improvement this brings is increased concurrency, since results can be processed before the whole batch has been completed. What has not changed is the overall flow of the Strategy interface; getting a new batch of tasks with NextActions() and processing the results with Process(). Closes #4232
When the worker pool falls behind processing tasks, the service won't ask the strategy for more tasks and instead directly runs Process() again.
It doesn't make sense to give up since we need the blocks in that request to progress.
1b5b722
to
3502cd4
Compare
3502cd4
to
136bcc2
Compare
The main difference in the worker pool API is that
SubmitBatch()
does not block until the whole batch has been processed. Instead, it returns an ID which can be used to retrieve the current state of the batch. In addition,Results()
returns a channel over which task results are sent as they become available.The main improvement this brings is increased concurrency, since results can be processed before the whole batch has been completed.
What has not changed is the overall flow of the Strategy interface; getting a new batch of tasks with
NextActions()
and processing the results withProcess()
.Changes
dot/sync/worker_pool.go
SyncService
to the API changes of the new worker poolTests
go test github.com/ChainSafe/gossamer/dot/sync
Issues
Closes #4198