diff --git a/ydb/core/cms/json_proxy_proto.h b/ydb/core/cms/json_proxy_proto.h index a422cbe8292e..4844919de311 100644 --- a/ydb/core/cms/json_proxy_proto.h +++ b/ydb/core/cms/json_proxy_proto.h @@ -76,6 +76,16 @@ class TJsonProxyProto : public TActorBootstrapped { return ReplyWithTypeDescription(*NKikimrConfig::TImmediateControlsConfig::TSchemeShardControls::descriptor(), ctx); else if (name == ".NKikimrConfig.TImmediateControlsConfig.TTCMallocControls") return ReplyWithTypeDescription(*NKikimrConfig::TImmediateControlsConfig::TTCMallocControls::descriptor(), ctx); + else if (name == ".NKikimrConfig.TImmediateControlsConfig.TTracingControls") + return ReplyWithTypeDescription(*NKikimrConfig::TImmediateControlsConfig::TTracingControls::descriptor(), ctx); + else if (name == ".NKikimrConfig.TImmediateControlsConfig.TTracingControls.TSamplingThrottlingOptions") + return ReplyWithTypeDescription(*NKikimrConfig::TImmediateControlsConfig::TTracingControls::TSamplingThrottlingOptions::descriptor(), ctx); + else if (name == ".NKikimrConfig.TImmediateControlsConfig.TTracingControls.TSamplingThrottlingOptions.TThrottlingOptions") + return ReplyWithTypeDescription(*NKikimrConfig::TImmediateControlsConfig::TTracingControls::TSamplingThrottlingOptions::TThrottlingOptions::descriptor(), ctx); + else if (name == ".NKikimrConfig.TImmediateControlsConfig.TTracingControls.TSamplingThrottlingOptions.TSamplingOptions") + return ReplyWithTypeDescription(*NKikimrConfig::TImmediateControlsConfig::TTracingControls::TSamplingThrottlingOptions::TSamplingOptions::descriptor(), ctx); + else if (name == ".NKikimrConfig.TImmediateControlsConfig.TTracingControls.TKeyValue") + return ReplyWithTypeDescription(*NKikimrConfig::TImmediateControlsConfig::TTracingControls::TKeyValue::descriptor(), ctx); } ctx.Send(RequestEvent->Sender, diff --git a/ydb/core/control/common_controls/tracing_control.cpp b/ydb/core/control/common_controls/tracing_control.cpp new file mode 100644 index 
000000000000..536738dd289f --- /dev/null +++ b/ydb/core/control/common_controls/tracing_control.cpp @@ -0,0 +1,62 @@ +#include "tracing_control.h" + +#include +#include +#include + +namespace NKikimr { + +namespace { + +/* Finds `fieldName` in `descriptor` (aborts when the field is missing) and returns the NKikimrConfig::ControlOptions extension of that field's options. */ const NKikimrConfig::TImmediateControlOptions& GetImmediateControlOptionsForField( + const google::protobuf::Descriptor& descriptor, TString fieldName) { + auto field = descriptor.FindFieldByName(fieldName); + Y_ABORT_UNLESS(field); + auto& fieldOptions = field->options(); + return fieldOptions.GetExtension(NKikimrConfig::ControlOptions); +} + +/* Builds a TThrottler from two controls, "MaxRatePerMinute" and "MaxBurst". Each control is reset to the default/min/max values declared for the same-named TThrottlingOptions proto field and registered on `icb` as domain + "." + field name. */ TThrottler CreateThrottler(TIntrusivePtr& icb, TIntrusivePtr timeProvider, TString domain) { + TControlWrapper maxRatePerMinute; + TControlWrapper maxBurst; + + const std::array, 2> controls = {{ + {maxRatePerMinute, "MaxRatePerMinute"}, + {maxBurst, "MaxBurst"}, + }}; + const auto& throttlingOptions = *NKikimrConfig::TImmediateControlsConfig::TTracingControls::TSamplingThrottlingOptions::TThrottlingOptions::descriptor(); + for (auto& [control, fieldName] : controls) { + const auto& controlOptions = GetImmediateControlOptionsForField(throttlingOptions, TString(fieldName)); + + control.Reset(controlOptions.GetDefaultValue(), controlOptions.GetMinValue(), controlOptions.GetMaxValue()); + icb->RegisterSharedControl(control, domain + "." 
+ fieldName); + } + + return TThrottler(std::move(maxRatePerMinute), std::move(maxBurst), std::move(timeProvider)); +} + +} + +/* Registers the controls of one control domain on `icb`: two throttlers under controlDomain + ".SampledThrottling" and controlDomain + ".ExternalThrottling", plus "PPM" and "Level" under controlDomain + ".Sampling.". The sampler takes the PPM control and a seed drawn from randomProvider. */ TTracingControl::TTracingControl(TIntrusivePtr& icb, TIntrusivePtr timeProvider, + TIntrusivePtr& randomProvider, TString controlDomain) +{ + SampledThrottler = CreateThrottler(icb, timeProvider, controlDomain + ".SampledThrottling"); + ExternalThrottler = CreateThrottler(icb, timeProvider, controlDomain + ".ExternalThrottling"); + + TControlWrapper samplingPPM; + const std::array, 2> controls = {{ + {samplingPPM, "PPM"}, + {SampledLevel, "Level"}, + }}; + + const auto& samplingOptions = *NKikimrConfig::TImmediateControlsConfig::TTracingControls::TSamplingThrottlingOptions::TSamplingOptions::descriptor(); + for (auto& [control, name] : controls) { /* bound by reference, as in CreateThrottler, so the pair is not copied on each iteration */ + const auto& controlOptions = GetImmediateControlOptionsForField(samplingOptions, TString(name)); + control.Reset(controlOptions.GetDefaultValue(), controlOptions.GetMinValue(), controlOptions.GetMaxValue()); + icb->RegisterSharedControl(control, controlDomain + ".Sampling." 
+ name); + } + + Sampler = TSampler(std::move(samplingPPM), randomProvider->GenRand64()); +} + +} // namespace NKikimr diff --git a/ydb/core/control/common_controls/tracing_control.h b/ydb/core/control/common_controls/tracing_control.h new file mode 100644 index 000000000000..56b7f45966da --- /dev/null +++ b/ydb/core/control/common_controls/tracing_control.h @@ -0,0 +1,33 @@ +#pragma once + +#include +#include +#include + +namespace NKikimr { + +/* Bundles, for one control domain, a sampler, a throttler for sampled traces, a throttler for externally requested traces and the verbosity level control. */ class TTracingControl { +public: + TTracingControl(TIntrusivePtr& icb, TIntrusivePtr timeProvider, + TIntrusivePtr& randomProvider, TString controlDomain); + + bool SampleThrottle() { /* true when the sampler picks this call and the sampled-trace throttler does not throttle it; the throttler is consulted only for picked calls (short-circuit) */ + return Sampler.Sample() && !SampledThrottler.Throttle(); + } + + bool ThrottleExternal() { /* forwards the verdict of the external-trace throttler; true means throttled */ + return ExternalThrottler.Throttle(); + } + + ui8 SampledVerbosity() const { /* current value of the control registered as "Level", converted to ui8 */ + return SampledLevel; + } + +private: + TSampler Sampler; + TThrottler SampledThrottler; + TThrottler ExternalThrottler; + TControlWrapper SampledLevel; +}; + +} // namespace NKikimr diff --git a/ydb/core/control/common_controls/ya.make b/ydb/core/control/common_controls/ya.make new file mode 100644 index 000000000000..afc6df1f79d2 --- /dev/null +++ b/ydb/core/control/common_controls/ya.make @@ -0,0 +1,13 @@ +LIBRARY() + +PEERDIR( + ydb/library/actors/wilson + ydb/core/protos +) + +SRCS( + tracing_control.h + tracing_control.cpp +) + +END() diff --git a/ydb/core/control/immediate_control_board_sampler.h b/ydb/core/control/immediate_control_board_sampler.h new file mode 100644 index 000000000000..e6d6784540b7 --- /dev/null +++ b/ydb/core/control/immediate_control_board_sampler.h @@ -0,0 +1,25 @@ +#pragma once + +#include + +namespace NKikimr { + +/* Random sampler driven by a parts-per-million control: Sample() draws Rng() % 1'000'000 and reports whether the draw is below the current SamplingPPM value. */ class TSampler { +public: + TSampler() : Rng(0) {} /* default instance: generator seeded with 0, SamplingPPM default-constructed */ + + TSampler(TControlWrapper samplingPPM, ui64 seed) + : SamplingPPM(std::move(samplingPPM)) + , Rng(seed) + {} + + bool Sample() { + return Rng() % 1'000'000 < SamplingPPM; + } + +private: + TControlWrapper SamplingPPM; + TReallyFastRng32 Rng; +}; + +} // namespace NKikimr diff 
--git a/ydb/core/control/immediate_control_board_throttler.h b/ydb/core/control/immediate_control_board_throttler.h new file mode 100644 index 000000000000..c3ee83bf0f43 --- /dev/null +++ b/ydb/core/control/immediate_control_board_throttler.h @@ -0,0 +1,66 @@ +#pragma once + +#include +#include + +namespace NKikimr { + +/* Rate limiter configured by two controls, MaxRatePerMinute and MaxBurst, and a time provider. CurrentBurst counts admitted calls that elapsed time has not yet paid back. */ class TThrottler { +public: + TThrottler() = default; /* NOTE(review): leaves TimeProvider default-constructed; Throttle() only reaches TimeProvider->Now() when MaxRatePerMinute is non-zero - confirm such instances are reassigned before that can happen */ + + TThrottler(TControlWrapper maxRatePerMinute, TControlWrapper maxBurst, + TIntrusivePtr timeProvider) + : TimeProvider(std::move(timeProvider)) + , MaxRatePerMinute(std::move(maxRatePerMinute)) + , MaxBurst(std::move(maxBurst)) + , LastUpdate(TimeProvider->Now()) /* reads the TimeProvider member, which is declared (and therefore initialized) before LastUpdate */ + {} + + bool Throttle() { /* Returns true when the call is throttled and false when it is admitted; an admitted call increments CurrentBurst, which may reach at most MaxBurst + 1. */ + auto maxRatePerMinute = static_cast(MaxRatePerMinute); + auto maxBurst = static_cast(MaxBurst); + auto maxTotal = maxBurst + 1; + CurrentBurst = std::min(CurrentBurst, maxTotal); /* re-clamp in case MaxBurst was lowered since the previous call */ + if (maxRatePerMinute == 0) { /* a zero rate throttles every call */ + return true; + } + + auto now = TimeProvider->Now(); + if (now < LastUpdate) { /* time earlier than LastUpdate: throttle, presumably to avoid a negative interval in UpdateStats */ + return true; + } + + const auto deltaBetweenSends = TDuration::Minutes(1) / maxRatePerMinute; /* time that pays back one admitted call */ + UpdateStats(now, deltaBetweenSends); + + if (CurrentBurst < maxTotal) { + CurrentBurst += 1; + return false; + } + + return true; + } + +private: + void UpdateStats(TInstant now, TDuration deltaBetweenSends) { /* Lowers CurrentBurst by the number of whole deltaBetweenSends intervals elapsed since LastUpdate (never by more than CurrentBurst) and advances LastUpdate by exactly those intervals; once CurrentBurst is 0, LastUpdate is set to now. */ + i64 decrease = (now - LastUpdate) / deltaBetweenSends; + decrease = std::min(decrease, CurrentBurst); + Y_ABORT_UNLESS(decrease >= 0); + CurrentBurst -= decrease; + LastUpdate += decrease * deltaBetweenSends; + if (CurrentBurst == 0) { + LastUpdate = now; + } + } + + TIntrusivePtr TimeProvider; + + TControlWrapper MaxRatePerMinute; + TControlWrapper MaxBurst; + + TInstant LastUpdate = TInstant::Zero(); + i64 CurrentBurst = 0; +}; + +} // namespace NKikimr diff --git a/ydb/core/control/ya.make b/ydb/core/control/ya.make index 8c1c83bff961..4faca2369cf5 100644 --- a/ydb/core/control/ya.make +++ b/ydb/core/control/ya.make @@ -19,10 +19,16 @@ SRCS( immediate_control_board_impl.cpp immediate_control_board_impl.h 
immediate_control_board_wrapper.h + immediate_control_board_throttler.h + immediate_control_board_sampler.h ) END() +RECURSE( + common_controls +) + RECURSE_FOR_TESTS( ut ) diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp index f327d04ad7c5..38da10016654 100644 --- a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp +++ b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp @@ -1619,7 +1619,7 @@ void TGRpcServicesInitializer::InitializeServices(NActors::TActorSystemSetup* se for (size_t i = 0; i < proxyCount; ++i) { auto grpcReqProxy = Config.HasGRpcConfig() && Config.GetGRpcConfig().GetSkipSchemeCheck() ? NGRpcService::CreateGRpcRequestProxySimple(Config) - : NGRpcService::CreateGRpcRequestProxy(Config); + : NGRpcService::CreateGRpcRequestProxy(Config, appData->Icb); /* the proxy registers its per-request-type tracing controls on this control board */ setup->LocalServices.push_back(std::pair(NGRpcService::CreateGRpcRequestProxyId(i), TActorSetupCmd(grpcReqProxy, TMailboxType::ReadAsFilled, diff --git a/ydb/core/grpc_services/base/base.h b/ydb/core/grpc_services/base/base.h index bd5ae1bc00b6..12272ce9af6c 100644 --- a/ydb/core/grpc_services/base/base.h +++ b/ydb/core/grpc_services/base/base.h @@ -367,6 +367,9 @@ class IRequestProxyCtx : public virtual IRequestCtxBase { virtual void StartTracing(NWilson::TSpan&& span) = 0; virtual void LegacyFinishSpan() = 0; + // Used for per-type sampling + virtual const TString& GetInternalRequestType() const = 0; + // validation virtual bool Validate(TString& error) = 0; @@ -485,6 +488,10 @@ class TRefreshTokenImpl void StartTracing(NWilson::TSpan&& /*span*/) override {} void LegacyFinishSpan() override {} + const TString& GetInternalRequestType() const final { /* no request type is reported here: always the empty string, which MaybeStartTracing in grpc_request_proxy.cpp answers by returning without tracing */ + static const TString empty = ""; + return empty; + } void UpdateAuthState(NYdbGrpc::TAuthState::EAuthState state) override { State_.State = state; @@ -890,6 +897,10 @@ class TGRpcRequestBiStreamWrapper Span_.End(); } + const TString& GetInternalRequestType() const 
final { /* the request type is the full name reported by TRequest's protobuf descriptor */ + return TRequest::descriptor()->full_name(); + } + // IRequestCtxBase // void AddAuditLogPart(const TStringBuf&, const TString&) override { @@ -1302,6 +1313,10 @@ class TGRpcRequestWrapperImpl void LegacyFinishSpan() override {} + const TString& GetInternalRequestType() const final { /* the request type is the full name reported by TRequest's protobuf descriptor */ + return TRequest::descriptor()->full_name(); + } + void ReplyGrpcError(grpc::StatusCode code, const TString& msg, const TString& details = "") { Ctx_->ReplyError(code, msg, details); } diff --git a/ydb/core/grpc_services/grpc_request_proxy.cpp b/ydb/core/grpc_services/grpc_request_proxy.cpp index 3ef705d9954f..3e44414f5ded 100644 --- a/ydb/core/grpc_services/grpc_request_proxy.cpp +++ b/ydb/core/grpc_services/grpc_request_proxy.cpp @@ -8,6 +8,7 @@ #include #include #include +#include #include #include #include @@ -60,8 +61,9 @@ class TGRpcRequestProxyImpl { using TBase = TActorBootstrapped; public: - explicit TGRpcRequestProxyImpl(const NKikimrConfig::TAppConfig& appConfig) + explicit TGRpcRequestProxyImpl(const NKikimrConfig::TAppConfig& appConfig, TIntrusivePtr icb) : ChannelBufferSize(appConfig.GetTableServiceConfig().GetResourceManager().GetChannelBufferSize()) + , Icb(std::move(icb)) { } void Bootstrap(const TActorContext& ctx); @@ -80,7 +82,9 @@ class TGRpcRequestProxyImpl void HandleSchemeBoard(TSchemeBoardEvents::TEvNotifyDelete::TPtr& ev); void ReplayEvents(const TString& databaseName, const TActorContext& ctx); - void StartTracing(IRequestProxyCtx& ctx); + static TString InternalRequestTypeToControlDomain(const TString& type); /* maps a request type name to the control-domain suffix used for its tracing controls */ + TTracingControl& GetTracingControl(const TString& type); /* returns the per-type tracing control, creating it on first use */ + void MaybeStartTracing(IRequestProxyCtx& ctx); /* starts a span only when a trace id survives throttling or is newly sampled */ static bool IsAuthStateOK(const IRequestProxyCtx& ctx); @@ -151,7 +155,7 @@ class TGRpcRequestProxyImpl } - StartTracing(*requestBaseCtx); + MaybeStartTracing(*requestBaseCtx); if (IsAuthStateOK(*requestBaseCtx)) { Handle(event, ctx); @@ -311,6 +315,8 @@ class TGRpcRequestProxyImpl bool DynamicNode = false; TString RootDatabase; 
IGRpcProxyCounters::TPtr Counters; + THashMap TracingControls; + TIntrusivePtr Icb; }; void TGRpcRequestProxyImpl::Bootstrap(const TActorContext& ctx) { @@ -409,12 +415,52 @@ bool TGRpcRequestProxyImpl::IsAuthStateOK(const IRequestProxyCtx& ctx) { state.NeedAuth == false && !ctx.GetYdbToken(); } -void TGRpcRequestProxyImpl::StartTracing(IRequestProxyCtx& ctx) { +/* Turns a request type name into a control-domain suffix by removing a leading "Ydb." and a trailing "Request"; for example "Ydb.KeyValue.ReadRequest" becomes "KeyValue.Read". */ TString TGRpcRequestProxyImpl::InternalRequestTypeToControlDomain(const TString& type) { + static constexpr TStringBuf ydbNamespacePrefix = "Ydb."; + static constexpr TStringBuf requestSuffix = "Request"; + + TString controlDomain = type; + if (controlDomain.StartsWith(ydbNamespacePrefix)) { + controlDomain.erase(0, ydbNamespacePrefix.size()); + } + if (controlDomain.EndsWith(requestSuffix)) { + controlDomain.erase(controlDomain.size() - requestSuffix.size()); + } + + return controlDomain; +} + +/* Returns the TTracingControl stored for `type`. On first use it is created under the domain "TracingControls." + InternalRequestTypeToControlDomain(type) and inserted into TracingControls keyed by the unmodified type name. */ TTracingControl& TGRpcRequestProxyImpl::GetTracingControl(const TString& type) { + if (auto it = TracingControls.find(type); it != TracingControls.end()) { + return it->second; + } + auto tracingControlsDomain = InternalRequestTypeToControlDomain(type); + auto domain = TString::Join("TracingControls.", tracingControlsDomain); + TTracingControl control(Icb, TAppData::TimeProvider, TAppData::RandomProvider, std::move(domain)); + return TracingControls.emplace(type, std::move(control)).first->second; +} + +/* Decides whether this request is traced. Requests with an empty internal type are never traced. A trace id parsed from the OTEL trace header is kept unless the external throttler throttles it; when no trace id remains, a new one is created if the sampler picks the request and the sampled throttler does not throttle it. A span is started only when a trace id is present at the end. */ void TGRpcRequestProxyImpl::MaybeStartTracing(IRequestProxyCtx& ctx) { + const auto& requestType = ctx.GetInternalRequestType(); /* the getter returns const TString&; binding by reference avoids copying the string on every request */ + if (requestType.empty()) { + return; + } + NWilson::TTraceId traceId; if (const auto otelHeader = ctx.GetPeerMetaValues(NYdb::OTEL_TRACE_HEADER)) { - if (auto traceId = NWilson::TTraceId::FromTraceparentHeader(otelHeader.GetRef())) { - NWilson::TSpan grpcRequestProxySpan(TWilsonGrpc::RequestProxy, std::move(traceId), "GrpcRequestProxy"); - ctx.StartTracing(std::move(grpcRequestProxySpan)); - } + traceId = NWilson::TTraceId::FromTraceparentHeader(otelHeader.GetRef()); + } + auto& control = 
GetTracingControl(requestType); + if (traceId && control.ThrottleExternal()) { /* externally supplied trace id over the limit: drop it, the request may still be sampled below */ + LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::GRPC_SERVER, "Dropping external traceId " << traceId.GetHexTraceId() << " for request type " << requestType); + traceId = {}; + } + if (!traceId && control.SampleThrottle()) { + traceId = NWilson::TTraceId::NewTraceId(control.SampledVerbosity(), 4095); /* NOTE(review): 4095 is the second NewTraceId argument, presumably the time-to-live - confirm */ + LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::GRPC_SERVER, "Created new traceId " << traceId.GetHexTraceId() << " for request type " << requestType); + } + if (traceId) { + NWilson::TSpan grpcRequestProxySpan(TWilsonGrpc::RequestProxy, std::move(traceId), "GrpcRequestProxy"); + ctx.StartTracing(std::move(grpcRequestProxySpan)); } } @@ -583,8 +629,8 @@ void TGRpcRequestProxyImpl::StateFunc(TAutoPtr& ev) { } } -IActor* CreateGRpcRequestProxy(const NKikimrConfig::TAppConfig& appConfig) { - return new TGRpcRequestProxyImpl(appConfig); +/* Creates the request proxy actor; `icb` is the control board on which the proxy registers its tracing controls. */ IActor* CreateGRpcRequestProxy(const NKikimrConfig::TAppConfig& appConfig, TIntrusivePtr icb) { + return new TGRpcRequestProxyImpl(appConfig, std::move(icb)); } } // namespace NGRpcService diff --git a/ydb/core/grpc_services/grpc_request_proxy.h b/ydb/core/grpc_services/grpc_request_proxy.h index d2eb347c979b..b4eedb51c5fc 100644 --- a/ydb/core/grpc_services/grpc_request_proxy.h +++ b/ydb/core/grpc_services/grpc_request_proxy.h @@ -5,6 +5,8 @@ #include "grpc_request_proxy_handle_methods.h" +#include + #include #include @@ -21,7 +23,7 @@ struct TAppData; namespace NGRpcService { TString DatabaseFromDomain(const TAppData* appdata); -IActor* CreateGRpcRequestProxy(const NKikimrConfig::TAppConfig& appConfig); +IActor* CreateGRpcRequestProxy(const NKikimrConfig::TAppConfig& appConfig, TIntrusivePtr icb); IActor* CreateGRpcRequestProxySimple(const NKikimrConfig::TAppConfig& appConfig); class TGRpcRequestProxy : public TGRpcRequestProxyHandleMethods, public IFacilityProvider { diff --git a/ydb/core/keyvalue/keyvalue_flat_impl.h 
b/ydb/core/keyvalue/keyvalue_flat_impl.h index 0e200ad08412..2616d4229d05 100644 --- a/ydb/core/keyvalue/keyvalue_flat_impl.h +++ b/ydb/core/keyvalue/keyvalue_flat_impl.h @@ -7,6 +7,7 @@ #include "keyvalue_simple_db.h" #include "keyvalue_simple_db_flat.h" #include "keyvalue_state.h" + #include #include #include diff --git a/ydb/core/keyvalue/ya.make b/ydb/core/keyvalue/ya.make index 3a014e31316d..740eae043d01 100644 --- a/ydb/core/keyvalue/ya.make +++ b/ydb/core/keyvalue/ya.make @@ -45,6 +45,7 @@ PEERDIR( ydb/library/actors/protos ydb/core/base ydb/core/blobstorage/base + ydb/core/control/common_controls ydb/core/engine/minikql ydb/core/keyvalue/protos ydb/core/protos diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto index d9d1c9d377ab..46aa0dacfc34 100644 --- a/ydb/core/protos/config.proto +++ b/ydb/core/protos/config.proto @@ -1245,11 +1245,61 @@ message TImmediateControlsConfig { DefaultValue: 8388608 }]; } + message TTracingControls { /* Controls for request tracing. NOTE(review): the nested names appear to mirror the control names registered in tracing_control.cpp ("TracingControls." + domain + ".Sampling.PPM" and so on) - confirm */ + message TSamplingThrottlingOptions { + message TThrottlingOptions { /* limits read by TThrottler; while MaxRatePerMinute is 0 (the default) every call is throttled */ + optional uint64 MaxRatePerMinute = 1 [(ControlOptions) = { + Description: "Maximum amount of traced requests per minute", + MinValue: 0, + MaxValue: 300, + DefaultValue: 0, + }]; + optional uint64 MaxBurst = 2 [(ControlOptions) = { + Description: "Maximum burst of traced events", + MinValue: 0, + MaxValue: 300, + DefaultValue: 0, + }]; /* TThrottler lets its burst counter reach MaxBurst + 1 */ + } + + message TSamplingOptions { /* PPM is read by TSampler; Level is the verbosity passed to NewTraceId for sampled requests */ + optional uint64 PPM = 1 [(ControlOptions) = { + Description: "Average amount of sampled requests per one million", + MinValue: 0, + MaxValue: 1000000, + DefaultValue: 0, + }]; + optional uint64 Level = 2 [(ControlOptions) = { + Description: "Tracing level of sampled requests", + MinValue: 1, + MaxValue: 15, + DefaultValue: 15, + }]; + } + + optional TSamplingOptions Sampling = 1; /* selects which requests get a newly created trace */ + optional TThrottlingOptions SampledThrottling = 2; /* limits traces created by sampling */ + optional TThrottlingOptions ExternalThrottling = 3; /* limits traces whose id arrived in the request's trace header */ + } + + message TKeyValue { /* one option set per KeyValue request kind */ + optional TSamplingThrottlingOptions AcquireLock = 1; + 
optional TSamplingThrottlingOptions ExecuteTransaction = 2; + optional TSamplingThrottlingOptions Read = 3; + optional TSamplingThrottlingOptions ReadRange = 4; + optional TSamplingThrottlingOptions ListRange = 5; + optional TSamplingThrottlingOptions GetStorageChannelStatus = 6; + } + + optional TKeyValue KeyValue = 1; + } + optional TDataShardControls DataShardControls = 1; optional TTxLimitControls TxLimitControls = 2; optional TCoordinatorControls CoordinatorControls = 3; optional TSchemeShardControls SchemeShardControls = 4; optional TTCMallocControls TCMallocControls = 5; + optional TTracingControls TracingControls = 6; }; message TMeteringConfig { diff --git a/ydb/core/testlib/test_client.cpp b/ydb/core/testlib/test_client.cpp index 6ef5ad171ce5..58b5e29299b8 100644 --- a/ydb/core/testlib/test_client.cpp +++ b/ydb/core/testlib/test_client.cpp @@ -307,8 +307,11 @@ namespace Tests { const size_t proxyCount = Max(ui32{1}, Settings->AppConfig->GetGRpcConfig().GetGRpcProxyCount()); TVector grpcRequestProxies; grpcRequestProxies.reserve(proxyCount); + + auto& appData = Runtime->GetAppData(); /* moved above the loop because proxy creation now needs appData.Icb */ + for (size_t i = 0; i < proxyCount; ++i) { - auto grpcRequestProxy = NGRpcService::CreateGRpcRequestProxy(*Settings->AppConfig); + auto grpcRequestProxy = NGRpcService::CreateGRpcRequestProxy(*Settings->AppConfig, appData.Icb); auto grpcRequestProxyId = system->Register(grpcRequestProxy, TMailboxType::ReadAsFilled); system->RegisterLocalService(NGRpcService::CreateGRpcRequestProxyId(), grpcRequestProxyId); grpcRequestProxies.push_back(grpcRequestProxyId); @@ -320,8 +323,6 @@ namespace Tests { GRpcServerRootCounters = MakeIntrusive<::NMonitoring::TDynamicCounters>(); auto& counters = GRpcServerRootCounters; - auto& appData = Runtime->GetAppData(); - // Setup discovery for typically used services on the node { TIntrusivePtr desc = new NGRpcService::TGrpcEndpointDescription();