Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Split Zpages webserver hosting from Exporter #626

Merged
merged 4 commits into from
Mar 30, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions ext/include/opentelemetry/ext/zpages/tracez_data_aggregator.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

#include "opentelemetry/ext/zpages/latency_boundaries.h"
#include "opentelemetry/ext/zpages/tracez_data.h"
#include "opentelemetry/ext/zpages/tracez_processor.h"
#include "opentelemetry/ext/zpages/tracez_shared_data.h"
#include "opentelemetry/nostd/span.h"
#include "opentelemetry/nostd/string_view.h"
#include "opentelemetry/sdk/trace/span_data.h"
Expand Down Expand Up @@ -46,10 +46,10 @@ class TracezDataAggregator
/**
* Constructor creates a thread that calls a function to aggregate span data
* at regular intervals.
* @param span_processor is the tracez span processor to be set
* @param shared_data is the shared set of spans to expose.
* @param update_interval the time duration for updating the aggregated data.
*/
TracezDataAggregator(std::shared_ptr<TracezSpanProcessor> span_processor,
TracezDataAggregator(std::shared_ptr<TracezSharedData> shared_data,
milliseconds update_interval = milliseconds(10));

/** Ends the thread set up in the constructor and destroys the object **/
Expand Down Expand Up @@ -135,8 +135,8 @@ class TracezDataAggregator
void InsertIntoSampleSpanList(std::list<ThreadsafeSpanData> &sample_spans,
ThreadsafeSpanData &span_data);

/** Instance of span processor used to collect raw data **/
std::shared_ptr<TracezSpanProcessor> tracez_span_processor_;
/** Instance of shared spans used to collect raw data **/
std::shared_ptr<TracezSharedData> tracez_shared_data_;

/**
* Tree map with key being the name of the span and value being a unique ptr
Expand Down
25 changes: 5 additions & 20 deletions ext/include/opentelemetry/ext/zpages/tracez_processor.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include <vector>

#include "opentelemetry/ext/zpages/threadsafe_span_data.h"
#include "opentelemetry/ext/zpages/tracez_shared_data.h"
#include "opentelemetry/sdk/trace/processor.h"
#include "opentelemetry/sdk/trace/recordable.h"

Expand All @@ -23,16 +24,12 @@ namespace zpages
class TracezSpanProcessor : public opentelemetry::sdk::trace::SpanProcessor
{
public:
struct CollectedSpans
{
std::unordered_set<ThreadsafeSpanData *> running;
std::vector<std::unique_ptr<ThreadsafeSpanData>> completed;
};

/*
* Initialize a span processor.
*/
explicit TracezSpanProcessor() noexcept {}
explicit TracezSpanProcessor(std::shared_ptr<TracezSharedData> shared_data) noexcept
: shared_data_(shared_data)
{}

/*
* Create a span recordable, which is span_data
Expand All @@ -58,17 +55,6 @@ class TracezSpanProcessor : public opentelemetry::sdk::trace::SpanProcessor
*/
void OnEnd(std::unique_ptr<opentelemetry::sdk::trace::Recordable> &&span) noexcept override;

/*
* Returns a snapshot of all spans stored. This snapshot has a copy of the
* stored running_spans and gives ownership of completed spans to the caller.
* Stored completed_spans are cleared from the processor. Currently,
* copy-on-write is utilized where possible to minimize contention, but locks
* may be added in the future.
* @return snapshot of all currently running spans and newly completed spans
* (spans never sent while complete) at the time that the function is called
*/
CollectedSpans GetSpanSnapshot() noexcept;

/*
* For now, does nothing. In the future, it
* may send all ended spans that have not yet been sent to the aggregator.
Expand Down Expand Up @@ -96,8 +82,7 @@ class TracezSpanProcessor : public opentelemetry::sdk::trace::SpanProcessor
}

private:
mutable std::mutex mtx_;
CollectedSpans spans_;
std::shared_ptr<TracezSharedData> shared_data_;
};
} // namespace zpages
} // namespace ext
Expand Down
64 changes: 64 additions & 0 deletions ext/include/opentelemetry/ext/zpages/tracez_shared_data.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
#pragma once

#include <chrono>
#include <memory>
#include <mutex>
#include <unordered_set>
#include <utility>
#include <vector>

#include "opentelemetry/ext/zpages/threadsafe_span_data.h"
#include "opentelemetry/sdk/trace/processor.h"
#include "opentelemetry/sdk/trace/recordable.h"

OPENTELEMETRY_BEGIN_NAMESPACE
namespace ext
{
namespace zpages
{
/*
* The span processor passes and stores running and completed recordables (casted as span_data)
* to be used by the TraceZ Data Aggregator.
*/
class TracezSharedData
{
public:
struct CollectedSpans
{
std::unordered_set<ThreadsafeSpanData *> running;
std::vector<std::unique_ptr<ThreadsafeSpanData>> completed;
};

/*
* Initialize a shared data storage.
*/
explicit TracezSharedData() noexcept {}

/*
* Called when a span has been started.
*/
void OnStart(ThreadsafeSpanData *span) noexcept;

/*
* Called when a span has ended.
*/
void OnEnd(std::unique_ptr<ThreadsafeSpanData> &&span) noexcept;

/*
* Returns a snapshot of all spans stored. This snapshot has a copy of the
* stored running_spans and gives ownership of completed spans to the caller.
* Stored completed_spans are cleared from the processor. Currently,
* copy-on-write is utilized where possible to minimize contention, but locks
* may be added in the future.
* @return snapshot of all currently running spans and newly completed spans
* (spans never sent while complete) at the time that the function is called
*/
CollectedSpans GetSpanSnapshot() noexcept;

private:
mutable std::mutex mtx_;
CollectedSpans spans_;
};
} // namespace zpages
} // namespace ext
OPENTELEMETRY_END_NAMESPACE
49 changes: 38 additions & 11 deletions ext/include/opentelemetry/ext/zpages/zpages.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,15 @@
#include "opentelemetry/ext/zpages/tracez_data_aggregator.h"
#include "opentelemetry/ext/zpages/tracez_http_server.h"
#include "opentelemetry/ext/zpages/tracez_processor.h"
#include "opentelemetry/ext/zpages/tracez_shared_data.h"

