Skip to content
10 changes: 10 additions & 0 deletions ydb/core/cms/json_proxy_proto.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,16 @@ class TJsonProxyProto : public TActorBootstrapped<TJsonProxyProto> {
return ReplyWithTypeDescription(*NKikimrConfig::TImmediateControlsConfig::TSchemeShardControls::descriptor(), ctx);
else if (name == ".NKikimrConfig.TImmediateControlsConfig.TTCMallocControls")
return ReplyWithTypeDescription(*NKikimrConfig::TImmediateControlsConfig::TTCMallocControls::descriptor(), ctx);
else if (name == ".NKikimrConfig.TImmediateControlsConfig.TTracingControls")
return ReplyWithTypeDescription(*NKikimrConfig::TImmediateControlsConfig::TTracingControls::descriptor(), ctx);
else if (name == ".NKikimrConfig.TImmediateControlsConfig.TTracingControls.TSamplingThrottlingOptions")
return ReplyWithTypeDescription(*NKikimrConfig::TImmediateControlsConfig::TTracingControls::TSamplingThrottlingOptions::descriptor(), ctx);
else if (name == ".NKikimrConfig.TImmediateControlsConfig.TTracingControls.TSamplingThrottlingOptions.TThrottlingOptions")
return ReplyWithTypeDescription(*NKikimrConfig::TImmediateControlsConfig::TTracingControls::TSamplingThrottlingOptions::TThrottlingOptions::descriptor(), ctx);
else if (name == ".NKikimrConfig.TImmediateControlsConfig.TTracingControls.TSamplingThrottlingOptions.TSamplingOptions")
return ReplyWithTypeDescription(*NKikimrConfig::TImmediateControlsConfig::TTracingControls::TSamplingThrottlingOptions::TSamplingOptions::descriptor(), ctx);
else if (name == ".NKikimrConfig.TImmediateControlsConfig.TTracingControls.TKeyValue")
return ReplyWithTypeDescription(*NKikimrConfig::TImmediateControlsConfig::TTracingControls::TKeyValue::descriptor(), ctx);
}

