Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create shared context for updating span pipeline from TracerProvider and affecting Tracer. #650

Merged
merged 21 commits into from
Apr 16, 2021
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions examples/batch/main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@ void initTracer()
// We export `kNumSpans` after every `schedule_delay_millis` milliseconds.
options.max_export_batch_size = kNumSpans;

auto processor = std::shared_ptr<sdktrace::SpanProcessor>(
auto processor = std::unique_ptr<sdktrace::SpanProcessor>(
new sdktrace::BatchSpanProcessor(std::move(exporter), options));

auto provider = nostd::shared_ptr<opentelemetry::trace::TracerProvider>(
new sdktrace::TracerProvider(processor));
new sdktrace::TracerProvider(std::move(processor)));
// Set the global trace provider.
opentelemetry::trace::Provider::SetTracerProvider(provider);
}
Expand Down
42 changes: 42 additions & 0 deletions examples/http/BUILD
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
cc_binary(
name = "example_http_client",
srcs = [
"tracer_common.hpp",
"client.cc",
],
deps = [
"//api",
"//ext:headers",
"//ext/src/http/client/curl:http_client_curl",
"//exporters/ostream:ostream_span_exporter",
"//sdk/src/trace",
],
# TODO: Move copts/linkopts for static CURL usage into shared bzl file.
copts = [
"-DCURL_STATICLIB",
"-DWITH_CURL",
],
linkopts = select({
"//bazel:windows": [
"-DEFAULTLIB:advapi32.lib",
"-DEFAULTLIB:crypt32.lib",
"-DEFAULTLIB:Normaliz.lib",
],
"//conditions:default": [],
}),
)

cc_binary(
name = "example_http_server",
srcs = [
"server.cc",
"tracer_common.hpp",
"server.hpp",
],
deps = [
"//api",
"//ext:headers",
"//exporters/ostream:ostream_span_exporter",
"//sdk/src/trace",
],
)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for adding the bazel build for this.

