diff --git a/ydb/core/driver_lib/run/factories.h b/ydb/core/driver_lib/run/factories.h index b2ee874d26e8..e309fda49a0f 100644 --- a/ydb/core/driver_lib/run/factories.h +++ b/ydb/core/driver_lib/run/factories.h @@ -18,6 +18,7 @@ #include #include +#include #include #include @@ -55,6 +56,8 @@ struct TModuleFactories { std::shared_ptr DataStreamsAuthFactory; std::vector AdditionalComputationNodeFactories; + std::unique_ptr(*WilsonGrpcSignerFactory)(const NKikimrConfig::TTracingConfig::TAuthConfig&); + ~TModuleFactories(); }; diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp index 738ce4196258..0e2c31332004 100644 --- a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp +++ b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp @@ -370,8 +370,9 @@ static bool IsServiceInitialized(NActors::TActorSystemSetup* setup, TActorId ser return false; } -TBasicServicesInitializer::TBasicServicesInitializer(const TKikimrRunConfig& runConfig) +TBasicServicesInitializer::TBasicServicesInitializer(const TKikimrRunConfig& runConfig, std::shared_ptr factories) : IKikimrServicesInitializer(runConfig) + , Factories(std::move(factories)) { } @@ -827,10 +828,20 @@ void TBasicServicesInitializer::InitializeServices(NActors::TActorSystemSetup* s if (Config.HasTracingConfig()) { const auto& tracing = Config.GetTracingConfig(); + std::unique_ptr grpcSigner; + if (tracing.HasAuthConfig() && Factories && Factories->WilsonGrpcSignerFactory) { + grpcSigner = Factories->WilsonGrpcSignerFactory(tracing.GetAuthConfig()); + } + auto wilsonUploader = NWilson::WilsonUploaderParams { + .Host = tracing.GetHost(), + .Port = static_cast(tracing.GetPort()), + .RootCA = tracing.GetRootCA(), + .ServiceName = tracing.GetServiceName(), + .GrpcSigner = std::move(grpcSigner), + }.CreateUploader(); setup->LocalServices.emplace_back( NWilson::MakeWilsonUploaderId(), - TActorSetupCmd(NWilson::CreateWilsonUploader(tracing.GetHost(), tracing.GetPort(), tracing.GetRootCA(), tracing.GetServiceName()), - TMailboxType::ReadAsFilled, appData->BatchPoolId)); + TActorSetupCmd(wilsonUploader, TMailboxType::ReadAsFilled, appData->BatchPoolId)); } } diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.h b/ydb/core/driver_lib/run/kikimr_services_initializers.h index a291b4383064..2b6cd37f7c6d 100644 --- a/ydb/core/driver_lib/run/kikimr_services_initializers.h +++ b/ydb/core/driver_lib/run/kikimr_services_initializers.h @@ -49,8 +49,10 @@ class TBasicServicesInitializer : public IKikimrServicesInitializer { static ISchedulerThread* CreateScheduler(const NKikimrConfig::TActorSystemConfig::TScheduler &config); + std::shared_ptr Factories; + public: - TBasicServicesInitializer(const TKikimrRunConfig& runConfig); + TBasicServicesInitializer(const TKikimrRunConfig& runConfig, std::shared_ptr factories); void InitializeServices(NActors::TActorSystemSetup *setup, const NKikimr::TAppData *appData) override; }; diff --git a/ydb/core/driver_lib/run/run.cpp b/ydb/core/driver_lib/run/run.cpp index b656afb5608a..160affb9d238 100644 --- a/ydb/core/driver_lib/run/run.cpp +++ b/ydb/core/driver_lib/run/run.cpp @@ -1375,7 +1375,7 @@ TIntrusivePtr TKikimrRunner::CreateServiceInitializers } if (serviceMask.EnableBasicServices) { - sil->AddServiceInitializer(new TBasicServicesInitializer(runConfig)); + sil->AddServiceInitializer(new TBasicServicesInitializer(runConfig, ModuleFactories)); } if (serviceMask.EnableIcbService) { sil->AddServiceInitializer(new TImmediateControlBoardInitializer(runConfig)); diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto index 8b16de18701c..91b546f145fe 100644 --- a/ydb/core/protos/config.proto +++ b/ydb/core/protos/config.proto @@ -1517,10 +1517,33 @@ message TCompactionConfig { } message TTracingConfig { + message TAuthConfig { + message TTvm { + optional string Host = 1; + optional uint32 Port = 2; + + required uint32 SelfTvmId = 3; + required uint32 TracingTvmId = 4; + + optional string DiskCacheDir = 5; + + oneof Secret { + string PlainTextSecret = 6; + string SecretFile = 7; + string SecretEnvironmentVariable = 8; + } + } + + oneof Method { + TTvm Tvm = 1; + } + } + optional string Host = 1; optional uint32 Port = 2; optional string RootCA = 3; optional string ServiceName = 4; + optional TAuthConfig AuthConfig = 5; } message TFailureInjectionConfig { diff --git a/ydb/library/actors/wilson/wilson_uploader.cpp b/ydb/library/actors/wilson/wilson_uploader.cpp index 24063fe625a6..2e604e48a1af 100644 --- a/ydb/library/actors/wilson/wilson_uploader.cpp +++ b/ydb/library/actors/wilson/wilson_uploader.cpp @@ -6,7 +6,6 @@ #include #include #include -#include #include namespace NWilson { @@ -32,6 +31,7 @@ namespace NWilson { std::unique_ptr Stub; grpc::CompletionQueue CQ; + std::unique_ptr GrpcSigner; std::unique_ptr Context; std::unique_ptr> Reader; NServiceProto::ExportTraceServiceResponse Response; @@ -53,11 +53,12 @@ namespace NWilson { bool WakeupScheduled = false; public: - TWilsonUploader(TString host, ui16 port, TString rootCA, TString serviceName) - : Host(std::move(host)) - , Port(std::move(port)) - , RootCA(std::move(rootCA)) - , ServiceName(std::move(serviceName)) + TWilsonUploader(WilsonUploaderParams params) + : Host(std::move(params.Host)) + , Port(std::move(params.Port)) + , RootCA(std::move(params.RootCA)) + , ServiceName(std::move(params.ServiceName)) + , GrpcSigner(std::move(params.GrpcSigner)) {} ~TWilsonUploader() { @@ -142,6 +143,9 @@ namespace NWilson { ScheduleWakeup(NextSendTimestamp); Context = std::make_unique(); + if (GrpcSigner) { + GrpcSigner->SignClientContext(*Context); + } Reader = Stub->AsyncExport(Context.get(), std::move(request), &CQ); Reader->Finish(&Response, &Status, nullptr); } @@ -192,8 +196,12 @@ namespace NWilson { } // anonymous - IActor *CreateWilsonUploader(TString host, ui16 port, TString rootCA, TString serviceName) { - return new TWilsonUploader(std::move(host), port, std::move(rootCA), std::move(serviceName)); + IActor* CreateWilsonUploader(WilsonUploaderParams params) { + return new TWilsonUploader(std::move(params)); + } + + IActor* WilsonUploaderParams::CreateUploader() && { + return CreateWilsonUploader(std::move(*this)); } } // NWilson diff --git a/ydb/library/actors/wilson/wilson_uploader.h b/ydb/library/actors/wilson/wilson_uploader.h index 680f30f1dced..be1cbcac4e7a 100644 --- a/ydb/library/actors/wilson/wilson_uploader.h +++ b/ydb/library/actors/wilson/wilson_uploader.h @@ -4,8 +4,14 @@ #include #include #include +#include namespace NWilson { + struct IGrpcSigner { + virtual void SignClientContext(grpc::ClientContext& context) = 0; + + virtual ~IGrpcSigner() = default; + }; struct TEvWilson : NActors::TEventLocal { opentelemetry::proto::trace::v1::Span Span; @@ -19,6 +25,16 @@ namespace NWilson { return NActors::TActorId(0, TStringBuf("WilsonUpload", 12)); } - NActors::IActor *CreateWilsonUploader(TString host, ui16 port, TString rootCA, TString serviceName); + struct WilsonUploaderParams { + TString Host; + ui16 Port; + TString RootCA; + TString ServiceName; + std::unique_ptr GrpcSigner; + + NActors::IActor* CreateUploader() &&; + }; + + NActors::IActor* CreateWilsonUploader(WilsonUploaderParams params); } // NWilson diff --git a/ydb/tools/cfg/static.py b/ydb/tools/cfg/static.py index 09cb9bbba132..84bf540891da 100644 --- a/ydb/tools/cfg/static.py +++ b/ydb/tools/cfg/static.py @@ -114,11 +114,13 @@ def __init__( ) self._enable_cms_config_cache = template.get("enable_cms_config_cache", enable_cms_config_cache) if "tracing" in template: + tracing = template["tracing"] self.__tracing = ( - template["tracing"]["host"], - template["tracing"]["port"], - template["tracing"]["root_ca"], - template["tracing"]["service_name"], + tracing["host"], + tracing["port"], + tracing["root_ca"], + tracing["service_name"], + tracing.get("auth_config") ) else: self.__tracing = None @@ -1121,12 +1123,36 @@ def __generate_sys_txt(self): def __generate_tracing_txt(self): pb = config_pb2.TAppConfig() if self.__tracing: + tracing_pb = pb.TracingConfig ( - pb.TracingConfig.Host, - pb.TracingConfig.Port, - pb.TracingConfig.RootCA, - pb.TracingConfig.ServiceName, + tracing_pb.Host, + tracing_pb.Port, + tracing_pb.RootCA, + tracing_pb.ServiceName, + auth_config ) = self.__tracing + + if auth_config: + auth_pb = tracing_pb.AuthConfig + if "tvm" in auth_config: + tvm = auth_config.get("tvm") + tvm_pb = auth_pb.Tvm + + if "host" in tvm: + tvm_pb.Host = tvm["host"] + if "port" in tvm: + tvm_pb.Port = tvm["port"] + tvm_pb.SelfTvmId = tvm["self_tvm_id"] + tvm_pb.TracingTvmId = tvm["tracing_tvm_id"] + tvm_pb.DiskCacheDir = tvm["disk_cache_dir"] + + if "plain_text_secret" in tvm: + tvm_pb.PlainTextSecret = tvm["plain_text_secret"] + elif "secret_file" in tvm: + tvm_pb.SecretFile = tvm["secret_file"] + elif "secret_environment_variable" in tvm: + tvm_pb.SecretEnvironmentVariable = tvm["secret_environment_variable"] + self.__proto_configs["tracing.txt"] = pb def __generate_sys_txt_advanced(self): diff --git a/ydb/tools/cfg/validation.py b/ydb/tools/cfg/validation.py index 8f385718b803..4e17e4ac78b2 100644 --- a/ydb/tools/cfg/validation.py +++ b/ydb/tools/cfg/validation.py @@ -133,6 +133,7 @@ port=dict(type="integer"), root_ca=dict(type="string"), service_name=dict(type="string"), + auth_config=dict(type="object"), ), required=[ "host",