Skip to content

Commit

Permalink
Deprecate Redis#queue, Redis#commit and Redis#pipelined without…
Browse files Browse the repository at this point in the history
… block.

All theses make a lot of assumptions on the threading model and are barely
thread safe.

The new favored API is:

```ruby
redis.pipelined do |pipeline|
  pipeline.get("foo")
  pipeline.del("bar")
end
```

This API allow multiple threads to build pipelines concurrently on the same
connection, and is more friendly to Fiber based concurrency.

Fix: #1057
  • Loading branch information
byroot committed Jan 20, 2022
1 parent cbdb53e commit 57f5053
Show file tree
Hide file tree
Showing 5 changed files with 163 additions and 62 deletions.
4 changes: 4 additions & 0 deletions .rubocop.yml
Original file line number Diff line number Diff line change
Expand Up @@ -132,3 +132,7 @@ Metrics/BlockNesting:

Style/HashTransformValues:
Enabled: false

Style/SymbolProc:
Exclude:
- 'test/**/*'
14 changes: 7 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -184,9 +184,9 @@ commands to Redis and gathers their replies. These replies are returned
by the `#pipelined` method.

```ruby
redis.pipelined do
redis.set "foo", "bar"
redis.incr "baz"
redis.pipelined do |pipeline|
pipeline.set "foo", "bar"
pipeline.incr "baz"
end
# => ["OK", 1]
```
Expand All @@ -210,15 +210,15 @@ end
### Futures

Replies to commands in a pipeline can be accessed via the *futures* they
emit (since redis-rb 3.0). All calls inside a pipeline block return a
emit (since redis-rb 3.0). All calls on the pipeline object return a
`Future` object, which responds to the `#value` method. When the
pipeline has successfully executed, all futures are assigned their
respective replies and can be used.

```ruby
redis.pipelined do
@set = redis.set "foo", "bar"
@incr = redis.incr "baz"
redis.pipelined do |pipeline|
@set = pipeline.set "foo", "bar"
@incr = pipeline.incr "baz"
end

@set.value
Expand Down
27 changes: 23 additions & 4 deletions lib/redis.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
require "redis/commands"

class Redis
BASE_PATH = __dir__
@exists_returns_integer = true

class << self
Expand Down Expand Up @@ -104,6 +105,11 @@ def close
# See http://redis.io/topics/pipelining for more details.
#
def queue(*command)
Kernel.warn(
"Redis#queue is deprecated and will be removed in Redis 5.0.0. Use Redis#pipelined instead." \
"(called from: #{caller(1, 1).first})"
)

synchronize do
@queue[Thread.current.object_id] << command
end
Expand All @@ -114,6 +120,11 @@ def queue(*command)
# See http://redis.io/topics/pipelining for more details.
#
def commit
Kernel.warn(
"Redis#commit is deprecated and will be removed in Redis 5.0.0. Use Redis#pipelined instead. " \
"(called from: #{Kernel.caller(1, 1).first})"
)

synchronize do |client|
begin
pipeline = Pipeline.new(client)
Expand Down Expand Up @@ -193,12 +204,20 @@ def unwatch
end
end

def pipelined
def pipelined(&block)
deprecation_displayed = false
if block&.arity == 0
Pipeline.deprecation_warning(Kernel.caller_locations(1, 5))
deprecation_displayed = true
end

synchronize do |prior_client|
begin
@client = Pipeline.new(prior_client)
yield(self)
prior_client.call_pipeline(@client)
pipeline = Pipeline.new(prior_client)
@client = deprecation_displayed ? pipeline : DeprecatedPipeline.new(pipeline)
pipelined_connection = PipelinedConnection.new(pipeline)
yield pipelined_connection
prior_client.call_pipeline(pipeline)
ensure
@client = prior_client
end
Expand Down
78 changes: 78 additions & 0 deletions lib/redis/pipeline.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,70 @@
# frozen_string_literal: true

require "delegate"

class Redis
class PipelinedConnection
def initialize(pipeline)
@pipeline = pipeline
end

include Commands

def db
@pipeline.db
end

def db=(db)
@pipeline.db = db
end

def pipelined
yield self
end

private

def synchronize
yield self
end

def send_command(command, &block)
@pipeline.call(command, &block)
end

def send_blocking_command(command, timeout, &block)
@pipeline.call_with_timeout(command, timeout, &block)
end
end

class Pipeline
REDIS_INTERNAL_PATH = File.expand_path("..", __dir__).freeze
# Redis use MonitorMixin#synchronize and this class use DelegateClass which we want to filter out.
# Both are in the stdlib so we can simply filter the entire stdlib out.
STDLIB_PATH = File.expand_path("..", MonitorMixin.instance_method(:synchronize).source_location.first).freeze

class << self
def deprecation_warning(caller_locations) # :nodoc:
callsite = caller_locations.find { |l| !l.path.start_with?(REDIS_INTERNAL_PATH, STDLIB_PATH) }
callsite ||= caller_locations.last # The caller_locations should be large enough, but just in case.
Kernel.warn <<~MESSAGE
Pipelining commands on a Redis instance is deprecated and will be removed in Redis 5.0.0.
redis.pipelined do
redis.get("key")
end
should be replaced by
redis.pipelined do |pipeline|
pipeline.get("key")
end
(called from #{callsite}}
MESSAGE
end
end

attr_accessor :db
attr_reader :client

Expand Down Expand Up @@ -124,6 +187,21 @@ def commands
end
end

DeprecatedPipeline = DelegateClass(Pipeline) do
def initialize(pipeline)
super(pipeline)
@deprecation_displayed = false
end

def __getobj__
unless @deprecation_displayed
Pipeline.deprecation_warning(Kernel.caller_locations(1, 10))
@deprecation_displayed = true
end
@delegate_dc_obj
end
end

class FutureNotReady < RuntimeError
def initialize
super("Value will be available once the pipeline executes.")
Expand Down
Loading

0 comments on commit 57f5053

Please sign in to comment.