diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp index 168507dd45d9..2a39356ea18a 100644 --- a/ydb/core/persqueue/partition.cpp +++ b/ydb/core/persqueue/partition.cpp @@ -1277,6 +1277,16 @@ TPartition::EProcessResult TPartition::ApplyWriteInfoResponse(TTransaction& tx) } txSourceIds.insert(s.first); } + auto inFlightIter = TxInflightMaxSeqNoPerSourceId.find(s.first); + + if (!inFlightIter.IsEnd()) { + if (s.second.MinSeqNo <= inFlightIter->second) { + tx.Predicate = false; + tx.Message = TStringBuilder() << "MinSeqNo violation failure on " << s.first; + tx.WriteInfoApplied = true; + break; + } + } auto existing = knownSourceIds.find(s.first); if (existing.IsEnd()) @@ -1291,9 +1301,6 @@ TPartition::EProcessResult TPartition::ApplyWriteInfoResponse(TTransaction& tx) if (ret == EProcessResult::Continue && tx.Predicate.GetOrElse(true)) { TxAffectedSourcesIds.insert(txSourceIds.begin(), txSourceIds.end()); - // A temporary solution. This line should be deleted when we fix the error with the SeqNo promotion. - WriteAffectedSourcesIds.insert(txSourceIds.begin(), txSourceIds.end()); - tx.WriteInfoApplied = true; WriteKeysSizeEstimate += tx.WriteInfo->BodyKeys.size(); WriteKeysSizeEstimate += tx.WriteInfo->SrcIdInfo.size(); @@ -2389,6 +2396,13 @@ void TPartition::CommitWriteOperations(TTransaction& t) if (!t.WriteInfo) { return; } + for (const auto& s : t.WriteInfo->SrcIdInfo) { + auto [iter, ins] = TxInflightMaxSeqNoPerSourceId.emplace(s.first, s.second.SeqNo); + if (!ins) { + Y_ABORT_UNLESS(iter->second < s.second.SeqNo); + iter->second = s.second.SeqNo; + } + } const auto& ctx = ActorContext(); if (!HaveWriteMsg) { @@ -2404,6 +2418,8 @@ void TPartition::CommitWriteOperations(TTransaction& t) ", t.WriteInfo->BlobsFromHead.size=" << t.WriteInfo->BlobsFromHead.size()); PQ_LOG_D("Head=" << Head << ", NewHead=" << NewHead); + auto oldHeadOffset = NewHead.Offset; + if (!t.WriteInfo->BodyKeys.empty()) { bool needCompactHead = (Parameters->FirstCommitWriteOperations ? Head : NewHead).PackedSize != 0; @@ -2492,12 +2508,16 @@ void TPartition::CommitWriteOperations(TTransaction& t) WriteInflightSize += msg.Msg.Data.size(); ExecRequest(msg, *Parameters, PersistRequest.Get()); - - auto& info = TxSourceIdForPostPersist[blob.SourceId]; - info.SeqNo = blob.SeqNo; - info.Offset = NewHead.Offset; } } + for (const auto& [srcId, info] : t.WriteInfo->SrcIdInfo) { + auto& sourceIdBatch = Parameters->SourceIdBatch; + auto sourceId = sourceIdBatch.GetSource(srcId); + sourceId.Update(info.SeqNo, info.Offset + oldHeadOffset, CurrentTimestamp); + auto& persistInfo = TxSourceIdForPostPersist[srcId]; + persistInfo.SeqNo = info.SeqNo; + persistInfo.Offset = info.Offset + oldHeadOffset; + } Parameters->FirstCommitWriteOperations = false; diff --git a/ydb/core/persqueue/partition.h b/ydb/core/persqueue/partition.h index 1cfa15852463..cdda09f3c6d6 100644 --- a/ydb/core/persqueue/partition.h +++ b/ydb/core/persqueue/partition.h @@ -665,6 +665,8 @@ class TPartition : public TActorBootstrapped { THashSet TxAffectedConsumers; THashSet SetOffsetAffectedConsumers; THashMap TxSourceIdForPostPersist; + THashMap TxInflightMaxSeqNoPerSourceId; + ui32 MaxBlobSize; const ui32 TotalLevels = 4; diff --git a/ydb/core/persqueue/partition_write.cpp b/ydb/core/persqueue/partition_write.cpp index c590b3b6e4e8..74a824bdf779 100644 --- a/ydb/core/persqueue/partition_write.cpp +++ b/ydb/core/persqueue/partition_write.cpp @@ -315,6 +315,7 @@ void TPartition::AnswerCurrentWrites(const TActorContext& ctx) { already ? maxOffset : offset, CurrentTimestamp, already, maxSeqNo, PartitionQuotaWaitTimeForCurrentBlob, TopicQuotaWaitTimeForCurrentBlob, queueTime, writeTime, response.Span ); + PQ_LOG_D("Answering for message sourceid: '" << EscapeC(s) << "', Topic: '" << TopicName() << "', Partition: " << Partition @@ -521,6 +522,7 @@ void TPartition::HandleWriteResponse(const TActorContext& ctx) { SourceIdCounter.Use(sourceId, now); } TxSourceIdForPostPersist.clear(); + TxInflightMaxSeqNoPerSourceId.clear(); TxAffectedSourcesIds.clear(); WriteAffectedSourcesIds.clear(); @@ -1042,6 +1044,7 @@ TPartition::EProcessResult TPartition::PreProcessRequest(TWriteMsg& p) { TStringBuilder() << "Write to inactive partition " << Partition.OriginalPartitionId); return EProcessResult::ContinueDrop; } + if (DiskIsFull) { ScheduleReplyError(p.Cookie, NPersQueue::NErrorCode::WRITE_ERROR_DISK_IS_FULL, @@ -1051,6 +1054,13 @@ TPartition::EProcessResult TPartition::PreProcessRequest(TWriteMsg& p) { if (TxAffectedSourcesIds.contains(p.Msg.SourceId)) { return EProcessResult::Blocked; } + auto inflightMaxSeqNo = TxInflightMaxSeqNoPerSourceId.find(p.Msg.SourceId); + + if (!inflightMaxSeqNo.IsEnd()) { + if (p.Msg.SeqNo <= inflightMaxSeqNo->second) { + return EProcessResult::Blocked; + } + } WriteAffectedSourcesIds.insert(p.Msg.SourceId); return EProcessResult::Continue; } @@ -1173,12 +1183,11 @@ bool TPartition::ExecRequest(TWriteMsg& p, ProcessParameters& parameters, TEvKey << ". Writing seqNo: " << sourceId.UpdatedSeqNo() << ". EndOffset: " << EndOffset << ". CurOffset: " << curOffset << ". Offset: " << poffset ); - if (!p.Internal) { - TabletCounters.Cumulative()[COUNTER_PQ_WRITE_ALREADY].Increment(1); - MsgsDiscarded.Inc(); - TabletCounters.Cumulative()[COUNTER_PQ_WRITE_BYTES_ALREADY].Increment(p.Msg.Data.size()); - BytesDiscarded.Inc(p.Msg.Data.size()); - } + Y_ENSURE(!p.Internal); // No Already for transactions; + TabletCounters.Cumulative()[COUNTER_PQ_WRITE_ALREADY].Increment(1); + MsgsDiscarded.Inc(); + TabletCounters.Cumulative()[COUNTER_PQ_WRITE_BYTES_ALREADY].Increment(p.Msg.Data.size()); + BytesDiscarded.Inc(p.Msg.Data.size()); } else { if (!p.Internal) { TabletCounters.Cumulative()[COUNTER_PQ_WRITE_SMALL_OFFSET].Increment(1); @@ -1225,6 +1234,7 @@ bool TPartition::ExecRequest(TWriteMsg& p, ProcessParameters& parameters, TEvKey ", must be at least " << curOffset, p, NPersQueue::NErrorCode::EErrorCode::WRITE_ERROR_BAD_OFFSET); + return false; } @@ -1279,6 +1289,7 @@ bool TPartition::ExecRequest(TWriteMsg& p, ProcessParameters& parameters, TEvKey //this must not be happen - client sends gaps, fail this client till the end //now no changes will leak ctx.Send(Tablet, new TEvents::TEvPoisonPill()); + return false; } WriteNewSizeFull += p.Msg.SourceId.size() + p.Msg.Data.size(); @@ -1380,7 +1391,6 @@ bool TPartition::ExecRequest(TWriteMsg& p, ProcessParameters& parameters, TEvKey ++curOffset; PartitionedBlob = TPartitionedBlob(Partition, 0, "", 0, 0, 0, Head, NewHead, true, false, MaxBlobSize); } - return true; } diff --git a/ydb/core/persqueue/ut/common/pq_ut_common.h b/ydb/core/persqueue/ut/common/pq_ut_common.h index 80cf308775ce..5cf4f3c23c95 100644 --- a/ydb/core/persqueue/ut/common/pq_ut_common.h +++ b/ydb/core/persqueue/ut/common/pq_ut_common.h @@ -127,7 +127,6 @@ struct TTestContext { static bool RequestTimeoutFilter(TTestActorRuntimeBase& runtime, TAutoPtr& event, TDuration duration, TInstant& deadline) { if (event->GetTypeRewrite() == TEvents::TSystem::Wakeup) { - Cerr << "Captured TEvents::TSystem::Wakeup to " << runtime.FindActorName(event->GetRecipientRewrite()) << Endl; if (runtime.FindActorName(event->GetRecipientRewrite()) == "PERSQUEUE_ANS_ACTOR") { return true; } diff --git a/ydb/core/persqueue/ut/partition_ut.cpp b/ydb/core/persqueue/ut/partition_ut.cpp index e7750e59d876..1fbe4e51c211 100644 --- a/ydb/core/persqueue/ut/partition_ut.cpp +++ b/ydb/core/persqueue/ut/partition_ut.cpp @@ -143,6 +143,8 @@ class TPartitionFixture : public NUnitTest::TBaseFixture { TMaybe Status; TMaybe ErrorCode; TMaybe Offset; + TMaybe AlreadyWritten; + TMaybe SeqNo; }; struct TErrorMatcher { @@ -679,7 +681,6 @@ void TPartitionFixture::WaitProxyResponse(const TProxyResponseMatcher& matcher) { auto event = Ctx->Runtime->GrabEdgeEvent(); UNIT_ASSERT(event != nullptr); - if (matcher.Cookie) { UNIT_ASSERT_VALUES_EQUAL(*matcher.Cookie, event->Cookie); } @@ -699,6 +700,18 @@ void TPartitionFixture::WaitProxyResponse(const TProxyResponseMatcher& matcher) UNIT_ASSERT(event->Response->GetPartitionResponse().HasCmdGetClientOffsetResult()); UNIT_ASSERT_VALUES_EQUAL(*matcher.Offset, event->Response->GetPartitionResponse().GetCmdGetClientOffsetResult().GetOffset()); } + if (matcher.AlreadyWritten) { + UNIT_ASSERT(event->Response->HasPartitionResponse()); + UNIT_ASSERT_VALUES_EQUAL(event->Response->GetPartitionResponse().CmdWriteResultSize(), 1); + UNIT_ASSERT_VALUES_EQUAL(*matcher.AlreadyWritten, + event->Response->GetPartitionResponse().GetCmdWriteResult(0).GetAlreadyWritten()); + } + if (matcher.SeqNo) { + UNIT_ASSERT(event->Response->HasPartitionResponse()); + UNIT_ASSERT_VALUES_EQUAL(event->Response->GetPartitionResponse().CmdWriteResultSize(), 1); + UNIT_ASSERT_VALUES_EQUAL(*matcher.SeqNo, + event->Response->GetPartitionResponse().GetCmdWriteResult(0).GetSeqNo()); + } } void TPartitionFixture::WaitErrorResponse(const TErrorMatcher& matcher) @@ -1174,6 +1187,8 @@ void TPartitionFixture::ShadowPartitionCountersTest(bool isFirstClass) { return TTestActorRuntimeBase::EEventAction::DROP; } else if (auto* msg = ev->CastAsLocal()) { return TTestActorRuntimeBase::EEventAction::DROP; + } else if (auto* msg = ev->CastAsLocal()) { + return TTestActorRuntimeBase::EEventAction::PROCESS; } return TTestActorRuntimeBase::EEventAction::PROCESS; }); @@ -1374,7 +1389,7 @@ struct TTxBatchingTestParams { class TPartitionTxTestHelper : public TPartitionFixture { private: - auto AddWriteTxImpl(const TSrcIdMap& srcIdsAffected, ui64 txId, ui64 step); + auto AddWriteTxImpl(const TSrcIdMap& srcIdsAffected, ui64 txId, ui64 step, TMaybe&& blobFromHead = Nothing()); void AddWriteInfoObserver(bool success, const NPQ::TSourceIdMap& srcIdInfo, const TActorId& supportivePart); void SendWriteInfoResponseImpl(const TActorId& supportiveId, const TActorId& partitionId, bool status); @@ -1390,7 +1405,7 @@ class TPartitionTxTestHelper : public TPartitionFixture { THashMap ExpectedWriteInfoRequests; TQueue> RecievedWriteInfoRequests; TAdaptiveLock Lock; - THashMap WriteInfoData; + THashMap> WriteInfoData; TVector> Sessions; THashMap> Owners; @@ -1398,6 +1413,8 @@ class TPartitionTxTestHelper : public TPartitionFixture { void Init(const TTxBatchingTestParams& params = {}) { TxStep = params.TxStep; + Ctx->Runtime->SetLogPriority( NKikimrServices::PERSQUEUE, NActors::NLog::PRI_DEBUG); + Ctx->Runtime->GetAppData(0).PQConfig.MutableQuotingConfig()->SetEnableQuoting(false); Ctx->Runtime->SetObserverFunc([this](TAutoPtr& ev) { if (auto* msg = ev->CastAsLocal()) { @@ -1410,6 +1427,7 @@ class TPartitionTxTestHelper : public TPartitionFixture { BatchSizes.push_back(msg->BatchSize); } } else if (auto* msg = ev->CastAsLocal()) { + Cerr << "Got KV request\n"; with_lock(Lock) { HadKvRequest = true; } @@ -1443,7 +1461,7 @@ class TPartitionTxTestHelper : public TPartitionFixture { } ui64 AddAndSendNormalWrite(const TString& srcId, ui64 startSeqnNo, ui64 lastSeqNo); - ui64 MakeAndSendWriteTx(const TSrcIdMap& srcIdsAffected); + ui64 MakeAndSendWriteTx(const TSrcIdMap& srcIdsAffected, TMaybe&& blobFromHead = Nothing()); ui64 MakeAndSendImmediateTx(const TSrcIdMap& srcIdsAffected); ui64 MakeAndSendNormalOffsetCommit(ui64 client, ui64 offset); ui64 MakeAndSendTxOffsetCommit(ui64 client, ui64 begin, ui64 end); @@ -1509,16 +1527,15 @@ void TPartitionTxTestHelper::SendWriteInfoResponseImpl(const TActorId& supportiv return; } NPQ::TSourceIdMap SrcIds; - auto* reply = new TEvPQ::TEvGetWriteInfoResponse(); auto iter = this->WriteInfoData.find(supportiveId); Y_ABORT_UNLESS(!iter.IsEnd()); - reply->SrcIdInfo = iter->second; + auto& reply = iter->second; reply->BytesWrittenTotal = 1; reply->BytesWrittenGrpc = 1; reply->BytesWrittenUncompressed = 1; reply->MessagesWrittenTotal = 1; reply->MessagesWrittenGrpc = 1; - SendEvent(reply, supportiveId, partitionId); + SendEvent(reply.Release(), supportiveId, partitionId); } void TPartitionTxTestHelper::WaitWriteInfoRequest(ui64 userActId, bool autoRespond) { @@ -1703,7 +1720,6 @@ ui64 TPartitionTxTestHelper::AddAndSendNormalWrite( msg.ReceiveTimestamp = TMonotonic::Now().Seconds(); msg.DisableDeduplication = false; msg.Data = data; - msg.Data = data; msg.UncompressedSize = data.size(); msg.External = false; msg.IgnoreQuotaDeadline = false; @@ -1723,7 +1739,7 @@ ui64 TPartitionTxTestHelper::AddAndSendNormalWrite( return id; } -auto TPartitionTxTestHelper::AddWriteTxImpl(const TSrcIdMap& srcIdsAffected, ui64 txId, ui64 step) { +auto TPartitionTxTestHelper::AddWriteTxImpl(const TSrcIdMap& srcIdsAffected, ui64 txId, ui64 step, TMaybe&& blobFromHead) { auto id = NextActId++; TTestUserAct act{.IsImmediateTx = (step != 0), .TxId = txId, .SupportivePartitionId = CreateFakePartition()}; NPQ::TSourceIdMap srcIdMap; @@ -1734,15 +1750,19 @@ auto TPartitionTxTestHelper::AddWriteTxImpl(const TSrcIdMap& srcIdsAffected, ui6 srcIdMap.emplace(key, std::move(srcInfo)); } auto iter = UserActs.insert(std::make_pair(id, act)).first; - + auto ev = MakeHolder(); + ev->SrcIdInfo = std::move(srcIdMap); + if (blobFromHead.Defined()) { + ev->BlobsFromHead.emplace_back(std::move(blobFromHead.GetRef())); + } with_lock(Lock) { - WriteInfoData.emplace(act.SupportivePartitionId, std::move(srcIdMap)); + WriteInfoData.emplace(act.SupportivePartitionId, std::move(ev)); } return iter; } -ui64 TPartitionTxTestHelper::MakeAndSendWriteTx(const TSrcIdMap& srcIdsAffected) { - auto actIter = AddWriteTxImpl(srcIdsAffected, NextActId++, TxStep); +ui64 TPartitionTxTestHelper::MakeAndSendWriteTx(const TSrcIdMap& srcIdsAffected, TMaybe&& blobFromHead) { + auto actIter = AddWriteTxImpl(srcIdsAffected, NextActId++, TxStep, std::move(blobFromHead)); auto event = MakeHolder(TxStep, actIter->second.TxId); event->SupportivePartitionActor = actIter->second.SupportivePartitionId; Cerr << "Create distr tx with id = " << actIter->second.TxId << " and act no: " << actIter->first << Endl; @@ -2653,7 +2673,7 @@ Y_UNIT_TEST_F(DataTxCalcPredicateOk, TPartitionTxTestHelper) WaitProxyResponse({.Cookie=cookie}); Cerr << "Wait third predicate result " << Endl; - auto tx3 = MakeAndSendWriteTx({{"src1", {1, 10}}, {"SourceId", {6, 10}}}); + auto tx3 = MakeAndSendWriteTx({{"src1", {12, 20}}, {"SourceId", {6, 10}}}); WaitWriteInfoRequest(tx3, true); WaitTxPredicateReply(tx3); SendTxCommit(tx3); @@ -2753,9 +2773,6 @@ Y_UNIT_TEST_F(TestTxBatchInFederation, TPartitionTxTestHelper) { } Y_UNIT_TEST_F(ConflictingActsInSeveralBatches, TPartitionTxTestHelper) { - // A temporary solution. This line should be deleted when we fix the error with the SeqNo promotion. - return; - TTxBatchingTestParams params {.WriterSessions{"src1", "src4"},.EndOffset=1}; Init(std::move(params)); @@ -2837,9 +2854,6 @@ Y_UNIT_TEST_F(ConflictingTxIsAborted, TPartitionTxTestHelper) { } Y_UNIT_TEST_F(ConflictingTxProceedAfterRollback, TPartitionTxTestHelper) { - // A temporary solution. This line should be deleted when we fix the error with the SeqNo promotion. - return; - Init(); auto tx1 = MakeAndSendWriteTx({{"src1", {1, 3}}, {"src2", {5, 10}}}); @@ -2866,6 +2880,122 @@ Y_UNIT_TEST_F(ConflictingTxProceedAfterRollback, TPartitionTxTestHelper) { WaitImmediateTxComplete(immTx, true); } +Y_UNIT_TEST_F(ConflictingSrcIdForTxInDifferentBatches, TPartitionTxTestHelper) { + TTxBatchingTestParams params {.WriterSessions{"src1"}}; + Init(std::move(params)); + + auto tx1 = MakeAndSendWriteTx({{"src1", {1, 5}}}); + auto tx2 = MakeAndSendWriteTx({{"src1", {6, 10}}}); + auto tx3 = MakeAndSendWriteTx({{"src1", {2, 11}}}); + auto tx4 = MakeAndSendWriteTx({{"src1", {8, 15}}}); + + WaitWriteInfoRequest(tx1, true); + WaitWriteInfoRequest(tx2, true); + WaitWriteInfoRequest(tx3, true); + WaitWriteInfoRequest(tx4, true); + WaitTxPredicateReply(tx1); + + Cerr << "Wait batch of 1 completion\n"; + SendTxCommit(tx1); + WaitBatchCompletion(1); + Cerr << "Expect no KV request\n"; + ExpectNoKvRequest(); + WaitTxPredicateReply(tx2); + SendTxCommit(tx2); + + Cerr << "Waif or tx 3 predicate failure\n"; + WaitTxPredicateFailure(tx3); + Cerr << "Waif or tx 4 predicate failure\n"; + WaitTxPredicateFailure(tx4); + + + Cerr << "Wait batch of 3 completion\n"; + WaitBatchCompletion(1); // Immediate Tx 2 - 4. + Cerr << "Expect no KV request\n"; + ExpectNoKvRequest(); + SendTxRollback(tx3); + SendTxRollback(tx4); + WaitBatchCompletion(2); // Immediate Tx 2 - 4. + + ExpectNoCommitDone(); + WaitKvRequest(); + SendKvResponse(); + Cerr << "Wait for commits\n"; + WaitCommitDone(tx1); + WaitCommitDone(tx2); +} + +Y_UNIT_TEST_F(ConflictingSrcIdTxAndWritesDifferentBatches, TPartitionTxTestHelper) { + TTxBatchingTestParams params {.WriterSessions{"src1"}, .EndOffset = 1}; + Init(std::move(params)); + + auto tx1 = MakeAndSendWriteTx({{"src1", {1, 3}},}); + auto tx2 = MakeAndSendWriteTx({{"src1", {2, 4}}}); + auto tx3 = MakeAndSendWriteTx({{"src1", {4, 6}}}); + AddAndSendNormalWrite("src1", 1, 1); + AddAndSendNormalWrite("src1", 7, 7); + AddAndSendNormalWrite("src1", 7, 7); + + + WaitWriteInfoRequest(tx1, true); + WaitWriteInfoRequest(tx2, true); + WaitWriteInfoRequest(tx3, true); + WaitTxPredicateReply(tx1); + + SendTxCommit(tx1); + WaitBatchCompletion(1); + + ExpectNoKvRequest(); + WaitTxPredicateFailure(tx2); + WaitTxPredicateReply(tx3); + SendTxRollback(tx2); + SendTxCommit(tx3); + WaitBatchCompletion(2); // Tx 2 & 3. + ExpectNoCommitDone(); + WaitKvRequest(); + SendKvResponse(); + WaitCommitDone(tx1); + WaitCommitDone(tx3); + WaitBatchCompletion(3); + WaitKvRequest(); + SendKvResponse(); + WaitProxyResponse({.AlreadyWritten=true, .SeqNo=1}); + WaitProxyResponse({.AlreadyWritten=false, .SeqNo=7}); + WaitProxyResponse({.AlreadyWritten=true, .SeqNo=7}); +} + +Y_UNIT_TEST_F(ConflictingSrcIdForTxWithHead, TPartitionTxTestHelper) { + TTxBatchingTestParams params {.WriterSessions{"src1"}, .EndOffset=1}; + Init(std::move(params)); + + NPQ::TClientBlob clientBlob("src1", 10, "valuevalue", TMaybe(), TInstant::MilliSeconds(1), TInstant::MilliSeconds(1), 0, "123", "123"); + + auto tx1 = MakeAndSendWriteTx({{"src1", {1, 10}}}, std::move(clientBlob)); + AddAndSendNormalWrite("src1", 8, 8); + AddAndSendNormalWrite("src1", 10, 10); + AddAndSendNormalWrite("src1", 11, 11); + + + WaitWriteInfoRequest(tx1, true); + WaitTxPredicateReply(tx1); + + SendTxCommit(tx1); + WaitBatchCompletion(1); + Cerr << "Wait 1st KV request\n"; + WaitKvRequest(); + SendKvResponse(); + WaitCommitDone(tx1); + WaitBatchCompletion(3); + Cerr << "Wait 2nd KV request\n"; + WaitKvRequest(); + SendKvResponse(); + WaitProxyResponse({.AlreadyWritten=true, .SeqNo=8}); + WaitProxyResponse({.AlreadyWritten=true, .SeqNo=10}); + WaitProxyResponse({.AlreadyWritten=false, .SeqNo=11}); + + //WaitProxyResponse() +} + class TBatchingConditionsTest { TPartitionTxTestHelper* TxHelper; ui64 SeqNo = 1; @@ -2892,12 +3022,14 @@ class TBatchingConditionsTest { } ui64 AddTx() { - return TxHelper->MakeAndSendWriteTx({{SrcId, {SeqNo, SeqNo}}}); + auto ret = TxHelper->MakeAndSendWriteTx({{SrcId, {SeqNo, SeqNo}}}); SeqNo++; + return ret; } ui64 AddImmediateTx() { - return TxHelper->MakeAndSendImmediateTx({{SrcId, {SeqNo, SeqNo}}}); + auto ret = TxHelper->MakeAndSendImmediateTx({{SrcId, {SeqNo, SeqNo}}}); SeqNo++; + return ret; } void AddNormalWrite() { TxHelper->AddAndSendNormalWrite(SrcId, SeqNo, SeqNo); @@ -2994,6 +3126,7 @@ Y_UNIT_TEST_F(DifferentWriteTxBatchingOptions, TPartitionTxTestHelper) { WaitCommitDone(tx); WaitImmediateTxComplete(immTx, true); } + } Y_UNIT_TEST_F(FailedTxsDontBlock, TPartitionTxTestHelper) { Init({.WriterSessions={"src1", "src2"}, .EndOffset = 1});