Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement InvocationSpanContext for user tracing / streaming tail workers #3028

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/workerd/io/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -334,5 +334,6 @@ kj_test(
src = "trace-test.c++",
deps = [
":trace",
"//src/workerd/util:thread-scopes",
],
)
50 changes: 50 additions & 0 deletions src/workerd/io/trace-test.c++
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
// https://opensource.org/licenses/Apache-2.0

#include <workerd/io/trace.h>
#include <workerd/util/thread-scopes.h>

#include <capnp/message.h>
#include <kj/test.h>

namespace workerd::tracing {
Expand Down Expand Up @@ -62,5 +64,53 @@ KJ_TEST("can write trace ID protobuf format") {
"\xfe\xdc\xba\x98\x76\x54\x32\x12\xfe\xdc\xba\x98\x76\x54\x32\x11"_kjb);
}

KJ_TEST("InvocationSpanContext") {
setPredictableModeForTest();
auto sc = InvocationSpanContext::newForInvocation();

// We can create an InvocationSpanContext...
static constexpr auto kCheck = TraceId(0x2a2a2a2a2a2a2a2a, 0x2a2a2a2a2a2a2a2a);
KJ_EXPECT(sc->getTraceId() == kCheck);
KJ_EXPECT(sc->getInvocationId() == kCheck);
KJ_EXPECT(sc->getSpanId() == 0);

// And serialize that to a capnp struct...
capnp::MallocMessageBuilder builder;
auto root = builder.initRoot<rpc::InvocationSpanContext>();
sc->toCapnp(root);

// Then back again...
auto sc2 = KJ_ASSERT_NONNULL(InvocationSpanContext::fromCapnp(root.asReader()));
KJ_EXPECT(sc2->getTraceId() == kCheck);
KJ_EXPECT(sc2->getInvocationId() == kCheck);
KJ_EXPECT(sc2->getSpanId() == 0);
KJ_EXPECT(sc2->isTrigger());

// The one that has been deserialized from capnp cannot create children...
try {
sc2->newChild();
KJ_FAIL_ASSERT("should not be able to create child span with SpanContext from capnp");
} catch (kj::Exception& ex) {
KJ_EXPECT(ex.getDescription() ==
"expected counter != nullptr; unable to create child spans on this context"_kj);
}

auto sc3 = sc->newChild();
KJ_EXPECT(sc3->getTraceId() == kCheck);
KJ_EXPECT(sc3->getInvocationId() == kCheck);
KJ_EXPECT(sc3->getSpanId() == 1);

auto sc4 = InvocationSpanContext::newForInvocation(sc2);
KJ_EXPECT(sc4->getTraceId() == kCheck);
KJ_EXPECT(sc4->getInvocationId() == kCheck);
KJ_EXPECT(sc4->getSpanId() == 0);

auto& sc5 = KJ_ASSERT_NONNULL(sc4->getParent());
KJ_EXPECT(sc5->getTraceId() == kCheck);
KJ_EXPECT(sc5->getInvocationId() == kCheck);
KJ_EXPECT(sc5->getSpanId() == 0);
KJ_EXPECT(sc5->isTrigger());
}

} // namespace
} // namespace workerd::tracing
67 changes: 67 additions & 0 deletions src/workerd/io/trace.c++
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,73 @@ kj::String KJ_STRINGIFY(const TraceId& id) {
return id;
}

InvocationSpanContext::InvocationSpanContext(kj::Badge<InvocationSpanContext>,
kj::Maybe<kj::Rc<SpanIdCounter>> counter,
TraceId traceId,
TraceId invocationId,
kj::uint spanId,
kj::Maybe<kj::Rc<InvocationSpanContext>> parentSpanContext)
: counter(kj::mv(counter)),
traceId(kj::mv(traceId)),
invocationId(kj::mv(invocationId)),
spanId(spanId),
parentSpanContext(kj::mv(parentSpanContext)) {}

kj::Rc<InvocationSpanContext> InvocationSpanContext::newChild() {
auto& c = KJ_ASSERT_NONNULL(counter, "unable to create child spans on this context");
return kj::rc<InvocationSpanContext>(kj::Badge<InvocationSpanContext>(), c.addRef(), traceId,
invocationId, c->next(), addRefToThis());
}

