Skip to content

Commit

Permalink
provides for multiple executors operating under a single runner
Browse files Browse the repository at this point in the history
  • Loading branch information
robacarp committed Nov 29, 2023
1 parent 39ddc84 commit 478221a
Show file tree
Hide file tree
Showing 18 changed files with 397 additions and 120 deletions.
17 changes: 16 additions & 1 deletion demo/run.cr
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,18 @@ require "../src/mosquito"

Mosquito.configure do |settings|
settings.redis_url = ENV["REDIS_URL"]? || "redis://localhost:6379/3"
settings.idle_wait = 1.second
end

Mosquito.configuration.backend.flush

Log.setup do |c|
backend = Log::IOBackend.new

c.bind "redis.*", :debug, backend
c.bind "mosquito.*", :trace, backend
end

require "./jobs/*"

def expect_run_count(klass, expected)
Expand All @@ -17,8 +25,15 @@ def expect_run_count(klass, expected)
end
end

stopping = false
Signal::INT.trap do
if stopping
puts "SIGINT received again, crash-exiting."
exit 1
end

Mosquito::Runner.stop
stopping = true
end

Mosquito::Runner.start(spin: false)
Expand All @@ -29,7 +44,7 @@ while count <= 20 && Mosquito::Runner.keep_running
count += 1
end

Mosquito::Runner.stop
Mosquito::Runner.stop(wait: true)

puts "End of demo."
puts "----------------------------------"
Expand Down
4 changes: 2 additions & 2 deletions src/mosquito/configuration.cr
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ module Mosquito
property failed_job_ttl : Int32 = 86400

@[Deprecated("cron scheduling can now handled automatically. See https://github.com/mosquito-cr/mosquito/pull/108")]
property run_cron_scheduler : Bool = true
property use_distributed_lock : Bool = false
property run_cron_scheduler : Bool = false
property use_distributed_lock : Bool = true

property run_from : Array(String) = [] of String
property backend : Mosquito::Backend.class = Mosquito::RedisBackend
Expand Down
37 changes: 18 additions & 19 deletions src/mosquito/runner.cr
Original file line number Diff line number Diff line change
Expand Up @@ -4,47 +4,46 @@ module Mosquito
class Runner
Log = ::Log.for self

# Should mosquito continue working?
class_property keep_running : Bool = true

# Start the mosquito runner.
#
# If spin = true (default) the function will not return until the runner is
# shut down. Otherwise it will return immediately.
#
def self.start(spin = true) Log.notice { "Mosquito is buzzing..." }
def self.start(spin = true)
Log.notice { "Mosquito is buzzing..." }
instance.run

while spin && @@keep_running
sleep 1
while spin && keep_running
Fiber.yield
end
end

def self.keep_running : Bool
instance.state.starting? || instance.state.running?
end

# Request the mosquito runner stop. The runner will not abort the current job
# but it will not start any new jobs.
def self.stop
def self.stop(wait = false)
Log.notice { "Mosquito is shutting down..." }
self.keep_running = false
instance.stop
finished_notifier = instance.stop

if wait
finished_notifier.receive
end
end

private def self.instance : self
@@instance ||= new
end

delegate run, stop, state, to: @overseer
delegate running?, to: @overseer.state
getter overseer : Runners::Overseer

def initialize
Mosquito.configuration.validate
@overseer = Runners::Overseer.new
end

def run
spawn do
@overseer.run
end
end

def stop
@overseer.stop
end
end
end
8 changes: 6 additions & 2 deletions src/mosquito/runners/coordinator.cr
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
module Mosquito::Runners
# primer? loader?
# primer? loader? _scheduler_
class Coordinator
Log = ::Log.for self
LockTTL = 10.seconds
Expand All @@ -15,7 +15,11 @@ module Mosquito::Runners
@emitted_scheduling_deprecation_runtime_message = false
end

def bloop
def runnable_name : String
"Coordinator<#{object_id}>"
end

def schedule : Nil
only_if_coordinator do
enqueue_periodic_jobs
enqueue_delayed_jobs
Expand Down
28 changes: 19 additions & 9 deletions src/mosquito/runners/executor.cr
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
require "./run_at_most"
require "./runnable"

module Mosquito::Runners
# An Executor is responsible for building Job classes with deserialized
# parameters and calling #run on them. It measures the time it takes to
Expand All @@ -7,6 +10,7 @@ module Mosquito::Runners
# if it will return only after a relative eternity.
class Executor
include RunAtMost
include Runnable

Log = ::Log.for self

Expand All @@ -15,22 +19,28 @@ module Mosquito::Runners

# How long a job config is persisted after failure
property failed_job_ttl : Int32 { Mosquito.configuration.failed_job_ttl }
property state : State = State::Starting

getter queue_list : QueueList
getter job_pipeline : Channel(Tuple(JobRun, Queue))

def initialize(@queue_list)
def initialize(@job_pipeline)
end

