Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -613,7 +613,7 @@ public final Publisher<T> onCompleteError(final Supplier<? extends Throwable> er
public final Publisher<T> onErrorReturn(Predicate<? super Throwable> predicate,
Function<? super Throwable, ? extends T> itemSupplier) {
requireNonNull(itemSupplier);
return onErrorResume(predicate, t -> Publisher.from(itemSupplier.apply(t)));
return onErrorResume(predicate, t -> Publisher.from(itemSupplier.apply(t)).shareContextOnSubscribe());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ public final <E extends Throwable> Single<T> onErrorReturn(
public final Single<T> onErrorReturn(Predicate<? super Throwable> predicate,
Function<? super Throwable, ? extends T> itemSupplier) {
requireNonNull(itemSupplier);
return onErrorResume(predicate, t -> succeeded(itemSupplier.apply(t)));
return onErrorResume(predicate, t -> succeeded(itemSupplier.apply(t)).shareContextOnSubscribe());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public static void main(String[] args) throws Exception {
.whenOnSuccess(resp -> {
System.out.println(resp.toString((name, value) -> value));
System.out.println(resp.payloadBody(textSerializerUtf8()));
})
}).repeat(i -> i <= 3)
// This example is demonstrating asynchronous execution, but needs to prevent the main thread from exiting
// before the response has been processed. This isn't typical usage for an asynchronous API but is useful
// for demonstration purposes.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,17 @@
public final class HelloWorldStreamingClient {
public static void main(String[] args) throws Exception {
try (StreamingHttpClient client = HttpClients.forSingleAddress("localhost", 8080).buildStreaming()) {
client.request(client.get("/sayHello"))
.beforeOnSuccess(response -> System.out.println(response.toString((name, value) -> value)))
.flatMapPublisher(resp -> resp.payloadBody(appSerializerUtf8FixLen()))
.whenOnNext(System.out::println)
// This example is demonstrating asynchronous execution, but needs to prevent the main thread from exiting
// before the response has been processed. This isn't typical usage for an asynchronous API but is useful
// for demonstration purposes.
.toFuture().get();
for (int i = 0; i < 3; i++) {

client.request(client.get("/sayHello"))
.beforeOnSuccess(response -> System.out.println(response.toString((name, value) -> value)))
.flatMapPublisher(resp -> resp.payloadBody(appSerializerUtf8FixLen()))
.whenOnNext(System.out::println)
// This example is demonstrating asynchronous execution, but needs to prevent the main thread from exiting
// before the response has been processed. This isn't typical usage for an asynchronous API but is useful
// for demonstration purposes.
.toFuture().get();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import io.servicetalk.buffer.api.Buffer;
import io.servicetalk.concurrent.PublisherSource;
import io.servicetalk.concurrent.api.AsyncContext;
import io.servicetalk.concurrent.api.Single;
import io.servicetalk.concurrent.api.TerminalSignalConsumer;
import io.servicetalk.context.api.ContextMap;
Expand Down Expand Up @@ -79,6 +80,7 @@ final Single<StreamingHttpResponse> trackLifecycle(@Nullable final ConnectionInf
final Function<StreamingHttpRequest, Single<StreamingHttpResponse>> responseFunction) {

return defer(() -> {
System.err.println("--- " + Thread.currentThread().getName() + " --- trackLifecycle.defer: " + AsyncContext.context());
final HttpExchangeObserver onExchange = safeReport(observer::onNewExchange, observer, "onNewExchange",
NoopHttpExchangeObserver.INSTANCE);
final Runnable clearContext;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package io.servicetalk.http.netty;

import io.servicetalk.buffer.api.BufferAllocator;
import io.servicetalk.concurrent.api.AsyncContext;
import io.servicetalk.concurrent.api.Completable;
import io.servicetalk.concurrent.api.Executor;
import io.servicetalk.concurrent.api.Single;
Expand Down Expand Up @@ -346,7 +347,7 @@ private List<StreamingHttpServiceFilterFactory> initNonOffloadsServiceFilters(
@Nullable final HttpLifecycleObserver lifecycleObserver) {
final List<StreamingHttpServiceFilterFactory> filters = new ArrayList<>();
// Append internal filters:
appendNonOffloadingServiceFilter(filters, ClearAsyncContextHttpServiceFilter.INSTANCE);
// appendNonOffloadingServiceFilter(filters, ClearAsyncContextHttpServiceFilter.INSTANCE);
if (lifecycleObserver != null) {
appendNonOffloadingServiceFilter(filters, new HttpLifecycleObserverServiceFilter(lifecycleObserver));
}
Expand Down Expand Up @@ -545,6 +546,7 @@ public StreamingHttpServiceFilter create(final StreamingHttpService service) {
public Single<StreamingHttpResponse> handle(final HttpServiceContext ctx,
final StreamingHttpRequest request,
final StreamingHttpResponseFactory responseFactory) {
System.err.println("--- " + Thread.currentThread().getName() + " --- KeepAliveServiceFilter: " + AsyncContext.context());
final HttpKeepAlive keepAlive = HttpKeepAlive.responseKeepAlive(request);
// Don't expect any exceptions from delegate because it's already wrapped with
// ExceptionMapperServiceFilterFactory
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.servicetalk.concurrent.CompletableSource.Processor;
import io.servicetalk.concurrent.PublisherSource.Subscriber;
import io.servicetalk.concurrent.PublisherSource.Subscription;
import io.servicetalk.concurrent.api.AsyncContext;
import io.servicetalk.concurrent.api.Completable;
import io.servicetalk.concurrent.api.ListenableAsyncCloseable;
import io.servicetalk.concurrent.api.Processors;
Expand Down Expand Up @@ -325,6 +326,9 @@ public FlushStrategy defaultFlushStrategy() {
private Completable handleRequestAndWriteResponse(final Single<StreamingHttpRequest> requestSingle,
final boolean handleMultipleRequests) {
final Completable exchange = requestSingle.flatMapCompletable(rawRequest -> {
System.err.println("--- " + Thread.currentThread().getName() + " --- handleRequestAndWriteResponse-beforeClean: " + AsyncContext.context());
AsyncContext.clear();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally we'd get a whole new instances of AsyncContext.context() every loop, just in case the reference leaks outside of a single request chain.

Copy link
Member Author

@idelpivnitskiy idelpivnitskiy Mar 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will be guaranteed by a new defer applied before repeat. The clear() here is only a safeguard to make sure whatever new copy of the context is here, it should be empty.

System.err.println("--- " + Thread.currentThread().getName() + " --- handleRequestAndWriteResponse: " + AsyncContext.context());
// We transform the request and delay the completion of the result flattened stream to avoid
// resubscribing to the NettyChannelPublisher before the previous subscriber has terminated. Otherwise
// we may attempt to do duplicate subscribe on NettyChannelPublisher, which will result in a connection
Expand Down Expand Up @@ -393,6 +397,7 @@ public void onComplete() {
// HttpExceptionMapperServiceFilter.
service.handle(this, request, streamingResponseFactory())
.flatMapPublisher(response -> {
System.err.println("--- " + Thread.currentThread().getName() + " --- service.handle.flatMapPublisher: " + AsyncContext.context());
if (responseSent != null) {
// While concurrency between 100 (Continue) and the final response is handled in Netty
// encoders, it's necessary to prevent generating 100 (Continue) response after the full
Expand All @@ -410,21 +415,32 @@ public void onComplete() {
return (resetFlushStrategy == null ? pub : pub.beforeFinally(resetFlushStrategy::cancel))
// No need to make a copy of the context while consuming response message body.
.shareContextOnSubscribe();
// There is no need to call shareContextOnSubscribe() at the end of the `write` Publisher here
// because `connection.write(...)` will do it for us internally after applying FlushStrategy.
}));

if (drainRequestPayloadBody) {
return responseWrite.concat(defer(() -> (payloadSubscribed.get() ?
// Discarding the request payload body is an operation which should not impact the state of
// request/response processing. It's appropriate to recover from any error here.
// ST may introduce RejectedSubscribeError if user already consumed the request payload body
requestCompletion : request.messageBody().ignoreElements().onErrorComplete())
// No need to make a copy of the context in both cases.
.shareContextOnSubscribe()));
return responseWrite.concat(defer(() -> {
System.err.println("--- " + Thread.currentThread().getName() + " --- payloadSubscribed.defer: " + AsyncContext.context());
return (payloadSubscribed.get() ?
// Discarding the request payload body is an operation which should not impact the state of
// request/response processing. It's appropriate to recover from any error here.
// ST may introduce RejectedSubscribeError if user already consumed the request payload body
requestCompletion : request.messageBody().ignoreElements()
.whenOnComplete(() -> System.err.println("--- " + Thread.currentThread().getName() + " --- ignoreElements.whenOnComplete: " + AsyncContext.context()))
.onErrorComplete())
// No need to make a copy of the context in both cases.
.shareContextOnSubscribe();
// We need to apply shareContextOnSubscribe() on deferred Completable to share context between
// concatenated Completables.
}).shareContextOnSubscribe()).shareContextOnSubscribe();
} else {
return responseWrite.concat(requestCompletion);
return responseWrite.concat(requestCompletion).shareContextOnSubscribe();
}
});
return handleMultipleRequests ? exchange.repeat(__ -> true).ignoreElements() : exchange;
// In case of repetition, we have to wrap each `exchange` in defer to guarantee an isolated AsyncContext
// state between subsequent requests on the same HTTP/1.x connection.
return handleMultipleRequests ? defer(() -> exchange).repeat(__ -> true).ignoreElements() : exchange;
}

@Nonnull
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,14 @@
import io.servicetalk.http.api.HttpExecutionStrategies;
import io.servicetalk.http.api.HttpExecutionStrategy;
import io.servicetalk.http.api.HttpExecutionStrategyInfluencer;
import io.servicetalk.http.api.StreamingHttpRequest;
import io.servicetalk.http.api.StreamingHttpResponse;

import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;

import javax.annotation.Nullable;

abstract class AbstractOpenTelemetryFilter implements HttpExecutionStrategyInfluencer {
static final OpenTelemetryOptions DEFAULT_OPTIONS = new OpenTelemetryOptions.Builder().build();
static final String INSTRUMENTATION_SCOPE_NAME = "io.servicetalk";
Expand All @@ -40,25 +43,40 @@ public final HttpExecutionStrategy requiredOffloads() {
return HttpExecutionStrategies.offloadNone();
}

static Single<StreamingHttpResponse> withContext(Single<StreamingHttpResponse> responseSingle, Context context) {
static Single<StreamingHttpResponse> withContext(Single<StreamingHttpResponse> responseSingle, Context context,
@Nullable StreamingHttpRequest request,
@Nullable ScopeTrackerV2 trackerV2) {
return new SubscribableSingle<StreamingHttpResponse>() {
@Override
protected void handleSubscribe(SingleSource.Subscriber<? super StreamingHttpResponse> subscriber) {
try (Scope ignored = context.makeCurrent()) {
SourceAdapters.toSource(responseSingle.map(resp ->
resp.transformMessageBody(body -> transformBody(body, context))))
SourceAdapters.toSource(responseSingle
.map(resp -> resp.transformMessageBody(body -> {
Publisher<?> publisher = transformBody(body, context);
if (request != null && trackerV2 != null) {
// This should not be race because if request body is already subscribed,
// we don't need this `transformBody`, but if it will be subscribed later
// (auto-draining), then it's not racy to apply a transformation here.
// TODO: We should also handle `beforeOnError` or maybe even `beforeFinally`
publisher = publisher.beforeOnComplete(() -> request
.transformMessageBody(b -> transformBody(b, context)
.beforeFinally(trackerV2::requestComplete)));
}
return publisher;
}))
.shareContextOnSubscribe())
.subscribe(subscriber);
}
}
};
}

private static <T> Publisher<T> transformBody(Publisher<T> body, Context context) {
protected static <T> Publisher<T> transformBody(Publisher<T> body, Context context) {
return new SubscribablePublisher<T>() {
@Override
protected void handleSubscribe(PublisherSource.Subscriber<? super T> subscriber) {
try (Scope ignored = context.makeCurrent()) {
SourceAdapters.toSource(body).subscribe(subscriber);
SourceAdapters.toSource(body.shareContextOnSubscribe()).subscribe(subscriber);
}
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ private Single<StreamingHttpResponse> trackRequest(final StreamingHttpRequester
final ScopeTracker tracker = new ScopeTracker(context, request, instrumenter);
try {
Single<StreamingHttpResponse> response = delegate.request(request);
return withContext(tracker.track(response), context);
return withContext(tracker.track(response), context, null, null);
} catch (Throwable t) {
tracker.onError(t);
return Single.failed(t);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,10 +135,11 @@ private Single<StreamingHttpResponse> trackRequest(final StreamingHttpService de
}
final Context context = instrumenter.start(parentContext, request);
try (Scope unused = context.makeCurrent()) {
final ScopeTracker tracker = new ScopeTracker(context, request, instrumenter);
System.err.println("Initiated context: " + Span.current().getSpanContext());
final ScopeTrackerV2 tracker = new ScopeTrackerV2(context, request, instrumenter);
try {
Single<StreamingHttpResponse> response = delegate.handle(ctx, request, responseFactory);
return withContext(tracker.track(response), context);
return withContext(tracker.track(response), context, request, tracker);
} catch (Throwable t) {
tracker.onError(t);
return Single.failed(t);
Expand Down
Loading
Loading