#include "opentelemetry/nostd/shared_ptr.h"
#include "opentelemetry/sdk/trace/tracer_provider.h"
#include "opentelemetry/trace/provider.h"

using opentelemetry::ext::zpages::TracezDataAggregator;
using opentelemetry::ext::zpages::TracezHttpServer;
using opentelemetry::ext::zpages::TracezSharedData;
using opentelemetry::ext::zpages::TracezSpanProcessor;
using std::chrono::microseconds;

Expand All @@ -28,9 +30,39 @@ class ZPages
public:
/**
* This function is called if the user wishes to include zPages in their
* application. It creates a static instance of this class.
* application. It creates a static instance of this class and replaces the
* global TracerProvider with one that delegates spans to tracez.
*/
static void Initialize() { static ZPages instance; }
static void Initialize() { Instance().ReplaceGlobalProvider(); }

/**
* Returns the singletone instnace of ZPages, useful for attaching z-pages span processors to
* non-global providers.
*
* Note: This will instantiate the Tracez instance and webserver if it hasn't already been
* instantiated.
*/
static ZPages &Instance()
{
static ZPages instance;
return instance;
}

/** Replaces the global tracer provider with an instance that exports to tracez. */
void ReplaceGlobalProvider()
{
std::shared_ptr<opentelemetry::sdk::trace::SpanProcessor> tracez_processor(
MakeSpanProcessor().release());
auto tracez_provider_ = opentelemetry::nostd::shared_ptr<opentelemetry::trace::TracerProvider>(
new opentelemetry::sdk::trace::TracerProvider(tracez_processor));
opentelemetry::trace::Provider::SetTracerProvider(tracez_provider_);
}

/** Retruns a new span processor that will output to z-pages. */
std::unique_ptr<TracezSpanProcessor> MakeSpanProcessor()
{
return std::unique_ptr<TracezSpanProcessor>(new TracezSpanProcessor(tracez_shared_));
}

private:
/**
Expand All @@ -40,19 +72,13 @@ class ZPages
*/
ZPages()
{
auto tracez_processor_ = std::make_shared<TracezSpanProcessor>();
auto tracez_provider_ = opentelemetry::nostd::shared_ptr<opentelemetry::trace::TracerProvider>(
new opentelemetry::sdk::trace::TracerProvider(tracez_processor_));

// Construct shared data nd start tracez webserver.
tracez_shared_ = std::make_shared<TracezSharedData>();
auto tracez_aggregator =
std::unique_ptr<TracezDataAggregator>(new TracezDataAggregator(tracez_processor_));

std::unique_ptr<TracezDataAggregator>(new TracezDataAggregator(tracez_shared_));
tracez_server_ =
std::unique_ptr<TracezHttpServer>(new TracezHttpServer(std::move(tracez_aggregator)));

tracez_server_->start();

opentelemetry::trace::Provider::SetTracerProvider(tracez_provider_);
}

