diff --git a/lib/redis_client/cluster.rb b/lib/redis_client/cluster.rb index 72368464..c2b58d09 100644 --- a/lib/redis_client/cluster.rb +++ b/lib/redis_client/cluster.rb @@ -91,19 +91,16 @@ def pipelined pipeline.execute end - def multi(watch: nil) + def multi(watch: nil, &block) if watch.nil? || watch.empty? transaction = ::RedisClient::Cluster::Transaction.new(@router, @command_builder) yield transaction return transaction.execute end - ::RedisClient::Cluster::OptimisticLocking.new(@router).watch(watch) do |c, slot, asking| - transaction = ::RedisClient::Cluster::Transaction.new( - @router, @command_builder, node: c, slot: slot, asking: asking - ) - yield transaction - transaction.execute + watcher = ::RedisClient::Cluster::OptimisticLocking.new(@router, @command_builder) + watcher.watch(watch) do + watcher.multi(&block) end end @@ -128,6 +125,17 @@ def close private + # This API is called by redis-clustering/redis-rb, but requries further refinement before we commit + # to making it part of redis-cluster-client's official public API. + def watch(keys) + raise ArgumentError, "#{self.class.name}#watch requires a block for the initial watch" unless block_given? + + watcher = ::RedisClient::Cluster::OptimisticLocking.new(@router, @command_builder) + watcher.watch(keys) do + yield watcher + end + end + def process_with_arguments(key, hashtag) # rubocop:disable Metrics/CyclomaticComplexity raise ArgumentError, 'Only one of key or hashtag may be provided' if key && hashtag diff --git a/lib/redis_client/cluster/optimistic_locking.rb b/lib/redis_client/cluster/optimistic_locking.rb index 8a7651ea..cfdfa2b1 100644 --- a/lib/redis_client/cluster/optimistic_locking.rb +++ b/lib/redis_client/cluster/optimistic_locking.rb @@ -6,30 +6,58 @@ class RedisClient class Cluster class OptimisticLocking - def initialize(router) + def initialize(router, command_builder) @router = router + @command_builder = command_builder + @slot = nil + @conn = nil @asking = false end - def watch(keys) - slot = find_slot(keys) - raise ::RedisClient::Cluster::Transaction::ConsistencyError, "unsafe watch: #{keys.join(' ')}" if slot.nil? + def watch(keys, &block) + if @conn + # We're already watching, and the caller wants to watch additional keys + add_to_watch(keys) + else + # First call to #watch + start_watch(keys, &block) + end + end + + def unwatch + @conn.call('UNWATCH') + end + + def multi + transaction = ::RedisClient::Cluster::Transaction.new( + @router, @command_builder, node: @conn, slot: @slot, asking: @asking + ) + yield transaction + transaction.execute + end + + private + + def start_watch(keys) + @slot = find_slot(keys) + raise ::RedisClient::Cluster::Transaction::ConsistencyError, "unsafe watch: #{keys.join(' ')}" if @slot.nil? # We have not yet selected a node for this transaction, initially, which means we can handle # redirections freely initially (i.e. for the first WATCH call) - node = @router.find_primary_node_by_slot(slot) + node = @router.find_primary_node_by_slot(@slot) handle_redirection(node, retry_count: 1) do |nd| nd.with do |c| c.ensure_connected_cluster_scoped(retryable: false) do - c.call('ASKING') if @asking - c.call('WATCH', *keys) + @conn = c + @conn.call('ASKING') if @asking + @conn.call('WATCH', *keys) begin - yield(c, slot, @asking) + yield(c, @slot, @asking) rescue ::RedisClient::ConnectionError # No need to unwatch on a connection error. raise rescue StandardError - c.call('UNWATCH') + unwatch raise end end @@ -37,7 +65,12 @@ def watch(keys) end end - private + def add_to_watch(keys) + slot = find_slot(keys) + raise ::RedisClient::Cluster::Transaction::ConsistencyError, "inconsistent watch: #{keys.join(' ')}" if slot != @slot + + @conn.call('WATCH', *keys) + end def handle_redirection(node, retry_count: 1, &blk) @router.handle_redirection(node, retry_count: retry_count) do |nd| diff --git a/lib/redis_client/cluster/router.rb b/lib/redis_client/cluster/router.rb index 5583e48f..898d3f31 100644 --- a/lib/redis_client/cluster/router.rb +++ b/lib/redis_client/cluster/router.rb @@ -46,7 +46,6 @@ def send_command(method, command, *args, &block) # rubocop:disable Metrics/AbcSi when 'memory' then send_memory_command(method, command, args, &block) when 'script' then send_script_command(method, command, args, &block) when 'pubsub' then send_pubsub_command(method, command, args, &block) - when 'watch' then send_watch_command(command, &block) when 'acl', 'auth', 'bgrewriteaof', 'bgsave', 'quit', 'save' @node.call_all(method, command, args).first.then(&TSF.call(block)) when 'flushall', 'flushdb' @@ -311,19 +310,6 @@ def send_pubsub_command(method, command, args, &block) # rubocop:disable Metrics end end - # for redis-rb - def send_watch_command(command) - raise ::RedisClient::Cluster::Transaction::ConsistencyError, 'A block required. And you need to use the block argument as a client for the transaction.' unless block_given? - - ::RedisClient::Cluster::OptimisticLocking.new(self).watch(command[1..]) do |c, slot, asking| - transaction = ::RedisClient::Cluster::Transaction.new( - self, @command_builder, node: c, slot: slot, asking: asking - ) - yield transaction - transaction.execute - end - end - def update_cluster_info! @node.reload! end diff --git a/test/redis_client/test_cluster.rb b/test/redis_client/test_cluster.rb index 0b8cead7..eca79e3c 100644 --- a/test/redis_client/test_cluster.rb +++ b/test/redis_client/test_cluster.rb @@ -461,11 +461,13 @@ def test_transaction_in_race_condition def test_transaction_with_dedicated_watch_command @client.call('MSET', '{key}1', '0', '{key}2', '0') - got = @client.call('WATCH', '{key}1', '{key}2') do |tx| - tx.call('ECHO', 'START') - tx.call('SET', '{key}1', '1') - tx.call('SET', '{key}2', '2') - tx.call('ECHO', 'FINISH') + got = @client.send(:watch, ['{key}1', '{key}2']) do |watcher| + watcher.multi do |tx| + tx.call('ECHO', 'START') + tx.call('SET', '{key}1', '1') + tx.call('SET', '{key}2', '2') + tx.call('ECHO', 'FINISH') + end end assert_equal(%w[START OK OK FINISH], got) @@ -473,8 +475,8 @@ def test_transaction_with_dedicated_watch_command end def test_transaction_with_dedicated_watch_command_without_block - assert_raises(::RedisClient::Cluster::Transaction::ConsistencyError) do - @client.call('WATCH', '{key}1', '{key}2') + assert_raises(ArgumentError) do + @client.send(:watch, ['{key}1', '{key}2']) end end