diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp index f0694691f0a5..4d127aff8cc1 100644 --- a/ydb/core/persqueue/partition.cpp +++ b/ydb/core/persqueue/partition.cpp @@ -1226,18 +1226,27 @@ void TPartition::Handle(TEvPQ::TEvGetMaxSeqNoRequest::TPtr& ev, const TActorCont void TPartition::Handle(TEvPQ::TEvBlobResponse::TPtr& ev, const TActorContext& ctx) { const ui64 cookie = ev->Get()->GetCookie(); - Y_ABORT_UNLESS(ReadInfo.contains(cookie)); - auto it = ReadInfo.find(cookie); - Y_ABORT_UNLESS(it != ReadInfo.end()); + + // If there is no such cookie, then read was canceled. + // For example, it can be after consumer deletion + if (it == ReadInfo.end()) { + return; + } TReadInfo info = std::move(it->second); ReadInfo.erase(it); - //make readinfo class - auto& userInfo = UsersInfoStorage->GetOrCreate(info.User, ctx); + auto* userInfo = UsersInfoStorage->GetIfExists(info.User); + if (!userInfo) { + // Consumer no longer exists: reply with an error once and stop, as DoRead does. Falling through would go on to form a regular answer with a null userInfo. + ReplyError(ctx, info.Destination, NPersQueue::NErrorCode::BAD_REQUEST, GetConsumerDeletedMessage(info.User)); + OnReadRequestFinished(info.Destination, 0, info.User, ctx); + return; + } + TReadAnswer answer(info.FormAnswer( - ctx, *ev->Get(), EndOffset, Partition, &userInfo, + ctx, *ev->Get(), EndOffset, Partition, userInfo, info.Destination, GetSizeLag(info.Offset), Tablet, Config.GetMeteringMode() )); const auto& resp = dynamic_cast(answer.Event.Get())->Response; @@ -2428,6 +2437,20 @@ void TPartition::OnProcessTxsAndUserActsWriteComplete(const TActorContext& ctx) } UsersInfoStorage->Remove(user, ctx); + + // Finish all ongoing reads + std::unordered_set<ui64> readCookies; + for (auto& [cookie, info] : ReadInfo) { + if (info.User == user) { + readCookies.insert(cookie); + ReplyError(ctx, info.Destination, NPersQueue::NErrorCode::BAD_REQUEST, GetConsumerDeletedMessage(user)); + OnReadRequestFinished(info.Destination, 0, user, ctx); + } + } + for (ui64 cookie : readCookies) { + ReadInfo.erase(cookie); + } + Send(ReadQuotaTrackerActor, new 
TEvPQ::TEvConsumerRemoved(user)); } } diff --git a/ydb/core/persqueue/partition.h b/ydb/core/persqueue/partition.h index c954012caa66..31315dbc7f0b 100644 --- a/ydb/core/persqueue/partition.h +++ b/ydb/core/persqueue/partition.h @@ -623,6 +623,7 @@ class TPartition : public TActorBootstrapped { static void RemoveMessages(TMessageQueue& src, TMessageQueue& dst); void RemovePendingRequests(TMessageQueue& requests); void RemoveMessagesToQueue(TMessageQueue& requests); + static TString GetConsumerDeletedMessage(TStringBuf consumerName); private: ui64 TabletID; diff --git a/ydb/core/persqueue/partition_read.cpp b/ydb/core/persqueue/partition_read.cpp index 387b51ae1938..4c7abf54e4f2 100644 --- a/ydb/core/persqueue/partition_read.cpp +++ b/ydb/core/persqueue/partition_read.cpp @@ -748,8 +748,8 @@ void TPartition::DoRead(TEvPQ::TEvRead::TPtr&& readEvent, TDuration waitQuotaTim auto* read = readEvent->Get(); const TString& user = read->ClientId; auto userInfo = UsersInfoStorage->GetIfExists(user); - if(!userInfo) { - ReplyError(ctx, read->Cookie, NPersQueue::NErrorCode::BAD_REQUEST, TStringBuilder() << "cannot finish read request. Consumer " << read->ClientId << " is gone from partition"); + if (!userInfo) { + ReplyError(ctx, read->Cookie, NPersQueue::NErrorCode::BAD_REQUEST, GetConsumerDeletedMessage(read->ClientId)); Send(ReadQuotaTrackerActor, new TEvPQ::TEvConsumerRemoved(user)); OnReadRequestFinished(read->Cookie, 0, user, ctx); return; @@ -1026,4 +1026,8 @@ void TPartition::ProcessRead(const TActorContext& ctx, TReadInfo&& info, const u ctx.Send(BlobCache, request.Release()); } +TString TPartition::GetConsumerDeletedMessage(TStringBuf consumerName) { + return TStringBuilder() << "cannot finish read request. 
Consumer " << consumerName << " is gone from partition"; +} + } // namespace NKikimr::NPQ diff --git a/ydb/core/persqueue/ut/pq_ut.cpp b/ydb/core/persqueue/ut/pq_ut.cpp index abe6a4e7a0cb..aed439baa2fd 100644 --- a/ydb/core/persqueue/ut/pq_ut.cpp +++ b/ydb/core/persqueue/ut/pq_ut.cpp @@ -2326,6 +2326,100 @@ Y_UNIT_TEST(TestTabletRestoreEventsOrder) { }); } +Y_UNIT_TEST(TestReadAndDeleteConsumer) { + TTestContext tc; + RunTestWithReboots(tc.TabletIds, [&]() { + return tc.InitialEventsFilter.Prepare(); + }, [&](const TString& dispatchName, std::function setup, bool& activeZone) { + TFinalizer finalizer(tc); + tc.Prepare(dispatchName, setup, activeZone); + activeZone = false; + tc.Runtime->SetScheduledLimit(2000); + tc.Runtime->SetScheduledEventFilter(&tc.ImmediateLogFlushAndRequestTimeoutFilter); + + TVector> data; + TString msg; + msg.resize(102400, 'a'); + for (ui64 i = 1; i <= 1000; ++i) { + data.emplace_back(i, msg); + } + + PQTabletPrepare({.maxCountInPartition=100, .deleteTime=TDuration::Days(2).Seconds(), .partitions=1}, + {{"user1", true}, {"user2", true}}, tc); + CmdWrite(0, "sourceid1", data, tc, false, {}, true); + + // Reset tablet cache + PQTabletRestart(tc); + + TAutoPtr handle; + TEvPersQueue::TEvResponse* readResult = nullptr; + THolder readRequest; + TEvPersQueue::TEvUpdateConfigResponse* consumerDeleteResult = nullptr; + THolder consumerDeleteRequest; + + // Read request issued on behalf of consumer user1 + { + readRequest.Reset(new TEvPersQueue::TEvRequest); + auto req = readRequest->Record.MutablePartitionRequest(); + req->SetPartition(0); + auto read = req->MutableCmdRead(); + read->SetOffset(1); + read->SetClientId("user1"); + read->SetCount(1); + read->SetBytes(1'000'000); + read->SetTimeoutMs(5000); + } + + // Config update that lists only user2, i.e. drops consumer user1 + { + consumerDeleteRequest.Reset(new TEvPersQueue::TEvUpdateConfig()); + consumerDeleteRequest->MutableRecord()->SetTxId(42); + auto& cfg = *consumerDeleteRequest->MutableRecord()->MutableTabletConfig(); + cfg.SetVersion(42); + 
cfg.AddPartitionIds(0); + cfg.AddPartitions()->SetPartitionId(0); + cfg.SetLocalDC(true); + cfg.SetTopic("topic"); + auto& cons = *cfg.AddConsumers(); + cons.SetName("user2"); + cons.SetImportant(true); + } + + TActorId edge = tc.Runtime->AllocateEdgeActor(); + + // Send the read request first; the consumer is deleted below while it is still pending + tc.Runtime->SendToPipe(tc.TabletId, tc.Edge, readRequest.Release(), 0, GetPipeConfigWithRetries()); + + // Intercept TEvPQ::TEvBlobResponse event + std::vector capturedBlobResponses; + auto captureBlobResponsesObserver = tc.Runtime->AddObserver([&](TEvPQ::TEvBlobResponse::TPtr& ev) { + capturedBlobResponses.emplace_back().Swap(ev); + }); + + // Delete consumer while read request is still in progress + tc.Runtime->SendToPipe(tc.TabletId, edge, consumerDeleteRequest.Release(), 0, GetPipeConfigWithRetries()); + consumerDeleteResult = tc.Runtime->GrabEdgeEvent(handle); + { + //Cerr << "Got consumer delete response: " << consumerDeleteResult->Record << Endl; + UNIT_ASSERT(consumerDeleteResult->Record.HasStatus()); + UNIT_ASSERT_EQUAL(consumerDeleteResult->Record.GetStatus(), NKikimrPQ::EStatus::OK); + } + + // Resend intercepted blob responses and wait for read result + captureBlobResponsesObserver.Remove(); + for (auto& ev : capturedBlobResponses) { + tc.Runtime->Send(ev.Release(), 0, true); + } + + readResult = tc.Runtime->GrabEdgeEvent(handle); + { + //Cerr << "Got read response: " << readResult->Record << Endl; + UNIT_ASSERT(readResult->Record.HasStatus()); + UNIT_ASSERT_EQUAL(readResult->Record.GetErrorCode(), NPersQueue::NErrorCode::BAD_REQUEST); + UNIT_ASSERT_STRING_CONTAINS_C(readResult->Record.GetErrorReason(), "Consumer user1 is gone from partition", readResult->Record.Utf8DebugString()); + } + }); +} } // Y_UNIT_TEST_SUITE(TPQTest)