Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dramatically decreases the time spent listing queues #120

Merged
merged 1 commit into from
Oct 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 16 additions & 9 deletions src/mosquito/redis_backend.cr
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ module Mosquito
end

class RedisBackend < Mosquito::Backend
LIST_OF_QUEUES_KEY = "queues"

Log = ::Log.for(self)

{% for name, script in Scripts::SCRIPTS %}
def self.{{ name.id }}(*, keys = [] of String, args = [] of String, loadscripts = true)
script = {{ script }}
Expand Down Expand Up @@ -117,15 +121,12 @@ module Mosquito
end

def self.list_queues : Array(String)
search_queues.map do |search_queue|
key = build_key search_queue, "*"
long_names = redis.keys key
queue_prefix = build_key(search_queue) + ":"
key = build_key(LIST_OF_QUEUES_KEY)
list_queues = redis.zrange(key, 0, -1).as(Array)

long_names.map(&.to_s).map do |long_name|
long_name.sub(queue_prefix, "")
end
end.flatten.uniq
return [] of String unless list_queues.any?

list_queues.compact_map(&.as(String))
end

def self.list_runners : Array(String)
Expand Down Expand Up @@ -167,7 +168,13 @@ module Mosquito
end

def enqueue(job_run : JobRun) : JobRun
redis.lpush waiting_q, job_run.id
redis.pipeline do |pipe|
# Pushes the job onto the waiting queue.
pipe.lpush waiting_q, job_run.id

# Updates the list of queues to include the current queue
pipe.zadd build_key(LIST_OF_QUEUES_KEY), Time.utc.to_unix.to_s, name
end
job_run
end

Expand Down
9 changes: 0 additions & 9 deletions test/mosquito/backend/queueing_test.cr
Original file line number Diff line number Diff line change
Expand Up @@ -62,15 +62,6 @@ describe "Backend Queues" do
assert_equal %w|test1 test2 test3 test4|, backend.list_queues.sort
end
end

it "includes queues prefixed with scheduled and waiting but not pending or dead" do
clean_slate do
fill_queues
fill_uncounted_queues

assert_equal %w|test1 test2 test3 test4|, backend.list_queues.sort
end
end
end

describe "schedule" do
Expand Down
7 changes: 7 additions & 0 deletions test/mosquito/queue_test.cr
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,13 @@ describe Queue do
end

describe "enqueue" do
it "adds the queue name to the list of queues" do
clean_slate do
test_queue.enqueue job_run
assert_includes backend.class.list_queues, test_queue.name
end
end

it "can enqueue a job_run for immediate processing" do
clean_slate do
test_queue.enqueue job_run
Expand Down
23 changes: 11 additions & 12 deletions test/mosquito/runners/queue_list_test.cr
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,24 @@ require "../../test_helper"
describe "Mosquito::Runners::QueueList" do
getter(queue_list) { MockQueueList.new }

def mock_queues
backend = Mosquito.configuration.backend
backend.set "mosquito:waiting:test1", "key", "value"
backend.set "mosquito:waiting:test2", "key", "value"
backend.set "mosquito:waiting:test3", "key", "value"
def enqueue_jobs
PassingJob.new.enqueue
FailingJob.new.enqueue
EchoJob.new(text: "hello world").enqueue
end

describe "fetch" do
it "returns a list of queues" do
clean_slate do
mock_queues
enqueue_jobs
queue_list.fetch
assert_equal ["test1", "test2", "test3"], queue_list.queues.map(&.name).sort
assert_equal ["failing_job", "io_queue", "passing_job"], queue_list.queues.map(&.name).sort
end
end

it "logs a message about the number of fetched queues" do
clean_slate do
mock_queues
enqueue_jobs
queue_list.fetch
assert_logs_match "found 3 queues"
end
Expand All @@ -31,19 +30,19 @@ describe "Mosquito::Runners::QueueList" do
describe "queue filtering" do
it "filters the list of queues when a whitelist is present" do
clean_slate do
mock_queues
enqueue_jobs

Mosquito.temp_config(run_from: ["test1", "test3"]) do
Mosquito.temp_config(run_from: ["io_queue", "passing_job"]) do
queue_list.fetch
end
end

assert_equal %w|test1 test3|, queue_list.queues.map(&.name).sort
assert_equal ["io_queue", "passing_job"], queue_list.queues.map(&.name).sort
end

it "logs an error when all queues are filtered out" do
clean_slate do
mock_queues
enqueue_jobs

Mosquito.temp_config(run_from: ["test4"]) do
queue_list.fetch
Expand Down
Loading