Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ydb/core/persqueue/partition.h
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,7 @@ class TPartition : public TActorBootstrapped<TPartition> {
void ProcessChangeOwnerRequest(TAutoPtr<TEvPQ::TEvChangeOwner> ev, const TActorContext& ctx);
void ProcessChangeOwnerRequests(const TActorContext& ctx);
void ProcessHasDataRequests(const TActorContext& ctx);
bool ProcessHasDataRequest(const THasDataReq& request, const TActorContext& ctx);
void ProcessRead(const TActorContext& ctx, TReadInfo&& info, const ui64 cookie, bool subscription);
void ProcessReserveRequests(const TActorContext& ctx);
void ProcessTimestampRead(const TActorContext& ctx);
Expand Down
56 changes: 33 additions & 23 deletions ydb/core/persqueue/partition_read.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,32 @@ TAutoPtr<TEvPersQueue::TEvHasDataInfoResponse> TPartition::MakeHasDataInfoRespon
return res;
}

bool TPartition::ProcessHasDataRequest(const THasDataReq& request, const TActorContext& ctx) {
auto sendResponse = [&](ui64 lagSize, bool readingFinished) {
auto response = MakeHasDataInfoResponse(lagSize, request.Cookie, readingFinished);
ctx.Send(request.Sender, response.Release());
};

if (!IsActive()) {
if (request.Offset < EndOffset && (!request.ReadTimestamp || *request.ReadTimestamp <= EndWriteTimestamp)) {
sendResponse(GetSizeLag(request.Offset), false);
} else {
sendResponse(0, true);

auto now = ctx.Now();
auto& userInfo = UsersInfoStorage->GetOrCreate(request.ClientId, ctx);
userInfo.UpdateReadOffset((i64)EndOffset - 1, now, now, now, true);
}
} else if (request.Offset < EndOffset) {
sendResponse(GetSizeLag(request.Offset), false);
} else {
return false;
}

return true;
}


void TPartition::ProcessHasDataRequests(const TActorContext& ctx) {
if (!InitDone) {
return;
Expand All @@ -150,13 +176,7 @@ void TPartition::ProcessHasDataRequests(const TActorContext& ctx) {
};

for (auto request = HasDataRequests.begin(); request != HasDataRequests.end();) {
if (request->Offset < EndOffset && (IsActive() || !request->ReadTimestamp || *request->ReadTimestamp < EndWriteTimestamp)) {
auto response = MakeHasDataInfoResponse(GetSizeLag(request->Offset), request->Cookie);
ctx.Send(request->Sender, response.Release());
} else if (!IsActive()) {
auto response = MakeHasDataInfoResponse(0, request->Cookie, true);
ctx.Send(request->Sender, response.Release());
} else {
if (!ProcessHasDataRequest(*request, ctx)) {
break;
}

Expand Down Expand Up @@ -185,32 +205,22 @@ void TPartition::Handle(TEvPersQueue::TEvHasDataInfo::TPtr& ev, const TActorCont
auto& record = ev->Get()->Record;
Y_ABORT_UNLESS(record.HasSender());

auto now = ctx.Now();

auto cookie = record.HasCookie() ? TMaybe<ui64>(record.GetCookie()) : TMaybe<ui64>();
auto readTimestamp = GetReadFrom(record.GetMaxTimeLagMs(), record.GetReadTimestampMs(), TInstant::Zero(), ctx);

TActorId sender = ActorIdFromProto(record.GetSender());
if (InitDone && EndOffset > (ui64)record.GetOffset() && (!readTimestamp || EndWriteTimestamp >= *readTimestamp)) { //already has data, answer right now
auto response = MakeHasDataInfoResponse(GetSizeLag(record.GetOffset()), cookie);
ctx.Send(sender, response.Release());
} else if (InitDone && !IsActive()) {
auto now = ctx.Now();

auto& userInfo = UsersInfoStorage->GetOrCreate(record.GetClientId(), ctx);
userInfo.UpdateReadOffset((i64)EndOffset - 1, now, now, now, true);
THasDataReq req{++HasDataReqNum, (ui64)record.GetOffset(), sender, cookie,
record.HasClientId() && InitDone ? record.GetClientId() : "", readTimestamp};

auto response = MakeHasDataInfoResponse(0, cookie, true);
ctx.Send(sender, response.Release());
} else {
THasDataReq req{++HasDataReqNum, (ui64)record.GetOffset(), sender, cookie,
record.HasClientId() && InitDone ? record.GetClientId() : "", readTimestamp};
if (!InitDone || !ProcessHasDataRequest(req, ctx)) {
THasDataDeadline dl{TInstant::MilliSeconds(record.GetDeadline()), req};
auto res = HasDataRequests.insert(req);
auto res = HasDataRequests.insert(std::move(req));
HasDataDeadlines.insert(dl);
Y_ABORT_UNLESS(res.second);

if (InitDone && record.HasClientId() && !record.GetClientId().empty()) {
auto now = ctx.Now();

auto& userInfo = UsersInfoStorage->GetOrCreate(record.GetClientId(), ctx);
++userInfo.Subscriptions;
userInfo.UpdateReadOffset((i64)EndOffset - 1, now, now, now);
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/persqueue/partition_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ struct TPartition::THasDataReq {
TMaybe<TInstant> ReadTimestamp;

bool operator < (const THasDataReq& req) const {
return Num < req.Num;
return std::tuple(Offset, Num) < std::tuple(req.Offset, req.Num);
}
};

Expand Down
Loading