Skip to content

Add an explicit #watch method to RedisClient::Cluster #340

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 15 additions & 7 deletions lib/redis_client/cluster.rb
Original file line number Diff line number Diff line change
Expand Up @@ -91,19 +91,16 @@ def pipelined
pipeline.execute
end

def multi(watch: nil)
def multi(watch: nil, &block)
if watch.nil? || watch.empty?
transaction = ::RedisClient::Cluster::Transaction.new(@router, @command_builder)
yield transaction
return transaction.execute
end

::RedisClient::Cluster::OptimisticLocking.new(@router).watch(watch) do |c, slot, asking|
transaction = ::RedisClient::Cluster::Transaction.new(
@router, @command_builder, node: c, slot: slot, asking: asking
)
yield transaction
transaction.execute
watcher = ::RedisClient::Cluster::OptimisticLocking.new(@router, @command_builder)
watcher.watch(watch) do
watcher.multi(&block)
end
end

Expand All @@ -128,6 +125,17 @@ def close

private

# This API is called by redis-clustering/redis-rb, but requries further refinement before we commit
# to making it part of redis-cluster-client's official public API.
def watch(keys)
raise ArgumentError, "#{self.class.name}#watch requires a block for the initial watch" unless block_given?

watcher = ::RedisClient::Cluster::OptimisticLocking.new(@router, @command_builder)
watcher.watch(keys) do
yield watcher
end
end

def process_with_arguments(key, hashtag) # rubocop:disable Metrics/CyclomaticComplexity
raise ArgumentError, 'Only one of key or hashtag may be provided' if key && hashtag

Expand Down
53 changes: 43 additions & 10 deletions lib/redis_client/cluster/optimistic_locking.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,38 +6,71 @@
class RedisClient
class Cluster
class OptimisticLocking
def initialize(router)
def initialize(router, command_builder)
@router = router
@command_builder = command_builder
@slot = nil
@conn = nil
@asking = false
end

def watch(keys)
slot = find_slot(keys)
raise ::RedisClient::Cluster::Transaction::ConsistencyError, "unsafe watch: #{keys.join(' ')}" if slot.nil?
def watch(keys, &block)
if @conn
# We're already watching, and the caller wants to watch additional keys
add_to_watch(keys)
else
# First call to #watch
start_watch(keys, &block)
end
end

def unwatch
@conn.call('UNWATCH')
end

def multi
transaction = ::RedisClient::Cluster::Transaction.new(
@router, @command_builder, node: @conn, slot: @slot, asking: @asking
)
yield transaction
transaction.execute
end

private

def start_watch(keys)
@slot = find_slot(keys)
raise ::RedisClient::Cluster::Transaction::ConsistencyError, "unsafe watch: #{keys.join(' ')}" if @slot.nil?

# We have not yet selected a node for this transaction, initially, which means we can handle
# redirections freely initially (i.e. for the first WATCH call)
node = @router.find_primary_node_by_slot(slot)
node = @router.find_primary_node_by_slot(@slot)
handle_redirection(node, retry_count: 1) do |nd|
nd.with do |c|
c.ensure_connected_cluster_scoped(retryable: false) do
c.call('ASKING') if @asking
c.call('WATCH', *keys)
@conn = c
@conn.call('ASKING') if @asking
@conn.call('WATCH', *keys)
begin
yield(c, slot, @asking)
yield(c, @slot, @asking)
rescue ::RedisClient::ConnectionError
# No need to unwatch on a connection error.
raise
rescue StandardError
c.call('UNWATCH')
unwatch
raise
end
end
end
end
end

private
def add_to_watch(keys)
slot = find_slot(keys)
raise ::RedisClient::Cluster::Transaction::ConsistencyError, "inconsistent watch: #{keys.join(' ')}" if slot != @slot

@conn.call('WATCH', *keys)
end

def handle_redirection(node, retry_count: 1, &blk)
@router.handle_redirection(node, retry_count: retry_count) do |nd|
Expand Down
14 changes: 0 additions & 14 deletions lib/redis_client/cluster/router.rb
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ def send_command(method, command, *args, &block) # rubocop:disable Metrics/AbcSi
when 'memory' then send_memory_command(method, command, args, &block)
when 'script' then send_script_command(method, command, args, &block)
when 'pubsub' then send_pubsub_command(method, command, args, &block)
when 'watch' then send_watch_command(command, &block)
when 'acl', 'auth', 'bgrewriteaof', 'bgsave', 'quit', 'save'
@node.call_all(method, command, args).first.then(&TSF.call(block))
when 'flushall', 'flushdb'
Expand Down Expand Up @@ -311,19 +310,6 @@ def send_pubsub_command(method, command, args, &block) # rubocop:disable Metrics
end
end

# for redis-rb
def send_watch_command(command)
raise ::RedisClient::Cluster::Transaction::ConsistencyError, 'A block required. And you need to use the block argument as a client for the transaction.' unless block_given?

::RedisClient::Cluster::OptimisticLocking.new(self).watch(command[1..]) do |c, slot, asking|
transaction = ::RedisClient::Cluster::Transaction.new(
self, @command_builder, node: c, slot: slot, asking: asking
)
yield transaction
transaction.execute
end
end

def update_cluster_info!
@node.reload!
end
Expand Down
16 changes: 9 additions & 7 deletions test/redis_client/test_cluster.rb
Original file line number Diff line number Diff line change
Expand Up @@ -461,20 +461,22 @@ def test_transaction_in_race_condition
def test_transaction_with_dedicated_watch_command
@client.call('MSET', '{key}1', '0', '{key}2', '0')

got = @client.call('WATCH', '{key}1', '{key}2') do |tx|
tx.call('ECHO', 'START')
tx.call('SET', '{key}1', '1')
tx.call('SET', '{key}2', '2')
tx.call('ECHO', 'FINISH')
got = @client.send(:watch, ['{key}1', '{key}2']) do |watcher|
watcher.multi do |tx|
tx.call('ECHO', 'START')
tx.call('SET', '{key}1', '1')
tx.call('SET', '{key}2', '2')
tx.call('ECHO', 'FINISH')
end
end

assert_equal(%w[START OK OK FINISH], got)
assert_equal(%w[1 2], @client.call('MGET', '{key}1', '{key}2'))
end

def test_transaction_with_dedicated_watch_command_without_block
assert_raises(::RedisClient::Cluster::Transaction::ConsistencyError) do
@client.call('WATCH', '{key}1', '{key}2')
assert_raises(ArgumentError) do
@client.send(:watch, ['{key}1', '{key}2'])
end
end

Expand Down