diff --git a/.github/config/muted_ya.txt b/.github/config/muted_ya.txt index 97770b38528e..6d4ded2774d2 100644 --- a/.github/config/muted_ya.txt +++ b/.github/config/muted_ya.txt @@ -14,10 +14,6 @@ ydb/core/client/ut TClientTest.ReadFromFollower ydb/core/client/ut TFlatTest.AutoSplitMergeQueue ydb/core/cms/ut_sentinel TSentinelTests.BSControllerCantChangeStatus ydb/core/debug_tools/ut OperationLog.ConcurrentWrites -ydb/core/persqueue/ut [31/40]* -ydb/core/persqueue/ut TopicSplitMerge.PartitionSplit -ydb/core/persqueue/ut TPersQueueMirrorer.TestBasicRemote -ydb/core/persqueue/ut TPQTest.TestDirectReadHappyWay ydb/core/quoter/ut QuoterWithKesusTest.PrefetchCoefficient ydb/core/kafka_proxy/ut KafkaProtocol.CreatePartitionsScenario ydb/core/kafka_proxy/ut KafkaProtocol.ProduceScenario diff --git a/ydb/core/persqueue/ut/mirrorer_ut.cpp b/ydb/core/persqueue/ut/mirrorer_ut.cpp index 0790f673a133..08f1af2c66e6 100644 --- a/ydb/core/persqueue/ut/mirrorer_ut.cpp +++ b/ydb/core/persqueue/ut/mirrorer_ut.cpp @@ -251,7 +251,6 @@ Y_UNIT_TEST_SUITE(TPersQueueMirrorer) { } } } - } Y_UNIT_TEST(ValidStartStream) { diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.cpp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.cpp index 12d7e86b53cb..13a5517ff6c2 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.cpp +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.cpp @@ -57,6 +57,8 @@ TReadSession::TReadSession(const TReadSessionSettings& settings, } TReadSession::~TReadSession() { + Close(TDuration::Zero()); + { TDeferredActions deferred; NYql::TIssues issues; diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h index 93b23a5749a2..49f40cf3a704 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h @@ -518,6 +518,11 @@ class TRawPartitionStreamEventQueue { (NotReady.empty() ? Ready : NotReady).pop_back(); } + void clear() noexcept { + NotReady.clear(); + Ready.clear(); + } + void SignalReadyEvents(TIntrusivePtr> stream, TReadSessionEventsQueue& queue, TDeferredActions& deferred); @@ -539,6 +544,7 @@ class TRawPartitionStreamEventQueue { TUserRetrievedEventsInfoAccumulator& accumulator, std::deque>& queue); +private: std::deque> Ready; std::deque> NotReady; }; @@ -719,6 +725,14 @@ class TPartitionStreamImpl : public TAPartitionStream { void DeleteNotReadyTail(TDeferredActions& deferred); + void ClearQueue() noexcept { + EventsQueue.clear(); + } + + TRawPartitionStreamEventQueue ExtractQueue() noexcept { + return std::move(EventsQueue); + } + static void GetDataEventImpl(TIntrusivePtr> partitionStream, size_t& maxEventsCount, size_t& maxByteSize, @@ -776,14 +790,27 @@ class TReadSessionEventsQueue: public TBaseSessionEventsQueue& event, TDeferredActions& deferred) { TWaiter waiter; + TVector> deferredDelete; with_lock (TParent::Mutex) { - if (TParent::Closed) + if (TParent::Closed) { return false; + } + deferredDelete.reserve(TParent::Events.size()); + while (!TParent::Events.empty()) { + auto& event = TParent::Events.front(); + if (!event.IsEmpty()) { + deferredDelete.push_back(event.PartitionStream->ExtractQueue()); + } + TParent::Events.pop(); + } TParent::CloseEvent = event; TParent::Closed = true; waiter = TWaiter(TParent::Waiter.ExtractPromise(), this); } + // Delayed deletion is necessary to avoid deadlock with PushEvent + deferredDelete.clear(); + TReadSessionEventInfo info(event); ApplyHandler(info, deferred); deferred.DeferSignalWaiter(std::move(waiter)); @@ -959,6 +986,8 @@ class TSingleClusterReadSessionImpl : public NPersQueue::TEnableSelfContext* partitionStream, TMaybe readOffset, TMaybe commitOffset); void ConfirmPartitionStreamDestroy(TPartitionStreamImpl* partitionStream); diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp index d04d5fd843ed..c973b0441eef 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp @@ -225,6 +225,14 @@ void TRawPartitionStreamEventQueue::DeleteNotReadyTail(TDe //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // TSingleClusterReadSessionImpl +template +TSingleClusterReadSessionImpl::~TSingleClusterReadSessionImpl() { + for (auto&& [_, partitionStream] : PartitionStreams) { + partitionStream->ClearQueue(); + } +} + + template TStringBuilder TSingleClusterReadSessionImpl::GetLogPrefix() const { return TStringBuilder() << GetDatabaseLogPrefix(Database) << "[" << SessionId << "] [" << ClusterName << "] "; @@ -1057,6 +1065,7 @@ inline void TSingleClusterReadSessionImpl::OnReadDoneImpl( PartitionStreams[partitionStream->GetAssignId()]; if (currentPartitionStream) { CookieMapping.RemoveMapping(currentPartitionStream->GetPartitionStreamId()); + bool pushRes = EventsQueue->PushEvent( currentPartitionStream, TReadSessionEvent::TPartitionStreamClosedEvent( diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.cpp b/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.cpp index 949abf9ab11c..92fd3b210fe1 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.cpp +++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.cpp @@ -34,6 +34,8 @@ TReadSession::TReadSession(const TReadSessionSettings& settings, } TReadSession::~TReadSession() { + Close(TDuration::Zero()); + Abort(EStatus::ABORTED, "Aborted"); ClearAllEvents();