Feature Request: Automatic Worker Process Recycling #262

Open
bmb1603 opened this issue Jul 18, 2024 · 14 comments

Comments

@bmb1603

bmb1603 commented Jul 18, 2024

Currently, Solid Queue worker processes run indefinitely, which can lead to memory bloat and potential resource leaks over time. This is especially problematic for long-running, high-volume systems.

Feature request:
Implement an automatic worker process recycling mechanism. This could:

  1. Restart processes after a configurable number of jobs processed
  2. Restart processes after a configurable time period
  3. Restart processes when memory usage exceeds a certain threshold

This feature would help maintain a clean state, prevent memory leaks, and ensure consistent performance without manual intervention.

Potential configuration options:

SolidQueue.configure do |config|
  config.max_jobs_per_worker = 1000
  config.max_worker_lifetime = 1.hour
  config.max_worker_memory = 512.megabytes
end
@rosa
Member

rosa commented Jul 18, 2024

Hey @bmb1603, thanks for writing this up. We have something quite similar in our backlog: automatically restarting after a certain number of jobs. I'm not sure this will make it to v1.0 as I have other priorities, but if you or anyone else wants to take a stab at this, please do! I'd probably go for a max number of jobs as the starting criterion because it seems the simplest, and it might make sense for it to be configurable globally but also per worker.
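One hypothetical shape for that, reusing the configuration block proposed above (both max_jobs_per_worker and the per-worker max_jobs key are illustrative names, not existing Solid Queue options):

# Global default (hypothetical option name)
SolidQueue.configure do |config|
  config.max_jobs_per_worker = 1000
end

# Per-worker override in queue.yml (hypothetical key)
production:
  workers:
    - queues: background
      threads: 3
      max_jobs: 500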

@bmb1603
Author

bmb1603 commented Jul 18, 2024

Thank you for your reply @rosa

In this context, is there currently a built-in mechanism to clean up unused queues created via wildcard patterns? Otherwise, could you advise on the recommended way to manually shut down specific workers that are no longer needed? For example, if we have workers listening to object_*_queue, how can we shut down the worker for object_123_queue when it's no longer required?

@rosa
Member

rosa commented Jul 18, 2024

Not sure I completely understand 🤔 Would it be enough to remove that worker from your configuration and restart?

@bmb1603
Author

bmb1603 commented Jul 18, 2024

So if I configure

production:
  workers:
    - queues: "object_*_queue"
      processes: 1
      threads: 1

And then have a Job like this:

class ParallelJob < ApplicationJob
  queue_as do
    object_id = arguments.first.id
    "object_#{object_id}_queue"
  end
end

In this example, when enqueueing ParallelJob.perform_later(object), SolidQueue would "dynamically" start a new Worker for each Queue/Object, right? The question is, how do I stop Workers in case I don't plan to schedule any further Jobs for a Queue/Object.

@rosa
Member

rosa commented Jul 18, 2024

Ahh, ok!

queues: "object_*_queue"

This won't quite work, * is only allowed at the end, for prefix matches. This would be ignored, so your worker won't fetch any queues. You'd need to specify this as:

queues: "object_*"

Assuming that, then

SolidQueue would "dynamically" start a new Worker for each Queue/Object, right?

Not exactly; it'll start just one worker for any queues that match that prefix. The same worker will process jobs in queue object_a as well as in queue object_b or object_123.
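For completeness, the earlier example rewritten with a valid prefix match would be:

production:
  workers:
    - queues: "object_*"   # prefix match: one worker covers object_a, object_b, object_123, ...
      processes: 1
      threads: 1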

@bmb1603
Author

bmb1603 commented Jul 18, 2024

Ahh got it, thanks for clarifying, it all makes sense now!

@hms
Contributor

hms commented Sep 6, 2024

@rosa I'm taking a run at implementing this since I'm deployed on Heroku, which necessitates periodically killing workers that have grown too big for their own good... (Heroku gives one a choice between large bills and ridiculously limited resources.)

Before V0.5, I was able to use an around_perform block and send a SIGTERM to the worker when I hit my OOM criteria. After V0.5, signaling the worker has become less reliable (it doesn't reliably restart the worker and generates prune errors). Given how hacky that solution was to start with, I've decided to try and do things the Right Way(™) and submit a PR.

I've been trying to find the right alchemy to gracefully stop the worker. If I don't stop the worker cleanly, it seems some jobs can get left in a state where they don't get restarted (or if they do, it takes so long that I've not been patient enough to wait for the SolidQueue maintenance processes to pick them up again).

It looks like calling Worker#stop is almost right, but it can leave Jobs in a claimed state that can take a fair amount of time to clear. Interestingly, while this should be equivalent to using signals to terminate the worker, the workers reliably restart, which wasn't the case with signaling from the around_perform block.

Any suggestions for a way to gracefully release outstanding claims and let the worker finish any in-flight jobs would be appreciated.
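For context, the pre-v0.5 workaround described above looked roughly like this (a sketch only; current_rss_mb and the 450 MB threshold are illustrative stand-ins, and reading /proc assumes a Linux dyno):

class ApplicationJob < ActiveJob::Base
  MAX_RSS_MB = 450 # illustrative threshold

  # Resident set size of the current process, in megabytes (Linux only).
  def current_rss_mb
    Integer(File.read("/proc/self/status")[/VmRSS:\s+(\d+)/, 1]) / 1024
  end

  around_perform do |job, block|
    block.call
  ensure
    # Ask the whole worker process to shut down once the threshold is exceeded.
    ::Process.kill(:TERM, ::Process.pid) if job.current_rss_mb > MAX_RSS_MB
  end
end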

@hms
Contributor

hms commented Sep 7, 2024

@rosa I've tracked down the root cause of my confusion.

When I force restart a worker with an in-flight job, that leaves any blocked executions waiting for the concurrency expiry + the maintenance window in order to release the concurrency lock.

Is there any database-backed bookkeeping that would let me identify the in-flight job that got killed without finishing? And then, is there any tooling that I could use to effectively trigger an immediate maintenance processing step based on those jobs, or is this part of what I have to build to complete this feature?

Thanks.

@rosa
Member

rosa commented Sep 8, 2024

Hey @hms! Sorry for the delay in reviewing this; I was busy with other recent Solid Queue stuff.

When I force restart a worker with an in-flight job, that leaves any blocked executions waiting for the concurrency expiry + the maintenance window in order to release the concurrency lock.

Aha! This seems like a bug/oversight related to some new behaviour I introduced so that in-flight jobs are marked as failed when their worker terminates without giving them time to finish. This happens here when the process is pruned because it died without having the chance to deregister and still had claimed jobs, and here when the supervisor detects the process has died and replaces it with another one. These ultimately call this method, which marks the job as failed but doesn't unblock other jobs waiting on that same concurrency key, because that's done as part of perform here, but in this case failed_with is called outside perform.

This is something I need to fix... I need to make some changes around process pruning and processes dying/reviving as part of an issue that came up in #324, so I'll handle that. I think that'd answer your question:

And then, is there any tooling that I could use to effectively trigger an immediate maintenance processing step based on those jobs or is this part of what I have to build to complete this feature?


However, looking at this feature in particular, which is the main goal, I think I'd go with something simpler that avoids having to kill a worker mid-job from the job. I assume your workers' memory grows as they run more jobs... would it be simpler to configure a max number of jobs that a worker is allowed to perform before it terminates itself? It could also be a memory limit, but the number of jobs is much easier to measure as you just need a counter. Then it'd be up to the worker to count how many jobs it has processed and just shut down gracefully when it hits the limit 🤔. The supervisor would then replace it. Do you think that could work for your case?
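A minimal sketch of that counter idea (not Solid Queue's implementation; it assumes only that the worker already has a graceful stop, like the Worker#stop mentioned earlier, and a hypothetical per-worker limit):

require "concurrent"

# Thread-safe budget a worker could consult after each completed job.
class JobBudget
  def initialize(limit)
    @limit = limit
    @completed = Concurrent::AtomicFixnum.new(0)
  end

  # Returns true once the configured limit has been reached.
  def job_finished!
    return false unless @limit
    @completed.increment >= @limit
  end
end

# Pseudo-wiring inside the worker's job-completion path:
#   stop if budget.job_finished!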

@hms
Contributor

hms commented Sep 8, 2024

@rosa Thank you for getting back to me.

I found all of the bits of code referenced above and even had some "fixes" for the issues at hand, but since this was very quickly changing from feature work into architectural / philosophical design questions, I didn't want to go any further without A) coordinating with you; B) seeing where you want to head with some of these issues (if you address them at all!); and C) checking, since some of the changes could end up being pretty fundamental, whether you're ok with my taking a crack at them or whether you feel this is something that has to be handled by the SolidQueue core team.

At the high level, I think the notion of worker recycling is a pretty important feature. But looking under the hood, it reveals a couple of significant questions and potentially touches a lot of the overall implementation:

  • If we allow for worker recycling, then do we want to support the notion of "wait until the Pool is empty / drained" before shutdown? (Given I have some long-running and very resource-expensive jobs that are not decomposable any further, I'd prefer to have the choice/control.)

    • For a single-threaded worker, choosing to wait for any existing thread to complete is no different from letting the thread complete without the worker restarting -- it blocks the queue until completion.
    • For multithreaded workers, it's a more complex issue, since each thread potentially represents a lot of lost resources / work while shutdown waits for the Pool to drain.
  • The worker constraints under consideration, and whether they are worth the effort:

    • Memory constraints: Many of us are resource constrained for one reason or another, and this can have significant DevOps implications on some deployment platforms (I'm talking about you, Heroku). This is not hard to implement if you are open to a new SolidQueue config -- a Proc that returns the current memory in bytes. This way, SolidQueue doesn't have to implement something that works across platforms/OSs or take on a Gem dependency. I would be inclined to report both soft OOM and hard OOM, and restart on hard OOM.
    • Job count: Restart the worker after N jobs have been run. It's a little more effort than the others, but not that big a deal.
    • Worker aging: Restart the worker every N units of time.

Both Job count and Worker aging address Jobs that have resource leaks (possibly outside the developer's control via 3rd-party Gems / APIs). Job count can also double as a crude memory control, since restarting with N = 1 absolutely bounds memory. So all three are interesting and potentially valuable to implement. While I think the configuration for these options should fall under the worker-by-queue(s) section, they would be tracked and enforced on a per-process basis, without cross-process accounting.

  • How to communicate to the Supervisor whether a worker stopped because of a shutdown request vs. a crash, and with vs. without in-flight jobs. This would allow an easy decision to run or skip handle_claimed_jobs_by(terminated_fork, status).

    • Given the current implementation, I think adding columns to the Process table -- controlled_shutdown: boolean, default: false and terminated_jobs: integer, default: 0 -- populated as part of the ensure block would remove any ambiguity in the upstream processing in the supervisor during worker restarts (see the migration sketch below).
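A rough migration for the columns proposed in the last point (illustrative only, assuming the standard solid_queue_processes table name; not an agreed-on schema change):

class AddShutdownBookkeepingToSolidQueueProcesses < ActiveRecord::Migration[7.1]
  def change
    add_column :solid_queue_processes, :controlled_shutdown, :boolean, default: false, null: false
    add_column :solid_queue_processes, :terminated_jobs, :integer, default: 0, null: false
  end
end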


@rosa
Member

rosa commented Sep 9, 2024

This is not hard to implement if you are open to a new SolidQueue config -- a Proc that returns the current memory in bytes. This way, SolidQueue doesn't have to implement something that works across platforms/OSs or take on a Gem dependency.

Ah, that's a good idea! +1 to a Proc you provide to return the memory, combined with a configured threshold.
We can start with that one and skip the job count. That was just an idea to do it in a simpler way, but if a Proc works, then that's better. And yes, this could be configurable per worker. I'd keep it as simple as possible for now and have only a single threshold: no reporting threshold, just the termination one.

If we allow for worker recycling, then do we want to support the notion of "wait until the Pool is empty / drained" before shutdown?

I think I'd rely on the currently existing SolidQueue.shutdown_timeout for this, the time a worker has to terminate in-flight work before exiting. I'd have that apply to this threshold as well. When the memory threshold is reached, we initiate the shutdown as if it was triggered by a TERM or INT signal, and this timeout applies.
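For reference, that existing knob is the one that can already be tuned, e.g. in an initializer (the value here is just illustrative):

# config/initializers/solid_queue.rb
SolidQueue.shutdown_timeout = 30.seconds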

How to communicate with the Supervisor that this worker stopped based on a shutdown request Vs. crash and with Vs. without inflight jobs.

Do you think the current way the supervisor detects this wouldn't work in this case?

@hms
Contributor

hms commented Sep 9, 2024

@rosa

Do you think the current way the supervisor detects this wouldn't work in this case?

The issue I sort of backed into with the supervisor is that it currently fails claimed jobs via handle_claimed_jobs_by(terminated_fork, status). I assumed this was intentional and that I didn't understand the use case(s), so I was looking for tooling to disambiguate between workers that were pruned (where the job should be failed to maintain the existing behavior) and workers that are being recycled intentionally.

@rosa
Member

rosa commented Sep 11, 2024

The issue I sort of backed into with the supervisor is that it currently fails claimed jobs via handle_claimed_jobs_by(terminated_fork, status). I assumed this was intentional

Yes! But this only happens if the supervisor thinks the process exited ungracefully. I added it because, before, we would have problematic jobs that caused a worker to get killed, and then these jobs would be released and put back in the queue, only to be picked up by another worker that would be killed, and so on 😅 Not desirable. In that case, we want the jobs to fail. In the regular situation, however, if a worker has time to shut down in an orderly way, it should run this callback, which in turn destroys the process record and releases claimed jobs.

Are you finding a scenario where this is not happening correctly?

@hms
Contributor

hms commented Sep 11, 2024

@rosa

Are you finding a scenario where this is not happening correctly?

Yes, I am running into this as an issue. Because SolidQueue does a lot via concurrent-ruby; can have many jobs with their "fingers" in the database (introducing lock waits and random ordering of database writes); and has a shutdown process that is, by design, async, with soft and hard time limits that might not align with a running Job's time to complete, it seems easy to leave a Job in a state that gets reaped by the Supervisor.

Since the fork reaping is there specifically to address a real-world problem, rather than remove it I'd like to enhance it by making orderly / disorderly shutdown information available to the supervisor. This way, jobs that come from an orderly shutdown are 100% safe not to be failed. As a quick, touch-as-little-code-as-possible option, I could update the Process.metadata hash with an orderly-shutdown flag. Then the supervisor can interrogate that value and fail or restart the job accordingly.
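A minimal sketch of that quick option (the orderly_shutdown key is a placeholder; the only thing assumed from above is that the process record's metadata is a plain hash):

# In the worker's orderly shutdown path, before the supervisor looks at the record:
process.update!(metadata: (process.metadata || {}).merge("orderly_shutdown" => true))

# In the supervisor, when deciding whether to fail claimed jobs:
#   run handle_claimed_jobs_by(terminated_fork, status) only if the flag is absent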

On to separate issues where I would appreciate a little feedback and/or guidance from you:

My current approach for recycle on OOM is:
Mix a recyclable concern into the worker and then pass the worker into pool.post (since the logic for recycling belongs to the worker). The worker makes recycle decisions as close as possible to the actual completion of the Job that runs on the Pool's thread. As long as there is the possibility of other threads still running, I'm uncomfortable calling #exit! on job completion and OOM.
To this end, what I'm doing is:

def post(execution)
  available_threads.decrement

  future = Concurrent::Future.new(args: [ execution ], executor: executor) do |thread_execution|
    wrap_in_app_executor do
      thread_execution.perform
    ensure
      shutdown if worker.recycle_on_oom!
      available_threads.increment
      mutex.synchronize { on_idle.try(:call) if idle? }
    end
  end

  ....
end

Worker.recycle_on_oom! here calls #stop on itself. This way, the pool drains of resources, just in case someone were to use multi-threading as an option with worker recycling -- which offers the opportunity to forever trap a slow but memory-inexpensive Job behind a fast but memory-intensive Job. That would be Very Bad(™).

@rosa
Unfortunately, I'm still missing something and this is not quite right. Randomly, Jobs don't finish processing (despite reaching the code in the ensure). Even worse, the Worker becomes inoperative, where it continues to poll, but never gets new work and ignores signals. If I stop SolidQueue, this worker will become a Zombie. Any help or suggestions would be great.

On a side note, I would love some feedback on the tradeoff of simply documenting the risks of multithreaded workers that can recycle on OOM Vs. simply not allowing workers to recycle if they are set to use threads. It's one heck of a foot-gun and it will be more than "just a flesh wound" if the application falls into the wrong timing pattern.

Lastly, I'm running into what looks like a SolidQueue deadlock on Job completion, and the worker gets into a bad state where it polls in a tight loop and ignores shutdown requests/demands from the supervisor. Should the Supervisor stop, it becomes a Unix zombie process (not sure what happens on Windows). Unix zombies are a real DevOps issue. So, I've added this for your consideration:

def prune
  deregister(pruned: true)
  kill_pruned_process(self) # New
end

# We only get here if a Process has stopped heartbeating and was just removed from the
# Process table (i.e., it's dead to SolidQueue). Unfortunately, that only makes it invisible and
# can leave it with runnable access to the solid_queue tables, doing who knows what or when.
#
# This makes sure that if we remove it from the database (above via deregister), we remove it from the world.
# Note: when in this state, it's become a petulant child (process) and ignores SIGTERM and SIGQUIT, so
# we are reaching for a bigger.... signal
def kill_pruned_process(pruned_process)
  SolidQueue.logger.debug { "Killing pruned process #{pruned_process.inspect}" }
  ::Process.kill :SIGKILL, pruned_process.pid
rescue SystemCallError
  # Ignored
end

hms added a commit to ikyn-inc/solid_queue that referenced this issue Sep 17, 2024
This PR adds two new configuration parameters:
* recycle_on_oom on the Worker (via queue.yml)
* calc_memory_usage as a global parameter (via application.rb,
  environment/*.rb, or an initializer)

There are no specific unit requirements placed on either of these new
parameters. What's important is that they use the same order of magnitude
and are comparable.

For example, if the calc_memory_usage proc returns 300MB as 300 (as
in megabytes), then the recycle_on_oom set on the worker should be 300 too.

Any worker without recycle_on_oom is not impacted in any way.
If calc_memory_usage is nil (the default), then OOM
checking is off for all workers under the control of this Supervisor.

The check for OOM is made after the Job has run to completion and
before the SolidQueue worker does any additional processing.

The single biggest change to SolidQueue, and the one that probably requires
the most review, is moving job.unblock_next_blocked_job out of
ClaimedExecution and up one level into Pool. The rationale
for this change is that the ensure block on the Job execution
is not guaranteed to run if the system / thread is forcibly shut down
while the job is in flight. However, the Thread's ensure *does* seem
to get called reliably on forced shutdowns.

Given my almost assuredly incomplete understanding of the concurrency
implementation, despite Rosa working very hard to help me grok it,
there is some risk that this change is wrong.

My logic for this change is as follows:
* A job that completes successfully would have released its lock -- no
  change
* A job that completes by way of an unhandled exception would have
  released its lock -- no change
* A job that was killed in flight because of a worker recycle_on_oom
  (or an ugly restart out of the user's control -- again, looking
  at you, Heroku) needs to release its lock -- there is no guarantee
  that it's going to be the job that starts on the worker restart. If
  it doesn't release its lock in this use-case, then that worker
  could find itself waiting on the dispatcher (I think) to expire
  Semaphores before it is able to take on new work.

Small fix
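Based on the commit message above, usage would look roughly like this (the exact setter for calc_memory_usage and its arity are assumptions; the only hard requirement stated is that the units match):

# config/queue.yml
production:
  workers:
    - queues: "*"
      threads: 3
      recycle_on_oom: 300   # same unit as whatever calc_memory_usage returns, e.g. megabytes

# config/initializers/solid_queue.rb
# Illustrative proc returning the current process's RSS in megabytes.
SolidQueue.calc_memory_usage = -> { Integer(`ps -o rss= -p #{::Process.pid}`) / 1024 }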