def dequeue_and_run_jobs
queue_list.each do |q|
run_next_job q
end
def runnable_name : String
"Executor<#{object_id}>"
end

def run_next_job(q : Queue)
job_run = q.dequeue
return unless job_run
def each_run : Nil
dequeue = job_pipeline.receive?
return if dequeue.nil?

job_run, queue = dequeue
@state = State::Working
execute job_run, queue
@state = State::Idle
end

def execute(job_run : JobRun, from_queue q : Queue)
Log.notice { "#{"Starting:".colorize.magenta} #{job_run} from #{q.name}" }

duration = Time.measure do
Expand Down
14 changes: 14 additions & 0 deletions src/mosquito/runners/idle_wait.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
module Mosquito::Runners
  # Mixin which pads a unit of work out to a minimum duration.
  module IdleWait
    # Yields to the given block and measures how long the block took to run.
    #
    # When the block finishes in less than `idle_wait`, the remainder of the
    # span is spent sleeping. When the block takes `idle_wait` or longer,
    # this returns without sleeping.
    def with_idle_wait(idle_wait : Time::Span)
      elapsed = Time.measure { yield }
      return unless elapsed < idle_wait

      # Fiber.timeout(idle_wait - elapsed)
      sleep(idle_wait - elapsed)
    end
  end
end
106 changes: 78 additions & 28 deletions src/mosquito/runners/overseer.cr
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
require "./run_at_most"
require "./runnable"
require "./idle_wait"

module Mosquito::Runners
# The Overseer is responsible for managing:
# - a `Coordinator`
Expand All @@ -7,59 +11,105 @@ module Mosquito::Runners
#
# An overseer manages the loop that each thread or process runs.
class Overseer
include IdleWait
include RunAtMost
include Runnable

Log = ::Log.for self

# Minimum time to wait between checking for jobs.
property idle_wait : Time::Span {
getter queue_list : QueueList
getter executors, coordinator, executor_channel
getter executor_count = 3

getter idle_wait : Time::Span {
Mosquito.configuration.idle_wait
}

property keep_running : Bool

getter queue_list, executor, coordinator

def initialize
@queue_list = QueueList.new
@coordinator = Coordinator.new queue_list
@executor = Executor.new queue_list
@executors = [] of Executor
@executor_channel = Channel(Tuple(JobRun, Queue)).new

@keep_running = true
executor_count.times do
@executors << Executor.new(executor_channel)
end
end

def worker_id
"Worker [#{coordinator.instance_id}]"
def runnable_name : String
"Overseer<#{object_id}>"
end

def stop
Log.info { worker_id + " is done after this job." }
@keep_running = false
def pre_run : Nil
Log.info { "Starting #{@executors.size} executors." }
@queue_list.run
@executors.each(&.run)
end

# Runs the overseer workflow.
# Infinite loop.
def run
Log.info { worker_id + " clocking in." }
def post_run : Nil
Log.info { "Stopping #{@executors.size} executors." }
stopped_notifiers = executors.map do |executor|
executor.stop
end
executor_channel.close
stopped_notifiers.each(&.receive)
Log.info { "All executors stopped." }
end

def each_run : Nil
with_idle_wait(idle_wait) do
coordinator.schedule

if executor_channel.closed?
Log.fatal { "Executor channel is closed, overseer will stop." }
stop
return
end

# next unless executors.any?(&.state.idle?)

while keep_running
tick
# TODO: Unhandled exception in spawn: Channel is closed (Channel::ClosedError)
if next_job_run = dequeue_job?
job_run, _ = next_job_run
Log.debug { "Dequeued job: #{job_run.id} waiting for it to be received." }
executor_channel.send next_job_run
Log.debug { "#{job_run.id} received" }
else
Log.debug { "No job to dequeue" }
end
end

Log.info { worker_id + " finished for now." }
# check_for_deceased_children
end

def tick
delta = Time.measure do
queue_list.fetch
run_at_most every: 1.second, label: :coordinator do
coordinator.bloop
# TODO: this implementation starves queues because it doesn't
# round-robin (or apply any other fairness strategy).
def dequeue_job? : Tuple(JobRun, Queue)?
queue_list.each do |q|
if job_run = q.dequeue
return { job_run, q }
end
executor.dequeue_and_run_jobs
end
end

def check_for_deceased_children : Nil
executors.select(&.dead?).each do |dead_executor|
Log.fatal do
<<-MSG
Executor #{dead_executor.runnable_name} died.
A new executor will be started.
MSG
end
executors.delete dead_executor
end

(executor_count - executors.size).times do
executors << Executor.new(executor_channel).tap(&.run)
end

if delta < idle_wait
sleep(idle_wait - delta)
if queue_list.dead?
Log.fatal { "QueueList has died, overseer will stop." }
stop
end
end
end
Expand Down
Loading

0 comments on commit 478221a

Please sign in to comment.