Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 21 additions & 12 deletions ydb/core/blobstorage/lwtrace_probes/blobstorage_probes.h
Original file line number Diff line number Diff line change
Expand Up @@ -207,22 +207,22 @@ struct TEventTypeField {
NAMES("pdisk", "schedStep")) \
PROBE(PDiskChunkReadPieceAddToScheduler, GROUPS("PDisk", "PDiskRequest"), \
TYPES(TPDiskIdField, ui32, ui64, ui64), \
NAMES("pdisk", "pieceIdx", "size", "offset")) \
PROBE(PDiskChunkReadPieceSendToDevice, GROUPS("PDisk", "PDiskRequest"), \
TYPES(TPDiskIdField, ui64), \
NAMES("pdisk", "size")) \
NAMES("pdisk", "pieceIdx", "offset", "size")) \
PROBE(PDiskChunkReadPiecesSendToDevice, GROUPS("PDisk", "PDiskRequest"), \
TYPES(TPDiskIdField), \
NAMES("pdisk")) \
PROBE(PDiskChunkReadPieceComplete, GROUPS("PDisk", "PDiskRequest"), \
TYPES(TPDiskIdField, ui64, ui64), \
NAMES("pdisk", "size", "relativeOffset")) \
PROBE(PDiskAddWritePieceToScheduler, GROUPS("PDisk", "PDiskRequest"), \
TYPES(TPDiskIdField, ui64, ui64, double), \
NAMES("pdisk", "size", "relativeOffset", "deviceTimeMs")) \
PROBE(PDiskChunkWriteAddToScheduler, GROUPS("PDisk", "PDiskRequest"), \
TYPES(TPDiskIdField, ui64, double, ui64, bool, ui64, ui64), \
NAMES("pdisk", "reqId", "creationTimeSec", "owner", "isFast", "priorityClass", "size")) \
PROBE(PDiskChunkWritePieceSendToDevice, GROUPS("PDisk", "PDiskRequest"), \
PROBE(PDiskChunkWriteLastPieceSendToDevice, GROUPS("PDisk", "PDiskRequest"), \
TYPES(TPDiskIdField, ui64, ui64, ui64, ui64), \
NAMES("pdisk", "owner", "chunkIdx", "pieceOffset", "pieceSize")) \
PROBE(PDiskLogWriteComplete, GROUPS("PDisk", "PDiskRequest"), \
TYPES(TPDiskIdField, ui64, double, double, double, double, double, double), \
NAMES("pdisk", "reqId", "creationTimeSec", "costMs", "responseTimeMs", "inputTimeMs", "scheduleTimeMs", "deviceTotalTimeMs")) \
TYPES(TPDiskIdField, ui64, double, double, double, double, double, double, double, ui64), \
NAMES("pdisk", "reqId", "creationTimeSec", "costMs", "responseTimeMs", "inputTimeMs", "scheduleTimeMs", "deviceTotalTimeMs", "deviceOnlyTimeMs", "batchSize")) \
PROBE(PDiskChunkResponseTime, GROUPS("PDisk", "PDiskRequest"), \
TYPES(TPDiskIdField, ui64, ui64, double, ui64), \
NAMES("pdisk", "reqId", "priorityClass", "responseTimeMs", "sizeBytes")) \
Expand All @@ -235,6 +235,12 @@ struct TEventTypeField {
PROBE(PDiskDeviceWriteDuration, GROUPS("PDisk"), \
TYPES(TPDiskIdField, double, ui64), \
NAMES("pdisk", "deviceTimeMs", "size")) \
PROBE(PDiskDeviceGetFromDevice, GROUPS("PDisk"), \
TYPES(), \
NAMES()) \
PROBE(PDiskDeviceGetFromWaiting, GROUPS("PDisk"), \
TYPES(), \
NAMES()) \
PROBE(PDiskDeviceTrimDuration, GROUPS("PDisk"), \
TYPES(TPDiskIdField, double, ui64), \
NAMES("pdisk", "trimTimeMs", "trimOffset")) \
Expand All @@ -255,8 +261,11 @@ struct TEventTypeField {
TYPES(ui32, ui64, ui64), \
NAMES("chunkIdx", "size", "offset")) \
PROBE(PDiskUpdateCycleDetails, GROUPS("PDisk"), \
TYPES(float, float, float, float, float), \
NAMES("entireUpdateMs", "inputQueueMs", "schedulingMs", "processingMs", "waitingMs")) \
TYPES(ui32, float, float, float, float, float), \
NAMES("pdisk", "entireUpdateMs", "inputQueueMs", "schedulingMs", "processingMs", "waitingMs")) \
PROBE(PDiskEnqueueAllDetails, GROUPS("PDisk"), \
TYPES(ui64, size_t, size_t, size_t, double), \
NAMES("pdisk", "initialQueueSize", "processedReqs", "pushedToForsetiReqs", "spentTimeMs")) \
PROBE(DSProxyGetEnqueue, GROUPS("DSProxy", "LWTrackStart"), TYPES(), NAMES()) \
PROBE(DSProxyGetBootstrap, GROUPS("DSProxy"), TYPES(), NAMES()) \
PROBE(DSProxyGetHandle, GROUPS("DSProxy", "LWTrackStart"), TYPES(), NAMES()) \
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,9 @@ class TRealBlockDevice : public IBlockDevice {
EIoResult ret = EIoResult::TryAgain;
while (ret == EIoResult::TryAgain) {
action->SubmitTime = HPNow();
if (action->FlushAction) {
action->FlushAction->SubmitTime = action->SubmitTime;
}

if (op->GetType() == IAsyncIoOperation::EType::PWrite) {
PDISK_FAIL_INJECTION(1);
Expand Down Expand Up @@ -334,8 +337,8 @@ class TRealBlockDevice : public IBlockDevice {
class TSharedCallback : public ICallback {
ui64 NextPossibleNoop = 0;
ui64 EndOffset = 0;
ui64 PrevEventGotAtCycle = HPNow();
ui64 PrevEstimationAtCycle = HPNow();
NHPTimer::STime PrevEventGotAtCycle = HPNow();
NHPTimer::STime PrevEstimationAtCycle = HPNow();
ui64 PrevEstimatedCostNs = 0;
ui64 PrevActualCostNs = 0;

Expand Down Expand Up @@ -366,29 +369,32 @@ class TRealBlockDevice : public IBlockDevice {
void Exec(TAsyncIoOperationResult *event) {
IAsyncIoOperation *op = event->Operation;
// Add up the execution time of all the events
ui64 totalExecutionCycles = 0;
ui64 totalCostNs = 0;
ui64 eventGotAtCycle = HPNow();
NHPTimer::STime eventGotAtCycle = HPNow();
AtomicSet(Device.Mon.LastDoneOperationTimestamp, eventGotAtCycle);

TCompletionAction *completionAction = static_cast<TCompletionAction*>(op->GetCookie());
FillCompletionAction(completionAction, op, event->Result);
completionAction->GetTime = eventGotAtCycle;
LWTRACK(PDiskDeviceGetFromDevice, completionAction->Orbit);
if (completionAction->FlushAction) {
completionAction->FlushAction->GetTime = eventGotAtCycle;
LWTRACK(PDiskDeviceGetFromDevice, completionAction->FlushAction->Orbit);
}

Device.QuitCounter.Decrement();
Device.IdleCounter.Decrement();
Device.FlightControl.MarkComplete(completionAction->OperationIdx);

ui64 startCycle = Max((ui64)completionAction->SubmitTime, PrevEventGotAtCycle);
ui64 durationCycles = (eventGotAtCycle > startCycle) ? eventGotAtCycle - startCycle : 0;
totalExecutionCycles = Max(totalExecutionCycles, durationCycles);
totalCostNs += completionAction->CostNs;
NHPTimer::STime startCycle = Max(completionAction->SubmitTime, (i64)PrevEventGotAtCycle);
NHPTimer::STime durationCycles = (eventGotAtCycle > startCycle) ? eventGotAtCycle - startCycle : 0;
NHPTimer::STime totalExecutionCycles = durationCycles;
NHPTimer::STime totalCostNs = completionAction->CostNs;

bool isSeekExpected =
((ui64)completionAction->SubmitTime + Device.SeekCostNs / 25ull >= PrevEventGotAtCycle);
bool isSeekExpected = (completionAction->SubmitTime + (NHPTimer::STime)Device.SeekCostNs / 25ll >= PrevEventGotAtCycle);

const ui64 opSize = op->GetSize();
Device.DecrementMonInFlight(op->GetType(), opSize);
if (opSize == 0) {
if (opSize == 0) { // Special case for flush operation, which is a read operation with 0 bytes size
if (op->GetType() == IAsyncIoOperation::EType::PRead) {
Y_ABORT_UNLESS(WaitingNoops[completionAction->OperationIdx % MaxWaitingNoops] == nullptr);
WaitingNoops[completionAction->OperationIdx % MaxWaitingNoops] = completionAction;
Expand Down Expand Up @@ -433,6 +439,9 @@ class TRealBlockDevice : public IBlockDevice {
while (NextPossibleNoop < firstIncompleteIdx) {
ui64 i = NextPossibleNoop % MaxWaitingNoops;
if (WaitingNoops[i] && WaitingNoops[i]->OperationIdx == NextPossibleNoop) {
LWTRACK(PDiskDeviceGetFromWaiting, WaitingNoops[i]->Orbit);
double durationMs = HPMilliSecondsFloat(HPNow() - WaitingNoops[i]->GetTime);
Device.Mon.DeviceFlushDuration.Increment(durationMs);
Device.CompletionThread->Schedule(WaitingNoops[i]);
WaitingNoops[i] = nullptr;
}
Expand Down Expand Up @@ -557,6 +566,9 @@ class TRealBlockDevice : public IBlockDevice {
EIoResult ret = EIoResult::TryAgain;
while (ret == EIoResult::TryAgain) {
action->SubmitTime = HPNow();
if (action->FlushAction) {
action->FlushAction->SubmitTime = action->SubmitTime;
}
ret = Device.IoContext->Submit(op, Device.SharedCallback.Get());
if (ret == EIoResult::Ok) {
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ namespace NKikimr::NPDisk {
struct TCompletionAction {
ui64 OperationIdx;
NHPTimer::STime SubmitTime;
NHPTimer::STime GetTime;
TCompletionAction *FlushAction = nullptr;
ui64 CostNs = 0;
NWilson::TTraceId TraceId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
#include "blobstorage_pdisk_impl.h"
#include "blobstorage_pdisk_sectorrestorator.h"

constexpr size_t MAX_RESULTS_PER_BATCH = 50; // It took ~0.25ms in VDisk's handler to process such batch

namespace NKikimr {
namespace NPDisk {

Expand Down Expand Up @@ -53,7 +55,9 @@ void TCompletionLogWrite::Exec(TActorSystem *actorSystem) {
HPMilliSecondsFloat(now - evLog.CreationTime),
HPMilliSecondsFloat(evLog.InputTime - evLog.CreationTime),
HPMilliSecondsFloat(evLog.ScheduleTime - evLog.InputTime),
HPMilliSecondsFloat(now - evLog.ScheduleTime));
HPMilliSecondsFloat(now - evLog.ScheduleTime),
HPMilliSecondsFloat(GetTime - SubmitTime),
batch ? batch->Result->Results.size() : 0);
if (evLog.Result->Results) {
evLog.Result->Results.front().Orbit = std::move(evLog.Orbit);
}
Expand All @@ -65,7 +69,7 @@ void TCompletionLogWrite::Exec(TActorSystem *actorSystem) {
}
if (evLog.Result->Status == NKikimrProto::OK) {
if (batch) {
if (batch->Sender == evLog.Sender) {
if (batch->Sender == evLog.Sender && batch->Result->Results.size() < MAX_RESULTS_PER_BATCH) {
batch->Result->Results.push_back(std::move(evLog.Result->Results[0]));
} else {
sendResponse(batch);
Expand Down Expand Up @@ -275,7 +279,9 @@ void TCompletionChunkReadPart::Exec(TActorSystem *actorSystem) {
endBadUserOffset = 0xffffffff;
}

LWTRACK(PDiskChunkReadPieceComplete, Read->Orbit, PDisk->PDiskId, RawReadSize, CommonBufferOffset);
double deviceTimeMs = HPMilliSecondsFloat(GetTime - SubmitTime);
LWTRACK(PDiskChunkReadPieceComplete, Orbit, PDisk->PDiskId, RawReadSize, CommonBufferOffset, deviceTimeMs);
Read->Orbit.Join(Orbit);
CumulativeCompletion->PartReadComplete(actorSystem);
CumulativeCompletion = nullptr;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ class TCompletionChunkWrite : public TCompletionAction {
Mon->IncrementResponseTime(PriorityClass, responseTimeMs, SizeBytes);
}
LWTRACK(PDiskChunkResponseTime, Orbit, PDiskId, ReqId.Id, PriorityClass, responseTimeMs, SizeBytes);
Event->Orbit = std::move(Orbit);
actorSystem->Send(Recipient, Event.Release());
if (Mon) {
Mon->GetWriteCounter(PriorityClass)->CountResponse();
Expand Down
40 changes: 30 additions & 10 deletions ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@

#include <util/system/unaligned_mem.h>

constexpr size_t MAX_REQS_PER_CYCLE = 200; // 200 requests take ~0.2ms in EnqueueAll function

namespace NKikimr {
namespace NPDisk {

Expand Down Expand Up @@ -843,9 +845,6 @@ bool TPDisk::ChunkWritePiece(TChunkWrite *evChunkWrite, ui32 pieceShift, ui32 pi

guard.Release();

LWTRACK(PDiskChunkWritePieceSendToDevice, evChunkWrite->Orbit, PDiskId, evChunkWrite->Owner, chunkIdx,
pieceShift, pieceSize);

ui32 bytesAvailable = pieceSize;
Y_ABORT_UNLESS(evChunkWrite->BytesWritten == pieceShift);
const ui32 count = evChunkWrite->PartsPtr->Size();
Expand Down Expand Up @@ -904,6 +903,9 @@ bool TPDisk::ChunkWritePiece(TChunkWrite *evChunkWrite, ui32 pieceShift, ui32 pi
LOG_INFO(*ActorSystem, NKikimrServices::BS_PDISK, "PDiskId# %" PRIu32 " chunkIdx# %" PRIu32
" was zero-padded after writing", (ui32)PDiskId, (ui32)chunkIdx);
}
LWTRACK(PDiskChunkWriteLastPieceSendToDevice, evChunkWrite->Orbit, PDiskId, evChunkWrite->Owner, chunkIdx,
pieceShift, pieceSize);

auto traceId = evChunkWrite->SpanStack.GetTraceId();
evChunkWrite->Completion->Orbit = std::move(evChunkWrite->Orbit);
writer.Flush(evChunkWrite->ReqId, &traceId, evChunkWrite->Completion.Release());
Expand Down Expand Up @@ -946,7 +948,7 @@ void TPDisk::SendChunkReadError(const TIntrusivePtr<TChunkRead>& read, TStringSt
}

TPDisk::EChunkReadPieceResult TPDisk::ChunkReadPiece(TIntrusivePtr<TChunkRead> &read, ui64 pieceCurrentSector,
ui64 pieceSizeLimit, ui64 *reallyReadDiskBytes, NWilson::TTraceId traceId) {
ui64 pieceSizeLimit, ui64 *reallyReadDiskBytes, NWilson::TTraceId traceId, NLWTrace::TOrbit&& orbit) {
if (read->IsReplied) {
return ReadPieceResultOk;
}
Expand Down Expand Up @@ -1015,6 +1017,8 @@ TPDisk::EChunkReadPieceResult TPDisk::ChunkReadPiece(TIntrusivePtr<TChunkRead> &
THolder<TCompletionChunkReadPart> completion(new TCompletionChunkReadPart(this, read, bytesToRead,
payloadBytesToRead, payloadOffset, read->FinalCompletion, isTheLastPart, Cfg->UseT1ha0HashInFooter, std::move(span)));
completion->CostNs = DriveModel.TimeForSizeNs(bytesToRead, read->ChunkIdx, TDriveModel::OP_TYPE_READ);
LWTRACK(PDiskChunkReadPiecesSendToDevice, orbit, PDiskId);
completion->Orbit = std::move(orbit);
Y_ABORT_UNLESS(bytesToRead <= completion->GetBuffer()->Size());
ui8 *data = completion->GetBuffer()->Data();
BlockDevice->PreadAsync(data, bytesToRead, readOffset, completion.Release(),
Expand Down Expand Up @@ -2279,7 +2283,7 @@ void TPDisk::ProcessChunkReadQueue() {
ui64 currentLimit = Min(bufferSize, piece->PieceSizeLimit - size);
ui64 reallyReadDiskBytes;
EChunkReadPieceResult result = ChunkReadPiece(read, piece->PieceCurrentSector + size / Format.SectorSize,
currentLimit, &reallyReadDiskBytes, piece->SpanStack.GetTraceId());
currentLimit, &reallyReadDiskBytes, piece->SpanStack.GetTraceId(), std::move(piece->Orbit));
isComplete = (result != ReadPieceResultInProgress);
// Read pieces is sliced previously and it is expected that ChunkReadPiece will read exactly
// currentLimit bytes
Expand Down Expand Up @@ -2941,7 +2945,6 @@ bool TPDisk::PreprocessRequest(TRequestBase *request) {

auto result = std::make_unique<TEvChunkWriteResult>(NKikimrProto::OK, ev.ChunkIdx, ev.Cookie,
GetStatusFlags(ev.Owner, ev.OwnerGroupType), TString());
result->Orbit = std::move(ev.Orbit);

++state.OperationsInProgress;
++ownerData.InFlight->ChunkWrites;
Expand Down Expand Up @@ -3185,7 +3188,7 @@ void TPDisk::PushRequestToForseti(TRequestBase *request) {
TChunkWritePiece *piece = new TChunkWritePiece(whole, smallJobCount * smallJobSize, largeJobSize, std::move(span));
piece->EstimateCost(DriveModel);
AddJobToForseti(cbs, piece, request->JobKind);
LWTRACK(PDiskAddWritePieceToScheduler, request->Orbit, PDiskId, request->ReqId.Id,
LWTRACK(PDiskChunkWriteAddToScheduler, request->Orbit, PDiskId, request->ReqId.Id,
HPSecondsFloat(HPNow() - request->CreationTime), request->Owner, request->IsFast,
request->PriorityClass, whole->TotalSize);
} else if (request->GetType() == ERequestType::RequestChunkRead) {
Expand All @@ -3206,7 +3209,8 @@ void TPDisk::PushRequestToForseti(TRequestBase *request) {
// Schedule small job.
auto piece = new TChunkReadPiece(read, idx * smallJobSize,
smallJobSize * Format.SectorSize, false, std::move(span));
LWTRACK(PDiskChunkReadPieceAddToScheduler, read->Orbit, PDiskId, idx, idx * smallJobSize,
read->Orbit.Fork(piece->Orbit);
LWTRACK(PDiskChunkReadPieceAddToScheduler, piece->Orbit, PDiskId, idx, idx * smallJobSize * Format.SectorSize,
smallJobSize * Format.SectorSize);
piece->EstimateCost(DriveModel);
piece->SelfPointer = piece;
Expand All @@ -3217,8 +3221,9 @@ void TPDisk::PushRequestToForseti(TRequestBase *request) {
span.Attribute("is_last_piece", true);
auto piece = new TChunkReadPiece(read, smallJobCount * smallJobSize,
largeJobSize * Format.SectorSize, true, std::move(span));
LWTRACK(PDiskChunkReadPieceAddToScheduler, read->Orbit, PDiskId, smallJobCount,
smallJobCount * smallJobSize, largeJobSize * Format.SectorSize);
read->Orbit.Fork(piece->Orbit);
LWTRACK(PDiskChunkReadPieceAddToScheduler, piece->Orbit, PDiskId, smallJobCount,
smallJobCount * smallJobSize * Format.SectorSize, largeJobSize * Format.SectorSize);
piece->EstimateCost(DriveModel);
piece->SelfPointer = piece;
AddJobToForseti(cbs, piece, request->JobKind);
Expand Down Expand Up @@ -3406,7 +3411,14 @@ void TPDisk::ProcessYardInitSet() {
}

void TPDisk::EnqueueAll() {
TInstant start = TInstant::Now();

TGuard<TMutex> guard(StateMutex);
size_t initialQueueSize = InputQueue.GetWaitingSize();
size_t processedReqs = 0;
size_t pushedToForsetiReqs = 0;


while (InputQueue.GetWaitingSize() > 0) {
TRequestBase* request = InputQueue.Pop();
AtomicSub(InputQueueCost, request->Cost);
Expand Down Expand Up @@ -3449,9 +3461,17 @@ void TPDisk::EnqueueAll() {
} else {
if (PreprocessRequest(request)) {
PushRequestToForseti(request);
++pushedToForsetiReqs;
}
}
++processedReqs;
if (processedReqs >= MAX_REQS_PER_CYCLE) {
break;
}
}

double spentTimeMs = (TInstant::Now() - start).MillisecondsFloat();
LWPROBE(PDiskEnqueueAllDetails, PDiskId, initialQueueSize, processedReqs, pushedToForsetiReqs, spentTimeMs);
}

void TPDisk::Update() {
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ class TPDisk : public IPDisk {
void SendChunkReadError(const TIntrusivePtr<TChunkRead>& read, TStringStream& errorReason,
NKikimrProto::EReplyStatus status);
EChunkReadPieceResult ChunkReadPiece(TIntrusivePtr<TChunkRead> &read, ui64 pieceCurrentSector, ui64 pieceSizeLimit,
ui64 *reallyReadBytes, NWilson::TTraceId traceId);
ui64 *reallyReadBytes, NWilson::TTraceId traceId, NLWTrace::TOrbit&& orbit);
void SplitChunkJobSize(ui32 totalSize, ui32 *outSmallJobSize, ui32 *outLargeJObSize, ui32 *outSmallJobCount);
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Chunk locking
Expand Down
Loading