Skip to content

Commit

Permalink
Merge pull request #1 from Toro-TMS/support-redis-client
Browse files Browse the repository at this point in the history
Support redis client
  • Loading branch information
Ataraxic authored Jan 3, 2023
2 parents 663debd + dacfbe8 commit 09b3a3b
Show file tree
Hide file tree
Showing 8 changed files with 31 additions and 14 deletions.
1 change: 1 addition & 0 deletions Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@ gemspec
group :test do
gem "simplecov"
gem "codeclimate-test-reporter", "~> 1.0.0"
gem "pry"
end
14 changes: 7 additions & 7 deletions lib/sidekiq/batch.rb
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ def jobs

Sidekiq.redis do |r|
r.multi do |pipeline|
pipeline.hset(@bidkey, "created_at", @created_at)
pipeline.hset(@bidkey, "created_at", Time.now.to_s)
pipeline.hset(@bidkey, "parent_bid", parent_bid.to_s) if parent_bid
pipeline.expire(@bidkey, BID_EXPIRE_TTL)
end
Expand Down Expand Up @@ -96,7 +96,7 @@ def jobs
pipeline.hincrby(@bidkey, "total", @ready_to_queue.size)
pipeline.expire(@bidkey, BID_EXPIRE_TTL)

pipeline.sadd(@bidkey + "-jids", [@ready_to_queue])
pipeline.sadd(@bidkey + "-jids", @ready_to_queue)
pipeline.expire(@bidkey + "-jids", BID_EXPIRE_TTL)
end
end
Expand All @@ -113,7 +113,7 @@ def increment_job_queue(jid)

def invalidate_all
Sidekiq.redis do |r|
r.setex("invalidated-bid-#{bid}", BID_EXPIRE_TTL, 1)
r.call("SETEX","invalidated-bid-#{bid}", BID_EXPIRE_TTL, 1)
end
end

Expand All @@ -130,7 +130,7 @@ def parent
end

def valid?(batch = self)
valid = !Sidekiq.redis { |r| r.exists("invalidated-bid-#{batch.bid}") }
valid = !Sidekiq.redis { |r| r.exists("invalidated-bid-#{batch.bid}").to_i > 0 }
batch.parent ? valid && valid?(batch.parent) : valid
end

Expand All @@ -139,7 +139,7 @@ def valid?(batch = self)
def persist_bid_attr(attribute, value)
Sidekiq.redis do |r|
r.multi do |pipeline|
pipeline.hset(@bidkey, attribute, value)
pipeline.hset(@bidkey, attribute, value.to_s)
pipeline.expire(@bidkey, BID_EXPIRE_TTL)
end
end
Expand Down Expand Up @@ -208,8 +208,8 @@ def enqueue_callbacks(event, bid)
callback_key = "#{batch_key}-callbacks-#{event_name}"
already_processed, _, callbacks, queue, parent_bid, callback_batch = Sidekiq.redis do |r|
r.multi do |pipeline|
pipeline.hget(batch_key, event_name)
pipeline.hset(batch_key, event_name, true)
pipeline.call('HGET', batch_key, event_name)
pipeline.hset(batch_key, event_name, 'true')
pipeline.smembers(callback_key)
pipeline.hget(batch_key, "callback_queue")
pipeline.hget(batch_key, "parent_bid")
Expand Down
4 changes: 4 additions & 0 deletions lib/sidekiq/batch/middleware.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ module Sidekiq
class Batch
module Middleware
class ClientMiddleware
include ::Sidekiq::ClientMiddleware

def call(_worker, msg, _queue, _redis_pool = nil)
if (batch = Thread.current[:batch])
batch.increment_job_queue(msg['jid']) if (msg[:bid] = batch.bid)
Expand All @@ -13,6 +15,8 @@ def call(_worker, msg, _queue, _redis_pool = nil)
end

class ServerMiddleware
include ::Sidekiq::ServerMiddleware

def call(_worker, msg, _queue)
if (bid = msg['bid'])
begin
Expand Down
4 changes: 2 additions & 2 deletions sidekiq-batch.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ Gem::Specification.new do |spec|
spec.executables = spec.files.grep(%r{^exe/}) { |f| File.basename(f) }
spec.require_paths = ["lib"]

spec.add_dependency "sidekiq", ">= 3"
spec.add_dependency "sidekiq", ">= 7"

spec.add_development_dependency "bundler", "~> 2.1"
spec.add_development_dependency "rake", "~> 13.0"
spec.add_development_dependency "rspec", "~> 3.0"
spec.add_development_dependency "fakeredis", "~> 0.8.0"
spec.add_development_dependency "redis-client", "~> 0.11"
end
2 changes: 1 addition & 1 deletion spec/sidekiq/batch/middleware_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@
end

describe Sidekiq::Batch::Middleware do
let(:config) { class_double(Sidekiq) }
let(:config) { instance_double(Sidekiq::Config) }
let(:client_middleware) { double(Sidekiq::Middleware::Chain) }

context 'client' do
Expand Down
10 changes: 10 additions & 0 deletions spec/sidekiq/batch/status_spec.rb
Original file line number Diff line number Diff line change
@@ -1,10 +1,20 @@
require 'spec_helper'

class TestWorker
include Sidekiq::Worker
def perform
end
end

describe Sidekiq::Batch::Status do
let(:bid) { 'BID' }
let(:batch) { Sidekiq::Batch.new(bid) }
subject { described_class.new(bid) }

before do
Sidekiq.redis do |r| r.flushdb end
end

describe '#join' do
it 'raises info' do
expect { subject.join }.to raise_error('Not supported')
Expand Down
8 changes: 6 additions & 2 deletions spec/sidekiq/batch_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ def perform
end

describe Sidekiq::Batch do
before do
Sidekiq.redis do |r| r.flushdb end
end

it 'has a version number' do
expect(Sidekiq::Batch::VERSION).not_to be nil
end
Expand Down Expand Up @@ -170,7 +174,7 @@ def was_performed; end
Sidekiq::Batch.process_failed_job(bid, 'failed-job-id')
Sidekiq::Batch.process_failed_job(bid, failed_jid)
failed = Sidekiq.redis { |r| r.smembers("BID-#{bid}-failed") }
expect(failed).to eq(['xxx', 'failed-job-id'])
expect(failed).to match_array(['xxx', 'failed-job-id'])
end
end
end
Expand Down Expand Up @@ -245,7 +249,7 @@ def was_performed; end
it 'returns and does not enqueue callbacks' do
batch = Sidekiq::Batch.new
batch.on(event, SampleCallback)
Sidekiq.redis { |r| r.hset("BID-#{batch.bid}", event, true) }
Sidekiq.redis { |r| r.hset("BID-#{batch.bid}", event, 'true') }

expect(Sidekiq::Client).not_to receive(:push)
Sidekiq::Batch.enqueue_callbacks(event, batch.bid)
Expand Down
2 changes: 0 additions & 2 deletions spec/spec_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,9 @@
SimpleCov.start

$LOAD_PATH.unshift File.expand_path('../../lib', __FILE__)
require 'fakeredis/rspec'
require 'sidekiq/batch'

redis_opts = { url: "redis://127.0.0.1:6379/1" }
redis_opts[:driver] = Redis::Connection::Memory if defined?(Redis::Connection::Memory)

Sidekiq.configure_client do |config|
config.redis = redis_opts
Expand Down

0 comments on commit 09b3a3b

Please sign in to comment.