Skip to content

Commit

Permalink
BlockingStreamingHttpService: drop trailers if users didn't create …
Browse files Browse the repository at this point in the history
…any (#3151)

Motivation:

Behavior was discovered as part of debugging #3148.
Currently, `BlockingStreamingHttpService` allocates trailers upfront and concatenates them after the payload body when protocol allows. In result, aggregation of `BlockingStreamingHttpResponse` does not lead to a single HTTP/2 frame because we always expect to receive trailers. However, adding trailers after users close payload writer is race. There are no guarantees that new trailers will be written to the network after close. Therefore, we can inspect the state after close and decide if we need to append trailers or not.

Modifications:

- Use `scanWithMapper` inside `BlockingStreamingHttpService` instead of always concatenating payload body with trailers;
- Create trailers on demand inside `BufferHttpPayloadWriter`, only when users touch them;
- Clarify behavior in `HttpPayloadWriter`'s javadoc for trailers-related methods;
- Enhance `BlockingStreamingToStreamingServiceTest` to assert new expectations;

Result:

In `BlockingStreamingHttpService`, trailers are allocated and attached only if users touch trailers before closing `HttpPayloadWriter`.
  • Loading branch information
idelpivnitskiy authored Dec 23, 2024
1 parent d4bb93f commit f570fdb
Show file tree
Hide file tree
Showing 6 changed files with 134 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,15 @@
import io.servicetalk.concurrent.SingleSource.Subscriber;
import io.servicetalk.concurrent.api.Completable;
import io.servicetalk.concurrent.api.Publisher;
import io.servicetalk.concurrent.api.ScanMapper;
import io.servicetalk.concurrent.api.Single;
import io.servicetalk.concurrent.api.internal.ConnectablePayloadWriter;
import io.servicetalk.concurrent.internal.ThreadInterruptingCancellable;

import java.io.IOException;
import java.util.function.Consumer;
import java.util.function.Supplier;
import javax.annotation.Nullable;

import static io.servicetalk.concurrent.api.Processors.newCompletableProcessor;
import static io.servicetalk.concurrent.api.SourceAdapters.fromSource;
Expand Down Expand Up @@ -76,7 +79,7 @@ protected void handleSubscribe(final Subscriber<? super StreamingHttpResponse> s
// still be propagated.
final Processor exceptionProcessor = newCompletableProcessor();
final BufferHttpPayloadWriter payloadWriter = new BufferHttpPayloadWriter(
ctx.headersFactory().newTrailers());
() -> ctx.headersFactory().newTrailers());
DefaultBlockingStreamingHttpServerResponse response = null;
try {
final Consumer<DefaultHttpResponseMetaData> sendMeta = (metaData) -> {
Expand All @@ -102,7 +105,7 @@ protected void handleSubscribe(final Subscriber<? super StreamingHttpResponse> s
Publisher<Object> messageBody = fromSource(exceptionProcessor)
.merge(payloadWriter.connect());
if (addTrailers) {
messageBody = messageBody.concat(succeeded(payloadWriter.trailers()));
messageBody = messageBody.scanWithMapper(() -> new TrailersMapper(payloadWriter));
}
messageBody = messageBody.beforeSubscription(() -> new Subscription() {
@Override
Expand Down Expand Up @@ -169,10 +172,12 @@ public Completable closeAsyncGracefully() {

private static final class BufferHttpPayloadWriter implements HttpPayloadWriter<Buffer> {
private final ConnectablePayloadWriter<Buffer> payloadWriter = new ConnectablePayloadWriter<>();
private final HttpHeaders trailers;
private final Supplier<HttpHeaders> trailersFactory;
@Nullable
private HttpHeaders trailers;

BufferHttpPayloadWriter(final HttpHeaders trailers) {
this.trailers = trailers;
BufferHttpPayloadWriter(final Supplier<HttpHeaders> trailersFactory) {
this.trailersFactory = trailersFactory;
}

@Override
Expand All @@ -197,11 +202,61 @@ public void close(final Throwable cause) throws IOException {

@Override
public HttpHeaders trailers() {
if (trailers == null) {
trailers = trailersFactory.get();
}
return trailers;
}

@Nullable
HttpHeaders trailers0() {
return trailers;
}

Publisher<Buffer> connect() {
return payloadWriter.connect();
}
}

private static final class TrailersMapper implements ScanMapper<Object, Object>, ScanMapper.MappedTerminal<Object> {
private final BufferHttpPayloadWriter payloadWriter;

TrailersMapper(final BufferHttpPayloadWriter payloadWriter) {
this.payloadWriter = payloadWriter;
}

@Nullable
@Override
public Object mapOnNext(@Nullable final Object next) {
return next;
}

@Nullable
@Override
public MappedTerminal<Object> mapOnError(final Throwable cause) {
return null;
}

@Override
public MappedTerminal<Object> mapOnComplete() {
return this;
}

@Nullable
@Override
public Object onNext() {
return payloadWriter.trailers0();
}

@Override
public boolean onNextValid() {
return payloadWriter.trailers0() != null;
}

@Nullable
@Override
public Throwable terminal() {
return null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ public <T> HttpRequest payloadBody(final T pojo, final HttpSerializer2<T> serial
public HttpHeaders trailers() {
if (trailers == null) {
trailers = original.payloadHolder().headersFactory().newTrailers();
original.transform(this);
original.transform(this); // Invoke "transform" to set PayloadInfo.mayHaveTrailers() flag
}
return trailers;
}
Expand Down Expand Up @@ -277,6 +277,9 @@ public StreamingHttpRequest toStreamingRequest() {
@Nullable
final Publisher<Object> payload;
if (trailers != null) {
// We can not drop empty Trailers here bcz users could do type conversion intermediately, while still
// referencing the original HttpHeaders object from an aggregated type and keep using it to add trailers
// before sending the message or converting it back to an aggregated one.
payload = emptyPayloadBody ? from(trailers) : from(payloadBody, trailers);
} else {
payload = emptyPayloadBody ? null : from(payloadBody);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,9 @@ public StreamingHttpResponse toStreamingResponse() {
@Nullable
final Publisher<Object> payload;
if (trailers != null) {
// We can not drop empty Trailers here bcz users could do type conversion intermediately, while still
// referencing the original HttpHeaders object from an aggregated type and keep using it to add trailers
// before sending the message or converting it back to an aggregated one.
payload = emptyPayloadBody ? from(trailers) : from(payloadBody, trailers);
} else {
payload = emptyPayloadBody ? null : from(payloadBody);
Expand All @@ -127,7 +130,7 @@ public BlockingStreamingHttpResponse toBlockingStreamingResponse() {
public HttpHeaders trailers() {
if (trailers == null) {
trailers = original.payloadHolder().headersFactory().newTrailers();
original.transform(this);
original.transform(this); // Invoke "transform" to set PayloadInfo.mayHaveTrailers() flag
}
return trailers;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,9 @@ static Single<PayloadAndTrailers> aggregatePayloadAndTrailers(final DefaultPaylo
if (isAlwaysEmpty(pair.payload)) {
payloadInfo.setEmpty(true);
}
// We can not drop empty Trailers here bcz users could do type conversion intermediately multiple times,
// while still referencing the original HttpHeaders object from an aggregated type and keep using it to add
// trailers before sending the message or converting it back to an aggregated one.
if (pair.trailers == null) {
payloadInfo.setMayHaveTrailersAndGenericTypeBuffer(false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,24 +27,42 @@
*/
public interface HttpPayloadWriter<T> extends PayloadWriter<T>, TrailersHolder {

/**
* <b>Note:</b> modifying trailers after the payload writer is {@link #close() closed} is not allowed.
*/
@Override
HttpHeaders trailers();

/**
* <b>Note:</b> modifying trailers after the payload writer is {@link #close() closed} is not allowed.
*/
@Override
default HttpPayloadWriter<T> addTrailer(final CharSequence name, final CharSequence value) {
TrailersHolder.super.addTrailer(name, value);
return this;
}

/**
* <b>Note:</b> modifying trailers after the payload writer is {@link #close() closed} is not allowed.
*/
@Override
default HttpPayloadWriter<T> addTrailers(final HttpHeaders trailers) {
TrailersHolder.super.addTrailers(trailers);
return this;
}

/**
* <b>Note:</b> modifying trailers after the payload writer is {@link #close() closed} is not allowed.
*/
@Override
default HttpPayloadWriter<T> setTrailer(final CharSequence name, final CharSequence value) {
TrailersHolder.super.setTrailer(name, value);
return this;
}

/**
* <b>Note:</b> modifying trailers after the payload writer is {@link #close() closed} is not allowed.
*/
@Override
default HttpPayloadWriter<T> setTrailers(final HttpHeaders trailers) {
TrailersHolder.super.setTrailers(trailers);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,21 @@
package io.servicetalk.http.api;

import io.servicetalk.buffer.api.Buffer;
import io.servicetalk.buffer.api.BufferAllocator;
import io.servicetalk.concurrent.Cancellable;
import io.servicetalk.concurrent.PublisherSource.Subscriber;
import io.servicetalk.concurrent.PublisherSource.Subscription;
import io.servicetalk.concurrent.SingleSource;
import io.servicetalk.concurrent.api.Executor;
import io.servicetalk.concurrent.api.ExecutorExtension;
import io.servicetalk.concurrent.api.Publisher;
import io.servicetalk.oio.api.PayloadWriter;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;

Expand Down Expand Up @@ -63,8 +65,10 @@
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.function.Function.identity;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.fail;
Expand All @@ -78,7 +82,8 @@ class BlockingStreamingToStreamingServiceTest {
private static final String HELLO_WORLD = "Hello\nWorld\n";

@RegisterExtension
final ExecutorExtension<Executor> executorExtension = ExecutorExtension.withCachedExecutor();
static final ExecutorExtension<Executor> executorExtension = ExecutorExtension.withCachedExecutor()
.setClassLevel(true);

@Mock
private HttpExecutionContext mockExecutionCtx;
Expand All @@ -93,14 +98,26 @@ void setup() {
mockCtx = new TestHttpServiceContext(DefaultHttpHeadersFactory.INSTANCE, reqRespFactory, mockExecutionCtx);
}

@Test
void defaultResponseStatusNoPayload() throws Exception {
BlockingStreamingHttpService syncService = (ctx, request, response) -> response.sendMetaData().close();
@ParameterizedTest(name = "{displayName} [{index}] withEmptyTrailers={0}")
@ValueSource(booleans = {false, true})
void defaultResponseStatusNoPayload(boolean withEmptyTrailers) throws Exception {
BlockingStreamingHttpService syncService = (ctx, request, response) -> {
HttpPayloadWriter<Buffer> writer = response.sendMetaData();
if (withEmptyTrailers) {
writer.trailers(); // accessing trailers before close should preserve trailers in message body
}
writer.close();
writer.trailers(); // accessing trailers after close should not modify output
};

List<Object> response = invokeService(syncService, reqRespFactory.get("/"));
assertMetaData(OK, response);
assertPayloadBody("", response, false);
assertEmptyTrailers(response);
if (withEmptyTrailers) {
assertEmptyTrailers(response);
} else {
assertNoTrailers(response);
}
}

@Test
Expand All @@ -111,7 +128,7 @@ void customResponseStatusNoPayload() throws Exception {
List<Object> response = invokeService(syncService, reqRespFactory.get("/"));
assertMetaData(NO_CONTENT, response);
assertPayloadBody("", response, false);
assertEmptyTrailers(response);
assertNoTrailers(response);
}

@Test
Expand All @@ -127,24 +144,34 @@ void receivePayloadBody() throws Exception {
.payloadBody(from("Hello\n", "World\n"), appSerializerUtf8FixLen()));
assertMetaData(OK, response);
assertPayloadBody("", response, true);
assertEmptyTrailers(response);
assertNoTrailers(response);

assertThat(receivedPayload.toString(), is(HELLO_WORLD));
}

@Test
void respondWithPayloadBody() throws Exception {
@ParameterizedTest(name = "{displayName} [{index}] withEmptyTrailers={0}")
@ValueSource(booleans = {false, true})
void respondWithPayloadBody(boolean withEmptyTrailers) throws Exception {
BlockingStreamingHttpService syncService = (ctx, request, response) -> {
try (PayloadWriter<Buffer> pw = response.sendMetaData()) {
pw.write(ctx.executionContext().bufferAllocator().fromAscii("Hello\n"));
pw.write(ctx.executionContext().bufferAllocator().fromAscii("World\n"));
BufferAllocator alloc = ctx.executionContext().bufferAllocator();
HttpPayloadWriter<Buffer> writer = response.sendMetaData();
writer.write(alloc.fromAscii("Hello\n"));
if (withEmptyTrailers) {
writer.trailers(); // accessing trailers before close should preserve trailers in message body
}
writer.write(alloc.fromAscii("World\n"));
writer.close();
writer.trailers(); // accessing trailers after close should not modify output
};

List<Object> response = invokeService(syncService, reqRespFactory.get("/"));
assertMetaData(OK, response);
assertPayloadBody(HELLO_WORLD, response, false);
assertEmptyTrailers(response);
if (withEmptyTrailers) {
assertEmptyTrailers(response);
} else {
assertNoTrailers(response);
}
}

@Test
Expand Down Expand Up @@ -533,6 +560,10 @@ private static void assertPayloadBody(String expectedPayloadBody, List<Object> r
assertThat(payloadBody, is(expectedPayloadBody));
}

private static void assertNoTrailers(List<Object> response) {
assertThat(response, not(containsInAnyOrder(instanceOf(HttpHeaders.class))));
}

private static void assertEmptyTrailers(List<Object> response) {
HttpHeaders trailers = (HttpHeaders) response.get(response.size() - 1);
assertThat(trailers, is(notNullValue()));
Expand Down

0 comments on commit f570fdb

Please sign in to comment.