kj::Rc<InvocationSpanContext> InvocationSpanContext::newForInvocation(
kj::Maybe<kj::Rc<InvocationSpanContext>&> triggerContext,
kj::Maybe<kj::EntropySource&> entropySource) {
kj::Maybe<kj::Rc<InvocationSpanContext>> parent;
auto traceId = triggerContext
.map([&](kj::Rc<InvocationSpanContext>& ctx) {
parent = ctx.addRef();
return ctx->traceId;
}).orDefault([&] { return TraceId::fromEntropy(entropySource); });
return kj::rc<InvocationSpanContext>(kj::Badge<InvocationSpanContext>(), kj::rc<SpanIdCounter>(),
kj::mv(traceId), TraceId::fromEntropy(entropySource), 0, kj::mv(parent));
}

TraceId TraceId::fromCapnp(rpc::InvocationSpanContext::TraceId::Reader reader) {
return TraceId(reader.getLow(), reader.getHigh());
}

void TraceId::toCapnp(rpc::InvocationSpanContext::TraceId::Builder writer) const {
writer.setLow(low);
writer.setHigh(high);
}

kj::Maybe<kj::Rc<InvocationSpanContext>> InvocationSpanContext::fromCapnp(
rpc::InvocationSpanContext::Reader reader) {
if (!reader.hasTraceId() || !reader.hasInvocationId()) {
// If the reader does not have a traceId or invocationId field then it is
// invalid and we will just ignore it.
return kj::none;
}

auto sc = kj::rc<InvocationSpanContext>(kj::Badge<InvocationSpanContext>(), kj::none,
TraceId::fromCapnp(reader.getTraceId()), TraceId::fromCapnp(reader.getInvocationId()),
reader.getSpanId());
// If the traceId or invocationId are invalid, then we'll ignore them.
if (!sc->getTraceId() || !sc->getInvocationId()) return kj::none;
Comment on lines +209 to +210
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At this point, maybe we should just have a helper method to check validness.

Suggested change
// If the traceId or invocationId are invalid, then we'll ignore them.
if (!sc->getTraceId() || !sc->getInvocationId()) return kj::none;
// If the traceId or invocationId are invalid, then we'll ignore them.
if (!sc->hasTraceId() || !sc->hasInvocationId()) return kj::none;

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you suggesting renaming these because there is no hasTraceId() or hasInvocationId(). These both return const TraceId& which implements an operator bool(), which means these work as is.

return kj::mv(sc);
}

void InvocationSpanContext::toCapnp(rpc::InvocationSpanContext::Builder writer) const {
traceId.toCapnp(writer.initTraceId());
invocationId.toCapnp(writer.initInvocationId());
writer.setSpanId(spanId);
kj::mv(getParent());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where does the parent context move, or is this just used to invalidate it?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just invalidates.

}

kj::String KJ_STRINGIFY(const kj::Rc<InvocationSpanContext>& context) {
return kj::str(context->getTraceId(), "-", context->getInvocationId(), "-", context->getSpanId());
}

} // namespace tracing

// Approximately how much external data we allow in a trace before we start ignoring requests. We
Expand Down
101 changes: 101 additions & 0 deletions src/workerd/io/trace.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,13 +109,114 @@ class TraceId final {
return high;
}

static TraceId fromCapnp(rpc::InvocationSpanContext::TraceId::Reader reader);
void toCapnp(rpc::InvocationSpanContext::TraceId::Builder writer) const;

private:
uint64_t low = 0;
uint64_t high = 0;
};
constexpr TraceId TraceId::nullId = nullptr;

