diff --git a/ydb/core/cms/console/jaeger_tracing_configurator.cpp b/ydb/core/cms/console/jaeger_tracing_configurator.cpp index eedb77e6fd5d..620396b00260 100644 --- a/ydb/core/cms/console/jaeger_tracing_configurator.cpp +++ b/ydb/core/cms/console/jaeger_tracing_configurator.cpp @@ -4,18 +4,21 @@ #include "console.h" #include +#include #include namespace NKikimr::NConsole { +using namespace NJaegerTracing; + class TJaegerTracingConfigurator : public TActorBootstrapped { public: static constexpr NKikimrServices::TActivity::EType ActorActivityType() { return NKikimrServices::TActivity::JAEGER_TRACING_CONFIGURATOR; } - TJaegerTracingConfigurator(NJaegerTracing::TSamplingThrottlingConfigurator tracingConfigurator, - const NKikimrConfig::TTracingConfig& cfg); + TJaegerTracingConfigurator(TSamplingThrottlingConfigurator tracingConfigurator, + NKikimrConfig::TTracingConfig cfg); void Bootstrap(const TActorContext& ctx); @@ -27,26 +30,28 @@ class TJaegerTracingConfigurator : public TActorBootstrapped ApplyConfigs(const NKikimrConfig::TTracingConfig& cfg); + void ApplyConfigs(const NKikimrConfig::TTracingConfig& cfg); + static TMaybe GetRequestType(const NKikimrConfig::TTracingConfig::TSelectors& selectors); + static TSettings GetSettings(const NKikimrConfig::TTracingConfig& cfg); - NJaegerTracing::TSamplingThrottlingConfigurator TracingConfigurator; + TSamplingThrottlingConfigurator TracingConfigurator; + NKikimrConfig::TTracingConfig initialConfig; }; TJaegerTracingConfigurator::TJaegerTracingConfigurator( - NJaegerTracing::TSamplingThrottlingConfigurator tracingConfigurator, - const NKikimrConfig::TTracingConfig& cfg) + TSamplingThrottlingConfigurator tracingConfigurator, + NKikimrConfig::TTracingConfig cfg) : TracingConfigurator(std::move(tracingConfigurator)) -{ - if (auto err = ApplyConfigs(cfg)) { - Y_ABORT_UNLESS(false, "Failed to apply initial tracing configs: %s", err->c_str()); - } -} + , initialConfig(std::move(cfg)) +{} void TJaegerTracingConfigurator::Bootstrap(const TActorContext& ctx) { LOG_DEBUG_S(ctx, NKikimrServices::CMS_CONFIGS, "TJaegerTracingConfigurator: Bootstrap"); Become(&TThis::StateWork); - LOG_DEBUG_S(ctx, NKikimrServices::CMS_CONFIGS, "TJaegerTracingConfigurator: subscribe to config updates"); + ApplyConfigs(initialConfig); + + LOG_DEBUG_S(ctx, NKikimrServices::CMS_CONFIGS, "TJaegerTracingConfigurator: subscribing to config updates"); ui32 item = static_cast(NKikimrConsole::TConfigItem::TracingConfigItem); ctx.Send(MakeConfigsDispatcherID(SelfId().NodeId()), new TEvConfigsDispatcher::TEvSetConfigSubscriptionRequest(item)); @@ -55,13 +60,9 @@ void TJaegerTracingConfigurator::Bootstrap(const TActorContext& ctx) { void TJaegerTracingConfigurator::Handle(TEvConsole::TEvConfigNotificationRequest::TPtr& ev, const TActorContext& ctx) { auto& rec = ev->Get()->Record; - LOG_INFO_S(ctx, NKikimrServices::CMS_CONFIGS, - "TJaegerTracingConfigurator: got new config: " - << rec.GetConfig().ShortDebugString()); + LOG_INFO_S(ctx, NKikimrServices::CMS_CONFIGS, "TJaegerTracingConfigurator: got new config: " << rec.GetConfig().ShortDebugString()); - if (auto err = ApplyConfigs(rec.GetConfig().GetTracingConfig())) { - LOG_NOTICE_S(ctx, NKikimrServices::CMS_CONFIGS, "TJaegerTracingConfigurator: Failed to apply tracing configs: " << *err); - } + ApplyConfigs(rec.GetConfig().GetTracingConfig()); auto resp = MakeHolder(rec); LOG_TRACE_S(ctx, NKikimrServices::CMS_CONFIGS, @@ -69,13 +70,126 @@ void TJaegerTracingConfigurator::Handle(TEvConsole::TEvConfigNotificationRequest ctx.Send(ev->Sender, resp.Release(), 0, ev->Cookie); } -TMaybe TJaegerTracingConfigurator::ApplyConfigs(const NKikimrConfig::TTracingConfig& cfg) { - return TracingConfigurator.HandleConfigs(cfg); +void TJaegerTracingConfigurator::ApplyConfigs(const NKikimrConfig::TTracingConfig& cfg) { + auto settings = GetSettings(cfg); + return TracingConfigurator.UpdateSettings(std::move(settings)); +} + +TMaybe TJaegerTracingConfigurator::GetRequestType(const NKikimrConfig::TTracingConfig::TSelectors& selectors) { + if (!selectors.HasRequestType()) { + return ERequestType::UNSPECIFIED; + } + if (auto it = NameToRequestType.FindPtr(selectors.GetRequestType())) { + return *it; + } + return {}; +} + +TSettings TJaegerTracingConfigurator::GetSettings(const NKikimrConfig::TTracingConfig& cfg) { + TSettings settings; + + for (const auto& samplingRule : cfg.GetSampling()) { + ERequestType requestType; + if (auto parsedRequestType = GetRequestType(samplingRule.GetScope())) { + requestType = *parsedRequestType; + } else { + ALOG_ERROR(NKikimrServices::CMS_CONFIGS, "failed to parse request type in the rule " + << samplingRule.ShortDebugString() << ". Skipping the rule"); + continue; + } + if (!samplingRule.HasLevel() || !samplingRule.HasFraction() || !samplingRule.HasMaxRatePerMinute()) { + ALOG_ERROR(NKikimrServices::CMS_CONFIGS, "missing required fields in rule " << samplingRule.ShortDebugString() + << " (required fields are: level, fraction, max_rate_per_minute). Skipping the rule"); + continue; + } + if (samplingRule.GetMaxRatePerMinute() == 0) { + ALOG_ERROR(NKikimrServices::CMS_CONFIGS, "max_rate_per_minute should never be zero. Found in rule " << samplingRule.GetMaxRatePerMinute() + << ". Skipping the rule"); + continue; + } + + ui64 level = samplingRule.GetLevel(); + double fraction = samplingRule.GetFraction(); + if (level > 15) { + ALOG_ERROR(NKikimrServices::CMS_CONFIGS, "sampling level exceeds maximum allowed value (" << level + << " provided, maximum is 15). Lowering the level"); + level = 15; + } + if (fraction < 0 || fraction > 1) { + ALOG_ERROR(NKikimrServices::CMS_CONFIGS, "provided fraction " << fraction + << " violated range [0; 1]. Clamping it to the range"); + fraction = std::min(1.0, std::max(0.0, fraction)); + } + + TSamplingRule rule { + .Level = static_cast(level), + .Sampler = fraction, + .Throttler = TThrottlingSettings { + .MaxRatePerMinute = samplingRule.GetMaxRatePerMinute(), + .MaxBurst = samplingRule.GetMaxBurst(), + }, + }; + settings.SamplingRules[static_cast(requestType)].push_back(rule); + } + + for (const auto& throttlingRule : cfg.GetExternalThrottling()) { + ERequestType requestType; + if (auto parsedRequestType = GetRequestType(throttlingRule.GetScope())) { + requestType = *parsedRequestType; + } else { + ALOG_ERROR(NKikimrServices::CMS_CONFIGS, "failed to parse request type in rule " + << throttlingRule.ShortDebugString() << ". Skipping the rule"); + continue; + } + + if (!throttlingRule.HasMaxRatePerMinute()) { + ALOG_ERROR(NKikimrServices::CMS_CONFIGS, "missing required field max_rate_per_minute in rule " + << throttlingRule.ShortDebugString() << ". Skipping the rule"); + continue; + } + if (throttlingRule.GetMaxRatePerMinute() == 0) { + ALOG_ERROR(NKikimrServices::CMS_CONFIGS, "max_rate_per_minute should never be zero. Found in rule " << throttlingRule.GetMaxRatePerMinute() + << ". Skipping the rule"); + continue; + } + + ui64 maxRatePerMinute = throttlingRule.GetMaxRatePerMinute(); + ui64 maxBurst = throttlingRule.GetMaxBurst(); + TExternalThrottlingRule rule { + .Throttler = TThrottlingSettings { + .MaxRatePerMinute = maxRatePerMinute, + .MaxBurst = maxBurst, + }, + }; + auto& currentRule = settings.ExternalThrottlingRules[static_cast(requestType)]; + if (currentRule) { + ALOG_WARN(NKikimrServices::CMS_CONFIGS, "duplicate external throttling rule for scope " + << throttlingRule.GetScope() << ". Adding the limits"); + currentRule->Throttler.MaxBurst += rule.Throttler.MaxBurst; + currentRule->Throttler.MaxRatePerMinute += rule.Throttler.MaxRatePerMinute; + } else { + currentRule = rule; + } + } + + // If external_throttling section is absent we want to allow all requests to be traced + if (cfg.GetExternalThrottling().empty()){ + TExternalThrottlingRule rule { + .Throttler = TThrottlingSettings { + .MaxRatePerMinute = Max(), + .MaxBurst = 0, + }, + }; + + settings.ExternalThrottlingRules[static_cast(ERequestType::UNSPECIFIED)] = rule; + } + + return settings; } -IActor* CreateJaegerTracingConfigurator(NJaegerTracing::TSamplingThrottlingConfigurator tracingConfigurator, - const NKikimrConfig::TTracingConfig& cfg) { - return new TJaegerTracingConfigurator(std::move(tracingConfigurator), cfg); +IActor* CreateJaegerTracingConfigurator(TSamplingThrottlingConfigurator tracingConfigurator, + NKikimrConfig::TTracingConfig cfg) { + return new TJaegerTracingConfigurator(std::move(tracingConfigurator), std::move(cfg)); } } // namespace NKikimr::NConsole diff --git a/ydb/core/cms/console/jaeger_tracing_configurator.h b/ydb/core/cms/console/jaeger_tracing_configurator.h index d7b98595b198..54b22bd0741c 100644 --- a/ydb/core/cms/console/jaeger_tracing_configurator.h +++ b/ydb/core/cms/console/jaeger_tracing_configurator.h @@ -8,6 +8,6 @@ namespace NKikimr::NConsole { IActor* CreateJaegerTracingConfigurator(NJaegerTracing::TSamplingThrottlingConfigurator tracingConfigurator, - const NKikimrConfig::TTracingConfig& cfg); + NKikimrConfig::TTracingConfig cfg); } // namespace NKikimr::NConsole diff --git a/ydb/core/jaeger_tracing/control.h b/ydb/core/jaeger_tracing/control.h deleted file mode 100644 index 3b4905260310..000000000000 --- a/ydb/core/jaeger_tracing/control.h +++ /dev/null @@ -1,27 +0,0 @@ -#pragma once - -#include "library/cpp/deprecated/atomic/atomic.h" - -#include - -namespace NKikimr::NJaegerTracing { - -struct TControl - : public TThrRefBase -{ - TControl(ui64 initial = 0) - : Value(initial) - {} - - ui64 Get() const { - return AtomicGet(Value); - } - - void Set(ui64 newValue) { - AtomicSet(Value, newValue); - } - - TAtomic Value; -}; - -} // namespace NKikimr::NJaegerTracing diff --git a/ydb/core/jaeger_tracing/control_wrapper.h b/ydb/core/jaeger_tracing/control_wrapper.h deleted file mode 100644 index fae30b7a4da3..000000000000 --- a/ydb/core/jaeger_tracing/control_wrapper.h +++ /dev/null @@ -1,23 +0,0 @@ -#pragma once - -#include "control.h" - -namespace NKikimr::NJaegerTracing { - -class TControlWrapper { -public: - explicit TControlWrapper(ui64 initial = 0) : Control(MakeIntrusive(initial)) {} - - ui64 Get() const { - return Control->Get(); - } - - void Set(ui64 newValue) { - Control->Set(newValue); - } - -private: - TIntrusivePtr Control; -}; - -} // namespace NKikimr::NJaegerTracing diff --git a/ydb/core/jaeger_tracing/sampler.h b/ydb/core/jaeger_tracing/sampler.h index 24ba3247d87c..1adaed88f2b5 100644 --- a/ydb/core/jaeger_tracing/sampler.h +++ b/ydb/core/jaeger_tracing/sampler.h @@ -1,25 +1,23 @@ #pragma once -#include "control_wrapper.h" - #include namespace NKikimr::NJaegerTracing { class TSampler { public: - TSampler(TControlWrapper samplingPPM, ui64 seed) - : SamplingPPM(std::move(samplingPPM)) + TSampler(double fraction, ui64 seed) + : SamplingFraction(fraction) , Rng(seed) {} bool Sample() { - return Rng() % 1'000'000 < SamplingPPM.Get(); + return Rng.GenRandReal1() < SamplingFraction; } private: - TControlWrapper SamplingPPM; - TReallyFastRng32 Rng; + const double SamplingFraction; + TFastRng64 Rng; }; } // namespace NKikimr::NJaegerTracing diff --git a/ydb/core/jaeger_tracing/sampler_ut.cpp b/ydb/core/jaeger_tracing/sampler_ut.cpp index 5e280ba5201f..2d1cd587b462 100644 --- a/ydb/core/jaeger_tracing/sampler_ut.cpp +++ b/ydb/core/jaeger_tracing/sampler_ut.cpp @@ -16,8 +16,7 @@ Y_UNIT_TEST_SUITE(SamplingControlTests) { } Y_UNIT_TEST(Simple) { - TControlWrapper ppm(500'000); - TSampler sampler(ppm, 42); + TSampler sampler(0.5, 42); auto samples = RunTrials(sampler, 100'000); UNIT_ASSERT_GE(samples, 48'000); @@ -25,38 +24,18 @@ Y_UNIT_TEST_SUITE(SamplingControlTests) { } Y_UNIT_TEST(EdgeCaseLower) { - TControlWrapper ppm(0); - TSampler sampler(ppm, 42); + TSampler sampler(0, 42); auto samples = RunTrials(sampler, 100'000); UNIT_ASSERT_EQUAL(samples, 0); } Y_UNIT_TEST(EdgeCaseUpper) { - TControlWrapper ppm(1'000'000); - TSampler sampler(ppm, 42); + TSampler sampler(1, 42); auto samples = RunTrials(sampler, 100'000); UNIT_ASSERT_EQUAL(samples, 100'000); } - - Y_UNIT_TEST(ChangingControl) { - TControlWrapper ppm(250'000); - TSampler sampler(ppm, 42); - - { - auto samples = RunTrials(sampler, 100'000); - UNIT_ASSERT_GE(samples, 23'000); - UNIT_ASSERT_LE(samples, 27'000); - } - - ppm.Set(750'000); - { - auto samples = RunTrials(sampler, 100'000); - UNIT_ASSERT_GE(samples, 73'000); - UNIT_ASSERT_LE(samples, 77'000); - } - } } } // namespace NKikimr::NJaegerTracing diff --git a/ydb/core/jaeger_tracing/sampling_throttling_configurator.cpp b/ydb/core/jaeger_tracing/sampling_throttling_configurator.cpp index 24383d5adda6..856b5f30c489 100644 --- a/ydb/core/jaeger_tracing/sampling_throttling_configurator.cpp +++ b/ydb/core/jaeger_tracing/sampling_throttling_configurator.cpp @@ -1,7 +1,7 @@ -#include "sampler.h" #include "sampling_throttling_configurator.h" + #include "sampling_throttling_control.h" -#include "throttler.h" +#include "sampling_throttling_control_internals.h" #include #include @@ -9,50 +9,40 @@ namespace NKikimr::NJaegerTracing { -TIntrusivePtr TSamplingThrottlingConfigurator::GetControl() { - TSampler sampler(SamplingPPM, RandomProvider->GenRand64()); - TThrottler sampledThrottler(MaxSampledPerMinute, MaxSampledBurst, TimeProvider); - TThrottler externalThrottler(MaxExternalPerMinute, MaxExternalBurst, TimeProvider); +TSamplingThrottlingConfigurator::TSamplingThrottlingConfigurator(TIntrusivePtr timeProvider, + TIntrusivePtr& randomProvider) + : TimeProvider(std::move(timeProvider)) + , Rng(randomProvider->GenRand64()) + , CurrentSettings(GenerateThrottlers({})) +{} - return TIntrusivePtr(new TSamplingThrottlingControl(std::move(sampler), SamplingLevel, std::move(sampledThrottler), std::move(externalThrottler))); +TIntrusivePtr TSamplingThrottlingConfigurator::GetControl() { + auto control = TIntrusivePtr(new TSamplingThrottlingControl(GenerateSetup())); + IssuedControls.push_back(control); + return control; } -TMaybe TSamplingThrottlingConfigurator::HandleConfigs(const NKikimrConfig::TTracingConfig& config) { - if (config.SamplingSize() > 1) { - return "Only global scope is supported, thus no more than one sampling scope is allowed"; - } - if (config.ExternalThrottlingSize() > 1) { - return "Only global scope is supported, thus no more than one throttling scope is allowed"; - } +void TSamplingThrottlingConfigurator::UpdateSettings(TSettings settings) { + CurrentSettings = GenerateThrottlers(std::move(settings)); - if (config.SamplingSize() == 1) { - const auto& samplingConfig = config.GetSampling(0); - if (!(samplingConfig.HasFraction() && samplingConfig.HasLevel() && samplingConfig.HasMaxRatePerMinute())) { - return "At least one required field (fraction, level, max_rate_per_minute) is missing in scope " + samplingConfig.ShortDebugString(); - } - const auto samplingFraction = samplingConfig.GetFraction(); - - if (samplingFraction < 0 || samplingFraction > 1) { - return "Sampling fraction violates range [0, 1] " + ToString(samplingFraction); - } - - SamplingPPM.Set(samplingFraction * 1000000); - SamplingLevel.Set(samplingConfig.GetLevel()); - MaxSampledPerMinute.Set(samplingConfig.GetMaxRatePerMinute()); - MaxSampledBurst.Set(samplingConfig.GetMaxBurst()); + for (auto& control : IssuedControls) { + control->UpdateImpl(GenerateSetup()); } +} - if (config.ExternalThrottlingSize()) { - const auto& throttlingConfig = config.externalthrottling(0); - if (!throttlingConfig.HasMaxRatePerMinute()) { - return "max_rate_per_minute is missing in this scope: " + throttlingConfig.ShortDebugString(); - } +TSettings> TSamplingThrottlingConfigurator::GenerateThrottlers( + TSettings settings) { + return settings.MapThrottler([this](const TThrottlingSettings& settings) { + return MakeIntrusive(settings.MaxRatePerMinute, settings.MaxBurst, TimeProvider); + }); +} - MaxExternalPerMinute.Set(throttlingConfig.GetMaxRatePerMinute()); - MaxExternalBurst.Set(throttlingConfig.GetMaxBurst()); - } +std::unique_ptr TSamplingThrottlingConfigurator::GenerateSetup() { + auto setup = CurrentSettings.MapSampler([this](double fraction) { + return TSampler(fraction, Rng()); + }); - return {}; + return std::make_unique(std::move(setup)); } } // namespace NKikimr::NJaegerTracing diff --git a/ydb/core/jaeger_tracing/sampling_throttling_configurator.h b/ydb/core/jaeger_tracing/sampling_throttling_configurator.h index 72a2babb05d1..3e70aaf64be7 100644 --- a/ydb/core/jaeger_tracing/sampling_throttling_configurator.h +++ b/ydb/core/jaeger_tracing/sampling_throttling_configurator.h @@ -1,40 +1,39 @@ #pragma once -#include "control_wrapper.h" #include "sampling_throttling_control.h" +#include "throttler.h" +#include "settings.h" + #include #include +#include #include +#include namespace NKikimr::NJaegerTracing { class TSamplingThrottlingConfigurator { public: - TSamplingThrottlingConfigurator(TIntrusivePtr timeProvider, TIntrusivePtr randomProvider) - : SamplingLevel(15) - , TimeProvider(std::move(timeProvider)) - , RandomProvider(std::move(randomProvider)) - {} + TSamplingThrottlingConfigurator(TIntrusivePtr timeProvider, + TIntrusivePtr& randomProvider); TIntrusivePtr GetControl(); - TMaybe HandleConfigs(const NKikimrConfig::TTracingConfig& config); + void UpdateSettings(TSettings settings); private: - TControlWrapper SamplingPPM; - TControlWrapper SamplingLevel; - - TControlWrapper MaxSampledPerMinute; - TControlWrapper MaxSampledBurst; - - TControlWrapper MaxExternalPerMinute; - TControlWrapper MaxExternalBurst; + TSettings> GenerateThrottlers( + TSettings settings); + + std::unique_ptr GenerateSetup(); + TVector> IssuedControls; TIntrusivePtr TimeProvider; - TIntrusivePtr RandomProvider; + TFastRng64 Rng; + TSettings> CurrentSettings; }; } // namespace NKikimr::NJaegerTracing diff --git a/ydb/core/jaeger_tracing/sampling_throttling_control.cpp b/ydb/core/jaeger_tracing/sampling_throttling_control.cpp new file mode 100644 index 000000000000..584eeb54d4aa --- /dev/null +++ b/ydb/core/jaeger_tracing/sampling_throttling_control.cpp @@ -0,0 +1,28 @@ +#include "sampling_throttling_control.h" + +#include "sampling_throttling_control_internals.h" + +namespace NKikimr::NJaegerTracing { + +TSamplingThrottlingControl::TSamplingThrottlingControl(std::unique_ptr initialImpl) + : Impl(std::move(initialImpl)) +{} + +TSamplingThrottlingControl::~TSamplingThrottlingControl() { + UpdateImpl(nullptr); +} + +void TSamplingThrottlingControl::HandleTracing(NWilson::TTraceId& traceId, const TRequestDiscriminator& discriminator) { + if (ImplUpdate.load(std::memory_order_relaxed)) { + auto newImpl = std::unique_ptr(ImplUpdate.exchange(nullptr, std::memory_order_relaxed)); + Y_ABORT_UNLESS(newImpl); + Impl = std::move(newImpl); + } + Impl->HandleTracing(traceId, discriminator); +} + +void TSamplingThrottlingControl::UpdateImpl(std::unique_ptr newImpl) { + std::unique_ptr guard(ImplUpdate.exchange(newImpl.release(), std::memory_order_relaxed)); +} + +} // namespace NKikimr::NJaegerTracing diff --git a/ydb/core/jaeger_tracing/sampling_throttling_control.h b/ydb/core/jaeger_tracing/sampling_throttling_control.h index b9584061037e..fce17a0730dd 100644 --- a/ydb/core/jaeger_tracing/sampling_throttling_control.h +++ b/ydb/core/jaeger_tracing/sampling_throttling_control.h @@ -1,48 +1,32 @@ #pragma once -#include "control_wrapper.h" -#include "sampler.h" -#include "throttler.h" #include "request_discriminator.h" #include namespace NKikimr::NJaegerTracing { -class TSamplingThrottlingControl - : public TThrRefBase - , private TMoveOnly { +class TSamplingThrottlingControl: public TThrRefBase { friend class TSamplingThrottlingConfigurator; public: - void HandleTracing(NWilson::TTraceId& traceId, TRequestDiscriminator discriminator) { - Y_UNUSED(discriminator); - if (traceId && ExternalThrottler.Throttle()) { - traceId = {}; - } - if (!traceId && Sampler.Sample() && !SampledThrottler.Throttle()) { - traceId = NWilson::TTraceId::NewTraceId(SampledLevel.Get(), 4095); - } - } + void HandleTracing(NWilson::TTraceId& traceId, const TRequestDiscriminator& discriminator); + + ~TSamplingThrottlingControl(); private: + struct TSamplingThrottlingImpl; + // Should only be obtained from TSamplingThrottlingConfigurator - TSamplingThrottlingControl( - TSampler sampler, - TControlWrapper sampledLevel, - TThrottler sampledThrottler, - TThrottler externalThrottler - ) - : Sampler(std::move(sampler)) - , SampledLevel(std::move(sampledLevel)) - , SampledThrottler(std::move(sampledThrottler)) - , ExternalThrottler(std::move(externalThrottler)) - {} - - TSampler Sampler; - TControlWrapper SampledLevel; - TThrottler SampledThrottler; - TThrottler ExternalThrottler; + TSamplingThrottlingControl(std::unique_ptr initialImpl); + + void UpdateImpl(std::unique_ptr newParams); + + // Exclusively owned by the only thread, that may call HandleTracing + std::unique_ptr Impl; + + // Shared between the thread calling HandleTracing and the thread calling UpdateParams + std::atomic ImplUpdate{nullptr}; }; } // namespace NKikimr::NJaegerTracing diff --git a/ydb/core/jaeger_tracing/sampling_throttling_control_internals.cpp b/ydb/core/jaeger_tracing/sampling_throttling_control_internals.cpp new file mode 100644 index 000000000000..a71664de466c --- /dev/null +++ b/ydb/core/jaeger_tracing/sampling_throttling_control_internals.cpp @@ -0,0 +1,56 @@ +#include "sampling_throttling_control_internals.h" + + +namespace NKikimr::NJaegerTracing { + +void TSamplingThrottlingControl::TSamplingThrottlingImpl::HandleTracing( + NWilson::TTraceId& traceId, const TRequestDiscriminator& discriminator) { + auto requestType = discriminator.RequestType; + + if (traceId) { + bool throttle = Throttle(requestType); + throttle = Throttle(ERequestType::UNSPECIFIED) && throttle; + if (throttle) { + traceId = {}; + } + } + + if (!traceId) { + TMaybe level; + if (auto sampled_level = Sample(requestType)) { + level = sampled_level; + } + if (auto sampled_level = Sample(ERequestType::UNSPECIFIED)) { + if (!level || *sampled_level > *level) { + level = sampled_level; + } + } + + if (level) { + traceId = NWilson::TTraceId::NewTraceId(*level, Max()); + } + } +} + +bool TSamplingThrottlingControl::TSamplingThrottlingImpl::Throttle(ERequestType requestType) { + auto& throttlingRule = Setup.ExternalThrottlingRules[static_cast(requestType)]; + if (throttlingRule) { + return throttlingRule->Throttler->Throttle(); + } else { + return true; + } +} + +TMaybe TSamplingThrottlingControl::TSamplingThrottlingImpl::Sample(ERequestType requestType) { + TMaybe level; + for (auto& samplingRule : Setup.SamplingRules[static_cast(requestType)]) { + if (samplingRule.Sampler.Sample() && !samplingRule.Throttler->Throttle()) { + if (!level || *level < samplingRule.Level) { + level = samplingRule.Level; + } + } + } + return level; +} + +} // namespace NKikimr::NJaegerTracing diff --git a/ydb/core/jaeger_tracing/sampling_throttling_control_internals.h b/ydb/core/jaeger_tracing/sampling_throttling_control_internals.h new file mode 100644 index 000000000000..4452d978a4c5 --- /dev/null +++ b/ydb/core/jaeger_tracing/sampling_throttling_control_internals.h @@ -0,0 +1,26 @@ +#pragma once + +#include "sampler.h" +#include "throttler.h" +#include "sampling_throttling_control.h" +#include "settings.h" + +#include +#include + +#include + +namespace NKikimr::NJaegerTracing { + +struct TSamplingThrottlingControl::TSamplingThrottlingImpl { + TSettings> Setup; + + void HandleTracing(NWilson::TTraceId& traceId, const TRequestDiscriminator& discriminator); + +private: + bool Throttle(ERequestType requestType); + + TMaybe Sample(ERequestType requestType); +}; + +} // namespace NKikimr::NJaegerTracing diff --git a/ydb/core/jaeger_tracing/settings.h b/ydb/core/jaeger_tracing/settings.h new file mode 100644 index 000000000000..2731acfadd51 --- /dev/null +++ b/ydb/core/jaeger_tracing/settings.h @@ -0,0 +1,78 @@ +#pragma once + +#include "request_discriminator.h" + +#include + +#include + +namespace NKikimr::NJaegerTracing { + +struct TThrottlingSettings { + ui64 MaxRatePerMinute; + ui64 MaxBurst; +}; + +template +struct TSamplingRule { + ui8 Level; + TSampling Sampler; + TThrottling Throttler; +}; + +template +struct TExternalThrottlingRule { + TThrottling Throttler; +}; + +template +struct TSettings { + std::array, 2>, kRequestTypesCnt> SamplingRules; + std::array>, kRequestTypesCnt> ExternalThrottlingRules; + + template + auto MapSampler(TFunc&& f) const { + using TNewSamplingType = std::invoke_result_t; + + TSettings newSettings; + for (size_t i = 0; i < kRequestTypesCnt; ++i) { + for (auto& samplingRule : SamplingRules[i]) { + newSettings.SamplingRules[i].push_back(TSamplingRule { + .Level = samplingRule.Level, + .Sampler = f(samplingRule.Sampler), + .Throttler = samplingRule.Throttler, + }); + } + newSettings.ExternalThrottlingRules[i] = ExternalThrottlingRules[i]; + } + + return newSettings; + } + + template + auto MapThrottler(TFunc&& f) const { + using TNewThrottlingType = std::invoke_result_t; + + TSettings newSettings; + for (size_t i = 0; i < kRequestTypesCnt; ++i) { + for (auto& samplingRule : SamplingRules[i]) { + newSettings.SamplingRules[i].push_back(TSamplingRule { + .Level = samplingRule.Level, + .Sampler = samplingRule.Sampler, + .Throttler = f(samplingRule.Throttler), + }); + } + newSettings.ExternalThrottlingRules[i] = ExternalThrottlingRules[i].Transform( + [&f](const TExternalThrottlingRule& rule) { + return TExternalThrottlingRule { + .Throttler = f(rule.Throttler), + }; + } + ); + } + + return newSettings; + } +}; + +} // namespace NKikimr::NJaegerTracing diff --git a/ydb/core/jaeger_tracing/throttler.cpp b/ydb/core/jaeger_tracing/throttler.cpp new file mode 100644 index 000000000000..5465a6624a36 --- /dev/null +++ b/ydb/core/jaeger_tracing/throttler.cpp @@ -0,0 +1,78 @@ +#include "throttler.h" + +#include + +namespace NKikimr::NJaegerTracing { + +TThrottler::TThrottler(ui64 maxRatePerMinute, ui64 maxBurst, TIntrusivePtr timeProvider) + : MaxRatePerMinute(maxRatePerMinute) + , MaxBurst(maxBurst + 1) + , BetweenSends(TDuration::Minutes(1).MicroSeconds() / MaxRatePerMinute) + , TimeProvider(std::move(timeProvider)) + , EffectiveTs(TimeProvider->Now().MicroSeconds()) +{} + +bool TThrottler::Throttle() { + auto now = TimeProvider->Now().MicroSeconds(); + auto ts = EffectiveTs.load(std::memory_order_relaxed); + auto maxFinalTs = ClampAdd(now, ClampMultiply(BetweenSends, MaxBurst)); + while (true) { + if (ts < now) { + if (EffectiveTs.compare_exchange_weak(ts, now + BetweenSends, std::memory_order_relaxed)) { + return false; + } + SpinLockPause(); + } else if (ts + BetweenSends > maxFinalTs) { + return true; + } else if (EffectiveTs.fetch_add(BetweenSends, std::memory_order_relaxed) + BetweenSends > maxFinalTs) { + EffectiveTs.fetch_sub(BetweenSends, std::memory_order_relaxed); + return true; + } else { + return false; + } + } +} + +ui64 TThrottler::ClampAdd(ui64 a, ui64 b) { +#if defined(__has_builtin) && __has_builtin(__builtin_add_overflow) + + ui64 res; + if (__builtin_add_overflow(a, b, &res)) { + return Max(); + } else { + return res; + } + +#else + + if (a > Max() - b) { + return Max(); + } + return a + b; + +#endif +} + +ui64 TThrottler::ClampMultiply(ui64 a, ui64 b) { +#if defined(__has_builtin) && __has_builtin(__builtin_mul_overflow) + + ui64 res; + if (__builtin_mul_overflow(a, b, &res)) { + return Max(); + } else { + return res; + } + +#else + + ui128 prod = a; + prod *= b; + if (prod > Max()) { + return Max(); + } + return static_cast(prod); + +#endif +} + +} // namespace NKikimr::NJaegerTracing diff --git a/ydb/core/jaeger_tracing/throttler.h b/ydb/core/jaeger_tracing/throttler.h index 7deebe9bbd77..951d1aa80a0a 100644 --- a/ydb/core/jaeger_tracing/throttler.h +++ b/ydb/core/jaeger_tracing/throttler.h @@ -1,64 +1,27 @@ #pragma once -#include "control_wrapper.h" - +#include #include namespace NKikimr::NJaegerTracing { -class TThrottler { +class TThrottler: public TThrRefBase { public: - TThrottler(TControlWrapper maxRatePerMinute, TControlWrapper maxBurst, - TIntrusivePtr timeProvider) - : TimeProvider(std::move(timeProvider)) - , MaxRatePerMinute(std::move(maxRatePerMinute)) - , MaxBurst(std::move(maxBurst)) - , LastUpdate(TimeProvider->Now()) - {} - - bool Throttle() { - auto maxRatePerMinute = MaxRatePerMinute.Get(); - auto maxBurst = MaxBurst.Get(); - auto maxTotal = maxBurst + 1; - CurrentBurst = std::min(CurrentBurst, maxTotal); - if (maxRatePerMinute == 0) { - return true; - } - - auto now = TimeProvider->Now(); - if (now < LastUpdate) { - return true; - } + TThrottler(ui64 maxRatePerMinute, ui64 maxBurst, TIntrusivePtr timeProvider); - const auto deltaBetweenSends = TDuration::Minutes(1) / maxRatePerMinute; - UpdateStats(now, deltaBetweenSends); - - if (CurrentBurst < maxTotal) { - CurrentBurst += 1; - return false; - } - - return true; - } + bool Throttle(); private: - void UpdateStats(TInstant now, TDuration deltaBetweenSends) { - ui64 decrease = (now - LastUpdate) / deltaBetweenSends; - decrease = std::min(decrease, CurrentBurst); - CurrentBurst -= decrease; - LastUpdate += decrease * deltaBetweenSends; - if (CurrentBurst == 0) { - LastUpdate = now; - } - } + static ui64 ClampAdd(ui64 a, ui64 b); + static ui64 ClampMultiply(ui64 a, ui64 b); + const ui64 MaxRatePerMinute; + const ui64 MaxBurst; + const ui64 BetweenSends; TIntrusivePtr TimeProvider; + std::atomic EffectiveTs; - TControlWrapper MaxRatePerMinute; - TControlWrapper MaxBurst; - - TInstant LastUpdate = TInstant::Zero(); - ui64 CurrentBurst = 0; + static_assert(decltype(EffectiveTs)::is_always_lock_free); }; } // namespace NKikimr::NJaegerTracing diff --git a/ydb/core/jaeger_tracing/throttler_ut.cpp b/ydb/core/jaeger_tracing/throttler_ut.cpp index 12f484de73b0..e698c86f7a21 100644 --- a/ydb/core/jaeger_tracing/throttler_ut.cpp +++ b/ydb/core/jaeger_tracing/throttler_ut.cpp @@ -33,12 +33,9 @@ Y_UNIT_TEST_SUITE(ThrottlerControlTests) { } Y_UNIT_TEST(Simple) { - TControlWrapper maxPerMinute(6); - TControlWrapper maxBurst(2); - auto timeProvider = MakeIntrusive(TInstant::Now()); - TThrottler throttler(maxPerMinute, maxBurst, timeProvider); + TThrottler throttler(6, 2, timeProvider); CheckExact(throttler, 3); CheckExact(throttler, 0); @@ -55,71 +52,37 @@ Y_UNIT_TEST_SUITE(ThrottlerControlTests) { } Y_UNIT_TEST(LongIdle) { - TControlWrapper maxPerMinute(10); - TControlWrapper maxBurst(2); - auto timeProvider = MakeIntrusive(TInstant::Now()); - TThrottler throttler(maxPerMinute, maxBurst, timeProvider); + TThrottler throttler(10, 2, timeProvider); CheckAtLeast(throttler, 3); timeProvider->Advance(TDuration::Hours(1)); CheckExact(throttler, 3); } - Y_UNIT_TEST(Overflow) { - TControlWrapper maxPerMinute(6'000); - TControlWrapper maxBurst(6'000); - + Y_UNIT_TEST(Overflow_1) { auto timeProvider = MakeIntrusive(TInstant::Now()); - TThrottler throttler(maxPerMinute, maxBurst, timeProvider); - CheckExact(throttler, 6'001); + TThrottler throttler(1'000'000'000'000'000'000, 20'000, timeProvider); + + // TODO(pumpurum): switch back to CheckExact when we figure out how to limit properly + CheckAtLeast(throttler, 20'001); timeProvider->Advance(TDuration::Days(365 * 10)); - CheckExact(throttler, 6'001); + CheckAtLeast(throttler, 20'001); } - Y_UNIT_TEST(ChangingControls) { - TControlWrapper maxPerMinute(6); - TControlWrapper maxBurst(2); - + Y_UNIT_TEST(Overflow_2) { auto timeProvider = MakeIntrusive(TInstant::Now()); - TThrottler throttler(maxPerMinute, maxBurst, timeProvider); - CheckExact(throttler, 3); - - maxBurst.Set(4); - CheckExact(throttler, 2); - - maxBurst.Set(0); - CheckExact(throttler, 0); - - timeProvider->Advance(TDuration::Seconds(9)); - CheckExact(throttler, 0); - timeProvider->Advance(TDuration::Seconds(1)); - CheckExact(throttler, 1); - - maxPerMinute.Set(12 * 60); - timeProvider->Advance(TDuration::Seconds(1)); - CheckExact(throttler, 1); - - maxBurst.Set(20); + TThrottler throttler(1'000'000'000'000'000'000, 1, timeProvider); + CheckAtLeast(throttler, 1); - timeProvider->Advance(TDuration::Seconds(3)); - CheckExact(throttler, 21); + timeProvider->Advance(TDuration::Days(365 * 10)); - maxBurst.Set(0); - timeProvider->Advance(TDuration::Seconds(59)); CheckAtLeast(throttler, 1); - maxPerMinute.Set(1); - CheckExact(throttler, 0); - timeProvider->Advance(TDuration::Minutes(1)); - CheckExact(throttler, 1); - - maxBurst.Set(2); - CheckExact(throttler, 2); } } diff --git a/ydb/core/jaeger_tracing/ya.make b/ydb/core/jaeger_tracing/ya.make index 8d5625d41606..09da909d3bb8 100644 --- a/ydb/core/jaeger_tracing/ya.make +++ b/ydb/core/jaeger_tracing/ya.make @@ -5,15 +5,18 @@ PEERDIR( ) SRCS( - control.h - control_wrapper.h request_discriminator.h request_discriminator.cpp sampler.h - sampling_throttling_configurator.h sampling_throttling_configurator.cpp + sampling_throttling_configurator.h + sampling_throttling_control.cpp sampling_throttling_control.h + sampling_throttling_control_internals.cpp + sampling_throttling_control_internals.h + settings.h throttler.h + throttler.cpp ) END() diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto index 8ef627415748..81d28b07c190 100644 --- a/ydb/core/protos/config.proto +++ b/ydb/core/protos/config.proto @@ -1543,6 +1543,7 @@ message TTracingConfig { // Only global scope for now message TSelectors { + optional string RequestType = 1; } message TSamplingScope { diff --git a/ydb/tools/cfg/static.py b/ydb/tools/cfg/static.py index e6eaa5257e91..4db276eb7c19 100644 --- a/ydb/tools/cfg/static.py +++ b/ydb/tools/cfg/static.py @@ -1122,7 +1122,13 @@ def __generate_sys_txt(self): def __generate_tracing_txt(self): def get_selectors(selectors): - return config_pb2.TTracingConfig.TSelectors() + selectors_pb = config_pb2.TTracingConfig.TSelectors() + + request_type = selectors["request_type"] + if request_type is not None: + selectors_pb.RequestType = request_type + + return selectors_pb def get_sampling_scope(sampling): sampling_scope_pb = config_pb2.TTracingConfig.TSamplingScope() diff --git a/ydb/tools/cfg/validation.py b/ydb/tools/cfg/validation.py index 552b157c5b6b..e467113b89f9 100644 --- a/ydb/tools/cfg/validation.py +++ b/ydb/tools/cfg/validation.py @@ -129,7 +129,9 @@ SELECTORS_CONFIGS = dict( type="object", - properties={}, + properties=dict( + request_type=dict(type="string"), + ), required=[], additionalProperties=False, )