Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 41 additions & 12 deletions ydb/core/persqueue/writer/writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -451,20 +451,28 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter>, private TRl
return WriteResult(ErrorCode, "Rejected by writer", MakeResponse(cookie));
}

void HoldPending(TEvPartitionWriter::TEvWriteRequest::TPtr& ev) {
bool HoldPending(TEvPartitionWriter::TEvWriteRequest::TPtr& ev) {
auto& record = ev->Get()->Record;
const auto cookie = record.GetPartitionRequest().GetCookie();

Y_ABORT_UNLESS(Pending.empty() || Pending.rbegin()->first < cookie);
Y_ABORT_UNLESS(PendingReserve.empty() || PendingReserve.rbegin()->first < cookie);
Y_ABORT_UNLESS(PendingWrite.empty() || PendingWrite.back() < cookie);
auto pendingValid = (Pending.empty() || Pending.rbegin()->first < cookie);
auto reserveValid = (PendingReserve.empty() || PendingReserve.rbegin()->first < cookie);
auto writeValid = (PendingWrite.empty() || PendingWrite.back() < cookie);

if (!(pendingValid && reserveValid && writeValid)) {
ERROR("The cookie of WriteRequest is invalid. Cookie=" << cookie);
Disconnected(EErrorCode::InternalError);
return false;
}

Pending.emplace(cookie, std::move(ev->Get()->Record));
return true;
}

void Handle(TEvPartitionWriter::TEvWriteRequest::TPtr& ev, const TActorContext& ctx) {
HoldPending(ev);
ReserveBytes(ctx);
if (HoldPending(ev)) {
ReserveBytes(ctx);
}
}

void ReserveBytes(const TActorContext& ctx) {
Expand Down Expand Up @@ -514,10 +522,18 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter>, private TRl
}

void EnqueueReservedAndProcess(ui64 cookie) {
Y_ABORT_UNLESS(!PendingReserve.empty());
if(PendingReserve.empty()) {
ERROR("The state of the PartitionWriter is invalid. PendingReserve is empty. Marker #01");
Disconnected(EErrorCode::InternalError);
return;
}
auto it = PendingReserve.begin();

Y_ABORT_UNLESS(it->first == cookie);
if(it->first != cookie) {
ERROR("The order of reservation is invalid. Cookie=" << cookie << ", ReserveCookie=" << it->first);
Disconnected(EErrorCode::InternalError);
return;
}

ReceivedReserve.emplace(it->first, std::move(it->second));

Expand Down Expand Up @@ -582,11 +598,20 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter>, private TRl
}

void Write(ui64 cookie) {
Y_ABORT_UNLESS(!PendingReserve.empty());
if (PendingReserve.empty()) {
ERROR("The state of the PartitionWriter is invalid. PendingReserve is empty. Marker #02");
Disconnected(EErrorCode::InternalError);
return;
}
auto it = PendingReserve.begin();

Y_ABORT_UNLESS(it->first == cookie);
Y_ABORT_UNLESS(PendingWrite.empty() || PendingWrite.back() < cookie);
auto cookieReserveValid = (it->first == cookie);
auto cookieWriteValid = (PendingWrite.empty() || PendingWrite.back() < cookie);
if (!(cookieReserveValid && cookieWriteValid)) {
ERROR("The cookie of Write is invalid. Cookie=" << cookie);
Disconnected(EErrorCode::InternalError);
return;
}

Write(cookie, std::move(it->second.Request));

Expand Down Expand Up @@ -634,7 +659,11 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter>, private TRl

WriteAccepted(cookie);

Y_ABORT_UNLESS(!PendingReserve.empty());
if (PendingReserve.empty()) {
ERROR("The state of the PartitionWriter is invalid. PendingReserve is empty. Marker #03");
Disconnected(EErrorCode::InternalError);
return;
}
auto it = PendingReserve.begin();
auto& holder = it->second;

Expand Down