
Concurrency not properly putting jobs in the queue #603

Closed
baka-san opened this issue May 20, 2022 · 5 comments

@baka-san
Contributor

baka-san commented May 20, 2022

My setup

I'm not sure if this is a bug or if I'm not understanding how to use concurrency with GoodJob. I'm using jobs to manage a web crawler which grabs posts from index pages. I will queue hundreds or thousands of jobs at a time (there are many post pages), but I only want a few of those jobs to be executed at the same time, so as not to get blocked by the websites I am crawling. So, setting a concurrency value seemed like the way to go. I'm testing with a very simple job:

class TestJob < ApplicationJob
  include GoodJob::ActiveJobExtensions::Concurrency

  queue_as :default
  good_job_control_concurrency_with(perform_limit: 1, enqueue_limit: 10000, key: -> { "TestJob" })

  def perform(...)
    # Crawl a post on a website
    sleep 20
  end
end

Then, from the console, I run something like 5.times { TestJob.perform_later }.

Expected behavior

I would expect that only 1 job would be performed at a time, as specified by perform_limit: 1. I'd expect the other jobs to be put in the queue, but never tried. Once the first job finishes (after 20 seconds), I'd expect the next job in the queue to immediately run and so on until all the queued jobs are finished.

In the dashboard, I'd expect the state of one job to be "running" while the others would be "scheduled/queued."

Actual behavior

What actually happens is that ALL of the jobs are tried. One succeeds, as expected, but the others are tried and fail with GoodJob::ActiveJobExtensions::Concurrency::ConcurrencyExceededError. I'd expect them not to be tried at all and to just wait in the queue. The rejected jobs are tried 2 or 3 times in quick succession and then retried over time by the standard retry functionality. Eventually all of the jobs do run, but not immediately after another job finishes; they only run when a retry happens to land while no other TestJob is running.

[Five dashboard screenshots from 2022-05-20 omitted.]

Potential solutions

I'm hoping there is a way to do this with the concurrency setting, as that would be simple, scalable, and clean. I've thought about using around_perform and recursively creating jobs after each one finishes, but that sounds like a mess to track and to handle errors for.
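Roughly the shape of that idea, as a sketch only (job and method names here are placeholders, not code I'm actually running). Each job crawls the first URL in the list and the callback chains the rest; if perform raises, nothing enqueues the remainder, which is exactly the error-handling mess I mean:

class ChainedCrawlJob < ApplicationJob
  queue_as :default

  # After one crawl finishes, enqueue a follow-up job for the remaining URLs.
  around_perform do |job, block|
    block.call
    _current, *remaining = job.arguments.first
    ChainedCrawlJob.perform_later(remaining) if remaining.any?
  end

  def perform(urls)
    # Crawl only the first URL; the callback above chains the rest.
    crawl(urls.first)
  end

  private

  def crawl(url)
    # Placeholder for the real crawling code
  end
end

# ChainedCrawlJob.perform_later(["https://example.com/posts/1", "https://example.com/posts/2"])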

@bkeepers
Contributor

I've been experiencing this too and been meaning to ask about it.

I worked around it with a really crude Throttle class that ensures something only happens once within a given duration.

require "throttle"

class MyJob < ApplicationJob
  class_attribute :throttle, default: Throttle.new(1.second)
  queue_as :low

  def perform
    throttle.call do
      # api requests here
    end
  end
end

Throttle is basically just a mutex with a sleep to ensure enough time has elapsed:

# Only allow a thing to happen once in the given duration
class Throttle
  def initialize(duration = 1.second)
    @duration = duration
    @mutex = Mutex.new
    @clock = Time.new(0)
  end

  def call(&block)
    @mutex.synchronize do
      # How long do we need to wait for the next call?
      wait = @duration - (Time.now - @clock)

      # Sleep if the desired duration has not elapsed
      sleep wait if wait > 0.0

      # Reset clock for next call
      @clock = Time.now
    end

    # Have a nice day, sir
    block.call
  end
end

It works as long as API requests are coming from a single process, but also blocks the thread, so it slows processing of jobs. I've worked around this by ensuring it's the absolute lowest priority.

@bensheldon
Owner

bensheldon commented May 20, 2022

@baka-san You've accurately described the Concurrency extension's implementation:

What actually happens is that ALL of the jobs are tried. One succeeds, as expected, but the others are tried and fail with GoodJob::ActiveJobExtensions::Concurrency::ConcurrencyExceededError. .... The rejected jobs are tried 2 or 3 times in quick succession and then retried over time by the standard retry functionality. Eventually all of the jobs do run, but not immediately after another job finishes; they only run when a retry happens to land while no other TestJob is running.

You can go back to the original PR or just read the code (it's ~100 LOC).

Fine-grained concurrency control is, imo, going to be a tradeoff with performance, and I'm open to ideas for how to implement it less stochastically than an optimistic-retry-with-noisy-incremental-backoff strategy.
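In Active Job terms, that strategy is shaped roughly like this (a sketch of the idea, not the extension's actual code; concurrent_executions stands in for the real check GoodJob does against the good_jobs table):

class OptimisticallyLimitedJob < ApplicationJob
  ConcurrencyExceeded = Class.new(StandardError)

  # Losing the optimistic race just reschedules the job with an incrementally
  # longer, jittered wait (the "noisy incremental backoff").
  retry_on ConcurrencyExceeded, wait: :exponentially_longer, attempts: Float::INFINITY

  before_perform do
    # Optimistic check: if too many jobs with the same key are already
    # running, bail out and let retry_on reschedule this one.
    raise ConcurrencyExceeded if concurrent_executions >= 1
  end

  def perform(*)
    # The real work only runs when the check above passed.
  end

  private

  # Placeholder for the real check (roughly: count currently-locked jobs
  # in the good_jobs table that share the concurrency key).
  def concurrent_executions
    0
  end
end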

I'm also open to ideas for making the current implementation less surprising, as well as to hearing whether you believe the concurrency contract isn't being met.

Just as advice, since you mention web crawling: I think you should store your link tree in its own table(s), and then have the jobs enqueue recursively from that table rather than enqueuing all found links at once. Thinking about it now, I think this is a mismatch between GoodJob's optimistic concurrency strategy and a pessimistic scenario (which maybe means that should also go in the documentation).
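As a sketch of that shape, assuming a hypothetical CrawledPage model (url:string, crawled_at:datetime) with a unique index on url: each job crawls one page, records what it found, and then enqueues the next unvisited page from the table instead of fanning everything out at once:

class CrawlPageJob < ApplicationJob
  queue_as :default

  def perform(page_id)
    page = CrawledPage.find(page_id)
    discovered_urls = crawl(page.url)
    page.update!(crawled_at: Time.current)

    # Rediscovered links just find the existing row instead of creating a
    # duplicate (the unique index on :url backs this up).
    discovered_urls.each { |url| CrawledPage.find_or_create_by(url: url) }

    # Recurse from the table: enqueue the next page that hasn't been visited.
    next_page = CrawledPage.where(crawled_at: nil).order(:id).first
    CrawlPageJob.perform_later(next_page.id) if next_page
  end

  private

  def crawl(url)
    # Placeholder: fetch the page and return the URLs discovered on it
    []
  end
end

# Seed the crawl with one root page:
# CrawlPageJob.perform_later(CrawledPage.create!(url: "https://example.com").id)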

@baka-san
Contributor Author

@bensheldon

You can go back to the original #281 or just read the code (it's ~100 LOC).

Ok, I thought this might be the case. I think it's a completely logical way to do it; I just wasn't sure how the enqueuing worked.

I'm also open to ideas for making the current implementation less surprising, as well as to hearing whether you believe the concurrency contract isn't being met.

I also considered saving the URLs to the DB and grabbing them from there with a cron job or something. That would also solve URL uniqueness at the database level, rather than trying to make the jobs themselves unique by URL (I don't want to crawl the same page twice). I think that's going to be a better solution in my case, so thanks for discussing it with me. At this point, I don't have any feature requests for concurrency, as it works as intended: everything got run eventually and perform_limit was obeyed.
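For the uniqueness part, a unique index on the URL column is probably all it takes; a sketch of such a migration, with placeholder names matching the CrawledPage sketch above:

class CreateCrawledPages < ActiveRecord::Migration[7.0]
  def change
    create_table :crawled_pages do |t|
      t.string :url, null: false
      t.datetime :crawled_at
      t.timestamps
    end

    # The unique index guarantees each URL is stored, and therefore crawled,
    # only once, even if two crawls discover it at the same time.
    add_index :crawled_pages, :url, unique: true
  end
end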

@bensheldon
Owner

@baka-san thank you! I should write this into the documentation because there are probably lots of people who had the same question/worry, and I super appreciate you opening the Issue to raise that to me 🙏

@baka-san
Contributor Author

@bensheldon Yea, thanks for making a great gem. Very excited to use this in the business I'm building 👍
