Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions ydb/core/driver_lib/run/kikimr_services_initializers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -874,10 +874,18 @@ void TBasicServicesInitializer::InitializeServices(NActors::TActorSystemSetup* s
break;
}

const auto& headersProto = opentelemetry.GetHeaders();
TMap<TString, TString> headers;

for (const auto& header : headersProto) {
headers.insert({header.first, header.second});
}

NWilson::TWilsonUploaderParams uploaderParams {
.CollectorUrl = opentelemetry.GetCollectorUrl(),
.ServiceName = opentelemetry.GetServiceName(),
.GrpcSigner = std::move(grpcSigner),
.Headers = headers,
};

if (tracingConfig.HasUploader()) {
Expand All @@ -901,6 +909,13 @@ void TBasicServicesInitializer::InitializeServices(NActors::TActorSystemSetup* s
#undef GET_FIELD_FROM_CONFIG
}

if (const auto& mon = appData->Mon) {
uploaderParams.RegisterMonPage = [mon](TActorSystem *actorSystem, const TActorId& actorId) {
NMonitoring::TIndexMonPage *actorsMonPage = mon->RegisterIndexPage("actors", "Actors");
mon->RegisterActorPage(actorsMonPage, "wilson_uploader", "Wilson Trace Uploader", false, actorSystem, actorId);
};
}

wilsonUploader.reset(std::move(uploaderParams).CreateUploader());
break;
}
Expand Down
1 change: 1 addition & 0 deletions ydb/core/protos/config.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1672,6 +1672,7 @@ message TTracingConfig {
message TOpentelemetryBackend {
optional string CollectorUrl = 1;
optional string ServiceName = 2;
map<string, string> Headers = 3;
}


Expand Down
105 changes: 103 additions & 2 deletions ydb/library/actors/wilson/wilson_uploader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,9 @@ namespace NWilson {

TString CollectorUrl;
TString ServiceName;
TMap<TString, TString> Headers;

TRegisterMonPageCallback RegisterMonPage;

std::shared_ptr<grpc::Channel> Channel;
std::unique_ptr<NServiceProto::TraceService::Stub> Stub;
Expand All @@ -126,6 +129,9 @@ namespace NWilson {
TIntrusiveListWithAutoDelete<TExportRequestData, TDelete> ExportRequests;
size_t ExportRequestsCount = 0;

TString ErrStr;
TString LastCommitTraceErrStr;

public:
TWilsonUploader(TWilsonUploaderParams params)
: MaxSpansPerSecond(params.MaxExportedSpansPerSecond)
Expand All @@ -136,6 +142,8 @@ namespace NWilson {
, MaxExportInflight(params.MaxExportRequestsInflight)
, CollectorUrl(std::move(params.CollectorUrl))
, ServiceName(std::move(params.ServiceName))
, Headers(params.Headers)
, RegisterMonPage(params.RegisterMonPage)
, GrpcSigner(std::move(params.GrpcSigner))
, CurrentBatch(MaxSpansInBatch, MaxBytesInBatch, ServiceName)
{}
Expand Down Expand Up @@ -166,11 +174,15 @@ namespace NWilson {
TStringBuf host;
ui16 port;
if (!TryGetSchemeHostAndPort(CollectorUrl, scheme, host, port)) {
ALOG_ERROR(WILSON_SERVICE_ID, "Failed to parse collector url (" << CollectorUrl << " was provided). Wilson wouldn't work");
ErrStr = "Failed to parse collector url (" + CollectorUrl + " was provided). Wilson wouldn't work";
ALOG_ERROR(WILSON_SERVICE_ID, ErrStr);
Become(&TThis::StateBroken);
return;
} else if (scheme != "grpc://" && scheme != "grpcs://") {
ALOG_ERROR(WILSON_SERVICE_ID, "Wrong scheme provided: " << scheme << " (only grpc:// and grpcs:// are supported). Wilson wouldn't work");
TStringStream ss;
ss << "Wrong scheme provided: " << scheme << " (only grpc:// and grpcs:// are supported). Wilson wouldn't work";
ErrStr = ss.Str();
ALOG_ERROR(WILSON_SERVICE_ID, ErrStr);
Become(&TThis::StateBroken);
return;
}
Expand All @@ -181,6 +193,14 @@ namespace NWilson {
ALOG_INFO(WILSON_SERVICE_ID, "TWilsonUploader::Bootstrap");
}

void Registered(TActorSystem* sys, const TActorId& owner) override {
TActorBootstrapped<TWilsonUploader>::Registered(sys, owner);

if (const auto& mon = RegisterMonPage) {
mon(sys, SelfId());
}
}

void Handle(TEvWilson::TPtr ev) {
if (SpansSizeBytes >= MaxPendingSpanBytes) {
ALOG_ERROR(WILSON_SERVICE_ID, "dropped span due to overflow");
Expand Down Expand Up @@ -284,6 +304,9 @@ namespace NWilson {
if (GrpcSigner) {
GrpcSigner->SignClientContext(*context);
}
for (const auto& [key, value] : Headers) {
context->AddMetadata(key, value);
}
auto reader = Stub->AsyncExport(context.get(), std::move(batch.Request), &CQ);
auto uploadData = std::unique_ptr<TExportRequestData>(new TExportRequestData {
.Context = std::move(context),
Expand All @@ -305,6 +328,8 @@ namespace NWilson {
auto node = std::unique_ptr<TExportRequestData>(static_cast<TExportRequestData*>(tag));
ALOG_TRACE(WILSON_SERVICE_ID, "finished export request " << (void*)node.get());
if (!node->Status.ok()) {
LastCommitTraceErrStr = node->Status.error_message();

ALOG_ERROR(WILSON_SERVICE_ID,
"failed to commit traces: " << node->Status.error_message());
}
Expand Down Expand Up @@ -351,13 +376,89 @@ namespace NWilson {
TryToSend();
}

void HandleHttp(NMon::TEvHttpInfo::TPtr &ev) {
TStringStream str;
str.Reserve(64 << 10);

bool isBroken = CurrentStateFunc() == &TThis::StateBroken;

HTML(str) {
TAG(TH4) {str << "Current state";}
PARA() {
str << (isBroken ? "Broken" : "Works");
}
if (ErrStr) {
PARA() {
str << "Error: " << ErrStr;
}
}
if (LastCommitTraceErrStr) {
PARA() {
str << "Last commit traces error: " << LastCommitTraceErrStr;
}
}
PARA() {
str << "Current batch size: " << CurrentBatch.SizeSpans();
}
PARA() {
str << "Current batch queue size: " << BatchQueue.size();
}
PARA() {
std::string state;
switch (Channel->GetState(false)) {
case GRPC_CHANNEL_IDLE:
state = "GRPC_CHANNEL_IDLE";
break;
case GRPC_CHANNEL_CONNECTING:
state = "GRPC_CHANNEL_CONNECTING";
break;
case GRPC_CHANNEL_READY:
state = "GRPC_CHANNEL_READY";
break;
case GRPC_CHANNEL_TRANSIENT_FAILURE:
state = "GRPC_CHANNEL_TRANSIENT_FAILURE";
break;
case GRPC_CHANNEL_SHUTDOWN:
state = "GRPC_CHANNEL_SHUTDOWN";
break;
default:
state = "UNKNOWN_STATE";
break;
}
str << "Channel state# " << state;
}
TAG(TH4) {str << "Config";}
PRE() {
str << "MaxPendingSpanBytes# " << MaxPendingSpanBytes << '\n';
str << "MaxSpansPerSecond# " << MaxSpansPerSecond << '\n';
str << "MaxSpansInBatch# " << MaxSpansInBatch << '\n';
str << "MaxBytesInBatch# " << MaxBytesInBatch << '\n';
str << "MaxBatchAccumulation# " << MaxBatchAccumulation << '\n';
str << "MaxSpanTimeInQueue# " << MaxSpanTimeInQueue << '\n';
str << "MaxExportInflight# " << MaxExportInflight << '\n';
str << "CollectorUrl# " << CollectorUrl << '\n';
str << "ServiceName# " << ServiceName << '\n';
str << "Headers# " << '\n';
for (const auto& [key, value] : Headers) {
str << '\t' << key << ": " << value << '\n';
}
}
}

auto* result = new NMon::TEvHttpInfoRes(str.Str(), 0, NMon::IEvHttpInfoRes::EContentType::Html);

Send(ev->Sender, result);
}

STRICT_STFUNC(StateWork,
hFunc(TEvWilson, Handle);
hFunc(TEvents::TEvWakeup, HandleWakeup);
hFunc(NMon::TEvHttpInfo, HandleHttp);
);

STRICT_STFUNC(StateBroken,
IgnoreFunc(TEvWilson);
hFunc(NMon::TEvHttpInfo, HandleHttp);
);
};

Expand Down
5 changes: 5 additions & 0 deletions ydb/library/actors/wilson/wilson_uploader.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,13 @@ namespace NWilson {
return NActors::TActorId(0, TStringBuf("WilsonUpload", 12));
}

using TRegisterMonPageCallback = std::function<void(NActors::TActorSystem* actorSystem, const NActors::TActorId& actorId)>;

struct TWilsonUploaderParams {
TString CollectorUrl;
TString ServiceName;
std::unique_ptr<IGrpcSigner> GrpcSigner;
TMap<TString, TString> Headers;

ui64 MaxExportedSpansPerSecond = Max<ui64>();
ui64 MaxSpansInBatch = 150;
Expand All @@ -37,6 +40,8 @@ namespace NWilson {
ui32 SpanExportTimeoutSeconds = 60 * 60 * 24 * 365;
ui64 MaxExportRequestsInflight = 1;

TRegisterMonPageCallback RegisterMonPage;

NActors::IActor* CreateUploader() &&;
};

Expand Down