Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 27 additions & 7 deletions ydb/core/persqueue/partition.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1277,6 +1277,16 @@ TPartition::EProcessResult TPartition::ApplyWriteInfoResponse(TTransaction& tx)
}
txSourceIds.insert(s.first);
}
auto inFlightIter = TxInflightMaxSeqNoPerSourceId.find(s.first);

if (!inFlightIter.IsEnd()) {
if (s.second.MinSeqNo <= inFlightIter->second) {
tx.Predicate = false;
tx.Message = TStringBuilder() << "MinSeqNo violation failure on " << s.first;
tx.WriteInfoApplied = true;
break;
}
}

auto existing = knownSourceIds.find(s.first);
if (existing.IsEnd())
Expand All @@ -1291,9 +1301,6 @@ TPartition::EProcessResult TPartition::ApplyWriteInfoResponse(TTransaction& tx)
if (ret == EProcessResult::Continue && tx.Predicate.GetOrElse(true)) {
TxAffectedSourcesIds.insert(txSourceIds.begin(), txSourceIds.end());

// A temporary solution. This line should be deleted when we fix the error with the SeqNo promotion.
WriteAffectedSourcesIds.insert(txSourceIds.begin(), txSourceIds.end());

tx.WriteInfoApplied = true;
WriteKeysSizeEstimate += tx.WriteInfo->BodyKeys.size();
WriteKeysSizeEstimate += tx.WriteInfo->SrcIdInfo.size();
Expand Down Expand Up @@ -2389,6 +2396,13 @@ void TPartition::CommitWriteOperations(TTransaction& t)
if (!t.WriteInfo) {
return;
}
for (const auto& s : t.WriteInfo->SrcIdInfo) {
auto [iter, ins] = TxInflightMaxSeqNoPerSourceId.emplace(s.first, s.second.SeqNo);
if (!ins) {
Y_ABORT_UNLESS(iter->second < s.second.SeqNo);
iter->second = s.second.SeqNo;
}
}
const auto& ctx = ActorContext();

if (!HaveWriteMsg) {
Expand All @@ -2404,6 +2418,8 @@ void TPartition::CommitWriteOperations(TTransaction& t)
", t.WriteInfo->BlobsFromHead.size=" << t.WriteInfo->BlobsFromHead.size());
PQ_LOG_D("Head=" << Head << ", NewHead=" << NewHead);

auto oldHeadOffset = NewHead.Offset;

if (!t.WriteInfo->BodyKeys.empty()) {
bool needCompactHead =
(Parameters->FirstCommitWriteOperations ? Head : NewHead).PackedSize != 0;
Expand Down Expand Up @@ -2492,12 +2508,16 @@ void TPartition::CommitWriteOperations(TTransaction& t)

WriteInflightSize += msg.Msg.Data.size();
ExecRequest(msg, *Parameters, PersistRequest.Get());

auto& info = TxSourceIdForPostPersist[blob.SourceId];
info.SeqNo = blob.SeqNo;
info.Offset = NewHead.Offset;
}
}
for (const auto& [srcId, info] : t.WriteInfo->SrcIdInfo) {
auto& sourceIdBatch = Parameters->SourceIdBatch;
auto sourceId = sourceIdBatch.GetSource(srcId);
sourceId.Update(info.SeqNo, info.Offset + oldHeadOffset, CurrentTimestamp);
auto& persistInfo = TxSourceIdForPostPersist[srcId];
persistInfo.SeqNo = info.SeqNo;
persistInfo.Offset = info.Offset + oldHeadOffset;
}

Parameters->FirstCommitWriteOperations = false;

