Skip to content

Commit

Permalink
Pass VDisk burst threshold in config, fix TBucketQuoter bug (ydb-plat…
Browse files Browse the repository at this point in the history
  • Loading branch information
serbel324 committed Jun 5, 2024
1 parent 9fe9ac6 commit 62005ad
Show file tree
Hide file tree
Showing 13 changed files with 79 additions and 44 deletions.
2 changes: 1 addition & 1 deletion library/cpp/bucket_quoter/bucket_quoter.h
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,8 @@ class TBucketQuoter {

i64 UseAndFill(ui64 tokens) {
TGuard<Lock> g(BucketMutex);
UseNoLock(tokens);
FillBucket();
UseNoLock(tokens);
return Bucket;
}

Expand Down
24 changes: 22 additions & 2 deletions ydb/core/blobstorage/ut_blobstorage/lib/env.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ struct TEnvironmentSetup {
const bool SuppressCompatibilityCheck = false;
const TFeatureFlags FeatureFlags;
const NPDisk::EDeviceType DiskType = NPDisk::EDeviceType::DEVICE_TYPE_NVME;
const ui32 BurstThresholdNs = 0;
const TString VDiskKind = "";
};

const TSettings Settings;
Expand Down Expand Up @@ -323,6 +325,16 @@ struct TEnvironmentSetup {
config->CacheAccessor = std::make_unique<TAccessor>(Cache[nodeId]);
}
config->FeatureFlags = Settings.FeatureFlags;
if (Settings.VDiskKind) {
NKikimrBlobStorage::TAllVDiskKinds vdiskConfig;
auto* kind = vdiskConfig.AddVDiskKinds();
kind->SetKind(NKikimrBlobStorage::TVDiskKind::Test1);
if (Settings.BurstThresholdNs) {
kind->MutableConfig()->SetBurstThresholdNs(Settings.BurstThresholdNs);
}

config->AllVDiskKinds = MakeIntrusive<TAllVDiskKinds>(vdiskConfig);
}
warden.reset(CreateBSNodeWarden(config));
}

Expand Down Expand Up @@ -408,7 +420,11 @@ struct TEnvironmentSetup {
cmd2->SetName(StoragePoolName);
cmd2->SetKind(StoragePoolName);
cmd2->SetErasureSpecies(TBlobStorageGroupType::ErasureSpeciesName(Settings.Erasure.GetErasure()));
cmd2->SetVDiskKind("Default");
if (Settings.VDiskKind) {
cmd2->SetVDiskKind(Settings.VDiskKind);
} else {
cmd2->SetVDiskKind("Default");
}
cmd2->SetNumGroups(numGroups ? numGroups : NumGroups);
cmd2->AddPDiskFilter()->AddProperty()->SetType(pdiskType);
if (Settings.Encryption) {
Expand All @@ -428,7 +444,11 @@ struct TEnvironmentSetup {
cmd->SetName(poolName);
cmd->SetKind(poolName);
cmd->SetErasureSpecies(TBlobStorageGroupType::ErasureSpeciesName(Settings.Erasure.GetErasure()));
cmd->SetVDiskKind("Default");
if (Settings.VDiskKind) {
cmd->SetVDiskKind(Settings.VDiskKind);
} else {
cmd->SetVDiskKind("Default");
}
cmd->SetNumGroups(1);
cmd->AddPDiskFilter()->AddProperty()->SetType(NKikimrBlobStorage::EPDiskType::ROT);
if (Settings.Encryption) {
Expand Down
31 changes: 19 additions & 12 deletions ydb/core/blobstorage/ut_blobstorage/monitoring.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,13 +100,15 @@ ui64 AggregateVDiskCounters(std::unique_ptr<TEnvironmentSetup>& env, const NKiki

void SetupEnv(const TBlobStorageGroupInfo::TTopology& topology, std::unique_ptr<TEnvironmentSetup>& env,
NKikimrBlobStorage::TBaseConfig& baseConfig, ui32& groupSize, TBlobStorageGroupType& groupType,
ui32& groupId, std::vector<ui32>& pdiskLayout) {
ui32& groupId, std::vector<ui32>& pdiskLayout, ui32 burstThresholdNs = 0, TString vdiskKind = "") {
groupSize = topology.TotalVDisks;
groupType = topology.GType;
env.reset(new TEnvironmentSetup({
.NodeCount = groupSize,
.Erasure = groupType,
.DiskType = NPDisk::EDeviceType::DEVICE_TYPE_ROT,
.BurstThresholdNs = burstThresholdNs,
.VDiskKind = vdiskKind,
}));

env->CreateBoxAndPool(1, 1);
Expand Down Expand Up @@ -414,14 +416,17 @@ enum class ELoadDistribution : ui8 {
};

template <typename TInflightActor>
void TestBurst(const TBlobStorageGroupInfo::TTopology& topology, TInflightActor* actor, ELoadDistribution loadDistribution) {
void TestBurst(ui32 requests, ui32 inflight, TDuration delay, ELoadDistribution loadDistribution,
ui32 burstThresholdNs = 0) {
TBlobStorageGroupInfo::TTopology topology(TBlobStorageGroupType::ErasureNone, 1, 1, 1, true);
auto* actor = new TInflightActor({requests, inflight, delay}, 8_MB);
std::unique_ptr<TEnvironmentSetup> env;
NKikimrBlobStorage::TBaseConfig baseConfig;
ui32 groupSize;
TBlobStorageGroupType groupType;
ui32 groupId;
std::vector<ui32> pdiskLayout;
SetupEnv(topology, env, baseConfig, groupSize, groupType, groupId, pdiskLayout);
SetupEnv(topology, env, baseConfig, groupSize, groupType, groupId, pdiskLayout, burstThresholdNs, "Test1");

actor->SetGroupId(groupId);
env->Runtime->Register(actor, 1);
Expand All @@ -437,16 +442,18 @@ void TestBurst(const TBlobStorageGroupInfo::TTopology& topology, TInflightActor*
}
}

#define MAKE_BURST_TEST(requestType, requests, inflight, delay, distribution) \
Y_UNIT_TEST(Test##requestType##distribution) { \
TBlobStorageGroupInfo::TTopology topology(TBlobStorageGroupType::ErasureNone, 1, 1, 1, true); \
auto* actor = new TInflightActor##requestType({requests, inflight, delay}, 8_MB); \
TestBurst(topology, actor, ELoadDistribution::Distribution##distribution); \
}

Y_UNIT_TEST_SUITE(BurstDetection) {
MAKE_BURST_TEST(Put, 10, 1, TDuration::Seconds(1), Evenly);
MAKE_BURST_TEST(Put, 10, 100, TDuration::MilliSeconds(1), Burst);
Y_UNIT_TEST(TestPutEvenly) {
TestBurst<TInflightActorPut>(10, 1, TDuration::Seconds(1), ELoadDistribution::DistributionEvenly);
}

Y_UNIT_TEST(TestPutBurst) {
TestBurst<TInflightActorPut>(10, 10, TDuration::MilliSeconds(1), ELoadDistribution::DistributionBurst);
}

Y_UNIT_TEST(TestOverlySensitive) {
TestBurst<TInflightActorPut>(10, 1, TDuration::Seconds(1), ELoadDistribution::DistributionBurst, 1);
}
}

#undef MAKE_BURST_TEST
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,15 @@ class TBsCostModelMirror3of4 : public TBsCostModelBase {
};

TBsCostTracker::TBsCostTracker(const TBlobStorageGroupType& groupType, NPDisk::EDeviceType diskType,
const TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters)
const TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters, ui64 burstThresholdNs)
: GroupType(groupType)
, CostCounters(counters->GetSubgroup("subsystem", "advancedCost"))
, UserDiskCost(CostCounters->GetCounter("UserDiskCost", true))
, CompactionDiskCost(CostCounters->GetCounter("CompactionDiskCost", true))
, ScrubDiskCost(CostCounters->GetCounter("ScrubDiskCost", true))
, DefragDiskCost(CostCounters->GetCounter("DefragDiskCost", true))
, InternalDiskCost(CostCounters->GetCounter("InternalDiskCost", true))
, BucketCapacity(burstThresholdNs / GroupType.BlobSubgroupSize())
, Bucket(&DiskTimeAvailableNs, &BucketCapacity, nullptr, nullptr, nullptr, nullptr, true)
{
BurstDetector.Initialize(CostCounters, "BurstDetector");
Expand All @@ -67,7 +68,6 @@ TBsCostTracker::TBsCostTracker(const TBlobStorageGroupType& groupType, NPDisk::E
CostModel = std::make_unique<TBsCostModelErasureNone>(diskType);
break;
}
UpdateBucketCapacity();
}

} // NKikimr
26 changes: 3 additions & 23 deletions ydb/core/blobstorage/vdisk/common/blobstorage_cost_tracker.h
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ class TBsCostTracker {
::NMonitoring::TDynamicCounters::TCounterPtr DefragDiskCost;
::NMonitoring::TDynamicCounters::TCounterPtr InternalDiskCost;

TAtomic BucketCapacity = 1'000'000'000; // 10^9 nsec
TAtomic BucketCapacity; // 10^9 nsec
TAtomic DiskTimeAvailableNs = 1'000'000'000;
TBucketQuoter<i64, TSpinLock, TAppDataTimerMs<TInstantTimerMs>> Bucket;
TLight BurstDetector;
Expand All @@ -327,7 +327,7 @@ class TBsCostTracker {

public:
TBsCostTracker(const TBlobStorageGroupType& groupType, NPDisk::EDeviceType diskType,
const TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters);
const TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters, ui64 burstThresholdNs);

template<class TEv>
ui64 GetCost(const TEv& ev) const {
Expand All @@ -340,35 +340,15 @@ class TBsCostTracker {
return cost;
}

private:
void UpdateBucketCapacity() {
if (!CostModel) {
return;
}
ui64 maxPartSize = GroupType.MaxPartSize(TBlobStorageGroupType::ECrcMode::CrcModeWholePart, MaxVDiskBlobSize);
ui64 maxHugePartSize = GroupType.MaxPartSize(TBlobStorageGroupType::ECrcMode::CrcModeWholePart,
CostModel->HugeBlobSize);
ui64 capacity = std::max({
CostModel->ReadCost(maxHugePartSize),
CostModel->WriteCost(maxPartSize),
CostModel->HugeWriteCost(maxHugePartSize)
}) * ConcurrentHugeRequestsAllowed;

if (capacity != (ui64)AtomicGet(BucketCapacity)) {
AtomicSet(BucketCapacity, capacity);
}
}

public:
void UpdateCostModel(const TCostModel& costModel) {
if (CostModel) {
CostModel->Update(costModel);
}
UpdateBucketCapacity();
}

void CountRequest(ui64 cost) {
Bucket.Use(cost);
Bucket.UseAndFill(cost);
BurstDetector.Set(!Bucket.IsAvail(), SeqnoBurstDetector.fetch_add(1));
}

Expand Down
13 changes: 13 additions & 0 deletions ydb/core/blobstorage/vdisk/common/vdisk_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@ namespace NKikimr {
#else
BarrierValidation = true; // switch by default on debug builds
#endif

BurstThresholdNs = NPDisk::DevicePerformance.at(baseInfo.DeviceType).BurstThresholdNs;
}

void TVDiskConfig::SetupHugeBytes() {
Expand Down Expand Up @@ -160,6 +162,8 @@ namespace NKikimr {
UPDATE_MACRO(ReplInterconnectChannel);

UPDATE_MACRO(BarrierValidation);

UPDATE_MACRO(BurstThresholdNs);
#undef UPDATE_MACRO
}

Expand All @@ -180,6 +184,15 @@ namespace NKikimr {
ParseConfig();
}

TAllVDiskKinds::TAllVDiskKinds(const NKikimrBlobStorage::TAllVDiskKinds &proto)
: AllKindsConfig()
, VDiskMegaBaseConfig(TVDiskConfig::TBaseInfo())
, KindsMap()
{
AllKindsConfig.CopyFrom(proto);
ParseConfig();
}

TIntrusivePtr<TVDiskConfig> TAllVDiskKinds::MakeVDiskConfig(const TVDiskConfig::TBaseInfo &baseInfo) {
EKind k = baseInfo.Kind;
TVector<const TKind *> merge;
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/blobstorage/vdisk/common/vdisk_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ namespace NKikimr {
TDuration WhiteboardUpdateInterval;
bool EnableVDiskCooldownTimeout;
TControlWrapper EnableVPatch = true;
ui64 BurstThresholdNs = 1'000'000'000;

///////////// FEATURE FLAGS ////////////////////////
NKikimrConfig::TFeatureFlags FeatureFlags;
Expand All @@ -228,6 +229,7 @@ namespace NKikimr {
class TAllVDiskKinds : public TThrRefBase {
public:
TAllVDiskKinds(const TString &prototext = TString());
TAllVDiskKinds(const NKikimrBlobStorage::TAllVDiskKinds &proto);
TIntrusivePtr<TVDiskConfig> MakeVDiskConfig(const TVDiskConfig::TBaseInfo &baseInfo);
void Merge(const NKikimrBlobStorage::TAllVDiskKinds &allVDiskKinds);

Expand Down
5 changes: 3 additions & 2 deletions ydb/core/blobstorage/vdisk/common/vdisk_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ namespace NKikimr {
TReplQuoter::TPtr replPDiskReadQuoter,
TReplQuoter::TPtr replPDiskWriteQuoter,
TReplQuoter::TPtr replNodeRequestQuoter,
TReplQuoter::TPtr replNodeResponseQuoter)
TReplQuoter::TPtr replNodeResponseQuoter,
ui64 burstThresholdNs)
: TBSProxyContext(vdiskCounters->GetSubgroup("subsystem", "memhull"))
, VDiskActorId(vdiskActorId)
, Top(std::move(top))
Expand All @@ -57,7 +58,7 @@ namespace NKikimr {
, ReplPDiskWriteQuoter(std::move(replPDiskWriteQuoter))
, ReplNodeRequestQuoter(std::move(replNodeRequestQuoter))
, ReplNodeResponseQuoter(std::move(replNodeResponseQuoter))
, CostTracker(std::make_shared<TBsCostTracker>(Top->GType, type, vdiskCounters))
, CostTracker(std::make_shared<TBsCostTracker>(Top->GType, type, vdiskCounters, burstThresholdNs))
, OutOfSpaceState(Top->GetTotalVDisksNum(), Top->GetOrderNumber(ShortSelfVDisk))
, CostMonGroup(vdiskCounters, "subsystem", "cost")
, Logger(as ? ActorSystemLogger(as) : DevNullLogger())
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/blobstorage/vdisk/common/vdisk_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,8 @@ namespace NKikimr {
TReplQuoter::TPtr replPDiskReadQuoter = nullptr,
TReplQuoter::TPtr replPDiskWriteQuoter = nullptr,
TReplQuoter::TPtr replNodeRequestQuoter = nullptr,
TReplQuoter::TPtr replNodeResponseQuoter = nullptr);
TReplQuoter::TPtr replNodeResponseQuoter = nullptr,
ui64 burstThresholdNs = 1'000'000'000);

// The function checks response from PDisk. Normally, it's OK.
// Other alternatives are: 1) shutdown; 2) FAIL
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2153,6 +2153,10 @@ namespace NKikimr {
TABLED() {str << "VDiskIncarnationGuid";}
TABLED() {str << Db->GetVDiskIncarnationGuid(true);}
}
TABLER() {
TABLED() {str << "BurstThresholdNs";}
TABLED() {str << Config->BurstThresholdNs;}
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -755,7 +755,7 @@ namespace NKikimr {
VCtx = MakeIntrusive<TVDiskContext>(ctx.SelfID, GInfo->PickTopology(), VDiskCounters, SelfVDiskId,
ctx.ExecutorThread.ActorSystem, baseInfo.DeviceType, baseInfo.DonorMode,
baseInfo.ReplPDiskReadQuoter, baseInfo.ReplPDiskWriteQuoter, baseInfo.ReplNodeRequestQuoter,
baseInfo.ReplNodeResponseQuoter);
baseInfo.ReplNodeResponseQuoter, Config->BurstThresholdNs);

// create IntQueues
IntQueueAsyncGets = std::make_unique<TIntQueueClass>(
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/protos/blobstorage_vdisk_config.proto
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ message TVDiskConfig {

optional bool BarrierValidation = 60;
optional bool EnableOverseerLsnReporting = 61; // deprecated

optional uint64 BurstThresholdNs = 62;
};

// organizes hierarchy of VDisk configs: VDisk config may have a base config,
Expand Down
5 changes: 5 additions & 0 deletions ydb/library/pdisk_io/device_type.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ namespace NKikimr::NPDisk {
ui64 LastSectorReadBytesPerSec;
ui64 FirstSectorWriteBytesPerSec;
ui64 LastSectorWriteBytesPerSec;
ui64 BurstThresholdNs;
};

const static std::unordered_map<EDeviceType, TDevicePerformanceParams> DevicePerformance = {
Expand All @@ -28,27 +29,31 @@ namespace NKikimr::NPDisk {
.LastSectorReadBytesPerSec = 0,
.FirstSectorWriteBytesPerSec = 0,
.LastSectorWriteBytesPerSec = 0,
.BurstThresholdNs = 1'000'000'000,
} },
{ DEVICE_TYPE_ROT, TDevicePerformanceParams{
.SeekTimeNs = 8000000,
.FirstSectorReadBytesPerSec = 200ull * 1024 * 1024,
.LastSectorReadBytesPerSec = 66ull * 1024 * 1024,
.FirstSectorWriteBytesPerSec = 200ull * 1024 * 1024,
.LastSectorWriteBytesPerSec = 66ull * 1024 * 1024,
.BurstThresholdNs = 200'000'000,
} },
{ DEVICE_TYPE_SSD, TDevicePerformanceParams{
.SeekTimeNs = 40000,
.FirstSectorReadBytesPerSec = 500ull * 1024 * 1024,
.LastSectorReadBytesPerSec = 500ull * 1024 * 1024,
.FirstSectorWriteBytesPerSec = 500ull * 1024 * 1024,
.LastSectorWriteBytesPerSec = 500ull * 1024 * 1024,
.BurstThresholdNs = 50'000'000,
} },
{ DEVICE_TYPE_NVME, TDevicePerformanceParams{
.SeekTimeNs = 40000,
.FirstSectorReadBytesPerSec = 1000ull * 1024 * 1024,
.LastSectorReadBytesPerSec = 1000ull * 1024 * 1024,
.FirstSectorWriteBytesPerSec = 1000ull * 1024 * 1024,
.LastSectorWriteBytesPerSec = 1000ull * 1024 * 1024,
.BurstThresholdNs = 32'000'000,
} },
};

Expand Down

0 comments on commit 62005ad

Please sign in to comment.