Skip to content

Commit

Permalink
provides for multiple executors operating under a single runner
Browse files Browse the repository at this point in the history
  • Loading branch information
robacarp committed Dec 18, 2023
1 parent 39ddc84 commit 559588d
Show file tree
Hide file tree
Showing 22 changed files with 571 additions and 139 deletions.
9 changes: 5 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,15 @@

<img src="https://mosquito-cr.github.io/images/amber-mosquito-small.png" align="right">

Mosquito is a generic background job runner written primarily for Crystal. Significant inspiration from experience with the successes and failings many Ruby gems in this vein.
Mosquito is a generic background job runner written primarily for Crystal. Significant inspiration from experience with the successes and failings many Ruby gems in this vein. Once compiled, a mosquito binary can start work in about 10 milliseconds.

Mosquito currently provides these features:
- Delayed execution
- Scheduled / Periodic execution

- Delayed execution (`SendEmailJob.new(email: :welcome, address: user.email).enqueue(in: 3.minutes)`)
- Scheduled / Periodic execution (`RunEveryHourJob.new`)
- Job Storage in Redis
- Automatic rescheduling of failed jobs
- Progressively increasing delay of failed jobs
- Progressively increasing delay of rescheduled failed jobs
- Dead letter queue of jobs which have failed too many times
- Rate limited jobs

Expand Down
18 changes: 17 additions & 1 deletion demo/run.cr
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,19 @@ require "../src/mosquito"

Mosquito.configure do |settings|
settings.redis_url = ENV["REDIS_URL"]? || "redis://localhost:6379/3"
settings.idle_wait = 1.second
end

Mosquito.configuration.backend.flush

Log.setup do |c|
backend = Log::IOBackend.new

c.bind "redis.*", :warn, backend
c.bind "mosquito.*", :debug, backend
c.bind "mosquito.runners.overseer", :trace, backend
end

require "./jobs/*"

def expect_run_count(klass, expected)
Expand All @@ -17,8 +26,15 @@ def expect_run_count(klass, expected)
end
end

stopping = false
Signal::INT.trap do
if stopping
puts "SIGINT received again, crash-exiting."
exit 1
end

Mosquito::Runner.stop
stopping = true
end

Mosquito::Runner.start(spin: false)
Expand All @@ -29,7 +45,7 @@ while count <= 20 && Mosquito::Runner.keep_running
count += 1
end

Mosquito::Runner.stop
Mosquito::Runner.stop(wait: true)

puts "End of demo."
puts "----------------------------------"
Expand Down
4 changes: 2 additions & 2 deletions src/mosquito/configuration.cr
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ module Mosquito
property failed_job_ttl : Int32 = 86400

@[Deprecated("cron scheduling can now handled automatically. See https://github.com/mosquito-cr/mosquito/pull/108")]
property run_cron_scheduler : Bool = true
property use_distributed_lock : Bool = false
property run_cron_scheduler : Bool = false
property use_distributed_lock : Bool = true

property run_from : Array(String) = [] of String
property backend : Mosquito::Backend.class = Mosquito::RedisBackend
Expand Down
3 changes: 3 additions & 0 deletions src/mosquito/periodic_job_run.cr
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ module Mosquito
if last_executed_at + interval <= now
execute

# Weaknesses:
# - If something interferes with the job run, it won't be correct that it was executed.
# - if the worker is backlogged, the start time will be different from the last executed time.
self.last_executed_at = now
true
else
Expand Down
4 changes: 4 additions & 0 deletions src/mosquito/queue.cr
Original file line number Diff line number Diff line change
Expand Up @@ -79,13 +79,16 @@ module Mosquito
getter? empty : Bool
property backend : Mosquito::Backend

Log = ::Log.for self

def initialize(@name : String)
@empty = false
@backend = Mosquito.backend.named name
@config_key = @name
end

def enqueue(job_run : JobRun) : JobRun
Log.trace { "Enqueuing #{job_run} for immediate execution" }
backend.enqueue job_run
end

Expand All @@ -94,6 +97,7 @@ module Mosquito
end

def enqueue(job_run : JobRun, at execute_time : Time) : JobRun
Log.trace { "Enqueuing #{job_run} at #{execute_time}" }
backend.schedule job_run, execute_time
end

Expand Down
37 changes: 18 additions & 19 deletions src/mosquito/runner.cr
Original file line number Diff line number Diff line change
Expand Up @@ -4,47 +4,46 @@ module Mosquito
class Runner
Log = ::Log.for self

# Should mosquito continue working?
class_property keep_running : Bool = true

# Start the mosquito runner.
#
# If spin = true (default) the function will not return until the runner is
# shut down. Otherwise it will return immediately.
#
def self.start(spin = true) Log.notice { "Mosquito is buzzing..." }
def self.start(spin = true)
Log.notice { "Mosquito is buzzing..." }
instance.run

while spin && @@keep_running
sleep 1
while spin && keep_running
Fiber.yield
end
end

def self.keep_running : Bool
instance.state.starting? || instance.state.running?
end

# Request the mosquito runner stop. The runner will not abort the current job
# but it will not start any new jobs.
def self.stop
def self.stop(wait = false)
Log.notice { "Mosquito is shutting down..." }
self.keep_running = false
instance.stop
finished_notifier = instance.stop

