From 454ec5695fd6880a78868c6de6183e90be440aff Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" Date: Tue, 1 Oct 2024 23:33:51 +0000 Subject: [PATCH] [BUG] Streaming bulk request hangs (#16158) Signed-off-by: Andriy Redko (cherry picked from commit 1ee858fc8cc631d35c4a48b2a53d4ab6009e2c2b) Signed-off-by: github-actions[bot] --- CHANGELOG.md | 1 + .../rest/ReactorNetty4StreamingIT.java | 28 +++++++++++++++++++ .../org/opensearch/rest/RestController.java | 2 +- 3 files changed, 30 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 27bb1133f2871..045d4ec169647 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -48,6 +48,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Avoid infinite loop when `flat_object` field contains invalid token ([#15985](https://github.com/opensearch-project/OpenSearch/pull/15985)) - Fix infinite loop in nested agg ([#15931](https://github.com/opensearch-project/OpenSearch/pull/15931)) - Fix race condition in node-join and node-left ([#15521](https://github.com/opensearch-project/OpenSearch/pull/15521)) +- Streaming bulk request hangs ([#16158](https://github.com/opensearch-project/OpenSearch/pull/16158)) ### Security diff --git a/plugins/transport-reactor-netty4/src/javaRestTest/java/org/opensearch/rest/ReactorNetty4StreamingIT.java b/plugins/transport-reactor-netty4/src/javaRestTest/java/org/opensearch/rest/ReactorNetty4StreamingIT.java index 6f3895fffa437..1b60023da0329 100644 --- a/plugins/transport-reactor-netty4/src/javaRestTest/java/org/opensearch/rest/ReactorNetty4StreamingIT.java +++ b/plugins/transport-reactor-netty4/src/javaRestTest/java/org/opensearch/rest/ReactorNetty4StreamingIT.java @@ -21,6 +21,7 @@ import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.time.Duration; +import java.util.Locale; import java.util.stream.IntStream; import java.util.stream.Stream; @@ -297,4 +298,31 @@ public void testStreamingBadStream() throws IOException { assertThat(streamingResponse.getStatusLine().getStatusCode(), equalTo(200)); assertThat(streamingResponse.getWarnings(), empty()); } + + public void testStreamingLargeDocument() throws IOException { + final Stream stream = Stream.of( + String.format( + Locale.getDefault(), + "{ \"index\": { \"_index\": \"test-streaming\", \"_id\": \"1\" } }\n{ \"name\": \"%s\" }\n", + randomAlphaOfLength(5000) + ) + ); + + final Duration delay = Duration.ofMillis(1); + final StreamingRequest streamingRequest = new StreamingRequest<>( + "POST", + "/_bulk/stream", + Flux.fromStream(stream).map(s -> ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8))) + ); + + final StreamingResponse streamingResponse = client().streamRequest(streamingRequest); + + StepVerifier.create(Flux.from(streamingResponse.getBody()).map(b -> new String(b.array(), StandardCharsets.UTF_8))) + .expectNextMatches(s -> s.contains("\"type\":\"illegal_argument_exception\"")) + .expectComplete() + .verify(); + + assertThat(streamingResponse.getStatusLine().getStatusCode(), equalTo(200)); + assertThat(streamingResponse.getWarnings(), empty()); + } } diff --git a/server/src/main/java/org/opensearch/rest/RestController.java b/server/src/main/java/org/opensearch/rest/RestController.java index 4f87c01258396..c17f723c13f2a 100644 --- a/server/src/main/java/org/opensearch/rest/RestController.java +++ b/server/src/main/java/org/opensearch/rest/RestController.java @@ -709,7 +709,7 @@ public void sendResponse(RestResponse response) { prepareResponse(response.status(), Map.of("Content-Type", List.of(response.contentType()))); } - Mono.ignoreElements(this).then(Mono.just(response)).subscribe(delegate::sendResponse); + Mono.from(this).ignoreElement().then(Mono.just(response)).subscribe(delegate::sendResponse); } @Override