From 2a518291ce6e849ddb570479f2729bdccb03cf7a Mon Sep 17 00:00:00 2001 From: Nikolay Shestakov Date: Wed, 13 Mar 2024 16:18:44 +0500 Subject: [PATCH 1/2] Fix Verify in WriteSessionActor (#2651) --- .../deprecated/persqueue_v0/grpc_pq_write_actor.cpp | 7 +++++-- ydb/services/persqueue_v1/actors/write_session_actor.ipp | 5 ++++- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/ydb/services/deprecated/persqueue_v0/grpc_pq_write_actor.cpp b/ydb/services/deprecated/persqueue_v0/grpc_pq_write_actor.cpp index e29b4bc4f304..196b65ec8b66 100644 --- a/ydb/services/deprecated/persqueue_v0/grpc_pq_write_actor.cpp +++ b/ydb/services/deprecated/persqueue_v0/grpc_pq_write_actor.cpp @@ -310,7 +310,10 @@ void TWriteSessionActor::SetupCounters(const TString& cloudId, const TString& db void TWriteSessionActor::Handle(TEvDescribeTopicsResponse::TPtr& ev, const TActorContext& ctx) { - Y_ABORT_UNLESS(State == ES_WAIT_SCHEME || State == ES_INITED); + if (State != ES_WAIT_SCHEME && State != ES_INITED) { + return CloseSession("erroneous internal state", NPersQueue::NErrorCode::ERROR, ctx); + } + auto& res = ev->Get()->Result; Y_ABORT_UNLESS(res->ResultSet.size() == 1); @@ -865,7 +868,7 @@ void TWriteSessionActor::LogSession(const TActorContext& ctx) { void TWriteSessionActor::HandleWakeup(const TActorContext& ctx) { if (State != ES_INITED) { - return; + return CloseSession("erroneous internal state", NPersQueue::NErrorCode::ERROR, ctx); } auto now = ctx.Now(); diff --git a/ydb/services/persqueue_v1/actors/write_session_actor.ipp b/ydb/services/persqueue_v1/actors/write_session_actor.ipp index b76ab8fb24a7..f7e11244db8e 100644 --- a/ydb/services/persqueue_v1/actors/write_session_actor.ipp +++ b/ydb/services/persqueue_v1/actors/write_session_actor.ipp @@ -1503,7 +1503,10 @@ void TWriteSessionActor::Handle(TEvents::TEvWakeup::TPtr& template void TWriteSessionActor::RecheckACL(const TActorContext& ctx) { - Y_ABORT_UNLESS(State == ES_INITED); + if (State != ES_INITED) { + LOG_ERROR_S(ctx, NKikimrServices::PQ_WRITE_PROXY, "WriteSessionActor state is wrong. Actual state '" << (int)State << "'"); + return CloseSession("erroneous internal state", PersQueue::ErrorCode::ERROR, ctx); + } auto now = ctx.Now(); From 6ace007f9bb79096b51f8610d072004d5c07490e Mon Sep 17 00:00:00 2001 From: Nikolay Shestakov Date: Wed, 13 Mar 2024 13:04:14 +0000 Subject: [PATCH 2/2] fix --- ydb/services/deprecated/persqueue_v0/grpc_pq_actor.h | 2 ++ ydb/services/deprecated/persqueue_v0/grpc_pq_write_actor.cpp | 5 +++++ ydb/services/persqueue_v1/actors/write_session_actor.h | 2 ++ ydb/services/persqueue_v1/actors/write_session_actor.ipp | 4 ++++ 4 files changed, 13 insertions(+) diff --git a/ydb/services/deprecated/persqueue_v0/grpc_pq_actor.h b/ydb/services/deprecated/persqueue_v0/grpc_pq_actor.h index 0d5e472ca443..cb1ca35ecd86 100644 --- a/ydb/services/deprecated/persqueue_v0/grpc_pq_actor.h +++ b/ydb/services/deprecated/persqueue_v0/grpc_pq_actor.h @@ -554,6 +554,8 @@ class TWriteSessionActor : public NActors::TActorBootstrapped { diff --git a/ydb/services/deprecated/persqueue_v0/grpc_pq_write_actor.cpp b/ydb/services/deprecated/persqueue_v0/grpc_pq_write_actor.cpp index 196b65ec8b66..6172305476af 100644 --- a/ydb/services/deprecated/persqueue_v0/grpc_pq_write_actor.cpp +++ b/ydb/services/deprecated/persqueue_v0/grpc_pq_write_actor.cpp @@ -506,6 +506,11 @@ void TWriteSessionActor::ProceedPartition(const ui32 partition, const TActorCont } void TWriteSessionActor::CloseSession(const TString& errorReason, const NPersQueue::NErrorCode::EErrorCode errorCode, const NActors::TActorContext& ctx) { + if (SessionClosed) { + return; + } + SessionClosed = true; + if (errorCode != NPersQueue::NErrorCode::OK) { if (InternalErrorCode(errorCode)) { SLIErrors.Inc(); diff --git a/ydb/services/persqueue_v1/actors/write_session_actor.h b/ydb/services/persqueue_v1/actors/write_session_actor.h index 2d69db971d7f..17c47aab7837 100644 --- a/ydb/services/persqueue_v1/actors/write_session_actor.h +++ b/ydb/services/persqueue_v1/actors/write_session_actor.h @@ -271,6 +271,8 @@ class TWriteSessionActor TActorId PartitionWriterCache; TActorId PartitionChooser; + + bool SessionClosed = false; }; } diff --git a/ydb/services/persqueue_v1/actors/write_session_actor.ipp b/ydb/services/persqueue_v1/actors/write_session_actor.ipp index f7e11244db8e..825e342b7ead 100644 --- a/ydb/services/persqueue_v1/actors/write_session_actor.ipp +++ b/ydb/services/persqueue_v1/actors/write_session_actor.ipp @@ -724,6 +724,10 @@ void TWriteSessionActor::DestroyPartitionWriterCache(const template void TWriteSessionActor::CloseSession(const TString& errorReason, const PersQueue::ErrorCode::ErrorCode errorCode, const NActors::TActorContext& ctx) { + if (SessionClosed) { + return; + } + SessionClosed = true; if (errorCode != PersQueue::ErrorCode::OK) {