Skip to content

Commit

Permalink
[Streaming Indexing] Enhance RestAction with request / response strea…
Browse files Browse the repository at this point in the history
…ming support

Signed-off-by: Andriy Redko <andriy.redko@aiven.io>
  • Loading branch information
reta committed May 27, 2024
1 parent 09c2c7d commit b809a35
Show file tree
Hide file tree
Showing 20 changed files with 804 additions and 26 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Remote Store] Add dynamic cluster settings to set timeout for segments upload to Remote Store ([#13679](https://github.com/opensearch-project/OpenSearch/pull/13679))
- [Remote Store] Upload translog checkpoint as object metadata to translog.tlog([#13637](https://github.com/opensearch-project/OpenSearch/pull/13637))
- Add getMetadataFields to MapperService ([#13819](https://github.com/opensearch-project/OpenSearch/pull/13819))
- [Streaming Indexing] Enhance RestAction with request / response streaming support ([#13772](https://github.com/opensearch-project/OpenSearch/pull/13772))

### Dependencies
- Bump `com.github.spullara.mustache.java:compiler` from 0.9.10 to 0.9.13 ([#13329](https://github.com/opensearch-project/OpenSearch/pull/13329), [#13559](https://github.com/opensearch-project/OpenSearch/pull/13559))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.http.reactor.netty4;

import org.opensearch.core.action.ActionListener;

import io.netty.handler.codec.http.HttpContent;
import org.reactivestreams.Publisher;

/**
* The generic interface for chunked {@link HttpContent} producers (response streaming).
*/
interface HttpContentSender extends Publisher<HttpContent> {
void send(HttpContent content, ActionListener<Void> listener, boolean isLast);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.http.reactor.netty4;

import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.http.HttpChunk;
import org.opensearch.transport.reactor.netty4.Netty4Utils;

import java.util.concurrent.atomic.AtomicBoolean;

import io.netty.buffer.ByteBuf;

class ReactorNetty4HttpChunk implements HttpChunk {
private final AtomicBoolean released;
private final boolean pooled;
private final ByteBuf content;
private final boolean last;

ReactorNetty4HttpChunk(ByteBuf content, boolean last) {
this(new AtomicBoolean(false), true, content, last);
}

private ReactorNetty4HttpChunk(AtomicBoolean released, boolean pooled, ByteBuf content, boolean last) {
this.content = content;
this.pooled = pooled;
this.released = released;
this.last = last;
}

@Override
public BytesReference content() {
assert released.get() == false;
return Netty4Utils.toBytesReference(content);
}

@Override
public void release() {
if (pooled && released.compareAndSet(false, true)) {
content.release();
}
}

@Override
public boolean isLast() {
return last;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import org.opensearch.http.HttpServerChannel;
import org.opensearch.http.reactor.netty4.ssl.SslUtils;
import org.opensearch.plugins.SecureHttpTransportSettingsProvider;
import org.opensearch.rest.RestHandler;
import org.opensearch.rest.RestRequest.Method;
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.reactor.SharedGroupFactory;
Expand All @@ -40,6 +42,7 @@
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelOption;
Expand Down Expand Up @@ -351,24 +354,46 @@ public List<String> protocols() {
* @return response publisher
*/
protected Publisher<Void> incomingRequest(HttpServerRequest request, HttpServerResponse response) {
final NonStreamingRequestConsumer<HttpContent> consumer = new NonStreamingRequestConsumer<>(
this,
request,
response,
maxCompositeBufferComponents
final Method method = HttpConversionUtil.convertMethod(request.method());
final Optional<RestHandler> dispatchHandlerOpt = dispatcher.dispatchHandler(
request.uri(),
request.fullPath(),
method,
request.params()
);
if (dispatchHandlerOpt.map(RestHandler::supportsStreaming).orElse(false)) {
final ReactorNetty4StreamingRequestConsumer<HttpContent> consumer = new ReactorNetty4StreamingRequestConsumer<>(
this,
request,
response
);

request.receiveContent()
.switchIfEmpty(Mono.just(DefaultLastHttpContent.EMPTY_LAST_CONTENT))
.subscribe(consumer, error -> {}, () -> consumer.accept(DefaultLastHttpContent.EMPTY_LAST_CONTENT));
consumer.start();

return response.sendObject(consumer);
} else {
final ReactorNetty4NonStreamingRequestConsumer<HttpContent> consumer = new ReactorNetty4NonStreamingRequestConsumer<>(
this,
request,
response,
maxCompositeBufferComponents
);

request.receiveContent().switchIfEmpty(Mono.just(DefaultLastHttpContent.EMPTY_LAST_CONTENT)).subscribe(consumer);

return Mono.from(consumer).flatMap(hc -> {
final FullHttpResponse r = (FullHttpResponse) hc;
response.status(r.status());
response.trailerHeaders(c -> r.trailingHeaders().forEach(h -> c.add(h.getKey(), h.getValue())));
response.chunkedTransfer(false);
response.compression(true);
r.headers().forEach(h -> response.addHeader(h.getKey(), h.getValue()));
return Mono.from(response.sendObject(r.content()));
});
request.receiveContent().switchIfEmpty(Mono.just(DefaultLastHttpContent.EMPTY_LAST_CONTENT)).subscribe(consumer);

return Mono.from(consumer).flatMap(hc -> {
final FullHttpResponse r = (FullHttpResponse) hc;
response.status(r.status());
response.trailerHeaders(c -> r.trailingHeaders().forEach(h -> c.add(h.getKey(), h.getValue())));
response.chunkedTransfer(false);
response.compression(true);
r.headers().forEach(h -> response.addHeader(h.getKey(), h.getValue()));
return Mono.from(response.sendObject(r.content()));
});
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@
import reactor.netty.http.server.HttpServerRequest;
import reactor.netty.http.server.HttpServerResponse;

class NonStreamingHttpChannel implements HttpChannel {
class ReactorNetty4NonStreamingHttpChannel implements HttpChannel {
private final HttpServerRequest request;
private final HttpServerResponse response;
private final CompletableContext<Void> closeContext = new CompletableContext<>();
private final FluxSink<HttpContent> emitter;

NonStreamingHttpChannel(HttpServerRequest request, HttpServerResponse response, FluxSink<HttpContent> emitter) {
ReactorNetty4NonStreamingHttpChannel(HttpServerRequest request, HttpServerResponse response, FluxSink<HttpContent> emitter) {
this.request = request;
this.response = response;
this.emitter = emitter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import reactor.netty.http.server.HttpServerRequest;
import reactor.netty.http.server.HttpServerResponse;

class NonStreamingRequestConsumer<T extends HttpContent> implements Consumer<T>, Publisher<HttpContent>, Disposable {
class ReactorNetty4NonStreamingRequestConsumer<T extends HttpContent> implements Consumer<T>, Publisher<HttpContent>, Disposable {
private final HttpServerRequest request;
private final HttpServerResponse response;
private final CompositeByteBuf content;
Expand All @@ -34,7 +34,7 @@ class NonStreamingRequestConsumer<T extends HttpContent> implements Consumer<T>,
private final AtomicBoolean disposed = new AtomicBoolean(false);
private volatile FluxSink<HttpContent> emitter;

NonStreamingRequestConsumer(
ReactorNetty4NonStreamingRequestConsumer(
AbstractHttpServerTransport transport,
HttpServerRequest request,
HttpServerResponse response,
Expand Down Expand Up @@ -64,12 +64,12 @@ public void accept(T message) {
}
}

public void process(HttpContent in, FluxSink<HttpContent> emitter) {
void process(HttpContent in, FluxSink<HttpContent> emitter) {
// Consume request body in full before dispatching it
content.addComponent(true, in.content().retain());

if (in instanceof LastHttpContent) {
final NonStreamingHttpChannel channel = new NonStreamingHttpChannel(request, response, emitter);
final ReactorNetty4NonStreamingHttpChannel channel = new ReactorNetty4NonStreamingHttpChannel(request, response, emitter);
final HttpRequest r = createRequest(request, content);

try {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.http.reactor.netty4;

import org.opensearch.common.concurrent.CompletableContext;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.http.HttpChunk;
import org.opensearch.http.HttpResponse;
import org.opensearch.http.StreamingHttpChannel;
import org.opensearch.transport.reactor.netty4.Netty4Utils;

import java.net.InetSocketAddress;
import java.util.List;
import java.util.Map;

import io.netty.buffer.Unpooled;
import io.netty.handler.codec.http.DefaultHttpContent;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpContent;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.netty.http.server.HttpServerRequest;
import reactor.netty.http.server.HttpServerResponse;

class ReactorNetty4StreamingHttpChannel implements StreamingHttpChannel {
private final HttpServerRequest request;
private final HttpServerResponse response;
private final CompletableContext<Void> closeContext = new CompletableContext<>();
private final Publisher<HttpChunk> receiver;
private final HttpContentSender sender;
private volatile FluxSink<HttpChunk> producer;
private volatile boolean lastChunkReceived = false;

ReactorNetty4StreamingHttpChannel(HttpServerRequest request, HttpServerResponse response, HttpContentSender sender) {
this.request = request;
this.response = response;
this.sender = sender;
this.receiver = Flux.create(producer -> this.producer = producer);
this.request.withConnection(connection -> Netty4Utils.addListener(connection.channel().closeFuture(), closeContext));
}

@Override
public boolean isOpen() {
return true;
}

@Override
public void close() {
request.withConnection(connection -> connection.channel().close());
}

@Override
public void addCloseListener(ActionListener<Void> listener) {
closeContext.addListener(ActionListener.toBiConsumer(listener));
}

@Override
public void sendChunk(HttpChunk chunk, ActionListener<Void> listener) {
sender.send(createContent(chunk), listener, chunk.isLast());
}

@Override
public void sendResponse(HttpResponse response, ActionListener<Void> listener) {
sender.send(createContent(response), listener, true);
}

@Override
public void prepareResponse(int status, Map<String, List<String>> headers) {
this.response.status(status);
headers.forEach((k, vs) -> vs.forEach(v -> this.response.addHeader(k, v)));
}

@Override
public InetSocketAddress getRemoteAddress() {
return (InetSocketAddress) response.remoteAddress();
}

@Override
public InetSocketAddress getLocalAddress() {
return (InetSocketAddress) response.hostAddress();
}

@Override
public void receiveChunk(HttpChunk message) {
if (lastChunkReceived) {
return;
}

producer.next(message);
if (message.isLast()) {
lastChunkReceived = true;
producer.complete();
}
}

@Override
public void subscribe(Subscriber<? super HttpChunk> subscriber) {
receiver.subscribe(subscriber);
}

private static HttpContent createContent(HttpResponse response) {
final FullHttpResponse fullHttpResponse = (FullHttpResponse) response;
return new DefaultHttpContent(fullHttpResponse.content());
}

private static HttpContent createContent(HttpChunk chunk) {
return new DefaultHttpContent(Unpooled.copiedBuffer(BytesReference.toBytes(chunk.content())));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.http.reactor.netty4;

import org.opensearch.http.AbstractHttpServerTransport;
import org.opensearch.http.HttpChunk;
import org.opensearch.http.HttpRequest;
import org.opensearch.http.StreamingHttpChannel;

import java.util.function.Consumer;

import io.netty.buffer.Unpooled;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.LastHttpContent;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import reactor.netty.http.server.HttpServerRequest;
import reactor.netty.http.server.HttpServerResponse;

class ReactorNetty4StreamingRequestConsumer<T extends HttpContent> implements Consumer<T>, Publisher<HttpContent> {
private final AbstractHttpServerTransport transport;
private final HttpServerRequest request;
private final HttpContentSender sender;
private final StreamingHttpChannel httpChannel;

ReactorNetty4StreamingRequestConsumer(AbstractHttpServerTransport transport, HttpServerRequest request, HttpServerResponse response) {
this.transport = transport;
this.request = request;
this.sender = new ReactorNetty4StreamingResponseProducer();
this.httpChannel = new ReactorNetty4StreamingHttpChannel(request, response, sender);
}

@Override
public void accept(T message) {
if (message instanceof LastHttpContent) {
httpChannel.receiveChunk(createChunk(message, true));
} else if (message instanceof HttpContent) {
httpChannel.receiveChunk(createChunk(message, false));
}
}

@Override
public void subscribe(Subscriber<? super HttpContent> s) {
sender.subscribe(s);
}

void start() {
transport.incomingStream(createRequest(request), httpChannel);
}

HttpRequest createRequest(HttpServerRequest request) {
return new ReactorNetty4HttpRequest(request, Unpooled.EMPTY_BUFFER);
}

HttpChunk createChunk(HttpContent chunk, boolean last) {
return new ReactorNetty4HttpChunk(chunk.content(), last);
}
}
Loading

0 comments on commit b809a35

Please sign in to comment.