diff --git a/ydb/core/persqueue/writer/writer.cpp b/ydb/core/persqueue/writer/writer.cpp index 3647ba43393b..989b7ed6be49 100644 --- a/ydb/core/persqueue/writer/writer.cpp +++ b/ydb/core/persqueue/writer/writer.cpp @@ -451,20 +451,28 @@ class TPartitionWriter: public TActorBootstrapped, private TRl return WriteResult(ErrorCode, "Rejected by writer", MakeResponse(cookie)); } - void HoldPending(TEvPartitionWriter::TEvWriteRequest::TPtr& ev) { + bool HoldPending(TEvPartitionWriter::TEvWriteRequest::TPtr& ev) { auto& record = ev->Get()->Record; const auto cookie = record.GetPartitionRequest().GetCookie(); - Y_ABORT_UNLESS(Pending.empty() || Pending.rbegin()->first < cookie); - Y_ABORT_UNLESS(PendingReserve.empty() || PendingReserve.rbegin()->first < cookie); - Y_ABORT_UNLESS(PendingWrite.empty() || PendingWrite.back() < cookie); + auto pendingValid = (Pending.empty() || Pending.rbegin()->first < cookie); + auto reserveValid = (PendingReserve.empty() || PendingReserve.rbegin()->first < cookie); + auto writeValid = (PendingWrite.empty() || PendingWrite.back() < cookie); + + if (!(pendingValid && reserveValid && writeValid)) { + ERROR("The cookie of WriteRequest is invalid. Cookie=" << cookie); + Disconnected(EErrorCode::InternalError); + return false; + } Pending.emplace(cookie, std::move(ev->Get()->Record)); + return true; } void Handle(TEvPartitionWriter::TEvWriteRequest::TPtr& ev, const TActorContext& ctx) { - HoldPending(ev); - ReserveBytes(ctx); + if (HoldPending(ev)) { + ReserveBytes(ctx); + } } void ReserveBytes(const TActorContext& ctx) { @@ -514,10 +522,18 @@ class TPartitionWriter: public TActorBootstrapped, private TRl } void EnqueueReservedAndProcess(ui64 cookie) { - Y_ABORT_UNLESS(!PendingReserve.empty()); + if(PendingReserve.empty()) { + ERROR("The state of the PartitionWriter is invalid. PendingReserve is empty. Marker #01"); + Disconnected(EErrorCode::InternalError); + return; + } auto it = PendingReserve.begin(); - Y_ABORT_UNLESS(it->first == cookie); + if(it->first != cookie) { + ERROR("The order of reservation is invalid. Cookie=" << cookie << ", ReserveCookie=" << it->first); + Disconnected(EErrorCode::InternalError); + return; + } ReceivedReserve.emplace(it->first, std::move(it->second)); @@ -582,11 +598,20 @@ class TPartitionWriter: public TActorBootstrapped, private TRl } void Write(ui64 cookie) { - Y_ABORT_UNLESS(!PendingReserve.empty()); + if (PendingReserve.empty()) { + ERROR("The state of the PartitionWriter is invalid. PendingReserve is empty. Marker #02"); + Disconnected(EErrorCode::InternalError); + return; + } auto it = PendingReserve.begin(); - Y_ABORT_UNLESS(it->first == cookie); - Y_ABORT_UNLESS(PendingWrite.empty() || PendingWrite.back() < cookie); + auto cookieReserveValid = (it->first == cookie); + auto cookieWriteValid = (PendingWrite.empty() || PendingWrite.back() < cookie); + if (!(cookieReserveValid && cookieWriteValid)) { + ERROR("The cookie of Write is invalid. Cookie=" << cookie); + Disconnected(EErrorCode::InternalError); + return; + } Write(cookie, std::move(it->second.Request)); @@ -634,7 +659,11 @@ class TPartitionWriter: public TActorBootstrapped, private TRl WriteAccepted(cookie); - Y_ABORT_UNLESS(!PendingReserve.empty()); + if (PendingReserve.empty()) { + ERROR("The state of the PartitionWriter is invalid. PendingReserve is empty. Marker #03"); + Disconnected(EErrorCode::InternalError); + return; + } auto it = PendingReserve.begin(); auto& holder = it->second;