Introduce concurrency controls #38

Merged · 32 commits · Nov 27, 2023
Conversation

rosa (Member) commented Nov 7, 2023

This PR introduces concurrency controls in Solid Queue. These controls are effective only when performing jobs, and are specified in the job class, like this:

class ExampleJob < ApplicationJob
  limits_concurrency to: max_concurrent_executions, key: ->(arg1, arg2, **) { ... }, duration: max_interval_to_guarantee_concurrency_limit
end

We'll ensure that at most the number of jobs indicated via to that yield the same key will be performed concurrently, and this guarantee will last for duration since the last job started to run. Note that there's no guarantee about the order of execution, only about jobs being performed at the same time (overlapping).

For example:

class ExampleJob < ApplicationJob
  limits_concurrency to: 2, key: ->(arg, **) { arg }, duration: 5.minutes
end

In this case, we'll ensure that at most two jobs of kind ExampleJob with the same value for the first argument will run concurrently. If, for any reason, one of those jobs takes longer than 5 minutes or doesn't release its concurrency lock within 5 minutes of acquiring it, a new job with the same key might gain the lock.

To implement the locks, this uses an approach based on semaphores. When a job that defines concurrency limits is going to be enqueued, we wait on the semaphore. If the semaphore is available (i.e., doesn't exist yet, or exists, but its value is lower than the concurrency limit), we update the semaphore and enqueue the job. If the semaphore is not available, instead of moving the job to the ready_executions table, we move it to a new blocked_executions table. All jobs that can't acquire semaphores go to this table. When a job whose concurrency is limited finishes, it will signal the semaphore and try to unblock the first blocked execution, if any.
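
The wait/signal flow described above can be sketched with a plain in-memory semaphore. This is purely illustrative: MemorySemaphore is a made-up name, and the real implementation is a database row updated atomically, not an in-process object.

```ruby
# In-memory sketch of the wait/signal logic (illustrative only; the real
# semaphore is a database row updated with atomic UPDATEs).
class MemorySemaphore
  def initialize(limit)
    @limit = limit
    @value = nil # nil means the semaphore doesn't exist yet
  end

  # Called before enqueuing: returns true if the job may run now (taking a
  # slot), false if it should go to blocked_executions instead.
  def wait
    if @value.nil?
      @value = @limit - 1 # create the semaphore, taking one slot
      true
    elsif @value > 0
      @value -= 1
      true
    else
      false
    end
  end

  # Called when a job finishes: frees a slot, never exceeding the limit.
  def signal
    @value += 1 if @value && @value < @limit
  end
end
```

With a limit of 2, the first two waits succeed and the third is blocked until a finishing job signals.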

It's possible something goes wrong when unblocking the next blocked execution, so there's a fallback for that: periodically, we check for semaphores that have expired (based on the configured duration) and clear them. Then, we check blocked executions that could be unblocked and promote them to ready, one by one, as each one needs to acquire the lock as well. This periodic check is currently done by the scheduler process, as I wanted to avoid overloading the workers with more stuff, plus we'll have fewer instances of the scheduler trying to do this.

In this way, there's a fallback for both actions that a job with limited concurrency needs to do upon finishing:

  • Release the semaphore: expiry time and periodic cleanup.
  • Unblock the next execution, in case the job does update the semaphore but fails to unblock the next execution: periodic check of executions that are no longer blocked by semaphores and thus, could be promoted to ready.
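
The periodic pass covering both fallbacks can be sketched with in-memory structures. SemRecord, BlockedRecord and concurrency_maintenance are made-up stand-ins for the semaphore and blocked-execution tables, not Solid Queue's actual code.

```ruby
# Illustrative in-memory version of the periodic fallback: clear expired
# semaphores, then promote blocked executions one by one, re-acquiring the
# lock for each (a missing semaphore would be created in the real system).
SemRecord = Struct.new(:key, :value, :expires_at)
BlockedRecord = Struct.new(:concurrency_key)

def concurrency_maintenance(semaphores, blocked, ready, now: Time.now)
  semaphores.reject! { |s| s.expires_at <= now }    # fallback 1: clear expired
  blocked.dup.each do |execution|                   # fallback 2: unblock
    sem = semaphores.find { |s| s.key == execution.concurrency_key }
    next if sem && sem.value <= 0                   # semaphore still held
    sem.value -= 1 if sem                           # acquire a slot
    blocked.delete(execution)
    ready << execution                              # promote to ready
  end
end
```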

More info on how this selection of blocked executions to release is done here.

I'll write some more comments on the code in the relevant parts. Note that, for now, this only applies to jobs that are enqueued for immediate execution, not for jobs scheduled in the future, since these are moved to ready in batches. We don't need this for HEY, so I prefer to delay that until after we've moved all jobs.

cc @djmb @jorgemanrubia

@rosa rosa force-pushed the concurrency-controls branch 2 times, most recently from cad5b99 to 9823c32 Compare November 13, 2023 19:04
@rosa rosa force-pushed the concurrency-controls branch 3 times, most recently from 1964198 to 2aca66d Compare November 16, 2023 22:20
@rosa rosa marked this pull request as ready for review November 20, 2023 16:52
@@ -31,6 +31,8 @@ def perform
else
failed_with(result.error)
end
ensure
job.unblock_blocked_jobs
rosa (Member Author):

If this fails but everything before it goes well (that is, the job finishes), the claimed execution will already be gone via finished or failed_with. That way, we won't be left with a job that actually finished or failed but that the system doesn't recognise as such just because unblocking the next execution failed.

@rosa rosa force-pushed the concurrency-controls branch 5 times, most recently from 8955770 to af8ce79 Compare November 20, 2023 21:36
@jorgemanrubia left a comment:

This looks amazing @rosa 💯 . Added a couple of stylistic comments, but the code looks fantastic.

scope :expired, -> { where(expires_at: ...Time.current)}

class << self
def wait(job)

For your consideration: the semaphore static interface, where it receives a job and then passes several of the job's attributes around, feels like it could benefit from extracting a method object. I understand why you did this, since it's a special class that acts on a group of records, not a regular Active Record object. Still, you might consider an additional inner class with a name like Control or Proxy that exposes the interface you want for a given job. I haven't tested this, but it could look something like:

# When using a semaphore

  private
    def acquire_concurrency_lock
      semaphore.wait
    end

    def semaphore
      Semaphore.for(job)
    end

# The exposed proxy

class SolidQueue::Semaphore < SolidQueue::Record
  scope :available, -> { where("value > 0") }
  scope :expired, -> { where(expires_at: ...Time.current) }

  class << self
    def for(job)
      Control.new(job)
    end
  end

  class Control
    attr_reader :job

    def initialize(job)
      @job = job
    end

    def wait
      if semaphore = Semaphore.find_by(key: key)
        semaphore.value > 0 && attempt_decrement
      else
        attempt_creation
      end
    end

    def signal
      attempt_increment
    end

    private
      def key
        job.concurrency_key
      end

      def limit
        job.concurrency_limit
      end

      def duration
        job.concurrency_duration
      end

      def attempt_creation
        Semaphore.create!(key: key, value: limit - 1, expires_at: duration.from_now)
        true
      rescue ActiveRecord::RecordNotUnique
        attempt_decrement
      end

      def attempt_decrement
        Semaphore.available.where(key: key).update_all([ "value = value - 1, expires_at = ?", duration.from_now ]) > 0
      end

      def attempt_increment
        Semaphore.where("value < ?", limit).where(key: key).update_all([ "value = value + 1, expires_at = ?", duration.from_now ]) > 0
      end
  end
end

rosa (Member Author):

Thanks @jorgemanrubia! I'll think about it and see how it could look. I agree with you, passing the job attributes around doesn't look so nice, and you're spot on about why I did it this way.

Thanks so much for your suggestion, I'll play with it 🙏

rosa (Member Author) · Nov 22, 2023:

@jorgemanrubia, I took a stab at it in 61fec0d, what do you think? I think I like it! I chose to keep the methods Semaphore.signal(job) and Semaphore.wait(job) but encapsulate the rest. I like using the static interface from outside, although I don't feel strongly about it.


class << self
  def unblock(count)
    release_many releasable.select(:concurrency_key).distinct.limit(count).pluck(:concurrency_key)

I think you can remove the select because pluck already does it for you.

rosa (Member Author):

Oh yes! A leftover indeed 😅

  super.merge(batch_size: batch_size, polling_interval: polling_interval)
end

def launch_concurrency_maintenance
  @concurrency_maintenance_task = Concurrent::TimerTask.new(run_now: true, execution_interval: polling_interval) do

Contributor:

Do we need to worry about a thundering herd on deployment here (and maybe on the scheduler too)? I think ideally we would start each initial task after a random amount of time between 0 and polling_interval.

rosa (Member Author):

Good point! Normally we'd run one or two instances of the scheduler, but do you mean that even with a single instance we could suddenly get too many unblocked jobs or too many jobs scheduled at once?

Contributor:

I'm thinking about the case where the scheduler or concurrency maintenance task is deployed across many hosts; then, on a deployment, the first runs of the timer tasks will roughly sync up.

rosa (Member Author):

Yes... that's true, and the same goes for the scheduling itself, like you said 🤔 I'll think about the best way to introduce that random delay. Before starting the schedule it's easy, as I could just call interruptible_sleep with a random value before starting. But for the concurrency maintenance task, there's no way of passing a value for when to start: it's either now or after the execution_interval. I think it'll need to sleep within the task, and only the first time, maybe via a variable.
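
The one-time random delay discussed here could be shaped roughly like this. JitteredPeriodicTask and next_delay are made-up names for illustration, not Solid Queue's actual API:

```ruby
# Sketch of a one-time random delay for a periodic task, so that many hosts
# deployed at once don't fire their first run in sync.
class JitteredPeriodicTask
  def initialize(interval:)
    @interval = interval
    @first_run = true
  end

  # Delay before the next run: a random value in [0, interval) the first
  # time (the jitter), then the regular interval from there on.
  def next_delay
    if @first_run
      @first_run = false
      rand * @interval
    else
      @interval
    end
  end
end
```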

rosa (Member Author):

@djmb, I'm going to address this one together with the scheduler, in a separate PR, as part of this to-do. We're currently running a single instance of the scheduler, so it shouldn't be a problem for now.

end

def unblock_blocked_executions
  BlockedExecution.unblock(batch_size)

Contributor:

I'd be interested to see an example of the query plan for this. I think it might want a concurrency_key-prefixed index on BlockedExecution (which the index I suggested above would cover).

We might also want an index on key, value on Semaphore so we can avoid querying the table.

rosa (Member Author):

Good call. I'll add the index on key, value, and I'll get that query plan.

rosa (Member Author):

@djmb, here's how it looks with some synthetic data I created:

mysql> show create table solid_queue_blocked_executions\G
*************************** 1. row ***************************
       Table: solid_queue_blocked_executions
Create Table: CREATE TABLE `solid_queue_blocked_executions` (
  `id` bigint NOT NULL AUTO_INCREMENT,
  `job_id` bigint DEFAULT NULL,
  `queue_name` varchar(255) NOT NULL,
  `priority` int NOT NULL DEFAULT '0',
  `concurrency_key` varchar(255) NOT NULL,
  `created_at` datetime(6) NOT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `index_solid_queue_blocked_executions_on_job_id` (`job_id`),
  KEY `index_solid_queue_blocked_executions_for_release` (`concurrency_key`,`priority`,`job_id`)
) ENGINE=InnoDB AUTO_INCREMENT=14259 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci
1 row in set (0.00 sec)

mysql> show create table solid_queue_semaphores\G
*************************** 1. row ***************************
       Table: solid_queue_semaphores
Create Table: CREATE TABLE `solid_queue_semaphores` (
  `id` bigint NOT NULL AUTO_INCREMENT,
  `key` varchar(255) NOT NULL,
  `value` int NOT NULL DEFAULT '1',
  `expires_at` datetime(6) NOT NULL,
  `created_at` datetime(6) NOT NULL,
  `updated_at` datetime(6) NOT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `index_solid_queue_semaphores_on_key` (`key`),
  KEY `index_solid_queue_semaphores_on_expires_at` (`expires_at`),
  KEY `index_solid_queue_semaphores_on_key_and_value` (`key`,`value`)
) ENGINE=InnoDB AUTO_INCREMENT=100102 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci
1 row in set (0.00 sec)

mysql> select count(*) from solid_queue_blocked_executions;
+----------+
| count(*) |
+----------+
|    19893 |
+----------+
1 row in set (0.01 sec)

mysql> select count(*) from solid_queue_semaphores;
+----------+
| count(*) |
+----------+
|   100101 |
+----------+
1 row in set (0.02 sec)

And here's how the query plan looks:

mysql> EXPLAIN  SELECT DISTINCT `solid_queue_blocked_executions`.`concurrency_key` FROM `solid_queue_blocked_executions` LEFT OUTER JOIN `solid_queue_semaphores` ON `solid_queue_semaphores`.`key` = `solid_queue_blocked_executions`.`concurrency_key` WHERE (value > 0 OR `solid_queue_semaphores`.`id` IS NULL) LIMIT 100\G
*************************** 1. row ***************************
           id: 1
  select_type: SIMPLE
        table: solid_queue_blocked_executions
   partitions: NULL
         type: index
possible_keys: index_solid_queue_blocked_executions_for_release
          key: index_solid_queue_blocked_executions_for_release
      key_len: 1035
          ref: NULL
         rows: 19871
     filtered: 100.00
        Extra: Using index; Using temporary
*************************** 2. row ***************************
           id: 1
  select_type: SIMPLE
        table: solid_queue_semaphores
   partitions: NULL
         type: eq_ref
possible_keys: index_solid_queue_semaphores_on_key,index_solid_queue_semaphores_on_key_and_value
          key: index_solid_queue_semaphores_on_key
      key_len: 1022
          ref: solid_queue_development.solid_queue_blocked_executions.concurrency_key
         rows: 1
     filtered: 40.00
        Extra: Using where; Distinct
2 rows in set, 1 warning (0.00 sec)

It doesn't pick the index on key, value 🤔 Not even when splitting the query into two and using this one:

mysql> EXPLAIN  SELECT DISTINCT `solid_queue_blocked_executions`.`concurrency_key` FROM `solid_queue_blocked_executions` LEFT OUTER JOIN `solid_queue_semaphores` ON `solid_queue_semaphores`.`key` = `solid_queue_blocked_executions`.`concurrency_key` WHERE value > 0 LIMIT 100\G
*************************** 1. row ***************************
           id: 1
  select_type: SIMPLE
        table: solid_queue_blocked_executions
   partitions: NULL
         type: index
possible_keys: index_solid_queue_blocked_executions_for_release
          key: index_solid_queue_blocked_executions_for_release
      key_len: 1035
          ref: NULL
         rows: 19871
     filtered: 100.00
        Extra: Using index; Using temporary
*************************** 2. row ***************************
           id: 1
  select_type: SIMPLE
        table: solid_queue_semaphores
   partitions: NULL
         type: eq_ref
possible_keys: index_solid_queue_semaphores_on_key,index_solid_queue_semaphores_on_key_and_value
          key: index_solid_queue_semaphores_on_key
      key_len: 1022
          ref: solid_queue_development.solid_queue_blocked_executions.concurrency_key
         rows: 1
     filtered: 33.33
        Extra: Using where; Distinct
2 rows in set, 1 warning (0.01 sec)

Some other ideas I tried that don't seem to get a much better plan:

mysql> EXPLAIN SELECT DISTINCT `solid_queue_blocked_executions`.`concurrency_key` FROM `solid_queue_blocked_executions` WHERE `solid_queue_blocked_executions`.`concurrency_key` NOT IN (SELECT `solid_queue_semaphores`.`key` FROM `solid_queue_semaphores`) LIMIT 100\G
*************************** 1. row ***************************
           id: 1
  select_type: SIMPLE
        table: solid_queue_blocked_executions
   partitions: NULL
         type: index
possible_keys: index_solid_queue_blocked_executions_for_release
          key: index_solid_queue_blocked_executions_for_release
      key_len: 1035
          ref: NULL
         rows: 19871
     filtered: 100.00
        Extra: Using index; Using temporary
*************************** 2. row ***************************
           id: 1
  select_type: SIMPLE
        table: solid_queue_semaphores
   partitions: NULL
         type: eq_ref
possible_keys: index_solid_queue_semaphores_on_key,index_solid_queue_semaphores_on_key_and_value
          key: index_solid_queue_semaphores_on_key
      key_len: 1022
          ref: solid_queue_development.solid_queue_blocked_executions.concurrency_key
         rows: 1
     filtered: 100.00
        Extra: Using where; Not exists; Using index; Distinct
2 rows in set, 1 warning (0.01 sec)

If these are too bad, I'll try to come up with another way to detect blocked executions that doesn't require joining two tables. (Considering that these checks shouldn't run that often, perhaps not even at the scheduler's polling interval, since this is a safety net to prevent stuck executions when something goes wrong and a job fails to unblock the next one, the interval could be configured separately 🤔.) For example: only try to unblock executions that have been blocked for longer than the concurrency_duration limit (storing an expires_at value there too). These won't necessarily be ready to be unblocked, as it's possible more blocked executions have bumped the semaphore's expires_at time, but they're more likely to be. Then we could just run simpler queries against the solid_queue_semaphores table with the candidate keys and choose from there 🤔
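
This candidate-key idea could be simulated in memory roughly as follows. BlockedRow, SemRow and candidate_keys are hypothetical names; the real version would be two separate database queries:

```ruby
# In-memory simulation of the candidate-key approach:
# 1) take up to `limit` distinct keys from blocked executions whose own
#    expires_at has passed;
# 2) keep only keys whose semaphore is gone or still has free slots.
BlockedRow = Struct.new(:concurrency_key, :expires_at)
SemRow = Struct.new(:key, :value)

def candidate_keys(blocked, semaphores, now: Time.now, limit: 100)
  expired = blocked.select { |b| b.expires_at <= now }
  candidates = expired.map(&:concurrency_key).uniq.first(limit)
  by_key = semaphores.to_h { |s| [s.key, s] }
  candidates.select { |k| by_key[k].nil? || by_key[k].value > 0 }
end
```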

Contributor:

Ah great that's really useful!

I've been wracking my brains and I don't see a particularly efficient way to do this as a single query, since we need to filter on both tables. The query's performance will get worse with more rows in the table (whether there are jobs to unblock or not), so in a situation where a lot of jobs were backed up it could become an issue (which is exactly when you don't need that problem!).

eg. only try to unblock executions that have been blocked for over the concurrency_duration limit (storing a expires_at value there too). These won't necessarily be ready to be unblocked, as it's possible there have been more blocked executions bumping the semaphore's expires_at time, but they're more likely to. Then, we could just do simpler queries to the solid_queue_semaphores table with the candidate keys, and choose from there 🤔

Something like this might work! What we'd need to watch out for is that if the first 100 items are all potentially unblockable but not actually unblockable, we might not get round to checking items that should be unblocked. Not sure how likely that is in practice, though.

rosa (Member Author):

I'm going to try that. I also think we could have an alert on this table, which I'll wire up too.

rosa (Member Author):

Created a to-do to discuss this, so I can keep all the work together in the Solid Queue project instead of in GitHub, to reference it more easily later.

rosa added 12 commits November 22, 2023 20:25
For now this just allows us to specify a Proc to build the key and
a limit for the concurrent execution.
At the time of enqueuing a job and creating a ready execution for it, we check
if the job has concurrency control settings. If it has, we use a semaphore to
determine whether the job can go to the ready_executions table or whether it needs
to wait in the new blocked_executions table. The semaphore is created for the job's
concurrency key and has just a value that gets decremented, avoiding race conditions,
only if it's still > 0.

This needs to be paired with jobs releasing/signaling the semaphore when they finish,
and the worker doing a pass over blocked executions in addition to polling.
They fit better there, as I'll write integration concurrency control
tests for the whole thing.
Just to test how it feels to do it in this way.
Either successfully or failing, and also release the semaphore.
…tion

Otherwise, if a job finishes just fine or fails, and something goes wrong when
dispatching blocked jobs, we'll fail to mark the job as finished or failed just
because we couldn't dispatch blocked jobs, but not because anything went wrong with the
job itself.

Also: test sequential jobs when some of them fail. We must continue processing jobs
normally.
If two processes try to release blocked executions for the same key (for example,
if the concurrency limit is 2, and 2 jobs finish at the same time), both would try
to release the first one, and one of them would fail to do that. To avoid that,
select the first one but lock the record, and have the SELECT ... FOR UPDATE
use SKIP LOCKED so we don't have to wait on the lock in other processes.
When a job finishes or fails, we need to:
- Increment the semaphore if any, in its own transaction. This needs to
happen regardless of whether other blocked executions are unblocked by this
semaphore change.
- Try to unblock the next blocked execution. For this, we need to acquire the
semaphore again, as it's possible another job part of the same concurrency group
is just being enqueued at the same time. Then, we need to move the blocked
execution to "ready", and then delete the blocked execution. This all needs to
happen in the same transaction, without going through the job. The previous
implementation could very well update the existing blocked execution and then
delete it, leaving the job in limbo.
That is, blocked executions with an available semaphore.
Try to unblock at most one per concurrency key, to unblock
the whole group.
We'll have to clear semaphores eventually because they might get stuck,
and the simplest way is to delete them. Then, we might end up with blocked
executions without semaphore, so we also need to unblock these.
Add `expires_at` to semaphores, so we can easily expire them.
rosa added 10 commits November 22, 2023 20:26
…es_at`

We don't need to store the concurrency limit and max duration as the
Solid Queue job has the job class name available to constantize it and
get them from there.

Also, stop unblocking executions in the worker before polling, keep that to
polling only. We'll have the supervisor do this, as well as expiring semaphores.
This task is managed by the scheduler and uses its same polling interval to
run.
… signatures

We can simply pass a job, since both Active Job and Solid Queue's Job respond to the
same methods, required for the semaphore to be waited on, or signaled. The "concurrency"
part of the "concurrency_key" attribute is redundant.
@rosa rosa force-pushed the concurrency-controls branch 2 times, most recently from 61fec0d to ab2d2f6 Compare November 22, 2023 21:34
rosa added 3 commits November 23, 2023 15:48
…phores

Also, include `job_id` in the ORDER when releasing blocked executions.

Thanks to @djmb for the suggestions.
…roups")

That is, if we wanted, for example, to limit concurrency for a group of jobs together
because they act on the same kind of record, and thus, we can't have the job class be part
of the concurrency key, as that would keep them all separated. With this change, we have two
parts to build the concurrency key:
- The concurrency group, which defaults to the job class name
- And the concurrency key itself, that defaults to the first argument

This also fixes the previous calculation of the key, which was working by chance because it was
being executed in the context of the included module, not the job class.
@rosa rosa force-pushed the concurrency-controls branch 2 times, most recently from 7f12f27 to 41a63f5 Compare November 23, 2023 15:02
@rosa rosa force-pushed the concurrency-controls branch from 41a63f5 to ee5fcc6 Compare November 23, 2023 19:10
rosa added 5 commits November 23, 2023 20:24
We'll use this to improve the queries performed by the concurrency
maintenance task.
…ased

Semaphores' expiry times are bumped when new jobs are successfully queued for
execution or when jobs finish and release the semaphores. This means that if a
blocked execution's expiry time is in the past, the semaphore's expiry time is
most likely in the past too. Here's why: we know that if we still have a
blocked job execution after its expiry time has passed, it's because:

1. A job holding the semaphore hasn't finished yet, and in that case, the
   semaphore's expiry time would have expired as well and would be cleared up
   right before checking blocked jobs.
2. The job holding the semaphore finished and
   released the semaphore but failed to unblock the next job. In that case, when
   we inspect the blocked job's concurrency key, we'll see the semaphore
   released.
   a. It's possible a new job is enqueued in the meantime and claims the
      semaphore, so we wouldn't be able to unblock that blocked job. However, if
      this happens, it's also more likely that this new job will succeed at
      unblocking it when it is finished. The more jobs that are enqueued and run,
      bumping the semaphore's expiry time, the more likely we are to unblock the
      blocked jobs via the normal method.
3. The job holding the semaphore finished but failed to release the semaphore:
   this case is the same as 1, the semaphore will be cleared before unblocking
   the execution.

We take advantage of this to select X (scheduler's batch size) distinct concurrency
keys from expired blocked executions, and for that we can use the index on
(expires_at, concurrency_key), that filters the elements, even if we have to scan
all of them to find the distinct concurrency keys using a temporary table that would
get at most X items long. Then, we'll check whether these (up to) X concurrency keys
are releasable and try to release them.

A potential problem would be if we happen to select X concurrency keys that are expired
but turn out not to be releasable. I think this should be very unlikely because for this
to happen, we'd have to have failed to unblock X jobs via the regular method, and other
jobs using the same concurrency keys would have had to be enqueued, claiming the
semaphore (case 2.a. described above), before we had the chance to unblock them, for all of them.

We'd need two exceptional things to happen at once: a large backlog of concurrent jobs
(using different keys) and a large number of failed attempts to unblock jobs.

Thanks to @djmb for thinking through this with me and all the help!
…reported

Via SolidQueue.on_thread_error, in test environment.
@rosa rosa merged commit 1579e75 into main Nov 27, 2023
4 checks passed
@rosa rosa deleted the concurrency-controls branch November 27, 2023 15:43