-
Notifications
You must be signed in to change notification settings - Fork 144
/
Copy pathrecurring_schedule.rb
69 lines (54 loc) · 1.76 KB
/
recurring_schedule.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
# frozen_string_literal: true
module SolidQueue
class Dispatcher::RecurringSchedule
include AppExecutor
attr_reader :configured_tasks, :scheduled_tasks
def initialize(tasks)
@configured_tasks = Array(tasks).map { |task| SolidQueue::RecurringTask.wrap(task) }.select(&:valid?)
@scheduled_tasks = Concurrent::Hash.new
end
def empty?
configured_tasks.empty?
end
def schedule_tasks
wrap_in_app_executor do
persist_tasks
reload_tasks
end
configured_tasks.each do |task|
schedule_task(task)
end
end
def schedule_task(task)
scheduled_tasks[task.key] = schedule(task)
end
def unschedule_tasks
scheduled_tasks.values.each(&:cancel)
scheduled_tasks.clear
end
def task_keys
configured_tasks.map(&:key)
end
private
def persist_tasks
SolidQueue::RecurringTask.create_or_update_all configured_tasks
end
def reload_tasks
@configured_tasks = SolidQueue::RecurringTask.where(key: task_keys)
end
def schedule(task)
scheduled_task = Concurrent::ScheduledTask.new(task.delay_from_now, args: [ self, task, task.next_time ]) do |thread_schedule, thread_task, thread_task_run_at|
thread_schedule.schedule_task(thread_task)
wrap_in_app_executor do
thread_task.enqueue(at: thread_task_run_at)
end
end
scheduled_task.add_observer do |_, _, error|
# Don't notify on task cancellation before execution, as this will happen normally
# as part of unloading tasks
handle_thread_error(error) if error && !error.is_a?(Concurrent::CancelledOperationError)
end
scheduled_task.tap(&:execute)
end
end
end