Skip to content

Commit

Permalink
Fix deprecated uses of Redis#pipelined and Redis#multi
Browse files Browse the repository at this point in the history
Context: redis/redis-rb#1059

The following usage is deprecated:
```ruby
redis.pipelined do
  redis.get(key)
end
```

And should be rewritten as:
```ruby
redis.pipelined do |pipeline|
  pipeline.get(key)
end
```

Functionally, it makes no difference.
  • Loading branch information
byroot committed Jan 24, 2022
1 parent 34d081f commit 3f4a2e5
Show file tree
Hide file tree
Showing 7 changed files with 84 additions and 84 deletions.
74 changes: 37 additions & 37 deletions lib/sidekiq/api.rb
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,14 @@ def queues
# O(1) redis calls
def fetch_stats_fast!
pipe1_res = Sidekiq.redis { |conn|
conn.pipelined do
conn.get("stat:processed")
conn.get("stat:failed")
conn.zcard("schedule")
conn.zcard("retry")
conn.zcard("dead")
conn.scard("processes")
conn.lrange("queue:default", -1, -1)
conn.pipelined do |pipeline|
pipeline.get("stat:processed")
pipeline.get("stat:failed")
pipeline.zcard("schedule")
pipeline.zcard("retry")
pipeline.zcard("dead")
pipeline.scard("processes")
pipeline.lrange("queue:default", -1, -1)
end
}

Expand Down Expand Up @@ -101,9 +101,9 @@ def fetch_stats_slow!
}

pipe2_res = Sidekiq.redis { |conn|
conn.pipelined do
processes.each { |key| conn.hget(key, "busy") }
queues.each { |queue| conn.llen("queue:#{queue}") }
conn.pipelined do |pipeline|
processes.each { |key| pipeline.hget(key, "busy") }
queues.each { |queue| pipeline.llen("queue:#{queue}") }
end
}

Expand Down Expand Up @@ -147,9 +147,9 @@ def lengths
Sidekiq.redis do |conn|
queues = conn.sscan_each("queues").to_a

