Skip to content

Commit 5cd9340

Browse files
authored
Add protection for the I/O reactor and troubleshooting docs (#686)
Adds a wrapper around the RestClient's default response consumer so that any Error happening there (such as OOME) is reported to the application and doesn't crash the Apache http-client library's reactor. This should avoid I/O reactor related errors, such as I/O reactor terminated abnormally. Also add troubleshooting documentation for users of previous versions, and users using RestClient directly.
1 parent 4d555ba commit 5cd9340

File tree

6 files changed

+382
-2
lines changed

6 files changed

+382
-2
lines changed

docs/troubleshooting/index.asciidoc

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
* <<missing-required-property>>
1111
* <<no-such-method-request-options>>
12+
* <<io-reactor-errors>>
1213

1314
[discrete]
1415
=== Miscellaneous
@@ -23,4 +24,6 @@
2324

2425
include::missing-required-property.asciidoc[]
2526
include::no-such-method-request-options.asciidoc[]
27+
include::io-reactor-errors.asciidoc[]
28+
2629
include::serialize-without-typed-keys.asciidoc[]
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
[[io-reactor-errors]]
2+
=== Apache http-client I/O reactor errors
3+
4+
Sending requests can sometimes fail with one of the following errors, coming from the Apache http-client library:
5+
6+
* `Request cannot be executed; I/O reactor status: STOPPED`
7+
* `I/O reactor terminated abnormally`
8+
* `I/O reactor has been shut down`
9+
10+
The I/O Reactor is the internal event loop in the http client library. It can terminate when an application callback throws an `Error`, like an `OutOfMemoryError` or a `StackOverflowError`. Remember that `Error` is different from a regular `Exception` and – https://docs.oracle.com/javase/8/docs/api/?java/lang/Error.html[quoting the Java documentation] – _indicates serious problems that a reasonable application should not try to catch_.
11+
12+
In the context of the Elasticsearch Java clients, this can happen on two occasions:
13+
14+
* the application calls the low level `RestClient` directly, using the asynchronous `performRequestAsync` method, and an `Error` is thrown in the `ResponseListener` provided by the application.
15+
* an `OutOfMemoryError` happens while buffering the body of an http response.
16+
17+
In the first case, it is the application's responsibility to catch `Error` in its `ResponseListener` and decide what to do when these errors happen.
18+
19+
The second case is taken care of in the {java-client} since version 8.12: the error is wrapped in a `RuntimeException` that is reported to the application.
20+
21+
In previous versions of the {java-client}, you can copy/paste the `SafeResponseConsumer` class in your project and initialize the `RestClientTransport` as follows:
22+
23+
["source","java"]
24+
------
25+
RestClient restClient = ...
26+
JsonpMapper mapper = ...
27+
RestClientOptions options = new RestClientOptions(
28+
SafeResponseConsumer.DEFAULT_REQUEST_OPTIONS
29+
);
30+
RestClientTransport transport = new RestClientTransport(
31+
restClient, mapper, options
32+
);
33+
------

java-client/build.gradle.kts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -251,8 +251,11 @@ dependencies {
251251
testImplementation("org.testcontainers", "testcontainers", "1.17.3")
252252
testImplementation("org.testcontainers", "elasticsearch", "1.17.3")
253253

254-
255254
testImplementation("io.opentelemetry", "opentelemetry-sdk", openTelemetryVersion)
255+
256+
// Apache-2.0
257+
// https://github.com/awaitility/awaitility
258+
testImplementation("org.awaitility", "awaitility", "4.2.0")
256259
}
257260

258261

java-client/src/main/java/co/elastic/clients/transport/rest_client/RestClientOptions.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,7 @@ public RestClientOptions build() {
188188
}
189189

190190
static RestClientOptions initialOptions() {
191-
return new RestClientOptions(RequestOptions.DEFAULT);
191+
return new RestClientOptions(SafeResponseConsumer.DEFAULT_REQUEST_OPTIONS);
192192
}
193193

194194
private static RequestOptions.Builder addBuiltinHeaders(RequestOptions.Builder builder) {
Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
/*
2+
* Licensed to Elasticsearch B.V. under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch B.V. licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package co.elastic.clients.transport.rest_client;
21+
22+
import org.apache.http.HttpException;
23+
import org.apache.http.HttpResponse;
24+
import org.apache.http.nio.ContentDecoder;
25+
import org.apache.http.nio.IOControl;
26+
import org.apache.http.nio.protocol.HttpAsyncResponseConsumer;
27+
import org.apache.http.protocol.HttpContext;
28+
import org.elasticsearch.client.HttpAsyncResponseConsumerFactory;
29+
import org.elasticsearch.client.RequestOptions;
30+
31+
import java.io.IOException;
32+
33+
/**
34+
* A response consumer that will propagate Errors as RuntimeExceptions to avoid crashing the IOReactor.
35+
*/
36+
public class SafeResponseConsumer<T> implements HttpAsyncResponseConsumer<T> {
37+
38+
private final HttpAsyncResponseConsumer<T> delegate;
39+
40+
/**
41+
* A consumer factory that safely wraps the one provided by {@code RequestOptions.DEFAULT}.
42+
*/
43+
public static final HttpAsyncResponseConsumerFactory DEFAULT_FACTORY = () -> new SafeResponseConsumer<>(
44+
RequestOptions.DEFAULT.getHttpAsyncResponseConsumerFactory().createHttpAsyncResponseConsumer()
45+
);
46+
47+
/**
48+
* Same as {@code RequestOptions.DEFAULT} with a safe consumer factory
49+
*/
50+
public static final RequestOptions DEFAULT_REQUEST_OPTIONS;
51+
52+
static {
53+
RequestOptions.Builder builder = RequestOptions.DEFAULT.toBuilder();
54+
builder.setHttpAsyncResponseConsumerFactory(DEFAULT_FACTORY);
55+
DEFAULT_REQUEST_OPTIONS = builder.build();
56+
}
57+
58+
public SafeResponseConsumer(HttpAsyncResponseConsumer<T> delegate) {
59+
this.delegate = delegate;
60+
}
61+
62+
@SuppressWarnings("unchecked")
63+
private static <T extends Throwable> void throwUnchecked(Throwable thr) throws T {
64+
throw (T) thr;
65+
}
66+
67+
@Override
68+
public void responseReceived(HttpResponse response) throws IOException, HttpException {
69+
try {
70+
delegate.responseReceived(response);
71+
} catch(Exception e) {
72+
throwUnchecked(e);
73+
} catch(Throwable e) {
74+
throw new RuntimeException("Error receiving response", e);
75+
}
76+
}
77+
78+
@Override
79+
public void consumeContent(ContentDecoder decoder, IOControl ioControl) throws IOException {
80+
try {
81+
delegate.consumeContent(decoder, ioControl);
82+
} catch(Exception e) {
83+
throwUnchecked(e);
84+
} catch(Throwable e) {
85+
throw new RuntimeException("Error consuming content", e);
86+
}
87+
}
88+
89+
@Override
90+
public void responseCompleted(HttpContext context) {
91+
try {
92+
delegate.responseCompleted(context);
93+
} catch(Exception e) {
94+
throwUnchecked(e);
95+
} catch(Throwable e) {
96+
throw new RuntimeException("Error completing response", e);
97+
}
98+
}
99+
100+
@Override
101+
public void failed(Exception ex) {
102+
try {
103+
delegate.failed(ex);
104+
} catch(Exception e) {
105+
throwUnchecked(e);
106+
} catch(Throwable e) {
107+
throw new RuntimeException("Error handling failure", e);
108+
}
109+
}
110+
111+
@Override
112+
public Exception getException() {
113+
return delegate.getException();
114+
}
115+
116+
@Override
117+
public T getResult() {
118+
return delegate.getResult();
119+
}
120+
121+
@Override
122+
public boolean isDone() {
123+
return delegate.isDone();
124+
}
125+
126+
@Override
127+
public void close() throws IOException {
128+
delegate.close();
129+
}
130+
131+
@Override
132+
public boolean cancel() {
133+
return delegate.cancel();
134+
}
135+
}

0 commit comments

Comments
 (0)