Skip to content

Commit

Permalink
Implemented parallel upload in wilson
Browse files Browse the repository at this point in the history
  • Loading branch information
domwst committed Feb 13, 2024
1 parent 546c9c3 commit 32405e5
Showing 1 changed file with 39 additions and 24 deletions.
63 changes: 39 additions & 24 deletions ydb/library/actors/wilson/wilson_uploader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,6 @@ namespace NWilson {
grpc::CompletionQueue CQ;

std::unique_ptr<IGrpcSigner> GrpcSigner;
std::unique_ptr<grpc::ClientContext> Context;
std::unique_ptr<grpc::ClientAsyncResponseReader<NServiceProto::ExportTraceServiceResponse>> Reader;
NServiceProto::ExportTraceServiceResponse Response;
grpc::Status Status;

struct TSpanQueueItem {
TMonotonic ExpirationTimestamp;
Expand All @@ -51,6 +47,15 @@ namespace NWilson {

bool WakeupScheduled = false;

struct TUploadData : TIntrusiveListItem<TUploadData> {
std::unique_ptr<grpc::ClientContext> Context;
std::unique_ptr<grpc::ClientAsyncResponseReader<NServiceProto::ExportTraceServiceResponse>> Reader;
grpc::Status Status;
NServiceProto::ExportTraceServiceResponse Response;
};

TIntrusiveListWithAutoDelete<TUploadData, TDelete> UploadRequests;

public:
TWilsonUploader(WilsonUploaderParams params)
: CollectorUrl(std::move(params.CollectorUrl))
Expand Down Expand Up @@ -131,7 +136,7 @@ namespace NWilson {
"dropped " << numSpansDropped << " span(s) due to expiration");
}

if (Context || Spans.empty()) {
if (Spans.empty()) {
return;
} else if (now < NextSendTimestamp) {
ScheduleWakeup(NextSendTimestamp);
Expand Down Expand Up @@ -162,29 +167,39 @@ namespace NWilson {
}

ScheduleWakeup(NextSendTimestamp);
Context = std::make_unique<grpc::ClientContext>();

auto context = std::make_unique<grpc::ClientContext>();
if (GrpcSigner) {
GrpcSigner->SignClientContext(*Context);
GrpcSigner->SignClientContext(*context);
}
Reader = Stub->AsyncExport(Context.get(), std::move(request), &CQ);
Reader->Finish(&Response, &Status, nullptr);
auto reader = Stub->AsyncExport(context.get(), std::move(request), &CQ);
auto uploadData = new TUploadData {
.Context = std::move(context),
.Reader = std::move(reader),
};
uploadData->Reader->Finish(&uploadData->Response, &uploadData->Status, uploadData);
UploadRequests.PushBack(uploadData);
}

void CheckIfDone() {
if (Context) {
void *tag;
bool ok;
if (CQ.AsyncNext(&tag, &ok, std::chrono::system_clock::now()) == grpc::CompletionQueue::GOT_EVENT) {
if (!Status.ok()) {
ALOG_ERROR(WILSON_SERVICE_ID,
"failed to commit traces: " << Status.error_message());
}

Reader.reset();
Context.reset();
} else {
ScheduleWakeup(TDuration::MilliSeconds(100));
void ReapCompletedRequests() {
if (UploadRequests.Empty()) {
return;
}
void* tag;
bool ok;
while (CQ.AsyncNext(&tag, &ok, std::chrono::system_clock::now()) == grpc::CompletionQueue::GOT_EVENT) {
auto node = static_cast<TUploadData*>(tag);
if (!node->Status.ok()) {
ALOG_ERROR(WILSON_SERVICE_ID,
"failed to commit traces: " << node->Status.error_message());
}

node->Unlink();
delete node;
}

if (!UploadRequests.Empty()) {
ScheduleWakeup(TDuration::MilliSeconds(100));
}
}

Expand All @@ -204,7 +219,7 @@ namespace NWilson {
}

void TryMakeProgress() {
CheckIfDone();
ReapCompletedRequests();
TryToSend();
}

Expand Down

0 comments on commit 32405e5

Please sign in to comment.