if wait
finished_notifier.receive
end
end

private def self.instance : self
@@instance ||= new
end

delegate run, stop, state, to: @overseer
delegate running?, to: @overseer.state
getter overseer : Runners::Overseer

def initialize
Mosquito.configuration.validate
@overseer = Runners::Overseer.new
end

def run
spawn do
@overseer.run
end
end

def stop
@overseer.stop
end
end
end
16 changes: 8 additions & 8 deletions src/mosquito/runners/coordinator.cr
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
module Mosquito::Runners
# primer? loader?
# primer? loader? _scheduler_
class Coordinator
Log = ::Log.for self
LockTTL = 10.seconds
Expand All @@ -15,7 +15,11 @@ module Mosquito::Runners
@emitted_scheduling_deprecation_runtime_message = false
end

def bloop
def runnable_name : String
"Coordinator<#{object_id}>"
end

def schedule : Nil
only_if_coordinator do
enqueue_periodic_jobs
enqueue_delayed_jobs
Expand All @@ -41,13 +45,13 @@ module Mosquito::Runners
end

if Mosquito.backend.lock? lock_key, instance_id, LockTTL
Log.debug { "Coordinator lock acquired" }
Log.trace { "Coordinator lock acquired" }
duration = Time.measure do
yield
end

Mosquito.backend.unlock lock_key, instance_id
Log.debug { "Coordinator lock released" }
Log.trace { "Coordinator lock released" }
end

return unless duration > LockTTL
Expand All @@ -57,10 +61,6 @@ module Mosquito::Runners
def enqueue_periodic_jobs
Base.scheduled_job_runs.each do |scheduled_job_run|
enqueued = scheduled_job_run.try_to_execute

Log.for("enqueue_periodic_jobs").info {
"enqueued #{scheduled_job_run.class}" if enqueued
}
end
end

Expand Down
55 changes: 42 additions & 13 deletions src/mosquito/runners/executor.cr
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
require "./run_at_most"
require "./runnable"

module Mosquito::Runners
# An Executor is responsible for building Job classes with deserialized
# parameters and calling #run on them. It measures the time it takes to
Expand All @@ -7,38 +10,64 @@ module Mosquito::Runners
# if it will return only after a relative eternity.
class Executor
include RunAtMost
include Runnable

Log = ::Log.for self
getter log : ::Log

# How long a job config is persisted after success
property successful_job_ttl : Int32 { Mosquito.configuration.successful_job_ttl }

# How long a job config is persisted after failure
property failed_job_ttl : Int32 { Mosquito.configuration.failed_job_ttl }

getter queue_list : QueueList
getter job_pipeline : Channel(Tuple(JobRun, Queue))
getter idle_bell : Channel(Bool)

private def state=(state : State)
if state == State::Idle
spawn { idle_bell.send true }
end

def initialize(@queue_list)
super
end

def dequeue_and_run_jobs
queue_list.each do |q|
run_next_job q
end
def initialize(@job_pipeline, @idle_bell)
@log = Log.for(object_id.to_s)
end

def run_next_job(q : Queue)
job_run = q.dequeue
return unless job_run
def runnable_name : String
"Executor<#{object_id}>"
end

Log.notice { "#{"Starting:".colorize.magenta} #{job_run} from #{q.name}" }
def pre_run : Nil
# Overseer won't try to dequeue and send any jobs unless it
# knows that an executor is idle, so the first thing to do
# is mark this executor as idle.
self.state = State::Idle
end

def each_run : Nil
dequeue = job_pipeline.receive?
return if dequeue.nil?

self.state = State::Working
job_run, queue = dequeue
log.trace { "Dequeued #{job_run} from #{queue.name}" }
execute job_run, queue
log.trace { "Finished #{job_run} from #{queue.name}" }
self.state = State::Idle
end

def execute(job_run : JobRun, from_queue q : Queue)
log.info { "#{"Starting:".colorize.magenta} #{job_run} from #{q.name}" }

duration = Time.measure do
job_run.run
end.total_seconds

if job_run.succeeded?
Log.notice { "#{"Success:".colorize.green} #{job_run} finished and took #{time_with_units duration}" }
log.info { "#{"Success:".colorize.green} #{job_run} finished and took #{time_with_units duration}" }
q.forget job_run
job_run.delete in: successful_job_ttl

Expand All @@ -60,14 +89,14 @@ module Mosquito::Runners
message << " (at "
message << next_execution
message << ")"
log.warn { message.to_s }
else
q.banish job_run
job_run.delete in: failed_job_ttl

message << "cannot be rescheduled".colorize.yellow
log.error { message.to_s }
end

Log.warn { message.to_s }
end
end

Expand Down
14 changes: 14 additions & 0 deletions src/mosquito/runners/idle_wait.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
module Mosquito::Runners
module IdleWait
def with_idle_wait(idle_wait : Time::Span)
delta = Time.measure do
yield
end

if delta < idle_wait
# Fiber.timeout(idle_wait - delta)
sleep(idle_wait - delta)
end
end
end
end
Loading

0 comments on commit 559588d

Please sign in to comment.