Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Propagate resource to exporters #706

Merged
merged 19 commits into from
Apr 30, 2021
Merged
1 change: 1 addition & 0 deletions exporters/memory/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ cc_library(
strip_include_prefix = "include",
deps = [
"//api",
"//sdk/src/resource",
"//sdk/src/trace",
],
)
Expand Down
5 changes: 3 additions & 2 deletions exporters/memory/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,12 @@ if(BUILD_TESTING)

target_link_libraries(
in_memory_span_data_test ${GTEST_BOTH_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT}
opentelemetry_exporter_in_memory)
opentelemetry_exporter_in_memory opentelemetry_resources)

target_link_libraries(
in_memory_span_exporter_test ${GTEST_BOTH_LIBRARIES}
${CMAKE_THREAD_LIBS_INIT} opentelemetry_exporter_in_memory)
${CMAKE_THREAD_LIBS_INIT} opentelemetry_exporter_in_memory
opentelemetry_resources)

gtest_add_tests(
TARGET in_memory_span_data_test
Expand Down
1 change: 1 addition & 0 deletions exporters/otlp/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ cc_library(
],
strip_include_prefix = "include",
deps = [
"//sdk/src/resource",
"//sdk/src/trace",
"@com_github_opentelemetry_proto//:trace_proto_cc",
],
Expand Down
5 changes: 3 additions & 2 deletions exporters/otlp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@ add_library(opentelemetry_exporter_otprotocol src/recordable.cc
set_target_properties(opentelemetry_exporter_otprotocol
PROPERTIES EXPORT_NAME otlp_exporter)

target_link_libraries(opentelemetry_exporter_otprotocol
PUBLIC opentelemetry_trace opentelemetry_proto)
target_link_libraries(
opentelemetry_exporter_otprotocol
PUBLIC opentelemetry_trace opentelemetry_resources opentelemetry_proto)

install(
TARGETS opentelemetry_exporter_otprotocol
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#pragma once

#include "opentelemetry/exporters/otlp/protobuf_include_prefix.h"

#include "opentelemetry/proto/resource/v1/resource.pb.h"
#include "opentelemetry/proto/trace/v1/trace.pb.h"

#include "opentelemetry/exporters/otlp/protobuf_include_suffix.h"
Expand All @@ -19,6 +19,9 @@ class Recordable final : public sdk::trace::Recordable
public:
const proto::trace::v1::Span &span() const noexcept { return span_; }

/** Dynamically converts the resource of this span into a proto. */
proto::resource::v1::Resource resource() const noexcept;
lalitb marked this conversation as resolved.
Show resolved Hide resolved

void SetIdentity(const opentelemetry::trace::SpanContext &span_context,
opentelemetry::trace::SpanId parent_span_id) noexcept override;

Expand All @@ -38,6 +41,8 @@ class Recordable final : public sdk::trace::Recordable

void SetSpanKind(opentelemetry::trace::SpanKind span_kind) noexcept override;

void SetResource(const opentelemetry::sdk::resource::Resource &resource) noexcept override;

void SetStartTime(opentelemetry::common::SystemTimestamp start_time) noexcept override;

void SetDuration(std::chrono::nanoseconds duration) noexcept override;
Expand All @@ -48,6 +53,7 @@ class Recordable final : public sdk::trace::Recordable

private:
proto::trace::v1::Span span_;
const opentelemetry::sdk::resource::Resource *resource_;
};
} // namespace otlp
} // namespace exporter
Expand Down
2 changes: 2 additions & 0 deletions exporters/otlp/src/otlp_exporter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@ void PopulateRequest(const nostd::span<std::unique_ptr<sdk::trace::Recordable>>
{
auto resource_span = request->add_resource_spans();
auto instrumentation_lib = resource_span->add_instrumentation_library_spans();
bool has_resource = false;

for (auto &recordable : spans)
{
auto rec = std::unique_ptr<Recordable>(static_cast<Recordable *>(recordable.release()));
// TODO - Handle Resource
*instrumentation_lib->add_spans() = std::move(rec->span());
}
}
Expand Down
115 changes: 114 additions & 1 deletion exporters/otlp/src/recordable.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ namespace otlp
//
// See `attribute_value.h` for details.
//
const int kAttributeValueSize = 15;
const int kAttributeValueSize = 15;
const int kOwnedAttributeValueSize = 15;

void Recordable::SetIdentity(const opentelemetry::trace::SpanContext &span_context,
opentelemetry::trace::SpanId parent_span_id) noexcept
Expand Down Expand Up @@ -123,6 +124,116 @@ void PopulateAttribute(opentelemetry::proto::common::v1::KeyValue *attribute,
}
}

