From 478221a6bad81ed8887f717595ce332443175438 Mon Sep 17 00:00:00 2001 From: rob Date: Tue, 21 Nov 2023 07:12:57 -0700 Subject: [PATCH] provides for multiple executors operating under a single runner --- demo/run.cr | 17 +++- src/mosquito/configuration.cr | 4 +- src/mosquito/runner.cr | 37 ++++---- src/mosquito/runners/coordinator.cr | 8 +- src/mosquito/runners/executor.cr | 28 ++++-- src/mosquito/runners/idle_wait.cr | 14 +++ src/mosquito/runners/overseer.cr | 106 +++++++++++++++++------ src/mosquito/runners/queue_list.cr | 31 +++++-- src/mosquito/runners/runnable.cr | 64 ++++++++++++++ test/helpers/logging_helper.cr | 8 +- test/helpers/mock_coordinator.cr | 13 +++ test/helpers/mock_executor.cr | 18 ++++ test/helpers/mock_overseer.cr | 2 +- test/mosquito/queued_job_test.cr | 2 +- test/mosquito/runners/executor_test.cr | 20 +++-- test/mosquito/runners/overseer_test.cr | 102 ++++++++++++++-------- test/mosquito/runners/queue_list_test.cr | 13 +-- test/mosquito/runners/runnable_test.crx | 30 +++++++ 18 files changed, 397 insertions(+), 120 deletions(-) create mode 100644 src/mosquito/runners/idle_wait.cr create mode 100644 src/mosquito/runners/runnable.cr create mode 100644 test/mosquito/runners/runnable_test.crx diff --git a/demo/run.cr b/demo/run.cr index 0fb5d77b..15d4e23a 100644 --- a/demo/run.cr +++ b/demo/run.cr @@ -2,10 +2,18 @@ require "../src/mosquito" Mosquito.configure do |settings| settings.redis_url = ENV["REDIS_URL"]? || "redis://localhost:6379/3" + settings.idle_wait = 1.second end Mosquito.configuration.backend.flush +Log.setup do |c| + backend = Log::IOBackend.new + + c.bind "redis.*", :debug, backend + c.bind "mosquito.*", :trace, backend +end + require "./jobs/*" def expect_run_count(klass, expected) @@ -17,8 +25,15 @@ def expect_run_count(klass, expected) end end +stopping = false Signal::INT.trap do + if stopping + puts "SIGINT received again, crash-exiting." + exit 1 + end + Mosquito::Runner.stop + stopping = true end Mosquito::Runner.start(spin: false) @@ -29,7 +44,7 @@ while count <= 20 && Mosquito::Runner.keep_running count += 1 end -Mosquito::Runner.stop +Mosquito::Runner.stop(wait: true) puts "End of demo." puts "----------------------------------" diff --git a/src/mosquito/configuration.cr b/src/mosquito/configuration.cr index 7b4196f2..ae596a93 100644 --- a/src/mosquito/configuration.cr +++ b/src/mosquito/configuration.cr @@ -13,8 +13,8 @@ module Mosquito property failed_job_ttl : Int32 = 86400 @[Deprecated("cron scheduling can now handled automatically. See https://github.com/mosquito-cr/mosquito/pull/108")] - property run_cron_scheduler : Bool = true - property use_distributed_lock : Bool = false + property run_cron_scheduler : Bool = false + property use_distributed_lock : Bool = true property run_from : Array(String) = [] of String property backend : Mosquito::Backend.class = Mosquito::RedisBackend diff --git a/src/mosquito/runner.cr b/src/mosquito/runner.cr index 2e78fc07..5735747e 100644 --- a/src/mosquito/runner.cr +++ b/src/mosquito/runner.cr @@ -4,47 +4,46 @@ module Mosquito class Runner Log = ::Log.for self - # Should mosquito continue working? - class_property keep_running : Bool = true - # Start the mosquito runner. # # If spin = true (default) the function will not return until the runner is # shut down. Otherwise it will return immediately. # - def self.start(spin = true) Log.notice { "Mosquito is buzzing..." } + def self.start(spin = true) + Log.notice { "Mosquito is buzzing..." } instance.run - while spin && @@keep_running - sleep 1 + while spin && keep_running + Fiber.yield end end + def self.keep_running : Bool + instance.state.starting? || instance.state.running? + end + # Request the mosquito runner stop. The runner will not abort the current job # but it will not start any new jobs. - def self.stop + def self.stop(wait = false) Log.notice { "Mosquito is shutting down..." } - self.keep_running = false - instance.stop + finished_notifier = instance.stop + + if wait + finished_notifier.receive + end end private def self.instance : self @@instance ||= new end + delegate run, stop, state, to: @overseer + delegate running?, to: @overseer.state + getter overseer : Runners::Overseer + def initialize Mosquito.configuration.validate @overseer = Runners::Overseer.new end - - def run - spawn do - @overseer.run - end - end - - def stop - @overseer.stop - end end end diff --git a/src/mosquito/runners/coordinator.cr b/src/mosquito/runners/coordinator.cr index 84faba2c..afc3c562 100644 --- a/src/mosquito/runners/coordinator.cr +++ b/src/mosquito/runners/coordinator.cr @@ -1,5 +1,5 @@ module Mosquito::Runners - # primer? loader? + # primer? loader? _scheduler_ class Coordinator Log = ::Log.for self LockTTL = 10.seconds @@ -15,7 +15,11 @@ module Mosquito::Runners @emitted_scheduling_deprecation_runtime_message = false end - def bloop + def runnable_name : String + "Coordinator<#{object_id}>" + end + + def schedule : Nil only_if_coordinator do enqueue_periodic_jobs enqueue_delayed_jobs diff --git a/src/mosquito/runners/executor.cr b/src/mosquito/runners/executor.cr index 7e23cb44..e0a061cd 100644 --- a/src/mosquito/runners/executor.cr +++ b/src/mosquito/runners/executor.cr @@ -1,3 +1,6 @@ +require "./run_at_most" +require "./runnable" + module Mosquito::Runners # An Executor is responsible for building Job classes with deserialized # parameters and calling #run on them. It measures the time it takes to @@ -7,6 +10,7 @@ module Mosquito::Runners # if it will return only after a relative eternity. class Executor include RunAtMost + include Runnable Log = ::Log.for self @@ -15,22 +19,28 @@ module Mosquito::Runners # How long a job config is persisted after failure property failed_job_ttl : Int32 { Mosquito.configuration.failed_job_ttl } + property state : State = State::Starting - getter queue_list : QueueList + getter job_pipeline : Channel(Tuple(JobRun, Queue)) - def initialize(@queue_list) + def initialize(@job_pipeline) end - def dequeue_and_run_jobs - queue_list.each do |q| - run_next_job q - end + def runnable_name : String + "Executor<#{object_id}>" end - def run_next_job(q : Queue) - job_run = q.dequeue - return unless job_run + def each_run : Nil + dequeue = job_pipeline.receive? + return if dequeue.nil? + + job_run, queue = dequeue + @state = State::Working + execute job_run, queue + @state = State::Idle + end + def execute(job_run : JobRun, from_queue q : Queue) Log.notice { "#{"Starting:".colorize.magenta} #{job_run} from #{q.name}" } duration = Time.measure do diff --git a/src/mosquito/runners/idle_wait.cr b/src/mosquito/runners/idle_wait.cr new file mode 100644 index 00000000..0483f38b --- /dev/null +++ b/src/mosquito/runners/idle_wait.cr @@ -0,0 +1,14 @@ +module Mosquito::Runners + module IdleWait + def with_idle_wait(idle_wait : Time::Span) + delta = Time.measure do + yield + end + + if delta < idle_wait + # Fiber.timeout(idle_wait - delta) + sleep(idle_wait - delta) + end + end + end +end diff --git a/src/mosquito/runners/overseer.cr b/src/mosquito/runners/overseer.cr index bd3aa07a..9b8bd284 100644 --- a/src/mosquito/runners/overseer.cr +++ b/src/mosquito/runners/overseer.cr @@ -1,3 +1,7 @@ +require "./run_at_most" +require "./runnable" +require "./idle_wait" + module Mosquito::Runners # The Overseer is responsible for managing: # - a `Coordinator` @@ -7,59 +11,105 @@ module Mosquito::Runners # # An overseer manages the loop that each thread or process runs. class Overseer + include IdleWait include RunAtMost + include Runnable Log = ::Log.for self - # Minimum time in seconds to wait between checking for jobs. - property idle_wait : Time::Span { + getter queue_list : QueueList + getter executors, coordinator, executor_channel + getter executor_count = 3 + + getter idle_wait : Time::Span { Mosquito.configuration.idle_wait } - property keep_running : Bool - - getter queue_list, executor, coordinator - def initialize @queue_list = QueueList.new @coordinator = Coordinator.new queue_list - @executor = Executor.new queue_list + @executors = [] of Executor + @executor_channel = Channel(Tuple(JobRun, Queue)).new - @keep_running = true + executor_count.times do + @executors << Executor.new(executor_channel) + end end - def worker_id - "Worker [#{coordinator.instance_id}]" + def runnable_name : String + "Overseer<#{object_id}>" end - def stop - Log.info { worker_id + " is done after this job." } - @keep_running = false + def pre_run : Nil + Log.info { "Starting #{@executors.size} executors." } + @queue_list.run + @executors.each(&.run) end - # Runs the overseer workflow. - # Infinite loop. - def run - Log.info { worker_id + " clocking in." } + def post_run : Nil + Log.info { "Stopping #{@executors.size} executors." } + stopped_notifiers = executors.map do |executor| + executor.stop + end + executor_channel.close + stopped_notifiers.each(&.receive) + Log.info { "All executors stopped." } + end + + def each_run : Nil + with_idle_wait(idle_wait) do + coordinator.schedule + + if executor_channel.closed? + Log.fatal { "Executor channel is closed, overseer will stop." } + stop + return + end + + # next unless executors.any?(&.state.idle?) - while keep_running - tick + # TODO: Unhandled exception in spawn: Channel is closed (Channel::ClosedError) + if next_job_run = dequeue_job? + job_run, _ = next_job_run + Log.debug { "Dequeued job: #{job_run.id} waiting for it to be received." } + executor_channel.send next_job_run + Log.debug { "#{job_run.id} received" } + else + Log.debug { "No job to dequeue" } + end end - Log.info { worker_id + " finished for now." } + # check_for_deceased_children end - def tick - delta = Time.measure do - queue_list.fetch - run_at_most every: 1.second, label: :coordinator do - coordinator.bloop + # todo this implementation starves queues because it doesn't + # round robin or anything else. + def dequeue_job? : Tuple(JobRun, Queue)? + queue_list.each do |q| + if job_run = q.dequeue + return { job_run, q } end - executor.dequeue_and_run_jobs + end + end + + def check_for_deceased_children : Nil + executors.select(&.dead?).each do |dead_executor| + Log.fatal do + <<-MSG + Executor #{dead_executor.runnable_name} died. + A new executor will be started. + MSG + end + executors.delete dead_executor + end + + (executor_count - executors.size).times do + executors << Executor.new(executor_channel).tap(&.run) end - if delta < idle_wait - sleep(idle_wait - delta) + if queue_list.dead? + Log.fatal { "QueueList has died, overseer will stop." } + stop end end end diff --git a/src/mosquito/runners/queue_list.cr b/src/mosquito/runners/queue_list.cr index 93d4b239..be92084c 100644 --- a/src/mosquito/runners/queue_list.cr +++ b/src/mosquito/runners/queue_list.cr @@ -1,24 +1,45 @@ +require "./run_at_most" +require "./runnable" +require "./idle_wait" + module Mosquito::Runners # QueueList handles searching the redis keyspace for named queues. class QueueList + Log = ::Log.for self + include RunAtMost + include Runnable + include IdleWait + + getter queues : Array(Queue) def initialize @queues = [] of Queue end - delegate each, to: @queues + def runnable_name : String + "QueueList<#{object_id}>" + end + + delegate each, to: @queues.shuffle + + def each_run : Nil + # This idle wait should be at most 1 second. Longer will cause periodic jobs + # which are specified at the second-level to be executed aperiodically. + # Shorter will generate excess noise in the redis connection. + with_idle_wait(1.seconds) do + @state = State::Working - def fetch - run_at_most every: 0.25.seconds, label: :fetch_queues do |t| candidate_queues = Mosquito.backend.list_queues.map { |name| Queue.new name } @queues = filter_queues candidate_queues - Log.for("fetch_queues").debug { + Log.trace { if @queues.size > 0 - "found #{@queues.size} queues: #{@queues.map(&.name).join(", ")}" + "found #{@queues.size} queues: #{@queues.map(&.name)}".colorize.magenta end } + + @state = State::Idle end end diff --git a/src/mosquito/runners/runnable.cr b/src/mosquito/runners/runnable.cr new file mode 100644 index 00000000..749b6480 --- /dev/null +++ b/src/mosquito/runners/runnable.cr @@ -0,0 +1,64 @@ +module Mosquito::Runners + enum State + Starting + Working + Idle + Stopping + Finished + + def running? + starting? || working? || idle? + end + end + + module Runnable + getter state : State = State::Starting + getter fiber : Fiber? + + def dead? : Bool + if fiber_ = fiber + fiber_.dead? + else + raise "Tried to check dead? status of a runnable that hasn't yet spawned." + end + end + + def run + @fiber = spawn do + log = Log.for("#{self.class.name.underscore.gsub("::", ".")}.#{self.object_id}") + log.info { "starting" } + + pre_run + @state = State::Working + + while state.running? + each_run + end + + post_run + @state = State::Finished + end + end + + def stop : Channel(Bool) + @state = State::Stopping if state.running? + notifier = Channel(Bool).new + + spawn do + start = Time.utc + while state.stopping? && (Time.utc - start) < 25.seconds + Fiber.yield + end + notifier.send state.finished? + end + + Log.info { runnable_name + " has stopped" } + notifier + end + + abstract def runnable_name : String + abstract def each_run : Nil + def pre_run : Nil ; end + def post_run : Nil ; end + end +end diff --git a/test/helpers/logging_helper.cr b/test/helpers/logging_helper.cr index 4f33e797..22146c51 100644 --- a/test/helpers/logging_helper.cr +++ b/test/helpers/logging_helper.cr @@ -1,3 +1,5 @@ +require "log" + class TestingLogBackend < Log::MemoryBackend def self.instance @@instance ||= new @@ -42,4 +44,8 @@ class Minitest::Test end end -Log.builder.bind "*", :debug, TestingLogBackend.instance +Log.setup do |config| + config.bind "*", :debug, TestingLogBackend.instance + config.bind "redis.*", :warn, TestingLogBackend.instance + config.bind "mosquito.*", :trace, TestingLogBackend.instance +end diff --git a/test/helpers/mock_coordinator.cr b/test/helpers/mock_coordinator.cr index b5c83f82..38ae131d 100644 --- a/test/helpers/mock_coordinator.cr +++ b/test/helpers/mock_coordinator.cr @@ -1,4 +1,12 @@ class MockCoordinator < Mosquito::Runners::Coordinator + getter schedule_count + + def initialize(queue_list : Mosquito::Runners::QueueList) + super + + @schedule_count = 0 + end + def only_if_coordinator : Nil if @always_coordinator yield @@ -14,4 +22,9 @@ class MockCoordinator < Mosquito::Runners::Coordinator def always_coordinator!(always = true) @always_coordinator = always end + + def schedule + @schedule_count += 1 + super + end end diff --git a/test/helpers/mock_executor.cr b/test/helpers/mock_executor.cr index 46edaa90..4adcac8b 100644 --- a/test/helpers/mock_executor.cr +++ b/test/helpers/mock_executor.cr @@ -1,2 +1,20 @@ class MockExecutor < Mosquito::Runners::Executor + def run + self.state = Mosquito::Runners::State::Working + end + + def stop + self.state = Mosquito::Runners::State::Stopping + Channel(Bool).new.tap do |notifier| + spawn { + self.state = Mosquito::Runners::State::Finished + notifier.send true + } + end + end + + def receive_job + job_run, queue = job_pipeline.receive + job_run + end end diff --git a/test/helpers/mock_overseer.cr b/test/helpers/mock_overseer.cr index 08139099..1162c626 100644 --- a/test/helpers/mock_overseer.cr +++ b/test/helpers/mock_overseer.cr @@ -1,3 +1,3 @@ class MockOverseer < Mosquito::Runners::Overseer - property queue_list, coordinator, executor + property queue_list, coordinator, executors, executor_channel end diff --git a/test/mosquito/queued_job_test.cr b/test/mosquito/queued_job_test.cr index a1c0cbe4..ca8489e7 100644 --- a/test/mosquito/queued_job_test.cr +++ b/test/mosquito/queued_job_test.cr @@ -36,7 +36,7 @@ describe Mosquito::QueuedJob do it "can be passed in" do clear_logs EchoJob.new("quack").perform - assert_includes logs, "quack" + assert_logs_match "quack" end it "can have a boolean false passed as a parameter (and it's not assumed to be a nil)" do diff --git a/test/mosquito/runners/executor_test.cr b/test/mosquito/runners/executor_test.cr index 09f14e76..83384a10 100644 --- a/test/mosquito/runners/executor_test.cr +++ b/test/mosquito/runners/executor_test.cr @@ -1,8 +1,9 @@ require "../../test_helper" describe "Mosquito::Runners::Executor" do + getter(executor_pipeline) { Channel(Tuple(JobRun, Queue)).new } getter(queue_list) { MockQueueList.new } - getter(executor) { Mosquito::Runners::Executor.new queue_list } + getter(executor) { Mosquito::Runners::Executor.new executor_pipeline } getter(coordinator) { Mosquito::Runners::Coordinator.new queue_list } def register(job_class : Mosquito::Job.class) @@ -13,10 +14,15 @@ describe "Mosquito::Runners::Executor" do def run_job(job_class : Mosquito::Job.class) register job_class job_class.reset_performance_counter! - job_class.new.enqueue - executor.run_next_job job_class.queue + job_run = job_class.new.enqueue + executor.execute job_run, from_queue: job_class.queue end + describe "status" do + it "starts as starting" do + assert_equal State::Starting, executor.state + end + end describe "running jobs" do it "runs a job from a queue" do @@ -37,7 +43,7 @@ describe "Mosquito::Runners::Executor" do FailingJob.queue.enqueue job_run Timecop.freeze now do - executor.run_next_job job.class.queue + executor.execute job_run, from_queue: job.class.queue end job_run.reload @@ -45,7 +51,7 @@ describe "Mosquito::Runners::Executor" do Timecop.freeze now + job.reschedule_interval(1) do coordinator.enqueue_delayed_jobs - executor.run_next_job job.class.queue + executor.execute job_run, from_queue: job.class.queue end job_run.reload @@ -62,7 +68,7 @@ describe "Mosquito::Runners::Executor" do job_run.store NonReschedulableFailingJob.queue.enqueue job_run - executor.run_next_job NonReschedulableFailingJob.queue + executor.execute job_run, from_queue: NonReschedulableFailingJob.queue actual_ttl = backend.expires_in job_run.config_key assert_equal executor.failed_job_ttl, actual_ttl @@ -78,7 +84,7 @@ describe "Mosquito::Runners::Executor" do job_run.store QueuedTestJob.queue.enqueue job_run - executor.run_next_job QueuedTestJob.queue + executor.execute job_run, from_queue: QueuedTestJob.queue assert_logs_match "Success" diff --git a/test/mosquito/runners/overseer_test.cr b/test/mosquito/runners/overseer_test.cr index bbadc667..4d8ab862 100644 --- a/test/mosquito/runners/overseer_test.cr +++ b/test/mosquito/runners/overseer_test.cr @@ -1,69 +1,95 @@ require "../../test_helper" describe "Mosquito::Runners::Overseer" do + getter(executor_pipeline) { Channel(Tuple(JobRun, Queue)).new } getter(queue_list) { MockQueueList.new } getter(coordinator) { MockCoordinator.new queue_list } - getter(executor) { MockExecutor.new queue_list } + getter(executor) { MockExecutor.new executor_pipeline } getter(overseer : MockOverseer) { MockOverseer.new.tap do |o| o.queue_list = queue_list o.coordinator = coordinator - o.executor = executor + o.executors = [executor.as(Mosquito::Runners::Executor)] end } - describe "tick" do - it "waits the proper amount of time between cycles" do - clean_slate do - tick_time = Time.measure do - overseer.tick - end + def register(job_class : Mosquito::Job.class) + Mosquito::Base.register_job_mapping job_class.name.underscore, job_class + queue_list.queues << job_class.queue + end - assert_in_epsilon( - overseer.idle_wait.total_seconds, - tick_time.total_seconds, - epsilon: 0.02 - ) + def run_job(job_class : Mosquito::Job.class) + register job_class + job_class.reset_performance_counter! + job_run = job_class.new.enqueue + executor.execute job_run, from_queue: job_class.queue + end + + describe "pre_run" do + it "runs all executors" do + overseer.executors.each do |executor| + assert_equal State::Starting, executor.state + end + overseer.pre_run + overseer.executors.each do |executor| + assert_equal State::Working, executor.state end end end - describe "run" do - it "should log a startup message" do - overseer.keep_running = false - clear_logs - overseer.run - assert_logs_match "clocking in." + describe "post_run" do + it "stops all executors" do + overseer.executors.each(&.run) + overseer.post_run + overseer.executors.each do |executor| + assert_equal State::Finished, executor.state + end end - it "should log a finished message" do - overseer.keep_running = false + it "logs messages about stopping the executors" do clear_logs - overseer.run - assert_logs_match "finished for now" + overseer.executors.each(&.run) + overseer.post_run + assert_logs_match "Stopping #{overseer.executor_count} executors." + assert_logs_match "All executors stopped." end end - describe "stop" do - it "should log a stop message" do - clear_logs - overseer.stop - assert_logs_match "is done after this job." - end + describe "each_run" do + it "dequeues a job and dispatches it to the pipeline" do + clean_slate do + register QueuedTestJob + expected_job_run = QueuedTestJob.new.enqueue + + overseer.executor_channel = Channel(Tuple(JobRun, Queue)).new - it "should set the stop flag" do - overseer.stop - assert_equal false, overseer.keep_running + # each_run will block until there's a receiver on the channel + spawn { overseer.each_run } + actual_job_run, queue = overseer.executor_channel.receive + assert_equal expected_job_run, actual_job_run + assert_equal QueuedTestJob.queue, queue + end end - end - describe "worker_id" do - it "should return a unique id" do - one = Mosquito::Runners::Overseer.new - two = Mosquito::Runners::Overseer.new + it "waits #idle_wait before checking the queue again" do + clean_slate do + tick_time = Time.measure do + overseer.each_run + end + + assert_in_epsilon( + overseer.idle_wait.total_seconds, + tick_time.total_seconds, + epsilon: 0.02 + ) + end + end - refute_equal one.worker_id, two.worker_id + it "triggers the scheduler" do + assert_equal 0, coordinator.schedule_count + overseer.each_run + assert_equal 1, coordinator.schedule_count end end end diff --git a/test/mosquito/runners/queue_list_test.cr b/test/mosquito/runners/queue_list_test.cr index c11cdae9..18c8fa8f 100644 --- a/test/mosquito/runners/queue_list_test.cr +++ b/test/mosquito/runners/queue_list_test.cr @@ -9,19 +9,20 @@ describe "Mosquito::Runners::QueueList" do EchoJob.new(text: "hello world").enqueue end - describe "fetch" do + describe "each_run" do it "returns a list of queues" do clean_slate do enqueue_jobs - queue_list.fetch + queue_list.each_run assert_equal ["failing_job", "io_queue", "passing_job"], queue_list.queues.map(&.name).sort end end it "logs a message about the number of fetched queues" do clean_slate do + clear_logs enqueue_jobs - queue_list.fetch + queue_list.each_run assert_logs_match "found 3 queues" end end @@ -33,7 +34,7 @@ describe "Mosquito::Runners::QueueList" do enqueue_jobs Mosquito.temp_config(run_from: ["io_queue", "passing_job"]) do - queue_list.fetch + queue_list.each_run end end @@ -45,7 +46,7 @@ describe "Mosquito::Runners::QueueList" do enqueue_jobs Mosquito.temp_config(run_from: ["test4"]) do - queue_list.fetch + queue_list.each_run end assert_logs_match "No watchable queues found." @@ -54,7 +55,7 @@ describe "Mosquito::Runners::QueueList" do it "doesnt log an error when no queues are present" do clean_slate do - queue_list.fetch + queue_list.each_run refute_logs_match "No watchable queues found." end end diff --git a/test/mosquito/runners/runnable_test.crx b/test/mosquito/runners/runnable_test.crx new file mode 100644 index 00000000..97f46951 --- /dev/null +++ b/test/mosquito/runners/runnable_test.crx @@ -0,0 +1,30 @@ + + + describe "run" do + it "should log a startup message", focus: true do + assert_equal overseer.state.to_s, "Starting" + clear_logs + overseer.run + assert_logs_match "starting" + end + + it "should log a finished message" do + overseer.keep_running = false + clear_logs + overseer.run + assert_logs_match "finished for now" + end + end + + describe "stop" do + it "should log a stop message" do + clear_logs + overseer.stop + assert_logs_match "is done after this job." + end + + it "should set the stop flag" do + overseer.stop + assert_equal overseer.state.to_s, "Stopping" + end + end