Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 3 additions & 7 deletions ydb/core/persqueue/events/internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -216,13 +216,14 @@ struct TEvPQ {
std::optional<TRowVersion> HeartbeatVersion;
};

TEvWrite(const ui64 cookie, const ui64 messageNo, const TString& ownerCookie, const TMaybe<ui64> offset, TVector<TMsg> &&msgs, bool isDirectWrite)
TEvWrite(const ui64 cookie, const ui64 messageNo, const TString& ownerCookie, const TMaybe<ui64> offset, TVector<TMsg> &&msgs, bool isDirectWrite, std::optional<ui64> initialSeqNo)
: Cookie(cookie)
, MessageNo(messageNo)
, OwnerCookie(ownerCookie)
, Offset(offset)
, Msgs(std::move(msgs))
, IsDirectWrite(isDirectWrite)
, InitialSeqNo(initialSeqNo)
{}

ui64 Cookie;
Expand All @@ -231,6 +232,7 @@ struct TEvPQ {
TMaybe<ui64> Offset;
TVector<TMsg> Msgs;
bool IsDirectWrite;
std::optional<ui64> InitialSeqNo;

};

Expand Down Expand Up @@ -939,12 +941,6 @@ struct TEvPQ {
NKikimrClient::TPersQueueFetchResponse Response;
};

struct TEvSourceIdRequest : public TEventPB<TEvSourceIdRequest, NKikimrPQ::TEvSourceIdRequest, EvSourceIdRequest> {
};

struct TEvSourceIdResponse : public TEventPB<TEvSourceIdResponse, NKikimrPQ::TEvSourceIdResponse, EvSourceIdResponse> {
};

struct TEvRegisterDirectReadSession : public TEventLocal<TEvRegisterDirectReadSession, EvRegisterDirectReadSession> {
TEvRegisterDirectReadSession(const NPQ::TReadSessionKey& sessionKey, ui32 tabletGeneration)
: Session(sessionKey)
Expand Down
105 changes: 29 additions & 76 deletions ydb/core/persqueue/partition.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <ydb/core/base/counters.h>
#include <ydb/core/base/path.h>
#include <ydb/core/quoter/public/quoter.h>
#include <ydb/core/persqueue/writer/source_id_encoding.h>
#include <ydb/core/protos/counters_pq.pb.h>
#include <ydb/core/protos/msgbus.pb.h>
#include <ydb/library/persqueue/topic_parser/topic_parser.h>
Expand Down Expand Up @@ -482,8 +483,6 @@ void TPartition::Handle(TEvents::TEvPoisonPill::TPtr&, const TActorContext& ctx)

Send(ReadQuotaTrackerActor, new TEvents::TEvPoisonPill());

SourceManager.PassAway();

Die(ctx);
}

Expand Down Expand Up @@ -935,50 +934,33 @@ void TPartition::Handle(TEvPQ::TEvTxRollback::TPtr& ev, const TActorContext& ctx
}

void TPartition::Handle(TEvPQ::TEvGetMaxSeqNoRequest::TPtr& ev, const TActorContext& ctx) {
SourceManager.EnsureSourceIds(ev->Get()->SourceIds);
MaxSeqNoRequests.emplace_back(ev);
ProcessMaxSeqNoRequest(ctx);
}

