-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix(server): client pause work while blocking commands run #2584
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see, we can ignore them because no updates are possible, they can't be unlocked.
The only issue I see:
- While we wait with the dispatch tracker, some command unblocks a blocking one
- The blocking one starts doing stuff, but we didn't track it, so we don't wait for it
@dranikpg you are right. Have a suggestion how we can fix this? |
@adiholden My only idea would be to call it again, maybe we can do it only if we have blocking commands.
so maybe
|
We also have one more problem, namely BLPOP on it's started phase is not counted as blocking, it has two hops:
Will report on this as well 🤔🤔🤔🤔 |
So actually having 2 hops in blpop also makes CancelBlocking funciton bugy |
Maybe it's simpler just to do busy looping Dispatch { paused = true }
do {
ok = Dispatch {
if (num_dispatch - num_blocking - num_paused > 0) return false
if (blocking_controller -> awakened > 0) return false
return true;
}
if (!ok) sleep(1ms)
} while (!ok) The difference here is that we retry this condition, so we wait for BLPOP to either finish or become blocked, so the difference = 0 |
Signed-off-by: adi_holden <adi@dragonflydb.io>
74229e1
to
396a7cd
Compare
@@ -1327,7 +1327,9 @@ size_t Service::DispatchManyCommands(absl::Span<CmdArgList> args_list, | |||
// paired with shardlocal eval | |||
const bool is_eval = CO::IsEvalKind(ArgS(args, 0)); | |||
|
|||
if (!is_multi && !is_eval && cid != nullptr) { | |||
const bool is_blocking = cid != nullptr && cid->IsBlocking(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a fix for #2661
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks 🙏🏻
@dranikpg I pushed a fix which will solve both of the seraios you mentioned above:
|
will review in 30 mins |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I suggest to merge it. With a very low probably we can time out spuriously or wait one second... But most importantly it looks correct
@@ -1327,7 +1327,9 @@ size_t Service::DispatchManyCommands(absl::Span<CmdArgList> args_list, | |||
// paired with shardlocal eval | |||
const bool is_eval = CO::IsEvalKind(ArgS(args, 0)); | |||
|
|||
if (!is_multi && !is_eval && cid != nullptr) { | |||
const bool is_blocking = cid != nullptr && cid->IsBlocking(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks 🙏🏻
@@ -392,6 +392,7 @@ DispatchTracker::DispatchTracker(absl::Span<facade::Listener* const> listeners, | |||
issuer_{issuer}, | |||
ignore_paused_{ignore_paused}, | |||
ignore_blocked_{ignore_blocked} { | |||
bc_ = make_unique<util::fb2::BlockingCounter>(0); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's already a smart pointer internally, you can just re-assign it
src/facade/dragonfly_listener.cc
Outdated
bool res = bc_->WaitFor(absl::ToChronoMilliseconds(duration)); | ||
if (!res && ignore_blocked_) { | ||
// We track all connections again because a connection might became blocked between the time | ||
// we call tracking the last time. | ||
bc_ = make_unique<util::fb2::BlockingCounter>(0); | ||
TrackAll(); | ||
res = bc_->WaitFor(absl::ToChronoMilliseconds(duration)); | ||
} | ||
return res; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
so whenever we encounter such a case we time out on the 1 second which is somewhat unpleasant, but at least we don't miss it 🙂
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Theoretically, sometimes, (say, a few ms timeout and a packed transaction queue, it is not guaranteed, that all blocking commands reach a suspension point, so we can spuriously time out again.
I mean that blpop schedules, wait for the first hop, we hit the first timeout, wait again and only at that point blpop finishes the first hop and suspends (yet we've send it a checkpoint)
src/server/server_family.cc
Outdated
DispatchTracker tracker{listeners, conn, true /* ignore paused commands */}; | ||
DispatchTracker tracker{listeners, conn, true /* ignore paused commands */, | ||
true /*ignore blocking*/}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should comment why we ignore blocking commands and how they are handled
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Namely that whenver a blocking command wakes up it first checks the pause state, so we don't have to track them
// Wait for the blocking barrier to be closed. | ||
// Note: It might return immediately if another thread already notified us. | ||
cv_status status = blocking_barrier_.Wait(tp); | ||
|
||
DVLOG(1) << "WaitOnWatch done " << int(status) << " " << DebugId(); | ||
--stats->num_blocked_clients; | ||
|
||
// TBD set connection pause state |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, otherwise it's a deadlock 😅
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should also document the reasoning somewhere, it's not evident:
Because we set paused atomically and immediately track running commands, it should be correct.
If we wake up a thread that wasn't tracked yet, it's also not paused yet, so we'll see the woken up command as running and we'll track it
If we wake up a thread that is already paused, it will pause upon unblocking, so no data operations will be performed
|
||
|
||
@pytest.mark.asyncio | ||
async def test_blocking_command_client_pause(async_client: aioredis.Redis): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍🏻
But please add a comment above pause why it works with blocking, even though it ignores them 😅 |
Signed-off-by: adi_holden <adi@dragonflydb.io>
fix #2576
fix #2661