-
Notifications
You must be signed in to change notification settings - Fork 1.9k
Commit
…ming support Signed-off-by: Andriy Redko <andriy.redko@aiven.io>
- Loading branch information
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,27 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.http.reactor.netty4; | ||
|
||
import org.opensearch.core.action.ActionListener; | ||
|
||
import io.netty.handler.codec.http.HttpContent; | ||
import org.reactivestreams.Publisher; | ||
|
||
/** | ||
* The generic interface for chunked {@link HttpContent} producers (response streaming). | ||
*/ | ||
interface HttpContentSender extends Publisher<HttpContent> { | ||
/** | ||
* Sends the next {@link HttpContent} over the wire | ||
* @param content next {@link HttpContent} | ||
* @param listener action listener | ||
* @param isLast {@code true} if this is the last chunk, {@code false} otherwise | ||
*/ | ||
void send(HttpContent content, ActionListener<Void> listener, boolean isLast); | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,53 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.http.reactor.netty4; | ||
|
||
import org.opensearch.core.common.bytes.BytesReference; | ||
import org.opensearch.http.HttpChunk; | ||
import org.opensearch.transport.reactor.netty4.Netty4Utils; | ||
|
||
import java.util.concurrent.atomic.AtomicBoolean; | ||
|
||
import io.netty.buffer.ByteBuf; | ||
|
||
class ReactorNetty4HttpChunk implements HttpChunk { | ||
Check warning on line 19 in plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpChunk.java Codecov / codecov/patchplugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpChunk.java#L19
|
||
private final AtomicBoolean released; | ||
private final boolean pooled; | ||
private final ByteBuf content; | ||
private final boolean last; | ||
|
||
ReactorNetty4HttpChunk(ByteBuf content, boolean last) { | ||
this(new AtomicBoolean(false), true, content, last); | ||
} | ||
Check warning on line 27 in plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpChunk.java Codecov / codecov/patchplugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpChunk.java#L26-L27
|
||
|
||
private ReactorNetty4HttpChunk(AtomicBoolean released, boolean pooled, ByteBuf content, boolean last) { | ||
this.content = content; | ||
this.pooled = pooled; | ||
this.released = released; | ||
this.last = last; | ||
} | ||
Check warning on line 34 in plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpChunk.java Codecov / codecov/patchplugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpChunk.java#L29-L34
|
||
|
||
@Override | ||
public BytesReference content() { | ||
assert released.get() == false; | ||
return Netty4Utils.toBytesReference(content); | ||
Check warning on line 39 in plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpChunk.java Codecov / codecov/patchplugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpChunk.java#L39
|
||
} | ||
|
||
@Override | ||
public void release() { | ||
if (pooled && released.compareAndSet(false, true)) { | ||
content.release(); | ||
Check warning on line 45 in plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpChunk.java Codecov / codecov/patchplugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpChunk.java#L45
|
||
} | ||
} | ||
Check warning on line 47 in plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpChunk.java Codecov / codecov/patchplugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpChunk.java#L47
|
||
|
||
@Override | ||
public boolean isLast() { | ||
return last; | ||
Check warning on line 51 in plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpChunk.java Codecov / codecov/patchplugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpChunk.java#L51
|
||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,118 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.http.reactor.netty4; | ||
|
||
import org.opensearch.common.concurrent.CompletableContext; | ||
import org.opensearch.core.action.ActionListener; | ||
import org.opensearch.core.common.bytes.BytesReference; | ||
import org.opensearch.http.HttpChunk; | ||
import org.opensearch.http.HttpResponse; | ||
import org.opensearch.http.StreamingHttpChannel; | ||
import org.opensearch.transport.reactor.netty4.Netty4Utils; | ||
|
||
import java.net.InetSocketAddress; | ||
import java.util.List; | ||
import java.util.Map; | ||
|
||
import io.netty.buffer.Unpooled; | ||
import io.netty.handler.codec.http.DefaultHttpContent; | ||
import io.netty.handler.codec.http.FullHttpResponse; | ||
import io.netty.handler.codec.http.HttpContent; | ||
import org.reactivestreams.Publisher; | ||
import org.reactivestreams.Subscriber; | ||
import reactor.core.publisher.Flux; | ||
import reactor.core.publisher.FluxSink; | ||
import reactor.netty.http.server.HttpServerRequest; | ||
import reactor.netty.http.server.HttpServerResponse; | ||
|
||
class ReactorNetty4StreamingHttpChannel implements StreamingHttpChannel { | ||
private final HttpServerRequest request; | ||
private final HttpServerResponse response; | ||
private final CompletableContext<Void> closeContext = new CompletableContext<>(); | ||
Check warning on line 37 in plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java Codecov / codecov/patchplugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java#L37
|
||
private final Publisher<HttpChunk> receiver; | ||
private final HttpContentSender sender; | ||
private volatile FluxSink<HttpChunk> producer; | ||
private volatile boolean lastChunkReceived = false; | ||
Check warning on line 41 in plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java Codecov / codecov/patchplugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java#L41
|
||
|
||
ReactorNetty4StreamingHttpChannel(HttpServerRequest request, HttpServerResponse response, HttpContentSender sender) { | ||
this.request = request; | ||
this.response = response; | ||
this.sender = sender; | ||
this.receiver = Flux.create(producer -> this.producer = producer); | ||
this.request.withConnection(connection -> Netty4Utils.addListener(connection.channel().closeFuture(), closeContext)); | ||
} | ||
Check warning on line 49 in plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java Codecov / codecov/patchplugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java#L43-L49
|
||
|
||
@Override | ||
public boolean isOpen() { | ||
return true; | ||
Check warning on line 53 in plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java Codecov / codecov/patchplugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java#L53
|
||
} | ||
|
||
@Override | ||
public void close() { | ||
request.withConnection(connection -> connection.channel().close()); | ||
} | ||
Check warning on line 59 in plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java Codecov / codecov/patchplugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java#L58-L59
|
||
|
||
@Override | ||
public void addCloseListener(ActionListener<Void> listener) { | ||
closeContext.addListener(ActionListener.toBiConsumer(listener)); | ||
} | ||
Check warning on line 64 in plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java Codecov / codecov/patchplugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java#L63-L64
|
||
|
||
@Override | ||
public void sendChunk(HttpChunk chunk, ActionListener<Void> listener) { | ||
sender.send(createContent(chunk), listener, chunk.isLast()); | ||
} | ||
Check warning on line 69 in plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java Codecov / codecov/patchplugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java#L68-L69
|
||
|
||
@Override | ||
public void sendResponse(HttpResponse response, ActionListener<Void> listener) { | ||
sender.send(createContent(response), listener, true); | ||
} | ||
Check warning on line 74 in plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java Codecov / codecov/patchplugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java#L73-L74
|
||
|
||
@Override | ||
public void prepareResponse(int status, Map<String, List<String>> headers) { | ||
this.response.status(status); | ||
headers.forEach((k, vs) -> vs.forEach(v -> this.response.addHeader(k, v))); | ||
} | ||
Check warning on line 80 in plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java Codecov / codecov/patchplugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java#L78-L80
|
||
|
||
@Override | ||
public InetSocketAddress getRemoteAddress() { | ||
return (InetSocketAddress) response.remoteAddress(); | ||
Check warning on line 84 in plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java Codecov / codecov/patchplugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java#L84
|
||
} | ||
|
||
@Override | ||
public InetSocketAddress getLocalAddress() { | ||
return (InetSocketAddress) response.hostAddress(); | ||
Check warning on line 89 in plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java Codecov / codecov/patchplugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java#L89
|
||
} | ||
|
||
@Override | ||
public void receiveChunk(HttpChunk message) { | ||
if (lastChunkReceived) { | ||
return; | ||
Check warning on line 95 in plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java Codecov / codecov/patchplugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java#L95
|
||
} | ||
|
||
producer.next(message); | ||
Check warning on line 98 in plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java Codecov / codecov/patchplugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java#L98
|
||
if (message.isLast()) { | ||
lastChunkReceived = true; | ||
producer.complete(); | ||
Check warning on line 101 in plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java Codecov / codecov/patchplugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java#L100-L101
|
||
} | ||
} | ||
Check warning on line 103 in plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java Codecov / codecov/patchplugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java#L103
|
||
|
||
@Override | ||
public void subscribe(Subscriber<? super HttpChunk> subscriber) { | ||
receiver.subscribe(subscriber); | ||
} | ||
Check warning on line 108 in plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java Codecov / codecov/patchplugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java#L107-L108
|
||
|
||
private static HttpContent createContent(HttpResponse response) { | ||
final FullHttpResponse fullHttpResponse = (FullHttpResponse) response; | ||
return new DefaultHttpContent(fullHttpResponse.content()); | ||
Check warning on line 112 in plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java Codecov / codecov/patchplugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java#L111-L112
|
||
} | ||
|
||
private static HttpContent createContent(HttpChunk chunk) { | ||
return new DefaultHttpContent(Unpooled.copiedBuffer(BytesReference.toBytes(chunk.content()))); | ||
Check warning on line 116 in plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java Codecov / codecov/patchplugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java#L116
|
||
} | ||
} |