Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 27 additions & 6 deletions ydb/core/persqueue/partition.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1226,18 +1226,25 @@ void TPartition::Handle(TEvPQ::TEvGetMaxSeqNoRequest::TPtr& ev, const TActorCont

void TPartition::Handle(TEvPQ::TEvBlobResponse::TPtr& ev, const TActorContext& ctx) {
const ui64 cookie = ev->Get()->GetCookie();
Y_ABORT_UNLESS(ReadInfo.contains(cookie));

auto it = ReadInfo.find(cookie);
Y_ABORT_UNLESS(it != ReadInfo.end());

// If there is no such cookie, then read was canceled.
// For example, it can be after consumer deletion
if (it == ReadInfo.end()) {
return;
}

TReadInfo info = std::move(it->second);
ReadInfo.erase(it);

//make readinfo class
auto& userInfo = UsersInfoStorage->GetOrCreate(info.User, ctx);
auto* userInfo = UsersInfoStorage->GetIfExists(info.User);
if (!userInfo) {
ReplyError(ctx, info.Destination, NPersQueue::NErrorCode::BAD_REQUEST, GetConsumerDeletedMessage(info.User));
OnReadRequestFinished(info.Destination, 0, info.User, ctx);
}

TReadAnswer answer(info.FormAnswer(
ctx, *ev->Get(), EndOffset, Partition, &userInfo,
ctx, *ev->Get(), EndOffset, Partition, userInfo,
info.Destination, GetSizeLag(info.Offset), Tablet, Config.GetMeteringMode()
));
const auto& resp = dynamic_cast<TEvPQ::TEvProxyResponse*>(answer.Event.Get())->Response;
Expand Down Expand Up @@ -2428,6 +2435,20 @@ void TPartition::OnProcessTxsAndUserActsWriteComplete(const TActorContext& ctx)
}

UsersInfoStorage->Remove(user, ctx);

// Finish all ongoing reads
std::unordered_set<ui64> readCookies;
for (auto& [cookie, info] : ReadInfo) {
if (info.User == user) {
readCookies.insert(cookie);
ReplyError(ctx, info.Destination, NPersQueue::NErrorCode::BAD_REQUEST, GetConsumerDeletedMessage(user));
OnReadRequestFinished(info.Destination, 0, user, ctx);
}
}
for (ui64 cookie : readCookies) {
ReadInfo.erase(cookie);
}

Send(ReadQuotaTrackerActor, new TEvPQ::TEvConsumerRemoved(user));
}
}
Expand Down
1 change: 1 addition & 0 deletions ydb/core/persqueue/partition.h
Original file line number Diff line number Diff line change
Expand Up @@ -623,6 +623,7 @@ class TPartition : public TActorBootstrapped<TPartition> {
static void RemoveMessages(TMessageQueue& src, TMessageQueue& dst);
void RemovePendingRequests(TMessageQueue& requests);
void RemoveMessagesToQueue(TMessageQueue& requests);
static TString GetConsumerDeletedMessage(TStringBuf consumerName);

private:
ui64 TabletID;
Expand Down
8 changes: 6 additions & 2 deletions ydb/core/persqueue/partition_read.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -748,8 +748,8 @@ void TPartition::DoRead(TEvPQ::TEvRead::TPtr&& readEvent, TDuration waitQuotaTim
auto* read = readEvent->Get();
const TString& user = read->ClientId;
auto userInfo = UsersInfoStorage->GetIfExists(user);
if(!userInfo) {
ReplyError(ctx, read->Cookie, NPersQueue::NErrorCode::BAD_REQUEST, TStringBuilder() << "cannot finish read request. Consumer " << read->ClientId << " is gone from partition");
if (!userInfo) {
ReplyError(ctx, read->Cookie, NPersQueue::NErrorCode::BAD_REQUEST, GetConsumerDeletedMessage(read->ClientId));
Send(ReadQuotaTrackerActor, new TEvPQ::TEvConsumerRemoved(user));
OnReadRequestFinished(read->Cookie, 0, user, ctx);
return;
Expand Down Expand Up @@ -1026,4 +1026,8 @@ void TPartition::ProcessRead(const TActorContext& ctx, TReadInfo&& info, const u
ctx.Send(BlobCache, request.Release());
}

TString TPartition::GetConsumerDeletedMessage(TStringBuf consumerName) {
return TStringBuilder() << "cannot finish read request. Consumer " << consumerName << " is gone from partition";
}

} // namespace NKikimr::NPQ
94 changes: 94 additions & 0 deletions ydb/core/persqueue/ut/pq_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2326,6 +2326,100 @@ Y_UNIT_TEST(TestTabletRestoreEventsOrder) {
});
}

Y_UNIT_TEST(TestReadAndDeleteConsumer) {
TTestContext tc;
RunTestWithReboots(tc.TabletIds, [&]() {
return tc.InitialEventsFilter.Prepare();
}, [&](const TString& dispatchName, std::function<void(TTestActorRuntime&)> setup, bool& activeZone) {
TFinalizer finalizer(tc);
tc.Prepare(dispatchName, setup, activeZone);
activeZone = false;
tc.Runtime->SetScheduledLimit(2000);
tc.Runtime->SetScheduledEventFilter(&tc.ImmediateLogFlushAndRequestTimeoutFilter);

TVector<std::pair<ui64, TString>> data;
TString msg;
msg.resize(102400, 'a');
for (ui64 i = 1; i <= 1000; ++i) {
data.emplace_back(i, msg);
}

PQTabletPrepare({.maxCountInPartition=100, .deleteTime=TDuration::Days(2).Seconds(), .partitions=1},
{{"user1", true}, {"user2", true}}, tc);
CmdWrite(0, "sourceid1", data, tc, false, {}, true);

// Reset tablet cache
PQTabletRestart(tc);

TAutoPtr<IEventHandle> handle;
TEvPersQueue::TEvResponse* readResult = nullptr;
THolder<TEvPersQueue::TEvRequest> readRequest;
TEvPersQueue::TEvUpdateConfigResponse* consumerDeleteResult = nullptr;
THolder<TEvPersQueue::TEvUpdateConfig> consumerDeleteRequest;

// Read request
{
readRequest.Reset(new TEvPersQueue::TEvRequest);
auto req = readRequest->Record.MutablePartitionRequest();
req->SetPartition(0);
auto read = req->MutableCmdRead();
read->SetOffset(1);
read->SetClientId("user1");
read->SetCount(1);
read->SetBytes(1'000'000);
read->SetTimeoutMs(5000);
}

// Consumer delete request
{
consumerDeleteRequest.Reset(new TEvPersQueue::TEvUpdateConfig());
consumerDeleteRequest->MutableRecord()->SetTxId(42);
auto& cfg = *consumerDeleteRequest->MutableRecord()->MutableTabletConfig();
cfg.SetVersion(42);
cfg.AddPartitionIds(0);
cfg.AddPartitions()->SetPartitionId(0);
cfg.SetLocalDC(true);
cfg.SetTopic("topic");
auto& cons = *cfg.AddConsumers();
cons.SetName("user2");
cons.SetImportant(true);
}

TActorId edge = tc.Runtime->AllocateEdgeActor();

// Delete consumer during read request
tc.Runtime->SendToPipe(tc.TabletId, tc.Edge, readRequest.Release(), 0, GetPipeConfigWithRetries());

// Intercept TEvPQ::TEvBlobResponse event
std::vector<TEvPQ::TEvBlobResponse::TPtr> capturedBlobResponses;
auto captureBlobResponsesObserver = tc.Runtime->AddObserver<TEvPQ::TEvBlobResponse>([&](TEvPQ::TEvBlobResponse::TPtr& ev) {
capturedBlobResponses.emplace_back().Swap(ev);
});

// Delete consumer while read request is still in progress
tc.Runtime->SendToPipe(tc.TabletId, edge, consumerDeleteRequest.Release(), 0, GetPipeConfigWithRetries());
consumerDeleteResult = tc.Runtime->GrabEdgeEvent<TEvPersQueue::TEvUpdateConfigResponse>(handle);
{
//Cerr << "Got consumer delete response: " << consumerDeleteResult->Record << Endl;
UNIT_ASSERT(consumerDeleteResult->Record.HasStatus());
UNIT_ASSERT_EQUAL(consumerDeleteResult->Record.GetStatus(), NKikimrPQ::EStatus::OK);
}

// Resend intercepted blob responses and wait for read result
captureBlobResponsesObserver.Remove();
for (auto& ev : capturedBlobResponses) {
tc.Runtime->Send(ev.Release(), 0, true);
}

readResult = tc.Runtime->GrabEdgeEvent<TEvPersQueue::TEvResponse>(handle);
{
//Cerr << "Got read response: " << readResult->Record << Endl;
UNIT_ASSERT(readResult->Record.HasStatus());
UNIT_ASSERT_EQUAL(readResult->Record.GetErrorCode(), NPersQueue::NErrorCode::BAD_REQUEST);
UNIT_ASSERT_STRING_CONTAINS_C(readResult->Record.GetErrorReason(), "Consumer user1 is gone from partition", readResult->Record.Utf8DebugString());
}
});
}


} // Y_UNIT_TEST_SUITE(TPQTest)
Expand Down
Loading