Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(server): client pause work while blocking commands run #2584

Merged
merged 3 commits into from
Feb 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions src/facade/dragonfly_listener.cc
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,7 @@ DispatchTracker::DispatchTracker(absl::Span<facade::Listener* const> listeners,
issuer_{issuer},
ignore_paused_{ignore_paused},
ignore_blocked_{ignore_blocked} {
bc_ = make_unique<util::fb2::BlockingCounter>(0);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's already a smart pointer internally, you can just re-assign it

}

void DispatchTracker::TrackOnThread() {
Expand All @@ -400,7 +401,15 @@ void DispatchTracker::TrackOnThread() {
}

bool DispatchTracker::Wait(absl::Duration duration) {
return bc_.WaitFor(absl::ToChronoMilliseconds(duration));
bool res = bc_->WaitFor(absl::ToChronoMilliseconds(duration));
if (!res && ignore_blocked_) {
// We track all connections again because a connection might became blocked between the time
// we call tracking the last time.
bc_.reset(new util::fb2::BlockingCounter(0));
TrackAll();
res = bc_->WaitFor(absl::ToChronoMilliseconds(duration));
}
return res;
}

void DispatchTracker::TrackAll() {
Expand All @@ -410,7 +419,7 @@ void DispatchTracker::TrackAll() {

void DispatchTracker::Handle(unsigned thread_index, util::Connection* conn) {
if (auto* fconn = static_cast<facade::Connection*>(conn); fconn != issuer_)
fconn->SendCheckpoint(bc_, ignore_paused_, ignore_blocked_);
fconn->SendCheckpoint(*bc_, ignore_paused_, ignore_blocked_);
}

} // namespace facade
2 changes: 1 addition & 1 deletion src/facade/dragonfly_listener.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ class DispatchTracker {

std::vector<facade::Listener*> listeners_;
facade::Connection* issuer_;
util::fb2::BlockingCounter bc_{0};
std::unique_ptr<util::fb2::BlockingCounter> bc_;
bool ignore_paused_;
bool ignore_blocked_;
};
Expand Down
4 changes: 4 additions & 0 deletions src/server/command_registry.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,10 @@ class CommandId : public facade::CommandId {
return opt_mask_ & CO::WRITE;
}

bool IsBlocking() const {
return opt_mask_ & CO::BLOCKING;
}

static const char* OptName(CO::CommandOpt fl);

CommandId&& SetHandler(Handler f) && {
Expand Down
4 changes: 3 additions & 1 deletion src/server/main_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1327,7 +1327,9 @@ size_t Service::DispatchManyCommands(absl::Span<CmdArgList> args_list,
// paired with shardlocal eval
const bool is_eval = CO::IsEvalKind(ArgS(args, 0));

if (!is_multi && !is_eval && cid != nullptr) {
const bool is_blocking = cid != nullptr && cid->IsBlocking();
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a fix for #2661

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks 🙏🏻


if (!is_multi && !is_eval && !is_blocking && cid != nullptr) {
stored_cmds.reserve(args_list.size());
stored_cmds.emplace_back(cid, tail_args);
continue;
Expand Down
11 changes: 7 additions & 4 deletions src/server/server_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -559,17 +559,20 @@ string_view GetRedisMode() {
std::optional<fb2::Fiber> Pause(absl::Span<facade::Listener* const> listeners,
facade::Connection* conn, ClientPause pause_state,
std::function<bool()> is_pause_in_progress) {
// Set global pause state and track commands that are running when the pause state is flipped.
// Exlude already paused commands from the busy count.
DispatchTracker tracker{listeners, conn, true /* ignore paused commands */};
// Track connections and set pause state to be able to wait untill all running transactions read
// the new pause state. Exlude already paused commands from the busy count. Exlude tracking
// blocked connections because: a) If the connection is blocked it is puased. b) We read pause
// state after waking from blocking so if the trasaction was waken by another running
// command that did not pause on the new state yet we will pause after waking up.
DispatchTracker tracker{listeners, conn, true /* ignore paused commands */,
true /*ignore blocking*/};
shard_set->pool()->Await([&tracker, pause_state](util::ProactorBase* pb) {
// Commands don't suspend before checking the pause state, so
// it's impossible to deadlock on waiting for a command that will be paused.
tracker.TrackOnThread();
ServerState::tlocal()->SetPauseState(pause_state, true);
});

// TODO handle blocking commands
// Wait for all busy commands to finish running before replying to guarantee
// that no more (write) operations will occur.
const absl::Duration kDispatchTimeout = absl::Seconds(1);
Expand Down
5 changes: 4 additions & 1 deletion src/server/transaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1349,14 +1349,17 @@ OpStatus Transaction::WaitOnWatch(const time_point& tp, WaitKeysProvider wkeys_p
auto* stats = ServerState::tl_connection_stats();
++stats->num_blocked_clients;
DVLOG(1) << "WaitOnWatch wait for " << tp << " " << DebugId();

// TBD set connection blocking state
// Wait for the blocking barrier to be closed.
// Note: It might return immediately if another thread already notified us.
cv_status status = blocking_barrier_.Wait(tp);

DVLOG(1) << "WaitOnWatch done " << int(status) << " " << DebugId();
--stats->num_blocked_clients;

// TBD set connection pause state
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, otherwise it's a deadlock 😅

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should also document the reasoning somewhere, it's not evident:

Because we set paused atomically and immediately track running commands, it should be correct.

If we wake up a thread that wasn't tracked yet, it's also not paused yet, so we'll see the woken up command as running and we'll track it

If we wake up a thread that is already paused, it will pause upon unblocking, so no data operations will be performed

ServerState::tlocal()->AwaitPauseState(true); // blocking are always write commands

OpStatus result = OpStatus::OK;
if (status == cv_status::timeout) {
result = OpStatus::TIMED_OUT;
Expand Down
53 changes: 53 additions & 0 deletions tests/dragonfly/connection_test.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import random
import string
import pytest
import asyncio
import time
Expand Down Expand Up @@ -737,3 +738,55 @@ async def do_write():
await asyncio.sleep(0.0)
assert p3.done()
await p3


@pytest.mark.asyncio
async def test_blocking_command_client_pause(async_client: aioredis.Redis):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍🏻

"""
1. Check client pause success when blocking transaction is running
2. lpush is paused after running client puase
3. once puased is finished lpush will run and blpop will pop the pushed value
"""

async def blocking_command():
res = await async_client.execute_command("blpop key 2")
assert res == ["key", "value"]

async def lpush_command():
await async_client.execute_command("lpush key value")

blocking = asyncio.create_task(blocking_command())
await asyncio.sleep(0.1)

res = await async_client.execute_command("client pause 1000")
assert res == "OK"

lpush = asyncio.create_task(lpush_command())
assert not lpush.done()

await lpush
await blocking


@pytest.mark.asyncio
async def test_multiple_blocking_commands_client_pause(async_client: aioredis.Redis):
"""
Check running client pause command simultaneously with running multiple blocking command
from multiple connections
"""

async def just_blpop():
key = "".join(random.choices(string.ascii_letters, k=3))
await async_client.execute_command(f"blpop {key} 2")

async def client_pause():
res = await async_client.execute_command("client pause 1000")
assert res == "OK"

tasks = [just_blpop() for _ in range(20)]
tasks.append(client_pause())

all = asyncio.gather(*tasks)

assert not all.done()
await all
Loading