/** Maps from C++ attribute into OTLP proto attribute. */
void PopulateAttribute(opentelemetry::proto::common::v1::KeyValue *attribute,
nostd::string_view key,
const sdk::common::OwnedAttributeValue &value)
{
// Assert size of variant to ensure that this method gets updated if the variant
// definition changes
static_assert(
nostd::variant_size<opentelemetry::common::AttributeValue>::value == kOwnedAttributeValueSize,
"AttributeValue contains unknown type");

attribute->set_key(key.data(), key.size());

if (nostd::holds_alternative<bool>(value))
{
attribute->mutable_value()->set_bool_value(nostd::get<bool>(value));
}
else if (nostd::holds_alternative<int32_t>(value))
{
attribute->mutable_value()->set_int_value(nostd::get<int32_t>(value));
}
else if (nostd::holds_alternative<int64_t>(value))
{
attribute->mutable_value()->set_int_value(nostd::get<int64_t>(value));
}
else if (nostd::holds_alternative<uint32_t>(value))
{
attribute->mutable_value()->set_int_value(nostd::get<uint32_t>(value));
}
else if (nostd::holds_alternative<uint64_t>(value))
{
attribute->mutable_value()->set_int_value(nostd::get<uint64_t>(value));
}
else if (nostd::holds_alternative<double>(value))
{
attribute->mutable_value()->set_double_value(nostd::get<double>(value));
}
else if (nostd::holds_alternative<std::string>(value))
{
attribute->mutable_value()->set_string_value(nostd::get<std::string>(value));
}
else if (nostd::holds_alternative<std::vector<bool>>(value))
{
for (const auto &val : nostd::get<std::vector<bool>>(value))
{
attribute->mutable_value()->mutable_array_value()->add_values()->set_bool_value(val);
}
}
else if (nostd::holds_alternative<std::vector<int32_t>>(value))
{
for (const auto &val : nostd::get<std::vector<int32_t>>(value))
{
attribute->mutable_value()->mutable_array_value()->add_values()->set_int_value(val);
}
}
else if (nostd::holds_alternative<std::vector<uint32_t>>(value))
{
for (const auto &val : nostd::get<std::vector<uint32_t>>(value))
{
attribute->mutable_value()->mutable_array_value()->add_values()->set_int_value(val);
}
}
else if (nostd::holds_alternative<std::vector<int64_t>>(value))
{
for (const auto &val : nostd::get<std::vector<int64_t>>(value))
{
attribute->mutable_value()->mutable_array_value()->add_values()->set_int_value(val);
}
}
else if (nostd::holds_alternative<std::vector<uint64_t>>(value))
{
for (const auto &val : nostd::get<std::vector<uint64_t>>(value))
{
attribute->mutable_value()->mutable_array_value()->add_values()->set_int_value(val);
}
}
else if (nostd::holds_alternative<std::vector<double>>(value))
{
for (const auto &val : nostd::get<std::vector<double>>(value))
{
attribute->mutable_value()->mutable_array_value()->add_values()->set_double_value(val);
}
}
else if (nostd::holds_alternative<std::vector<std::string>>(value))
{
for (const auto &val : nostd::get<std::vector<std::string>>(value))
{
attribute->mutable_value()->mutable_array_value()->add_values()->set_string_value(val);
}
}
}

proto::resource::v1::Resource Recordable::resource() const noexcept
{
proto::resource::v1::Resource proto;
if (resource_)
{
for (const auto &kv : resource_->GetAttributes())
{
PopulateAttribute(proto.add_attributes(), kv.first, kv.second);
}
}
return proto;
}

void Recordable::SetResource(const opentelemetry::sdk::resource::Resource &resource) noexcept
{
resource_ = &resource;
};

void Recordable::SetAttribute(nostd::string_view key,
const opentelemetry::common::AttributeValue &value) noexcept
{
Expand Down Expand Up @@ -213,6 +324,8 @@ void Recordable::SetSpanKind(opentelemetry::trace::SpanKind span_kind) noexcept
span_.set_kind(proto_span_kind);
}

void SetResource(const opentelemetry::sdk::resource::Resource &resource) noexcept {}
lalitb marked this conversation as resolved.
Show resolved Hide resolved

void Recordable::SetStartTime(opentelemetry::common::SystemTimestamp start_time) noexcept
{
span_.set_start_time_unix_nano(start_time.time_since_epoch().count());
Expand Down
3 changes: 2 additions & 1 deletion exporters/zipkin/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ if(BUILD_TESTING)

target_link_libraries(
zipkin_recordable_test ${GTEST_BOTH_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT}
opentelemetry_exporter_zipkin_trace http_client_curl)
opentelemetry_exporter_zipkin_trace opentelemetry_resources
http_client_curl)

