diff --git a/src/facade/dragonfly_listener.cc b/src/facade/dragonfly_listener.cc index 1981c4846bb3..04456e82ac21 100644 --- a/src/facade/dragonfly_listener.cc +++ b/src/facade/dragonfly_listener.cc @@ -392,6 +392,7 @@ DispatchTracker::DispatchTracker(absl::Span listeners, issuer_{issuer}, ignore_paused_{ignore_paused}, ignore_blocked_{ignore_blocked} { + bc_ = make_unique<util::fb2::BlockingCounter>(0); } void DispatchTracker::TrackOnThread() { @@ -400,7 +401,15 @@ void DispatchTracker::TrackOnThread() { } bool DispatchTracker::Wait(absl::Duration duration) { - return bc_.WaitFor(absl::ToChronoMilliseconds(duration)); + bool res = bc_->WaitFor(absl::ToChronoMilliseconds(duration)); + if (!res && ignore_blocked_) { + // We track all connections again because a connection might have become blocked since the + // last time we tracked. + bc_.reset(new util::fb2::BlockingCounter(0)); + TrackAll(); + res = bc_->WaitFor(absl::ToChronoMilliseconds(duration)); + } + return res; } void DispatchTracker::TrackAll() { @@ -410,7 +419,7 @@ void DispatchTracker::TrackAll() { void DispatchTracker::Handle(unsigned thread_index, util::Connection* conn) { if (auto* fconn = static_cast<facade::Connection*>(conn); fconn != issuer_) - fconn->SendCheckpoint(bc_, ignore_paused_, ignore_blocked_); + fconn->SendCheckpoint(*bc_, ignore_paused_, ignore_blocked_); } } // namespace facade diff --git a/src/facade/dragonfly_listener.h b/src/facade/dragonfly_listener.h index e6cc751d1abd..b78b29699660 100644 --- a/src/facade/dragonfly_listener.h +++ b/src/facade/dragonfly_listener.h @@ -103,7 +103,7 @@ class DispatchTracker { std::vector listeners_; facade::Connection* issuer_; - util::fb2::BlockingCounter bc_{0}; + std::unique_ptr<util::fb2::BlockingCounter> bc_; bool ignore_paused_; bool ignore_blocked_; }; diff --git a/src/server/command_registry.h b/src/server/command_registry.h index 8b42e65f584e..7eb6f12c48c2 100644 --- a/src/server/command_registry.h +++ b/src/server/command_registry.h @@ -107,6 +107,10 @@ class CommandId : public facade::CommandId { 
return opt_mask_ & CO::WRITE; } + bool IsBlocking() const { + return opt_mask_ & CO::BLOCKING; + } + static const char* OptName(CO::CommandOpt fl); CommandId&& SetHandler(Handler f) && { diff --git a/src/server/main_service.cc b/src/server/main_service.cc index 0629b49a6a2b..06f81cdadd44 100644 --- a/src/server/main_service.cc +++ b/src/server/main_service.cc @@ -1327,7 +1327,9 @@ size_t Service::DispatchManyCommands(absl::Span args_list, // paired with shardlocal eval const bool is_eval = CO::IsEvalKind(ArgS(args, 0)); - if (!is_multi && !is_eval && cid != nullptr) { + const bool is_blocking = cid != nullptr && cid->IsBlocking(); + + if (!is_multi && !is_eval && !is_blocking && cid != nullptr) { stored_cmds.reserve(args_list.size()); stored_cmds.emplace_back(cid, tail_args); continue; diff --git a/src/server/server_family.cc b/src/server/server_family.cc index b3f6fdd1d497..6ea66553392d 100644 --- a/src/server/server_family.cc +++ b/src/server/server_family.cc @@ -559,9 +559,13 @@ string_view GetRedisMode() { std::optional Pause(absl::Span listeners, facade::Connection* conn, ClientPause pause_state, std::function is_pause_in_progress) { - // Set global pause state and track commands that are running when the pause state is flipped. - // Exlude already paused commands from the busy count. - DispatchTracker tracker{listeners, conn, true /* ignore paused commands */}; + // Track connections and set pause state to be able to wait until all running transactions read + // the new pause state. Exclude already paused commands from the busy count. Exclude tracking + // blocked connections because: a) If the connection is blocked it is paused. b) We read pause + // state after waking from blocking so if the transaction was woken by another running + // command that did not pause on the new state yet we will pause after waking up. 
+ DispatchTracker tracker{listeners, conn, true /* ignore paused commands */, + true /*ignore blocking*/}; shard_set->pool()->Await([&tracker, pause_state](util::ProactorBase* pb) { // Commands don't suspend before checking the pause state, so // it's impossible to deadlock on waiting for a command that will be paused. @@ -569,7 +573,6 @@ std::optional Pause(absl::Span listeners, ServerState::tlocal()->SetPauseState(pause_state, true); }); - // TODO handle blocking commands // Wait for all busy commands to finish running before replying to guarantee // that no more (write) operations will occur. const absl::Duration kDispatchTimeout = absl::Seconds(1); diff --git a/src/server/transaction.cc b/src/server/transaction.cc index 59c47b931f06..3ce72ed62b43 100644 --- a/src/server/transaction.cc +++ b/src/server/transaction.cc @@ -1349,7 +1349,7 @@ OpStatus Transaction::WaitOnWatch(const time_point& tp, WaitKeysProvider wkeys_p auto* stats = ServerState::tl_connection_stats(); ++stats->num_blocked_clients; DVLOG(1) << "WaitOnWatch wait for " << tp << " " << DebugId(); - + // TBD set connection blocking state // Wait for the blocking barrier to be closed. // Note: It might return immediately if another thread already notified us. 
cv_status status = blocking_barrier_.Wait(tp); @@ -1357,6 +1357,9 @@ OpStatus Transaction::WaitOnWatch(const time_point& tp, WaitKeysProvider wkeys_p DVLOG(1) << "WaitOnWatch done " << int(status) << " " << DebugId(); --stats->num_blocked_clients; + // TBD set connection pause state + ServerState::tlocal()->AwaitPauseState(true); // blocking commands are always write commands + OpStatus result = OpStatus::OK; if (status == cv_status::timeout) { result = OpStatus::TIMED_OUT; diff --git a/tests/dragonfly/connection_test.py b/tests/dragonfly/connection_test.py index 601e5b064cbd..33389e116f00 100755 --- a/tests/dragonfly/connection_test.py +++ b/tests/dragonfly/connection_test.py @@ -1,4 +1,5 @@ import random +import string import pytest import asyncio import time @@ -737,3 +738,55 @@ async def do_write(): await asyncio.sleep(0.0) assert p3.done() await p3 + + +@pytest.mark.asyncio +async def test_blocking_command_client_pause(async_client: aioredis.Redis): + """ + 1. Check client pause success when blocking transaction is running + 2. lpush is paused after running client pause + 3. 
once pause is finished lpush will run and blpop will pop the pushed value + """ + + async def blocking_command(): + res = await async_client.execute_command("blpop key 2") + assert res == ["key", "value"] + + async def lpush_command(): + await async_client.execute_command("lpush key value") + + blocking = asyncio.create_task(blocking_command()) + await asyncio.sleep(0.1) + + res = await async_client.execute_command("client pause 1000") + assert res == "OK" + + lpush = asyncio.create_task(lpush_command()) + assert not lpush.done() + + await lpush + await blocking + + +@pytest.mark.asyncio +async def test_multiple_blocking_commands_client_pause(async_client: aioredis.Redis): + """ + Check running client pause command simultaneously with running multiple blocking commands + from multiple connections + """ + + async def just_blpop(): + key = "".join(random.choices(string.ascii_letters, k=3)) + await async_client.execute_command(f"blpop {key} 2") + + async def client_pause(): + res = await async_client.execute_command("client pause 1000") + assert res == "OK" + + tasks = [just_blpop() for _ in range(20)] + tasks.append(client_pause()) + + all = asyncio.gather(*tasks) + + assert not all.done() + await all