Skip to content

Commit

Permalink
Attempt to allow multiple span processors.
Browse files Browse the repository at this point in the history
- Migrate SpanProcessor to use `ExportableSpan`
- `ExportableSpan` has a registry of `Recordable`s denoated from processors
- `ExportableSpan` replaces `Recordable` in `Span` implementation for now
- Do some gymnastics around unique_ptr + ownership
- Update SDK tests (exporters/ext tests still borked)
- For now, `ExportableSpan` has shared ptr reference to originating Tracer.  TBD on whether this stays.
  • Loading branch information
jsuereth committed Mar 24, 2021
1 parent e3159b7 commit eacd67a
Show file tree
Hide file tree
Showing 12 changed files with 438 additions and 87 deletions.
10 changes: 4 additions & 6 deletions sdk/include/opentelemetry/sdk/trace/batch_span_processor.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,8 @@ class BatchSpanProcessor : public SpanProcessor

/**
* Requests a Recordable(Span) from the configured exporter.
*
* @return A recordable generated by the backend exporter
*/
std::unique_ptr<Recordable> MakeRecordable() noexcept override;
void RegisterRecordable(ExportableSpan& span) noexcept override;

/**
* Called when a span is started.
Expand All @@ -68,15 +66,15 @@ class BatchSpanProcessor : public SpanProcessor
* @param span - The span that just started
* @param parent_context - The parent context of the span that just started
*/
void OnStart(Span &span,
void OnStart(ExportableSpan &span,
const opentelemetry::trace::SpanContext &parent_context) noexcept override;

/**
* Called when a span ends.
*
* @param span - A recordable for a span that just ended
*/
void OnEnd(Span &span) noexcept override;
void OnEnd(std::unique_ptr<ExportableSpan> &&span) noexcept override;

/**
* Export all ended spans that have not been exported yet.
Expand Down Expand Up @@ -139,7 +137,7 @@ class BatchSpanProcessor : public SpanProcessor
std::mutex cv_m_, force_flush_cv_m_;

/* The buffer/queue to which the ended spans are added */
common::CircularBuffer<Recordable> buffer_;
common::CircularBuffer<ExportableSpan> buffer_;

/* Important boolean flags to handle the workflow of the processor */
std::atomic<bool> is_shutdown_{false};
Expand Down
73 changes: 73 additions & 0 deletions sdk/include/opentelemetry/sdk/trace/exportable_span.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
#pragma once

#include "opentelemetry/sdk/trace/recordable.h"
#include "opentelemetry/sdk/trace/tracer.h"

OPENTELEMETRY_BEGIN_NAMESPACE
namespace sdk
{
namespace trace
{

class SpanExporter;

/**
* A representation of only information used to export spans.
*
* Unlike `sdk::Span`, this simply tracks the originating recorable + Tracer which generated
* a span. Additionally it delegates Recordable calls to underlying recordadble interfaces.
*/
class ExportableSpan : public Recordable
{
public:
explicit ExportableSpan(std::shared_ptr<Tracer> tracer);
virtual ~ExportableSpan();


/**
* Constructs a new unique Exportable span which extracts any exportable associated
* with a given processor.
*
* Note: This is used to coordinate exportable span in a mulit-span processor.
*/
std::unique_ptr<ExportableSpan> ReleaseExportableSpanFor(const SpanProcessor& processor) noexcept;
/**
* Registers a recordable for a given processor that this span should write data into.
*/
void RegisterRecordableFor(const SpanProcessor& processor, std::unique_ptr<Recordable> recordable) noexcept;
/**
* Releases ownership of the originally registered recordable.
*/
std::unique_ptr<Recordable> ReleaseRecordableFor(const SpanProcessor& processor) noexcept;


Tracer &GetTracer() { return *tracer_; }
// Note: used in MultiSpanProcessor, clean this up...
std::shared_ptr<Tracer> ShareTracer() { return tracer_; }

// Recordable Interface
void SetIds(opentelemetry::trace::TraceId trace_id,
opentelemetry::trace::SpanId span_id,
opentelemetry::trace::SpanId parent_span_id) noexcept override;
void SetAttribute(nostd::string_view key,
const opentelemetry::common::AttributeValue &value) noexcept override;
void AddEvent(nostd::string_view name,
core::SystemTimestamp timestamp,
const opentelemetry::common::KeyValueIterable &attributes) noexcept override;
void AddLink(const opentelemetry::trace::SpanContext &span_context,
const opentelemetry::common::KeyValueIterable &attributes) noexcept override;
void SetStatus(opentelemetry::trace::StatusCode code,
nostd::string_view description) noexcept override;
void SetName(nostd::string_view name) noexcept override;
void SetSpanKind(opentelemetry::trace::SpanKind span_kind) noexcept override;
void SetStartTime(opentelemetry::core::SystemTimestamp start_time) noexcept override;
void SetDuration(std::chrono::nanoseconds duration) noexcept override;
private:
std::shared_ptr<Tracer> tracer_;
// TODO - more efficient data structure
std::map<std::size_t, std::unique_ptr<Recordable>> recordables_;
};

} // namespace trace
} // namespace sdk
OPENTELEMETRY_END_NAMESPACE
91 changes: 91 additions & 0 deletions sdk/include/opentelemetry/sdk/trace/multi_span_processor.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
#pragma once

#include "opentelemetry/sdk/common/circular_buffer.h"
#include "opentelemetry/sdk/trace/exporter.h"
#include "opentelemetry/sdk/trace/processor.h"

#include <atomic>
#include <condition_variable>
#include <thread>

OPENTELEMETRY_BEGIN_NAMESPACE
namespace sdk
{

namespace trace
{

/** Instantiation options. */
struct MultiSpanProcessorOptions {};

/**
* This is an implementation of the SpanProcessor which aggregates across an in-order list of
* other span processors.
*/
class MultiSpanProcessor : public SpanProcessor
{
public:
/**
* Creates a batch span processor by configuring the specified exporter and other parameters
* as per the official, language-agnostic opentelemetry specs.
*
* @param exporter - The backend exporter to pass the ended spans to.
* @param options - The batch SpanProcessor options.
*/
MultiSpanProcessor(std::vector<std::unique_ptr<SpanProcessor>> processors,
const MultiSpanProcessorOptions &options = {});

/**
* Registeres any needed `Recordable`s for a given span.
*/
void RegisterRecordable(ExportableSpan& span) noexcept override;

/**
* Called when a span is started.
*
* NOTE: This method is a no-op.
*
* @param span - The span that just started
* @param parent_context - The parent context of the span that just started
*/
void OnStart(ExportableSpan &span,
const opentelemetry::trace::SpanContext &parent_context) noexcept override;

/**
* Called when a span ends.
*
* @param span - A recordable for a span that just ended
*/
void OnEnd(std::unique_ptr<ExportableSpan> &&span) noexcept override;

/**
* Export all ended spans that have not been exported yet.
*
* NOTE: Timeout functionality not supported yet.
*/
bool ForceFlush(std::chrono::microseconds timeout) noexcept override;

/**
* Shuts down the processor and does any cleanup required. Completely drains the buffer/queue of
* all its ended spans and passes them to the exporter. Any subsequent calls to OnStart, OnEnd,
* ForceFlush or Shutdown will return immediately without doing anything.
*
* NOTE: Timeout functionality not supported yet.
*/
bool Shutdown(std::chrono::microseconds timeout) noexcept override;

/**
* Class destructor which invokes the Shutdown() method. The Shutdown() method is supposed to be
* invoked when the Tracer is shutdown (as per other languages), but the C++ Tracer only takes
* shared ownership of the processor, and thus doesn't call Shutdown (as the processor might be
* shared with other Tracers).
*/
~MultiSpanProcessor();

private:
std::vector<std::unique_ptr<SpanProcessor>> processors_;
};

} // namespace trace
} // namespace sdk
OPENTELEMETRY_END_NAMESPACE
17 changes: 7 additions & 10 deletions sdk/include/opentelemetry/sdk/trace/processor.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@

#include <chrono>
#include <memory>
#include "opentelemetry/sdk/trace/recordable.h"

#include "opentelemetry/trace/span_context.h"

OPENTELEMETRY_BEGIN_NAMESPACE
namespace sdk
Expand All @@ -11,7 +12,7 @@ namespace trace
{

// Forward declaration to break circular dependency.
class Span;
class ExportableSpan;

/**
* Span processor allow hooks for span start and end method invocations.
Expand All @@ -25,27 +26,23 @@ class SpanProcessor
virtual ~SpanProcessor() = default;

/**
* Create a span recordable. This requests a new span recordable from the
* associated exporter.
* @return a newly initialized recordable
*
* Note: This method must be callable from multiple threads.
* Registers any `Recordable` this span processor will use.
*/
virtual std::unique_ptr<Recordable> MakeRecordable() noexcept = 0;
virtual void RegisterRecordable(ExportableSpan& span) noexcept = 0;

/**
* OnStart is called when a span is started.
* @param span a recordable for a span that was just started
* @param parent_context The parent context of the span that just started
*/
virtual void OnStart(Span &span,
virtual void OnStart(ExportableSpan &span,
const opentelemetry::trace::SpanContext &parent_context) noexcept = 0;

/**
* OnEnd is called when a span is ended.
* @param span the span that ended. Note: We need to pull our recordables off of this.
*/
virtual void OnEnd(Span &span) noexcept = 0;
virtual void OnEnd(std::unique_ptr<ExportableSpan> &&span) noexcept = 0;

/**
* Export all ended spans that have not yet been exported.
Expand Down
1 change: 1 addition & 0 deletions sdk/include/opentelemetry/sdk/trace/recordable.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ class Recordable
*/
virtual void SetDuration(std::chrono::nanoseconds duration) noexcept = 0;
};

} // namespace trace
} // namespace sdk
OPENTELEMETRY_END_NAMESPACE
12 changes: 6 additions & 6 deletions sdk/include/opentelemetry/sdk/trace/simple_processor.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
#include "opentelemetry/common/spin_lock_mutex.h"
#include "opentelemetry/sdk/trace/exporter.h"
#include "opentelemetry/sdk/trace/processor.h"
#include "opentelemetry/sdk/trace/span.h"
#include "opentelemetry/sdk/trace/exportable_span.h"