9 changes: 5 additions & 4 deletions examples/http/tracer_common.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,12 @@ namespace {
void initTracer() {
auto exporter = std::unique_ptr<sdktrace::SpanExporter>(
new opentelemetry::exporter::trace::OStreamSpanExporter);
auto processor = std::shared_ptr<sdktrace::SpanProcessor>(
auto processor = std::unique_ptr<sdktrace::SpanProcessor>(
new sdktrace::SimpleSpanProcessor(std::move(exporter)));
auto provider = nostd::shared_ptr<opentelemetry::trace::TracerProvider>(
new sdktrace::TracerProvider(processor, opentelemetry::sdk::resource::Resource::Create({}),
std::make_shared<opentelemetry::sdk::trace::AlwaysOnSampler>()));
// Default is an always-on sampler.
auto context = std::make_shared<sdktrace::TracerContext>(std::move(processor));
auto provider = nostd::shared_ptr<opentelemetry::trace::TracerProvider>(
new sdktrace::TracerProvider(context));
// Set the global trace provider
opentelemetry::trace::Provider::SetTracerProvider(provider);
}
Expand Down
7 changes: 4 additions & 3 deletions examples/multithreaded/main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@ void initTracer()
{
auto exporter = std::unique_ptr<sdktrace::SpanExporter>(
new opentelemetry::exporter::trace::OStreamSpanExporter);
auto processor = std::shared_ptr<sdktrace::SpanProcessor>(
auto processor = std::unique_ptr<sdktrace::SpanProcessor>(
new sdktrace::SimpleSpanProcessor(std::move(exporter)));
auto provider = nostd::shared_ptr<opentelemetry::trace::TracerProvider>(
new sdktrace::TracerProvider(processor, opentelemetry::sdk::resource::Resource::Create({})));
auto provider =
nostd::shared_ptr<opentelemetry::trace::TracerProvider>(new sdktrace::TracerProvider(
std::move(processor), opentelemetry::sdk::resource::Resource::Create({})));
// Set the global trace provider
opentelemetry::trace::Provider::SetTracerProvider(provider);
}
Expand Down
5 changes: 3 additions & 2 deletions examples/otlp/main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@ void InitTracer()
{
// Create OTLP exporter instance
auto exporter = std::unique_ptr<sdktrace::SpanExporter>(new otlp::OtlpExporter(opts));
auto processor = std::shared_ptr<sdktrace::SpanProcessor>(
auto processor = std::unique_ptr<sdktrace::SpanProcessor>(
new sdktrace::SimpleSpanProcessor(std::move(exporter)));
auto provider = nostd::shared_ptr<trace::TracerProvider>(new sdktrace::TracerProvider(processor));
auto provider =
nostd::shared_ptr<trace::TracerProvider>(new sdktrace::TracerProvider(std::move(processor)));
// Set the global trace provider
trace::Provider::SetTracerProvider(provider);
}
Expand Down
5 changes: 2 additions & 3 deletions examples/simple/main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,10 @@ void initTracer()
{
auto exporter = std::unique_ptr<sdktrace::SpanExporter>(
new opentelemetry::exporter::trace::OStreamSpanExporter);
auto processor = std::shared_ptr<sdktrace::SpanProcessor>(
auto processor = std::unique_ptr<sdktrace::SpanProcessor>(
new sdktrace::SimpleSpanProcessor(std::move(exporter)));
auto provider = nostd::shared_ptr<opentelemetry::trace::TracerProvider>(
new sdktrace::TracerProvider(processor, opentelemetry::sdk::resource::Resource::Create({}),
std::make_shared<opentelemetry::sdk::trace::AlwaysOnSampler>()));
new sdktrace::TracerProvider(std::move(processor)));

// Set the global trace provider
opentelemetry::trace::Provider::SetTracerProvider(provider);
Expand Down
4 changes: 2 additions & 2 deletions exporters/otlp/test/otlp_exporter_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,10 @@ TEST_F(OtlpExporterTestPeer, ExportIntegrationTest)

auto exporter = GetExporter(stub_interface);

auto processor = std::shared_ptr<sdk::trace::SpanProcessor>(
auto processor = std::unique_ptr<sdk::trace::SpanProcessor>(
new sdk::trace::SimpleSpanProcessor(std::move(exporter)));
auto provider =
nostd::shared_ptr<trace::TracerProvider>(new sdk::trace::TracerProvider(processor));
nostd::shared_ptr<trace::TracerProvider>(new sdk::trace::TracerProvider(std::move(processor)));
auto tracer = provider->GetTracer("test");

EXPECT_CALL(*mock_stub, Export(_, _, _))
Expand Down
6 changes: 3 additions & 3 deletions ext/include/opentelemetry/ext/zpages/zpages.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,10 @@ class ZPages
/** Replaces the global tracer provider with an instance that exports to tracez. */
void ReplaceGlobalProvider()
{
std::shared_ptr<opentelemetry::sdk::trace::SpanProcessor> tracez_processor(
MakeSpanProcessor().release());
// GCC 4.8 can't infer the type coercion.
std::unique_ptr<opentelemetry::sdk::trace::SpanProcessor> processor(MakeSpanProcessor().release());
auto tracez_provider_ = opentelemetry::nostd::shared_ptr<opentelemetry::trace::TracerProvider>(
new opentelemetry::sdk::trace::TracerProvider(tracez_processor));
new opentelemetry::sdk::trace::TracerProvider(std::move(processor)));
opentelemetry::trace::Provider::SetTracerProvider(tracez_provider_);
}

Expand Down
14 changes: 14 additions & 0 deletions ext/src/http/client/curl/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,20 @@ cc_library(
"http_client_factory_curl.cc",
],
include_prefix = "src/http/client/curl",
# TODO: Move copts/linkopts for static CURL usage into shared bzl file.
copts = [
"-DCURL_STATICLIB",
"-DWITH_CURL",
],
linkopts = select({
"//bazel:windows": [
"-DEFAULTLIB:advapi32.lib",
"-DEFAULTLIB:crypt32.lib",
"-DEFAULTLIB:Normaliz.lib",
"-DEFAULTLIB:Ws2_32.lib",
],
"//conditions:default": [],
}),
deps = [
"//api",
"//ext:headers",
Expand Down
13 changes: 0 additions & 13 deletions ext/test/http/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,6 @@ cc_test(
srcs = [
"curl_http_test.cc",
],
# TODO: Move copts/linkopts for static CURL usage into shared bzl file.
copts = [
"-DCURL_STATICLIB",
"-DWITH_CURL",
],
linkopts = select({
"//bazel:windows": [
"-DEFAULTLIB:advapi32.lib",
"-DEFAULTLIB:crypt32.lib",
"-DEFAULTLIB:Normaliz.lib",
],
"//conditions:default": [],
}),
deps = [
"//ext:headers",
"//ext/src/http/client/curl:http_client_curl",
Expand Down
5 changes: 3 additions & 2 deletions ext/test/w3c_tracecontext_test/main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,11 @@ void initTracer()
{
auto exporter = std::unique_ptr<sdktrace::SpanExporter>(
new opentelemetry::exporter::trace::OStreamSpanExporter);
auto processor = std::shared_ptr<sdktrace::SpanProcessor>(
auto processor = std::unique_ptr<sdktrace::SpanProcessor>(
new sdktrace::SimpleSpanProcessor(std::move(exporter)));
auto context = std::make_shared<sdktrace::TracerContext>(std::move(processor));
auto provider = nostd::shared_ptr<opentelemetry::trace::TracerProvider>(
new sdktrace::TracerProvider(processor));
new sdktrace::TracerProvider(context));
// Set the global trace provider
opentelemetry::trace::Provider::SetTracerProvider(provider);
}
Expand Down
5 changes: 3 additions & 2 deletions ext/test/zpages/tracez_data_aggregator_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,10 @@ class TracezDataAggregatorTest : public ::testing::Test
void SetUp() override
{
std::shared_ptr<TracezSharedData> shared_data(new TracezSharedData());
std::shared_ptr<TracezSpanProcessor> processor(new TracezSpanProcessor(shared_data));
auto resource = opentelemetry::sdk::resource::Resource::Create({});
tracer = std::shared_ptr<opentelemetry::trace::Tracer>(new Tracer(processor, resource));
auto context = std::make_shared<TracerContext>(
std::unique_ptr<SpanProcessor>(new TracezSpanProcessor(shared_data)), resource);
tracer = std::shared_ptr<opentelemetry::trace::Tracer>(new Tracer(context));
tracez_data_aggregator = std::unique_ptr<TracezDataAggregator>(
new TracezDataAggregator(shared_data, milliseconds(10)));
}
Expand Down
6 changes: 5 additions & 1 deletion ext/test/zpages/tracez_processor_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -178,8 +178,12 @@ class TracezProcessor : public ::testing::Test
shared_data = std::shared_ptr<TracezSharedData>(new TracezSharedData());
processor = std::shared_ptr<TracezSpanProcessor>(new TracezSpanProcessor(shared_data));
auto resource = opentelemetry::sdk::resource::Resource::Create({});
// Note: we make a *different* processor for the tracercontext. THis is because
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// Note: we make a *different* processor for the tracercontext. THis is because
// Note: we make a *different* processor for the tracercontext. This is because

// all the tests use shared data, and we want to make sure this works correctly.
auto context = std::make_shared<TracerContext>(
std::unique_ptr<SpanProcessor>(new TracezSpanProcessor(shared_data)), resource);

tracer = std::shared_ptr<opentelemetry::trace::Tracer>(new Tracer(processor, resource));
tracer = std::shared_ptr<opentelemetry::trace::Tracer>(new Tracer(context));
auto spans = shared_data->GetSpanSnapshot();
running = spans.running;
completed = std::move(spans.completed);
Expand Down
2 changes: 2 additions & 0 deletions sdk/include/opentelemetry/sdk/common/atomic_unique_ptr.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ class AtomicUniquePtr
public:
AtomicUniquePtr() noexcept {}

explicit AtomicUniquePtr(std::unique_ptr<T> &&other) noexcept : ptr_(other.release()) {}

~AtomicUniquePtr() noexcept { Reset(); }

T &operator*() const noexcept { return *Get(); }
Expand Down
40 changes: 10 additions & 30 deletions sdk/include/opentelemetry/sdk/trace/tracer.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include "opentelemetry/sdk/resource/resource.h"
#include "opentelemetry/sdk/trace/processor.h"
#include "opentelemetry/sdk/trace/samplers/always_on.h"
#include "opentelemetry/sdk/trace/tracer_context.h"
#include "opentelemetry/trace/noop.h"
#include "opentelemetry/trace/tracer.h"
#include "opentelemetry/version.h"
Expand All @@ -18,33 +19,8 @@ namespace trace
class Tracer final : public trace_api::Tracer, public std::enable_shared_from_this<Tracer>
{
public:
/**
* Initialize a new tracer.
* @param processor The span processor for this tracer. This must not be a
* nullptr.
*/
explicit Tracer(std::shared_ptr<SpanProcessor> processor,
const opentelemetry::sdk::resource::Resource &resource,
std::shared_ptr<Sampler> sampler = std::make_shared<AlwaysOnSampler>()) noexcept;

/**
* Set the span processor associated with this tracer.
* @param processor The new span processor for this tracer. This must not be
* a nullptr.
*/
void SetProcessor(std::shared_ptr<SpanProcessor> processor) noexcept;

/**
* Obtain the span processor associated with this tracer.
* @return The span processor for this tracer.
*/
std::shared_ptr<SpanProcessor> GetProcessor() const noexcept;

/**
* Obtain the sampler associated with this tracer.
* @return The sampler for this tracer.
*/
std::shared_ptr<Sampler> GetSampler() const noexcept;
/** Construct a new Tracer with the given context pipeline. */
explicit Tracer(std::shared_ptr<sdk::trace::TracerContext> context) noexcept;

nostd::shared_ptr<trace_api::Span> StartSpan(
nostd::string_view name,
Expand All @@ -56,10 +32,14 @@ class Tracer final : public trace_api::Tracer, public std::enable_shared_from_th

void CloseWithMicroseconds(uint64_t timeout) noexcept override;

/** Returns the currently active span processor. */
SpanProcessor &GetActiveProcessor() noexcept { return context_->GetActiveProcessor(); }

// Note: Test only
Sampler &GetSampler() { return context_->GetSampler(); }

private:
opentelemetry::sdk::common::AtomicSharedPtr<SpanProcessor> processor_;
const std::shared_ptr<Sampler> sampler_;
const opentelemetry::sdk::resource::Resource &resource_;
std::shared_ptr<sdk::trace::TracerContext> context_;
};
} // namespace trace
} // namespace sdk
Expand Down
85 changes: 85 additions & 0 deletions sdk/include/opentelemetry/sdk/trace/tracer_context.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
#pragma once

#include "opentelemetry/sdk/common/atomic_unique_ptr.h"
#include "opentelemetry/sdk/resource/resource.h"
#include "opentelemetry/sdk/trace/processor.h"
#include "opentelemetry/sdk/trace/samplers/always_on.h"
#include "opentelemetry/version.h"

OPENTELEMETRY_BEGIN_NAMESPACE
namespace sdk
{
namespace trace
{

/**
* A class which stores the TracerProvider context.
*
* This class meets the following design criteria:
* - A shared reference between TracerProvider and Tracers instantiated.
* - A thread-safe class that allows updating/altering processor/exporter pipelines
* and sampling config.
* - The owner/destroyer of Processors/Exporters. These will remain active until
* this class is destroyed. I.e. Sampling, Exporting, flushing etc. are all ok if this
* object is alive, and they will work together. If this object is destroyed, then
* no shared references to Processor, Exporter, Recordable etc. should exist, and all
* associated pipelines will have been flushed.
*/
class TracerContext
{
public:
explicit TracerContext(std::unique_ptr<SpanProcessor> processor,
opentelemetry::sdk::resource::Resource resource =
opentelemetry::sdk::resource::Resource::Create({}),
std::unique_ptr<Sampler> sampler =
std::unique_ptr<AlwaysOnSampler>(new AlwaysOnSampler)) noexcept;
/**
* Attaches a span processor to this tracer context.
*
* @param processor The new span processor for this tracer. This must not be
* a nullptr. Ownership is given to the `TracerContext`.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* a nullptr. Ownership is given to the `TracerContext`.
* a nullptr. Ownership is given to the `TracerContext`.

*/
void RegisterPipeline(std::unique_ptr<SpanProcessor> processor) noexcept;

/**
* Obtain the sampler associated with this tracer.
* @return The sampler for this tracer.
*/
Sampler &GetSampler() const noexcept;

/**
* Obtain the (conceptual) active processor.
*
* Note: When more than one processor is active, this will
* return an "aggregate" processor
*/
SpanProcessor &GetActiveProcessor() const noexcept;
Comment on lines +51 to +56
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What would be an inactive processor?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think there is any inactive processor. @jsuereth can confirm though :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

'inactive' would be any constructed but not registered processor.

I'm not tied to this name, just took it from Java ;)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps just name it GetProcessor to avoid the confusion.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We might theoretically run into issue here when some tracer does some work with a processor obtained by GetActiveProcessor, while another thread calls RegisterPipeline and thus causes the memory pointed to by the obtained the processor to be freed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't be the case. The interface is "RegisterPipeline", and it's a shared pointer. The TracerContext can guarantee that ALL registered processors remain alive for its own lifecycle. I.e. we're not going to delete a pipeline, we're only going to allow registration of new ones specifically to prevent this issue.

We have the inverse problem around Recordable though, specifically if we have an "active" span that did NOT get a recordable registered when a new pipeline is registered, what happens when that span ends?

If you look at my design around "ExportableSpan" where a pipeline can ask for its instance of Recordable (and handle a missing recordable), that's how I planned to solve that issue.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I understand. So if a new pipeline is registered, the old one isn't just overwritten, but kept in store and just made inactive?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My understanding was that all the pipelines(i.e, processors) remain active. And we can only add a new pipeline, but can't remove the existing one? And once we add multi-processor support, TracerContext would actually maintain a unique_ptr to (say) CompositeSpanProcessor which further manages all the active pipelines/processors?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah. i'm adopting the design I've seen elsewhere where you can "append a pipeline", but can't remove an existing pipeline.

I.e. once a pipeline is active, it's forever active. We don't have a register/unregister mechanism.


/**
* Obtain the resource associated with this tracer context.
* @return The resource for this tracer context.
*/
const opentelemetry::sdk::resource::Resource &GetResource() const noexcept;

/**
* Force all active SpanProcessors to flush any buffered spans
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a bit confused by all active span processor. Wouldn't we just have a CompositeSpanProcessor? Which would encapsulate flushing and shutdown? So that this is a thing the CompositeSpanProcessor has to worry about, and not the TraceContext?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Personally, I wouldn't mind leaving ForceFlush and Shutdown out. GetActiveProcessor().ForceFlush() isn't that bad.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is for users of TracerContext to understand what it does. Yes the implementation of this behavior may be implemented by "CompositeSpanProcessor" but it's not something that would be directly exposed, instead users just see "RegisterPipeline" and "Shutdown" and "ActiveProcessor". I.e. we still need to document the behavior of this class and how to use its methods.

Note: I'm not tied to "active" processor, I borrowed the name from the Java SDK.

* within the given timeout.
*/
bool ForceFlush(
std::chrono::microseconds timeout = (std::chrono::microseconds::max)()) noexcept;

/**
* Shutdown the span processor associated with this tracer provider.
*/
bool Shutdown() noexcept;

private:
// This is an atomic pointer so we can adapt the processor pipeline dynamically.
opentelemetry::sdk::common::AtomicUniquePtr<SpanProcessor> processor_;
opentelemetry::sdk::resource::Resource resource_;
std::unique_ptr<Sampler> sampler_;
};

} // namespace trace
} // namespace sdk
OPENTELEMETRY_END_NAMESPACE
Loading