-
-
Notifications
You must be signed in to change notification settings - Fork 289
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Deliver Messages Async #588
Deliver Messages Async #588
Conversation
Prior to this change delivering 5000 messages took on average: user system total real 6.351633 0.535708 6.887341 ( 42.842282) After this change, I am able to delivery 5000 messages in: user system total real 7.160261 1.237327 8.397588 ( 7.676820) In my rough benchmarks I see a 5-7x increase. This is valuable for us because we enqueue a large number of jobs at once in a loop, something akin to: ```ruby MyModel.expensive_query.each { |obj| MyJob.perform_later(obj) } ``` Usable by setting: ```ruby config.active_job.queue_adapter = ActiveJob::QueueAdapters::ShoryukenConcurrentSendAdapter.new ``` Handlers are available for both success and failure and must respond to call. Example: ```ruby adapter = ActiveJob::QueueAdapters::ShoryukenConcurrentSendAdapter.new adapter.success_handler = ->(job, options) { StatsD.increment(job.class.name + "success") } adapter.error_handler = ->(err, (job, options)) { StatsD.increment(job.class.name + "failure") } config.active_job.queue_adapter = adapter ```
@@ -1,7 +1,7 @@ | |||
module Shoryuken | |||
class DefaultWorkerRegistry < WorkerRegistry | |||
def initialize | |||
@workers = {} | |||
@workers = Concurrent::Hash.new |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I changed this because I'm worried https://github.com/allcentury/shoryuken/blob/712b35c4dd253ef275d7756760b604cd184d45bd/lib/shoryuken/extensions/active_job_adapter.rb#L34 is not thread safe.
# | ||
# config.active_job.queue_adapter = adapter | ||
class ShoryukenConcurrentSendAdapter < ShoryukenAdapter | ||
attr_accessor :error_handler |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm open to moving this as constructor requirement but the advantage to leaving this as an attr_accessor
is callers can switch the handlers for different tasks. For instance, we might care about an error handler for a mission critical code path and not in others. Allowing callers to change this at run time has value for us but I'm open to alternatives.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Allowing callers to change this at run time has value for us but I'm open to alternatives.
@allcentury Would change at runtime be thread-safe?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@phstc - probably not, I'm also naively thinking of my own use case, where a k8s cron task is run. I can change this in the task though so I think using dependency injection (like your added commit does) is the way to go.
@allcentury This looks great 🎉 I will QA it locally and then We should eventually combine some code for the active job adapters with the standard workers, but I believe this is outside of the scope of this pull request. |
Hi @allcentury Awesome work on this change 🎉 I did some minor changes in #589 and merged it into master. |
@phstc - wonderful! Thanks again |
@allcentury 5.0.3 is out with your changes, can you test it out? Thank you 🎉 |
Hi @allcentury Do you want to add a section in Sending a message |
@allcentury good call. Since it is only supported with AJ, the wiki you mentioned makes more sense! thanks for checking 🎉 |
I updated the docs here: https://github.com/phstc/shoryuken/wiki/Rails-Integration-Active-Job thanks @phstc ! |
Prior to this change delivering 5000 messages took on average:
After this change, I am able to delivery 5000 messages in:
In my rough benchmarks I see a 5-7x increase. This is valuable for us because we enqueue a large number of jobs at once in a loop, something akin to:
Usable by setting:
Handlers are available for both success and failure and must respond to
call.
Example: