Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support Sidekiq Pro super_fetch #178

Closed
wants to merge 17 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .github/codecov.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
ignore:
# Requires Sidekiq-Pro
- lib/sidekiq/throttled/patches/super_fetch.rb
- spec/lib/sidekiq/throttled/patches/super_fetch_spec.rb
3 changes: 3 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ jobs:
--health-timeout 5s
--health-retries 5

env:
BUNDLE_GEMS__CONTRIBSYS__COM: ${{ secrets.BUNDLE_GEMS__CONTRIBSYS__COM }}

strategy:
fail-fast: false
matrix:
Expand Down
1 change: 1 addition & 0 deletions .rspec
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
--require simplecov
--require spec_helper
--tag ~sidekiq_pro
2 changes: 2 additions & 0 deletions .rspec.sidekiq-pro
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
--require simplecov
--require spec_helper
24 changes: 24 additions & 0 deletions Appraisals
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,41 @@ end
appraise "sidekiq-7.0.x" do
group :test do
gem "sidekiq", "~> 7.0.0"

# Sidekiq Pro license must be set in global bundler config
# or in BUNDLE_GEMS__CONTRIBSYS__COM env variable
install_if "-> { Bundler.settings['gems.contribsys.com']&.include?(':') || ENV['BUNDLE_GEMS__CONTRIBSYS__COM'] }" do
source "https://gems.contribsys.com/" do
gem "sidekiq-pro", "~> 7.0.0"
end
end
end
end

appraise "sidekiq-7.1.x" do
group :test do
gem "sidekiq", "~> 7.1.0"

# Sidekiq Pro license must be set in global bundler config
# or in BUNDLE_GEMS__CONTRIBSYS__COM env variable
install_if "-> { Bundler.settings['gems.contribsys.com']&.include?(':') || ENV['BUNDLE_GEMS__CONTRIBSYS__COM'] }" do
source "https://gems.contribsys.com/" do
gem "sidekiq-pro", "~> 7.1.0"
end
end
end
end

appraise "sidekiq-7.2.x" do
group :test do
gem "sidekiq", "~> 7.2.0"

# Sidekiq Pro license must be set in global bundler config
# or in BUNDLE_GEMS__CONTRIBSYS__COM env variable
install_if "-> { Bundler.settings['gems.contribsys.com']&.include?(':') || ENV['BUNDLE_GEMS__CONTRIBSYS__COM'] }" do
source "https://gems.contribsys.com/" do
gem "sidekiq-pro", "~> 7.2.0"
end
end
end
end
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

- Add Sidekiq Pro 7.0, 7.1, and 7.2 support
- Add Ruby 3.3 support

## [1.2.0] - 2023-12-18
Expand Down
10 changes: 10 additions & 0 deletions README.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,11 @@ This library aims to support and work with following Sidekiq versions:
* Sidekiq 7.1.x
* Sidekiq 7.2.x

And the following Sidekiq Pro versions:

* Sidekiq Pro 7.0.x
* Sidekiq Pro 7.1.x
* Sidekiq Pro 7.2.x

== Development

Expand All @@ -303,6 +308,11 @@ This library aims to support and work with following Sidekiq versions:
bundle exec appraisal install
bundle exec rake

=== Sidekiq-Pro

If you're working on Sidekiq-Pro support make sure to copy `.rspec-sidekiq-pro`
to `.rspec-local` and that you have Sidekiq-Pro license in the global config,
or in the `BUNDLE_GEMS__CONTRIBSYS__COM` env variable.

== Contributing

Expand Down
1 change: 1 addition & 0 deletions lib/sidekiq/throttled.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
require_relative "./throttled/job"
require_relative "./throttled/middlewares/server"
require_relative "./throttled/patches/basic_fetch"
require_relative "./throttled/patches/super_fetch"
require_relative "./throttled/registry"
require_relative "./throttled/version"
require_relative "./throttled/worker"
Expand Down
52 changes: 52 additions & 0 deletions lib/sidekiq/throttled/patches/super_fetch.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# frozen_string_literal: true

require "sidekiq"

require_relative "./throttled_retriever"

module Sidekiq
module Throttled
module Patches
module SuperFetch
def self.prepended(base)
base.prepend(ThrottledRetriever)
end

private

# Calls SuperFetch UnitOfWork's requeue to remove the job from the
# temporary queue and push job back to the head of the queue, so that
# the job won't be tried immediately after it was requeued (in most cases).
#
# @note This is triggered when job is throttled.
#
# @return [void]
def requeue_throttled(work)
# SuperFetch UnitOfWork's requeue will remove it from the temporary
# queue and then requeue it, so no acknowledgement call is needed.
work.requeue
end

# Returns list of non-paused queues to try to fetch jobs from.
#
# @note It may return an empty array.
# @return [Array<Array(String, String)>]
def active_queues
# Create a hash of throttled queues for fast lookup
throttled_queues = Throttled.cooldown&.queues&.to_h { |queue| [queue, true] }
return super if throttled_queues.nil? || throttled_queues.empty?

# Reject throttled queues from the list of active queues
super.reject { |queue, _private_queue| throttled_queues[queue] }
end
end
end
end
end

begin
require "sidekiq/pro/super_fetch"
Sidekiq::Pro::SuperFetch.prepend(Sidekiq::Throttled::Patches::SuperFetch)
rescue LoadError
# Sidekiq Pro is not available
end
23 changes: 0 additions & 23 deletions spec/lib/sidekiq/throttled/patches/basic_fetch_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,29 +3,6 @@
require "sidekiq/throttled/patches/basic_fetch"

RSpec.describe Sidekiq::Throttled::Patches::BasicFetch do
def stub_job_class(name, &block)
klass = stub_const(name, Class.new)

klass.include(Sidekiq::Job)
klass.include(Sidekiq::Throttled::Job)

klass.instance_exec do
def perform(*); end
end

klass.instance_exec(&block) if block
end

def enqueued_jobs(queue)
Sidekiq.redis do |conn|
conn.lrange("queue:#{queue}", 0, -1).map do |job|
JSON.parse(job).then do |payload|
[payload["class"], *payload["args"]]
end
end
end
end

let(:fetch) do
if Gem::Version.new(Sidekiq::VERSION) < Gem::Version.new("7.0.0")
Sidekiq.instance_variable_set(:@config, Sidekiq::DEFAULTS.dup)
Expand Down
98 changes: 98 additions & 0 deletions spec/lib/sidekiq/throttled/patches/super_fetch_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
# frozen_string_literal: true

require "sidekiq/throttled/patches/super_fetch"

RSpec.describe Sidekiq::Throttled::Patches::SuperFetch, :sidekiq_pro do
let(:base_queue) { "default" }
let(:critical_queue) { "critical" }
let(:config) do
config = Sidekiq.instance_variable_get(:@config)
config.super_fetch!
config.queues = [base_queue, critical_queue]
config
end
let(:fetch) do
config.default_capsule.fetcher
end

before do
Sidekiq::Throttled.configure { |config| config.cooldown_period = nil }

bq = base_queue
cq = critical_queue
stub_job_class("TestJob") { sidekiq_options(queue: bq) }
stub_job_class("AnotherTestJob") { sidekiq_options(queue: cq) }

allow(Process).to receive(:clock_gettime).with(Process::CLOCK_MONOTONIC).and_return(0.0)

# Give super_fetch a chance to finish its initialization, but also check that there are no pre-existing jobs
pre_existing_job = fetch.retrieve_work
raise "Found pre-existing job: #{pre_existing_job.inspect}" if pre_existing_job

# Sidekiq is FIFO queue, with head on right side of the list,
# meaning jobs below will be stored in 3, 2, 1 order.
TestJob.perform_bulk([[1], [2], [3]])
AnotherTestJob.perform_async(4)
end

describe "#retrieve_work" do
context "when job is not throttled" do
it "returns unit of work" do
expect(Array.new(4) { fetch.retrieve_work }).to all be_an_instance_of(Sidekiq::Pro::SuperFetch::UnitOfWork)
end
end

shared_examples "requeues throttled job" do
it "returns nothing" do
fetch.retrieve_work

expect(fetch.retrieve_work).to be(nil)
end

