Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ void TCompletionLogWrite::Release(TActorSystem *actorSystem) {

TCompletionChunkReadPart::TCompletionChunkReadPart(TPDisk *pDisk, TIntrusivePtr<TChunkRead> &read, ui64 rawReadSize,
ui64 payloadReadSize, ui64 commonBufferOffset, TCompletionChunkRead *cumulativeCompletion, bool isTheLastPart,
const TControlWrapper& useT1ha0Hasher)
const TControlWrapper& useT1ha0Hasher, NWilson::TSpan&& span)
: TCompletionAction()
, PDisk(pDisk)
, Read(read)
Expand All @@ -127,6 +127,7 @@ TCompletionChunkReadPart::TCompletionChunkReadPart(TPDisk *pDisk, TIntrusivePtr<
, Buffer(PDisk->BufferPool->Pop())
, IsTheLastPart(isTheLastPart)
, UseT1ha0Hasher(useT1ha0Hasher)
, Span(std::move(span))
{
if (!IsTheLastPart) {
CumulativeCompletion->AddPart();
Expand All @@ -145,6 +146,7 @@ TBuffer *TCompletionChunkReadPart::GetBuffer() {
}

void TCompletionChunkReadPart::Exec(TActorSystem *actorSystem) {
auto execSpan = Span.CreateChild(TWilson::PDisk, "PDisk.CompletionChunkReadPart.Exec");
Y_ABORT_UNLESS(actorSystem);
Y_ABORT_UNLESS(CumulativeCompletion);
if (TCompletionAction::Result != EIoResult::Ok) {
Expand Down Expand Up @@ -279,6 +281,8 @@ void TCompletionChunkReadPart::Exec(TActorSystem *actorSystem) {

AtomicSub(PDisk->InFlightChunkRead, RawReadSize);
RawReadSize = 0;
execSpan.EndOk();
Span.EndOk();
delete this;
}

Expand All @@ -289,6 +293,7 @@ void TCompletionChunkReadPart::Release(TActorSystem *actorSystem) {
}
AtomicSub(PDisk->InFlightChunkRead, RawReadSize);
RawReadSize = 0;
Span.EndError("release");
delete this;
}

Expand All @@ -301,6 +306,7 @@ TCompletionChunkRead::~TCompletionChunkRead() {
}

void TCompletionChunkRead::Exec(TActorSystem *actorSystem) {
auto execSpan = Span.CreateChild(TWilson::PDisk, "PDisk.CompletionChunkRead.Exec");
THolder<TEvChunkReadResult> result = MakeHolder<TEvChunkReadResult>(NKikimrProto::OK,
Read->ChunkIdx, Read->Offset, Read->Cookie, PDisk->GetStatusFlags(Read->Owner, Read->OwnerGroupType), "");
result->Data = std::move(CommonBuffer);
Expand All @@ -321,6 +327,8 @@ void TCompletionChunkRead::Exec(TActorSystem *actorSystem) {
actorSystem->Send(Read->Sender, result.Release());
Read->IsReplied = true;
PDisk->Mon.GetReadCounter(Read->PriorityClass)->CountResponse();
execSpan.EndOk();
Span.EndOk();
delete this;
}

Expand Down
17 changes: 14 additions & 3 deletions ydb/core/blobstorage/pdisk/blobstorage_pdisk_completion_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,13 @@ class TCompletionChunkWrite : public TCompletionAction {
ui8 PriorityClass;
std::function<void()> OnDestroy;
TReqId ReqId;
NWilson::TSpan Span;

public:
TCompletionChunkWrite(const TActorId &recipient, TEvChunkWriteResult *event,
TPDiskMon *mon, ui32 pdiskId, NHPTimer::STime startTime, size_t sizeBytes,
ui8 priorityClass, std::function<void()> onDestroy, TReqId reqId)
ui8 priorityClass, std::function<void()> onDestroy, TReqId reqId,
NWilson::TSpan&& span)
: Recipient(recipient)
, Event(event)
, Mon(mon)
Expand All @@ -89,6 +91,7 @@ class TCompletionChunkWrite : public TCompletionAction {
, PriorityClass(priorityClass)
, OnDestroy(std::move(onDestroy))
, ReqId(reqId)
, Span(std::move(span))
{
}

Expand All @@ -97,6 +100,7 @@ class TCompletionChunkWrite : public TCompletionAction {
}

void Exec(TActorSystem *actorSystem) override {
auto execSpan = Span.CreateChild(TWilson::PDisk, "PDisk.CompletionChunkWrite.Exec");
double responseTimeMs = HPMilliSecondsFloat(HPNow() - StartTime);
LOG_DEBUG_S(*actorSystem, NKikimrServices::BS_PDISK,
"PDiskId# " << PDiskId << " ReqId# " << ReqId
Expand All @@ -111,13 +115,16 @@ class TCompletionChunkWrite : public TCompletionAction {
if (Mon) {
Mon->GetWriteCounter(PriorityClass)->CountResponse();
}
execSpan.EndOk();
Span.EndOk();
delete this;
}

void Release(TActorSystem *actorSystem) override {
Event->Status = NKikimrProto::CORRUPTED;
Event->ErrorReason = ErrorReason;
actorSystem->Send(Recipient, Event.Release());
Span.EndError(ErrorReason);
delete this;
}
};
Expand Down Expand Up @@ -160,11 +167,12 @@ class TCompletionChunkRead : public TCompletionAction {
TAtomic Deletes;
std::function<void()> OnDestroy;
ui64 ChunkNonce;
NWilson::TSpan Span;

const ui64 DoubleFreeCanary;
public:
TCompletionChunkRead(TPDisk *pDisk, TIntrusivePtr<TChunkRead> &read, std::function<void()> onDestroy,
ui64 chunkNonce)
ui64 chunkNonce, NWilson::TSpan&& span)
: TCompletionAction()
, PDisk(pDisk)
, Read(read)
Expand All @@ -174,6 +182,7 @@ class TCompletionChunkRead : public TCompletionAction {
, Deletes(0)
, OnDestroy(std::move(onDestroy))
, ChunkNonce(chunkNonce)
, Span(std::move(span))
, DoubleFreeCanary(ReferenceCanary)
{}

Expand Down Expand Up @@ -202,6 +211,7 @@ class TCompletionChunkRead : public TCompletionAction {

void Release(TActorSystem *actorSystem) override {
ReplyError(actorSystem, "TCompletionChunkRead is released");
Span.EndError("release");
}
};

Expand All @@ -215,10 +225,11 @@ class TCompletionChunkReadPart : public TCompletionAction {
TBuffer::TPtr Buffer;
bool IsTheLastPart;
TControlWrapper UseT1ha0Hasher;
NWilson::TSpan Span;
public:
TCompletionChunkReadPart(TPDisk *pDisk, TIntrusivePtr<TChunkRead> &read, ui64 rawReadSize, ui64 payloadReadSize,
ui64 commonBufferOffset, TCompletionChunkRead *cumulativeCompletion, bool isTheLastPart,
const TControlWrapper& useT1ha0Hasher);
const TControlWrapper& useT1ha0Hasher, NWilson::TSpan&& span);


bool CanHandleResult() const override {
Expand Down
Loading