OPENTELEMETRY_BEGIN_NAMESPACE
namespace sdk
Expand All @@ -33,18 +33,18 @@ class SimpleSpanProcessor : public SpanProcessor
: exporter_(std::move(exporter))
{}

std::unique_ptr<Recordable> MakeRecordable() noexcept override
void RegisterRecordable(ExportableSpan& span) noexcept override
{
return exporter_->MakeRecordable();
span.RegisterRecordableFor(*this, exporter_->MakeRecordable());
}

void OnStart(Span &span,
void OnStart(ExportableSpan &span,
const opentelemetry::trace::SpanContext &parent_context) noexcept override
{}

void OnEnd(Span &span) noexcept override
void OnEnd(std::unique_ptr<ExportableSpan> &&span) noexcept override
{
auto data = span.ConsumeRecordable();
auto data = span->ReleaseRecordableFor(*this);
nostd::span<std::unique_ptr<Recordable>> batch(&data, 1);
const std::lock_guard<opentelemetry::common::SpinLockMutex> locked(lock_);
if (exporter_->Export(batch) == ExportResult::kFailure)
Expand Down
35 changes: 10 additions & 25 deletions sdk/include/opentelemetry/sdk/trace/span.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <mutex>

#include "opentelemetry/trace/tracer.h"
#include "opentelemetry/sdk/trace/exportable_span.h"
#include "opentelemetry/sdk/trace/recordable.h"
#include "opentelemetry/sdk/trace/tracer.h"
#include "opentelemetry/sdk/trace/tracer_context.h"
Expand All @@ -14,7 +15,7 @@ namespace sdk
namespace trace
{

class Span final : public opentelemetry::trace::Span
class Span final : public opentelemetry::trace::Span, public std::enable_shared_from_this<Span>
{
public:
explicit Span(std::shared_ptr<Tracer> &&tracer,
Expand Down Expand Up @@ -48,32 +49,16 @@ class Span final : public opentelemetry::trace::Span

trace_api::SpanContext GetContext() const noexcept override { return *span_context_.get(); }

/**
* Gives ownership of the recordable.
*
* Must only be called after `End()`.
*
* TODO(jsuereth): This method will be reworked once multi-processor span support is added.
*/
std::unique_ptr<Recordable> ConsumeRecordable() {
return std::unique_ptr<Recordable>(recordable_.release());
}

/**
* A pointer to the current recordable. Could be nullptr.
*
* Note: this does not give over control, and is currently only used for z-pages.
*
* TODO(jsuereth): This method will be reworked once multi-processor span support is added.
*/
std::unique_ptr<Recordable>& GetRecordablePtr() {
return recordable_;
}

private:
std::shared_ptr<Tracer> tracer_;
// Returns the recordable, or nullptr if not available.
Recordable* GetRecordable() const {
if (exportable_ != nullptr) {
return exportable_.get();
}
return nullptr;
}
mutable std::mutex mu_;
std::unique_ptr<Recordable> recordable_;
std::unique_ptr<ExportableSpan> exportable_;
opentelemetry::core::SteadyTimestamp start_steady_time;
std::unique_ptr<opentelemetry::trace::SpanContext> span_context_;
bool has_ended_;
Expand Down
Loading

0 comments on commit eacd67a

Please sign in to comment.