Skip to content

Commit

Permalink
Dialogue guards against HTTPCLIENT-1924 client self-closure
Browse files Browse the repository at this point in the history
  • Loading branch information
carterkozak committed Jun 13, 2023
1 parent 5074abc commit 7111a5f
Show file tree
Hide file tree
Showing 3 changed files with 171 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ public abstract static class CloseableClient implements Closeable {
static CloseableClient wrap(
CloseableHttpClient apacheClient,
@Safe String clientName,
PoolingHttpClientConnectionManager pool,
InstrumentedPoolingHttpClientConnectionManager pool,
ScheduledFuture<?> connectionEvictorFuture,
ClientConfiguration clientConfiguration,
@Nullable ExecutorService executor) {
Expand Down Expand Up @@ -277,7 +277,7 @@ private static final class CloseableClientImpl extends CloseableClient {

private final String clientName;
private final CloseableHttpClient apacheClient;
private final PoolingHttpClientConnectionManager pool;
private final InstrumentedPoolingHttpClientConnectionManager pool;
private final ResponseLeakDetector leakDetector;
private final ClientConfiguration clientConfiguration;

Expand All @@ -289,7 +289,7 @@ private static final class CloseableClientImpl extends CloseableClient {
private CloseableClientImpl(
CloseableHttpClient apacheClient,
@Safe String clientName,
PoolingHttpClientConnectionManager pool,
InstrumentedPoolingHttpClientConnectionManager pool,
ScheduledFuture<?> connectionEvictorFuture,
ResponseLeakDetector leakDetector,
@Nullable ExecutorService executor,
Expand All @@ -302,7 +302,7 @@ private CloseableClientImpl(
this.clientConfiguration = clientConfiguration;
closer.register(() -> connectionEvictorFuture.cancel(true));
closer.register(apacheClient);
closer.register(pool);
closer.register(pool::closeUnderlyingConnectionManager);
closer.register(DialogueClientMetrics.of(clientConfiguration.taggedMetricRegistry())
.close(clientName)::mark);
}
Expand Down Expand Up @@ -477,7 +477,7 @@ public CloseableClient build() {
? () -> new Socket(Proxy.NO_PROXY)
: () -> new Socket(new Proxy(Proxy.Type.SOCKS, socksProxyAddress));

PoolingHttpClientConnectionManager connectionManager = new PoolingHttpClientConnectionManager(
PoolingHttpClientConnectionManager internalConnectionManager = new PoolingHttpClientConnectionManager(
RegistryBuilder.<ConnectionSocketFactory>create()
.register(URIScheme.HTTP.id, new PlainConnectionSocketFactory() {
@Override
Expand Down Expand Up @@ -510,7 +510,7 @@ public Socket createSocket(HttpContext _context) {
new InstrumentedDnsResolver(SystemDefaultDnsResolver.INSTANCE, name, conf.taggedMetricRegistry()),
new InstrumentedManagedHttpConnectionFactory(
ManagedHttpClientConnectionFactory.INSTANCE, conf.taggedMetricRegistry(), name));
connectionManager.setDefaultSocketConfig(SocketConfig.custom()
internalConnectionManager.setDefaultSocketConfig(SocketConfig.custom()
.setSoKeepAlive(true)
// The default socket configuration socket timeout only applies prior to request execution.
// By using a more specific timeout here, we bound the handshake in addition to the
Expand All @@ -519,11 +519,15 @@ public Socket createSocket(HttpContext _context) {
// Doesn't appear to do anything in this release
.setSocksProxyAddress(socksProxyAddress)
.build());
connectionManager.setValidateAfterInactivity(CONNECTION_INACTIVITY_CHECK);
connectionManager.setMaxTotal(Integer.MAX_VALUE);
connectionManager.setDefaultMaxPerRoute(Integer.MAX_VALUE);
internalConnectionManager.setValidateAfterInactivity(CONNECTION_INACTIVITY_CHECK);
internalConnectionManager.setMaxTotal(Integer.MAX_VALUE);
internalConnectionManager.setDefaultMaxPerRoute(Integer.MAX_VALUE);

setupConnectionPoolMetrics(conf.taggedMetricRegistry(), name, connectionManager);
setupConnectionPoolMetrics(conf.taggedMetricRegistry(), name, internalConnectionManager);

InstrumentedPoolingHttpClientConnectionManager connectionManager =
new InstrumentedPoolingHttpClientConnectionManager(
internalConnectionManager, conf.taggedMetricRegistry(), name);

HttpClientBuilder builder = HttpClients.custom()
.setDefaultRequestConfig(RequestConfig.custom()
Expand All @@ -543,9 +547,8 @@ public Socket createSocket(HttpContext _context) {
// precise IdleConnectionEvictor.
.setConnectionManagerShared(true)
.setKeepAliveStrategy(
new InactivityValidationAwareConnectionKeepAliveStrategy(connectionManager, name))
.setConnectionManager(new InstrumentedPoolingHttpClientConnectionManager(
connectionManager, conf.taggedMetricRegistry(), name))
new InactivityValidationAwareConnectionKeepAliveStrategy(internalConnectionManager, name))
.setConnectionManager(connectionManager)
.setRoutePlanner(new SystemDefaultRoutePlanner(null, conf.proxy()))
.disableAutomaticRetries()
// Must be disabled otherwise connections are not reused when client certificates are provided
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.codahale.metrics.Timer;
import com.codahale.metrics.Timer.Context;
import com.palantir.logsafe.SafeArg;
import com.palantir.logsafe.exceptions.SafeRuntimeException;
import com.palantir.logsafe.logger.SafeLogger;
import com.palantir.logsafe.logger.SafeLoggerFactory;
import com.palantir.tracing.CloseableTracer;
Expand Down Expand Up @@ -47,6 +48,7 @@ final class InstrumentedPoolingHttpClientConnectionManager
private final TaggedMetricRegistry registry;
private final String clientName;
private final Timer connectTimer;
private volatile boolean closed;

InstrumentedPoolingHttpClientConnectionManager(
PoolingHttpClientConnectionManager manager, TaggedMetricRegistry registry, String clientName) {
Expand All @@ -58,12 +60,41 @@ final class InstrumentedPoolingHttpClientConnectionManager

@Override
public void close() {
manager.close();
if (!closed) {
log.warn(
"Dialogue ConnectionManager close invoked unexpectedly and ignored",
SafeArg.of("clientName", clientName),
new SafeRuntimeException("stacktrace"));
// Note: manager.close is not invoked here, see closeUnderlyingConnectionManager.
}
}

@Override
public void close(CloseMode closeMode) {
manager.close(closeMode);
if (!closed) {
log.warn(
"Dialogue ConnectionManager close invoked unexpectedly and ignored",
SafeArg.of("clientName", clientName),
SafeArg.of("closeMode", closeMode),
new SafeRuntimeException("stacktrace"));
// Note: manager.close is not invoked here, see closeUnderlyingConnectionManager.
}
}

/**
* This method is used to close the underlying connection manager, while the {@link #close()} methods are
* overridden specifically not to do so in order to avoid unexpected closure in MainClientExec when
* an Error is encountered due to HTTPCLIENT-1924:
* https://github.com/apache/httpcomponents-client/blob/5b61e132c3871ddfa967ab21b3af5d6d738bc6e8/
* httpclient5/src/main/java/org/apache/hc/client5/http/impl/classic/MainClientExec.java#L161-L164
* Note that MainClientExec pool self-closure will likely leak a connection each time it occurs, however
* dialogue bounds connections to Integer.MAX_VALUE, so this is preferable over
*/
void closeUnderlyingConnectionManager() {
if (!closed) {
closed = true;
manager.close();
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
/*
* (c) Copyright 2020 Palantir Technologies Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.palantir.dialogue.hc5;

import static org.assertj.core.api.Assertions.assertThatThrownBy;

import com.google.common.collect.Iterables;
import com.palantir.conjure.java.client.config.ClientConfiguration;
import com.palantir.conjure.java.config.ssl.SslSocketFactories;
import com.palantir.conjure.java.dialogue.serde.DefaultConjureRuntime;
import com.palantir.dialogue.Channel;
import com.palantir.dialogue.Clients;
import com.palantir.dialogue.Deserializer;
import com.palantir.dialogue.EndpointChannel;
import com.palantir.dialogue.Request;
import com.palantir.dialogue.RequestBody;
import com.palantir.dialogue.Response;
import com.palantir.dialogue.TestConfigurations;
import com.palantir.dialogue.TestEndpoint;
import io.undertow.Undertow;
import io.undertow.server.handlers.ResponseCodeHandler;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.util.Optional;
import javax.net.ssl.SSLContext;
import org.junit.jupiter.api.Test;

@SuppressWarnings("checkstyle:NestedTryDepth")
public final class ErrorRecoveryTest {

@Test
public void errorRecovery() throws Exception {
Clients clients = DefaultConjureRuntime.builder().build().clients();
Undertow server = startServer();
try {
String uri = "https://localhost:" + getPort(server);
ClientConfiguration conf = TestConfigurations.create(uri);
try (ApacheHttpClientChannels.CloseableClient client =
ApacheHttpClientChannels.createCloseableHttpClient(conf, "client")) {
Channel channel = ApacheHttpClientChannels.createSingleUri(uri, client);
EndpointChannel endpointChannel = request -> channel.execute(TestEndpoint.POST, request);
assertThatThrownBy(() ->
singleRequest(clients, endpointChannel, Optional.of(ErrorThrowingRequestBody.INSTANCE)))
.isInstanceOf(StackOverflowError.class);
// Following a failure, the client should be operable.
singleRequest(clients, endpointChannel, Optional.empty());
}
} finally {
server.stop();
}
}

private static void singleRequest(Clients clients, EndpointChannel endpointChannel, Optional<RequestBody> body) {
clients.callBlocking(endpointChannel, Request.builder().body(body).build(), VoidDeserializer.INSTANCE);
}

private static Undertow startServer() {
SSLContext sslContext = SslSocketFactories.createSslContext(TestConfigurations.SSL_CONFIG);
Undertow server = Undertow.builder()
.setHandler(ResponseCodeHandler.HANDLE_200)
.addHttpsListener(0, null, sslContext)
.build();
server.start();
return server;
}

private static int getPort(Undertow undertow) {
return ((InetSocketAddress)
Iterables.getOnlyElement(undertow.getListenerInfo()).getAddress())
.getPort();
}

private enum VoidDeserializer implements Deserializer<Void> {
INSTANCE;

@Override
public Void deserialize(Response response) {
response.close();
return null;
}

@Override
public Optional<String> accepts() {
return Optional.empty();
}
}

private enum ErrorThrowingRequestBody implements RequestBody {
INSTANCE;

@Override
public void writeTo(OutputStream _output) {
throw new StackOverflowError();
}

@Override
public String contentType() {
return "application/octet-stream";
}

@Override
public boolean repeatable() {
return false;
}

@Override
public void close() {}
}
}

0 comments on commit 7111a5f

Please sign in to comment.