// The InvocationSpanContext is a tuple of a trace id, invocation id, and span id.
// The trace id represents a top-level request and should be shared across all
// invocation spans and events within those spans. The invocation id identifies
// a specific worker invocation. The span id identifies a specific span within an
// invocation. Every invocation of every worker should have an InvocationSpanContext.
// That may or may not have a trigger InvocationSpanContext.
class InvocationSpanContext final: public kj::Refcounted,
public kj::EnableAddRefToThis<InvocationSpanContext> {
public:
// Spans within a InvocationSpanContext are identified by a span id that is a
// monotically increasing number. Every InvocationSpanContext has a root span
// whose ID is zero. Every child span context created within that context will
// have a span id that is one greater than the previously created one.
class SpanIdCounter final: public kj::Refcounted {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SpanIdCounter felt overengineered at a first glance – using an increasing unsigned counter value correctly should not be difficult as long as it only needs to be done within the span implementation itself. I take that this is being used for counter memory management so that it is easier to acquire increasing, unique span IDs across several "branches" of the trace?

Copy link
Member Author

@jasnell jasnell Nov 3, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that's exactly it. I didn't put much effort into designing this bit and just Did Something That Works. If you have an alternative suggested approach here I'm happy to change this part. The key requirement is that every descendent span should be assigned the next monotonically increasing value without those having to be aware of each other in any other way.

e.g.

root span (0)

create child span off root (1)

create child span off root (2)

create child span off 1 (3)

create child span off 3 (4)

create child span off 2 (5)

....

public:
SpanIdCounter() = default;
KJ_DISALLOW_COPY_AND_MOVE(SpanIdCounter);

inline kj::uint next() {
static constexpr kj::uint kMax = kj::maxValue;
KJ_ASSERT(id < kMax, "max number of spans exceeded");
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not objecting to this assert, but if 2^32 spans are reached we have bigger problems 😄

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I meant for this to be a KJ_DASSERT but this is just me being way over paranoid lol.

return id++;
}

private:
kj::uint id = 1;
};

// The constructor is public only so kj::rc can see it and create a new instance.
// User code should use the static factory methods or the newChild method.
InvocationSpanContext(kj::Badge<InvocationSpanContext>,
kj::Maybe<kj::Rc<SpanIdCounter>> counter,
TraceId traceId,
TraceId invocationId,
kj::uint spanId = 0,
kj::Maybe<kj::Rc<InvocationSpanContext>> parentSpanContext = kj::none);
KJ_DISALLOW_COPY_AND_MOVE(InvocationSpanContext);

inline const TraceId& getTraceId() const {
return traceId;
}

inline const TraceId& getInvocationId() const {
return invocationId;
}

inline const kj::uint getSpanId() const {
return spanId;
}

inline const kj::Maybe<kj::Rc<InvocationSpanContext>>& getParent() const {
return parentSpanContext;
}

// Creates a new child span. If the current context does not have a counter,
// then this will assert. If isTrigger() is true then it will not have a
// counter.
kj::Rc<InvocationSpanContext> newChild();

// An InvocationSpanContext is a trigger context if it has no counter. This
// generally means the SpanContext was create from a capnp message and
// represents an InvocationSpanContext that was propagated from a parent
// or triggering context.
bool isTrigger() const {
return counter == kj::none;
}

// Creates a new InvocationSpanContext. If the triggerContext is given, then its
// traceId is used as the traceId for the newly created context. Otherwise a new
// traceId is generated. The invocationId is always generated new and the spanId
// will be 0 with no parent span.
static kj::Rc<InvocationSpanContext> newForInvocation(
kj::Maybe<kj::Rc<InvocationSpanContext>&> triggerContext = kj::none,
kj::Maybe<kj::EntropySource&> entropySource = kj::none);

// Creates a new InvocationSpanContext from a capnp message. The returned
// InvocationSpanContext will not be capable of creating child spans and
// is considered only a "trigger" span.
static kj::Maybe<kj::Rc<InvocationSpanContext>> fromCapnp(
rpc::InvocationSpanContext::Reader reader);
void toCapnp(rpc::InvocationSpanContext::Builder writer) const;

private:
// If there is no counter, then child spans cannot be created from
// this InvocationSpanContext.
kj::Maybe<kj::Rc<SpanIdCounter>> counter;
const TraceId traceId;
const TraceId invocationId;
const kj::uint spanId;

// The parentSpanContext can be either a direct parent or a trigger
// context. If it is a trigger context, then it should have the same
// traceId but a different invocationId (unless predictable mode for
// testing is enabled). The isTrigger() should also return true.
const kj::Maybe<kj::Rc<InvocationSpanContext>> parentSpanContext;
};

kj::String KJ_STRINGIFY(const TraceId& id);
kj::String KJ_STRINGIFY(const kj::Rc<InvocationSpanContext>& context);
} // namespace tracing

enum class PipelineLogLevel {
Expand Down
10 changes: 10 additions & 0 deletions src/workerd/io/worker-interface.capnp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,16 @@ using import "/capnp/compat/byte-stream.capnp".ByteStream;
using import "/workerd/io/outcome.capnp".EventOutcome;
using import "/workerd/io/script-version.capnp".ScriptVersion;

struct InvocationSpanContext {
struct TraceId {
high @0 :UInt64;
jasnell marked this conversation as resolved.
Show resolved Hide resolved
low @1 :UInt64;
}
traceId @0 :TraceId;
invocationId @1 :TraceId;
spanId @2 :UInt32;
}

struct Trace @0x8e8d911203762d34 {
logs @0 :List(Log);
struct Log {
Expand Down