@@ -247,6 +247,7 @@ class TDqPqRdReadActor : public NActors::TActor<TDqPqRdReadActor>, public NYql::
247247 void TrySendGetNextBatch (SessionInfo& sessionInfo);
248248 template <class TEventPtr >
249249 bool CheckSession (SessionInfo& session, const TEventPtr& ev, ui64 partitionId);
250+ void SendStopSession (const NActors::TActorId& recipient, ui64 partitionId, ui64 cookie);
250251};
251252
252253TDqPqRdReadActor::TDqPqRdReadActor (
@@ -308,12 +309,14 @@ void TDqPqRdReadActor::ProcessState() {
308309 }
309310 State = EState::WAIT_PARTITIONS_ADDRES;
310311 auto partitionToRead = GetPartitionsToRead ();
311- SRC_LOG_I (" Send TEvCoordinatorRequest to coordinator " << CoordinatorActorId->ToString () << " , partIds: " << JoinSeq (" , " , partitionToRead));
312+ auto cookie = ++CoordinatorRequestCookie;
313+ SRC_LOG_I (" Send TEvCoordinatorRequest to coordinator " << CoordinatorActorId->ToString () << " , partIds: "
314+ << JoinSeq (" , " , partitionToRead) << " cookie " << cookie);
312315 Send (
313316 *CoordinatorActorId,
314317 new NFq::TEvRowDispatcher::TEvCoordinatorRequest (SourceParams, partitionToRead),
315318 IEventHandle::FlagTrackDelivery | IEventHandle::FlagSubscribeOnSession,
316- ++CoordinatorRequestCookie );
319+ cookie );
317320 return ;
318321 }
319322 case EState::WAIT_PARTITIONS_ADDRES:
@@ -327,7 +330,7 @@ void TDqPqRdReadActor::ProcessState() {
327330 TPartitionKey partitionKey{TString{}, partitionId};
328331 const auto offsetIt = PartitionToOffset.find (partitionKey);
329332 if (offsetIt != PartitionToOffset.end ()) {
330- SRC_LOG_D (" readOffset found" );
333+ SRC_LOG_D (" ReadOffset found" );
331334 readOffset = offsetIt->second ;
332335 }
333336
@@ -439,8 +442,9 @@ void TDqPqRdReadActor::Handle(NFq::TEvRowDispatcher::TEvStartSessionAck::TPtr& e
439442 auto sessionIt = Sessions.find (partitionId);
440443 if (sessionIt == Sessions.end ()) {
441444 SRC_LOG_W (" Ignore TEvStartSessionAck from " << ev->Sender << " , seqNo " << meta.GetSeqNo ()
442- << " , ConfirmedSeqNo " << meta.GetConfirmedSeqNo () << " , PartitionId " << partitionId);
445+ << " , ConfirmedSeqNo " << meta.GetConfirmedSeqNo () << " , PartitionId " << partitionId << " , cookie " << ev-> Cookie );
443446 YQL_ENSURE (State != EState::STARTED);
447+ SendStopSession (ev->Sender , partitionId, ev->Cookie );
444448 return ;
445449 }
446450 auto & sessionInfo = sessionIt->second ;
@@ -458,8 +462,9 @@ void TDqPqRdReadActor::Handle(NFq::TEvRowDispatcher::TEvSessionError::TPtr& ev)
458462 auto sessionIt = Sessions.find (partitionId);
459463 if (sessionIt == Sessions.end ()) {
460464 SRC_LOG_W (" Ignore TEvSessionError from " << ev->Sender << " , seqNo " << meta.GetSeqNo ()
461- << " , ConfirmedSeqNo " << meta.GetConfirmedSeqNo () << " , PartitionId " << partitionId);
465+ << " , ConfirmedSeqNo " << meta.GetConfirmedSeqNo () << " , PartitionId " << partitionId << " , cookie " << ev-> Cookie );
462466 YQL_ENSURE (State != EState::STARTED);
467+ SendStopSession (ev->Sender , partitionId, ev->Cookie );
463468 return ;
464469 }
465470
@@ -472,14 +477,15 @@ void TDqPqRdReadActor::Handle(NFq::TEvRowDispatcher::TEvSessionError::TPtr& ev)
472477
473478void TDqPqRdReadActor::Handle (NFq::TEvRowDispatcher::TEvStatistics::TPtr& ev) {
474479 const NYql::NDqProto::TMessageTransportMeta& meta = ev->Get ()->Record .GetTransportMeta ();
475- SRC_LOG_D (" TEvStatistics from " << ev->Sender << " , offset " << ev->Get ()->Record .GetNextMessageOffset () << " , seqNo " << meta.GetSeqNo () << " , ConfirmedSeqNo " << meta.GetConfirmedSeqNo ());
480+ SRC_LOG_T (" TEvStatistics from " << ev->Sender << " , offset " << ev->Get ()->Record .GetNextMessageOffset () << " , seqNo " << meta.GetSeqNo () << " , ConfirmedSeqNo " << meta.GetConfirmedSeqNo ());
476481
477482 ui64 partitionId = ev->Get ()->Record .GetPartitionId ();
478483 auto sessionIt = Sessions.find (partitionId);
479484 if (sessionIt == Sessions.end ()) {
480485 SRC_LOG_W (" Ignore TEvStatistics from " << ev->Sender << " , seqNo " << meta.GetSeqNo ()
481486 << " , ConfirmedSeqNo " << meta.GetConfirmedSeqNo () << " , PartitionId " << partitionId);
482487 YQL_ENSURE (State != EState::STARTED);
488+ SendStopSession (ev->Sender , partitionId, ev->Cookie );
483489 return ;
484490 }
485491 auto & sessionInfo = sessionIt->second ;
@@ -505,6 +511,7 @@ void TDqPqRdReadActor::Handle(NFq::TEvRowDispatcher::TEvNewDataArrived::TPtr& ev
505511 SRC_LOG_W (" Ignore TEvNewDataArrived from " << ev->Sender << " , seqNo " << meta.GetSeqNo ()
506512 << " , ConfirmedSeqNo " << meta.GetConfirmedSeqNo () << " , PartitionId " << partitionId);
507513 YQL_ENSURE (State != EState::STARTED);
514+ SendStopSession (ev->Sender , partitionId, ev->Cookie );
508515 return ;
509516 }
510517
@@ -552,7 +559,8 @@ void TDqPqRdReadActor::Handle(const NFq::TEvRowDispatcher::TEvHeartbeat::TPtr& e
552559 auto sessionIt = Sessions.find (partitionId);
553560 if (sessionIt == Sessions.end ()) {
554561 SRC_LOG_W (" Ignore TEvHeartbeat from " << ev->Sender << " , seqNo " << meta.GetSeqNo ()
555- << " , ConfirmedSeqNo " << meta.GetConfirmedSeqNo () << " , PartitionId " << partitionId);
562+ << " , ConfirmedSeqNo " << meta.GetConfirmedSeqNo () << " , PartitionId " << partitionId << " , cookie " << ev->Cookie );
563+ SendStopSession (ev->Sender , partitionId, ev->Cookie );
556564 return ;
557565 }
558566 CheckSession (sessionIt->second , ev, partitionId);
@@ -596,21 +604,23 @@ void TDqPqRdReadActor::Stop(const TString& message) {
596604}
597605
598606void TDqPqRdReadActor::Handle (NFq::TEvRowDispatcher::TEvCoordinatorResult::TPtr& ev) {
599- SRC_LOG_D (" TEvCoordinatorResult from " << ev->Sender .ToString () << " , cookie " << ev->Cookie );
607+ SRC_LOG_I (" TEvCoordinatorResult from " << ev->Sender .ToString () << " , cookie " << ev->Cookie );
600608 if (ev->Cookie != CoordinatorRequestCookie) {
601609 SRC_LOG_W (" Ignore TEvCoordinatorResult. wrong cookie" );
602610 return ;
603611 }
612+ if (State != EState::WAIT_PARTITIONS_ADDRES) {
613+ SRC_LOG_W (" Ignore TEvCoordinatorResult. wrong state " << static_cast <ui64>(EState::WAIT_PARTITIONS_ADDRES));
614+ return ;
615+ }
604616 for (auto & p : ev->Get ()->Record .GetPartitions ()) {
605617 TActorId rowDispatcherActorId = ActorIdFromProto (p.GetActorId ());
606- SRC_LOG_D (" rowDispatcherActorId:" << rowDispatcherActorId);
607-
608618 for (auto partitionId : p.GetPartitionId ()) {
609- SRC_LOG_D (" partitionId:" << partitionId);
610619 if (Sessions.contains (partitionId)) {
611620 Stop (" Internal error: session already exists" );
612621 return ;
613622 }
623+ SRC_LOG_I (" Create session to RD (" << rowDispatcherActorId << " ), partitionId " << partitionId);
614624 Sessions.emplace (
615625 std::piecewise_construct,
616626 std::forward_as_tuple (partitionId),
@@ -655,8 +665,9 @@ void TDqPqRdReadActor::Handle(NFq::TEvRowDispatcher::TEvMessageBatch::TPtr& ev)
655665 auto sessionIt = Sessions.find (partitionId);
656666 if (sessionIt == Sessions.end ()) {
657667 SRC_LOG_W (" Ignore TEvMessageBatch from " << ev->Sender << " , seqNo " << meta.GetSeqNo ()
658- << " , ConfirmedSeqNo " << meta.GetConfirmedSeqNo () << " , PartitionId " << partitionId);
668+ << " , ConfirmedSeqNo " << meta.GetConfirmedSeqNo () << " , PartitionId " << partitionId << " , cookie " << ev-> Cookie );
659669 YQL_ENSURE (State != EState::STARTED);
670+ SendStopSession (ev->Sender , partitionId, ev->Cookie );
660671 return ;
661672 }
662673
@@ -705,6 +716,12 @@ std::pair<NUdf::TUnboxedValuePod, i64> TDqPqRdReadActor::CreateItem(const TStrin
705716}
706717
707718void TDqPqRdReadActor::Handle (const NYql::NDq::TEvRetryQueuePrivate::TEvSessionClosed::TPtr& ev) {
719+ if (State != EState::STARTED) {
720+ if (!Sessions.empty ()) {
721+ Stop (TStringBuilder () << " Internal error: wrong state on TEvSessionClosed, session size " << Sessions.size () << " state " << static_cast <ui64>(State));
722+ }
723+ return ;
724+ }
708725 ReInit (TStringBuilder () << " Session closed, event queue id " << ev->Get ()->EventQueueId );
709726}
710727
@@ -754,10 +771,7 @@ template <class TEventPtr>
754771bool TDqPqRdReadActor::CheckSession (SessionInfo& session, const TEventPtr& ev, ui64 partitionId) {
755772 if (ev->Cookie != session.Generation ) {
756773 SRC_LOG_W (" Wrong message generation (" << typeid (TEventPtr).name () << " ), sender " << ev->Sender << " cookie " << ev->Cookie << " , session generation " << session.Generation << " , send TEvStopSession" );
757- auto event = std::make_unique<NFq::TEvRowDispatcher::TEvStopSession>();
758- *event->Record .MutableSource () = SourceParams;
759- event->Record .SetPartitionId (partitionId);
760- Send (ev->Sender , event.release (), 0 , ev->Cookie );
774+ SendStopSession (ev->Sender , partitionId, ev->Cookie );
761775 return false ;
762776 }
763777 if (!session.EventsQueue .OnEventReceived (ev)) {
@@ -768,6 +782,13 @@ bool TDqPqRdReadActor::CheckSession(SessionInfo& session, const TEventPtr& ev, u
768782 return true ;
769783}
770784
785+ void TDqPqRdReadActor::SendStopSession (const NActors::TActorId& recipient, ui64 partitionId, ui64 cookie) {
786+ auto event = std::make_unique<NFq::TEvRowDispatcher::TEvStopSession>();
787+ *event->Record .MutableSource () = SourceParams;
788+ event->Record .SetPartitionId (partitionId);
789+ Send (recipient, event.release (), 0 , cookie);
790+ }
791+
771792std::pair<IDqComputeActorAsyncInput*, NActors::IActor*> CreateDqPqRdReadActor (
772793 NPq::NProto::TDqPqTopicSource&& settings,
773794 ui64 inputIndex,
0 commit comments