Skip to content

Commit

Permalink
transform/logging: Introduce transform_log_event
Browse files Browse the repository at this point in the history
Includes RapidJSON serialization for a mapping of the OpenTelemetry
logging format [1] and some internal helper types to that end.

Example JSON:

```
{
  "body": {
    "stringValue": "my message goes here...",
  },
  "timeUnixNano": 1705462202516000000,
  "severityNumber": 9, // 9 for stdout, 13 for stderr
  "attributes": [
    {"key": "node", "value": {"intValue": 1}},
    {"key": "transform_name", "value": {"stringValue": "<xform name>"}}
  ]
}
```

1. https://opentelemetry.io/docs/specs/otel/logs/data-model/

Signed-off-by: Oren Leiman <oren.leiman@redpanda.com>
(cherry picked from commit 35ff58b)
  • Loading branch information
oleiman authored and rockwotj committed Jan 30, 2024
1 parent 5fc6a94 commit ba94fea
Show file tree
Hide file tree
Showing 4 changed files with 213 additions and 0 deletions.
1 change: 1 addition & 0 deletions src/v/transform/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,4 @@ v_cc_library(

add_subdirectory(tests)
add_subdirectory(rpc)
add_subdirectory(logging)
11 changes: 11 additions & 0 deletions src/v/transform/logging/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
v_cc_library(
NAME transform_logging
HDRS
event.h
SRCS
event.cc
DEPS
v::model
)

add_subdirectory(tests)
134 changes: 134 additions & 0 deletions src/v/transform/logging/event.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
/*
* Copyright 2024 Redpanda Data, Inc.
*
* Use of this software is governed by the Business Source License
* included in the file licenses/BSL.md
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/

#include "transform/logging/event.h"

#include "bytes/streambuf.h"
#include "json/json.h"
#include "json/ostreamwrapper.h"

#include <type_traits>

using namespace std::chrono_literals;

namespace {

using otel_xform_name_attr
= named_type<model::transform_name_view, struct otel_xform_name_attr_tag>;

using otel_node_id_attr
= named_type<model::node_id, struct otel_node_id_attr_tag>;

struct otel_log_event {
otel_log_event() = delete;
explicit otel_log_event(
model::transform_name_view name, const transform::logging::event& ev)
: name(name)
, source_id(ev.source_id)
, ts(ev.ts.time_since_epoch() / 1ns)
, level(ev.level)
, message(ev.message) {}
otel_xform_name_attr name;
otel_node_id_attr source_id;
uint64_t ts;
ss::log_level level;
std::string_view message;
};

} // namespace

namespace json {

inline void rjson_serialize(
Writer<OStreamWrapper>& w, const model::transform_name_view& name) {
w.String(name().data(), name().size());
}

inline void
rjson_serialize(Writer<OStreamWrapper>& w, const ::otel_xform_name_attr& name) {
w.StartObject();
w.Key("key");
w.String("transform_name");
w.Key("value");
w.StartObject();
w.Key("stringValue");
rjson_serialize(w, name());
w.EndObject();
w.EndObject();
}

inline void
rjson_serialize(Writer<OStreamWrapper>& w, const ::otel_node_id_attr& nid) {
w.StartObject();
w.Key("key");
w.String("node");
w.Key("value");
w.StartObject();
w.Key("intValue");
w.Int(static_cast<int>(nid()));
w.EndObject();
w.EndObject();
}

inline void
rjson_serialize(Writer<OStreamWrapper>& w, const ::otel_log_event& ev) {
w.StartObject();
w.Key("body");
w.String(ev.message.data(), ev.message.size());
w.Key("timeUnixNano");
w.Uint64(ev.ts);
w.Key("severityNumber");
w.Uint(transform::logging::log_level_to_severity(ev.level));
w.Key("attributes");
w.StartArray();
rjson_serialize(w, ev.name);
rjson_serialize(w, ev.source_id);
w.EndArray();
w.EndObject();
}

} // namespace json

namespace transform::logging {

uint32_t log_level_to_severity(ss::log_level lvl) {
static const std::unordered_map<ss::log_level, int> severity{
{ss::log_level::trace, 1},
{ss::log_level::debug, 5},
{ss::log_level::info, 9},
{ss::log_level::warn, 13},
{ss::log_level::error, 17},
};
return severity.at(lvl);
}

event::event(
model::node_id source,
clock_type::time_point ts,
ss::log_level level,
ss::sstring message)
: source_id(source)
, ts(ts)
, level(level)
, message(std::move(message)) {}

void event::to_json(model::transform_name_view name, iobuf& b) const {
iobuf_ostreambuf obuf{b};
std::ostream os{&obuf};
::json::OStreamWrapper wrapper{os};
::json::Writer<::json::OStreamWrapper> writer{wrapper};

using ::json::rjson_serialize;

rjson_serialize(writer, ::otel_log_event{name, *this});
}

} // namespace transform::logging
67 changes: 67 additions & 0 deletions src/v/transform/logging/event.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright 2024 Redpanda Data, Inc.
*
* Use of this software is governed by the Business Source License
* included in the file licenses/BSL.md
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/

#include "model/transform.h"
#include "seastarx.h"
#include "utils/named_type.h"

#include <seastar/core/sstring.hh>

#pragma once

namespace transform::logging {

/**
* Map a seastar log level to a SeverityNumber as specified in
* https://opentelemetry.io/docs/specs/otel/logs/data-model/#field-severitynumber
*/
uint32_t log_level_to_severity(ss::log_level lvl);

/**
* A singe log message emitted by some WASM data transform.
*
* Transforms emit log messages by writing to stdout/stderr, which
* is intercepted at the WASI layer and forwarded to the transform
* logging subsystem.
*
* That message is then packaged into an `event`, along with the source
* node ID, a timestamp, and a log_level, the latter of which is determined
* by whether the transform wrote to stdout (ss::log_level::info) or
* stderr (ss::log_level::warn).
*
*/
struct event {
using clock_type = std::chrono::system_clock;
event() = delete;
explicit event(
model::node_id source,
clock_type::time_point ts,
ss::log_level level,
ss::sstring message);

model::node_id source_id;
clock_type::time_point ts;
ss::log_level level;
ss::sstring message;

friend bool operator==(const event&, const event&) = default;

/**
* Serialize this event to JSON, with the result mapping directly to:
* https://github.com/open-telemetry/opentelemetry-proto/blob/34d29fe5ad4689b5db0259d3750de2bfa195bc85/opentelemetry/proto/logs/v1/logs.proto#L134
*
* NOTE: We include a `name` for the attributes here, but the transform
* name is omitted from the event itself to avoid duplicating it across
* every log entry.
*/
void to_json(model::transform_name_view name, iobuf&) const;
};
} // namespace transform::logging

0 comments on commit ba94fea

Please sign in to comment.