From a03cd88178fd2db1729cd8d2cc71ca8a210697c2 Mon Sep 17 00:00:00 2001 From: adi_holden Date: Tue, 13 Feb 2024 15:01:45 +0200 Subject: [PATCH 1/3] fix(server): client puase work while blocking commands run Signed-off-by: adi_holden --- src/server/server_family.cc | 3 ++- tests/dragonfly/connection_test.py | 21 +++++++++++++++++++++ 2 files changed, 23 insertions(+), 1 deletion(-) diff --git a/src/server/server_family.cc b/src/server/server_family.cc index b3f6fdd1d497..8f231275445b 100644 --- a/src/server/server_family.cc +++ b/src/server/server_family.cc @@ -561,7 +561,8 @@ std::optional Pause(absl::Span listeners, std::function is_pause_in_progress) { // Set global pause state and track commands that are running when the pause state is flipped. // Exlude already paused commands from the busy count. - DispatchTracker tracker{listeners, conn, true /* ignore paused commands */}; + DispatchTracker tracker{listeners, conn, true /* ignore paused commands */, + true /*ignore blocking*/}; shard_set->pool()->Await([&tracker, pause_state](util::ProactorBase* pb) { // Commands don't suspend before checking the pause state, so // it's impossible to deadlock on waiting for a command that will be paused. diff --git a/tests/dragonfly/connection_test.py b/tests/dragonfly/connection_test.py index 601e5b064cbd..63a57073045a 100755 --- a/tests/dragonfly/connection_test.py +++ b/tests/dragonfly/connection_test.py @@ -737,3 +737,24 @@ async def do_write(): await asyncio.sleep(0.0) assert p3.done() await p3 + + +@pytest.mark.asyncio +async def test_blocking_command_client_pause(async_client: aioredis.Redis): + async def blocking_command(): + await async_client.execute_command("blpop key 2") + + async def read_command(): + await async_client.execute_command("get key1") + + blocking = asyncio.create_task(blocking_command()) + await asyncio.sleep(0.1) + + await async_client.execute_command("client pause 1000") + + read = asyncio.create_task(read_command()) + assert not read.done() + + await read + assert not blocking.done() + await blocking From 396a7cd66afc4b8be8949a261f3e89b98630cb17 Mon Sep 17 00:00:00 2001 From: adi_holden Date: Mon, 26 Feb 2024 16:53:58 +0200 Subject: [PATCH 2/3] fix PR Signed-off-by: adi_holden --- src/facade/dragonfly_listener.cc | 13 ++++++-- src/facade/dragonfly_listener.h | 2 +- src/server/command_registry.h | 4 +++ src/server/main_service.cc | 4 ++- src/server/server_family.cc | 1 - src/server/transaction.cc | 5 +++- tests/dragonfly/connection_test.py | 48 +++++++++++++++++++++++++----- 7 files changed, 63 insertions(+), 14 deletions(-) diff --git a/src/facade/dragonfly_listener.cc b/src/facade/dragonfly_listener.cc index 1981c4846bb3..be5089f58357 100644 --- a/src/facade/dragonfly_listener.cc +++ b/src/facade/dragonfly_listener.cc @@ -392,6 +392,7 @@ DispatchTracker::DispatchTracker(absl::Span listeners, issuer_{issuer}, ignore_paused_{ignore_paused}, ignore_blocked_{ignore_blocked} { + bc_ = make_unique(0); } void DispatchTracker::TrackOnThread() { @@ -400,7 +401,15 @@ void DispatchTracker::TrackOnThread() { } bool DispatchTracker::Wait(absl::Duration duration) { - return bc_.WaitFor(absl::ToChronoMilliseconds(duration)); + bool res = bc_->WaitFor(absl::ToChronoMilliseconds(duration)); + if (!res && ignore_blocked_) { + // We track all connections again because a connection might became blocked between the time + // we call tracking the last time. + bc_ = make_unique(0); + TrackAll(); + res = bc_->WaitFor(absl::ToChronoMilliseconds(duration)); + } + return res; } void DispatchTracker::TrackAll() { @@ -410,7 +419,7 @@ void DispatchTracker::TrackAll() { void DispatchTracker::Handle(unsigned thread_index, util::Connection* conn) { if (auto* fconn = static_cast(conn); fconn != issuer_) - fconn->SendCheckpoint(bc_, ignore_paused_, ignore_blocked_); + fconn->SendCheckpoint(*bc_, ignore_paused_, ignore_blocked_); } } // namespace facade diff --git a/src/facade/dragonfly_listener.h b/src/facade/dragonfly_listener.h index e6cc751d1abd..b78b29699660 100644 --- a/src/facade/dragonfly_listener.h +++ b/src/facade/dragonfly_listener.h @@ -103,7 +103,7 @@ class DispatchTracker { std::vector listeners_; facade::Connection* issuer_; - util::fb2::BlockingCounter bc_{0}; + std::unique_ptr bc_; bool ignore_paused_; bool ignore_blocked_; }; diff --git a/src/server/command_registry.h b/src/server/command_registry.h index 8b42e65f584e..7eb6f12c48c2 100644 --- a/src/server/command_registry.h +++ b/src/server/command_registry.h @@ -107,6 +107,10 @@ class CommandId : public facade::CommandId { return opt_mask_ & CO::WRITE; } + bool IsBlocking() const { + return opt_mask_ & CO::BLOCKING; + } + static const char* OptName(CO::CommandOpt fl); CommandId&& SetHandler(Handler f) && { diff --git a/src/server/main_service.cc b/src/server/main_service.cc index 0629b49a6a2b..06f81cdadd44 100644 --- a/src/server/main_service.cc +++ b/src/server/main_service.cc @@ -1327,7 +1327,9 @@ size_t Service::DispatchManyCommands(absl::Span args_list, // paired with shardlocal eval const bool is_eval = CO::IsEvalKind(ArgS(args, 0)); - if (!is_multi && !is_eval && cid != nullptr) { + const bool is_blocking = cid != nullptr && cid->IsBlocking(); + + if (!is_multi && !is_eval && !is_blocking && cid != nullptr) { stored_cmds.reserve(args_list.size()); stored_cmds.emplace_back(cid, tail_args); continue; diff --git a/src/server/server_family.cc b/src/server/server_family.cc index 8f231275445b..1b3975f71c7c 100644 --- a/src/server/server_family.cc +++ b/src/server/server_family.cc @@ -570,7 +570,6 @@ std::optional Pause(absl::Span listeners, ServerState::tlocal()->SetPauseState(pause_state, true); }); - // TODO handle blocking commands // Wait for all busy commands to finish running before replying to guarantee // that no more (write) operations will occur. const absl::Duration kDispatchTimeout = absl::Seconds(1); diff --git a/src/server/transaction.cc b/src/server/transaction.cc index 59c47b931f06..3ce72ed62b43 100644 --- a/src/server/transaction.cc +++ b/src/server/transaction.cc @@ -1349,7 +1349,7 @@ OpStatus Transaction::WaitOnWatch(const time_point& tp, WaitKeysProvider wkeys_p auto* stats = ServerState::tl_connection_stats(); ++stats->num_blocked_clients; DVLOG(1) << "WaitOnWatch wait for " << tp << " " << DebugId(); - + // TBD set connection blocking state // Wait for the blocking barrier to be closed. // Note: It might return immediately if another thread already notified us. cv_status status = blocking_barrier_.Wait(tp); @@ -1357,6 +1357,9 @@ OpStatus Transaction::WaitOnWatch(const time_point& tp, WaitKeysProvider wkeys_p DVLOG(1) << "WaitOnWatch done " << int(status) << " " << DebugId(); --stats->num_blocked_clients; + // TBD set connection pause state + ServerState::tlocal()->AwaitPauseState(true); // blocking are always write commands + OpStatus result = OpStatus::OK; if (status == cv_status::timeout) { result = OpStatus::TIMED_OUT; diff --git a/tests/dragonfly/connection_test.py b/tests/dragonfly/connection_test.py index 63a57073045a..33389e116f00 100755 --- a/tests/dragonfly/connection_test.py +++ b/tests/dragonfly/connection_test.py @@ -1,4 +1,5 @@ import random +import string import pytest import asyncio import time @@ -741,20 +742,51 @@ async def do_write(): @pytest.mark.asyncio async def test_blocking_command_client_pause(async_client: aioredis.Redis): + """ + 1. Check client pause success when blocking transaction is running + 2. lpush is paused after running client puase + 3. once puased is finished lpush will run and blpop will pop the pushed value + """ + async def blocking_command(): - await async_client.execute_command("blpop key 2") + res = await async_client.execute_command("blpop key 2") + assert res == ["key", "value"] - async def read_command(): - await async_client.execute_command("get key1") + async def lpush_command(): + await async_client.execute_command("lpush key value") blocking = asyncio.create_task(blocking_command()) await asyncio.sleep(0.1) - await async_client.execute_command("client pause 1000") + res = await async_client.execute_command("client pause 1000") + assert res == "OK" - read = asyncio.create_task(read_command()) - assert not read.done() + lpush = asyncio.create_task(lpush_command()) + assert not lpush.done() - await read - assert not blocking.done() + await lpush await blocking + + +@pytest.mark.asyncio +async def test_multiple_blocking_commands_client_pause(async_client: aioredis.Redis): + """ + Check running client pause command simultaneously with running multiple blocking command + from multiple connections + """ + + async def just_blpop(): + key = "".join(random.choices(string.ascii_letters, k=3)) + await async_client.execute_command(f"blpop {key} 2") + + async def client_pause(): + res = await async_client.execute_command("client pause 1000") + assert res == "OK" + + tasks = [just_blpop() for _ in range(20)] + tasks.append(client_pause()) + + all = asyncio.gather(*tasks) + + assert not all.done() + await all From a5e1b712cd5a2cc13c01756805d31bfa69c04d2c Mon Sep 17 00:00:00 2001 From: adi_holden Date: Wed, 28 Feb 2024 12:40:22 +0200 Subject: [PATCH 3/3] fix PR comments Signed-off-by: adi_holden --- src/facade/dragonfly_listener.cc | 2 +- src/server/server_family.cc | 7 +++++-- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/src/facade/dragonfly_listener.cc b/src/facade/dragonfly_listener.cc index be5089f58357..04456e82ac21 100644 --- a/src/facade/dragonfly_listener.cc +++ b/src/facade/dragonfly_listener.cc @@ -405,7 +405,7 @@ bool DispatchTracker::Wait(absl::Duration duration) { if (!res && ignore_blocked_) { // We track all connections again because a connection might became blocked between the time // we call tracking the last time. - bc_ = make_unique(0); + bc_.reset(new util::fb2::BlockingCounter(0)); TrackAll(); res = bc_->WaitFor(absl::ToChronoMilliseconds(duration)); } diff --git a/src/server/server_family.cc b/src/server/server_family.cc index 1b3975f71c7c..6ea66553392d 100644 --- a/src/server/server_family.cc +++ b/src/server/server_family.cc @@ -559,8 +559,11 @@ string_view GetRedisMode() { std::optional Pause(absl::Span listeners, facade::Connection* conn, ClientPause pause_state, std::function is_pause_in_progress) { - // Set global pause state and track commands that are running when the pause state is flipped. - // Exlude already paused commands from the busy count. + // Track connections and set pause state to be able to wait untill all running transactions read + // the new pause state. Exlude already paused commands from the busy count. Exlude tracking + // blocked connections because: a) If the connection is blocked it is puased. b) We read pause + // state after waking from blocking so if the trasaction was waken by another running + // command that did not pause on the new state yet we will pause after waking up. DispatchTracker tracker{listeners, conn, true /* ignore paused commands */, true /*ignore blocking*/}; shard_set->pool()->Await([&tracker, pause_state](util::ProactorBase* pb) {