lengths = conn.pipelined {
lengths = conn.pipelined { |pipeline|
queues.each do |queue|
conn.llen("queue:#{queue}")
pipeline.llen("queue:#{queue}")
end
}

Expand Down Expand Up @@ -287,9 +287,9 @@ def find_job(jid)

def clear
Sidekiq.redis do |conn|
conn.multi do
conn.unlink(@rname)
conn.srem("queues", name)
conn.multi do |transaction|
transaction.unlink(@rname)
transaction.srem("queues", name)
end
end
end
Expand Down Expand Up @@ -519,9 +519,9 @@ def error?

def remove_job
Sidekiq.redis do |conn|
results = conn.multi {
conn.zrangebyscore(parent.name, score, score)
conn.zremrangebyscore(parent.name, score, score)
results = conn.multi { |transaction|
transaction.zrangebyscore(parent.name, score, score)
transaction.zremrangebyscore(parent.name, score, score)
}.first

if results.size == 1
Expand All @@ -542,9 +542,9 @@ def remove_job
yield msg if msg

# push the rest back onto the sorted set
conn.multi do
conn.multi do |transaction|
nonmatched.each do |message|
conn.zadd(parent.name, score.to_f.to_s, message)
transaction.zadd(parent.name, score.to_f.to_s, message)
end
end
end
Expand Down Expand Up @@ -731,10 +731,10 @@ def initialize
def kill(message, opts = {})
now = Time.now.to_f
Sidekiq.redis do |conn|
conn.multi do
conn.zadd(name, now.to_s, message)
conn.zremrangebyscore(name, "-inf", now - self.class.timeout)
conn.zremrangebyrank(name, 0, - self.class.max_jobs)
conn.multi do |transaction|
transaction.zadd(name, now.to_s, message)
transaction.zremrangebyscore(name, "-inf", now - self.class.timeout)
transaction.zremrangebyrank(name, 0, - self.class.max_jobs)
end
end

Expand Down Expand Up @@ -782,9 +782,9 @@ def cleanup
count = 0
Sidekiq.redis do |conn|
procs = conn.sscan_each("processes").to_a.sort
heartbeats = conn.pipelined {
heartbeats = conn.pipelined { |pipeline|
procs.each do |key|
conn.hget(key, "info")
pipeline.hget(key, "info")
end
}

Expand All @@ -806,9 +806,9 @@ def each
# We're making a tradeoff here between consuming more memory instead of
# making more roundtrips to Redis, but if you have hundreds or thousands of workers,
# you'll be happier this way
conn.pipelined do
conn.pipelined do |pipeline|
procs.each do |key|
conn.hmget(key, "info", "busy", "beat", "quiet", "rss", "rtt_us")
pipeline.hmget(key, "info", "busy", "beat", "quiet", "rss", "rtt_us")
end
end
}
Expand Down Expand Up @@ -922,9 +922,9 @@ def stopping?
def signal(sig)
key = "#{identity}-signals"
Sidekiq.redis do |c|
c.multi do
c.lpush(key, sig)
c.expire(key, 60)
c.multi do |transaction|
transaction.lpush(key, sig)
transaction.expire(key, 60)
end
end
end
Expand Down Expand Up @@ -958,9 +958,9 @@ def each(&block)
Sidekiq.redis do |conn|
procs = conn.sscan_each("processes").to_a
procs.sort.each do |key|
valid, workers = conn.pipelined {
conn.exists?(key)
conn.hgetall("#{key}:workers")
valid, workers = conn.pipelined { |pipeline|
pipeline.exists?(key)
pipeline.hgetall("#{key}:workers")
}
next unless valid
workers.each_pair do |tid, json|
Expand Down Expand Up @@ -988,9 +988,9 @@ def size
if procs.empty?
0
else
conn.pipelined {
conn.pipelined { |pipeline|
procs.each do |key|
conn.hget(key, "busy")
pipeline.hget(key, "busy")
end
}.sum(&:to_i)
end
Expand Down
4 changes: 2 additions & 2 deletions lib/sidekiq/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -189,8 +189,8 @@ def enqueue_in(interval, klass, *args)

def raw_push(payloads)
@redis_pool.with do |conn|
conn.pipelined do
atomic_push(conn, payloads)
conn.pipelined do |pipeline|
atomic_push(pipeline, payloads)
end
end
true
Expand Down
4 changes: 2 additions & 2 deletions lib/sidekiq/fetch.rb
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,9 @@ def bulk_requeue(inprogress, options)
end

Sidekiq.redis do |conn|
conn.pipelined do
conn.pipelined do |pipeline|
jobs_to_requeue.each do |queue, jobs|
conn.rpush(queue, jobs)
pipeline.rpush(queue, jobs)
end
end
end
Expand Down
50 changes: 25 additions & 25 deletions lib/sidekiq/launcher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,9 @@ def clear_heartbeat
# Note we don't stop the heartbeat thread; if the process
# doesn't actually exit, it'll reappear in the Web UI.
Sidekiq.redis do |conn|
conn.pipelined do
conn.srem("processes", identity)
conn.unlink("#{identity}:workers")
conn.pipelined do |pipeline|
pipeline.srem("processes", identity)
pipeline.unlink("#{identity}:workers")
end
end
rescue
Expand All @@ -107,14 +107,14 @@ def self.flush_stats
nowdate = Time.now.utc.strftime("%Y-%m-%d")
begin
Sidekiq.redis do |conn|
conn.pipelined do
conn.incrby("stat:processed", procd)
conn.incrby("stat:processed:#{nowdate}", procd)
conn.expire("stat:processed:#{nowdate}", STATS_TTL)

conn.incrby("stat:failed", fails)
conn.incrby("stat:failed:#{nowdate}", fails)
conn.expire("stat:failed:#{nowdate}", STATS_TTL)
conn.pipelined do |pipeline|
pipeline.incrby("stat:processed", procd)
pipeline.incrby("stat:processed:#{nowdate}", procd)
pipeline.expire("stat:processed:#{nowdate}", STATS_TTL)

pipeline.incrby("stat:failed", fails)
pipeline.incrby("stat:failed:#{nowdate}", fails)
pipeline.expire("stat:failed:#{nowdate}", STATS_TTL)
end
end
rescue => ex
Expand All @@ -138,16 +138,16 @@ def ❤
nowdate = Time.now.utc.strftime("%Y-%m-%d")

Sidekiq.redis do |conn|
conn.multi do
conn.incrby("stat:processed", procd)
conn.incrby("stat:processed:#{nowdate}", procd)
conn.expire("stat:processed:#{nowdate}", STATS_TTL)
conn.multi do |transaction|
transaction.incrby("stat:processed", procd)
transaction.incrby("stat:processed:#{nowdate}", procd)
transaction.expire("stat:processed:#{nowdate}", STATS_TTL)

conn.incrby("stat:failed", fails)
conn.incrby("stat:failed:#{nowdate}", fails)
conn.expire("stat:failed:#{nowdate}", STATS_TTL)
transaction.incrby("stat:failed", fails)
transaction.incrby("stat:failed:#{nowdate}", fails)
transaction.expire("stat:failed:#{nowdate}", STATS_TTL)

conn.unlink(workers_key)
transaction.unlink(workers_key)
curstate.each_pair do |tid, hash|
conn.hset(workers_key, tid, Sidekiq.dump_json(hash))
end
Expand All @@ -161,17 +161,17 @@ def ❤
kb = memory_usage(::Process.pid)

_, exists, _, _, msg = Sidekiq.redis { |conn|
conn.multi {
conn.sadd("processes", key)
conn.exists?(key)
conn.hmset(key, "info", to_json,
conn.multi { |transaction|
transaction.sadd("processes", key)
transaction.exists?(key)
transaction.hmset(key, "info", to_json,
"busy", curstate.size,
"beat", Time.now.to_f,
"rtt_us", rtt,
"quiet", @done,
"rss", kb)
conn.expire(key, 60)
conn.rpop("#{key}-signals")
transaction.expire(key, 60)
transaction.rpop("#{key}-signals")
}
}

Expand Down
14 changes: 7 additions & 7 deletions lib/sidekiq/paginator.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,22 @@ def page(key, pageidx = 1, page_size = 25, opts = nil)

case type
when "zset"
total_size, items = conn.multi {
conn.zcard(key)
total_size, items = conn.multi { |transaction|
transaction.zcard(key)
if rev
conn.zrevrange(key, starting, ending, with_scores: true)
transaction.zrevrange(key, starting, ending, with_scores: true)
else
conn.zrange(key, starting, ending, with_scores: true)
transaction.zrange(key, starting, ending, with_scores: true)
end
}
[current_page, total_size, items]
when "list"
total_size, items = conn.multi {
total_size, items = conn.multi { |transaction|
conn.llen(key)
if rev
conn.lrange(key, -ending - 1, -starting - 1)
transaction.lrange(key, -ending - 1, -starting - 1)
else
conn.lrange(key, starting, ending)
transaction.lrange(key, starting, ending)
end
}
items.reverse! if rev
Expand Down
14 changes: 7 additions & 7 deletions test/test_api.rb
Original file line number Diff line number Diff line change
Expand Up @@ -568,10 +568,10 @@ class WorkerWithTags

time = Time.now.to_f
Sidekiq.redis do |conn|
conn.multi do
conn.sadd('processes', odata['key'])
conn.hmset(odata['key'], 'info', Sidekiq.dump_json(odata), 'busy', 10, 'beat', time)
conn.sadd('processes', 'fake:pid')
conn.multi do |transaction|
transaction.sadd('processes', odata['key'])
transaction.hmset(odata['key'], 'info', Sidekiq.dump_json(odata), 'busy', 10, 'beat', time)
transaction.sadd('processes', 'fake:pid')
end
end

Expand Down Expand Up @@ -621,9 +621,9 @@ class WorkerWithTags
s = "#{key}:workers"
data = Sidekiq.dump_json({ 'payload' => {}, 'queue' => 'default', 'run_at' => (Time.now.to_i - 2*60*60) })
Sidekiq.redis do |c|
c.multi do
c.hmset(s, '5678', data)
c.hmset("b#{s}", '5678', data)
c.multi do |transaction|
transaction.hmset(s, '5678', data)
transaction.hmset("b#{s}", '5678', data)
end
end

Expand Down
8 changes: 4 additions & 4 deletions test/test_web.rb
Original file line number Diff line number Diff line change
Expand Up @@ -718,10 +718,10 @@ def add_worker
key = "#{hostname}:#{$$}"
msg = "{\"queue\":\"default\",\"payload\":{\"retry\":true,\"queue\":\"default\",\"timeout\":20,\"backtrace\":5,\"class\":\"HardWorker\",\"args\":[\"bob\",10,5],\"jid\":\"2b5ad2b016f5e063a1c62872\"},\"run_at\":1361208995}"
Sidekiq.redis do |conn|
conn.multi do
conn.sadd("processes", key)
conn.hmset(key, 'info', Sidekiq.dump_json('hostname' => 'foo', 'started_at' => Time.now.to_f, "queues" => []), 'at', Time.now.to_f, 'busy', 4)
conn.hmset("#{key}:workers", Time.now.to_f, msg)
conn.multi do |transaction|
transaction.sadd("processes", key)
transaction.hmset(key, 'info', Sidekiq.dump_json('hostname' => 'foo', 'started_at' => Time.now.to_f, "queues" => []), 'at', Time.now.to_f, 'busy', 4)
transaction.hmset("#{key}:workers", Time.now.to_f, msg)
end
end
end
Expand Down

0 comments on commit 3f4a2e5

Please sign in to comment.