Expand Down
2 changes: 2 additions & 0 deletions ydb/core/persqueue/partition.h
Original file line number Diff line number Diff line change
Expand Up @@ -665,6 +665,8 @@ class TPartition : public TActorBootstrapped<TPartition> {
THashSet<TString> TxAffectedConsumers;
THashSet<TString> SetOffsetAffectedConsumers;
THashMap<TString, TSourceIdPostPersistInfo> TxSourceIdForPostPersist;
THashMap<TString, ui64> TxInflightMaxSeqNoPerSourceId;


ui32 MaxBlobSize;
const ui32 TotalLevels = 4;
Expand Down
24 changes: 17 additions & 7 deletions ydb/core/persqueue/partition_write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,7 @@ void TPartition::AnswerCurrentWrites(const TActorContext& ctx) {
already ? maxOffset : offset, CurrentTimestamp, already, maxSeqNo,
PartitionQuotaWaitTimeForCurrentBlob, TopicQuotaWaitTimeForCurrentBlob, queueTime, writeTime, response.Span
);

PQ_LOG_D("Answering for message sourceid: '" << EscapeC(s)
<< "', Topic: '" << TopicName()
<< "', Partition: " << Partition
Expand Down Expand Up @@ -521,6 +522,7 @@ void TPartition::HandleWriteResponse(const TActorContext& ctx) {
SourceIdCounter.Use(sourceId, now);
}
TxSourceIdForPostPersist.clear();
TxInflightMaxSeqNoPerSourceId.clear();

TxAffectedSourcesIds.clear();
WriteAffectedSourcesIds.clear();
Expand Down Expand Up @@ -1042,6 +1044,7 @@ TPartition::EProcessResult TPartition::PreProcessRequest(TWriteMsg& p) {
TStringBuilder() << "Write to inactive partition " << Partition.OriginalPartitionId);
return EProcessResult::ContinueDrop;
}

if (DiskIsFull) {
ScheduleReplyError(p.Cookie,
NPersQueue::NErrorCode::WRITE_ERROR_DISK_IS_FULL,
Expand All @@ -1051,6 +1054,13 @@ TPartition::EProcessResult TPartition::PreProcessRequest(TWriteMsg& p) {
if (TxAffectedSourcesIds.contains(p.Msg.SourceId)) {
return EProcessResult::Blocked;
}
auto inflightMaxSeqNo = TxInflightMaxSeqNoPerSourceId.find(p.Msg.SourceId);

if (!inflightMaxSeqNo.IsEnd()) {
if (p.Msg.SeqNo <= inflightMaxSeqNo->second) {
return EProcessResult::Blocked;
}
}
WriteAffectedSourcesIds.insert(p.Msg.SourceId);
return EProcessResult::Continue;
}
Expand Down Expand Up @@ -1173,12 +1183,11 @@ bool TPartition::ExecRequest(TWriteMsg& p, ProcessParameters& parameters, TEvKey
<< ". Writing seqNo: " << sourceId.UpdatedSeqNo()
<< ". EndOffset: " << EndOffset << ". CurOffset: " << curOffset << ". Offset: " << poffset
);
if (!p.Internal) {
TabletCounters.Cumulative()[COUNTER_PQ_WRITE_ALREADY].Increment(1);
MsgsDiscarded.Inc();
TabletCounters.Cumulative()[COUNTER_PQ_WRITE_BYTES_ALREADY].Increment(p.Msg.Data.size());
BytesDiscarded.Inc(p.Msg.Data.size());
}
Y_ENSURE(!p.Internal); // No Already for transactions;
TabletCounters.Cumulative()[COUNTER_PQ_WRITE_ALREADY].Increment(1);
MsgsDiscarded.Inc();
TabletCounters.Cumulative()[COUNTER_PQ_WRITE_BYTES_ALREADY].Increment(p.Msg.Data.size());
BytesDiscarded.Inc(p.Msg.Data.size());
} else {
if (!p.Internal) {
TabletCounters.Cumulative()[COUNTER_PQ_WRITE_SMALL_OFFSET].Increment(1);
Expand Down Expand Up @@ -1225,6 +1234,7 @@ bool TPartition::ExecRequest(TWriteMsg& p, ProcessParameters& parameters, TEvKey
", must be at least " << curOffset,
p,
NPersQueue::NErrorCode::EErrorCode::WRITE_ERROR_BAD_OFFSET);

return false;
}

Expand Down Expand Up @@ -1279,6 +1289,7 @@ bool TPartition::ExecRequest(TWriteMsg& p, ProcessParameters& parameters, TEvKey
//this must not happen - client sends gaps, fail this client till the end
//now no changes will leak
ctx.Send(Tablet, new TEvents::TEvPoisonPill());

return false;
}
WriteNewSizeFull += p.Msg.SourceId.size() + p.Msg.Data.size();
Expand Down Expand Up @@ -1380,7 +1391,6 @@ bool TPartition::ExecRequest(TWriteMsg& p, ProcessParameters& parameters, TEvKey
++curOffset;
PartitionedBlob = TPartitionedBlob(Partition, 0, "", 0, 0, 0, Head, NewHead, true, false, MaxBlobSize);
}

return true;
}

Expand Down
1 change: 0 additions & 1 deletion ydb/core/persqueue/ut/common/pq_ut_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,6 @@ struct TTestContext {

static bool RequestTimeoutFilter(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event, TDuration duration, TInstant& deadline) {
if (event->GetTypeRewrite() == TEvents::TSystem::Wakeup) {
Cerr << "Captured TEvents::TSystem::Wakeup to " << runtime.FindActorName(event->GetRecipientRewrite()) << Endl;
if (runtime.FindActorName(event->GetRecipientRewrite()) == "PERSQUEUE_ANS_ACTOR") {
return true;
}
Expand Down
Loading
Loading