gtest_add_tests(
TARGET zipkin_recordable_test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ class Recordable final : public sdk::trace::Recordable
public:
const ZipkinSpan &span() const noexcept { return span_; }

const std::string &GetServiceName() const noexcept { return service_name_; }

void SetIdentity(const opentelemetry::trace::SpanContext &span_context,
opentelemetry::trace::SpanId parent_span_id) noexcept override;

Expand All @@ -57,7 +59,9 @@ class Recordable final : public sdk::trace::Recordable

void SetStartTime(opentelemetry::common::SystemTimestamp start_time) noexcept override;

virtual void SetSpanKind(opentelemetry::trace::SpanKind span_kind) noexcept override;
void SetSpanKind(opentelemetry::trace::SpanKind span_kind) noexcept override;

void SetResource(const opentelemetry::sdk::resource::Resource &resource) noexcept override;

void SetDuration(std::chrono::nanoseconds duration) noexcept override;

Expand All @@ -67,6 +71,7 @@ class Recordable final : public sdk::trace::Recordable

private:
ZipkinSpan span_;
std::string service_name_;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm thinking here about some optimization opportunity: how often do you expect service_name value to change, i.e. do you have to copy it here for every recordable, then again into every JSON (ZipkinSpan) object?

Copy link
Member Author

@lalitb lalitb Apr 29, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can think of some optimization of not passing resource and instrumentation library as part of every Recordable( and instead just passing tracer reference from where we can get both), but still, these data need to be copied to every ZipkinSpan. ( https://zipkin.io/zipkin-api/#/default/post_spans ).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's fine for now. I am looking at Zipkin exporter as example for Fluentd exporter, I can share some thoughts how I would've rearranged this in the Fluentd exporter.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, will look forward to that :)

};
} // namespace zipkin
} // namespace exporter
Expand Down
10 changes: 10 additions & 0 deletions exporters/zipkin/src/recordable.cc
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,16 @@ void Recordable::SetName(nostd::string_view name) noexcept
span_["name"] = name.data();
}

void Recordable::SetResource(const opentelemetry::sdk::resource::Resource &resource) noexcept
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Confirming re. thread-safety of SetResource vs GetServiceName -- both cannot be called at once from two different threads?

Copy link
Member Author

@lalitb lalitb Apr 29, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SetResource would be called when Span is created using Tracer::StartSpan(). And GetServiceName() would be when Span is ending : Span::End() -> Processor::OnEnd() -> Exporter::Export(). Both these need to be sequential in same thread.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would let it go for now. But we may need to describe some of these expectations where we do not explicitly shield the access with a mutex, esp. in the other place where we return an object (that may get changed in another thread) by reference.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. Specifically for Resources, these are immutable as per the specs, with their ownership transferred to TracerProvider while it's initialization. So there won't be race condition arising while accessing them within the pipeline.

{
// only service.name attribute is supported by specs as of now.
auto attributes = resource.GetAttributes();
if (attributes.find("service.name") != attributes.end())
{
service_name_ = nostd::get<std::string>(attributes["service.name"]);
}
}

void Recordable::SetStartTime(opentelemetry::common::SystemTimestamp start_time) noexcept
{
span_["timestamp"] =
Expand Down
6 changes: 6 additions & 0 deletions exporters/zipkin/src/zipkin_exporter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,12 @@ sdk::common::ExportResult ZipkinExporter::Export(
auto json_span = rec->span();
// add localEndPoint
json_span["localEndpoint"] = local_end_point_;
// check service.name
auto service_name = rec->GetServiceName();
if (service_name.size())
{
json_span["localEndpoint"]["serviceName"] = service_name;
}
json_spans.push_back(json_span);
}
}
Expand Down
9 changes: 9 additions & 0 deletions exporters/zipkin/test/zipkin_recordable_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,15 @@ TEST(ZipkinSpanRecordable, SetArrayAtrribute)
EXPECT_EQ(rec.span(), j_span);
}

TEST(ZipkinSpanRecordable, SetResource)
{
opentelemetry::exporter::zipkin::Recordable rec;
std::string service_name = "test";
auto resource = opentelemetry::sdk::resource::Resource::Create({{"service.name", service_name}});
rec.SetResource(resource);
EXPECT_EQ(rec.GetServiceName(), service_name);
}

/**
* AttributeValue can contain different int types, such as int, int64_t,
* unsigned int, and uint64_t. To avoid writing test cases for each, we can
Expand Down
5 changes: 5 additions & 0 deletions ext/include/opentelemetry/ext/zpages/threadsafe_span_data.h
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,11 @@ class ThreadsafeSpanData final : public opentelemetry::sdk::trace::Recordable
span_kind_ = span_kind;
}

void SetResource(const opentelemetry::sdk::resource::Resource & /*resource*/) noexcept override
{
// Not Implemented
}

void SetStartTime(opentelemetry::common::SystemTimestamp start_time) noexcept override
{
std::lock_guard<std::mutex> lock(mutex_);
Expand Down
Loading