Skip to content

Commit

Permalink
stores metadata about executors and what they're working on
Browse files Browse the repository at this point in the history
  • Loading branch information
robacarp committed Feb 6, 2024
1 parent c103d35 commit 68c1932
Show file tree
Hide file tree
Showing 11 changed files with 142 additions and 81 deletions.
4 changes: 3 additions & 1 deletion src/mosquito/backend.cr
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,16 @@ module Mosquito
abstract def list_queues : Array(String)
abstract def expiring_list_push(key : String, value : String) : Nil
abstract def expiring_list_fetch(key : String, expire_items_older_than : Time) : Array(String)
abstract def list_runners : Array(String)
abstract def list_overseers : Array(String)
abstract def register_overseer(name : String) : Nil

abstract def delete(key : String, in ttl : Int64 = 0) : Nil
abstract def delete(key : String, in ttl : Time::Span) : Nil
abstract def expires_in(key : String) : Int64

abstract def get(key : String, field : String) : String?
abstract def set(key : String, field : String, value : String) : String
abstract def delete_field(key : String, field : String) : Nil
abstract def delete(key : String, in ttl : Time::Span = 0.seconds)
abstract def increment(key : String, by value : Int32) : Int64
abstract def increment(key : String, field : String) : Int64
Expand Down
10 changes: 9 additions & 1 deletion src/mosquito/inspector.cr
Original file line number Diff line number Diff line change
@@ -1,12 +1,20 @@
require "./inspector/*"

module Mosquito::Inspector
def self.overseer(id : String) : Overseer
Overseer.new id
end

def self.executor(id : String) : Executor
Executor.new id
end

def self.list_queues : Array(Inspector::Queue)
Mosquito.backend.list_queues
.map { |name| Inspector::Queue.new name }
end

def self.list_overseers : Array(Runner)
def self.list_overseers : Array(Overseer)
Mosquito.backend.list_overseers
.map { |name| Overseer.new name }
end
Expand Down
22 changes: 22 additions & 0 deletions src/mosquito/inspector/executor.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
module Mosquito::Inspector
class Executor
getter instance_id : String

def initialize(@instance_id)
@metadata = Metadata.new Mosquito::Runners::Executor.metadata_key(@instance_id), readonly: true
end

def config
key = Mosquito.backend.build_key "runners", name
config = Mosquito.backend.retrieve key
end

def current_job : String?
@metadata["current_job"]?
end

def current_job_queue : String?
@metadata["current_job_queue"]?
end
end
end
36 changes: 36 additions & 0 deletions src/mosquito/inspector/overseer.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
module Mosquito::Inspector
class Overseer
Log = ::Log.for self

getter metadata : Metadata
getter instance_id : String

def self.all : Array(self)
Mosquito.backend.list_overseers.map do |name|
new name
end
end

def initialize(@instance_id : String)
@metadata = Metadata.new Mosquito::Runners::Overseer.metadata_key(@instance_id), readonly: true
end

def executors : Array(Executor)
if executor_list = @metadata["executors"]?
executor_list.split(",").map do |name|
Executor.new name
end
else
[] of Executor
end
end

def <=>(other)
name <=> other.name
end

def last_heartbeat : Time?
metadata.heartbeat?
end
end
end
2 changes: 0 additions & 2 deletions src/mosquito/inspector/queue.cr
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
module Mosquito::Inspector
class Queue
include Comparable(self)

getter name : String

private property backend : Mosquito::Backend
Expand Down
65 changes: 0 additions & 65 deletions src/mosquito/inspector/runner.cr

This file was deleted.

24 changes: 24 additions & 0 deletions src/mosquito/metadata.cr
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,16 @@ module Mosquito
Mosquito.backend.retrieve root_key
end

def [](key : String) : String
{%
raise "Use #[]? instead"
%}
end

def []=(key : String, value : Nil)
Mosquito.backend.delete_field root_key, key
end

# Reads a single key from the metadata.
def []?(key : String) : String?
Mosquito.backend.get root_key, key
Expand Down Expand Up @@ -56,6 +66,20 @@ module Mosquito
Mosquito.backend.increment root_key, key, by: -1
end

# Sets a heartbeat timestamp in the metadata.
def heartbeat!
self["heartbeat"] = Time.utc.to_unix.to_s
end

# Returns the heartbeat timestamp from the metadata.
def heartbeat? : Time?
if time = self["heartbeat"]?
Time.unix(time.to_i)
else
nil
end
end

delegate to_s, inspect, to: to_h
end
end
10 changes: 7 additions & 3 deletions src/mosquito/redis_backend.cr
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,10 @@ module Mosquito
value
end

def self.delete_field(key : String, field : String) : Nil
redis.hdel key, field
end

def self.increment(key : String, by value : Int32 = 1) : Int64
redis.incrby key, value
end
Expand All @@ -130,14 +134,14 @@ module Mosquito
expiring_list_fetch key, Time.utc - 1.day
end

def self.register_overseer(name : String) : Nil
def self.register_overseer(id : String) : Nil

Check warning on line 137 in src/mosquito/redis_backend.cr

View workflow job for this annotation

GitHub Actions / Build (latest, false)

positional parameter 'id' corresponds to parameter 'name' of the overridden method Mosquito::Backend::ClassMethods#register_overseer(name : String), which has a different name and may affect named argument passing

Check warning on line 137 in src/mosquito/redis_backend.cr

View workflow job for this annotation

GitHub Actions / Build (nightly, true)

