Skip to content

Commit

Permalink
Improves the metrics broadcast interface, in step with the web browse…
Browse files Browse the repository at this point in the history
…r visualization project
  • Loading branch information
robacarp committed Nov 18, 2023
1 parent 39ddc84 commit 91214da
Show file tree
Hide file tree
Showing 15 changed files with 359 additions and 34 deletions.
29 changes: 21 additions & 8 deletions src/mosquito/backend.cr
Original file line number Diff line number Diff line change
@@ -1,5 +1,17 @@
require "json"

module Mosquito
abstract class Backend
struct BroadcastMessage
include JSON::Serializable

property channel : String
property message : String

def initialize(@channel, @message)
end
end

QUEUES = %w(waiting scheduled pending dead)

KEY_PREFIX = {"mosquito"}
Expand Down Expand Up @@ -34,13 +46,16 @@ module Mosquito

abstract def get(key : String, field : String) : String?
abstract def set(key : String, field : String, value : String) : String
abstract def delete(key : String, in ttl : Time::Span = 0.seconds)
abstract def increment(key : String, field : String) : Int64
abstract def increment(key : String, field : String, by value : Int32) : Int64

abstract def flush : Nil

abstract def unlock(key : String, value : String) : Nil
abstract def lock?(key : String, value : String, ttl : Time::Span) : Bool
abstract def publish(key : String, value : String) : Nil
abstract def subscribe(key : String) : Channel(BroadcastMessage)
end

macro inherited
Expand All @@ -51,10 +66,12 @@ module Mosquito
QUEUES.first(2)
end

{% for q in QUEUES %}
def {{q.id}}_q
build_key {{q}}, name
{% for q_name in QUEUES %}
def {{q_name.id}}_q
build_key {{q_name}}, name
end

abstract def dump_{{q_name.id}}_q : Array(String)
{% end %}

def store(key : String, value : Hash(String, String)) : Nil
Expand All @@ -73,7 +90,6 @@ module Mosquito
self.class.expires_in key
end

# from queue.cr
abstract def enqueue(job_run : JobRun) : JobRun
abstract def dequeue : JobRun?
abstract def schedule(job_run : JobRun, at scheduled_time : Time) : JobRun
Expand All @@ -83,11 +99,8 @@ module Mosquito
abstract def flush : Nil
abstract def size(include_dead : Bool = true) : Int64

{% for name in ["waiting", "scheduled", "pending", "dead"] %}
abstract def dump_{{name.id}}_q : Array(String)
{% end %}

abstract def scheduled_job_run_time(job_run : JobRun) : String?

abstract def increment(key : String, by value : Int32 = 1) : Int64
end
end
7 changes: 7 additions & 0 deletions src/mosquito/configuration.cr
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,13 @@ module Mosquito
property run_from : Array(String) = [] of String
property backend : Mosquito::Backend.class = Mosquito::RedisBackend

# Toggles the behaviors to calculate and document metadata about the mosquito
# queues and runners.
property send_metrics : Bool = true

# How often a mosquito runner should emit a heartbeat metric.
property heartbeat_interval : Time::Span = 20.seconds

property validated = false

def idle_wait=(time_span : Float)
Expand Down
17 changes: 17 additions & 0 deletions src/mosquito/inspector.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
require "./inspector/*"

module Mosquito::Inspector
def self.list_queues : Array(Queue)
Mosquito.backend.list_queues
.map { |name| Queue.new name }
end

def self.list_runners : Array(Runner)
Mosquito.backend.list_runners
.map { |name| Runner.new name }
end

def self.event_receiver : Channel(Backend::BroadcastMessage)
Mosquito.backend.subscribe "mosquito:*"
end
end
24 changes: 24 additions & 0 deletions src/mosquito/inspector/queue.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
module Mosquito::Inspector
class Queue
include Comparable(self)

getter name : String

private property backend : Mosquito::Backend

def initialize(@name)
@backend = Mosquito.backend.named name
end

{% for name in Mosquito::Backend::QUEUES %}
def {{name.id}}_job_runs : Array(JobRun)
backend.dump_{{name.id}}_q
.map { |task_id| JobRun.new task_id }
end
{% end %}

def <=>(other)
name <=> other.name
end
end
end
65 changes: 65 additions & 0 deletions src/mosquito/inspector/runner.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
module Mosquito::Inspector
class Runner
include Comparable(self)

getter name : String

def initialize(name)
@name = name
end

def <=>(other)
name <=> other.name
end

def config
key = Mosquito.backend.build_key "runners", name
config = Mosquito.backend.retrieve key
end

def current_job : JobRun?
job_run = config["current_work"]?
return unless job_run && ! job_run.blank?
JobRun.new job_run
end

def last_heartbeat : Time?
unix_ms = config["heartbeat_at"]?
return unless unix_ms && ! unix_ms.blank?
Time.unix(unix_ms.to_i)
end

def last_active : String
if timestamp = last_heartbeat
seconds = (Time.utc - timestamp).total_seconds.to_i

if seconds < 21
colorize_by_last_heartbeat seconds, "online"
else
colorize_by_last_heartbeat seconds, "seen #{seconds}s ago"
end

else
colorize_by_last_heartbeat 301, "expired"
end
end

def colorize_by_last_heartbeat(seconds : Int32, word : String) : String
if seconds < 30
word.colorize(:green)
elsif seconds < 200
word.colorize(:yellow)
else
word.colorize(:red)
end.to_s
end

