Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[o11y] Provide OTel-compatible user span representation in trace #3180

Merged
merged 1 commit into from
Dec 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 52 additions & 9 deletions src/workerd/api/trace.c++
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include <workerd/util/uuid.h>

#include <capnp/schema.h>
#include <kj/encoding.h>

namespace workerd::api {

Expand Down Expand Up @@ -74,15 +75,11 @@ jsg::V8Ref<v8::Object> getTraceLogMessage(jsg::Lock& js, const tracing::Log& log
}

kj::Array<jsg::Ref<TraceLog>> getTraceLogs(jsg::Lock& js, const Trace& trace) {
auto builder = kj::heapArrayBuilder<jsg::Ref<TraceLog>>(trace.logs.size() + trace.spans.size());
for (auto i: kj::indices(trace.logs)) {
builder.add(jsg::alloc<TraceLog>(js, trace, trace.logs[i]));
}
// Add spans represented as logs to the logs object.
for (auto i: kj::indices(trace.spans)) {
builder.add(jsg::alloc<TraceLog>(js, trace, trace.spans[i]));
}
return builder.finish();
return KJ_MAP(x, trace.logs) -> jsg::Ref<TraceLog> { return jsg::alloc<TraceLog>(js, trace, x); };
}

kj::Array<jsg::Ref<OTelSpan>> getTraceSpans(const Trace& trace) {
return KJ_MAP(x, trace.spans) -> jsg::Ref<OTelSpan> { return jsg::alloc<OTelSpan>(x); };
}

kj::Array<jsg::Ref<TraceDiagnosticChannelEvent>> getTraceDiagnosticChannelEvents(
Expand Down Expand Up @@ -209,6 +206,7 @@ TraceItem::TraceItem(jsg::Lock& js, const Trace& trace)
dispatchNamespace(trace.dispatchNamespace.map([](auto& ns) { return kj::str(ns); })),
scriptTags(getTraceScriptTags(trace)),
executionModel(enumToStr(trace.executionModel)),
spans(getTraceSpans(trace)),
outcome(enumToStr(trace.outcome)),
cpuTime(trace.cpuTime / kj::MILLISECONDS),
wallTime(trace.wallTime / kj::MILLISECONDS),
Expand Down Expand Up @@ -290,6 +288,10 @@ kj::StringPtr TraceItem::getExecutionModel() {
return executionModel;
}

kj::ArrayPtr<jsg::Ref<OTelSpan>> TraceItem::getSpans() {
return spans;
}

kj::StringPtr TraceItem::getOutcome() {
return outcome;
}
Expand Down Expand Up @@ -553,6 +555,47 @@ bool TraceItem::HibernatableWebSocketEventInfo::Close::getWasClean() {
return eventInfo.wasClean;
}

kj::StringPtr OTelSpan::getOperation() {
return operation;
}

kj::Date OTelSpan::getStartTime() {
return startTime;
}

kj::StringPtr OTelSpan::getSpanID() {
return spanId;
}
kj::StringPtr OTelSpan::getParentSpanID() {
return parentSpanId;
}

kj::Date OTelSpan::getEndTime() {
return endTime;
}

kj::ArrayPtr<OTelSpanTag> OTelSpan::getTags() {
return tags;
}

OTelSpan::OTelSpan(const CompleteSpan& span)
: operation(kj::str(span.operationName)),
startTime(span.startTime),
endTime(span.endTime),
tags(kj::heapArray<OTelSpanTag>(span.tags.size())) {
// IDs are represented as network-order hex strings.
uint64_t netSpanId = __builtin_bswap64(span.spanId);
uint64_t netParentSpanId = __builtin_bswap64(span.parentSpanId);
spanId = kj::encodeHex(kj::ArrayPtr<byte>((kj::byte*)&netSpanId, sizeof(uint64_t)));
parentSpanId = kj::encodeHex(kj::ArrayPtr<byte>((kj::byte*)&netParentSpanId, sizeof(uint64_t)));
uint32_t i = 0;
for (auto& tag: span.tags) {
tags[i].key = kj::str(tag.key);
tags[i].value = spanTagClone(tag.value);
i++;
}
}

TraceLog::TraceLog(jsg::Lock& js, const Trace& trace, const tracing::Log& log)
: timestamp(getTraceLogTimestamp(log)),
level(getTraceLogLevel(log)),
Expand Down
60 changes: 57 additions & 3 deletions src/workerd/api/trace.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,54 @@ struct ScriptVersion {
}
};

struct OTelSpanTag final: public jsg::Object {
kj::String key;
kj::OneOf<bool, int64_t, double, kj::String> value;
JSG_STRUCT(key, value);
};

// OpenTelemetry-compatible span data exposed as part of the trace. Loosely based on https://github.com/open-telemetry/opentelemetry-js/blob/v1.28.0/experimental/packages/otlp-transformer/src/trace/types.ts#L64
class OTelSpan final: public jsg::Object {
public:
OTelSpan(const CompleteSpan& span);
kj::StringPtr getSpanID();
kj::StringPtr getParentSpanID();
kj::StringPtr getOperation();
kj::ArrayPtr<OTelSpanTag> getTags();
kj::Date getStartTime();
kj::Date getEndTime();

JSG_RESOURCE_TYPE(OTelSpan) {
JSG_LAZY_READONLY_INSTANCE_PROPERTY(spanId, getSpanID);
JSG_LAZY_READONLY_INSTANCE_PROPERTY(parentSpanId, getParentSpanID);
JSG_LAZY_READONLY_INSTANCE_PROPERTY(operation, getOperation);
JSG_LAZY_READONLY_INSTANCE_PROPERTY(tags, getTags);
JSG_LAZY_READONLY_INSTANCE_PROPERTY(startTime, getStartTime);
JSG_LAZY_READONLY_INSTANCE_PROPERTY(endTime, getEndTime);
}

void visitForMemoryInfo(jsg::MemoryTracker& tracker) const {
tracker.trackField("operation", operation);
for (const OTelSpanTag& tag: tags) {
tracker.trackField("key", tag.key);
KJ_SWITCH_ONEOF(tag.value) {
KJ_CASE_ONEOF(str, kj::String) {
tracker.trackField("value", str);
}
KJ_CASE_ONEOF_DEFAULT break;
}
}
}

private:
kj::String spanId;
kj::String parentSpanId;
kj::String operation;
kj::Date startTime;
kj::Date endTime;
kj::Array<OTelSpanTag> tags;
};

class TraceItem final: public jsg::Object {
public:
class FetchEventInfo;
Expand Down Expand Up @@ -102,16 +150,20 @@ class TraceItem final: public jsg::Object {
jsg::Optional<kj::StringPtr> getDispatchNamespace();
jsg::Optional<kj::Array<kj::StringPtr>> getScriptTags();
kj::StringPtr getExecutionModel();
kj::ArrayPtr<jsg::Ref<OTelSpan>> getSpans();
kj::StringPtr getOutcome();

uint getCpuTime();
uint getWallTime();
bool getTruncated();

JSG_RESOURCE_TYPE(TraceItem) {
JSG_RESOURCE_TYPE(TraceItem, CompatibilityFlags::Reader flags) {
JSG_LAZY_READONLY_INSTANCE_PROPERTY(event, getEvent);
JSG_LAZY_READONLY_INSTANCE_PROPERTY(eventTimestamp, getEventTimestamp);
JSG_LAZY_READONLY_INSTANCE_PROPERTY(logs, getLogs);
if (flags.getTailWorkerUserSpans()) {
JSG_LAZY_READONLY_INSTANCE_PROPERTY(spans, getSpans);
}
JSG_LAZY_READONLY_INSTANCE_PROPERTY(exceptions, getExceptions);
JSG_LAZY_READONLY_INSTANCE_PROPERTY(diagnosticsChannelEvents, getDiagnosticChannelEvents);
JSG_LAZY_READONLY_INSTANCE_PROPERTY(scriptName, getScriptName);
Expand All @@ -138,6 +190,7 @@ class TraceItem final: public jsg::Object {
kj::Maybe<kj::String> dispatchNamespace;
jsg::Optional<kj::Array<kj::String>> scriptTags;
kj::String executionModel;
kj::Array<jsg::Ref<OTelSpan>> spans;
kj::String outcome;
uint cpuTime;
uint wallTime;
Expand Down Expand Up @@ -647,8 +700,9 @@ class TraceCustomEventImpl final: public WorkerInterface::CustomEvent {
api::TraceItem::HibernatableWebSocketEventInfo, \
api::TraceItem::HibernatableWebSocketEventInfo::Message, \
api::TraceItem::HibernatableWebSocketEventInfo::Close, \
api::TraceItem::HibernatableWebSocketEventInfo::Error, api::TraceLog, api::TraceException, \
api::TraceDiagnosticChannelEvent, api::TraceMetrics, api::UnsafeTraceMetrics
api::TraceItem::HibernatableWebSocketEventInfo::Error, api::TraceLog, api::OTelSpan, \
api::OTelSpanTag, api::TraceException, api::TraceDiagnosticChannelEvent, api::TraceMetrics, \
api::UnsafeTraceMetrics
// The list of trace.h types that are added to worker.c++'s JSG_DECLARE_ISOLATE_TYPE

} // namespace workerd::api
1 change: 1 addition & 0 deletions src/workerd/io/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,7 @@ wd_capnp_library(
deps = [
":outcome_capnp",
":script-version_capnp",
":trace_capnp",
"@capnp-cpp//src/capnp/compat:http-over-capnp_capnp",
],
)
Expand Down
5 changes: 5 additions & 0 deletions src/workerd/io/compatibility-date.capnp
Original file line number Diff line number Diff line change
Expand Up @@ -668,4 +668,9 @@ struct CompatibilityFlags @0x8f8c1b68151b6cef {
# A bug in the original implementation of TransformStream failed to apply backpressure
# correctly. The fix, however, can break existing implementations that don't account
# for the bug so we need to put the fix behind a compat flag.

# Experimental support for exporting user spans to tail worker.
tailWorkerUserSpans @69 :Bool
$compatEnableFlag("tail_worker_user_spans")
$experimental;
}
129 changes: 100 additions & 29 deletions src/workerd/io/trace.c++
Original file line number Diff line number Diff line change
Expand Up @@ -582,13 +582,17 @@ Trace::~Trace() noexcept(false) {}

void Trace::copyTo(rpc::Trace::Builder builder) {
{
auto list = builder.initLogs(logs.size() + spans.size());
auto list = builder.initLogs(logs.size());
for (auto i: kj::indices(logs)) {
logs[i].copyTo(list[i]);
}
// Add spans represented as logs to the logs object.
}

{
// Add spans to the builder.
auto list = builder.initSpans(spans.size());
for (auto i: kj::indices(spans)) {
spans[i].copyTo(list[i + logs.size()]);
spans[i].copyTo(list[i]);
}
}

Expand Down Expand Up @@ -719,6 +723,7 @@ void Trace::mergeFrom(rpc::Trace::Reader reader, PipelineLogLevel pipelineLogLev
// "full", so we may need to filter out the extra data after receiving the traces back.
if (pipelineLogLevel != PipelineLogLevel::NONE) {
logs.addAll(reader.getLogs());
spans.addAll(reader.getSpans());
exceptions.addAll(reader.getExceptions());
diagnosticChannelEvents.addAll(reader.getDiagnosticChannelEvents());
}
Expand Down Expand Up @@ -1682,7 +1687,10 @@ WorkerTracer::WorkerTracer(PipelineLogLevel pipelineLogLevel, ExecutionModel exe
kj::none, kj::none, kj::none, kj::none, kj::none, nullptr, kj::none, executionModel)),
self(kj::refcounted<WeakRef<WorkerTracer>>(kj::Badge<WorkerTracer>{}, *this)) {}

void WorkerTracer::addLog(kj::Date timestamp, LogLevel logLevel, kj::String message, bool isSpan) {
kj::LiteralStringConst logSizeExceeded =
"[\"Log size limit exceeded: More than 128KB of data (across console.log statements, exception, request metadata and headers) was logged during a single request. Subsequent data for this request will not be recorded in logs, appear when tailing this Worker's logs, or in Tail Workers.\"]"_kjc;

void WorkerTracer::addLog(kj::Date timestamp, LogLevel logLevel, kj::String message) {
if (trace->exceededLogLimit) {
return;
}
Expand All @@ -1694,47 +1702,80 @@ void WorkerTracer::addLog(kj::Date timestamp, LogLevel logLevel, kj::String mess
trace->exceededLogLimit = true;
trace->truncated = true;
// We use a JSON encoded array/string to match other console.log() recordings:
trace->logs.add(timestamp, LogLevel::WARN,
kj::str(
"[\"Log size limit exceeded: More than 128KB of data (across console.log statements, exception, request metadata and headers) was logged during a single request. Subsequent data for this request will not be recorded in logs, appear when tailing this Worker's logs, or in Tail Workers.\"]"));
trace->logs.add(timestamp, LogLevel::WARN, kj::str(logSizeExceeded));
return;
}
trace->bytesUsed = newSize;
if (isSpan) {
trace->spans.add(timestamp, logLevel, kj::mv(message));
trace->numSpans++;
return;
}
trace->logs.add(timestamp, logLevel, kj::mv(message));
}

void WorkerTracer::addSpan(const Span& span, kj::String spanContext) {
// This is where we'll actually encode the span for now.
void WorkerTracer::addSpan(CompleteSpan&& span) {
// This is where we'll actually encode the span.
// Drop any spans beyond MAX_USER_SPANS.
if (trace->numSpans >= MAX_USER_SPANS) {
return;
}
if (isPredictableModeForTest()) {
// Do not emit span duration information in predictable mode.
addLog(span.endTime, LogLevel::LOG, kj::str("[\"span: ", span.operationName, "\"]"), true);
} else {
// Time since Unix epoch in seconds, with millisecond precision
double epochSecondsStart = (span.startTime - kj::UNIX_EPOCH) / kj::MILLISECONDS / 1000.0;
double epochSecondsEnd = (span.endTime - kj::UNIX_EPOCH) / kj::MILLISECONDS / 1000.0;
auto message = kj::str("[\"span: ", span.operationName, " ", kj::mv(spanContext), " ",
epochSecondsStart, " ", epochSecondsEnd, "\"]");
addLog(span.endTime, LogLevel::LOG, kj::mv(message), true);
trace->numSpans++;

if (trace->exceededLogLimit) {
return;
}
if (pipelineLogLevel == PipelineLogLevel::NONE) {
return;
}

// TODO(cleanup): Create a function in kj::OneOf to automatically convert to a given type (i.e
// String) to avoid having to handle each type explicitly here.
// 48B for traceID, spanID, parentSpanID, start & end time.
const int fixedSpanOverhead = 48;
size_t newSize = trace->bytesUsed + fixedSpanOverhead + span.operationName.size();
for (const Span::TagMap::Entry& tag: span.tags) {
kj::String message = kj::str("[\"tag: "_kj, tag.key, " => "_kj, spanTagStr(tag.value), "\"]");
addLog(span.endTime, LogLevel::LOG, kj::mv(message), true);
newSize += tag.key.size();
KJ_SWITCH_ONEOF(tag.value) {
KJ_CASE_ONEOF(str, kj::String) {
newSize += str.size();
}
KJ_CASE_ONEOF(val, bool) {
newSize++;
}
// int64_t and double
KJ_CASE_ONEOF_DEFAULT {
newSize += sizeof(int64_t);
}
}
}

if (newSize > MAX_TRACE_BYTES) {
trace->exceededLogLimit = true;
trace->truncated = true;
trace->logs.add(span.endTime, LogLevel::WARN, kj::str(logSizeExceeded));
return;
}
trace->bytesUsed = newSize;
trace->spans.add(kj::mv(span));
trace->numSpans++;
}

Span::TagValue spanTagClone(const Span::TagValue& tag) {
KJ_SWITCH_ONEOF(tag) {
KJ_CASE_ONEOF(str, kj::String) {
return kj::str(str);
}
KJ_CASE_ONEOF(val, int64_t) {
// TODO(o11y): We can't stringify BigInt, which causes test problems. Export this as hex
// instead? Then again OTel assumes that int values can be represented as JS numbers, so
// representing this as a double/Number might be fine despite the possible precision loss.
return kj::str(val);
}
KJ_CASE_ONEOF(val, double) {
return val;
}
KJ_CASE_ONEOF(val, bool) {
return val;
}
}
KJ_UNREACHABLE;
}

kj::String spanTagStr(const kj::OneOf<bool, int64_t, double, kj::String>& tag) {
kj::String spanTagStr(const Span::TagValue& tag) {
KJ_SWITCH_ONEOF(tag) {
KJ_CASE_ONEOF(str, kj::String) {
return kj::str(str);
Expand Down Expand Up @@ -1785,6 +1826,36 @@ Span::TagValue deserializeTagValue(RpcValue::Reader value) {
}
}

void CompleteSpan::copyTo(rpc::UserSpanData::Builder builder) {
builder.setOperationName(operationName.asPtr());
builder.setStartTimeNs((startTime - kj::UNIX_EPOCH) / kj::NANOSECONDS);
builder.setEndTimeNs((endTime - kj::UNIX_EPOCH) / kj::NANOSECONDS);
builder.setSpanId(spanId);
builder.setParentSpanId(parentSpanId);

auto tagsParam = builder.initTags(tags.size());
auto i = 0;
for (auto& tag: tags) {
auto tagParam = tagsParam[i++];
tagParam.setKey(tag.key.asPtr());
serializeTagValue(tagParam.initValue(), tag.value);
}
}

CompleteSpan::CompleteSpan(rpc::UserSpanData::Reader reader)
: spanId(reader.getSpanId()),
parentSpanId(reader.getParentSpanId()),
operationName(kj::str(reader.getOperationName())),
startTime(kj::UNIX_EPOCH + reader.getStartTimeNs() * kj::NANOSECONDS),
endTime(kj::UNIX_EPOCH + reader.getEndTimeNs() * kj::NANOSECONDS) {
auto tagsParam = reader.getTags();
tags.reserve(tagsParam.size());
for (auto tagParam: tagsParam) {
tags.insert(kj::ConstString(kj::heapString(tagParam.getKey())),
deserializeTagValue(tagParam.getValue()));
}
}

void WorkerTracer::addException(
kj::Date timestamp, kj::String name, kj::String message, kj::Maybe<kj::String> stack) {
if (trace->exceededExceptionLimit) {
Expand Down
Loading
Loading