Skip to content

Commit

Permalink
Reintroduce light indicator for bursts to BsCostModel, ydb-platform#1336
Browse files Browse the repository at this point in the history
 (ydb-platform#1477)

* Move TLight to core/util, add BurstDetector to TBsCostTracker

* Add UT

* Count PDisk responses, fix UT build

* Update BurstDetector

* Fix UT

* Move TLight to core/util, add BurstDetector to TBsCostTracker

* Add UT

* Count PDisk responses, fix UT build

* Update BurstDetector

* Fix UT

* Fix BurstDetection UT

* Fix rounding in conversion from Ns to Us in PDisk mock device settings

* Update sector map performance parameters

* Fix UT (once again)

* Revert removing patch CostModel tests

* Convert seek time from Ns to Us correctly

* Update muted_ya.txt
  • Loading branch information
serbel324 authored and vporyadke committed Feb 8, 2024
1 parent 4fdf2f0 commit 6689d33
Show file tree
Hide file tree
Showing 15 changed files with 487 additions and 349 deletions.
1 change: 1 addition & 0 deletions .github/config/muted_ya.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
ydb/core/blobstorage/dsproxy/ut TBlobStorageProxySequenceTest.TestBlock42PutWithChangingSlowDisk
ydb/core/blobstorage/dsproxy/ut_fat TBlobStorageProxyTest.TestBatchedPutRequestDoesNotContainAHugeBlob
ydb/core/blobstorage/ut_blobstorage CostMetricsGetBlock4Plus2.TestGet4Plus2BlockRequests10000Inflight1BlobSize1000
ydb/core/blobstorage/ut_blobstorage CostMetricsPatchMirror3dc.*
ydb/core/client/ut TClientTest.PromoteFollower
ydb/core/client/ut TClientTest.ReadFromFollower
ydb/core/client/ut TFlatTest.AutoSplitMergeQueue
Expand Down
265 changes: 1 addition & 264 deletions ydb/core/blobstorage/pdisk/blobstorage_pdisk_mon.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <ydb/core/blobstorage/lwtrace_probes/blobstorage_probes.h>
#include <ydb/core/mon/mon.h>
#include <ydb/core/protos/node_whiteboard.pb.h>
#include <ydb/core/util/light.h>

#include <library/cpp/bucket_quoter/bucket_quoter.h>
#include <library/cpp/containers/stack_vector/stack_vec.h>
Expand All @@ -14,270 +15,6 @@ namespace NKikimr {

struct TPDiskConfig;

// Returns the current high-precision timer reading as filled in by GetTimeFast().
inline NHPTimer::STime HPNow() {
    NHPTimer::STime timestamp;
    GetTimeFast(&timestamp);
    return timestamp;
}

// Converts a timer cycle count to seconds by dividing by NHPTimer::GetClockRate().
// Zero and negative cycle counts yield 0.0.
inline double HPSecondsFloat(i64 cycles) {
    if (cycles <= 0) {
        return 0.0;
    }
    return double(cycles) / NHPTimer::GetClockRate();
}

// Converts a timer cycle count to (fractional) milliseconds using NHPTimer::GetClockRate().
// Zero and negative cycle counts yield 0.
inline double HPMilliSecondsFloat(i64 cycles) {
    if (cycles <= 0) {
        return 0;
    }
    return double(cycles) * 1000.0 / NHPTimer::GetClockRate();
}

// Converts a timer cycle count to whole milliseconds; the fractional part
// produced by HPMilliSecondsFloat() is truncated by the integer conversion.
inline ui64 HPMilliSeconds(i64 cycles) {
    const double milliseconds = HPMilliSecondsFloat(cycles);
    return static_cast<ui64>(milliseconds);
}

// Converts a timer cycle count to microseconds using NHPTimer::GetClockRate().
// Zero and negative cycle counts yield 0.
//
// NOTE(review): despite the "Float" suffix the declared return type is ui64, so the
// fractional part is dropped when the value is returned (HPSecondsFloat and
// HPMilliSecondsFloat return double). Confirm whether this is intentional before
// changing the signature, since callers may depend on the integer type.
inline ui64 HPMicroSecondsFloat(i64 cycles) {
    if (cycles <= 0) {
        return 0;
    }
    // The double -> ui64 conversion was implicit in the return statement; spelled out here.
    return static_cast<ui64>(double(cycles) * 1000000.0 / NHPTimer::GetClockRate());
}

// Converts a timer cycle count to whole microseconds by delegating to HPMicroSecondsFloat().
inline ui64 HPMicroSeconds(i64 cycles) {
    const auto microseconds = HPMicroSecondsFloat(cycles);
    return static_cast<ui64>(microseconds);
}

// Converts a timer cycle count to whole nanoseconds using NHPTimer::GetClockRate().
// Zero and negative cycle counts yield 0.
inline ui64 HPNanoSeconds(i64 cycles) {
    if (cycles <= 0) {
        return 0;
    }
    return ui64(double(cycles) * 1000000000.0 / NHPTimer::GetClockRate());
}

// Converts a duration in nanoseconds to a whole number of timer cycles
// (clock rate * ns / 1e9, truncated to ui64).
inline ui64 HPCyclesNs(ui64 ns) {
    const double cycles = NHPTimer::GetClockRate() * double(ns) / 1000000000.0;
    return ui64(cycles);
}

// Converts a duration in microseconds to a whole number of timer cycles
// (clock rate * us / 1e6, truncated to ui64).
inline ui64 HPCyclesUs(ui64 us) {
    const double cycles = NHPTimer::GetClockRate() * double(us) / 1000000.0;
    return ui64(cycles);
}

// Converts a duration in milliseconds to a whole number of timer cycles
// (clock rate * ms / 1e3, truncated to ui64).
inline ui64 HPCyclesMs(ui64 ms) {
    const double cycles = NHPTimer::GetClockRate() * double(ms) / 1000.0;
    return ui64(cycles);
}

// Bookkeeping shared by "light" indicators: a boolean ON(red)/OFF(green) signal whose
// switch count and time spent in each state are exported through monitoring counters.
// This class performs no locking of its own; the protected helpers are meant to be
// driven by a derived class.
class TLightBase {
protected:
    TString Name;
    ::NMonitoring::TDynamicCounters::TCounterPtr State;   // Current state (0=OFF=green, 1=ON=red)
    ::NMonitoring::TDynamicCounters::TCounterPtr Count;   // Number of switches to ON state
    ::NMonitoring::TDynamicCounters::TCounterPtr RedMs;   // Time elapsed in ON state
    ::NMonitoring::TDynamicCounters::TCounterPtr GreenMs; // Time elapsed in OFF state

private:
    ui64 RedCycles = 0;                // ON-state cycles not yet flushed to RedMs
    ui64 GreenCycles = 0;              // OFF-state cycles not yet flushed to GreenMs
    NHPTimer::STime AdvancedTill = 0;  // Timestamp up to which elapsed time has been accounted
    NHPTimer::STime LastNow = 0;       // Last value returned by Now()
    ui64 UpdateThreshold = 0;          // Pending cycles must exceed this before being flushed

public:
    // Creates "<name>_state", "<name>_count", "<name>_redMs" and "<name>_greenMs"
    // counters in the given group and starts time accounting from the current moment.
    void Initialize(TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters, const TString& name) {
        Name = name;
        State = counters->GetCounter(name + "_state");
        Count = counters->GetCounter(name + "_count", true);
        RedMs = counters->GetCounter(name + "_redMs", true);
        GreenMs = counters->GetCounter(name + "_greenMs", true);
        UpdateThreshold = HPCyclesMs(100);
        AdvancedTill = Now();
    }

    // Variant with explicitly named counters. Name and State are left untouched here,
    // so unless they were set elsewhere no state counter is maintained (Modify() skips
    // a null State).
    void Initialize(TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters, const TString& countName,
            const TString& redMsName,const TString& greenMsName) {
        Count = counters->GetCounter(countName, true);
        RedMs = counters->GetCounter(redMsName, true);
        GreenMs = counters->GetCounter(greenMsName, true);
        UpdateThreshold = HPCyclesMs(100);
        AdvancedTill = Now();
    }

    // Current value of the "switched to ON" counter.
    ui64 GetCount() const {
        return *Count;
    }

    // Milliseconds flushed so far to the ON-state counter.
    ui64 GetRedMs() const {
        return *RedMs;
    }

    // Milliseconds flushed so far to the OFF-state counter.
    ui64 GetGreenMs() const {
        return *GreenMs;
    }

protected:
    // Applies a state transition prevState -> state to the counters.
    // Nothing happens when the state did not change.
    void Modify(bool state, bool prevState) {
        if (state == prevState) {
            return;
        }
        if (State) {
            *State = state;
        }
        if (state) { // Switched to ON state
            (*Count)++;
        }
    }

    // Attributes the time between AdvancedTill and `now` to `state`, then flushes
    // whole milliseconds to the counters once the pending amount exceeds UpdateThreshold.
    void Advance(bool state, NHPTimer::STime now) {
        if (now != AdvancedTill) {
            Elapsed(state, now - AdvancedTill);
            if (RedCycles > UpdateThreshold) {
                *RedMs += CutMs(RedCycles);
            }
            if (GreenCycles > UpdateThreshold) {
                *GreenMs += CutMs(GreenCycles);
            }
            AdvancedTill = now;
        }
    }

    // Returns HPNow() clamped so that consecutive results never decrease.
    NHPTimer::STime Now() {
        const NHPTimer::STime current = HPNow();
        if (LastNow < current) {
            LastNow = current;
        }
        return LastNow;
    }

private:
    // Adds `cycles` to the pending accumulator of the given state.
    void Elapsed(bool state, ui64 cycles) {
        (state ? RedCycles : GreenCycles) += cycles;
    }

    // Removes the whole milliseconds contained in `src` (expressed in cycles) and
    // returns how many were removed; the sub-millisecond remainder stays in `src`.
    ui64 CutMs(ui64& src) {
        const ui64 wholeMs = HPMilliSeconds(src);
        src -= HPCyclesMs(wholeMs);
        return wholeMs;
    }
};

// Thread-safe light.
// Set() and Update() take a spin lock. State changes are tagged with a ui16 sequence
// number and buffered so that they are applied to the TLightBase counters in seqno
// order even if Set() calls arrive out of order.
class TLight : public TLightBase {
private:
// One slot of the cyclic buffer: the reported state and whether the slot holds an event.
struct TItem {
bool State;
bool Filled;
TItem(bool state = false, bool filled = false)
: State(state)
, Filled(filled)
{}
};

// Cyclic buffer to enforce event ordering by seqno
TSpinLock Lock;
size_t HeadIdx = 0; // Index of current state
size_t FilledCount = 0; // Incremented by every Push(), decremented by every successful Pop()
ui16 Seqno = 0; // Current seqno (seqno of the item at HeadIdx)
TStackVec<TItem, 32> Data; // In theory should have not more than thread count items
public:
TLight() {
InitData();
}

// Records that the light had `state` at sequence number `seqno`, then applies every
// transition that has become contiguous with the head of the buffer.
void Set(bool state, ui16 seqno) {
TGuard<TSpinLock> g(Lock);
Push(state, seqno);
bool prevState;
// Note that 'state' variable is being reused
NHPTimer::STime now = Now();
while (Pop(state, prevState)) {
Modify(state, prevState);
Advance(prevState, now);
}
}

// Accounts the time elapsed since the last advance to the current head state.
void Update() {
TGuard<TSpinLock> g(Lock);
Advance(Data[HeadIdx].State, Now());
}

private:
// Resets the buffer to 32 slots with slot 0 holding (state, filled) and the head at slot 0.
void InitData(bool state = false, bool filled = false) {
Data.clear();
Data.emplace_back(state, filled);
Data.resize(32);
HeadIdx = 0;
}

// Stores an event in the slot that lies (seqno - Seqno) positions after the head.
void Push(bool state, ui16 seqno) {
FilledCount++;
if (FilledCount == 1) { // First event must initialize seqno
Seqno = seqno;
InitData(state, true);
if (state) {
Modify(true, false);
}
return;
}
Y_ABORT_UNLESS(seqno != Seqno, "ordering overflow or duplicate event headSeqno# %d seqno# %d state# %d filled# %d",
(int)Seqno, (int)seqno, (int)state, (int)CountFilled());
// Distance from the head in modulo-2^16 arithmetic.
ui16 diff = seqno;
diff -= Seqno; // Underflow is fine
size_t size = Data.size();
// NOTE(review): the buffer is doubled only once here; if diff >= 2 * size the index
// below wraps modulo the new size and may land on an occupied slot (abort) — confirm
// such a large gap cannot happen in practice.
if (size <= diff) { // Buffer is full -- extend and move wrapped part
Data.resize(size * 2);
// Slots before the head belong to the wrapped tail; relocate them past the old end.
for (size_t i = 0; i < HeadIdx; i++) {
Data[size + i] = Data[i];
Data[i].Filled = false;
}
}
TItem& item = Data[(HeadIdx + diff) % Data.size()];
Y_ABORT_UNLESS(!item.Filled, "ordering overflow or duplicate event headSeqno# %d seqno# %d state# %d filled# %d",
(int)Seqno, (int)seqno, (int)state, (int)CountFilled());
item.Filled = true;
item.State = state;
}

// Moves the head one slot forward if both the head and the following slot are filled.
// On success writes the new head state to `state` and the previous one to `prevState`
// and returns true; otherwise returns false and leaves both arguments untouched.
bool Pop(bool& state, bool& prevState) {
size_t nextIdx = (HeadIdx + 1) % Data.size();
TItem& head = Data[HeadIdx];
TItem& next = Data[nextIdx];
if (!head.Filled || !next.Filled) {
return false;
}
state = next.State;
prevState = head.State;
head.Filled = false;
HeadIdx = nextIdx;
Seqno++; // Overflow is fine
FilledCount--;
// Once only the head remains, reset a grown buffer back to 32 slots.
// 'head' and 'next' must not be used after this point because InitData() rebuilds Data.
if (FilledCount == 1 && Data.size() > 32) {
InitData(state, true);
}
return true;
}

// Counts filled slots by scanning the whole buffer (used only in abort messages above).
size_t CountFilled() const {
size_t ret = 0;
for (const TItem& item : Data) {
ret += item.Filled;
}
return ret;
}
};

class TBurstmeter {
private:
TBucketQuoter<i64, TSpinLock, THPTimerUs> Bucket;
Expand Down
16 changes: 10 additions & 6 deletions ydb/core/blobstorage/pdisk/mock/pdisk_mock.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,9 @@ struct TPDiskMockState::TImpl {
NPDisk::TStatusFlags StatusFlags;
THashSet<ui32> ReadOnlyVDisks;
TString StateErrorReason;
NPDisk::EDeviceType DeviceType;

TImpl(ui32 nodeId, ui32 pdiskId, ui64 pdiskGuid, ui64 size, ui32 chunkSize)
TImpl(ui32 nodeId, ui32 pdiskId, ui64 pdiskGuid, ui64 size, ui32 chunkSize, NPDisk::EDeviceType deviceType)
: NodeId(nodeId)
, PDiskId(pdiskId)
, PDiskGuid(pdiskGuid)
Expand All @@ -57,6 +58,7 @@ struct TPDiskMockState::TImpl {
, AppendBlockSize(4096)
, NextFreeChunk(1)
, StatusFlags(NPDisk::TStatusFlags{})
, DeviceType(deviceType)
{}

TImpl(const TImpl&) = default;
Expand Down Expand Up @@ -275,8 +277,9 @@ struct TPDiskMockState::TImpl {
}
};

TPDiskMockState::TPDiskMockState(ui32 nodeId, ui32 pdiskId, ui64 pdiskGuid, ui64 size, ui32 chunkSize)
: TPDiskMockState(std::make_unique<TImpl>(nodeId, pdiskId, pdiskGuid, size, chunkSize))
TPDiskMockState::TPDiskMockState(ui32 nodeId, ui32 pdiskId, ui64 pdiskGuid, ui64 size, ui32 chunkSize,
NPDisk::EDeviceType deviceType)
: TPDiskMockState(std::make_unique<TImpl>(nodeId, pdiskId, pdiskGuid, size, chunkSize, deviceType))
{}

TPDiskMockState::TPDiskMockState(std::unique_ptr<TImpl>&& impl)
Expand Down Expand Up @@ -406,9 +409,10 @@ class TPDiskMockActor : public TActorBootstrapped<TPDiskMockActor> {

// fill in the response
TVector<TChunkIdx> ownedChunks(owner->CommittedChunks.begin(), owner->CommittedChunks.end());
const ui64 seekTimeUs = 100;
const ui64 readSpeedBps = 100 * 1000 * 1000;
const ui64 writeSpeedBps = 100 * 1000 * 1000;
const auto& performanceParams = NPDisk::DevicePerformance.at(Impl.DeviceType);
const ui64 seekTimeUs = (performanceParams.SeekTimeNs + 1000) / 1000 - 1;
const ui64 readSpeedBps = performanceParams.FirstSectorReadBytesPerSec;
const ui64 writeSpeedBps = performanceParams.FirstSectorWriteBytesPerSec;
const ui64 readBlockSize = 65536;
const ui64 writeBlockSize = 65536;
const ui64 bulkWriteBlockSize = 65536;
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/blobstorage/pdisk/mock/pdisk_mock.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ namespace NKikimr {
using TPtr = TIntrusivePtr<TPDiskMockState>;

public:
TPDiskMockState(ui32 nodeId, ui32 pdiskId, ui64 pdiskGuid, ui64 size, ui32 chunkSize = 128 << 20);
TPDiskMockState(ui32 nodeId, ui32 pdiskId, ui64 pdiskGuid, ui64 size, ui32 chunkSize = 128 << 20,
NPDisk::EDeviceType deviceType = NPDisk::EDeviceType::DEVICE_TYPE_NVME);
TPDiskMockState(std::unique_ptr<TImpl>&& impl);
~TPDiskMockState();

Expand Down
4 changes: 3 additions & 1 deletion ydb/core/blobstorage/ut_blobstorage/lib/env.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ struct TEnvironmentSetup {
const bool SetupHive = false;
const bool SuppressCompatibilityCheck = false;
const TFeatureFlags FeatureFlags;
const NPDisk::EDeviceType DiskType = NPDisk::EDeviceType::DEVICE_TYPE_NVME;
};

const TSettings Settings;
Expand All @@ -55,7 +56,8 @@ struct TEnvironmentSetup {
const auto key = std::make_pair(nodeId, pdiskId);
TIntrusivePtr<TPDiskMockState>& state = Env.PDiskMockStates[key];
if (!state) {
state.Reset(new TPDiskMockState(nodeId, pdiskId, cfg->PDiskGuid, ui64(10) << 40, cfg->ChunkSize));
state.Reset(new TPDiskMockState(nodeId, pdiskId, cfg->PDiskGuid, ui64(10) << 40, cfg->ChunkSize,
Env.Settings.DiskType));
}
const TActorId& actorId = ctx.Register(CreatePDiskMockActor(state), TMailboxType::HTSwap, poolId);
const TActorId& serviceId = MakeBlobStoragePDiskID(nodeId, pdiskId);
Expand Down
Loading

0 comments on commit 6689d33

Please sign in to comment.