it "pushes job back to the head of the queue" do
fetch.retrieve_work

expect { fetch.retrieve_work }
.to change { enqueued_jobs(base_queue) }.to([["TestJob", 2], ["TestJob", 3]])
.and(keep_unchanged { enqueued_jobs(critical_queue) })
end

context "when queue cooldown kicks in" do
before do
Sidekiq::Throttled.configure do |config|
config.cooldown_period = 2.0
config.cooldown_threshold = 1
end

fetch.retrieve_work
end

it "updates cooldown queues" do
expect { fetch.retrieve_work }
.to change { enqueued_jobs(base_queue) }.to([["TestJob", 2], ["TestJob", 3]])
.and(change { Sidekiq::Throttled.cooldown.queues }.to(["queue:#{base_queue}"]))
end

it "excludes the queue from polling" do
fetch.retrieve_work

expect { fetch.retrieve_work }
.to change { enqueued_jobs(critical_queue) }.to([])
.and(keep_unchanged { enqueued_jobs(base_queue) })
end
end
end

context "when job was throttled due to concurrency" do
before { TestJob.sidekiq_throttle(concurrency: { limit: 1 }) }

include_examples "requeues throttled job"
end

context "when job was throttled due to threshold" do
before { TestJob.sidekiq_throttle(threshold: { limit: 1, period: 60 }) }

include_examples "requeues throttled job"
end
end
end
82 changes: 74 additions & 8 deletions spec/support/sidekiq.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,81 @@
require "sidekiq"
require "sidekiq/cli"

$TESTING = false # rubocop:disable Style/GlobalVars
begin
require "sidekiq-pro"
rescue LoadError
# Sidekiq Pro is not available
end

$TESTING = true # rubocop:disable Style/GlobalVars

REDIS_URL = ENV.fetch("REDIS_URL", "redis://localhost:6379")

module JidGenerator
module SidekiqThrottledHelper
def new_sidekiq_config
cfg = Sidekiq::Config.new
cfg.redis = { url: REDIS_URL }
cfg.logger = PseudoLogger.instance
cfg.logger.level = Logger::WARN
cfg.server_middleware do |chain|
chain.add(Sidekiq::Throttled::Middlewares::Server)
end
cfg
end

def reset_redis!
if Gem::Version.new(Sidekiq::VERSION) < Gem::Version.new("7.0.0")
reset_redis_v6!
else
reset_redis_v7!
end
end

def reset_redis_v6!
Sidekiq.redis do |conn|
conn.flushdb
conn.script("flush")
end
end

# Resets Sidekiq config between tests like mperham does in Sidekiq tests:
# https://github.com/sidekiq/sidekiq/blob/7df28434f03fa1111e9e2834271c020205369f94/test/helper.rb#L30
def reset_redis_v7!
if Sidekiq.default_configuration.instance_variable_defined?(:@redis)
existing_pool = Sidekiq.default_configuration.instance_variable_get(:@redis)
existing_pool&.shutdown(&:close)
end

RedisClient.new(url: REDIS_URL).call("flushall")

# After resetting redis, create a new Sidekiq::Config instance to avoid ConnectionPool::PoolShuttingDownError
Sidekiq.instance_variable_set :@config, new_sidekiq_config
new_sidekiq_config
end

def stub_job_class(name, &block)
klass = stub_const(name, Class.new)

klass.include(Sidekiq::Job)
klass.include(Sidekiq::Throttled::Job)

klass.instance_exec do
def perform(*); end
end

klass.instance_exec(&block) if block
end

def enqueued_jobs(queue)
Sidekiq.redis do |conn|
conn.lrange("queue:#{queue}", 0, -1).map do |job|
JSON.parse(job).then do |payload|
[payload["class"], *payload["args"]]
end
end
end
end

def jid
SecureRandom.hex 12
end
Expand Down Expand Up @@ -54,15 +124,11 @@ def output
end

RSpec.configure do |config|
config.include JidGenerator
config.extend JidGenerator
config.include SidekiqThrottledHelper

config.before do
PseudoLogger.instance.reset!

Sidekiq.redis do |conn|
conn.flushdb
conn.script("flush")
end
reset_redis!
end
end