ctx.Send(RequestEvent->Sender,
Expand Down
62 changes: 62 additions & 0 deletions ydb/core/control/common_controls/tracing_control.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
#include "tracing_control.h"

#include <ydb/core/control/immediate_control_board_impl.h>
#include <ydb/core/protos/config.pb.h>
#include <library/cpp/random_provider/random_provider.h>

namespace NKikimr {

namespace {

const NKikimrConfig::TImmediateControlOptions& GetImmediateControlOptionsForField(
const google::protobuf::Descriptor& descriptor, TString fieldName) {
auto field = descriptor.FindFieldByName(fieldName);
Y_ABORT_UNLESS(field);
auto& fieldOptions = field->options();
return fieldOptions.GetExtension(NKikimrConfig::ControlOptions);
}

TThrottler CreateThrottler(TIntrusivePtr<TControlBoard>& icb, TIntrusivePtr<ITimeProvider> timeProvider, TString domain) {
TControlWrapper maxRatePerMinute;
TControlWrapper maxBurst;

const std::array<std::pair<TControlWrapper&, TStringBuf>, 2> controls = {{
{maxRatePerMinute, "MaxRatePerMinute"},
{maxBurst, "MaxBurst"},
}};
const auto& throttlingOptions = *NKikimrConfig::TImmediateControlsConfig::TTracingControls::TSamplingThrottlingOptions::TThrottlingOptions::descriptor();
for (auto& [control, fieldName] : controls) {
const auto& controlOptions = GetImmediateControlOptionsForField(throttlingOptions, TString(fieldName));

control.Reset(controlOptions.GetDefaultValue(), controlOptions.GetMinValue(), controlOptions.GetMaxValue());
icb->RegisterSharedControl(control, domain + "." + fieldName);
}

return TThrottler(std::move(maxRatePerMinute), std::move(maxBurst), std::move(timeProvider));
}

}

TTracingControl::TTracingControl(TIntrusivePtr<TControlBoard>& icb, TIntrusivePtr<ITimeProvider> timeProvider,
TIntrusivePtr<IRandomProvider>& randomProvider, TString controlDomain)
{
SampledThrottler = CreateThrottler(icb, timeProvider, controlDomain + ".SampledThrottling");
ExternalThrottler = CreateThrottler(icb, timeProvider, controlDomain + ".ExternalThrottling");

TControlWrapper samplingPPM;
const std::array<std::pair<TControlWrapper&, TStringBuf>, 2> controls = {{
{samplingPPM, "PPM"},
{SampledLevel, "Level"},
}};

const auto& samplingOptions = *NKikimrConfig::TImmediateControlsConfig::TTracingControls::TSamplingThrottlingOptions::TSamplingOptions::descriptor();
for (auto [control, name] : controls) {
const auto& controlOptions = GetImmediateControlOptionsForField(samplingOptions, TString(name));
control.Reset(controlOptions.GetDefaultValue(), controlOptions.GetMinValue(), controlOptions.GetMaxValue());
icb->RegisterSharedControl(control, controlDomain + ".Sampling." + name);
}

Sampler = TSampler(std::move(samplingPPM), randomProvider->GenRand64());
}

} // namespace NKikimr
33 changes: 33 additions & 0 deletions ydb/core/control/common_controls/tracing_control.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
#pragma once

#include <ydb/core/base/appdata_fwd.h>
#include <ydb/core/control/immediate_control_board_sampler.h>
#include <ydb/core/control/immediate_control_board_throttler.h>

namespace NKikimr {

class TTracingControl {
public:
TTracingControl(TIntrusivePtr<TControlBoard>& icb, TIntrusivePtr<ITimeProvider> timeProvider,
TIntrusivePtr<IRandomProvider>& randomProvider, TString controlDomain);

bool SampleThrottle() {
return Sampler.Sample() && !SampledThrottler.Throttle();
}

bool ThrottleExternal() {
return ExternalThrottler.Throttle();
}

ui8 SampledVerbosity() const {
return SampledLevel;
}

private:
TSampler Sampler;
TThrottler SampledThrottler;
TThrottler ExternalThrottler;
TControlWrapper SampledLevel;
};

} // namespace NKikimr
13 changes: 13 additions & 0 deletions ydb/core/control/common_controls/ya.make
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
LIBRARY()

PEERDIR(
ydb/library/actors/wilson
ydb/core/protos
)

SRCS(
tracing_control.h
tracing_control.cpp
)

END()
25 changes: 25 additions & 0 deletions ydb/core/control/immediate_control_board_sampler.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
#pragma once

#include <ydb/core/control/immediate_control_board_wrapper.h>

namespace NKikimr {

class TSampler {
public:
TSampler() : Rng(0) {}

TSampler(TControlWrapper samplingPPM, ui64 seed)
: SamplingPPM(std::move(samplingPPM))
, Rng(seed)
{}

bool Sample() {
return Rng() % 1'000'000 < SamplingPPM;
}

private:
TControlWrapper SamplingPPM;
TReallyFastRng32 Rng;
};

} // namespace NKikimr
66 changes: 66 additions & 0 deletions ydb/core/control/immediate_control_board_throttler.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
#pragma once

#include <ydb/core/control/immediate_control_board_wrapper.h>
#include <library/cpp/time_provider/time_provider.h>

namespace NKikimr {

class TThrottler {
public:
TThrottler() = default;

TThrottler(TControlWrapper maxRatePerMinute, TControlWrapper maxBurst,
TIntrusivePtr<ITimeProvider> timeProvider)
: TimeProvider(std::move(timeProvider))
, MaxRatePerMinute(std::move(maxRatePerMinute))
, MaxBurst(std::move(maxBurst))
, LastUpdate(TimeProvider->Now())
{}

bool Throttle() {
auto maxRatePerMinute = static_cast<i64>(MaxRatePerMinute);
auto maxBurst = static_cast<i64>(MaxBurst);
auto maxTotal = maxBurst + 1;
CurrentBurst = std::min(CurrentBurst, maxTotal);
if (maxRatePerMinute == 0) {
return true;
}

auto now = TimeProvider->Now();
if (now < LastUpdate) {
return true;
}

const auto deltaBetweenSends = TDuration::Minutes(1) / maxRatePerMinute;
UpdateStats(now, deltaBetweenSends);

if (CurrentBurst < maxTotal) {
CurrentBurst += 1;
return false;
}

return true;
}

private:
void UpdateStats(TInstant now, TDuration deltaBetweenSends) {
i64 decrease = (now - LastUpdate) / deltaBetweenSends;
decrease = std::min(decrease, CurrentBurst);
Y_ABORT_UNLESS(decrease >= 0);
CurrentBurst -= decrease;
LastUpdate += decrease * deltaBetweenSends;
if (CurrentBurst == 0) {
LastUpdate = now;
}
}

TIntrusivePtr<ITimeProvider> TimeProvider;

TControlWrapper MaxRatePerMinute;
TControlWrapper MaxBurst;

TInstant LastUpdate = TInstant::Zero();
i64 CurrentBurst = 0;
};

} // namespace NKikimr
6 changes: 6 additions & 0 deletions ydb/core/control/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,16 @@ SRCS(
immediate_control_board_impl.cpp
immediate_control_board_impl.h
immediate_control_board_wrapper.h
immediate_control_board_throttler.h
immediate_control_board_sampler.h
)

END()

RECURSE(
common_controls
)

RECURSE_FOR_TESTS(
ut
)
2 changes: 1 addition & 1 deletion ydb/core/driver_lib/run/kikimr_services_initializers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1619,7 +1619,7 @@ void TGRpcServicesInitializer::InitializeServices(NActors::TActorSystemSetup* se
for (size_t i = 0; i < proxyCount; ++i) {
auto grpcReqProxy = Config.HasGRpcConfig() && Config.GetGRpcConfig().GetSkipSchemeCheck()
? NGRpcService::CreateGRpcRequestProxySimple(Config)
: NGRpcService::CreateGRpcRequestProxy(Config);
: NGRpcService::CreateGRpcRequestProxy(Config, appData->Icb);
setup->LocalServices.push_back(std::pair<TActorId,
TActorSetupCmd>(NGRpcService::CreateGRpcRequestProxyId(i),
TActorSetupCmd(grpcReqProxy, TMailboxType::ReadAsFilled,
Expand Down
15 changes: 15 additions & 0 deletions ydb/core/grpc_services/base/base.h
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,9 @@ class IRequestProxyCtx : public virtual IRequestCtxBase {
virtual void StartTracing(NWilson::TSpan&& span) = 0;
virtual void LegacyFinishSpan() = 0;

// Used for per-type sampling
virtual const TString& GetInternalRequestType() const = 0;

// validation
virtual bool Validate(TString& error) = 0;

Expand Down Expand Up @@ -485,6 +488,10 @@ class TRefreshTokenImpl

void StartTracing(NWilson::TSpan&& /*span*/) override {}
void LegacyFinishSpan() override {}
const TString& GetInternalRequestType() const final {
static const TString empty = "";
return empty;
}

void UpdateAuthState(NYdbGrpc::TAuthState::EAuthState state) override {
State_.State = state;
Expand Down Expand Up @@ -890,6 +897,10 @@ class TGRpcRequestBiStreamWrapper
Span_.End();
}

const TString& GetInternalRequestType() const final {
return TRequest::descriptor()->full_name();
}

// IRequestCtxBase
//
void AddAuditLogPart(const TStringBuf&, const TString&) override {
Expand Down Expand Up @@ -1302,6 +1313,10 @@ class TGRpcRequestWrapperImpl

void LegacyFinishSpan() override {}

const TString& GetInternalRequestType() const final {
return TRequest::descriptor()->full_name();
}

void ReplyGrpcError(grpc::StatusCode code, const TString& msg, const TString& details = "") {
Ctx_->ReplyError(code, msg, details);
}
Expand Down
Loading