Skip to content

Commit

Permalink
Cherry pick fix leak (#3167)
Browse files Browse the repository at this point in the history
Co-authored-by: Oleg Doronin <dorooleg@yandex.ru>
  • Loading branch information
ildar-khisambeev and dorooleg authored Mar 26, 2024
1 parent 6fbfd4a commit d563efb
Show file tree
Hide file tree
Showing 6 changed files with 43 additions and 6 deletions.
4 changes: 0 additions & 4 deletions .github/config/muted_ya.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,6 @@ ydb/core/client/ut TClientTest.ReadFromFollower
ydb/core/client/ut TFlatTest.AutoSplitMergeQueue
ydb/core/cms/ut_sentinel TSentinelTests.BSControllerCantChangeStatus
ydb/core/debug_tools/ut OperationLog.ConcurrentWrites
ydb/core/persqueue/ut [31/40]*
ydb/core/persqueue/ut TopicSplitMerge.PartitionSplit
ydb/core/persqueue/ut TPersQueueMirrorer.TestBasicRemote
ydb/core/persqueue/ut TPQTest.TestDirectReadHappyWay
ydb/core/quoter/ut QuoterWithKesusTest.PrefetchCoefficient
ydb/core/kafka_proxy/ut KafkaProtocol.CreatePartitionsScenario
ydb/core/kafka_proxy/ut KafkaProtocol.ProduceScenario
Expand Down
1 change: 0 additions & 1 deletion ydb/core/persqueue/ut/mirrorer_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,6 @@ Y_UNIT_TEST_SUITE(TPersQueueMirrorer) {
}
}
}

}

Y_UNIT_TEST(ValidStartStream) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ TReadSession::TReadSession(const TReadSessionSettings& settings,
}

TReadSession::~TReadSession() {
Close(TDuration::Zero());

{
TDeferredActions<true> deferred;
NYql::TIssues issues;
Expand Down
31 changes: 30 additions & 1 deletion ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h
Original file line number Diff line number Diff line change
Expand Up @@ -518,6 +518,11 @@ class TRawPartitionStreamEventQueue {
(NotReady.empty() ? Ready : NotReady).pop_back();
}

void clear() noexcept {
NotReady.clear();
Ready.clear();
}

void SignalReadyEvents(TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> stream,
TReadSessionEventsQueue<UseMigrationProtocol>& queue,
TDeferredActions<UseMigrationProtocol>& deferred);
Expand All @@ -539,6 +544,7 @@ class TRawPartitionStreamEventQueue {
TUserRetrievedEventsInfoAccumulator<UseMigrationProtocol>& accumulator,
std::deque<TRawPartitionStreamEvent<UseMigrationProtocol>>& queue);

private:
std::deque<TRawPartitionStreamEvent<UseMigrationProtocol>> Ready;
std::deque<TRawPartitionStreamEvent<UseMigrationProtocol>> NotReady;
};
Expand Down Expand Up @@ -719,6 +725,14 @@ class TPartitionStreamImpl : public TAPartitionStream<UseMigrationProtocol> {

void DeleteNotReadyTail(TDeferredActions<UseMigrationProtocol>& deferred);

void ClearQueue() noexcept {
EventsQueue.clear();
}

TRawPartitionStreamEventQueue<UseMigrationProtocol> ExtractQueue() noexcept {
return std::move(EventsQueue);
}

static void GetDataEventImpl(TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> partitionStream,
size_t& maxEventsCount,
size_t& maxByteSize,
Expand Down Expand Up @@ -776,14 +790,27 @@ class TReadSessionEventsQueue: public TBaseSessionEventsQueue<TAReadSessionSetti

bool Close(const TASessionClosedEvent<UseMigrationProtocol>& event, TDeferredActions<UseMigrationProtocol>& deferred) {
TWaiter waiter;
TVector<TRawPartitionStreamEventQueue<UseMigrationProtocol>> deferredDelete;
with_lock (TParent::Mutex) {
if (TParent::Closed)
if (TParent::Closed) {
return false;
}
deferredDelete.reserve(TParent::Events.size());
while (!TParent::Events.empty()) {
auto& event = TParent::Events.front();
if (!event.IsEmpty()) {
deferredDelete.push_back(event.PartitionStream->ExtractQueue());
}
TParent::Events.pop();
}
TParent::CloseEvent = event;
TParent::Closed = true;
waiter = TWaiter(TParent::Waiter.ExtractPromise(), this);
}

// Delayed deletion is necessary to avoid deadlock with PushEvent
deferredDelete.clear();

TReadSessionEventInfo<UseMigrationProtocol> info(event);
ApplyHandler(info, deferred);
deferred.DeferSignalWaiter(std::move(waiter));
Expand Down Expand Up @@ -959,6 +986,8 @@ class TSingleClusterReadSessionImpl : public NPersQueue::TEnableSelfContext<TSin
{
}

~TSingleClusterReadSessionImpl();

void Start();
void ConfirmPartitionStreamCreate(const TPartitionStreamImpl<UseMigrationProtocol>* partitionStream, TMaybe<ui64> readOffset, TMaybe<ui64> commitOffset);
void ConfirmPartitionStreamDestroy(TPartitionStreamImpl<UseMigrationProtocol>* partitionStream);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,14 @@ void TRawPartitionStreamEventQueue<UseMigrationProtocol>::DeleteNotReadyTail(TDe
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// TSingleClusterReadSessionImpl

template<bool UseMigrationProtocol>
TSingleClusterReadSessionImpl<UseMigrationProtocol>::~TSingleClusterReadSessionImpl() {
for (auto&& [_, partitionStream] : PartitionStreams) {
partitionStream->ClearQueue();
}
}


template<bool UseMigrationProtocol>
TStringBuilder TSingleClusterReadSessionImpl<UseMigrationProtocol>::GetLogPrefix() const {
return TStringBuilder() << GetDatabaseLogPrefix(Database) << "[" << SessionId << "] [" << ClusterName << "] ";
Expand Down Expand Up @@ -1057,6 +1065,7 @@ inline void TSingleClusterReadSessionImpl<true>::OnReadDoneImpl(
PartitionStreams[partitionStream->GetAssignId()];
if (currentPartitionStream) {
CookieMapping.RemoveMapping(currentPartitionStream->GetPartitionStreamId());

bool pushRes = EventsQueue->PushEvent(
currentPartitionStream,
TReadSessionEvent::TPartitionStreamClosedEvent(
Expand Down
2 changes: 2 additions & 0 deletions ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ TReadSession::TReadSession(const TReadSessionSettings& settings,
}

TReadSession::~TReadSession() {
Close(TDuration::Zero());

Abort(EStatus::ABORTED, "Aborted");
ClearAllEvents();

Expand Down

0 comments on commit d563efb

Please sign in to comment.