Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rewritten sampling and throttling controls to share limits and implement scoping #1902

Merged
merged 10 commits into from
Feb 21, 2024
160 changes: 137 additions & 23 deletions ydb/core/cms/console/jaeger_tracing_configurator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,21 @@
#include "console.h"

#include <ydb/core/jaeger_tracing/sampling_throttling_configurator.h>
#include <ydb/core/jaeger_tracing/settings.h>
#include <ydb/library/actors/core/actor.h>

namespace NKikimr::NConsole {

using namespace NJaegerTracing;

class TJaegerTracingConfigurator : public TActorBootstrapped<TJaegerTracingConfigurator> {
public:
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
return NKikimrServices::TActivity::JAEGER_TRACING_CONFIGURATOR;
}

TJaegerTracingConfigurator(NJaegerTracing::TSamplingThrottlingConfigurator tracingConfigurator,
const NKikimrConfig::TTracingConfig& cfg);
TJaegerTracingConfigurator(TSamplingThrottlingConfigurator tracingConfigurator,
NKikimrConfig::TTracingConfig cfg);

void Bootstrap(const TActorContext& ctx);

Expand All @@ -27,26 +30,28 @@ class TJaegerTracingConfigurator : public TActorBootstrapped<TJaegerTracingConfi
IgnoreFunc(TEvConfigsDispatcher::TEvSetConfigSubscriptionResponse)
)

TMaybe<TString> ApplyConfigs(const NKikimrConfig::TTracingConfig& cfg);
void ApplyConfigs(const NKikimrConfig::TTracingConfig& cfg);
static TMaybe<ERequestType> GetRequestType(const NKikimrConfig::TTracingConfig::TSelectors& selectors);
static TSettings<double, TThrottlingSettings> GetSettings(const NKikimrConfig::TTracingConfig& cfg);

NJaegerTracing::TSamplingThrottlingConfigurator TracingConfigurator;
TSamplingThrottlingConfigurator TracingConfigurator;
NKikimrConfig::TTracingConfig initialConfig;
};

// Constructor of the configurator actor.
// This span is a rendered diff, so the removed and the added lines of the
// change follow each other:
//  - removed version: takes the config by const reference, applies it
//    immediately and aborts through Y_ABORT_UNLESS when ApplyConfigs reports
//    an error;
//  - added version: takes the config by value, moves the configurator into
//    TracingConfigurator and the config into initialConfig, and has an empty
//    body (Bootstrap is where ApplyConfigs(initialConfig) is called).
TJaegerTracingConfigurator::TJaegerTracingConfigurator(
NJaegerTracing::TSamplingThrottlingConfigurator tracingConfigurator,
const NKikimrConfig::TTracingConfig& cfg)
TSamplingThrottlingConfigurator tracingConfigurator,
NKikimrConfig::TTracingConfig cfg)
: TracingConfigurator(std::move(tracingConfigurator))
{
if (auto err = ApplyConfigs(cfg)) {
Y_ABORT_UNLESS(false, "Failed to apply initial tracing configs: %s", err->c_str());
}
}
, initialConfig(std::move(cfg))
{}

