From 3f4a2e5ab053b4382b79529299608d282478f411 Mon Sep 17 00:00:00 2001 From: Jean Boussier Date: Mon, 24 Jan 2022 14:58:06 +0100 Subject: [PATCH] Fix deprecated uses of `Redis#pipelined` and `Redis#multi` Context: https://github.com/redis/redis-rb/pull/1059 The following is deprecated ```ruby redis.pipelined do redis.get(key) end ``` And should be rewritten as: ```ruby redis.pipelined do |pipeline| pipeline.get(key) end ``` Functionally it makes no difference. --- lib/sidekiq/api.rb | 74 ++++++++++++++++++++-------------------- lib/sidekiq/client.rb | 4 +-- lib/sidekiq/fetch.rb | 4 +-- lib/sidekiq/launcher.rb | 50 +++++++++++++-------------- lib/sidekiq/paginator.rb | 14 ++++---- test/test_api.rb | 14 ++++---- test/test_web.rb | 8 ++--- 7 files changed, 84 insertions(+), 84 deletions(-) diff --git a/lib/sidekiq/api.rb b/lib/sidekiq/api.rb index 89ffac9ed3..ac4dbd0a7b 100644 --- a/lib/sidekiq/api.rb +++ b/lib/sidekiq/api.rb @@ -54,14 +54,14 @@ def queues # O(1) redis calls def fetch_stats_fast! pipe1_res = Sidekiq.redis { |conn| - conn.pipelined do - conn.get("stat:processed") - conn.get("stat:failed") - conn.zcard("schedule") - conn.zcard("retry") - conn.zcard("dead") - conn.scard("processes") - conn.lrange("queue:default", -1, -1) + conn.pipelined do |pipeline| + pipeline.get("stat:processed") + pipeline.get("stat:failed") + pipeline.zcard("schedule") + pipeline.zcard("retry") + pipeline.zcard("dead") + pipeline.scard("processes") + pipeline.lrange("queue:default", -1, -1) end } @@ -101,9 +101,9 @@ def fetch_stats_slow! 
} pipe2_res = Sidekiq.redis { |conn| - conn.pipelined do - processes.each { |key| conn.hget(key, "busy") } - queues.each { |queue| conn.llen("queue:#{queue}") } + conn.pipelined do |pipeline| + processes.each { |key| pipeline.hget(key, "busy") } + queues.each { |queue| pipeline.llen("queue:#{queue}") } end } @@ -147,9 +147,9 @@ def lengths Sidekiq.redis do |conn| queues = conn.sscan_each("queues").to_a - lengths = conn.pipelined { + lengths = conn.pipelined { |pipeline| queues.each do |queue| - conn.llen("queue:#{queue}") + pipeline.llen("queue:#{queue}") end } @@ -287,9 +287,9 @@ def find_job(jid) def clear Sidekiq.redis do |conn| - conn.multi do - conn.unlink(@rname) - conn.srem("queues", name) + conn.multi do |transaction| + transaction.unlink(@rname) + transaction.srem("queues", name) end end end @@ -519,9 +519,9 @@ def error? def remove_job Sidekiq.redis do |conn| - results = conn.multi { - conn.zrangebyscore(parent.name, score, score) - conn.zremrangebyscore(parent.name, score, score) + results = conn.multi { |transaction| + transaction.zrangebyscore(parent.name, score, score) + transaction.zremrangebyscore(parent.name, score, score) }.first if results.size == 1 @@ -542,9 +542,9 @@ def remove_job yield msg if msg # push the rest back onto the sorted set - conn.multi do + conn.multi do |transaction| nonmatched.each do |message| - conn.zadd(parent.name, score.to_f.to_s, message) + transaction.zadd(parent.name, score.to_f.to_s, message) end end end @@ -731,10 +731,10 @@ def initialize def kill(message, opts = {}) now = Time.now.to_f Sidekiq.redis do |conn| - conn.multi do - conn.zadd(name, now.to_s, message) - conn.zremrangebyscore(name, "-inf", now - self.class.timeout) - conn.zremrangebyrank(name, 0, - self.class.max_jobs) + conn.multi do |transaction| + transaction.zadd(name, now.to_s, message) + transaction.zremrangebyscore(name, "-inf", now - self.class.timeout) + transaction.zremrangebyrank(name, 0, - self.class.max_jobs) end end @@ -782,9 +782,9 @@ def 
cleanup count = 0 Sidekiq.redis do |conn| procs = conn.sscan_each("processes").to_a.sort - heartbeats = conn.pipelined { + heartbeats = conn.pipelined { |pipeline| procs.each do |key| - conn.hget(key, "info") + pipeline.hget(key, "info") end } @@ -806,9 +806,9 @@ def each # We're making a tradeoff here between consuming more memory instead of # making more roundtrips to Redis, but if you have hundreds or thousands of workers, # you'll be happier this way - conn.pipelined do + conn.pipelined do |pipeline| procs.each do |key| - conn.hmget(key, "info", "busy", "beat", "quiet", "rss", "rtt_us") + pipeline.hmget(key, "info", "busy", "beat", "quiet", "rss", "rtt_us") end end } @@ -922,9 +922,9 @@ def stopping? def signal(sig) key = "#{identity}-signals" Sidekiq.redis do |c| - c.multi do - c.lpush(key, sig) - c.expire(key, 60) + c.multi do |transaction| + transaction.lpush(key, sig) + transaction.expire(key, 60) end end end @@ -958,9 +958,9 @@ def each(&block) Sidekiq.redis do |conn| procs = conn.sscan_each("processes").to_a procs.sort.each do |key| - valid, workers = conn.pipelined { - conn.exists?(key) - conn.hgetall("#{key}:workers") + valid, workers = conn.pipelined { |pipeline| + pipeline.exists?(key) + pipeline.hgetall("#{key}:workers") } next unless valid workers.each_pair do |tid, json| @@ -988,9 +988,9 @@ def size if procs.empty? 
0 else - conn.pipelined { + conn.pipelined { |pipeline| procs.each do |key| - conn.hget(key, "busy") + pipeline.hget(key, "busy") end }.sum(&:to_i) end diff --git a/lib/sidekiq/client.rb b/lib/sidekiq/client.rb index 2b991a2ded..9a79ba22f4 100644 --- a/lib/sidekiq/client.rb +++ b/lib/sidekiq/client.rb @@ -189,8 +189,8 @@ def enqueue_in(interval, klass, *args) def raw_push(payloads) @redis_pool.with do |conn| - conn.pipelined do - atomic_push(conn, payloads) + conn.pipelined do |pipeline| + atomic_push(pipeline, payloads) end end true diff --git a/lib/sidekiq/fetch.rb b/lib/sidekiq/fetch.rb index c64d7bb314..f3d1fbb81e 100644 --- a/lib/sidekiq/fetch.rb +++ b/lib/sidekiq/fetch.rb @@ -59,9 +59,9 @@ def bulk_requeue(inprogress, options) end Sidekiq.redis do |conn| - conn.pipelined do + conn.pipelined do |pipeline| jobs_to_requeue.each do |queue, jobs| - conn.rpush(queue, jobs) + pipeline.rpush(queue, jobs) end end end diff --git a/lib/sidekiq/launcher.rb b/lib/sidekiq/launcher.rb index e3062ae6a2..73a62468f5 100644 --- a/lib/sidekiq/launcher.rb +++ b/lib/sidekiq/launcher.rb @@ -84,9 +84,9 @@ def clear_heartbeat # Note we don't stop the heartbeat thread; if the process # doesn't actually exit, it'll reappear in the Web UI. 
Sidekiq.redis do |conn| - conn.pipelined do - conn.srem("processes", identity) - conn.unlink("#{identity}:workers") + conn.pipelined do |pipeline| + pipeline.srem("processes", identity) + pipeline.unlink("#{identity}:workers") end end rescue @@ -107,14 +107,14 @@ def self.flush_stats nowdate = Time.now.utc.strftime("%Y-%m-%d") begin Sidekiq.redis do |conn| - conn.pipelined do - conn.incrby("stat:processed", procd) - conn.incrby("stat:processed:#{nowdate}", procd) - conn.expire("stat:processed:#{nowdate}", STATS_TTL) - - conn.incrby("stat:failed", fails) - conn.incrby("stat:failed:#{nowdate}", fails) - conn.expire("stat:failed:#{nowdate}", STATS_TTL) + conn.pipelined do |pipeline| + pipeline.incrby("stat:processed", procd) + pipeline.incrby("stat:processed:#{nowdate}", procd) + pipeline.expire("stat:processed:#{nowdate}", STATS_TTL) + + pipeline.incrby("stat:failed", fails) + pipeline.incrby("stat:failed:#{nowdate}", fails) + pipeline.expire("stat:failed:#{nowdate}", STATS_TTL) end end rescue => ex @@ -138,16 +138,16 @@ def ❤ nowdate = Time.now.utc.strftime("%Y-%m-%d") Sidekiq.redis do |conn| - conn.multi do - conn.incrby("stat:processed", procd) - conn.incrby("stat:processed:#{nowdate}", procd) - conn.expire("stat:processed:#{nowdate}", STATS_TTL) + conn.multi do |transaction| + transaction.incrby("stat:processed", procd) + transaction.incrby("stat:processed:#{nowdate}", procd) + transaction.expire("stat:processed:#{nowdate}", STATS_TTL) - conn.incrby("stat:failed", fails) - conn.incrby("stat:failed:#{nowdate}", fails) - conn.expire("stat:failed:#{nowdate}", STATS_TTL) + transaction.incrby("stat:failed", fails) + transaction.incrby("stat:failed:#{nowdate}", fails) + transaction.expire("stat:failed:#{nowdate}", STATS_TTL) - conn.unlink(workers_key) + transaction.unlink(workers_key) curstate.each_pair do |tid, hash| - conn.hset(workers_key, tid, Sidekiq.dump_json(hash)) + transaction.hset(workers_key, tid, Sidekiq.dump_json(hash)) end @@ -161,17 +161,17 @@ def ❤ kb = memory_usage(::Process.pid) _, exists, _, _, msg = 
Sidekiq.redis { |conn| - conn.multi { - conn.sadd("processes", key) - conn.exists?(key) - conn.hmset(key, "info", to_json, + conn.multi { |transaction| + transaction.sadd("processes", key) + transaction.exists?(key) + transaction.hmset(key, "info", to_json, "busy", curstate.size, "beat", Time.now.to_f, "rtt_us", rtt, "quiet", @done, "rss", kb) - conn.expire(key, 60) - conn.rpop("#{key}-signals") + transaction.expire(key, 60) + transaction.rpop("#{key}-signals") } } diff --git a/lib/sidekiq/paginator.rb b/lib/sidekiq/paginator.rb index c1eacb43ba..e941742034 100644 --- a/lib/sidekiq/paginator.rb +++ b/lib/sidekiq/paginator.rb @@ -16,22 +16,22 @@ def page(key, pageidx = 1, page_size = 25, opts = nil) case type when "zset" - total_size, items = conn.multi { - conn.zcard(key) + total_size, items = conn.multi { |transaction| + transaction.zcard(key) if rev - conn.zrevrange(key, starting, ending, with_scores: true) + transaction.zrevrange(key, starting, ending, with_scores: true) else - conn.zrange(key, starting, ending, with_scores: true) + transaction.zrange(key, starting, ending, with_scores: true) end } [current_page, total_size, items] when "list" - total_size, items = conn.multi { + total_size, items = conn.multi { |transaction| - conn.llen(key) + transaction.llen(key) if rev - conn.lrange(key, -ending - 1, -starting - 1) + transaction.lrange(key, -ending - 1, -starting - 1) else - conn.lrange(key, starting, ending) + transaction.lrange(key, starting, ending) end } items.reverse! 
if rev diff --git a/test/test_api.rb b/test/test_api.rb index 65a1e8bc21..5342ebd123 100644 --- a/test/test_api.rb +++ b/test/test_api.rb @@ -568,10 +568,10 @@ class WorkerWithTags time = Time.now.to_f Sidekiq.redis do |conn| - conn.multi do - conn.sadd('processes', odata['key']) - conn.hmset(odata['key'], 'info', Sidekiq.dump_json(odata), 'busy', 10, 'beat', time) - conn.sadd('processes', 'fake:pid') + conn.multi do |transaction| + transaction.sadd('processes', odata['key']) + transaction.hmset(odata['key'], 'info', Sidekiq.dump_json(odata), 'busy', 10, 'beat', time) + transaction.sadd('processes', 'fake:pid') end end @@ -621,9 +621,9 @@ class WorkerWithTags s = "#{key}:workers" data = Sidekiq.dump_json({ 'payload' => {}, 'queue' => 'default', 'run_at' => (Time.now.to_i - 2*60*60) }) Sidekiq.redis do |c| - c.multi do - c.hmset(s, '5678', data) - c.hmset("b#{s}", '5678', data) + c.multi do |transaction| + transaction.hmset(s, '5678', data) + transaction.hmset("b#{s}", '5678', data) end end diff --git a/test/test_web.rb b/test/test_web.rb index d3c4b9a398..794b47f693 100644 --- a/test/test_web.rb +++ b/test/test_web.rb @@ -718,10 +718,10 @@ def add_worker key = "#{hostname}:#{$$}" msg = "{\"queue\":\"default\",\"payload\":{\"retry\":true,\"queue\":\"default\",\"timeout\":20,\"backtrace\":5,\"class\":\"HardWorker\",\"args\":[\"bob\",10,5],\"jid\":\"2b5ad2b016f5e063a1c62872\"},\"run_at\":1361208995}" Sidekiq.redis do |conn| - conn.multi do - conn.sadd("processes", key) - conn.hmset(key, 'info', Sidekiq.dump_json('hostname' => 'foo', 'started_at' => Time.now.to_f, "queues" => []), 'at', Time.now.to_f, 'busy', 4) - conn.hmset("#{key}:workers", Time.now.to_f, msg) + conn.multi do |transaction| + transaction.sadd("processes", key) + transaction.hmset(key, 'info', Sidekiq.dump_json('hostname' => 'foo', 'started_at' => Time.now.to_f, "queues" => []), 'at', Time.now.to_f, 'busy', 4) + transaction.hmset("#{key}:workers", Time.now.to_f, msg) end end end