def status : String
if job_run = current_job
"job run: #{job_run.type}"
else
"idle"
end
end
end
end
16 changes: 16 additions & 0 deletions src/mosquito/inspector/task.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
module Mosquito::Inspector
class JobRun
getter id : String

def initialize(@id : String)
end

def config_key
Mosquito.backend.build_key Mosquito::JobRun::CONFIG_KEY_PREFIX, id
end

def type : String
Mosquito.backend.retrieve(config_key)["type"]? || "unknown"
end
end
end
77 changes: 77 additions & 0 deletions src/mosquito/metrics.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
module Mosquito
class PublishContext
property originator : String
property context : String

def initialize(context : Array(String | Symbol))
@context = KeyBuilder.build context
@originator = KeyBuilder.build "mosquito", @context
end

def initialize(parent : self, context : Array(String | Symbol))
@context = KeyBuilder.build context
@originator = KeyBuilder.build "mosquito", parent.context, context
end
end

class Metrics
Log = ::Log.for self

module Shorthand
def metric
if Mosquito.configuration.send_metrics
with Metrics.instance yield
end
end
end

property send_metrics : Bool

def self.instance
@@instance ||= new
end

def initialize
@send_metrics = Mosquito.configuration.send_metrics
end

def beat_heart(metadata : Metadata) : Nil
return unless send_metrics
Log.info { "Beating Heart" }

# update the timestamp
metadata["heartbeat_at"] = Time.utc.to_unix.to_s

metadata.delete(Mosquito.configuration.heartbeat_interval * 10)
end

def publish(context : PublishContext, data : NamedTuple) : Nil
Mosquito.backend.publish(
context.originator,
data.to_json
)
end

def tick_metrics(stage : String) : Nil
time = Time.utc

Mosquito.backend.tap do |backend|
daily_bucket = metrics_key({stage, "daily", time.month, time.day})
hourly_bucket = metrics_key({stage, "hourly", time.hour})
minutely_bucket = metrics_key({stage, "minutely", time.hour, time.minute})

backend.increment daily_bucket, Backend.build_key(time.month, time.day)
backend.increment hourly_bucket, Backend.build_key(time.day, time.hour)
backend.increment minutely_bucket, Backend.build_key(time.hour, time.minute)

backend.delete daily_bucket, in: 2.days
backend.delete hourly_bucket, in: 24.hours
backend.delete minutely_bucket, in: 1.hour
end
end

def metrics_key(key_parts : Tuple)
Backend.build_key "metrics", "runner"
end
end
end
8 changes: 8 additions & 0 deletions src/mosquito/queue.cr
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ module Mosquito
# ```
#
class Queue
include Metrics::Shorthand

getter name, config_key
getter? empty : Bool
property backend : Mosquito::Backend
Expand All @@ -83,9 +85,11 @@ module Mosquito
@empty = false
@backend = Mosquito.backend.named name
@config_key = @name
@publish_context = PublishContext.new [:queue, name]
end

def enqueue(job_run : JobRun) : JobRun
metric { publish @publish_context, {title: "enqueue-job", job_run: job_run.id} }
backend.enqueue job_run
end

Expand All @@ -94,13 +98,17 @@ module Mosquito
end

def enqueue(job_run : JobRun, at execute_time : Time) : JobRun
metric { publish @publish_context, {title: "defer-job", job_run: job_run.id, until: execute_time} }

backend.schedule job_run, execute_time
end

def dequeue : JobRun?
return if empty?

if job_run = backend.dequeue
metric { publish @publish_context, {title: "dequeue", job_run: job_run.id} }

job_run
else
@empty = true
Expand Down
31 changes: 29 additions & 2 deletions src/mosquito/redis_backend.cr
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ module Mosquito
end
end

def self.delete(key : String, in ttl : Time::Span) : Nil
def self.delete(key : String, in ttl : Time::Span = 0.seconds) : Nil
delete key, ttl.to_i
end

Expand Down Expand Up @@ -131,7 +131,7 @@ module Mosquito

def self.list_runners : Array(String)
runner_prefix = "mosquito:runners:"
Redis.instance.keys("#{runner_prefix}*")
redis.keys("#{runner_prefix}*")
.map(&.as(String))
.map(&.sub(runner_prefix, ""))
end
Expand All @@ -150,6 +150,29 @@ module Mosquito
remove_matching_key keys: [key], args: [value]
end

def self.publish(key : String, value : String) : Nil
redis.publish key, value
end

def self.subscribe(key : String) : Channel(Backend::BroadcastMessage)
stream = Channel(Backend::BroadcastMessage).new

spawn do
redis.psubscribe(key) do |subscription, connection|
subscription.on_message do |channel, message|
stream.send(
Backend::BroadcastMessage.new(
channel: channel,
message: message
)
)
end
end
end

stream
end

def schedule(job_run : JobRun, at scheduled_time : Time) : JobRun
redis.zadd scheduled_q, scheduled_time.to_unix_ms.to_s, job_run.id
job_run
Expand Down Expand Up @@ -233,5 +256,9 @@ module Mosquito
def scheduled_job_run_time(job_run : JobRun) : String?
redis.zscore(scheduled_q, job_run.id).as?(String)
end

def increment(key : String, by value : Int32 = 1) : Int64
redis.incrby key, value
end
end
end
Loading

0 comments on commit 91214da

Please sign in to comment.