Skip to content

Commit

Permalink
pq sdk deadlock has been fixed (between Close and OnReadDone) (#3115)
Browse files Browse the repository at this point in the history
  • Loading branch information
dorooleg authored and blinkov committed Mar 26, 2024
1 parent ee65c88 commit 2ef6dd4
Showing 1 changed file with 10 additions and 1 deletion.
11 changes: 10 additions & 1 deletion ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h
Original file line number Diff line number Diff line change
Expand Up @@ -739,6 +739,10 @@ class TPartitionStreamImpl : public TAPartitionStream<UseMigrationProtocol> {
EventsQueue.clear();
}

TRawPartitionStreamEventQueue<UseMigrationProtocol> ExtractQueue() noexcept {
return std::move(EventsQueue);
}

static void GetDataEventImpl(TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> partitionStream,
size_t& maxEventsCount,
size_t& maxByteSize,
Expand Down Expand Up @@ -796,14 +800,16 @@ class TReadSessionEventsQueue: public TBaseSessionEventsQueue<TAReadSessionSetti

bool Close(const TASessionClosedEvent<UseMigrationProtocol>& event, TDeferredActions<UseMigrationProtocol>& deferred) {
TWaiter waiter;
TVector<TRawPartitionStreamEventQueue<UseMigrationProtocol>> defferedDelete;
with_lock (TParent::Mutex) {
if (TParent::Closed) {
return false;
}
defferedDelete.reserve(TParent::Events.size());
while (!TParent::Events.empty()) {
auto& event = TParent::Events.front();
if (!event.IsEmpty()) {
event.PartitionStream->ClearQueue();
defferedDelete.push_back(event.PartitionStream->ExtractQueue());
}
TParent::Events.pop();
}
Expand All @@ -812,6 +818,9 @@ class TReadSessionEventsQueue: public TBaseSessionEventsQueue<TAReadSessionSetti
waiter = TWaiter(TParent::Waiter.ExtractPromise(), this);
}

// Delayed deletion is necessary to avoid deadlock with PushEvent
defferedDelete.clear();

TReadSessionEventInfo<UseMigrationProtocol> info(event);
ApplyHandler(info, deferred);
deferred.DeferSignalWaiter(std::move(waiter));
Expand Down

0 comments on commit 2ef6dd4

Please sign in to comment.