void TJaegerTracingConfigurator::Bootstrap(const TActorContext& ctx) {
LOG_DEBUG_S(ctx, NKikimrServices::CMS_CONFIGS, "TJaegerTracingConfigurator: Bootstrap");
Become(&TThis::StateWork);

LOG_DEBUG_S(ctx, NKikimrServices::CMS_CONFIGS, "TJaegerTracingConfigurator: subscribe to config updates");
ApplyConfigs(initialConfig);

LOG_DEBUG_S(ctx, NKikimrServices::CMS_CONFIGS, "TJaegerTracingConfigurator: subscribing to config updates");
ui32 item = static_cast<ui32>(NKikimrConsole::TConfigItem::TracingConfigItem);
ctx.Send(MakeConfigsDispatcherID(SelfId().NodeId()),
new TEvConfigsDispatcher::TEvSetConfigSubscriptionRequest(item));
Expand All @@ -55,27 +60,136 @@ void TJaegerTracingConfigurator::Bootstrap(const TActorContext& ctx) {
// Handles a config notification request: logs the received config, applies
// its tracing section through ApplyConfigs and answers the sender.
// This span is a rendered diff: the three-line LOG_INFO_S and the
// `if (auto err = ...)` block are the removed lines; the single-line
// LOG_INFO_S and the bare ApplyConfigs call are their replacements.
void TJaegerTracingConfigurator::Handle(TEvConsole::TEvConfigNotificationRequest::TPtr& ev, const TActorContext& ctx) {
auto& rec = ev->Get()->Record;

LOG_INFO_S(ctx, NKikimrServices::CMS_CONFIGS,
"TJaegerTracingConfigurator: got new config: "
<< rec.GetConfig().ShortDebugString());
LOG_INFO_S(ctx, NKikimrServices::CMS_CONFIGS, "TJaegerTracingConfigurator: got new config: " << rec.GetConfig().ShortDebugString());

if (auto err = ApplyConfigs(rec.GetConfig().GetTracingConfig())) {
LOG_NOTICE_S(ctx, NKikimrServices::CMS_CONFIGS, "TJaegerTracingConfigurator: Failed to apply tracing configs: " << *err);
}
ApplyConfigs(rec.GetConfig().GetTracingConfig());

// The response is built from the request record and sent back to the
// original sender with the request's cookie.
auto resp = MakeHolder<TEvConsole::TEvConfigNotificationResponse>(rec);
LOG_TRACE_S(ctx, NKikimrServices::CMS_CONFIGS,
"TJaegerTracingConfigurator: Send TEvConfigNotificationResponse");
ctx.Send(ev->Sender, resp.Release(), 0, ev->Cookie);
}

// Applies a tracing config to the sampling/throttling configurator.
// This span is a rendered diff: the first two lines are the removed version,
// which returned TMaybe<TString> and forwarded to HandleConfigs; the lines
// after them are the added void version, which converts the config with
// GetSettings and passes the result to TracingConfigurator.UpdateSettings.
// NOTE(review): the added version is declared void but uses `return` on the
// UpdateSettings call; that is only valid if UpdateSettings returns void as
// well — confirm against its declaration.
TMaybe<TString> TJaegerTracingConfigurator::ApplyConfigs(const NKikimrConfig::TTracingConfig& cfg) {
return TracingConfigurator.HandleConfigs(cfg);
void TJaegerTracingConfigurator::ApplyConfigs(const NKikimrConfig::TTracingConfig& cfg) {
auto settings = GetSettings(cfg);
return TracingConfigurator.UpdateSettings(std::move(settings));
}

// Resolves the request type referenced by a rule's scope selectors.
//
// Returns ERequestType::UNSPECIFIED when the selectors carry no request type,
// the value found in NameToRequestType when the name is known, and an empty
// TMaybe when the name is not present in NameToRequestType.
TMaybe<ERequestType> TJaegerTracingConfigurator::GetRequestType(const NKikimrConfig::TTracingConfig::TSelectors& selectors) {
    if (selectors.HasRequestType()) {
        const auto* knownType = NameToRequestType.FindPtr(selectors.GetRequestType());
        if (knownType == nullptr) {
            // Unknown request type name: let the caller decide how to report it.
            return {};
        }
        return *knownType;
    }
    return ERequestType::UNSPECIFIED;
}

// Builds sampling and external-throttling settings from the protobuf tracing
// config.
//
// Sampling rules: a rule is skipped (with an error log) when its request type
// cannot be parsed, when any of level, fraction or max_rate_per_minute is
// missing, or when max_rate_per_minute is zero. A level above 15 is lowered to
// 15 and a fraction outside [0; 1] is clamped into that range.
//
// External throttling rules: a rule is skipped when its request type cannot be
// parsed or when max_rate_per_minute is missing or zero. Several rules for the
// same request type are merged by summing their limits.
//
// When the config contains no external throttling entries at all, a single
// rule with MaxRatePerMinute = Max<ui64>() is installed for
// ERequestType::UNSPECIFIED.
TSettings<double, TThrottlingSettings> TJaegerTracingConfigurator::GetSettings(const NKikimrConfig::TTracingConfig& cfg) {
    TSettings<double, TThrottlingSettings> settings;

    for (const auto& samplingRule : cfg.GetSampling()) {
        ERequestType requestType;
        if (auto parsedRequestType = GetRequestType(samplingRule.GetScope())) {
            requestType = *parsedRequestType;
        } else {
            ALOG_ERROR(NKikimrServices::CMS_CONFIGS, "failed to parse request type in the rule "
                       << samplingRule.ShortDebugString() << ". Skipping the rule");
            continue;
        }
        if (!samplingRule.HasLevel() || !samplingRule.HasFraction() || !samplingRule.HasMaxRatePerMinute()) {
            ALOG_ERROR(NKikimrServices::CMS_CONFIGS, "missing required fields in rule " << samplingRule.ShortDebugString()
                       << " (required fields are: level, fraction, max_rate_per_minute). Skipping the rule");
            continue;
        }
        if (samplingRule.GetMaxRatePerMinute() == 0) {
            // Print the whole rule: the rate itself is known to be zero here and
            // would not help to locate the offending entry.
            ALOG_ERROR(NKikimrServices::CMS_CONFIGS, "max_rate_per_minute should never be zero. Found in rule "
                       << samplingRule.ShortDebugString() << ". Skipping the rule");
            continue;
        }

        ui64 level = samplingRule.GetLevel();
        double fraction = samplingRule.GetFraction();
        if (level > 15) {
            ALOG_ERROR(NKikimrServices::CMS_CONFIGS, "sampling level exceeds maximum allowed value (" << level
                       << " provided, maximum is 15). Lowering the level");
            level = 15;
        }
        if (fraction < 0 || fraction > 1) {
            ALOG_ERROR(NKikimrServices::CMS_CONFIGS, "provided fraction " << fraction
                       << " violated range [0; 1]. Clamping it to the range");
            fraction = std::min(1.0, std::max(0.0, fraction));
        }

        TSamplingRule<double, TThrottlingSettings> rule {
            .Level = static_cast<ui8>(level),
            .Sampler = fraction,
            .Throttler = TThrottlingSettings {
                .MaxRatePerMinute = samplingRule.GetMaxRatePerMinute(),
                .MaxBurst = samplingRule.GetMaxBurst(),
            },
        };
        settings.SamplingRules[static_cast<size_t>(requestType)].push_back(rule);
    }

    for (const auto& throttlingRule : cfg.GetExternalThrottling()) {
        ERequestType requestType;
        if (auto parsedRequestType = GetRequestType(throttlingRule.GetScope())) {
            requestType = *parsedRequestType;
        } else {
            ALOG_ERROR(NKikimrServices::CMS_CONFIGS, "failed to parse request type in rule "
                       << throttlingRule.ShortDebugString() << ". Skipping the rule");
            continue;
        }

        if (!throttlingRule.HasMaxRatePerMinute()) {
            ALOG_ERROR(NKikimrServices::CMS_CONFIGS, "missing required field max_rate_per_minute in rule "
                       << throttlingRule.ShortDebugString() << ". Skipping the rule");
            continue;
        }
        if (throttlingRule.GetMaxRatePerMinute() == 0) {
            // Same as for sampling rules: report the rule, not the zero value.
            ALOG_ERROR(NKikimrServices::CMS_CONFIGS, "max_rate_per_minute should never be zero. Found in rule "
                       << throttlingRule.ShortDebugString() << ". Skipping the rule");
            continue;
        }

        ui64 maxRatePerMinute = throttlingRule.GetMaxRatePerMinute();
        ui64 maxBurst = throttlingRule.GetMaxBurst();
        TExternalThrottlingRule<TThrottlingSettings> rule {
            .Throttler = TThrottlingSettings {
                .MaxRatePerMinute = maxRatePerMinute,
                .MaxBurst = maxBurst,
            },
        };
        auto& currentRule = settings.ExternalThrottlingRules[static_cast<size_t>(requestType)];
        if (currentRule) {
            // A rule for this request type already exists: merge by adding limits.
            ALOG_WARN(NKikimrServices::CMS_CONFIGS, "duplicate external throttling rule for scope "
                      << throttlingRule.GetScope() << ". Adding the limits");
            currentRule->Throttler.MaxBurst += rule.Throttler.MaxBurst;
            currentRule->Throttler.MaxRatePerMinute += rule.Throttler.MaxRatePerMinute;
        } else {
            currentRule = rule;
        }
    }

    // If external_throttling section is absent we want to allow all requests to be traced
    if (cfg.GetExternalThrottling().empty()){
        TExternalThrottlingRule<TThrottlingSettings> rule {
            .Throttler = TThrottlingSettings {
                .MaxRatePerMinute = Max<ui64>(),
                .MaxBurst = 0,
            },
        };

        settings.ExternalThrottlingRules[static_cast<size_t>(ERequestType::UNSPECIFIED)] = rule;
    }

    return settings;
}

// Factory for the configurator actor: allocates a TJaegerTracingConfigurator
// with `new` and returns it as IActor*.
// This span is a rendered diff: the first three lines are the removed version
// that took the config by const reference; the following lines are the added
// version that takes the config by value and moves it into the actor.
IActor* CreateJaegerTracingConfigurator(NJaegerTracing::TSamplingThrottlingConfigurator tracingConfigurator,
const NKikimrConfig::TTracingConfig& cfg) {
return new TJaegerTracingConfigurator(std::move(tracingConfigurator), cfg);
IActor* CreateJaegerTracingConfigurator(TSamplingThrottlingConfigurator tracingConfigurator,
NKikimrConfig::TTracingConfig cfg) {
return new TJaegerTracingConfigurator(std::move(tracingConfigurator), std::move(cfg));
}

} // namespace NKikimr::NConsole
2 changes: 1 addition & 1 deletion ydb/core/cms/console/jaeger_tracing_configurator.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,6 @@
namespace NKikimr::NConsole {

IActor* CreateJaegerTracingConfigurator(NJaegerTracing::TSamplingThrottlingConfigurator tracingConfigurator,
const NKikimrConfig::TTracingConfig& cfg);
NKikimrConfig::TTracingConfig cfg);

} // namespace NKikimr::NConsole
27 changes: 0 additions & 27 deletions ydb/core/jaeger_tracing/control.h

This file was deleted.

23 changes: 0 additions & 23 deletions ydb/core/jaeger_tracing/control_wrapper.h

This file was deleted.

12 changes: 5 additions & 7 deletions ydb/core/jaeger_tracing/sampler.h
Original file line number Diff line number Diff line change
@@ -1,25 +1,23 @@
#pragma once

#include "control_wrapper.h"

#include <util/random/fast.h>

namespace NKikimr::NJaegerTracing {

// Pseudo-random sampler whose generator is initialised from an explicit seed.
// This span is a rendered diff, so removed and added lines follow each other:
//  - removed version: keeps a TControlWrapper (SamplingPPM) and a
//    TReallyFastRng32, and samples when Rng() % 1'000'000 is below the
//    wrapper's current value;
//  - added version: keeps a constant double SamplingFraction and a
//    TFastRng64, and samples when GenRandReal1() is below the fraction.
class TSampler {
public:
TSampler(TControlWrapper samplingPPM, ui64 seed)
: SamplingPPM(std::move(samplingPPM))
TSampler(double fraction, ui64 seed)
: SamplingFraction(fraction)
, Rng(seed)
{}

// Draws one value from the generator and reports whether this call is sampled.
// NOTE(review): the added line uses a strict `<` against GenRandReal1(); if
// that generator can return exactly 1.0, a fraction of 1 would occasionally
// not sample — confirm the generator's output range.
bool Sample() {
return Rng() % 1'000'000 < SamplingPPM.Get();
return Rng.GenRandReal1() < SamplingFraction;
}

private:
TControlWrapper SamplingPPM;
TReallyFastRng32 Rng;
const double SamplingFraction;
TFastRng64 Rng;
};

} // namespace NKikimr::NJaegerTracing
27 changes: 3 additions & 24 deletions ydb/core/jaeger_tracing/sampler_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,47 +16,26 @@ Y_UNIT_TEST_SUITE(SamplingControlTests) {
}

// Checks that a sampler configured for one half yields a RunTrials result
// between 48'000 and 52'000 for 100'000 trials with seed 42.
// Rendered diff: the two TControlWrapper-based lines are the removed setup
// (500'000 ppm); `TSampler sampler(0.5, 42)` is the added one.
Y_UNIT_TEST(Simple) {
TControlWrapper ppm(500'000);
TSampler sampler(ppm, 42);
TSampler sampler(0.5, 42);

auto samples = RunTrials(sampler, 100'000);
UNIT_ASSERT_GE(samples, 48'000);
UNIT_ASSERT_LE(samples, 52'000);
}

// Lower edge case: with a sampling setting of zero the RunTrials result for
// 100'000 trials must be exactly 0.
// Rendered diff: the TControlWrapper-based lines are the removed setup;
// `TSampler sampler(0, 42)` is the added one.
Y_UNIT_TEST(EdgeCaseLower) {
TControlWrapper ppm(0);
TSampler sampler(ppm, 42);
TSampler sampler(0, 42);

auto samples = RunTrials(sampler, 100'000);
UNIT_ASSERT_EQUAL(samples, 0);
}

// Upper edge case: with the maximum sampling setting the RunTrials result for
// 100'000 trials must be exactly 100'000.
// Rendered diff: the TControlWrapper-based lines (1'000'000 ppm) are the
// removed setup; `TSampler sampler(1, 42)` is the added one.
Y_UNIT_TEST(EdgeCaseUpper) {
TControlWrapper ppm(1'000'000);
TSampler sampler(ppm, 42);
TSampler sampler(1, 42);

auto samples = RunTrials(sampler, 100'000);
UNIT_ASSERT_EQUAL(samples, 100'000);
}

// Test removed by this change (the file header reports 24 deletions and every
// line here uses the deleted TControlWrapper API).
// It ran 100'000 trials at 250'000 ppm expecting a result in [23'000, 27'000],
// then set the same control to 750'000 ppm and expected [73'000, 77'000] from
// the same sampler instance.
Y_UNIT_TEST(ChangingControl) {
TControlWrapper ppm(250'000);
TSampler sampler(ppm, 42);

{
auto samples = RunTrials(sampler, 100'000);
UNIT_ASSERT_GE(samples, 23'000);
UNIT_ASSERT_LE(samples, 27'000);
}

ppm.Set(750'000);
{
auto samples = RunTrials(sampler, 100'000);
UNIT_ASSERT_GE(samples, 73'000);
UNIT_ASSERT_LE(samples, 77'000);
}
}
}

} // namespace NKikimr::NJaegerTracing
Loading
Loading