Rust telemetry to JSON files, instead of OTLP/gRPC #79

Merged
merged 20 commits into from
Oct 15, 2024
518 changes: 192 additions & 326 deletions runners/s3-benchrunner-rust/Cargo.lock

Large diffs are not rendered by default.

19 changes: 10 additions & 9 deletions runners/s3-benchrunner-rust/Cargo.toml
@@ -9,24 +9,25 @@ edition = "2021"
aws-s3-transfer-manager = { git = "https://github.com/awslabs/aws-s3-transfer-manager-rs.git", rev = "3dd20c8aa0872352100cf456beee02bfc53c73d1" }
# aws-s3-transfer-manager = { path = "../../../aws-s3-transfer-manager-rs/aws-s3-transfer-manager" }

# tracing-opentelemetry 0.26.0 is a bit broken (see https://github.com/tokio-rs/tracing-opentelemetry/issues/159)
# so use 0.24.0 and the exact opentelemetry-* versions it depends on.
tracing-opentelemetry = "0.24.0"
opentelemetry = { version = "0.23", features = ["trace", "metrics"] }
opentelemetry_sdk = { version = "0.23", default-features = false, features = ["trace", "rt-tokio"] }
opentelemetry-otlp = { version = "0.16", features = ["metrics"] }
opentelemetry-semantic-conventions = "0.15.0"
tracing-opentelemetry = "0.27"
opentelemetry = { version = "0.26", features = ["trace"] }
opentelemetry_sdk = { version = "0.26", default-features = false, features = ["trace", "rt-tokio"] }
opentelemetry-stdout = { version = "0.26", features = ["trace"] }
opentelemetry-semantic-conventions = "0.26"

anyhow = "1.0.86"
async-trait = "0.1.81"
aws-config = "1.5.4"
aws-sdk-s3 = "1.41.0"
bytes = "1"
chrono = "0.4.38"
clap = { version = "4.5.9", features = ["derive"] }
fastrand = "=2.1.0"
futures-util = "0.3"
ordered-float = "4.3.0"
serde = { version = "1.0.204", features = ["derive"] }
serde_json = "1.0.120"
thiserror = "1.0.62"
tokio = { version = "1.38.1", features = ["io-util"] }
tokio = { version = "1.40.0", features = ["io-util"] }
tracing = "0.1.40"
tracing-subscriber = "0.3.18"
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
17 changes: 14 additions & 3 deletions runners/s3-benchrunner-rust/README.md
@@ -47,8 +47,19 @@ See further instructions [here](../../README.md#run-a-benchmark).

### Viewing Telemetry

Use the `--telemetry` flag to export OpenTelemetry data to http://localhost:4317 as OTLP/gRPC payloads.
Use the `--telemetry` flag to export OpenTelemetry trace data as `trace_*.json` files in the working directory.

The simplest way I know to collect and view this data is with [Jaeger All in One](https://www.jaegertracing.io/docs/latest/getting-started/) or [otel-desktop-viewer](https://github.com/CtrlSpice/otel-desktop-viewer?tab=readme-ov-file#getting-started). Get one of these running, run the benchmark with the `--telemetry` flag, then view the data in your browser.
Run `graph.py`:
```sh
usage: graph.py [-h] TRACE_JSON

Graph a benchmark run

positional arguments:
TRACE_JSON trace_*.json file to graph.

options:
-h, --help show this help message and exit
```

TODO: document how to collect and view data from a non-local run.
View the new `trace_*.html` file, in the same directory as the `trace_*.json` file.
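For example (hypothetical file name): if a run produced `trace_20241015T120000Z_upload_run01.json`, then `./graph.py trace_20241015T120000Z_upload_run01.json` writes `trace_20241015T120000Z_upload_run01.html` next to it, ready to open in a browser.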
40 changes: 40 additions & 0 deletions runners/s3-benchrunner-rust/graph.py
@@ -0,0 +1,40 @@
#!/usr/bin/env python3
import argparse
import json
from pathlib import Path

from graph import PerfTimer
import graph.allspans

PARSER = argparse.ArgumentParser(description="Graph a benchmark run")

# File contains JSON representation of OTLP TracesData.
# Contents look like:
# {"resourceSpans":[
# {"resource": {"attributes":[{"key":"service.name","value":{"stringValue":"s3-benchrunner-rust"}}, ...]},
# "scopeSpans":[
# {"scope":{"name":"s3-benchrunner-rust"},
# "spans":[
# {"traceId":"0e506aee98c24b869337620977f30cbb","spanId":"6fb4c16d1d1652d6", ...},
# {"traceId":"0e506aee98c24b869337620977f30cbb","spanId":"6440f82fb6fc6299", ...},
# ...
#
# Official protobuf format specified here:
# https://github.com/open-telemetry/opentelemetry-proto/blob/main/opentelemetry/proto/trace/v1/trace.proto
#
# Note that when proto data is mapped to JSON, snake_case names become camelCase
# see: https://protobuf.dev/programming-guides/proto3/#json
PARSER.add_argument('TRACE_JSON', help="trace_*.json file to graph.")

args = PARSER.parse_args()

with PerfTimer(f'Open {args.TRACE_JSON}'):
    with open(args.TRACE_JSON) as f:
        traces_data = json.load(f)

with PerfTimer('Graph all spans'):
    fig = graph.allspans.draw(traces_data)

html_path = Path(args.TRACE_JSON).with_suffix('.html')
with PerfTimer(f'Write {html_path}'):
    fig.write_html(html_path)
16 changes: 16 additions & 0 deletions runners/s3-benchrunner-rust/graph/__init__.py
@@ -0,0 +1,16 @@
import time


class PerfTimer:
    """Context manager that prints how long a `with` statement took"""

    def __init__(self, name):
        self.name = name

    def __enter__(self):
        self.start = time.perf_counter()

    def __exit__(self, exc_type, exc_value, traceback):
        if exc_type is None:
            end = time.perf_counter()
            print(f"{self.name}: {end - self.start:.3f} sec")
160 changes: 160 additions & 0 deletions runners/s3-benchrunner-rust/graph/allspans.py
@@ -0,0 +1,160 @@
from collections import defaultdict
import pandas as pd # type: ignore
import plotly.express as px # type: ignore


def draw(data):
# gather all spans into a single list
spans = []
for resource_span in data['resourceSpans']:
for scope_span in resource_span['scopeSpans']:
spans.extend(scope_span['spans'])

# simplify attributes of each span to be simple dict
for span in spans:
span['attributes'] = _simplify_attributes(span['attributes'])

# sort spans according to parent-child hierarchy
spans = _sort_spans_by_hierarchy(spans)

# prepare columns for plotly
columns = defaultdict(list)
name_count = defaultdict(int)
for (idx, span) in enumerate(spans):

name = span['name']
# we want each span in its own row, so assign a unique name and use that as Y value
# TODO: improve unique name, using "seq" or "part-num"
name_count[name] += 1
unique_name = f"{name}#{name_count[name]}"

columns['Name'].append(name)
columns['Unique Name'].append(unique_name)
columns['Duration (ns)'].append(
span['endTimeUnixNano'] - span['startTimeUnixNano'])
columns['Start Time'].append(pd.to_datetime(span['startTimeUnixNano']))
columns['End Time'].append(pd.to_datetime(span['endTimeUnixNano']))
columns['Index'].append(idx)
columns['Span ID'].append(span['spanId'])
columns['Parent ID'].append(span['parentSpanId'])
columns['Attributes'].append(
[f"<br> {k}: {v}" for (k, v) in span['attributes'].items()])

# if a span name occurs only once, remove the "#1" from its unique name
for (i, name) in enumerate(columns['Name']):
if name_count[name] == 1:
columns['Unique Name'][i] = name

df = pd.DataFrame(columns)

# By default, show all columns in hover text.
# Omit a column by setting false. You can also set special formatting rules here.
hover_data = {col: True for col in columns.keys()}
hover_data['Name'] = False # already shown
hover_data['Unique Name'] = False # already shown
hover_data['End Time'] = False # who cares

fig = px.timeline(
data_frame=df,
x_start='Start Time',
x_end='End Time',
y='Unique Name',
hover_data=hover_data,
# spans with same original name get same color
# TODO: combine name with code.namespace, in case same name used in multiple places
color='Name',
# force ordering, otherwise plotly will group by 'color'
category_orders={'Unique Name': df['Unique Name']},
)

# if there are lots of rows, ensure they're not drawn too small
num_rows = len(spans)
if num_rows > 20:
preferred_total_height = 800
min_row_height = 3
row_height = preferred_total_height / num_rows
row_height = int(max(min_row_height, row_height))
height = num_rows * row_height
# don't show yaxis labels if they're so squished that some are omitted
show_yaxis_labels = row_height >= 15
else:
# otherwise auto-height
height = None
show_yaxis_labels = True

fig.update_layout(
title="All Benchmark Spans",
xaxis_title="Time",
yaxis_title="Span Name",
height=height,
yaxis=dict(
showticklabels=show_yaxis_labels,
),
hovermode='y unified', # show hover if mouse anywhere in row
)

return fig


def _sort_spans_by_hierarchy(spans):
# map from ID to span
id_to_span = {}
    # map from parent ID to child span IDs
parent_to_child_ids = defaultdict(list)
for span in spans:
id = span['spanId']
id_to_span[id] = span

parent_id = span['parentSpanId']
parent_to_child_ids[parent_id].append(id)

# sort spans in depth-first order, by crawling the parent/child tree starting at root
sorted_spans = []
    # ids_to_process is LIFO (a stack).
    # With each loop, we pop the last item in ids_to_process
    # and then append its children, so that we process them next (depth-first).
ids_to_process = ['0000000000000000']
while ids_to_process:
id = ids_to_process.pop(-1)
if id in parent_to_child_ids:
child_ids = parent_to_child_ids[id]
# sorted by start time, but reversed because we pop from the BACK of ids_to_process
child_ids = sorted(
child_ids, key=lambda x: id_to_span[x]['startTimeUnixNano'], reverse=True)
ids_to_process.extend(child_ids)

if id in id_to_span:
sorted_spans.append(id_to_span[id])

# warn if any spans are missing
if (num_leftover := len(spans) - len(sorted_spans)):
print(f"WARNING: {num_leftover} spans not shown (missing parents)")

return sorted_spans


# Transform attributes from a list like:
# [
# {"key": "code.namespace", "value": {"stringValue": "s3_benchrunner_rust::transfer_manager"}},
# {"key": "code.lineno", "value": {"intValue": 136}}
# ]
# To a dict like:
# {
# "code.namespace": "s3_benchrunner_rust::transfer_manager",
# "code.lineno": 136,
# }
def _simplify_attributes(attributes_list):
simple_dict = {}
for attr in attributes_list:
key = attr['key']
# extract actual value, ignoring value's key which looks like "intValue"
value = next(iter(attr['value'].values()))

# trim down long filepaths by omitting everything before "src/"
if key == 'code.filepath':
if (src_idx := value.find("src/")) > 0:
value = value[src_idx:]

simple_dict[key] = value

return simple_dict
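For illustration only (a hypothetical snippet, not part of the PR; the file path is made up), this shows what `_simplify_attributes` produces for attributes shaped like the comment above, including the `code.filepath` trimming:

```python
from graph.allspans import _simplify_attributes

attrs = [
    {"key": "code.namespace", "value": {"stringValue": "s3_benchrunner_rust::transfer_manager"}},
    {"key": "code.lineno", "value": {"intValue": 136}},
    {"key": "code.filepath", "value": {"stringValue": "/home/user/repo/src/transfer_manager.rs"}},
]

print(_simplify_attributes(attrs))
# {'code.namespace': 's3_benchrunner_rust::transfer_manager',
#  'code.lineno': 136,
#  'code.filepath': 'src/transfer_manager.rs'}
```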
61 changes: 44 additions & 17 deletions runners/s3-benchrunner-rust/src/main.rs
@@ -1,7 +1,7 @@
use clap::{Parser, ValueEnum};
use std::process::exit;
use std::time::Instant;
use tracing::{self, info_span, instrument, Instrument};
use tracing::{info_span, Instrument};

use s3_benchrunner_rust::{
bytes_to_gigabits, prepare_run, telemetry, BenchmarkConfig, Result, RunBenchmark,
@@ -37,16 +37,6 @@ enum S3ClientId {
async fn main() {
let args = Args::parse();

let _telemetry_guard = if args.telemetry {
// If emitting telemetry, set that up as tracing_subscriber.
Some(telemetry::init_tracing_subscriber().unwrap())
} else {
// Otherwise, set the default subscriber,
// which prints to stdout if env-var set like RUST_LOG=trace
tracing_subscriber::fmt::init();
None
};

let result = execute(&args).await;
if let Err(e) = result {
match e.downcast_ref::<SkipBenchmarkError>() {
@@ -61,31 +51,52 @@ async fn main() {
}
}

#[instrument(name = "main")]
async fn execute(args: &Args) -> Result<()> {
let mut telemetry = if args.telemetry {
// If emitting telemetry, set that up as tracing_subscriber.
Some(telemetry::init_tracing_subscriber().unwrap())
} else {
// Otherwise, set the default subscriber,
// which prints to stdout if env-var set like RUST_LOG=trace
tracing_subscriber::fmt::init();
None
};

// create appropriate benchmark runner
let runner = new_runner(args).await?;

let workload = &runner.config().workload;
let workload_name = workload_name(&args.workload);
let bytes_per_run: u64 = workload.tasks.iter().map(|x| x.size).sum();
let gigabits_per_run = bytes_to_gigabits(bytes_per_run);

// repeat benchmark until we exceed max_repeat_count or max_repeat_secs
let app_start = Instant::now();
for run_i in 0..workload.max_repeat_count {
for run_num in 1..=workload.max_repeat_count {
prepare_run(workload)?;

let run_start = Instant::now();
let run_start_datetime = chrono::Utc::now();
let run_start = Instant::now(); // high resolution

runner
.run()
.instrument(info_span!("run", i = run_i))
.instrument(info_span!("run", num = run_num, workload = workload_name))
.await?;

let run_secs = run_start.elapsed().as_secs_f64();
println!(

// flush any telemetry
if let Some(telemetry) = &mut telemetry {
telemetry.flush_to_file(&trace_file_name(
workload_name,
&run_start_datetime,
run_num,
));
}

eprintln!(
"Run:{} Secs:{:.6} Gb/s:{:.6}",
run_i + 1,
run_num,
run_secs,
gigabits_per_run / run_secs
);
@@ -114,3 +125,19 @@ async fn new_runner(args: &Args) -> Result<Box<dyn RunBenchmark>> {
}
}
}

// Given "path/to/my-workload.run.json" return "my-workload"
fn workload_name(path: &str) -> &str {
let filename = path.rsplit('/').next().unwrap_or(path);
let without_extension = filename.split('.').next().unwrap_or(filename);
without_extension
}

fn trace_file_name(
workload: &str,
run_start: &chrono::DateTime<chrono::Utc>,
run_num: u32,
) -> String {
let run_start = run_start.format("%Y%m%dT%H%M%SZ").to_string();
format!("trace_{run_start}_{workload}_run{run_num:02}.json")
}
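As a worked example of these helpers (timestamps hypothetical): `workload_name("path/to/my-workload.run.json")` yields `my-workload`, and for a run starting at 2024-10-15 15:30:00 UTC, `trace_file_name` produces `trace_20241015T153000Z_my-workload_run01.json` for run 1.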