diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp index 53a4ee27983c..e69ba68f1669 100644 --- a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp +++ b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp @@ -874,10 +874,18 @@ void TBasicServicesInitializer::InitializeServices(NActors::TActorSystemSetup* s break; } + const auto& headersProto = opentelemetry.GetHeaders(); + TMap headers; + + for (const auto& header : headersProto) { + headers.insert({header.first, header.second}); + } + NWilson::TWilsonUploaderParams uploaderParams { .CollectorUrl = opentelemetry.GetCollectorUrl(), .ServiceName = opentelemetry.GetServiceName(), .GrpcSigner = std::move(grpcSigner), + .Headers = headers, }; if (tracingConfig.HasUploader()) { @@ -901,6 +909,13 @@ void TBasicServicesInitializer::InitializeServices(NActors::TActorSystemSetup* s #undef GET_FIELD_FROM_CONFIG } + if (const auto& mon = appData->Mon) { + uploaderParams.RegisterMonPage = [mon](TActorSystem *actorSystem, const TActorId& actorId) { + NMonitoring::TIndexMonPage *actorsMonPage = mon->RegisterIndexPage("actors", "Actors"); + mon->RegisterActorPage(actorsMonPage, "wilson_uploader", "Wilson Trace Uploader", false, actorSystem, actorId); + }; + } + wilsonUploader.reset(std::move(uploaderParams).CreateUploader()); break; } diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto index c782f69d1e72..130ba9c3f68d 100644 --- a/ydb/core/protos/config.proto +++ b/ydb/core/protos/config.proto @@ -1672,6 +1672,7 @@ message TTracingConfig { message TOpentelemetryBackend { optional string CollectorUrl = 1; optional string ServiceName = 2; + map Headers = 3; } diff --git a/ydb/library/actors/wilson/wilson_uploader.cpp b/ydb/library/actors/wilson/wilson_uploader.cpp index 938f856e75dd..051e5dd20e95 100644 --- a/ydb/library/actors/wilson/wilson_uploader.cpp +++ b/ydb/library/actors/wilson/wilson_uploader.cpp @@ -108,6 +108,9 @@ namespace NWilson { TString CollectorUrl; TString ServiceName; + TMap Headers; + + TRegisterMonPageCallback RegisterMonPage; std::shared_ptr Channel; std::unique_ptr Stub; @@ -126,6 +129,9 @@ namespace NWilson { TIntrusiveListWithAutoDelete ExportRequests; size_t ExportRequestsCount = 0; + TString ErrStr; + TString LastCommitTraceErrStr; + public: TWilsonUploader(TWilsonUploaderParams params) : MaxSpansPerSecond(params.MaxExportedSpansPerSecond) @@ -136,6 +142,8 @@ namespace NWilson { , MaxExportInflight(params.MaxExportRequestsInflight) , CollectorUrl(std::move(params.CollectorUrl)) , ServiceName(std::move(params.ServiceName)) + , Headers(params.Headers) + , RegisterMonPage(params.RegisterMonPage) , GrpcSigner(std::move(params.GrpcSigner)) , CurrentBatch(MaxSpansInBatch, MaxBytesInBatch, ServiceName) {} @@ -166,11 +174,15 @@ namespace NWilson { TStringBuf host; ui16 port; if (!TryGetSchemeHostAndPort(CollectorUrl, scheme, host, port)) { - ALOG_ERROR(WILSON_SERVICE_ID, "Failed to parse collector url (" << CollectorUrl << " was provided). Wilson wouldn't work"); + ErrStr = "Failed to parse collector url (" + CollectorUrl + " was provided). Wilson wouldn't work"; + ALOG_ERROR(WILSON_SERVICE_ID, ErrStr); Become(&TThis::StateBroken); return; } else if (scheme != "grpc://" && scheme != "grpcs://") { - ALOG_ERROR(WILSON_SERVICE_ID, "Wrong scheme provided: " << scheme << " (only grpc:// and grpcs:// are supported). Wilson wouldn't work"); + TStringStream ss; + ss << "Wrong scheme provided: " << scheme << " (only grpc:// and grpcs:// are supported). Wilson wouldn't work"; + ErrStr = ss.Str(); + ALOG_ERROR(WILSON_SERVICE_ID, ErrStr); Become(&TThis::StateBroken); return; } @@ -181,6 +193,14 @@ namespace NWilson { ALOG_INFO(WILSON_SERVICE_ID, "TWilsonUploader::Bootstrap"); } + void Registered(TActorSystem* sys, const TActorId& owner) override { + TActorBootstrapped::Registered(sys, owner); + + if (const auto& mon = RegisterMonPage) { + mon(sys, SelfId()); + } + } + void Handle(TEvWilson::TPtr ev) { if (SpansSizeBytes >= MaxPendingSpanBytes) { ALOG_ERROR(WILSON_SERVICE_ID, "dropped span due to overflow"); @@ -284,6 +304,9 @@ namespace NWilson { if (GrpcSigner) { GrpcSigner->SignClientContext(*context); } + for (const auto& [key, value] : Headers) { + context->AddMetadata(key, value); + } auto reader = Stub->AsyncExport(context.get(), std::move(batch.Request), &CQ); auto uploadData = std::unique_ptr(new TExportRequestData { .Context = std::move(context), @@ -305,6 +328,8 @@ namespace NWilson { auto node = std::unique_ptr(static_cast(tag)); ALOG_TRACE(WILSON_SERVICE_ID, "finished export request " << (void*)node.get()); if (!node->Status.ok()) { + LastCommitTraceErrStr = node->Status.error_message(); + ALOG_ERROR(WILSON_SERVICE_ID, "failed to commit traces: " << node->Status.error_message()); } @@ -351,13 +376,89 @@ namespace NWilson { TryToSend(); } + void HandleHttp(NMon::TEvHttpInfo::TPtr &ev) { + TStringStream str; + str.Reserve(64 << 10); + + bool isBroken = CurrentStateFunc() == &TThis::StateBroken; + + HTML(str) { + TAG(TH4) {str << "Current state";} + PARA() { + str << (isBroken ? "Broken" : "Works"); + } + if (ErrStr) { + PARA() { + str << "Error: " << ErrStr; + } + } + if (LastCommitTraceErrStr) { + PARA() { + str << "Last commit traces error: " << LastCommitTraceErrStr; + } + } + PARA() { + str << "Current batch size: " << CurrentBatch.SizeSpans(); + } + PARA() { + str << "Current batch queue size: " << BatchQueue.size(); + } + PARA() { + std::string state; + switch (Channel->GetState(false)) { + case GRPC_CHANNEL_IDLE: + state = "GRPC_CHANNEL_IDLE"; + break; + case GRPC_CHANNEL_CONNECTING: + state = "GRPC_CHANNEL_CONNECTING"; + break; + case GRPC_CHANNEL_READY: + state = "GRPC_CHANNEL_READY"; + break; + case GRPC_CHANNEL_TRANSIENT_FAILURE: + state = "GRPC_CHANNEL_TRANSIENT_FAILURE"; + break; + case GRPC_CHANNEL_SHUTDOWN: + state = "GRPC_CHANNEL_SHUTDOWN"; + break; + default: + state = "UNKNOWN_STATE"; + break; + } + str << "Channel state# " << state; + } + TAG(TH4) {str << "Config";} + PRE() { + str << "MaxPendingSpanBytes# " << MaxPendingSpanBytes << '\n'; + str << "MaxSpansPerSecond# " << MaxSpansPerSecond << '\n'; + str << "MaxSpansInBatch# " << MaxSpansInBatch << '\n'; + str << "MaxBytesInBatch# " << MaxBytesInBatch << '\n'; + str << "MaxBatchAccumulation# " << MaxBatchAccumulation << '\n'; + str << "MaxSpanTimeInQueue# " << MaxSpanTimeInQueue << '\n'; + str << "MaxExportInflight# " << MaxExportInflight << '\n'; + str << "CollectorUrl# " << CollectorUrl << '\n'; + str << "ServiceName# " << ServiceName << '\n'; + str << "Headers# " << '\n'; + for (const auto& [key, value] : Headers) { + str << '\t' << key << ": " << value << '\n'; + } + } + } + + auto* result = new NMon::TEvHttpInfoRes(str.Str(), 0, NMon::IEvHttpInfoRes::EContentType::Html); + + Send(ev->Sender, result); + } + STRICT_STFUNC(StateWork, hFunc(TEvWilson, Handle); hFunc(TEvents::TEvWakeup, HandleWakeup); + hFunc(NMon::TEvHttpInfo, HandleHttp); ); STRICT_STFUNC(StateBroken, IgnoreFunc(TEvWilson); + hFunc(NMon::TEvHttpInfo, HandleHttp); ); }; diff --git a/ydb/library/actors/wilson/wilson_uploader.h b/ydb/library/actors/wilson/wilson_uploader.h index 3ba1f6109d4b..09703cb59b7b 100644 --- a/ydb/library/actors/wilson/wilson_uploader.h +++ b/ydb/library/actors/wilson/wilson_uploader.h @@ -25,10 +25,13 @@ namespace NWilson { return NActors::TActorId(0, TStringBuf("WilsonUpload", 12)); } + using TRegisterMonPageCallback = std::function; + struct TWilsonUploaderParams { TString CollectorUrl; TString ServiceName; std::unique_ptr GrpcSigner; + TMap Headers; ui64 MaxExportedSpansPerSecond = Max(); ui64 MaxSpansInBatch = 150; @@ -37,6 +40,8 @@ namespace NWilson { ui32 SpanExportTimeoutSeconds = 60 * 60 * 24 * 365; ui64 MaxExportRequestsInflight = 1; + TRegisterMonPageCallback RegisterMonPage; + NActors::IActor* CreateUploader() &&; };