void TPartition::ProcessMaxSeqNoRequest(const TActorContext& ctx) {
PQ_LOG_T("TPartition::ProcessMaxSeqNoRequest. Queue size: " << MaxSeqNoRequests.size());

while(!MaxSeqNoRequests.empty()) {
auto& ev = MaxSeqNoRequests.front();

auto response = MakeHolder<TEvPQ::TEvProxyResponse>(ev->Get()->Cookie);
NKikimrClient::TResponse& resp = *response->Response;

resp.SetStatus(NMsgBusProxy::MSTATUS_OK);
resp.SetErrorCode(NPersQueue::NErrorCode::OK);

auto& result = *resp.MutablePartitionResponse()->MutableCmdGetMaxSeqNoResult();
for (const auto& sourceId : ev->Get()->SourceIds) {
auto& protoInfo = *result.AddSourceIdInfo();
protoInfo.SetSourceId(sourceId);
auto response = MakeHolder<TEvPQ::TEvProxyResponse>(ev->Get()->Cookie);
NKikimrClient::TResponse& resp = *response->Response;

auto info = SourceManager.Get(sourceId);
if (!info) {
PQ_LOG_D("Stop MaxSeqNoRequest - scheduled a research. SourceId: " << sourceId);
return;
}
if (info.State == TSourceIdInfo::EState::Unknown) {
continue;
}
resp.SetStatus(NMsgBusProxy::MSTATUS_OK);
resp.SetErrorCode(NPersQueue::NErrorCode::OK);

Y_ABORT_UNLESS(info.Offset <= (ui64)Max<i64>(), "Offset is too big: %" PRIu64, info.Offset);
Y_ABORT_UNLESS(info.SeqNo <= (ui64)Max<i64>(), "SeqNo is too big: %" PRIu64, info.SeqNo);
auto& result = *resp.MutablePartitionResponse()->MutableCmdGetMaxSeqNoResult();
for (const auto& sourceId : ev->Get()->SourceIds) {
auto& protoInfo = *result.AddSourceIdInfo();
protoInfo.SetSourceId(sourceId);

protoInfo.SetSeqNo(info.SeqNo);
protoInfo.SetOffset(info.Offset);
protoInfo.SetWriteTimestampMS(info.WriteTimestamp.MilliSeconds());
protoInfo.SetExplicit(info.Explicit);
protoInfo.SetState(TSourceIdInfo::ConvertState(info.State));
auto info = SourceManager.Get(sourceId);
if (info.State == TSourceIdInfo::EState::Unknown) {
continue;
}

ctx.Send(Tablet, response.Release());
MaxSeqNoRequests.pop_front();
Y_ABORT_UNLESS(info.Offset <= (ui64)Max<i64>(), "Offset is too big: %" PRIu64, info.Offset);
Y_ABORT_UNLESS(info.SeqNo <= (ui64)Max<i64>(), "SeqNo is too big: %" PRIu64, info.SeqNo);

protoInfo.SetSeqNo(info.SeqNo);
protoInfo.SetOffset(info.Offset);
protoInfo.SetWriteTimestampMS(info.WriteTimestamp.MilliSeconds());
protoInfo.SetExplicit(info.Explicit);
protoInfo.SetState(TSourceIdInfo::ConvertState(info.State));
}

ctx.Send(Tablet, response.Release());
}

void TPartition::Handle(TEvPQ::TEvBlobResponse::TPtr& ev, const TActorContext& ctx) {
Expand Down Expand Up @@ -2612,42 +2594,6 @@ void TPartition::Handle(TEvPQ::TEvSubDomainStatus::TPtr& ev, const TActorContext
}
}

void TPartition::Handle(TEvPQ::TEvSourceIdRequest::TPtr& ev, const TActorContext& ctx) {
auto& record = ev->Get()->Record;

if (Partition != record.GetPartition()) {
LOG_INFO_S(
ctx, NKikimrServices::PERSQUEUE,
"TEvSourceIdRequest for wrong partition " << record.GetPartition() << "." <<
" Topic: \"" << TopicName() << "\"." <<
" Partition: " << Partition << "."
);
return;
}

auto& memoryStorage = SourceIdStorage.GetInMemorySourceIds();

auto response = MakeHolder<TEvPQ::TEvSourceIdResponse>();
for(auto& sourceId : record.GetSourceId()) {
auto* s = response->Record.AddSource();
s->SetId(sourceId);

auto it = memoryStorage.find(sourceId);
if (it != memoryStorage.end()) {
auto& info = it->second;
s->SetState(Convert(info.State));
s->SetSeqNo(info.SeqNo);
s->SetOffset(info.Offset);
s->SetExplicit(info.Explicit);
s->SetWriteTimestamp(info.WriteTimestamp.GetValue());
} else {
s->SetState(NKikimrPQ::TEvSourceIdResponse::EState::TEvSourceIdResponse_EState_Unknown);
}
}

Send(ev->Sender, response.Release());
}

void TPartition::Handle(TEvPQ::TEvCheckPartitionStatusRequest::TPtr& ev, const TActorContext& ctx) {
auto& record = ev->Get()->Record;

Expand All @@ -2664,6 +2610,13 @@ void TPartition::Handle(TEvPQ::TEvCheckPartitionStatusRequest::TPtr& ev, const T
auto response = MakeHolder<TEvPQ::TEvCheckPartitionStatusResponse>();
response->Record.SetStatus(PartitionConfig ? PartitionConfig->GetStatus() : NKikimrPQ::ETopicPartitionStatus::Active);

if (record.HasSourceId()) {
auto sit = SourceIdStorage.GetInMemorySourceIds().find(NSourceIdEncoding::EncodeSimple(record.GetSourceId()));
if (sit != SourceIdStorage.GetInMemorySourceIds().end()) {
response->Record.SetSeqNo(sit->second.SeqNo);
}
}

Send(ev->Sender, response.Release());
}

Expand Down
6 changes: 0 additions & 6 deletions ydb/core/persqueue/partition.h
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,6 @@ class TPartition : public TActorBootstrapped<TPartition> {
void CheckIfSessionExists(TUserInfoBase& userInfo, const TActorId& newPipe);
// void DestroyReadSession(const TReadSessionKey& key);

void Handle(TEvPQ::TEvSourceIdRequest::TPtr& ev, const TActorContext& ctx);
void Handle(TEvPQ::TEvCheckPartitionStatusRequest::TPtr& ev, const TActorContext& ctx);

TString LogPrefix() const;
Expand Down Expand Up @@ -482,9 +481,7 @@ class TPartition : public TActorBootstrapped<TPartition> {
HFuncTraced(TEvPQ::TEvTxCommit, Handle);
HFuncTraced(TEvPQ::TEvTxRollback, Handle);
HFuncTraced(TEvPQ::TEvSubDomainStatus, Handle);
HFuncTraced(TEvPQ::TEvSourceIdRequest, Handle);
HFuncTraced(TEvPQ::TEvCheckPartitionStatusRequest, Handle);
HFuncTraced(TEvPQ::TEvSourceIdResponse, SourceManager.Handle);
HFuncTraced(NReadQuoterEvents::TEvQuotaUpdated, Handle);
HFuncTraced(NReadQuoterEvents::TEvAccountQuotaCountersUpdated, Handle);
HFuncTraced(NReadQuoterEvents::TEvQuotaCountersUpdated, Handle);
Expand Down Expand Up @@ -540,9 +537,7 @@ class TPartition : public TActorBootstrapped<TPartition> {
HFuncTraced(TEvPQ::TEvTxCommit, Handle);
HFuncTraced(TEvPQ::TEvTxRollback, Handle);
HFuncTraced(TEvPQ::TEvSubDomainStatus, Handle);
HFuncTraced(TEvPQ::TEvSourceIdRequest, Handle);
HFuncTraced(TEvPQ::TEvCheckPartitionStatusRequest, Handle);
HFuncTraced(TEvPQ::TEvSourceIdResponse, SourceManager.Handle);
HFuncTraced(NReadQuoterEvents::TEvQuotaUpdated, Handle);
HFuncTraced(NReadQuoterEvents::TEvAccountQuotaCountersUpdated, Handle);
HFuncTraced(NReadQuoterEvents::TEvQuotaCountersUpdated, Handle);
Expand Down Expand Up @@ -613,7 +608,6 @@ class TPartition : public TActorBootstrapped<TPartition> {

std::deque<TMessage> Requests;
std::deque<TMessage> Responses;
std::deque<TEvPQ::TEvGetMaxSeqNoRequest::TPtr> MaxSeqNoRequests;

THead Head;
THead NewHead;
Expand Down
Loading