Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions ydb/core/base/blobstorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -492,6 +492,8 @@ struct TEvBlobStorage {
EvInplacePatch,
EvAssimilate,

EvGetQueuesInfo, // for debugging purposes

//
EvPutResult = EvPut + 512, /// 268 632 576
EvGetResult,
Expand All @@ -506,6 +508,8 @@ struct TEvBlobStorage {
EvInplacePatchResult,
EvAssimilateResult,

EvQueuesInfo, // for debugging purposes

// proxy <-> vdisk interface
EvVPut = EvPut + 2 * 512, /// 268 633 088
EvVGet,
Expand Down Expand Up @@ -878,6 +882,7 @@ struct TEvBlobStorage {
EvRunActor = EvPut + 15 * 512,
EvVMockCtlRequest,
EvVMockCtlResponse,
EvDelayedMessageWrapper,

// incremental huge blob keeper
EvIncrHugeInit = EvPut + 17 * 512,
Expand Down
36 changes: 36 additions & 0 deletions ydb/core/blobstorage/backpressure/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,39 @@
#define QLOG_DEBUG_S(marker, arg) QLOG_LOG_S(marker, NActors::NLog::PRI_DEBUG , arg)

LWTRACE_USING(BLOBSTORAGE_PROVIDER);

namespace NKikimr::NBsQueue {

// Debug-oriented stopwatch that reads the actor system clock
// (NActors::TActivationContext::Monotonic()) both when it is created and on
// every query, which lets it work with the virtual time of TTestActorSystem.
struct TActivationContextTimer {
    // Reading of the monotonic clock taken once, at construction.
    TMonotonic CreationTimestamp;

    TActivationContextTimer()
        : CreationTimestamp(NActors::TActivationContext::Monotonic())
    {}

    // Time elapsed since construction, in seconds (fractional).
    double Passed() const {
        const auto elapsed = NActors::TActivationContext::Monotonic() - CreationTimestamp;
        return elapsed.SecondsFloat();
    }
};

struct TBSQueueTimer {
TBSQueueTimer(bool useActorSystemTime)
{
if (useActorSystemTime) {
Timer.emplace<TActivationContextTimer>();
} else {
Timer.emplace<THPTimer>();
}
}

std::variant<THPTimer, TActivationContextTimer> Timer;

double Passed() const {
return std::visit([](const auto& timer) -> double {
return timer.Passed();
}, Timer);
}
};

} // namespace NKikimr::NBsQueue
2 changes: 1 addition & 1 deletion ydb/core/blobstorage/backpressure/event.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ IEventBase *TEventHolder::MakeErrorReply(NKikimrProto::EReplyStatus status, cons

void TEventHolder::SendToVDisk(const TActorContext& ctx, const TActorId& remoteVDisk, ui64 queueCookie, ui64 msgId,
ui64 sequenceId, bool sendMeCostSettings, NWilson::TTraceId traceId, const NBackpressure::TQueueClientId& clientId,
const THPTimer& processingTimer) {
const TBSQueueTimer& processingTimer) {
// check that we are not discarded yet
Y_ABORT_UNLESS(Type != 0);

Expand Down
2 changes: 1 addition & 1 deletion ydb/core/blobstorage/backpressure/event.h
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ class TEventHolder {

void SendToVDisk(const TActorContext& ctx, const TActorId& remoteVDisk, ui64 queueCookie, ui64 msgId, ui64 sequenceId,
bool sendMeCostSettings, NWilson::TTraceId traceId, const NBackpressure::TQueueClientId& clientId,
const THPTimer& processingTimer);
const TBSQueueTimer& processingTimer);

void Discard();
};
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/blobstorage/backpressure/queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ namespace NKikimr::NBsQueue {

TBlobStorageQueue::TBlobStorageQueue(const TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters, TString& logPrefix,
const TBSProxyContextPtr& bspctx, const NBackpressure::TQueueClientId& clientId, ui32 interconnectChannel,
const TBlobStorageGroupType& gType, NMonitoring::TCountableBase::EVisibility visibility)
const TBlobStorageGroupType& gType, NMonitoring::TCountableBase::EVisibility visibility, bool useActorSystemTime)
: Queues(bspctx)
, WindowSize(0)
, InFlightCost(0)
Expand All @@ -16,6 +16,7 @@ TBlobStorageQueue::TBlobStorageQueue(const TIntrusivePtr<::NMonitoring::TDynamic
, ClientId(clientId)
, BytesWaiting(0)
, InterconnectChannel(interconnectChannel)
, UseActorSystemTime(useActorSystemTime)
// use parent group visibility
, QueueWaitingItems(counters->GetCounter("QueueWaitingItems", false, visibility))
, QueueWaitingBytes(counters->GetCounter("QueueWaitingBytes", false, visibility))
Expand Down
16 changes: 11 additions & 5 deletions ydb/core/blobstorage/backpressure/queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,16 @@ class TBlobStorageQueue {
const ui64 QueueCookie;
ui64 Cost;
bool DirtyCost;
THPTimer ProcessingTimer;
TBSQueueTimer ProcessingTimer;

TTrackableList<TItem>::iterator Iterator;

template<typename TEvent>
TItem(TAutoPtr<TEventHandle<TEvent>>& event, TInstant deadline,
const ::NMonitoring::TDynamicCounters::TCounterPtr& serItems,
const ::NMonitoring::TDynamicCounters::TCounterPtr& serBytes,
const TBSProxyContextPtr& bspctx, ui32 interconnectChannel,
bool local)
bool local, bool useActorSystemTime)
: Queue(EItemQueue::NotSet)
, CostEssence(*event->Get())
, Span(TWilson::VDiskTopLevel, std::move(event->TraceId), "Backpressure.InFlight")
Expand All @@ -70,6 +71,7 @@ class TBlobStorageQueue {
, QueueCookie(RandomNumber<ui64>())
, Cost(0)
, DirtyCost(true)
, ProcessingTimer(useActorSystemTime)
{
if (Span) {
Span
Expand Down Expand Up @@ -129,6 +131,8 @@ class TBlobStorageQueue {

const ui32 InterconnectChannel;

const bool UseActorSystemTime;

public:
::NMonitoring::TDynamicCounters::TCounterPtr QueueWaitingItems;
::NMonitoring::TDynamicCounters::TCounterPtr QueueWaitingBytes;
Expand Down Expand Up @@ -156,7 +160,8 @@ class TBlobStorageQueue {
TBlobStorageQueue(const TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters, TString& logPrefix,
const TBSProxyContextPtr& bspctx, const NBackpressure::TQueueClientId& clientId, ui32 interconnectChannel,
const TBlobStorageGroupType &gType,
NMonitoring::TCountableBase::EVisibility visibility = NMonitoring::TCountableBase::EVisibility::Public);
NMonitoring::TCountableBase::EVisibility visibility = NMonitoring::TCountableBase::EVisibility::Public,
bool useActorSystemTime = false);

~TBlobStorageQueue();

Expand Down Expand Up @@ -213,7 +218,8 @@ class TBlobStorageQueue {
TItemList::iterator newIt;
if (Queues.Unused.empty()) {
newIt = Queues.Waiting.emplace(Queues.Waiting.end(), event, deadline,
QueueSerializedItems, QueueSerializedBytes, BSProxyCtx, InterconnectChannel, local);
QueueSerializedItems, QueueSerializedBytes, BSProxyCtx, InterconnectChannel, local,
UseActorSystemTime);
++*QueueSize;
} else {
newIt = Queues.Unused.begin();
Expand All @@ -222,7 +228,7 @@ class TBlobStorageQueue {
TItem& item = *newIt;
item.~TItem();
new(&item) TItem(event, deadline, QueueSerializedItems, QueueSerializedBytes, BSProxyCtx,
InterconnectChannel, local);
InterconnectChannel, local, UseActorSystemTime);
}

newIt->Iterator = newIt;
Expand Down
10 changes: 6 additions & 4 deletions ydb/core/blobstorage/backpressure/queue_backpressure_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,13 @@ class TVDiskBackpressureClientActor : public TActorBootstrapped<TVDiskBackpressu
NKikimrBlobStorage::EVDiskQueueId queueId,const TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters,
const TBSProxyContextPtr& bspctx, const NBackpressure::TQueueClientId& clientId, const TString& queueName,
ui32 interconnectChannel, bool /*local*/, TDuration watchdogTimeout,
TIntrusivePtr<NBackpressure::TFlowRecord> &flowRecord, NMonitoring::TCountableBase::EVisibility visibility)
TIntrusivePtr<NBackpressure::TFlowRecord> &flowRecord, NMonitoring::TCountableBase::EVisibility visibility,
bool useActorSystemTime)
: BSProxyCtx(bspctx)
, QueueName(queueName)
, Counters(counters->GetSubgroup("queue", queueName))
, Queue(Counters, LogPrefix, bspctx, clientId, interconnectChannel,
(info ? info->Type : TErasureType::ErasureNone), visibility)
(info ? info->Type : TErasureType::ErasureNone), visibility, useActorSystemTime)
, VDiskIdShort(vdiskId)
, QueueId(queueId)
, QueueWatchdogTimeout(watchdogTimeout)
Expand Down Expand Up @@ -975,9 +976,10 @@ IActor* CreateVDiskBackpressureClient(const TIntrusivePtr<TBlobStorageGroupInfo>
NKikimrBlobStorage::EVDiskQueueId queueId,const TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters,
const TBSProxyContextPtr& bspctx, const NBackpressure::TQueueClientId& clientId, const TString& queueName,
ui32 interconnectChannel, bool local, TDuration watchdogTimeout,
TIntrusivePtr<NBackpressure::TFlowRecord> &flowRecord, NMonitoring::TCountableBase::EVisibility visibility) {
TIntrusivePtr<NBackpressure::TFlowRecord> &flowRecord, NMonitoring::TCountableBase::EVisibility visibility,
bool useActorSystemTime) {
return new NBsQueue::TVDiskBackpressureClientActor(info, vdiskId, queueId, counters, bspctx, clientId, queueName,
interconnectChannel, local, watchdogTimeout, flowRecord, visibility);
interconnectChannel, local, watchdogTimeout, flowRecord, visibility, useActorSystemTime);
}

} // NKikimr
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ namespace NKikimr {
NKikimrBlobStorage::EVDiskQueueId queueId,const TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters,
const TBSProxyContextPtr& bspctx, const NBackpressure::TQueueClientId& clientId, const TString& queueName,
ui32 interconnectChannel, bool local, TDuration watchdogTimeout,
TIntrusivePtr<NBackpressure::TFlowRecord> &flowRecord, NMonitoring::TCountableBase::EVisibility visibility);
TIntrusivePtr<NBackpressure::TFlowRecord> &flowRecord, NMonitoring::TCountableBase::EVisibility visibility,
bool useActorSystemTime = false);

} // NKikimr
25 changes: 22 additions & 3 deletions ydb/core/blobstorage/dsproxy/dsproxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ const ui32 MaskSizeBits = 32;
constexpr bool DefaultEnablePutBatching = true;
constexpr bool DefaultEnableVPatch = false;

constexpr float DefaultSlowDiskThreshold = 2;
constexpr float DefaultPredictedDelayMultiplier = 1;

constexpr bool WithMovingPatchRequestToStaticNode = true;

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -188,6 +191,11 @@ inline void SetExecutionRelay(IEventBase& ev, std::shared_ptr<TEvBlobStorage::TE
}
}

// Pair of acceleration tuning values that request code passes around by
// const reference (e.g. into IStrategy::Process).
// NOTE(review): the defaults below repeat the values of DefaultSlowDiskThreshold
// and DefaultPredictedDelayMultiplier declared earlier in this header — keep
// them in sync.
struct TAccelerationParams {
    // Defaults to 2. NOTE(review): exact semantics of the threshold are not
    // visible here — confirm against the strategies that consume it.
    double SlowDiskThreshold{2.0};

    // Defaults to 1. NOTE(review): presumably the factor applied to predicted
    // disk delays — confirm against the callers.
    double PredictedDelayMultiplier{1.0};
};

class TBlobStorageGroupRequestActor : public TActor<TBlobStorageGroupRequestActor> {
public:
template<typename TEv>
Expand Down Expand Up @@ -362,6 +370,7 @@ struct TBlobStorageGroupPutParameters {
bool TimeStatsEnabled;
TDiskResponsivenessTracker::TPerDiskStatsPtr Stats;
bool EnableRequestMod3x3ForMinLatency;
TAccelerationParams AccelerationParams;
};
IActor* CreateBlobStorageGroupPutRequest(TBlobStorageGroupPutParameters params);

Expand All @@ -379,6 +388,7 @@ struct TBlobStorageGroupMultiPutParameters {
NKikimrBlobStorage::EPutHandleClass HandleClass;
TEvBlobStorage::TEvPut::ETactic Tactic;
bool EnableRequestMod3x3ForMinLatency;
TAccelerationParams AccelerationParams;

static ui32 CalculateRestartCounter(TBatchedVec<TEvBlobStorage::TEvPut::TPtr>& events) {
ui32 maxRestarts = 0;
Expand All @@ -398,6 +408,7 @@ struct TBlobStorageGroupGetParameters {
.Activity = NKikimrServices::TActivity::BS_PROXY_GET_ACTOR,
};
TNodeLayoutInfoPtr NodeLayout;
TAccelerationParams AccelerationParams;
};
IActor* CreateBlobStorageGroupGetRequest(TBlobStorageGroupGetParameters params);

Expand Down Expand Up @@ -498,12 +509,20 @@ IActor* CreateBlobStorageGroupAssimilateRequest(TBlobStorageGroupAssimilateParam

IActor* CreateBlobStorageGroupEjectedProxy(ui32 groupId, TIntrusivePtr<TDsProxyNodeMon> &nodeMon);

// Settings bundle accepted by CreateBlobStorageGroupProxyConfigured and
// CreateBlobStorageGroupProxyUnconfigured.
//
// The TControlWrapper members are references, not copies: the referenced
// objects must stay alive for as long as this struct is read.
// Member order is part of the interface for aggregate initialization.
struct TBlobStorageProxyParameters {
    // Off by default.
    // NOTE(review): presumably forwarded to the backpressure queue as its
    // useActorSystemTime flag — confirm at the call site.
    bool UseActorSystemTimeInBSQueue{false};

    // Runtime controls, held by const reference.
    const TControlWrapper& EnablePutBatching;
    const TControlWrapper& EnableVPatch;
    const TControlWrapper& SlowDiskThreshold;
    const TControlWrapper& PredictedDelayMultiplier;
};

IActor* CreateBlobStorageGroupProxyConfigured(TIntrusivePtr<TBlobStorageGroupInfo>&& info,
bool forceWaitAllDrives, TIntrusivePtr<TDsProxyNodeMon> &nodeMon,
TIntrusivePtr<TStoragePoolCounters>&& storagePoolCounters, const TControlWrapper &enablePutBatching,
const TControlWrapper &enableVPatch);
TIntrusivePtr<TStoragePoolCounters>&& storagePoolCounters, const TBlobStorageProxyParameters& params);

IActor* CreateBlobStorageGroupProxyUnconfigured(ui32 groupId, TIntrusivePtr<TDsProxyNodeMon> &nodeMon,
const TControlWrapper &enablePutBatching, const TControlWrapper &enableVPatch);
const TBlobStorageProxyParameters& params);

}//NKikimr
31 changes: 21 additions & 10 deletions ydb/core/blobstorage/dsproxy/dsproxy_blackboard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -178,12 +178,17 @@ ui64 TBlobState::GetPredictedDelayNs(const TBlobStorageGroupInfo &info, TGroupQu
}

void TBlobState::GetWorstPredictedDelaysNs(const TBlobStorageGroupInfo &info, TGroupQueues &groupQueues,
NKikimrBlobStorage::EVDiskQueueId queueId, ui32 nWorst, TDiskDelayPredictions *outNWorst) const {
NKikimrBlobStorage::EVDiskQueueId queueId, TDiskDelayPredictions *outNWorst,
double multiplier) const {
outNWorst->resize(Disks.size());
for (ui32 diskIdx = 0; diskIdx < Disks.size(); ++diskIdx) {
(*outNWorst)[diskIdx] = { GetPredictedDelayNs(info, groupQueues, diskIdx, queueId), diskIdx };
(*outNWorst)[diskIdx] = {
static_cast<ui64>(GetPredictedDelayNs(info, groupQueues, diskIdx, queueId) * multiplier),
diskIdx
};
}
std::partial_sort(outNWorst->begin(), outNWorst->begin() + std::min(nWorst, (ui32)Disks.size()), outNWorst->end());
ui32 sortedPrefixSize = std::min(3u, (ui32)Disks.size());
std::partial_sort(outNWorst->begin(), outNWorst->begin() + sortedPrefixSize, outNWorst->end());
}

bool TBlobState::HasWrittenQuorum(const TBlobStorageGroupInfo& info, const TBlobStorageGroupInfo::TGroupVDisks& expired) const {
Expand Down Expand Up @@ -361,7 +366,8 @@ void TBlackboard::AddErrorResponse(const TLogoBlobID &id, ui32 orderNumber) {
}

EStrategyOutcome TBlackboard::RunStrategies(TLogContext &logCtx, const TStackVec<IStrategy*, 1>& s,
TBatchedVec<TFinishedBlob> *finished, const TBlobStorageGroupInfo::TGroupVDisks *expired) {
const TAccelerationParams& accelerationParams, TBatchedVec<TFinishedBlob> *finished,
const TBlobStorageGroupInfo::TGroupVDisks *expired) {
for (auto it = BlobStates.begin(); it != BlobStates.end(); ) {
auto& blob = it->second;
if (!std::exchange(blob.IsChanged, false)) {
Expand All @@ -373,7 +379,7 @@ EStrategyOutcome TBlackboard::RunStrategies(TLogContext &logCtx, const TStackVec
NKikimrProto::EReplyStatus status = NKikimrProto::OK;
TString errorReason;
for (IStrategy *strategy : s) {
switch (auto res = strategy->Process(logCtx, blob, *Info, *this, GroupDiskRequests)) {
switch (auto res = strategy->Process(logCtx, blob, *Info, *this, GroupDiskRequests, accelerationParams)) {
case EStrategyOutcome::IN_PROGRESS:
status = NKikimrProto::UNKNOWN;
break;
Expand Down Expand Up @@ -415,8 +421,9 @@ EStrategyOutcome TBlackboard::RunStrategies(TLogContext &logCtx, const TStackVec
}

EStrategyOutcome TBlackboard::RunStrategy(TLogContext &logCtx, const IStrategy& s,
TBatchedVec<TFinishedBlob> *finished, const TBlobStorageGroupInfo::TGroupVDisks *expired) {
return RunStrategies(logCtx, {const_cast<IStrategy*>(&s)}, finished, expired);
const TAccelerationParams& accelerationParams, TBatchedVec<TFinishedBlob> *finished,
const TBlobStorageGroupInfo::TGroupVDisks *expired) {
return RunStrategies(logCtx, {const_cast<IStrategy*>(&s)}, accelerationParams, finished, expired);
}

TBlobState& TBlackboard::GetState(const TLogoBlobID &id) {
Expand Down Expand Up @@ -458,13 +465,17 @@ void TBlackboard::ReportPartMapStatus(const TLogoBlobID &id, ssize_t partMapInde
}

void TBlackboard::GetWorstPredictedDelaysNs(const TBlobStorageGroupInfo &info, TGroupQueues &groupQueues,
NKikimrBlobStorage::EVDiskQueueId queueId, ui32 nWorst, TDiskDelayPredictions *outNWorst) const {
NKikimrBlobStorage::EVDiskQueueId queueId, TDiskDelayPredictions *outNWorst,
double multiplier) const {
ui32 totalVDisks = info.GetTotalVDisksNum();
outNWorst->resize(totalVDisks);
for (ui32 orderNumber = 0; orderNumber < totalVDisks; ++orderNumber) {
(*outNWorst)[orderNumber] = { groupQueues.GetPredictedDelayNsByOrderNumber(orderNumber, queueId), orderNumber };
(*outNWorst)[orderNumber] = {
static_cast<ui64>(groupQueues.GetPredictedDelayNsByOrderNumber(orderNumber, queueId) * multiplier),
orderNumber
};
}
std::partial_sort(outNWorst->begin(), outNWorst->begin() + std::min(nWorst, totalVDisks), outNWorst->end());
std::partial_sort(outNWorst->begin(), outNWorst->begin() + std::min(3u, totalVDisks), outNWorst->end());
}

void TBlackboard::RegisterBlobForPut(const TLogoBlobID& id, size_t blobIdx) {
Expand Down
14 changes: 9 additions & 5 deletions ydb/core/blobstorage/dsproxy/dsproxy_blackboard.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,8 @@ struct TBlobState {
ui64 GetPredictedDelayNs(const TBlobStorageGroupInfo &info, TGroupQueues &groupQueues,
ui32 diskIdxInSubring, NKikimrBlobStorage::EVDiskQueueId queueId) const;
void GetWorstPredictedDelaysNs(const TBlobStorageGroupInfo &info, TGroupQueues &groupQueues,
NKikimrBlobStorage::EVDiskQueueId queueId, ui32 nWorst, TDiskDelayPredictions *outNWorst) const;
NKikimrBlobStorage::EVDiskQueueId queueId, TDiskDelayPredictions *outNWorst,
double multipler = 1) const;
TString ToString() const;
bool HasWrittenQuorum(const TBlobStorageGroupInfo& info, const TBlobStorageGroupInfo::TGroupVDisks& expired) const;

Expand Down Expand Up @@ -158,7 +159,8 @@ class IStrategy {
public:
virtual ~IStrategy() = default;
virtual EStrategyOutcome Process(TLogContext &logCtx, TBlobState &state, const TBlobStorageGroupInfo &info,
TBlackboard &blackboard, TGroupDiskRequests &groupDiskRequests) = 0;
TBlackboard &blackboard, TGroupDiskRequests &groupDiskRequests,
const TAccelerationParams& accelerationParams) = 0;
};

struct TBlackboard {
Expand Down Expand Up @@ -201,14 +203,16 @@ struct TBlackboard {
void AddNotYetResponse(const TLogoBlobID &id, ui32 orderNumber);

EStrategyOutcome RunStrategies(TLogContext& logCtx, const TStackVec<IStrategy*, 1>& strategies,
TBatchedVec<TFinishedBlob> *finished = nullptr, const TBlobStorageGroupInfo::TGroupVDisks *expired = nullptr);
EStrategyOutcome RunStrategy(TLogContext &logCtx, const IStrategy& s, TBatchedVec<TFinishedBlob> *finished = nullptr,
const TAccelerationParams& accelerationParams, TBatchedVec<TFinishedBlob> *finished = nullptr,
const TBlobStorageGroupInfo::TGroupVDisks *expired = nullptr);
EStrategyOutcome RunStrategy(TLogContext &logCtx, const IStrategy& s, const TAccelerationParams& accelerationParams,
TBatchedVec<TFinishedBlob> *finished = nullptr, const TBlobStorageGroupInfo::TGroupVDisks *expired = nullptr);
TBlobState& GetState(const TLogoBlobID &id);
ssize_t AddPartMap(const TLogoBlobID &id, ui32 diskOrderNumber, ui32 requestIndex);
void ReportPartMapStatus(const TLogoBlobID &id, ssize_t partMapIndex, ui32 responseIndex, NKikimrProto::EReplyStatus status);
void GetWorstPredictedDelaysNs(const TBlobStorageGroupInfo &info, TGroupQueues &groupQueues,
NKikimrBlobStorage::EVDiskQueueId queueId, ui32 nWorst, TDiskDelayPredictions *outNWorst) const;
NKikimrBlobStorage::EVDiskQueueId queueId, TDiskDelayPredictions *outNWorst,
double multiplier = 1) const;
TString ToString() const;

void ChangeAll() {
Expand Down
Loading