~ZPages()
Expand All @@ -61,5 +87,6 @@ class ZPages
// program)
tracez_server_->stop();
}
std::shared_ptr<TracezSharedData> tracez_shared_;
std::unique_ptr<TracezHttpServer> tracez_server_;
};
2 changes: 2 additions & 0 deletions ext/src/zpages/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
add_library(
opentelemetry_zpages
tracez_processor.cc
tracez_shared_data.cc
tracez_data_aggregator.cc
../../include/opentelemetry/ext/zpages/tracez_shared_data.h
../../include/opentelemetry/ext/zpages/tracez_processor.h
../../include/opentelemetry/ext/zpages/tracez_data_aggregator.h
../../include/opentelemetry/ext/zpages/tracez_http_server.h)
Expand Down
6 changes: 3 additions & 3 deletions ext/src/zpages/tracez_data_aggregator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ namespace ext
namespace zpages
{

TracezDataAggregator::TracezDataAggregator(std::shared_ptr<TracezSpanProcessor> span_processor,
TracezDataAggregator::TracezDataAggregator(std::shared_ptr<TracezSharedData> shared_data,
milliseconds update_interval)
{
tracez_span_processor_ = span_processor;
tracez_shared_data_ = shared_data;

// Start a thread that calls AggregateSpans periodically or till notified.
execute_.store(true, std::memory_order_release);
Expand Down Expand Up @@ -153,7 +153,7 @@ void TracezDataAggregator::AggregateRunningSpans(

void TracezDataAggregator::AggregateSpans()
{
auto span_snapshot = tracez_span_processor_->GetSpanSnapshot();
auto span_snapshot = tracez_shared_data_->GetSpanSnapshot();
/**
* TODO: At this time in the project, there is no way of uniquely identifying
* a span(their id's are not being set yet).
Expand Down
26 changes: 3 additions & 23 deletions ext/src/zpages/tracez_processor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,34 +9,14 @@ namespace zpages
void TracezSpanProcessor::OnStart(opentelemetry::sdk::trace::Recordable &span,
const opentelemetry::trace::SpanContext &parent_context) noexcept
{
std::lock_guard<std::mutex> lock(mtx_);
spans_.running.insert(static_cast<ThreadsafeSpanData *>(&span));
shared_data_->OnStart(static_cast<ThreadsafeSpanData *>(&span));
}

void TracezSpanProcessor::OnEnd(
std::unique_ptr<opentelemetry::sdk::trace::Recordable> &&span) noexcept
{
if (span == nullptr)
return;
auto span_raw = static_cast<ThreadsafeSpanData *>(span.get());
std::lock_guard<std::mutex> lock(mtx_);
auto span_it = spans_.running.find(span_raw);
if (span_it != spans_.running.end())
{
spans_.running.erase(span_it);
spans_.completed.push_back(
std::unique_ptr<ThreadsafeSpanData>(static_cast<ThreadsafeSpanData *>(span.release())));
}
}

TracezSpanProcessor::CollectedSpans TracezSpanProcessor::GetSpanSnapshot() noexcept
{
CollectedSpans snapshot;
std::lock_guard<std::mutex> lock(mtx_);
snapshot.running = spans_.running;
snapshot.completed = std::move(spans_.completed);
spans_.completed.clear();
return snapshot;
shared_data_->OnEnd(
std::unique_ptr<ThreadsafeSpanData>(static_cast<ThreadsafeSpanData *>(span.release())));
}

} // namespace zpages
Expand Down
38 changes: 38 additions & 0 deletions ext/src/zpages/tracez_shared_data.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
#include "opentelemetry/ext/zpages/tracez_shared_data.h"

OPENTELEMETRY_BEGIN_NAMESPACE
namespace ext
{
namespace zpages
{

void TracezSharedData::OnStart(ThreadsafeSpanData *span) noexcept
{
std::lock_guard<std::mutex> lock(mtx_);
spans_.running.insert(span);
}

void TracezSharedData::OnEnd(std::unique_ptr<ThreadsafeSpanData> &&span) noexcept
{
std::lock_guard<std::mutex> lock(mtx_);
auto span_it = spans_.running.find(span.get());
if (span_it != spans_.running.end())
{
spans_.running.erase(span_it);
spans_.completed.push_back(std::unique_ptr<ThreadsafeSpanData>(span.release()));
}
}

TracezSharedData::CollectedSpans TracezSharedData::GetSpanSnapshot() noexcept
{
CollectedSpans snapshot;
std::lock_guard<std::mutex> lock(mtx_);
snapshot.running = spans_.running;
snapshot.completed = std::move(spans_.completed);
spans_.completed.clear();
return snapshot;
}

} // namespace zpages
} // namespace ext
OPENTELEMETRY_END_NAMESPACE
5 changes: 3 additions & 2 deletions ext/test/zpages/tracez_data_aggregator_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,12 @@ class TracezDataAggregatorTest : public ::testing::Test
protected:
void SetUp() override
{
std::shared_ptr<TracezSpanProcessor> processor(new TracezSpanProcessor());
std::shared_ptr<TracezSharedData> shared_data(new TracezSharedData());
std::shared_ptr<TracezSpanProcessor> processor(new TracezSpanProcessor(shared_data));
auto resource = opentelemetry::sdk::resource::Resource::Create({});
tracer = std::shared_ptr<opentelemetry::trace::Tracer>(new Tracer(processor, resource));
tracez_data_aggregator = std::unique_ptr<TracezDataAggregator>(
new TracezDataAggregator(processor, milliseconds(10)));
new TracezDataAggregator(shared_data, milliseconds(10)));
}

std::unique_ptr<TracezDataAggregator> tracez_data_aggregator;
Expand Down
Loading