Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add proper handling of content-read and content-write logs within HTTP Request tracing span #8105

Merged
merged 2 commits into from
Dec 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/includes/tracing/common-spans.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ is used.

Some of these spans `log` to the span. These log events can be (in most cases) configured:

[cols="2,2,2,4", role="flex, sm10"]
[cols="2,2,1,1,8", role="flex, sm10"]
|===
|span name |log name |configurable |enabled by default |description

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package io.helidon.webserver.observe.tracing;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.HashSet;
import java.util.LinkedList;
Expand Down Expand Up @@ -62,6 +63,8 @@
*/
@RuntimeType.PrototypedBy(TracingObserverConfig.class)
public class TracingObserver implements Observer, RuntimeType.Api<TracingObserverConfig> {
private static final String CONTENT_READ_SPAN_NAME = "content-read";
private static final String CONTENT_WRITE_SPAN_NAME = "content-write";
private final TracingObserverConfig config;

private TracingObserver(TracingObserverConfig config) {
Expand Down Expand Up @@ -215,13 +218,24 @@ Find configuration of the web server span (can customize name, disable etc.)
context.register(span.context());
context.register(TracingConfig.class, span.context());

/*
Register an input stream filter to handle content read span.
*/
if (spanConfig.logEnabled(CONTENT_READ_SPAN_NAME, true)) {
// Invoked when the input stream is read. Our implementation does tracing, then delegates to the real stream.
req.streamFilter(is -> new TracingStreamInputDelegate(tracer, span, is));

}

/*
Register an output stream filter to correctly handle content write span
*/
res.streamFilter(os -> {
// this is invoked when the user requests output stream, we just replace it with our own delegate
return new TracingStreamDelegate(tracer, span, os);
});
if (spanConfig.logEnabled(CONTENT_WRITE_SPAN_NAME, true)) {
res.streamFilter(os -> {
// this is invoked when the user requests output stream, we just replace it with our own delegate
return new TracingStreamOutputDelegate(tracer, span, os);
});
}

try (Scope ignored = span.activate()) {
span.tag(Tag.COMPONENT.create("helidon-webserver"));
Expand Down Expand Up @@ -273,51 +287,244 @@ private TracingConfig configureTracingConfig(RoutingRequest req, Context context
}
}

private static final class TracingStreamDelegate extends OutputStream {
private static final class TracingStreamDelegate {

private final Tracer tracer;
private final Span requestSpan;
private final OutputStream delegate;

private final String logName;
private boolean started;
private boolean stopped;
private Span span;
private Throwable thrown;

private TracingStreamDelegate(Tracer tracer, Span requestSpan, OutputStream delegate) {


private TracingStreamDelegate(Tracer tracer, Span requestSpan, String logName) {
this.tracer = tracer;
this.requestSpan = requestSpan;
this.logName = logName;
}

void thrown(Throwable t) {
thrown = t;
}

void start() {
if (!started) {
started = true;
span(tracer.spanBuilder(logName)
.parent(requestSpan.context())
.start());
}
}

void stop() {
if (started && !stopped) {
stopped = true;
if (thrown == null) {
span.end();
} else {
span.end(thrown);
}
}
}
void span(Span span) {
this.span = span;
}
}

private static final class TracingStreamInputDelegate extends InputStream {
private final TracingStreamDelegate tracingStream;
private final InputStream delegate;

private TracingStreamInputDelegate(Tracer tracer, Span requestSpan, InputStream delegate) {
this.tracingStream = new TracingStreamDelegate(tracer, requestSpan, CONTENT_READ_SPAN_NAME);
this.delegate = delegate;
}

@Override
public int available() throws IOException {
try {
return delegate.available();
} catch (IOException e) {
tracingStream.thrown(e);
throw e;
} finally {
tracingStream.stop();
}
}

@Override
public void close() throws IOException {
try {
delegate.close();
} catch (IOException e) {
tracingStream.thrown(e);
throw e;
} finally {
tracingStream.stop();
}
}

@Override
public void mark(int readlimit) {
delegate.mark(readlimit);
}

@Override
public boolean markSupported() {
return delegate.markSupported();
}

@Override
public int read() throws IOException {
tracingStream.start();
try {
return delegate.read();
} catch (IOException e) {
tracingStream.thrown(e);
throw e;
}
}

@Override
public int read(byte[] b) throws IOException {
tracingStream.start();
try {
return delegate.read(b);
} catch (IOException e) {
tracingStream.thrown(e);
throw e;
}
}

@Override
public int read(byte[] b, int off, int len) throws IOException {
tracingStream.start();
try {
return delegate.read(b, off, len);
} catch (IOException e) {
tracingStream.thrown(e);
throw e;
}
}

@Override
public byte[] readAllBytes() throws IOException {
tracingStream.start();
try {
return delegate.readAllBytes();
} catch (IOException e) {
tracingStream.thrown(e);
throw e;
}
}

@Override
public byte[] readNBytes(int len) throws IOException {
tracingStream.start();
try {
return delegate.readNBytes(len);
} catch (IOException e) {
tracingStream.thrown(e);
throw e;
}
}

@Override
public int readNBytes(byte[] b, int off, int len) throws IOException {
tracingStream.start();
try {
return delegate.readNBytes(b, off, len);
} catch (IOException e) {
tracingStream.thrown(e);
throw e;
}
}

@Override
public void reset() throws IOException {
tracingStream.start();
try {
delegate.reset();
} catch (IOException e) {
tracingStream.thrown(e);
throw e;
}
}

@Override
public long skip(long n) throws IOException {
tracingStream.start();
try {
return delegate.skip(n);
} catch (IOException e) {
tracingStream.thrown(e);
throw e;
}
}

@Override
public void skipNBytes(long n) throws IOException {
tracingStream.start();
try {
delegate.skipNBytes(n);
} catch (IOException e) {
tracingStream.thrown(e);
throw e;
}
}

@Override
public long transferTo(OutputStream out) throws IOException {
tracingStream.start();
try {
return delegate.transferTo(out);
} catch (IOException e) {
tracingStream.thrown(e);
throw e;
}
}
}

private static final class TracingStreamOutputDelegate extends OutputStream {
private final TracingStreamDelegate tracingStream;
private final OutputStream delegate;

private TracingStreamOutputDelegate(Tracer tracer, Span requestSpan, OutputStream delegate) {
this.tracingStream = new TracingStreamDelegate(tracer, requestSpan, CONTENT_WRITE_SPAN_NAME);
this.delegate = delegate;
}

@Override
public void write(int b) throws IOException {
start();
tracingStream.start();
try {
this.delegate.write(b);
} catch (IOException e) {
thrown = e;
tracingStream.thrown(e);
throw e;
}
}

@Override
public void write(byte[] b) throws IOException {
start();
tracingStream.start();
try {
this.delegate.write(b);
} catch (IOException e) {
thrown = e;
tracingStream.thrown(e);
throw e;
}
}

@Override
public void write(byte[] b, int off, int len) throws IOException {
start();
tracingStream.start();
try {
this.delegate.write(b, off, len);
} catch (IOException e) {
thrown = e;
tracingStream.thrown(e);
throw e;
}
}
Expand All @@ -327,7 +534,7 @@ public void flush() throws IOException {
try {
this.delegate.flush();
} catch (IOException e) {
thrown = e;
tracingStream.thrown(e);
throw e;
}
}
Expand All @@ -337,32 +544,14 @@ public void close() throws IOException {
try {
this.delegate.close();
} catch (IOException e) {
thrown = e;
tracingStream.thrown(e);
throw e;
} finally {
stop();
tracingStream.stop();
}
}

private void start() {
if (!started) {
started = true;
span = tracer.spanBuilder("content-write")
.parent(requestSpan.context())
.start();
}
}

private void stop() {
if (started && !stopped) {
stopped = true;
if (thrown == null) {
span.end();
} else {
span.end(thrown);
}
}
}
}

private static class HeaderProviderImpl implements HeaderProvider {
Expand Down