positional parameter 'id' corresponds to parameter 'name' of the overridden method Mosquito::Backend::ClassMethods#register_overseer(name : String), which has a different name and may affect named argument passing
key = build_key LIST_OF_OVERSEERS_KEY
expiring_list_push key, name
expiring_list_push key, id
end

def self.list_overseers : Array(String)
key = build_key LIST_OF_OVERSEERS_KEY
expiring_list_fetch key, Time.utc - 1.day
expiring_list_fetch(key, Time.utc - 1.day)
end

def self.expiring_list_push(key : String, value : String) : Nil
Expand Down
17 changes: 14 additions & 3 deletions src/mosquito/runners/executor.cr
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ module Mosquito::Runners

# Used to notify the overseer that this executor is idle.
getter idle_bell : Channel(Bool)
getter metric_data : Metadata
getter metadata : Metadata
getter instance_id : String

private def state=(state : State)
Expand All @@ -51,10 +51,15 @@ module Mosquito::Runners
super
end

def initialize(@job_pipeline, @idle_bell, overseer_context, @metric_data)
def self.metadata_key(id : String) : String
Mosquito::Backend.build_key "executor", id
end

def initialize(@job_pipeline, @idle_bell, overseer_context)
@log = Log.for(object_id.to_s)
@instance_id = Random::Secure.hex(8)
@publish_context = PublishContext.new overseer_context, [:executor, instance_id]
@metadata = Metadata.new self.class.metadata_key(instance_id)
end

# :nodoc:
Expand All @@ -81,6 +86,9 @@ module Mosquito::Runners
execute job_run, queue
log.trace { "Finished #{job_run} from #{queue.name}" }
self.state = State::Idle

@metadata.heartbeat!
@metadata.delete in: 1.hour
end

# Runs a job from a Queue.
Expand All @@ -91,7 +99,8 @@ module Mosquito::Runners
log.info { "#{"Starting:".colorize.magenta} #{job_run} from #{q.name}" }

metric {
metric_data["current_work"] = job_run.id.to_s
@metadata["current_job_queue"] = q.name
@metadata["current_job"] = job_run.id
publish @publish_context, {
event: "starting",
job_run: job_run.id,
Expand All @@ -111,6 +120,7 @@ module Mosquito::Runners

metric {
publish @publish_context, {event: "job-finished", job_run: job_run.id}
@metadata["current_job"] = nil

count [@publish_context.context, :success]
count [:queue, q.name, :success]
Expand Down Expand Up @@ -149,6 +159,7 @@ module Mosquito::Runners

metric {
publish @publish_context, {event: "job-failed", job_run: job_run.id, reschedulable: job_run.rescheduleable? }
@metadata["current_job"] = nil

count [@publish_context.context, :failed]
count [:queue, q.name, :failed]
Expand Down
25 changes: 20 additions & 5 deletions src/mosquito/runners/overseer.cr
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,12 @@ module Mosquito::Runners
}

property state : State = State::Starting
getter instance_id, queue_list, executors, coordinator, metric_data
getter instance_id, queue_list, executors, coordinator
getter metadata : Metadata

def self.metadata_key(instance_id : String) : String
Mosquito::Backend.build_key "overseer", instance_id
end

def initialize
@idle_notifier = Channel(Bool).new
Expand All @@ -48,7 +53,7 @@ module Mosquito::Runners
@work_handout = Channel(Tuple(JobRun, Queue)).new

@instance_id = Random::Secure.hex(8)
@metric_data = metric_data = Metadata.new Mosquito::Backend.build_key("runners", instance_id)
@metadata = Metadata.new self.class.metadata_key(instance_id)
@publish_context = PublishContext.new([:overseer, instance_id])

@queue_list = QueueList.new @publish_context
Expand All @@ -57,10 +62,16 @@ module Mosquito::Runners
executor_count.times do
@executors << build_executor
end

update_executor_list
end

def build_executor : Executor
Executor.new work_handout, idle_notifier, @publish_context, metric_data
Executor.new work_handout, idle_notifier, @publish_context
end

def update_executor_list : Nil
@metadata["executors"] = @executors.map(&.instance_id).join(",")
end

def runnable_name : String
Expand All @@ -70,7 +81,7 @@ module Mosquito::Runners
# (Re)registers the overseer with the backend.
# This is like a heartbeat and is used to acquire metadata about the overseer fleet.
def heartbeat : Nil
Mosquito.backend.register_overseer self.runnable_name
Mosquito.backend.register_overseer self.instance_id
end

def sleep
Expand Down Expand Up @@ -98,8 +109,8 @@ module Mosquito::Runners
executor.stop
end
work_handout.close
stopped_notifiers.each(&.receive)
metric { publish @publish_context, {event: "stopping-work"} }
stopped_notifiers.each(&.receive)
Log.info { "All executors stopped." }

Log.info { "Overseer #{instance_id} finished for now." }
Expand Down Expand Up @@ -174,6 +185,8 @@ module Mosquito::Runners
end

check_for_deceased_runners
metadata.heartbeat!
metadata.delete(in: 1.hour)
end

# Weaknesses: This implementation sometimes starves queues because it doesn't
Expand Down Expand Up @@ -205,6 +218,8 @@ module Mosquito::Runners
executors << build_executor.tap(&.run)
end

update_executor_list

if queue_list.dead?
Log.fatal { "QueueList has died, overseer will stop." }
stop
Expand Down
Loading

0 comments on commit 68c1932

Please sign in to comment.