From 6c4ee083be2d4ffb528a4a6259dfcf63bd74ac51 Mon Sep 17 00:00:00 2001
From: Simone Bordet
Date: Thu, 17 Jan 2019 10:43:03 +0100
Subject: [PATCH 1/4] Issue #132 - ClientConnector abstraction.
Introduced ClientConnector and refactored HttpClient transports,
removing duplicated code that was connect() to a remote host.
Refactored also HTTP2Client to reference ClientConnector.
Refactored tests accordingly to the changes introduced in the
implementations.
Signed-off-by: Simone Bordet
---
.../alpn/client/ALPNClientConnection.java | 8 +-
.../client/ALPNClientConnectionFactory.java | 3 +-
jetty-alpn/jetty-alpn-java-client/pom.xml | 1 +
.../alpn/java/client/JDK9HTTP2Client.java | 5 +-
.../jetty/alpn/java/server/JDK9ALPNTest.java | 21 +-
.../AbstractConnectorHttpClientTransport.java | 156 ++------
.../client/AbstractHttpClientTransport.java | 13 +
.../org/eclipse/jetty/client/HttpClient.java | 91 ++---
.../jetty/client/HttpClientTransport.java | 4 +-
.../java/org/eclipse/jetty/client/Origin.java | 40 ++-
.../jetty/client/ResponseNotifier.java | 27 +-
.../http/HttpClientTransportOverHTTP.java | 11 +-
.../client/http/HttpConnectionOverHTTP.java | 1 -
.../client/HttpClientCustomProxyTest.java | 7 +-
.../jetty/client/HttpClientTLSTest.java | 20 +-
.../eclipse/jetty/client/HttpClientTest.java | 28 +-
.../InsufficientThreadsDetectionTest.java | 12 +-
.../http/HttpDestinationOverHTTPTest.java | 18 +-
.../http/HttpClientTransportOverFCGI.java | 9 +-
.../jetty/http2/client/HTTP2Client.java | 245 +++----------
.../client/HTTP2ClientConnectionFactory.java | 15 +-
.../eclipse/jetty/http2/client/Client.java | 94 -----
.../http/HttpClientTransportOverHTTP2.java | 19 +-
.../client/http/MaxConcurrentStreamsTest.java | 6 +-
.../jetty/io/ClientConnectionFactory.java | 8 +-
.../org/eclipse/jetty/io/ClientConnector.java | 333 ++++++++++++++++++
.../jetty/io/NegotiatingClientConnection.java | 6 +-
.../io/ssl/SslClientConnectionFactory.java | 17 +-
.../eclipse/jetty/proxy/ProxyServletTest.java | 18 +-
.../HttpClientTransportOverUnixSockets.java | 144 +++++---
.../jetty/unixsocket/UnixSocketTest.java | 97 +++--
.../http/client/ConnectionStatisticsTest.java | 21 +-
.../http/client/HttpClientContinueTest.java | 10 +-
.../client/HttpClientIdleTimeoutTest.java | 18 +-
.../jetty/http/client/HttpClientTest.java | 25 +-
.../test/resources/jetty-logging.properties | 2 +-
36 files changed, 789 insertions(+), 764 deletions(-)
delete mode 100644 jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/Client.java
create mode 100644 jetty-io/src/main/java/org/eclipse/jetty/io/ClientConnector.java
diff --git a/jetty-alpn/jetty-alpn-client/src/main/java/org/eclipse/jetty/alpn/client/ALPNClientConnection.java b/jetty-alpn/jetty-alpn-client/src/main/java/org/eclipse/jetty/alpn/client/ALPNClientConnection.java
index 9779f0ec49ab..4b6c25573b88 100644
--- a/jetty-alpn/jetty-alpn-client/src/main/java/org/eclipse/jetty/alpn/client/ALPNClientConnection.java
+++ b/jetty-alpn/jetty-alpn-client/src/main/java/org/eclipse/jetty/alpn/client/ALPNClientConnection.java
@@ -27,13 +27,9 @@
import org.eclipse.jetty.io.ClientConnectionFactory;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.NegotiatingClientConnection;
-import org.eclipse.jetty.util.log.Log;
-import org.eclipse.jetty.util.log.Logger;
public class ALPNClientConnection extends NegotiatingClientConnection
{
- private static final Logger LOG = Log.getLogger(ALPNClientConnection.class);
-
private final List protocols;
public ALPNClientConnection(EndPoint endPoint, Executor executor, ClientConnectionFactory connectionFactory, SSLEngine sslEngine, Map context, List protocols)
@@ -49,9 +45,9 @@ public List getProtocols()
public void selected(String protocol)
{
- if (protocol==null || !protocols.contains(protocol))
+ if (protocol == null || !protocols.contains(protocol))
close();
else
- super.completed();
+ completed();
}
}
diff --git a/jetty-alpn/jetty-alpn-client/src/main/java/org/eclipse/jetty/alpn/client/ALPNClientConnectionFactory.java b/jetty-alpn/jetty-alpn-client/src/main/java/org/eclipse/jetty/alpn/client/ALPNClientConnectionFactory.java
index c1a003266206..75d14ff337ce 100644
--- a/jetty-alpn/jetty-alpn-client/src/main/java/org/eclipse/jetty/alpn/client/ALPNClientConnectionFactory.java
+++ b/jetty-alpn/jetty-alpn-client/src/main/java/org/eclipse/jetty/alpn/client/ALPNClientConnectionFactory.java
@@ -18,7 +18,6 @@
package org.eclipse.jetty.alpn.client;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
@@ -96,7 +95,7 @@ public ALPNClientConnectionFactory(Executor executor, ClientConnectionFactory co
}
@Override
- public Connection newConnection(EndPoint endPoint, Map context) throws IOException
+ public Connection newConnection(EndPoint endPoint, Map context)
{
SSLEngine engine = (SSLEngine)context.get(SslClientConnectionFactory.SSL_ENGINE_CONTEXT_KEY);
for (Client processor : processors)
diff --git a/jetty-alpn/jetty-alpn-java-client/pom.xml b/jetty-alpn/jetty-alpn-java-client/pom.xml
index 31660c91677f..6054bc0d7a6b 100644
--- a/jetty-alpn/jetty-alpn-java-client/pom.xml
+++ b/jetty-alpn/jetty-alpn-java-client/pom.xml
@@ -42,6 +42,7 @@
jetty-alpn-client
${project.version}
+
org.eclipse.jetty.http2
http2-client
diff --git a/jetty-alpn/jetty-alpn-java-client/src/test/java/org/eclipse/jetty/alpn/java/client/JDK9HTTP2Client.java b/jetty-alpn/jetty-alpn-java-client/src/test/java/org/eclipse/jetty/alpn/java/client/JDK9HTTP2Client.java
index 9dbcf11e479c..aa772b826e36 100644
--- a/jetty-alpn/jetty-alpn-java-client/src/test/java/org/eclipse/jetty/alpn/java/client/JDK9HTTP2Client.java
+++ b/jetty-alpn/jetty-alpn-java-client/src/test/java/org/eclipse/jetty/alpn/java/client/JDK9HTTP2Client.java
@@ -35,22 +35,19 @@
import org.eclipse.jetty.util.FuturePromise;
import org.eclipse.jetty.util.Jetty;
import org.eclipse.jetty.util.Promise;
-import org.eclipse.jetty.util.ssl.SslContextFactory;
public class JDK9HTTP2Client
{
public static void main(String[] args) throws Exception
{
HTTP2Client client = new HTTP2Client();
- SslContextFactory sslContextFactory = new SslContextFactory();
- client.addBean(sslContextFactory);
client.start();
String host = "webtide.com";
int port = 443;
FuturePromise sessionPromise = new FuturePromise<>();
- client.connect(sslContextFactory, new InetSocketAddress(host, port), new Session.Listener.Adapter(), sessionPromise);
+ client.connect(client.getClientConnector().getSslContextFactory(), new InetSocketAddress(host, port), new Session.Listener.Adapter(), sessionPromise);
Session session = sessionPromise.get(5, TimeUnit.SECONDS);
HttpFields requestFields = new HttpFields();
diff --git a/jetty-alpn/jetty-alpn-java-server/src/test/java/org/eclipse/jetty/alpn/java/server/JDK9ALPNTest.java b/jetty-alpn/jetty-alpn-java-server/src/test/java/org/eclipse/jetty/alpn/java/server/JDK9ALPNTest.java
index 8f74502dfe31..72a5aef503ac 100644
--- a/jetty-alpn/jetty-alpn-java-server/src/test/java/org/eclipse/jetty/alpn/java/server/JDK9ALPNTest.java
+++ b/jetty-alpn/jetty-alpn-java-server/src/test/java/org/eclipse/jetty/alpn/java/server/JDK9ALPNTest.java
@@ -18,11 +18,7 @@
package org.eclipse.jetty.alpn.java.server;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.containsString;
-
import java.io.BufferedReader;
-import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
@@ -31,7 +27,6 @@
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLParameters;
import javax.net.ssl.SSLSocket;
-import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
@@ -45,8 +40,12 @@
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.util.ssl.SslContextFactory;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsString;
+
public class JDK9ALPNTest
{
private Server server;
@@ -66,6 +65,13 @@ public void startServer(Handler handler) throws Exception
server.start();
}
+ @AfterEach
+ public void stopServer() throws Exception
+ {
+ if (server != null)
+ server.stop();
+ }
+
private SslContextFactory newSslContextFactory()
{
SslContextFactory sslContextFactory = new SslContextFactory();
@@ -84,7 +90,7 @@ public void testClientNotSupportingALPNServerSpeaksDefaultProtocol() throws Exce
startServer(new AbstractHandler.ErrorDispatchHandler()
{
@Override
- protected void doNonErrorHandle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
+ protected void doNonErrorHandle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response)
{
baseRequest.setHandled(true);
}
@@ -126,7 +132,7 @@ public void testClientSupportingALPNServerSpeaksNegotiatedProtocol() throws Exce
startServer(new AbstractHandler.ErrorDispatchHandler()
{
@Override
- protected void doNonErrorHandle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
+ protected void doNonErrorHandle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response)
{
baseRequest.setHandled(true);
}
@@ -163,6 +169,5 @@ protected void doNonErrorHandle(String target, Request baseRequest, HttpServletR
break;
}
}
-
}
}
diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectorHttpClientTransport.java b/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectorHttpClientTransport.java
index 67f57a114863..0c234daa160f 100644
--- a/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectorHttpClientTransport.java
+++ b/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectorHttpClientTransport.java
@@ -18,21 +18,12 @@
package org.eclipse.jetty.client;
-import java.io.IOException;
import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.net.SocketException;
-import java.nio.channels.SelectableChannel;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.SocketChannel;
+import java.time.Duration;
import java.util.Map;
import org.eclipse.jetty.client.api.Connection;
-import org.eclipse.jetty.io.EndPoint;
-import org.eclipse.jetty.io.ManagedSelector;
-import org.eclipse.jetty.io.SelectorManager;
-import org.eclipse.jetty.io.SocketChannelEndPoint;
-import org.eclipse.jetty.io.ssl.SslClientConnectionFactory;
+import org.eclipse.jetty.io.ClientConnector;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
@@ -40,147 +31,48 @@
@ManagedObject
public abstract class AbstractConnectorHttpClientTransport extends AbstractHttpClientTransport
{
- private final int selectors;
- private SelectorManager selectorManager;
+ private final ClientConnector connector;
- protected AbstractConnectorHttpClientTransport(int selectors)
+ protected AbstractConnectorHttpClientTransport(ClientConnector connector)
{
- this.selectors = selectors;
+ this.connector = connector;
+ addBean(connector);
+ }
+
+ public ClientConnector getClientConnector()
+ {
+ return connector;
}
@ManagedAttribute(value = "The number of selectors", readonly = true)
public int getSelectors()
{
- return selectors;
+ return connector.getSelectors();
}
@Override
protected void doStart() throws Exception
{
HttpClient httpClient = getHttpClient();
- selectorManager = newSelectorManager(httpClient);
- selectorManager.setConnectTimeout(httpClient.getConnectTimeout());
- addBean(selectorManager);
+ connector.setBindAddress(httpClient.getBindAddress());
+ connector.setByteBufferPool(httpClient.getByteBufferPool());
+ connector.setConnectBlocking(httpClient.isConnectBlocking());
+ connector.setConnectTimeout(Duration.ofMillis(httpClient.getConnectTimeout()));
+ connector.setExecutor(httpClient.getExecutor());
+ connector.setIdleTimeout(Duration.ofMillis(httpClient.getIdleTimeout()));
+ connector.setScheduler(httpClient.getScheduler());
+ connector.setSslContextFactory(httpClient.getSslContextFactory());
super.doStart();
}
- @Override
- protected void doStop() throws Exception
- {
- super.doStop();
- removeBean(selectorManager);
- }
-
@Override
public void connect(InetSocketAddress address, Map context)
{
- SocketChannel channel = null;
- try
- {
- channel = SocketChannel.open();
- HttpDestination destination = (HttpDestination)context.get(HTTP_DESTINATION_CONTEXT_KEY);
- HttpClient client = destination.getHttpClient();
- SocketAddress bindAddress = client.getBindAddress();
- if (bindAddress != null)
- channel.bind(bindAddress);
- configure(client, channel);
-
- context.put(SslClientConnectionFactory.SSL_PEER_HOST_CONTEXT_KEY, destination.getHost());
- context.put(SslClientConnectionFactory.SSL_PEER_PORT_CONTEXT_KEY, destination.getPort());
-
- boolean connected = true;
- if (client.isConnectBlocking())
- {
- channel.socket().connect(address, (int)client.getConnectTimeout());
- channel.configureBlocking(false);
- }
- else
- {
- channel.configureBlocking(false);
- connected = channel.connect(address);
- }
- if (connected)
- selectorManager.accept(channel, context);
- else
- selectorManager.connect(channel, context);
- }
- // Must catch all exceptions, since some like
- // UnresolvedAddressException are not IOExceptions.
- catch (Throwable x)
- {
- // If IPv6 is not deployed, a generic SocketException "Network is unreachable"
- // exception is being thrown, so we attempt to provide a better error message.
- if (x.getClass() == SocketException.class)
- x = new SocketException("Could not connect to " + address).initCause(x);
-
- try
- {
- if (channel != null)
- channel.close();
- }
- catch (IOException xx)
- {
- LOG.ignore(xx);
- }
- finally
- {
- connectFailed(context, x);
- }
- }
- }
-
- protected void connectFailed(Map context, Throwable x)
- {
- if (LOG.isDebugEnabled())
- LOG.debug("Could not connect to {}", context.get(HTTP_DESTINATION_CONTEXT_KEY));
+ HttpDestination destination = (HttpDestination)context.get(HTTP_DESTINATION_CONTEXT_KEY);
+ context.put(ClientConnector.CLIENT_CONNECTION_FACTORY_CONTEXT_KEY, destination.getClientConnectionFactory());
@SuppressWarnings("unchecked")
Promise promise = (Promise)context.get(HTTP_CONNECTION_PROMISE_CONTEXT_KEY);
- promise.failed(x);
- }
-
- protected void configure(HttpClient client, SocketChannel channel) throws IOException
- {
- channel.socket().setTcpNoDelay(client.isTCPNoDelay());
- }
-
- protected SelectorManager newSelectorManager(HttpClient client)
- {
- return new ClientSelectorManager(client, getSelectors());
- }
-
- protected class ClientSelectorManager extends SelectorManager
- {
- private final HttpClient client;
-
- protected ClientSelectorManager(HttpClient client, int selectors)
- {
- super(client.getExecutor(), client.getScheduler(), selectors);
- this.client = client;
- }
-
- @Override
- protected EndPoint newEndPoint(SelectableChannel channel, ManagedSelector selector, SelectionKey key)
- {
- SocketChannelEndPoint endp = new SocketChannelEndPoint(channel, selector, key, getScheduler());
- endp.setIdleTimeout(client.getIdleTimeout());
- return endp;
- }
-
- @Override
- public org.eclipse.jetty.io.Connection newConnection(SelectableChannel channel, EndPoint endPoint, Object attachment) throws IOException
- {
- @SuppressWarnings("unchecked")
- Map context = (Map)attachment;
- HttpDestination destination = (HttpDestination)context.get(HTTP_DESTINATION_CONTEXT_KEY);
- return destination.getClientConnectionFactory().newConnection(endPoint, context);
- }
-
- @Override
- protected void connectionFailed(SelectableChannel channel, Throwable x, Object attachment)
- {
- @SuppressWarnings("unchecked")
- Map context = (Map)attachment;
- connectFailed(context, x);
- }
+ context.put(ClientConnector.CONNECTION_PROMISE_CONTEXT_KEY, new Promise.Wrapper<>(promise));
+ connector.connect(address, context);
}
}
diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractHttpClientTransport.java b/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractHttpClientTransport.java
index 3a601f7c2378..e5282e0fce9d 100644
--- a/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractHttpClientTransport.java
+++ b/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractHttpClientTransport.java
@@ -18,6 +18,10 @@
package org.eclipse.jetty.client;
+import java.util.Map;
+
+import org.eclipse.jetty.client.api.Connection;
+import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.component.ContainerLifeCycle;
import org.eclipse.jetty.util.log.Log;
@@ -53,4 +57,13 @@ public void setConnectionPoolFactory(ConnectionPool.Factory factory)
{
this.factory = factory;
}
+
+ protected void connectFailed(Map context, Throwable failure)
+ {
+ if (LOG.isDebugEnabled())
+ LOG.debug("Could not connect to {}", context.get(HTTP_DESTINATION_CONTEXT_KEY));
+ @SuppressWarnings("unchecked")
+ Promise promise = (Promise)context.get(HTTP_CONNECTION_PROMISE_CONTEXT_KEY);
+ promise.failed(failure);
+ }
}
diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpClient.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpClient.java
index e6761f41eca1..555871270cc9 100644
--- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpClient.java
+++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpClient.java
@@ -81,16 +81,16 @@
import org.eclipse.jetty.util.thread.ThreadPool;
/**
- * {@link HttpClient} provides an efficient, asynchronous, non-blocking implementation
+ *
HttpClient provides an efficient, asynchronous, non-blocking implementation
* to perform HTTP requests to a server through a simple API that offers also blocking semantic.
- * {@link HttpClient} provides easy-to-use methods such as {@link #GET(String)} that allow to perform HTTP
+ *
HttpClient provides easy-to-use methods such as {@link #GET(String)} that allow to perform HTTP
* requests in a one-liner, but also gives the ability to fine tune the configuration of requests via
* {@link HttpClient#newRequest(URI)}.
- * {@link HttpClient} acts as a central configuration point for network parameters (such as idle timeouts)
+ *
HttpClient acts as a central configuration point for network parameters (such as idle timeouts)
* and HTTP parameters (such as whether to follow redirects).
- * {@link HttpClient} transparently pools connections to servers, but allows direct control of connections
+ *
HttpClient transparently pools connections to servers, but allows direct control of connections
* for cases where this is needed.
- * {@link HttpClient} also acts as a central configuration point for cookies, via {@link #getCookieStore()}.
+ * HttpClient also acts as a central configuration point for cookies, via {@link #getCookieStore()}.
* Typical usage:
*
* HttpClient httpClient = new HttpClient();
@@ -157,7 +157,7 @@ public class HttpClient extends ContainerLifeCycle
private String defaultRequestContentType = "application/octet-stream";
/**
- * Creates a {@link HttpClient} instance that can perform requests to non-TLS destinations only
+ * Creates a HttpClient instance that can perform requests to non-TLS destinations only
* (that is, requests with the "http" scheme only, and not "https").
*
* @see #HttpClient(SslContextFactory) to perform requests to TLS destinations.
@@ -168,7 +168,7 @@ public HttpClient()
}
/**
- * Creates a {@link HttpClient} instance that can perform requests to non-TLS and TLS destinations
+ * Creates a HttpClient instance that can perform requests to non-TLS and TLS destinations
* (that is, both requests with the "http" scheme and with the "https" scheme).
*
* @param sslContextFactory the {@link SslContextFactory} that manages TLS encryption
@@ -517,7 +517,7 @@ private URI checkHost(URI uri)
/**
* Returns a {@link Destination} for the given scheme, host and port.
* Applications may use {@link Destination}s to create {@link Connection}s
- * that will be outside {@link HttpClient}'s pooling mechanism, to explicitly
+ * that will be outside HttpClient's pooling mechanism, to explicitly
* control the connection lifecycle (in particular their termination with
* {@link Connection#close()}).
*
@@ -570,7 +570,7 @@ protected boolean removeDestination(HttpDestination destination)
}
/**
- * @return the list of destinations known to this {@link HttpClient}.
+ * @return the list of destinations known to this HttpClient.
*/
public List getDestinations()
{
@@ -586,13 +586,13 @@ protected void send(final HttpRequest request, List l
protected void newConnection(final HttpDestination destination, final Promise promise)
{
Origin.Address address = destination.getConnectAddress();
- resolver.resolve(address.getHost(), address.getPort(), new Promise>()
+ resolver.resolve(address.getHost(), address.getPort(), new Promise<>()
{
@Override
public void succeeded(List socketAddresses)
{
Map context = new HashMap<>();
- context.put(ClientConnectionFactory.CONNECTOR_CONTEXT_KEY, HttpClient.this);
+ context.put(ClientConnectionFactory.CLIENT_CONTEXT_KEY, HttpClient.this);
context.put(HttpClientTransport.HTTP_DESTINATION_CONTEXT_KEY, destination);
connect(socketAddresses, 0, context);
}
@@ -605,7 +605,7 @@ public void failed(Throwable x)
private void connect(List socketAddresses, int index, Map context)
{
- context.put(HttpClientTransport.HTTP_CONNECTION_PROMISE_CONTEXT_KEY, new Promise.Wrapper(promise)
+ context.put(HttpClientTransport.HTTP_CONNECTION_PROMISE_CONTEXT_KEY, new Promise.Wrapper<>(promise)
{
@Override
public void failed(Throwable x)
@@ -638,7 +638,7 @@ protected ProtocolHandler findProtocolHandler(Request request, Response response
}
/**
- * @return the {@link ByteBufferPool} of this {@link HttpClient}
+ * @return the {@link ByteBufferPool} of this HttpClient
*/
public ByteBufferPool getByteBufferPool()
{
@@ -646,7 +646,7 @@ public ByteBufferPool getByteBufferPool()
}
/**
- * @param byteBufferPool the {@link ByteBufferPool} of this {@link HttpClient}
+ * @param byteBufferPool the {@link ByteBufferPool} of this HttpClient
*/
public void setByteBufferPool(ByteBufferPool byteBufferPool)
{
@@ -706,7 +706,7 @@ public long getAddressResolutionTimeout()
/**
* Sets the socket address resolution timeout used by the default {@link SocketAddressResolver}
- * created by this {@link HttpClient} at startup.
+ * created by this HttpClient at startup.
* For more fine tuned configuration of socket address resolution, see
* {@link #setSocketAddressResolver(SocketAddressResolver)}.
*
@@ -755,7 +755,7 @@ public void setBindAddress(SocketAddress bindAddress)
}
/**
- * @return the "User-Agent" HTTP field of this {@link HttpClient}
+ * @return the "User-Agent" HTTP field of this HttpClient
*/
public HttpField getUserAgentField()
{
@@ -763,7 +763,7 @@ public HttpField getUserAgentField()
}
/**
- * @param agent the "User-Agent" HTTP header string of this {@link HttpClient}
+ * @param agent the "User-Agent" HTTP header string of this HttpClient
*/
public void setUserAgentField(HttpField agent)
{
@@ -773,7 +773,7 @@ public void setUserAgentField(HttpField agent)
}
/**
- * @return whether this {@link HttpClient} follows HTTP redirects
+ * @return whether this HttpClient follows HTTP redirects
* @see Request#isFollowRedirects()
*/
@ManagedAttribute("Whether HTTP redirects are followed")
@@ -783,7 +783,7 @@ public boolean isFollowRedirects()
}
/**
- * @param follow whether this {@link HttpClient} follows HTTP redirects
+ * @param follow whether this HttpClient follows HTTP redirects
* @see #setMaxRedirects(int)
*/
public void setFollowRedirects(boolean follow)
@@ -792,7 +792,7 @@ public void setFollowRedirects(boolean follow)
}
/**
- * @return the {@link Executor} of this {@link HttpClient}
+ * @return the {@link Executor} of this HttpClient
*/
public Executor getExecutor()
{
@@ -800,7 +800,7 @@ public Executor getExecutor()
}
/**
- * @param executor the {@link Executor} of this {@link HttpClient}
+ * @param executor the {@link Executor} of this HttpClient
*/
public void setExecutor(Executor executor)
{
@@ -811,7 +811,7 @@ public void setExecutor(Executor executor)
}
/**
- * @return the {@link Scheduler} of this {@link HttpClient}
+ * @return the {@link Scheduler} of this HttpClient
*/
public Scheduler getScheduler()
{
@@ -819,7 +819,7 @@ public Scheduler getScheduler()
}
/**
- * @param scheduler the {@link Scheduler} of this {@link HttpClient}
+ * @param scheduler the {@link Scheduler} of this HttpClient
*/
public void setScheduler(Scheduler scheduler)
{
@@ -830,7 +830,7 @@ public void setScheduler(Scheduler scheduler)
}
/**
- * @return the {@link SocketAddressResolver} of this {@link HttpClient}
+ * @return the {@link SocketAddressResolver} of this HttpClient
*/
public SocketAddressResolver getSocketAddressResolver()
{
@@ -838,7 +838,7 @@ public SocketAddressResolver getSocketAddressResolver()
}
/**
- * @param resolver the {@link SocketAddressResolver} of this {@link HttpClient}
+ * @param resolver the {@link SocketAddressResolver} of this HttpClient
*/
public void setSocketAddressResolver(SocketAddressResolver resolver)
{
@@ -849,7 +849,7 @@ public void setSocketAddressResolver(SocketAddressResolver resolver)
}
/**
- * @return the max number of connections that this {@link HttpClient} opens to {@link Destination}s
+ * @return the max number of connections that this HttpClient opens to {@link Destination}s
*/
@ManagedAttribute("The max number of connections per each destination")
public int getMaxConnectionsPerDestination()
@@ -862,11 +862,11 @@ public int getMaxConnectionsPerDestination()
*
* RFC 2616 suggests that 2 connections should be opened per each destination,
* but browsers commonly open 6.
- * If this {@link HttpClient} is used for load testing, it is common to have only one destination
+ * If this HttpClient is used for load testing, it is common to have only one destination
* (the server to load test), and it is recommended to set this value to a high value (at least as
* much as the threads present in the {@link #getExecutor() executor}).
*
- * @param maxConnectionsPerDestination the max number of connections that this {@link HttpClient} opens to {@link Destination}s
+ * @param maxConnectionsPerDestination the max number of connections that this HttpClient opens to {@link Destination}s
*/
public void setMaxConnectionsPerDestination(int maxConnectionsPerDestination)
{
@@ -885,11 +885,11 @@ public int getMaxRequestsQueuedPerDestination()
/**
* Sets the max number of requests that may be queued to a destination.
*
- * If this {@link HttpClient} performs a high rate of requests to a destination,
+ * If this HttpClient performs a high rate of requests to a destination,
* and all the connections managed by that destination are busy with other requests,
* then new requests will be queued up in the destination.
* This parameter controls how many requests can be queued before starting to reject them.
- * If this {@link HttpClient} is used for load testing, it is common to have this parameter
+ * If this HttpClient is used for load testing, it is common to have this parameter
* set to a high value, although this may impact latency (requests sit in the queue for a long
* time before being sent).
*
@@ -970,35 +970,6 @@ public void setTCPNoDelay(boolean tcpNoDelay)
this.tcpNoDelay = tcpNoDelay;
}
- /**
- * @return true to dispatch I/O operations in a different thread, false to execute them in the selector thread
- * @see #setDispatchIO(boolean)
- */
- @Deprecated
- public boolean isDispatchIO()
- {
- // TODO this did default to true, so usage needs to be evaluated.
- return false;
- }
-
- /**
- * Whether to dispatch I/O operations from the selector thread to a different thread.
- *
- * This implementation never blocks on I/O operation, but invokes application callbacks that may
- * take time to execute or block on other I/O.
- * If application callbacks are known to take time or block on I/O, then parameter {@code dispatchIO}
- * should be set to true.
- * If application callbacks are known to be quick and never block on I/O, then parameter {@code dispatchIO}
- * may be set to false.
- *
- * @param dispatchIO true to dispatch I/O operations in a different thread,
- * false to execute them in the selector thread
- */
- @Deprecated
- public void setDispatchIO(boolean dispatchIO)
- {
- }
-
/**
* Gets the http compliance mode for parsing http responses.
* The default http compliance level is {@link HttpCompliance#RFC7230} which is the latest HTTP/1.1 specification
@@ -1256,7 +1227,7 @@ public boolean containsAll(Collection> c)
public Iterator iterator()
{
final Iterator iterator = set.iterator();
- return new Iterator()
+ return new Iterator<>()
{
@Override
public boolean hasNext()
diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpClientTransport.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpClientTransport.java
index ff5f9269d6d6..7d2366a19fd0 100644
--- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpClientTransport.java
+++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpClientTransport.java
@@ -36,8 +36,8 @@
*/
public interface HttpClientTransport extends ClientConnectionFactory
{
- public static final String HTTP_DESTINATION_CONTEXT_KEY = "http.destination";
- public static final String HTTP_CONNECTION_PROMISE_CONTEXT_KEY = "http.connection.promise";
+ public static final String HTTP_DESTINATION_CONTEXT_KEY = "org.eclipse.jetty.client.destination";
+ public static final String HTTP_CONNECTION_PROMISE_CONTEXT_KEY = "org.eclipse.jetty.client.connection.promise";
/**
* Sets the {@link HttpClient} instance on this transport.
diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/Origin.java b/jetty-client/src/main/java/org/eclipse/jetty/client/Origin.java
index c94dfb8928cd..344b1c16edb5 100644
--- a/jetty-client/src/main/java/org/eclipse/jetty/client/Origin.java
+++ b/jetty-client/src/main/java/org/eclipse/jetty/client/Origin.java
@@ -48,18 +48,13 @@ public Address getAddress()
return address;
}
- public String asString()
- {
- StringBuilder result = new StringBuilder();
- URIUtil.appendSchemeHostPort(result, scheme, address.host, address.port);
- return result.toString();
- }
-
@Override
public boolean equals(Object obj)
{
- if (this == obj) return true;
- if (obj == null || getClass() != obj.getClass()) return false;
+ if (this == obj)
+ return true;
+ if (obj == null || getClass() != obj.getClass())
+ return false;
Origin that = (Origin)obj;
return scheme.equals(that.scheme) && address.equals(that.address);
}
@@ -67,9 +62,20 @@ public boolean equals(Object obj)
@Override
public int hashCode()
{
- int result = scheme.hashCode();
- result = 31 * result + address.hashCode();
- return result;
+ return Objects.hash(scheme, address);
+ }
+
+ public String asString()
+ {
+ StringBuilder result = new StringBuilder();
+ URIUtil.appendSchemeHostPort(result, scheme, address.host, address.port);
+ return result.toString();
+ }
+
+ @Override
+ public String toString()
+ {
+ return asString();
}
public static class Address
@@ -96,8 +102,10 @@ public int getPort()
@Override
public boolean equals(Object obj)
{
- if (this == obj) return true;
- if (obj == null || getClass() != obj.getClass()) return false;
+ if (this == obj)
+ return true;
+ if (obj == null || getClass() != obj.getClass())
+ return false;
Address that = (Address)obj;
return host.equals(that.host) && port == that.port;
}
@@ -105,9 +113,7 @@ public boolean equals(Object obj)
@Override
public int hashCode()
{
- int result = host.hashCode();
- result = 31 * result + port;
- return result;
+ return Objects.hash(host, port);
}
public String asString()
diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/ResponseNotifier.java b/jetty-client/src/main/java/org/eclipse/jetty/client/ResponseNotifier.java
index 6d9faa94089a..7db9af838515 100644
--- a/jetty-client/src/main/java/org/eclipse/jetty/client/ResponseNotifier.java
+++ b/jetty-client/src/main/java/org/eclipse/jetty/client/ResponseNotifier.java
@@ -203,16 +203,7 @@ private void notifyComplete(Response.CompleteListener listener, Result result)
public void forwardSuccess(List listeners, Response response)
{
- notifyBegin(listeners, response);
- for (Iterator iterator = response.getHeaders().iterator(); iterator.hasNext();)
- {
- HttpField field = iterator.next();
- if (!notifyHeader(listeners, response, field))
- iterator.remove();
- }
- notifyHeaders(listeners, response);
- if (response instanceof ContentResponse)
- notifyContent(listeners, response, ByteBuffer.wrap(((ContentResponse)response).getContent()), Callback.NOOP);
+ forwardEvents(listeners, response);
notifySuccess(listeners, response);
}
@@ -223,9 +214,16 @@ public void forwardSuccessComplete(List listeners, Re
}
public void forwardFailure(List listeners, Response response, Throwable failure)
+ {
+ forwardEvents(listeners, response);
+ notifyFailure(listeners, response, failure);
+ }
+
+ private void forwardEvents(List listeners, Response response)
{
notifyBegin(listeners, response);
- for (Iterator iterator = response.getHeaders().iterator(); iterator.hasNext();)
+ Iterator iterator = response.getHeaders().iterator();
+ while (iterator.hasNext())
{
HttpField field = iterator.next();
if (!notifyHeader(listeners, response, field))
@@ -233,8 +231,11 @@ public void forwardFailure(List listeners, Response r
}
notifyHeaders(listeners, response);
if (response instanceof ContentResponse)
- notifyContent(listeners, response, ByteBuffer.wrap(((ContentResponse)response).getContent()), Callback.NOOP);
- notifyFailure(listeners, response, failure);
+ {
+ byte[] content = ((ContentResponse)response).getContent();
+ if (content != null && content.length > 0)
+ notifyContent(listeners, response, ByteBuffer.wrap(content), Callback.NOOP);
+ }
}
public void forwardFailureComplete(List listeners, Request request, Throwable requestFailure, Response response, Throwable responseFailure)
diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpClientTransportOverHTTP.java b/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpClientTransportOverHTTP.java
index 8c353e98f79f..9ef6bd7c49ef 100644
--- a/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpClientTransportOverHTTP.java
+++ b/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpClientTransportOverHTTP.java
@@ -26,6 +26,7 @@
import org.eclipse.jetty.client.HttpDestination;
import org.eclipse.jetty.client.Origin;
import org.eclipse.jetty.client.api.Connection;
+import org.eclipse.jetty.io.ClientConnector;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.ProcessorUtils;
import org.eclipse.jetty.util.Promise;
@@ -36,12 +37,18 @@ public class HttpClientTransportOverHTTP extends AbstractConnectorHttpClientTran
{
public HttpClientTransportOverHTTP()
{
- this(Math.max( 1, ProcessorUtils.availableProcessors() / 2));
+ this(Math.max(1, ProcessorUtils.availableProcessors() / 2));
}
public HttpClientTransportOverHTTP(int selectors)
{
- super(selectors);
+ this(new ClientConnector());
+ getClientConnector().setSelectors(selectors);
+ }
+
+ public HttpClientTransportOverHTTP(ClientConnector connector)
+ {
+ super(connector);
setConnectionPoolFactory(destination -> new DuplexConnectionPool(destination, getHttpClient().getMaxConnectionsPerDestination(), destination));
}
diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpConnectionOverHTTP.java b/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpConnectionOverHTTP.java
index 001989b79137..746e54ec2b02 100644
--- a/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpConnectionOverHTTP.java
+++ b/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpConnectionOverHTTP.java
@@ -93,7 +93,6 @@ public long getBytesOut()
return bytesOut.longValue();
}
-
protected void addBytesOut(long bytesOut)
{
this.bytesOut.add(bytesOut);
diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientCustomProxyTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientCustomProxyTest.java
index 529df7b41121..4dffe0b939bc 100644
--- a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientCustomProxyTest.java
+++ b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientCustomProxyTest.java
@@ -18,9 +18,6 @@
package org.eclipse.jetty.client;
-import static org.junit.jupiter.api.Assertions.assertArrayEquals;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Map;
@@ -49,9 +46,11 @@
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.junit.jupiter.api.AfterEach;
-
import org.junit.jupiter.api.Test;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
public class HttpClientCustomProxyTest
{
public static final byte[] CAFE_BABE = new byte[]{(byte)0xCA, (byte)0xFE, (byte)0xBA, (byte)0xBE};
diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientTLSTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientTLSTest.java
index ecb7789efb78..ea806cb8c31f 100644
--- a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientTLSTest.java
+++ b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientTLSTest.java
@@ -18,15 +18,6 @@
package org.eclipse.jetty.client;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.instanceOf;
-import static org.junit.jupiter.api.Assertions.assertArrayEquals;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.junit.jupiter.api.Assumptions.assumeTrue;
-
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.OutputStream;
@@ -53,17 +44,22 @@
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
-import org.eclipse.jetty.util.JavaVersion;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.util.thread.ExecutorThreadPool;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
-import org.hamcrest.Matchers;
import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledOnJre;
import org.junit.jupiter.api.condition.JRE;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
public class HttpClientTLSTest
{
private Server server;
diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientTest.java
index b32c271ff45a..9fba73442ca6 100644
--- a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientTest.java
+++ b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientTest.java
@@ -18,16 +18,6 @@
package org.eclipse.jetty.client;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.containsString;
-import static org.hamcrest.Matchers.instanceOf;
-import static org.junit.jupiter.api.Assertions.assertArrayEquals;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -106,6 +96,16 @@
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ArgumentsSource;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
@ExtendWith(WorkDirExtension.class)
public class HttpClientTest extends AbstractHttpClientServerTest
{
@@ -1716,10 +1716,12 @@ public void test204WithContent(Scenario scenario) throws Exception
try (ServerSocket server = new ServerSocket(0))
{
- startClient(scenario);
- client.setMaxConnectionsPerDestination(1);
int idleTimeout = 2000;
- client.setIdleTimeout(idleTimeout);
+ startClient(scenario, null, httpClient ->
+ {
+ httpClient.setMaxConnectionsPerDestination(1);
+ httpClient.setIdleTimeout(idleTimeout);
+ });
Request request = client.newRequest("localhost", server.getLocalPort())
.scheme(scenario.getScheme())
diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/InsufficientThreadsDetectionTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/InsufficientThreadsDetectionTest.java
index ea209422f799..86051d6f8d71 100644
--- a/jetty-client/src/test/java/org/eclipse/jetty/client/InsufficientThreadsDetectionTest.java
+++ b/jetty-client/src/test/java/org/eclipse/jetty/client/InsufficientThreadsDetectionTest.java
@@ -18,13 +18,12 @@
package org.eclipse.jetty.client;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-
import org.eclipse.jetty.client.http.HttpClientTransportOverHTTP;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
-
import org.junit.jupiter.api.Test;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
public class InsufficientThreadsDetectionTest
{
@Test
@@ -33,9 +32,7 @@ public void testInsufficientThreads()
QueuedThreadPool clientThreads = new QueuedThreadPool(1);
HttpClient httpClient = new HttpClient(new HttpClientTransportOverHTTP(1), null);
httpClient.setExecutor(clientThreads);
- assertThrows(IllegalStateException.class, ()->{
- httpClient.start();
- });
+ assertThrows(IllegalStateException.class, httpClient::start);
}
@Test
@@ -46,7 +43,8 @@ public void testInsufficientThreadsForMultipleHttpClients() throws Exception
httpClient1.setExecutor(clientThreads);
httpClient1.start();
- assertThrows(IllegalStateException.class, ()->{
+ assertThrows(IllegalStateException.class, () ->
+ {
// Share the same thread pool with another instance.
HttpClient httpClient2 = new HttpClient(new HttpClientTransportOverHTTP(1), null);
httpClient2.setExecutor(clientThreads);
diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/http/HttpDestinationOverHTTPTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/http/HttpDestinationOverHTTPTest.java
index 1d10407b6bbf..7dff8d953a71 100644
--- a/jetty-client/src/test/java/org/eclipse/jetty/client/http/HttpDestinationOverHTTPTest.java
+++ b/jetty-client/src/test/java/org/eclipse/jetty/client/http/HttpDestinationOverHTTPTest.java
@@ -18,9 +18,6 @@
package org.eclipse.jetty.client.http;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.jupiter.api.Assertions.*;
-
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
@@ -39,10 +36,18 @@
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpHeaderValue;
import org.hamcrest.Matchers;
-import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ArgumentsSource;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNotSame;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
public class HttpDestinationOverHTTPTest extends AbstractHttpClientServerTest
{
@ParameterizedTest
@@ -178,10 +183,9 @@ public void test_Acquire_Process_Release_Acquire_ReturnsSameConnection(Scenario
@ArgumentsSource(ScenarioProvider.class)
public void test_IdleConnection_IdleTimeout(Scenario scenario) throws Exception
{
- start(scenario, new EmptyServerHandler());
-
+ startServer(scenario, new EmptyServerHandler());
long idleTimeout = 1000;
- client.setIdleTimeout(idleTimeout);
+ startClient(scenario, null, httpClient -> httpClient.setIdleTimeout(idleTimeout));
try (HttpDestinationOverHTTP destination = new HttpDestinationOverHTTP(client, new Origin("http", "localhost", connector.getLocalPort())))
{
diff --git a/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/http/HttpClientTransportOverFCGI.java b/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/http/HttpClientTransportOverFCGI.java
index 4ab77cb2d57e..2c095ee2dab4 100644
--- a/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/http/HttpClientTransportOverFCGI.java
+++ b/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/http/HttpClientTransportOverFCGI.java
@@ -29,6 +29,7 @@
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.fcgi.FCGI;
import org.eclipse.jetty.http.HttpFields;
+import org.eclipse.jetty.io.ClientConnector;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.ProcessorUtils;
import org.eclipse.jetty.util.Promise;
@@ -47,7 +48,13 @@ public HttpClientTransportOverFCGI(String scriptRoot)
public HttpClientTransportOverFCGI(int selectors, String scriptRoot)
{
- super(selectors);
+ this(new ClientConnector(), scriptRoot);
+ getClientConnector().setSelectors(selectors);
+ }
+
+ public HttpClientTransportOverFCGI(ClientConnector connector, String scriptRoot)
+ {
+ super(connector);
this.scriptRoot = scriptRoot;
setConnectionPoolFactory(destination ->
{
diff --git a/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2Client.java b/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2Client.java
index 3b969874d63f..a058215190a3 100644
--- a/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2Client.java
+++ b/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2Client.java
@@ -18,13 +18,10 @@
package org.eclipse.jetty.http2.client;
-import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
-import java.nio.channels.SelectableChannel;
-import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
-import java.util.Arrays;
+import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -38,32 +35,24 @@
import org.eclipse.jetty.http2.frames.SettingsFrame;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.ClientConnectionFactory;
-import org.eclipse.jetty.io.Connection;
-import org.eclipse.jetty.io.EndPoint;
-import org.eclipse.jetty.io.ManagedSelector;
-import org.eclipse.jetty.io.MappedByteBufferPool;
-import org.eclipse.jetty.io.SelectorManager;
-import org.eclipse.jetty.io.SocketChannelEndPoint;
+import org.eclipse.jetty.io.ClientConnector;
import org.eclipse.jetty.io.ssl.SslClientConnectionFactory;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.component.ContainerLifeCycle;
import org.eclipse.jetty.util.ssl.SslContextFactory;
-import org.eclipse.jetty.util.thread.QueuedThreadPool;
-import org.eclipse.jetty.util.thread.ScheduledExecutorScheduler;
import org.eclipse.jetty.util.thread.Scheduler;
/**
- * {@link HTTP2Client} provides an asynchronous, non-blocking implementation
+ *
HTTP2Client provides an asynchronous, non-blocking implementation
* to send HTTP/2 frames to a server.
* Typical usage:
*
* // Create and start HTTP2Client.
* HTTP2Client client = new HTTP2Client();
- * SslContextFactory sslContextFactory = new SslContextFactory();
- * client.addBean(sslContextFactory);
* client.start();
+ * SslContextFactory sslContextFactory = client.getClientConnector().getSslContextFactory();
*
* // Connect to host.
* String host = "webtide.com";
@@ -108,7 +97,7 @@
* // Use the Stream object to send request content, if any, using a DATA frame.
* ByteBuffer content = ...;
* DataFrame requestContent = new DataFrame(stream.getId(), content, true);
- * stream.data(requestContent, Callback.Adapter.INSTANCE);
+ * stream.data(requestContent, Callback.NOOP);
*
* // When done, stop the client.
* client.stop();
@@ -117,18 +106,9 @@
@ManagedObject
public class HTTP2Client extends ContainerLifeCycle
{
- private Executor executor;
- private Scheduler scheduler;
- private ByteBufferPool bufferPool;
- private ClientConnectionFactory connectionFactory;
- private SelectorManager selector;
- private int selectors = 1;
- private long idleTimeout = 30000;
- private long connectTimeout = 10000;
- private boolean connectBlocking;
- private SocketAddress bindAddress;
+ private final ClientConnector connector;
private int inputBufferSize = 8192;
- private List protocols = Arrays.asList("h2", "h2-17", "h2-16", "h2-15", "h2-14");
+ private List protocols = List.of("h2");
private int initialSessionRecvWindow = 16 * 1024 * 1024;
private int initialStreamRecvWindow = 8 * 1024 * 1024;
private int maxFrameLength = Frame.DEFAULT_MAX_LENGTH;
@@ -136,96 +116,50 @@ public class HTTP2Client extends ContainerLifeCycle
private int maxSettingsKeys = SettingsFrame.DEFAULT_MAX_KEYS;
private FlowControlStrategy.Factory flowControlStrategyFactory = () -> new BufferingFlowControlStrategy(0.5F);
- @Override
- protected void doStart() throws Exception
+ public HTTP2Client()
{
- if (executor == null)
- setExecutor(new QueuedThreadPool());
-
- if (scheduler == null)
- setScheduler(new ScheduledExecutorScheduler());
-
- if (bufferPool == null)
- setByteBufferPool(new MappedByteBufferPool());
-
- if (connectionFactory == null)
- {
- HTTP2ClientConnectionFactory h2 = new HTTP2ClientConnectionFactory();
- setClientConnectionFactory((endPoint, context) ->
- {
- ClientConnectionFactory factory = h2;
- SslContextFactory sslContextFactory = (SslContextFactory)context.get(SslClientConnectionFactory.SSL_CONTEXT_FACTORY_CONTEXT_KEY);
- if (sslContextFactory != null)
- {
- ALPNClientConnectionFactory alpn = new ALPNClientConnectionFactory(getExecutor(), h2, getProtocols());
- factory = newSslClientConnectionFactory(sslContextFactory, alpn);
- }
- return factory.newConnection(endPoint, context);
- });
- }
-
- if (selector == null)
- {
- selector = newSelectorManager();
- addBean(selector);
- }
- selector.setConnectTimeout(getConnectTimeout());
-
- super.doStart();
+ this(new ClientConnector());
}
- protected SelectorManager newSelectorManager()
+ public HTTP2Client(ClientConnector connector)
{
- return new ClientSelectorManager(getExecutor(), getScheduler(), getSelectors());
+ this.connector = connector;
+ addBean(connector);
}
- protected ClientConnectionFactory newSslClientConnectionFactory(SslContextFactory sslContextFactory, ClientConnectionFactory connectionFactory)
+ public ClientConnector getClientConnector()
{
- return new SslClientConnectionFactory(sslContextFactory, getByteBufferPool(), getExecutor(), connectionFactory);
+ return connector;
}
public Executor getExecutor()
{
- return executor;
+ return connector.getExecutor();
}
public void setExecutor(Executor executor)
{
- this.updateBean(this.executor, executor);
- this.executor = executor;
+ connector.setExecutor(executor);
}
public Scheduler getScheduler()
{
- return scheduler;
+ return connector.getScheduler();
}
public void setScheduler(Scheduler scheduler)
{
- this.updateBean(this.scheduler, scheduler);
- this.scheduler = scheduler;
+ connector.setScheduler(scheduler);
}
public ByteBufferPool getByteBufferPool()
{
- return bufferPool;
+ return connector.getByteBufferPool();
}
public void setByteBufferPool(ByteBufferPool bufferPool)
{
- this.updateBean(this.bufferPool, bufferPool);
- this.bufferPool = bufferPool;
- }
-
- public ClientConnectionFactory getClientConnectionFactory()
- {
- return connectionFactory;
- }
-
- public void setClientConnectionFactory(ClientConnectionFactory connectionFactory)
- {
- this.updateBean(this.connectionFactory, connectionFactory);
- this.connectionFactory = connectionFactory;
+ connector.setByteBufferPool(bufferPool);
}
public FlowControlStrategy.Factory getFlowControlStrategyFactory()
@@ -241,58 +175,55 @@ public void setFlowControlStrategyFactory(FlowControlStrategy.Factory flowContro
@ManagedAttribute("The number of selectors")
public int getSelectors()
{
- return selectors;
+ return connector.getSelectors();
}
public void setSelectors(int selectors)
{
- this.selectors = selectors;
+ connector.setSelectors(selectors);
}
@ManagedAttribute("The idle timeout in milliseconds")
public long getIdleTimeout()
{
- return idleTimeout;
+ return connector.getIdleTimeout().toMillis();
}
public void setIdleTimeout(long idleTimeout)
{
- this.idleTimeout = idleTimeout;
+ connector.setIdleTimeout(Duration.ofMillis(idleTimeout));
}
@ManagedAttribute("The connect timeout in milliseconds")
public long getConnectTimeout()
{
- return connectTimeout;
+ return connector.getConnectTimeout().toMillis();
}
public void setConnectTimeout(long connectTimeout)
{
- this.connectTimeout = connectTimeout;
- SelectorManager selector = this.selector;
- if (selector != null)
- selector.setConnectTimeout(connectTimeout);
+ connector.setConnectTimeout(Duration.ofMillis(connectTimeout));
}
@ManagedAttribute("Whether the connect() operation is blocking")
public boolean isConnectBlocking()
{
- return connectBlocking;
+ return connector.isConnectBlocking();
}
public void setConnectBlocking(boolean connectBlocking)
{
- this.connectBlocking = connectBlocking;
+ connector.setConnectBlocking(connectBlocking);
}
public SocketAddress getBindAddress()
{
- return bindAddress;
+ return connector.getBindAddress();
}
public void setBindAddress(SocketAddress bindAddress)
{
- this.bindAddress = bindAddress;
+ connector.setBindAddress(bindAddress);
}
@ManagedAttribute("The size of the buffer used to read from the network")
@@ -374,6 +305,7 @@ public void setMaxSettingsKeys(int maxSettingsKeys)
public void connect(InetSocketAddress address, Session.Listener listener, Promise promise)
{
+ // Prior-knowledge clear-text HTTP/2 (h2c).
connect(null, address, listener, promise);
}
@@ -384,112 +316,49 @@ public void connect(SslContextFactory sslContextFactory, InetSocketAddress addre
public void connect(SslContextFactory sslContextFactory, InetSocketAddress address, Session.Listener listener, Promise promise, Map context)
{
- try
- {
- SocketChannel channel = SocketChannel.open();
- SocketAddress bindAddress = getBindAddress();
- if (bindAddress != null)
- channel.bind(bindAddress);
- configure(channel);
- boolean connected = true;
- if (isConnectBlocking())
- {
- channel.socket().connect(address, (int)getConnectTimeout());
- channel.configureBlocking(false);
- }
- else
- {
- channel.configureBlocking(false);
- connected = channel.connect(address);
- }
- context = contextFrom(sslContextFactory, address, listener, promise, context);
- if (connected)
- selector.accept(channel, context);
- else
- selector.connect(channel, context);
- }
- catch (Throwable x)
- {
- promise.failed(x);
- }
+ ClientConnectionFactory factory = newClientConnectionFactory(sslContextFactory);
+ connect(address, factory, listener, promise, context);
+ }
+
+ public void connect(SocketAddress address, ClientConnectionFactory factory, Session.Listener listener, Promise promise, Map context)
+ {
+ context = contextFrom(factory, listener, promise, context);
+ context.put(ClientConnector.CONNECTION_PROMISE_CONTEXT_KEY, new Promise.Wrapper<>(promise));
+ connector.connect(address, context);
}
public void accept(SslContextFactory sslContextFactory, SocketChannel channel, Session.Listener listener, Promise promise)
{
- try
- {
- if (!channel.isConnected())
- throw new IllegalStateException("SocketChannel must be connected");
- channel.configureBlocking(false);
- Map context = contextFrom(sslContextFactory, (InetSocketAddress)channel.getRemoteAddress(), listener, promise, null);
- selector.accept(channel, context);
- }
- catch (Throwable x)
- {
- promise.failed(x);
- }
+ ClientConnectionFactory factory = newClientConnectionFactory(sslContextFactory);
+ accept(channel, factory, listener, promise);
}
- private Map contextFrom(SslContextFactory sslContextFactory, InetSocketAddress address, Session.Listener listener, Promise promise, Map context)
+ public void accept(SocketChannel channel, ClientConnectionFactory factory, Session.Listener listener, Promise promise)
+ {
+ Map context = contextFrom(factory, listener, promise, null);
+ context.put(ClientConnector.CONNECTION_PROMISE_CONTEXT_KEY, new Promise.Wrapper<>(promise));
+ connector.accept(channel, context);
+ }
+
+ private Map contextFrom(ClientConnectionFactory factory, Session.Listener listener, Promise promise, Map context)
{
if (context == null)
context = new HashMap<>();
context.put(HTTP2ClientConnectionFactory.CLIENT_CONTEXT_KEY, this);
context.put(HTTP2ClientConnectionFactory.SESSION_LISTENER_CONTEXT_KEY, listener);
context.put(HTTP2ClientConnectionFactory.SESSION_PROMISE_CONTEXT_KEY, promise);
- if (sslContextFactory != null)
- context.put(SslClientConnectionFactory.SSL_CONTEXT_FACTORY_CONTEXT_KEY, sslContextFactory);
- context.put(SslClientConnectionFactory.SSL_PEER_HOST_CONTEXT_KEY, address.getHostString());
- context.put(SslClientConnectionFactory.SSL_PEER_PORT_CONTEXT_KEY, address.getPort());
- context.putIfAbsent(ClientConnectionFactory.CONNECTOR_CONTEXT_KEY, this);
+ context.put(ClientConnector.CLIENT_CONNECTION_FACTORY_CONTEXT_KEY, factory);
return context;
}
- protected void configure(SocketChannel channel) throws IOException
- {
- channel.socket().setTcpNoDelay(true);
- }
-
- private class ClientSelectorManager extends SelectorManager
+ private ClientConnectionFactory newClientConnectionFactory(SslContextFactory sslContextFactory)
{
- private ClientSelectorManager(Executor executor, Scheduler scheduler, int selectors)
- {
- super(executor, scheduler, selectors);
- }
-
- @Override
- protected EndPoint newEndPoint(SelectableChannel channel, ManagedSelector selector, SelectionKey selectionKey) throws IOException
- {
- SocketChannelEndPoint endp = new SocketChannelEndPoint(channel, selector, selectionKey, getScheduler());
- endp.setIdleTimeout(getIdleTimeout());
- return endp;
- }
-
- @Override
- public Connection newConnection(SelectableChannel channel, EndPoint endpoint, Object attachment) throws IOException
- {
- @SuppressWarnings("unchecked")
- Map context = (Map)attachment;
- context.put(HTTP2ClientConnectionFactory.BYTE_BUFFER_POOL_CONTEXT_KEY, getByteBufferPool());
- context.put(HTTP2ClientConnectionFactory.EXECUTOR_CONTEXT_KEY, getExecutor());
- context.put(HTTP2ClientConnectionFactory.SCHEDULER_CONTEXT_KEY, getScheduler());
- return getClientConnectionFactory().newConnection(endpoint, context);
- }
-
- @Override
- protected void connectionFailed(SelectableChannel channel, Throwable failure, Object attachment)
+ ClientConnectionFactory factory = new HTTP2ClientConnectionFactory();
+ if (sslContextFactory != null)
{
- @SuppressWarnings("unchecked")
- Map context = (Map)attachment;
- if (LOG.isDebugEnabled())
- {
- Object host = context.get(SslClientConnectionFactory.SSL_PEER_HOST_CONTEXT_KEY);
- Object port = context.get(SslClientConnectionFactory.SSL_PEER_PORT_CONTEXT_KEY);
- LOG.debug("Could not connect to {}:{}", host, port);
- }
- @SuppressWarnings("unchecked")
- Promise promise = (Promise)context.get(HTTP2ClientConnectionFactory.SESSION_PROMISE_CONTEXT_KEY);
- promise.failed(failure);
+ ALPNClientConnectionFactory alpn = new ALPNClientConnectionFactory(getExecutor(), factory, getProtocols());
+ factory = new SslClientConnectionFactory(sslContextFactory, getByteBufferPool(), getExecutor(), alpn);
}
+ return factory;
}
}
diff --git a/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2ClientConnectionFactory.java b/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2ClientConnectionFactory.java
index cfe45c13d49e..5236e9e143e5 100644
--- a/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2ClientConnectionFactory.java
+++ b/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2ClientConnectionFactory.java
@@ -42,12 +42,9 @@
public class HTTP2ClientConnectionFactory implements ClientConnectionFactory
{
- public static final String CLIENT_CONTEXT_KEY = "http2.client";
- public static final String BYTE_BUFFER_POOL_CONTEXT_KEY = "http2.client.byteBufferPool";
- public static final String EXECUTOR_CONTEXT_KEY = "http2.client.executor";
- public static final String SCHEDULER_CONTEXT_KEY = "http2.client.scheduler";
- public static final String SESSION_LISTENER_CONTEXT_KEY = "http2.client.sessionListener";
- public static final String SESSION_PROMISE_CONTEXT_KEY = "http2.client.sessionPromise";
+ public static final String CLIENT_CONTEXT_KEY = "org.eclipse.jetty.client.http2";
+ public static final String SESSION_LISTENER_CONTEXT_KEY = "org.eclipse.jetty.client.http2.sessionListener";
+ public static final String SESSION_PROMISE_CONTEXT_KEY = "org.eclipse.jetty.client.http2.sessionPromise";
private final Connection.Listener connectionListener = new ConnectionListener();
@@ -55,9 +52,9 @@ public class HTTP2ClientConnectionFactory implements ClientConnectionFactory
public Connection newConnection(EndPoint endPoint, Map context)
{
HTTP2Client client = (HTTP2Client)context.get(CLIENT_CONTEXT_KEY);
- ByteBufferPool byteBufferPool = (ByteBufferPool)context.get(BYTE_BUFFER_POOL_CONTEXT_KEY);
- Executor executor = (Executor)context.get(EXECUTOR_CONTEXT_KEY);
- Scheduler scheduler = (Scheduler)context.get(SCHEDULER_CONTEXT_KEY);
+ ByteBufferPool byteBufferPool = client.getByteBufferPool();
+ Executor executor = client.getExecutor();
+ Scheduler scheduler = client.getScheduler();
Session.Listener listener = (Session.Listener)context.get(SESSION_LISTENER_CONTEXT_KEY);
@SuppressWarnings("unchecked")
Promise promise = (Promise)context.get(SESSION_PROMISE_CONTEXT_KEY);
diff --git a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/Client.java b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/Client.java
deleted file mode 100644
index 956a7b39c8f9..000000000000
--- a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/Client.java
+++ /dev/null
@@ -1,94 +0,0 @@
-//
-// ========================================================================
-// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd.
-// ------------------------------------------------------------------------
-// All rights reserved. This program and the accompanying materials
-// are made available under the terms of the Eclipse Public License v1.0
-// and Apache License v2.0 which accompanies this distribution.
-//
-// The Eclipse Public License is available at
-// http://www.eclipse.org/legal/epl-v10.html
-//
-// The Apache License v2.0 is available at
-// http://www.opensource.org/licenses/apache2.0.php
-//
-// You may elect to redistribute this code under either of these licenses.
-// ========================================================================
-//
-
-package org.eclipse.jetty.http2.client;
-
-import java.net.InetSocketAddress;
-import java.util.concurrent.Phaser;
-import java.util.concurrent.TimeUnit;
-
-import org.eclipse.jetty.http.HttpFields;
-import org.eclipse.jetty.http.HttpURI;
-import org.eclipse.jetty.http.HttpVersion;
-import org.eclipse.jetty.http.MetaData;
-import org.eclipse.jetty.http2.api.Session;
-import org.eclipse.jetty.http2.api.Stream;
-import org.eclipse.jetty.http2.api.server.ServerSessionListener;
-import org.eclipse.jetty.http2.frames.DataFrame;
-import org.eclipse.jetty.http2.frames.HeadersFrame;
-import org.eclipse.jetty.http2.frames.PushPromiseFrame;
-import org.eclipse.jetty.util.Callback;
-import org.eclipse.jetty.util.FuturePromise;
-import org.eclipse.jetty.util.Jetty;
-import org.eclipse.jetty.util.Promise;
-import org.eclipse.jetty.util.ssl.SslContextFactory;
-
-public class Client
-{
- public static void main(String[] args) throws Exception
- {
- HTTP2Client client = new HTTP2Client();
- SslContextFactory sslContextFactory = new SslContextFactory();
- client.addBean(sslContextFactory);
- client.start();
-
- String host = "webtide.com";
- int port = 443;
-
- FuturePromise sessionPromise = new FuturePromise<>();
- client.connect(sslContextFactory, new InetSocketAddress(host, port), new ServerSessionListener.Adapter(), sessionPromise);
- Session session = sessionPromise.get(5, TimeUnit.SECONDS);
-
- HttpFields requestFields = new HttpFields();
- requestFields.put("User-Agent", client.getClass().getName() + "/" + Jetty.VERSION);
- MetaData.Request metaData = new MetaData.Request("GET", new HttpURI("https://" + host + ":" + port + "/"), HttpVersion.HTTP_2, requestFields);
- HeadersFrame headersFrame = new HeadersFrame(metaData, null, true);
- final Phaser phaser = new Phaser(2);
- session.newStream(headersFrame, new Promise.Adapter<>(), new Stream.Listener.Adapter()
- {
- @Override
- public void onHeaders(Stream stream, HeadersFrame frame)
- {
- System.err.println(frame);
- if (frame.isEndStream())
- phaser.arrive();
- }
-
- @Override
- public void onData(Stream stream, DataFrame frame, Callback callback)
- {
- System.err.println(frame);
- callback.succeeded();
- if (frame.isEndStream())
- phaser.arrive();
- }
-
- @Override
- public Stream.Listener onPush(Stream stream, PushPromiseFrame frame)
- {
- System.err.println(frame);
- phaser.register();
- return this;
- }
- });
-
- phaser.awaitAdvanceInterruptibly(phaser.arrive(), 5, TimeUnit.SECONDS);
-
- client.stop();
- }
-}
diff --git a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpClientTransportOverHTTP2.java b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpClientTransportOverHTTP2.java
index 7584d1966e1d..cacb023db7dd 100644
--- a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpClientTransportOverHTTP2.java
+++ b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpClientTransportOverHTTP2.java
@@ -45,7 +45,6 @@
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
-import org.eclipse.jetty.util.ssl.SslContextFactory;
@ManagedObject("The HTTP/2 client transport")
public class HttpClientTransportOverHTTP2 extends AbstractHttpClientTransport
@@ -101,13 +100,7 @@ protected void doStart() throws Exception
}
addBean(client);
super.doStart();
-
- this.connectionFactory = new HTTP2ClientConnectionFactory();
- client.setClientConnectionFactory((endPoint, context) ->
- {
- HttpDestination destination = (HttpDestination)context.get(HTTP_DESTINATION_CONTEXT_KEY);
- return destination.getClientConnectionFactory().newConnection(endPoint, context);
- });
+ connectionFactory = new HTTP2ClientConnectionFactory();
}
@Override
@@ -134,16 +127,12 @@ public void connect(InetSocketAddress address, Map context)
SessionListenerPromise listenerPromise = new SessionListenerPromise(context);
HttpDestinationOverHTTP2 destination = (HttpDestinationOverHTTP2)context.get(HTTP_DESTINATION_CONTEXT_KEY);
- SslContextFactory sslContextFactory = null;
- if (HttpScheme.HTTPS.is(destination.getScheme()))
- sslContextFactory = httpClient.getSslContextFactory();
-
- connect(sslContextFactory, address, listenerPromise, listenerPromise, context);
+ connect(address, destination.getClientConnectionFactory(), listenerPromise, listenerPromise, context);
}
- protected void connect(SslContextFactory sslContextFactory, InetSocketAddress address, Session.Listener listener, Promise promise, Map context)
+ protected void connect(InetSocketAddress address, ClientConnectionFactory factory, Session.Listener listener, Promise promise, Map context)
{
- getHTTP2Client().connect(sslContextFactory, address, listener, promise, context);
+ getHTTP2Client().connect(address, factory, listener, promise, context);
}
@Override
diff --git a/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/MaxConcurrentStreamsTest.java b/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/MaxConcurrentStreamsTest.java
index cee3802220d2..21225c96a95b 100644
--- a/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/MaxConcurrentStreamsTest.java
+++ b/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/MaxConcurrentStreamsTest.java
@@ -51,11 +51,11 @@
import org.eclipse.jetty.http2.frames.ResetFrame;
import org.eclipse.jetty.http2.frames.SettingsFrame;
import org.eclipse.jetty.http2.server.HTTP2ServerConnectionFactory;
+import org.eclipse.jetty.io.ClientConnectionFactory;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.util.Promise;
-import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.junit.jupiter.api.Test;
@@ -171,9 +171,9 @@ protected void service(String target, Request jettyRequest, HttpServletRequest r
client = new HttpClient(new HttpClientTransportOverHTTP2(new HTTP2Client())
{
@Override
- protected void connect(SslContextFactory sslContextFactory, InetSocketAddress address, Session.Listener listener, Promise promise, Map context)
+ protected void connect(InetSocketAddress address, ClientConnectionFactory factory, Session.Listener listener, Promise promise, Map context)
{
- super.connect(sslContextFactory, address, new Wrapper(listener)
+ super.connect(address, factory, new Wrapper(listener)
{
@Override
public void onSettings(Session session, SettingsFrame frame)
diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ClientConnectionFactory.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ClientConnectionFactory.java
index 9902ce0d4e04..d935b3d1eb23 100644
--- a/jetty-io/src/main/java/org/eclipse/jetty/io/ClientConnectionFactory.java
+++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ClientConnectionFactory.java
@@ -28,10 +28,9 @@
*/
public interface ClientConnectionFactory
{
- public static final String CONNECTOR_CONTEXT_KEY = "client.connector";
+ public static final String CLIENT_CONTEXT_KEY = "org.eclipse.jetty.client";
/**
- *
* @param endPoint the {@link org.eclipse.jetty.io.EndPoint} to link the newly created connection to
* @param context the context data to create the connection
* @return a new {@link Connection}
@@ -41,8 +40,9 @@ public interface ClientConnectionFactory
public default Connection customize(Connection connection, Map context)
{
- ContainerLifeCycle connector = (ContainerLifeCycle)context.get(CONNECTOR_CONTEXT_KEY);
- connector.getBeans(Connection.Listener.class).forEach(connection::addListener);
+ ContainerLifeCycle client = (ContainerLifeCycle)context.get(CLIENT_CONTEXT_KEY);
+ if (client != null)
+ client.getBeans(Connection.Listener.class).forEach(connection::addListener);
return connection;
}
}
diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ClientConnector.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ClientConnector.java
new file mode 100644
index 000000000000..5bb069124fb9
--- /dev/null
+++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ClientConnector.java
@@ -0,0 +1,333 @@
+//
+// ========================================================================
+// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd.
+// ------------------------------------------------------------------------
+// All rights reserved. This program and the accompanying materials
+// are made available under the terms of the Eclipse Public License v1.0
+// and Apache License v2.0 which accompanies this distribution.
+//
+// The Eclipse Public License is available at
+// http://www.eclipse.org/legal/epl-v10.html
+//
+// The Apache License v2.0 is available at
+// http://www.opensource.org/licenses/apache2.0.php
+//
+// You may elect to redistribute this code under either of these licenses.
+// ========================================================================
+//
+
+package org.eclipse.jetty.io;
+
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.net.SocketException;
+import java.nio.channels.SelectableChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Executor;
+
+import org.eclipse.jetty.util.Promise;
+import org.eclipse.jetty.util.component.ContainerLifeCycle;
+import org.eclipse.jetty.util.log.Log;
+import org.eclipse.jetty.util.log.Logger;
+import org.eclipse.jetty.util.ssl.SslContextFactory;
+import org.eclipse.jetty.util.thread.QueuedThreadPool;
+import org.eclipse.jetty.util.thread.ScheduledExecutorScheduler;
+import org.eclipse.jetty.util.thread.Scheduler;
+
+public class ClientConnector extends ContainerLifeCycle
+{
+ public static final String CLIENT_CONNECTOR_CONTEXT_KEY = "org.eclipse.jetty.client.connector";
+ public static final String SOCKET_ADDRESS_CONTEXT_KEY = CLIENT_CONNECTOR_CONTEXT_KEY + ".socketAddress";
+ public static final String CLIENT_CONNECTION_FACTORY_CONTEXT_KEY = CLIENT_CONNECTOR_CONTEXT_KEY + ".clientConnectionFactory";
+ public static final String CONNECTION_PROMISE_CONTEXT_KEY = CLIENT_CONNECTOR_CONTEXT_KEY + ".connectionPromise";
+ private static final Logger LOG = Log.getLogger(ClientConnector.class);
+
+ private Executor executor;
+ private Scheduler scheduler;
+ private ByteBufferPool byteBufferPool;
+ private SslContextFactory sslContextFactory;
+ private SelectorManager selectorManager;
+ private int selectors = 1;
+ private boolean connectBlocking;
+ private Duration connectTimeout = Duration.ofSeconds(5);
+ private Duration idleTimeout = Duration.ofSeconds(30);
+ private SocketAddress bindAddress;
+
+ public Executor getExecutor()
+ {
+ return executor;
+ }
+
+ public void setExecutor(Executor executor)
+ {
+ if (isStarted())
+ throw new IllegalStateException();
+ updateBean(this.executor, executor);
+ this.executor = executor;
+ }
+
+ public Scheduler getScheduler()
+ {
+ return scheduler;
+ }
+
+ public void setScheduler(Scheduler scheduler)
+ {
+ if (isStarted())
+ throw new IllegalStateException();
+ updateBean(this.scheduler, scheduler);
+ this.scheduler = scheduler;
+ }
+
+ public ByteBufferPool getByteBufferPool()
+ {
+ return byteBufferPool;
+ }
+
+ public void setByteBufferPool(ByteBufferPool byteBufferPool)
+ {
+ if (isStarted())
+ throw new IllegalStateException();
+ updateBean(this.byteBufferPool, byteBufferPool);
+ this.byteBufferPool = byteBufferPool;
+ }
+
+ public SslContextFactory getSslContextFactory()
+ {
+ return sslContextFactory;
+ }
+
+ public void setSslContextFactory(SslContextFactory sslContextFactory)
+ {
+ if (isStarted())
+ throw new IllegalStateException();
+ updateBean(this.sslContextFactory, sslContextFactory);
+ this.sslContextFactory = sslContextFactory;
+ }
+
+ public int getSelectors()
+ {
+ return selectors;
+ }
+
+ public void setSelectors(int selectors)
+ {
+ if (isStarted())
+ throw new IllegalStateException();
+ this.selectors = selectors;
+ }
+
+ public boolean isConnectBlocking()
+ {
+ return connectBlocking;
+ }
+
+ public void setConnectBlocking(boolean connectBlocking)
+ {
+ this.connectBlocking = connectBlocking;
+ }
+
+ public Duration getConnectTimeout()
+ {
+ return connectTimeout;
+ }
+
+ public void setConnectTimeout(Duration connectTimeout)
+ {
+ this.connectTimeout = connectTimeout;
+ if (selectorManager != null)
+ selectorManager.setConnectTimeout(connectTimeout.toMillis());
+ }
+
+ public Duration getIdleTimeout()
+ {
+ return idleTimeout;
+ }
+
+ public void setIdleTimeout(Duration idleTimeout)
+ {
+ this.idleTimeout = idleTimeout;
+ }
+
+ public SocketAddress getBindAddress()
+ {
+ return bindAddress;
+ }
+
+ public void setBindAddress(SocketAddress bindAddress)
+ {
+ this.bindAddress = bindAddress;
+ }
+
+ @Override
+ protected void doStart() throws Exception
+ {
+ if (executor == null)
+ setExecutor(new QueuedThreadPool());
+ if (scheduler == null)
+ setScheduler(new ScheduledExecutorScheduler());
+ if (byteBufferPool == null)
+ setByteBufferPool(new MappedByteBufferPool());
+ if (sslContextFactory == null)
+ setSslContextFactory(newSslContextFactory());
+ selectorManager = newSelectorManager();
+ selectorManager.setConnectTimeout(getConnectTimeout().toMillis());
+ addBean(selectorManager);
+ super.doStart();
+ }
+
+ @Override
+ protected void doStop() throws Exception
+ {
+ super.doStop();
+ removeBean(selectorManager);
+ }
+
+ protected SslContextFactory newSslContextFactory()
+ {
+ SslContextFactory sslContextFactory = new SslContextFactory(false);
+ sslContextFactory.setEndpointIdentificationAlgorithm("HTTPS");
+ return sslContextFactory;
+ }
+
+ protected SelectorManager newSelectorManager()
+ {
+ return new ClientSelectorManager(getExecutor(), getScheduler(), getSelectors());
+ }
+
+ public void connect(SocketAddress address, Map context)
+ {
+ SocketChannel channel = null;
+ try
+ {
+ if (context == null)
+ context = new HashMap<>();
+ context.put(ClientConnector.CLIENT_CONNECTOR_CONTEXT_KEY, this);
+ context.putIfAbsent(SOCKET_ADDRESS_CONTEXT_KEY, address);
+
+ channel = SocketChannel.open();
+ SocketAddress bindAddress = getBindAddress();
+ if (bindAddress != null)
+ {
+ if (LOG.isDebugEnabled())
+ LOG.debug("Binding to {} to connect to {}", bindAddress, address);
+ channel.bind(bindAddress);
+ }
+ configure(channel);
+
+ boolean connected = true;
+ boolean blocking = isConnectBlocking();
+ if (LOG.isDebugEnabled())
+ LOG.debug("Connecting {} to {}", blocking ? "blocking" : "non-blocking", address);
+ if (blocking)
+ {
+ channel.socket().connect(address, (int)getConnectTimeout().toMillis());
+ channel.configureBlocking(false);
+ }
+ else
+ {
+ channel.configureBlocking(false);
+ connected = channel.connect(address);
+ }
+
+ if (connected)
+ selectorManager.accept(channel, context);
+ else
+ selectorManager.connect(channel, context);
+ }
+ // Must catch all exceptions, since some like
+ // UnresolvedAddressException are not IOExceptions.
+ catch (Throwable x)
+ {
+ // If IPv6 is not deployed, a generic SocketException "Network is unreachable"
+ // exception is being thrown, so we attempt to provide a better error message.
+ if (x.getClass() == SocketException.class)
+ x = new SocketException("Could not connect to " + address).initCause(x);
+
+ try
+ {
+ if (channel != null)
+ channel.close();
+ }
+ catch (IOException xx)
+ {
+ LOG.ignore(xx);
+ }
+ finally
+ {
+ connectFailed(x, context);
+ }
+ }
+ }
+
+ public void accept(SocketChannel channel, Map context)
+ {
+ try
+ {
+ context.put(ClientConnector.CLIENT_CONNECTOR_CONTEXT_KEY, this);
+
+ if (!channel.isConnected())
+ throw new IllegalStateException("SocketChannel must be connected");
+ configure(channel);
+ channel.configureBlocking(false);
+ selectorManager.accept(channel, context);
+ }
+ catch (Throwable failure)
+ {
+ if (LOG.isDebugEnabled())
+ LOG.debug("Could not accept {}", channel);
+ Promise> promise = (Promise>)context.get(CONNECTION_PROMISE_CONTEXT_KEY);
+ promise.failed(failure);
+ }
+ }
+
+ protected void configure(SocketChannel channel) throws IOException
+ {
+ channel.socket().setTcpNoDelay(true);
+ }
+
+ private void connectFailed(Throwable failure, Map context)
+ {
+ if (LOG.isDebugEnabled())
+ LOG.debug("Could not connect to {}", context.get(SOCKET_ADDRESS_CONTEXT_KEY));
+ Promise> promise = (Promise>)context.get(CONNECTION_PROMISE_CONTEXT_KEY);
+ promise.failed(failure);
+ }
+
+ private class ClientSelectorManager extends SelectorManager
+ {
+ private ClientSelectorManager(Executor executor, Scheduler scheduler, int selectors)
+ {
+ super(executor, scheduler, selectors);
+ }
+
+ @Override
+ protected EndPoint newEndPoint(SelectableChannel channel, ManagedSelector selector, SelectionKey selectionKey)
+ {
+ SocketChannelEndPoint endPoint = new SocketChannelEndPoint(channel, selector, selectionKey, getScheduler());
+ endPoint.setIdleTimeout(getIdleTimeout().toMillis());
+ return endPoint;
+ }
+
+ @Override
+ public Connection newConnection(SelectableChannel channel, EndPoint endPoint, Object attachment) throws IOException
+ {
+ @SuppressWarnings("unchecked")
+ Map context = (Map)attachment;
+ ClientConnectionFactory factory = (ClientConnectionFactory)context.get(CLIENT_CONNECTION_FACTORY_CONTEXT_KEY);
+ return factory.newConnection(endPoint, context);
+ }
+
+ @Override
+ protected void connectionFailed(SelectableChannel channel, Throwable failure, Object attachment)
+ {
+ @SuppressWarnings("unchecked")
+ Map context = (Map)attachment;
+ connectFailed(failure, context);
+ }
+ }
+}
diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/NegotiatingClientConnection.java b/jetty-io/src/main/java/org/eclipse/jetty/io/NegotiatingClientConnection.java
index 932644d6e620..867f88957aac 100644
--- a/jetty-io/src/main/java/org/eclipse/jetty/io/NegotiatingClientConnection.java
+++ b/jetty-io/src/main/java/org/eclipse/jetty/io/NegotiatingClientConnection.java
@@ -37,9 +37,9 @@ public abstract class NegotiatingClientConnection extends AbstractConnection
private final Map context;
private volatile boolean completed;
- protected NegotiatingClientConnection(EndPoint endp, Executor executor, SSLEngine sslEngine, ClientConnectionFactory connectionFactory, Map context)
+ protected NegotiatingClientConnection(EndPoint endPoint, Executor executor, SSLEngine sslEngine, ClientConnectionFactory connectionFactory, Map context)
{
- super(endp, executor);
+ super(endPoint, executor);
this.engine = sslEngine;
this.connectionFactory = connectionFactory;
this.context = context;
@@ -67,7 +67,7 @@ public void onOpen()
else
fillInterested();
}
- catch (IOException x)
+ catch (Throwable x)
{
close();
throw new RuntimeIOException(x);
diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ssl/SslClientConnectionFactory.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ssl/SslClientConnectionFactory.java
index a86fcb9f3c76..874f2a815aa6 100644
--- a/jetty-io/src/main/java/org/eclipse/jetty/io/ssl/SslClientConnectionFactory.java
+++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ssl/SslClientConnectionFactory.java
@@ -19,6 +19,7 @@
package org.eclipse.jetty.io.ssl;
import java.io.IOException;
+import java.net.InetSocketAddress;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Executor;
@@ -27,6 +28,7 @@
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.ClientConnectionFactory;
+import org.eclipse.jetty.io.ClientConnector;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.component.ContainerLifeCycle;
@@ -34,10 +36,7 @@
public class SslClientConnectionFactory implements ClientConnectionFactory
{
- public static final String SSL_CONTEXT_FACTORY_CONTEXT_KEY = "ssl.context.factory";
- public static final String SSL_PEER_HOST_CONTEXT_KEY = "ssl.peer.host";
- public static final String SSL_PEER_PORT_CONTEXT_KEY = "ssl.peer.port";
- public static final String SSL_ENGINE_CONTEXT_KEY = "ssl.engine";
+ public static final String SSL_ENGINE_CONTEXT_KEY = "org.eclipse.jetty.client.ssl.engine";
private final SslContextFactory sslContextFactory;
private final ByteBufferPool byteBufferPool;
@@ -88,9 +87,8 @@ public void setAllowMissingCloseMessage(boolean allowMissingCloseMessage)
@Override
public org.eclipse.jetty.io.Connection newConnection(EndPoint endPoint, Map context) throws IOException
{
- String host = (String)context.get(SSL_PEER_HOST_CONTEXT_KEY);
- int port = (Integer)context.get(SSL_PEER_PORT_CONTEXT_KEY);
- SSLEngine engine = sslContextFactory.newSSLEngine(host, port);
+ InetSocketAddress address = (InetSocketAddress)context.get(ClientConnector.SOCKET_ADDRESS_CONTEXT_KEY);
+ SSLEngine engine = sslContextFactory.newSSLEngine(address);
engine.setUseClientMode(true);
context.put(SSL_ENGINE_CONTEXT_KEY, engine);
@@ -119,8 +117,9 @@ public Connection customize(Connection connection, Map context)
sslConnection.setRenegotiationAllowed(sslContextFactory.isRenegotiationAllowed());
sslConnection.setRenegotiationLimit(sslContextFactory.getRenegotiationLimit());
sslConnection.setAllowMissingCloseMessage(isAllowMissingCloseMessage());
- ContainerLifeCycle connector = (ContainerLifeCycle)context.get(ClientConnectionFactory.CONNECTOR_CONTEXT_KEY);
- connector.getBeans(SslHandshakeListener.class).forEach(sslConnection::addHandshakeListener);
+ ContainerLifeCycle client = (ContainerLifeCycle)context.get(ClientConnectionFactory.CLIENT_CONTEXT_KEY);
+ if (client != null)
+ client.getBeans(SslHandshakeListener.class).forEach(sslConnection::addHandshakeListener);
}
return ClientConnectionFactory.super.customize(connection, context);
}
diff --git a/jetty-proxy/src/test/java/org/eclipse/jetty/proxy/ProxyServletTest.java b/jetty-proxy/src/test/java/org/eclipse/jetty/proxy/ProxyServletTest.java
index a42b626d11aa..fe62af7044e9 100644
--- a/jetty-proxy/src/test/java/org/eclipse/jetty/proxy/ProxyServletTest.java
+++ b/jetty-proxy/src/test/java/org/eclipse/jetty/proxy/ProxyServletTest.java
@@ -41,6 +41,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.zip.GZIPOutputStream;
@@ -171,16 +172,23 @@ private void startProxy(Class extends ProxyServlet> proxyServletClass, Map consumer) throws Exception
+ {
+ client = prepareClient(consumer);
+ }
+
+ private HttpClient prepareClient(Consumer consumer) throws Exception
{
QueuedThreadPool clientPool = new QueuedThreadPool();
clientPool.setName("client");
HttpClient result = new HttpClient();
result.setExecutor(clientPool);
result.getProxyConfiguration().getProxies().add(new HttpProxy("localhost", proxyConnector.getLocalPort()));
+ if (consumer != null)
+ consumer.accept(result);
result.start();
return result;
}
@@ -987,7 +995,7 @@ protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws Se
assertEquals(name, cookies.get(0).getName());
assertEquals(value1, cookies.get(0).getValue());
- HttpClient client2 = prepareClient();
+ HttpClient client2 = prepareClient(null);
try
{
String value2 = "2";
@@ -1373,10 +1381,8 @@ protected void service(HttpServletRequest request, HttpServletResponse response)
}
});
startProxy(proxyServletClass);
- startClient();
-
long idleTimeout = 1000;
- client.setIdleTimeout(idleTimeout);
+ startClient(httpClient -> httpClient.setIdleTimeout(idleTimeout));
byte[] content = new byte[1024];
new Random().nextBytes(content);
diff --git a/jetty-unixsocket/src/main/java/org/eclipse/jetty/unixsocket/client/HttpClientTransportOverUnixSockets.java b/jetty-unixsocket/src/main/java/org/eclipse/jetty/unixsocket/client/HttpClientTransportOverUnixSockets.java
index 9e30713099aa..aa2c77463a82 100644
--- a/jetty-unixsocket/src/main/java/org/eclipse/jetty/unixsocket/client/HttpClientTransportOverUnixSockets.java
+++ b/jetty-unixsocket/src/main/java/org/eclipse/jetty/unixsocket/client/HttpClientTransportOverUnixSockets.java
@@ -21,85 +21,110 @@
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
-import java.net.SocketException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
-import java.nio.file.Files;
-import java.nio.file.Paths;
+import java.nio.channels.SocketChannel;
import java.util.Map;
+import jnr.enxio.channels.NativeSelectorProvider;
+import jnr.unixsocket.UnixSocketAddress;
+import jnr.unixsocket.UnixSocketChannel;
+import org.eclipse.jetty.client.AbstractHttpClientTransport;
+import org.eclipse.jetty.client.DuplexConnectionPool;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.HttpDestination;
-import org.eclipse.jetty.client.http.HttpClientTransportOverHTTP;
+import org.eclipse.jetty.client.Origin;
+import org.eclipse.jetty.client.http.HttpConnectionOverHTTP;
+import org.eclipse.jetty.client.http.HttpDestinationOverHTTP;
+import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.ManagedSelector;
import org.eclipse.jetty.io.SelectorManager;
-import org.eclipse.jetty.io.ssl.SslClientConnectionFactory;
import org.eclipse.jetty.unixsocket.UnixSocketEndPoint;
+import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
-import jnr.enxio.channels.NativeSelectorProvider;
-import jnr.unixsocket.UnixSocketAddress;
-import jnr.unixsocket.UnixSocketChannel;
-
-public class HttpClientTransportOverUnixSockets
- extends HttpClientTransportOverHTTP
+// TODO: this class needs a thorough review.
+public class HttpClientTransportOverUnixSockets extends AbstractHttpClientTransport
{
- private static final Logger LOG = Log.getLogger( HttpClientTransportOverUnixSockets.class );
-
+ private static final Logger LOG = Log.getLogger(HttpClientTransportOverUnixSockets.class);
+
private String _unixSocket;
private SelectorManager selectorManager;
private UnixSocketChannel channel;
- public HttpClientTransportOverUnixSockets( String unixSocket )
+ public HttpClientTransportOverUnixSockets(String unixSocket)
{
- if ( unixSocket == null )
- {
- throw new IllegalArgumentException( "Unix socket file cannot be null" );
- }
+ if (unixSocket == null)
+ throw new IllegalArgumentException("Unix socket file cannot be null");
this._unixSocket = unixSocket;
+ setConnectionPoolFactory(destination ->
+ {
+ HttpClient httpClient = getHttpClient();
+ int maxConnections = httpClient.getMaxConnectionsPerDestination();
+ return new DuplexConnectionPool(destination, maxConnections, destination);
+ });
}
@Override
- protected SelectorManager newSelectorManager(HttpClient client)
+ protected void doStart() throws Exception
{
- return selectorManager = new UnixSocketSelectorManager(client,getSelectors());
+ HttpClient httpClient = getHttpClient();
+ selectorManager = new UnixSocketSelectorManager(httpClient, 1);
+ selectorManager.setConnectTimeout(httpClient.getConnectTimeout());
+ addBean(selectorManager);
+ super.doStart();
}
@Override
- public void connect( InetSocketAddress address, Map context )
+ protected void doStop() throws Exception
{
+ super.doStop();
+ try
+ {
+ if (channel != null)
+ channel.close();
+ }
+ catch (IOException xx)
+ {
+ LOG.ignore(xx);
+ }
+ }
+
+ @Override
+ public HttpDestination newHttpDestination(Origin origin)
+ {
+ return new HttpDestinationOverHTTP(getHttpClient(), origin);
+ }
+ @Override
+ public void connect(InetSocketAddress address, Map context)
+ {
try
{
InetAddress inet = address.getAddress();
if (!inet.isLoopbackAddress() && !inet.isLinkLocalAddress() && !inet.isSiteLocalAddress())
- throw new IOException("UnixSocket cannot connect to "+address.getHostString());
-
+ throw new IOException("UnixSocket cannot connect to " + address.getHostString());
+
// Open a unix socket
- UnixSocketAddress unixAddress = new UnixSocketAddress( this._unixSocket );
- channel = UnixSocketChannel.open( unixAddress );
-
+ UnixSocketAddress unixAddress = new UnixSocketAddress(this._unixSocket);
+ channel = UnixSocketChannel.open(unixAddress);
+
HttpDestination destination = (HttpDestination)context.get(HTTP_DESTINATION_CONTEXT_KEY);
HttpClient client = destination.getHttpClient();
configure(client, channel);
channel.configureBlocking(false);
- selectorManager.accept(channel, context);
+ selectorManager.accept(channel, context);
}
// Must catch all exceptions, since some like
// UnresolvedAddressException are not IOExceptions.
catch (Throwable x)
{
- // If IPv6 is not deployed, a generic SocketException "Network is unreachable"
- // exception is being thrown, so we attempt to provide a better error message.
- if (x.getClass() == SocketException.class)
- x = new SocketException("Could not connect to " + address).initCause(x);
-
try
{
if (channel != null)
@@ -116,11 +141,33 @@ public void connect( InetSocketAddress address, Map context )
}
}
- public class UnixSocketSelectorManager extends ClientSelectorManager
+ @Override
+ public Connection newConnection(EndPoint endPoint, Map context)
+ {
+ HttpDestination destination = (HttpDestination)context.get(HTTP_DESTINATION_CONTEXT_KEY);
+ @SuppressWarnings("unchecked")
+ Promise promise = (Promise)context.get(HTTP_CONNECTION_PROMISE_CONTEXT_KEY);
+ HttpConnectionOverHTTP connection = newHttpConnection(endPoint, destination, promise);
+ if (LOG.isDebugEnabled())
+ LOG.debug("Created {}", connection);
+ return customize(connection, context);
+ }
+
+ protected HttpConnectionOverHTTP newHttpConnection(EndPoint endPoint, HttpDestination destination, Promise promise)
+ {
+ return new HttpConnectionOverHTTP(endPoint, destination, promise);
+ }
+
+ protected void configure(HttpClient client, SocketChannel channel) throws IOException
+ {
+ channel.socket().setTcpNoDelay(client.isTCPNoDelay());
+ }
+
+ public class UnixSocketSelectorManager extends SelectorManager
{
protected UnixSocketSelectorManager(HttpClient client, int selectors)
{
- super(client,selectors);
+ super(client.getExecutor(), client.getScheduler(), selectors);
}
@Override
@@ -132,25 +179,26 @@ protected Selector newSelector() throws IOException
@Override
protected EndPoint newEndPoint(SelectableChannel channel, ManagedSelector selector, SelectionKey key)
{
- UnixSocketEndPoint endp = new UnixSocketEndPoint((UnixSocketChannel)channel, selector, key, getScheduler());
- endp.setIdleTimeout(getHttpClient().getIdleTimeout());
- return endp;
+ UnixSocketEndPoint endPoint = new UnixSocketEndPoint((UnixSocketChannel)channel, selector, key, getScheduler());
+ endPoint.setIdleTimeout(getHttpClient().getIdleTimeout());
+ return endPoint;
}
- }
- @Override
- protected void doStop()
- throws Exception
- {
- super.doStop();
- try
+ @Override
+ public Connection newConnection(SelectableChannel channel, EndPoint endPoint, Object attachment) throws IOException
{
- if (channel != null)
- channel.close();
+ @SuppressWarnings("unchecked")
+ Map context = (Map)attachment;
+ HttpDestination destination = (HttpDestination)context.get(HTTP_DESTINATION_CONTEXT_KEY);
+ return destination.getClientConnectionFactory().newConnection(endPoint, context);
}
- catch (IOException xx)
+
+ @Override
+ protected void connectionFailed(SelectableChannel channel, Throwable x, Object attachment)
{
- LOG.ignore(xx);
+ @SuppressWarnings("unchecked")
+ Map context = (Map)attachment;
+ connectFailed(context, x);
}
}
}
diff --git a/jetty-unixsocket/src/test/java/org/eclipse/jetty/unixsocket/UnixSocketTest.java b/jetty-unixsocket/src/test/java/org/eclipse/jetty/unixsocket/UnixSocketTest.java
index 682d55c78d1d..3481fcb0daf3 100644
--- a/jetty-unixsocket/src/test/java/org/eclipse/jetty/unixsocket/UnixSocketTest.java
+++ b/jetty-unixsocket/src/test/java/org/eclipse/jetty/unixsocket/UnixSocketTest.java
@@ -18,14 +18,6 @@
package org.eclipse.jetty.unixsocket;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.containsString;
-import static org.hamcrest.Matchers.instanceOf;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.junit.jupiter.api.condition.OS.LINUX;
-import static org.junit.jupiter.api.condition.OS.MAC;
-
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
@@ -34,7 +26,6 @@
import java.util.Date;
import java.util.concurrent.ExecutionException;
-import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
@@ -54,6 +45,14 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.EnabledOnOs;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.condition.OS.LINUX;
+import static org.junit.jupiter.api.condition.OS.MAC;
+
@EnabledOnOs({LINUX, MAC})
public class UnixSocketTest
{
@@ -68,94 +67,86 @@ public void before() throws Exception
{
server = null;
httpClient = null;
- String unixSocketTmp = System.getProperty( "unix.socket.tmp" );
- if(StringUtil.isNotBlank( unixSocketTmp ) )
- {
- sockFile = Files.createTempFile( Paths.get(unixSocketTmp), "unix", ".sock" );
- } else {
- sockFile = Files.createTempFile("unix", ".sock" );
- }
- assertTrue(Files.deleteIfExists(sockFile),"temp sock file cannot be deleted");
+ String unixSocketTmp = System.getProperty("unix.socket.tmp");
+ if (StringUtil.isNotBlank(unixSocketTmp))
+ sockFile = Files.createTempFile(Paths.get(unixSocketTmp), "unix", ".sock");
+ else
+ sockFile = Files.createTempFile("unix", ".sock");
+ assertTrue(Files.deleteIfExists(sockFile), "temp sock file cannot be deleted");
}
-
+
@AfterEach
public void after() throws Exception
{
- if (httpClient!=null)
+ if (httpClient != null)
httpClient.stop();
- if (server!=null)
+ if (server != null)
server.stop();
// Force delete, this will fail if UnixSocket was not closed properly in the implementation
- FS.delete( sockFile);
+ FS.delete(sockFile);
}
-
+
@Test
public void testUnixSocket() throws Exception
{
server = new Server();
-
HttpConnectionFactory http = new HttpConnectionFactory();
+ UnixSocketConnector connector = new UnixSocketConnector(server, http);
+ connector.setUnixSocket(sockFile.toString());
+ server.addConnector(connector);
- UnixSocketConnector connector = new UnixSocketConnector( server, http );
- connector.setUnixSocket( sockFile.toString() );
- server.addConnector( connector );
-
- server.setHandler( new AbstractHandler.ErrorDispatchHandler()
+ server.setHandler(new AbstractHandler.ErrorDispatchHandler()
{
@Override
- protected void doNonErrorHandle( String target, Request baseRequest, HttpServletRequest request,
- HttpServletResponse response )
- throws IOException, ServletException
+ protected void doNonErrorHandle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
{
int l = 0;
- if ( request.getContentLength() != 0 )
+ if (request.getContentLength() != 0)
{
InputStream in = request.getInputStream();
byte[] buffer = new byte[4096];
int r = 0;
- while ( r >= 0 )
+ while (r >= 0)
{
l += r;
- r = in.read( buffer );
+ r = in.read(buffer);
}
}
- log.info( "UnixSocketTest: request received" );
- baseRequest.setHandled( true );
- response.setStatus( 200 );
- response.getWriter().write( "Hello World " + new Date() + "\r\n" );
+ log.info("UnixSocketTest: request received");
+ baseRequest.setHandled(true);
+ response.setStatus(200);
+ response.getWriter().write("Hello World " + new Date() + "\r\n");
response.getWriter().write(
- "remote=" + request.getRemoteAddr() + ":" + request.getRemotePort() + "\r\n" );
+ "remote=" + request.getRemoteAddr() + ":" + request.getRemotePort() + "\r\n");
response.getWriter().write(
- "local =" + request.getLocalAddr() + ":" + request.getLocalPort() + "\r\n" );
- response.getWriter().write( "read =" + l + "\r\n" );
+ "local =" + request.getLocalAddr() + ":" + request.getLocalPort() + "\r\n");
+ response.getWriter().write("read =" + l + "\r\n");
}
- } );
+ });
server.start();
- httpClient = new HttpClient( new HttpClientTransportOverUnixSockets( sockFile.toString() ), null );
+ httpClient = new HttpClient(new HttpClientTransportOverUnixSockets(sockFile.toString()), null);
httpClient.start();
ContentResponse contentResponse = httpClient
- .newRequest( "http://localhost" )
+ .newRequest("http://localhost")
.send();
- log.debug( "response from server: {}", contentResponse.getContentAsString() );
+ log.debug("response from server: {}", contentResponse.getContentAsString());
- assertThat(contentResponse.getContentAsString(), containsString( "Hello World" ));
+ assertThat(contentResponse.getContentAsString(), containsString("Hello World"));
}
@Test
public void testNotLocal() throws Exception
- {
- httpClient = new HttpClient( new HttpClientTransportOverUnixSockets( sockFile.toString() ), null );
+ {
+ httpClient = new HttpClient(new HttpClientTransportOverUnixSockets(sockFile.toString()), null);
httpClient.start();
-
- ExecutionException e = assertThrows(ExecutionException.class, ()->{
- httpClient.newRequest( "http://google.com" ).send();
- });
+
+ ExecutionException e = assertThrows(ExecutionException.class, () -> httpClient.newRequest("http://google.com").send());
assertThat(e.getCause(), instanceOf(IOException.class));
- assertThat(e.getCause().getMessage(),containsString("UnixSocket cannot connect to google.com"));
+ assertThat(e.getCause().getMessage(), containsString("UnixSocket cannot connect to google.com"));
}
}
diff --git a/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/ConnectionStatisticsTest.java b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/ConnectionStatisticsTest.java
index ef651ee12e59..16c5fd66bdc7 100644
--- a/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/ConnectionStatisticsTest.java
+++ b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/ConnectionStatisticsTest.java
@@ -18,10 +18,6 @@
package org.eclipse.jetty.http.client;
-import static org.eclipse.jetty.http.client.Transport.H2C;
-import static org.eclipse.jetty.http.client.Transport.HTTP;
-import static org.hamcrest.MatcherAssert.assertThat;
-
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -44,6 +40,11 @@
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ArgumentsSource;
+import static org.eclipse.jetty.http.client.Transport.H2C;
+import static org.eclipse.jetty.http.client.Transport.HTTP;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertTrue;
+
public class ConnectionStatisticsTest extends AbstractTest
{
@Override
@@ -73,7 +74,7 @@ public void handle(String target, Request baseRequest, HttpServletRequest reques
{
@Override
public void onOpened(Connection connection)
- {
+ {
}
@Override
@@ -82,7 +83,7 @@ public void onClosed(Connection connection)
closed.countDown();
}
};
-
+
ConnectionStatistics serverStats = new ConnectionStatistics();
scenario.connector.addBean(serverStats);
scenario.connector.addBean(closer);
@@ -93,20 +94,20 @@ public void onClosed(Connection connection)
scenario.client.addBean(closer);
clientStats.start();
- scenario.client.setIdleTimeout(1000);
+ long idleTimeout = 1000;
+ scenario.client.setIdleTimeout(idleTimeout);
byte[] content = new byte[3072];
long contentLength = content.length;
ContentResponse response = scenario.client.newRequest(scenario.newURI())
- .header(HttpHeader.CONNECTION,"close")
+ .header(HttpHeader.CONNECTION, "close")
.content(new BytesContentProvider(content))
.timeout(5, TimeUnit.SECONDS)
.send();
assertThat(response.getStatus(), Matchers.equalTo(HttpStatus.OK_200));
+ assertTrue(closed.await(2 * idleTimeout, TimeUnit.MILLISECONDS));
- closed.await();
-
assertThat(serverStats.getConnectionsMax(), Matchers.greaterThan(0L));
assertThat(serverStats.getReceivedBytes(), Matchers.greaterThan(contentLength));
assertThat(serverStats.getSentBytes(), Matchers.greaterThan(contentLength));
diff --git a/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/HttpClientContinueTest.java b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/HttpClientContinueTest.java
index 082d7de6b9d4..28d59edf9db2 100644
--- a/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/HttpClientContinueTest.java
+++ b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/HttpClientContinueTest.java
@@ -327,7 +327,7 @@ public void test_Expect100Continue_WithContent_WithResponseFailure_Before100Cont
{
init(transport);
final long idleTimeout = 1000;
- scenario.start(new AbstractHandler()
+ scenario.startServer(new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws ServletException
@@ -343,8 +343,7 @@ public void handle(String target, Request baseRequest, HttpServletRequest reques
}
}
});
-
- scenario.client.setIdleTimeout(2 * idleTimeout);
+ scenario.startClient(httpClient -> httpClient.setIdleTimeout(2 * idleTimeout));
byte[] content = new byte[1024];
final CountDownLatch latch = new CountDownLatch(1);
@@ -375,7 +374,7 @@ public void test_Expect100Continue_WithContent_WithResponseFailure_After100Conti
{
init(transport);
final long idleTimeout = 1000;
- scenario.start(new AbstractHandler()
+ scenario.startServer(new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
@@ -393,8 +392,7 @@ public void handle(String target, Request baseRequest, HttpServletRequest reques
}
}
});
-
- scenario.client.setIdleTimeout(idleTimeout);
+ scenario.startClient(httpClient -> httpClient.setIdleTimeout(idleTimeout));
byte[] content = new byte[1024];
final CountDownLatch latch = new CountDownLatch(1);
diff --git a/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/HttpClientIdleTimeoutTest.java b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/HttpClientIdleTimeoutTest.java
index 3cde40558bf1..6ba09196945e 100644
--- a/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/HttpClientIdleTimeoutTest.java
+++ b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/HttpClientIdleTimeoutTest.java
@@ -18,9 +18,6 @@
package org.eclipse.jetty.http.client;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -37,6 +34,9 @@
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ArgumentsSource;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
public class HttpClientIdleTimeoutTest extends AbstractTest
{
private long idleTimeout = 1000;
@@ -52,7 +52,7 @@ public void init(Transport transport) throws IOException
public void testClientIdleTimeout(Transport transport) throws Exception
{
init(transport);
- scenario.start(new AbstractHandler()
+ scenario.startServer(new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
@@ -65,9 +65,7 @@ public void handle(String target, Request baseRequest, HttpServletRequest reques
}
}
});
- scenario.client.stop();
- scenario.client.setIdleTimeout(idleTimeout);
- scenario.client.start();
+ scenario.startClient(httpClient -> httpClient.setIdleTimeout(idleTimeout));
final CountDownLatch latch = new CountDownLatch(1);
scenario.client.newRequest(scenario.newURI())
@@ -126,10 +124,8 @@ public void handle(String target, Request baseRequest, HttpServletRequest reques
public void testIdleClientIdleTimeout(Transport transport) throws Exception
{
init(transport);
- scenario.start(new EmptyServerHandler());
- scenario.client.stop();
- scenario.client.setIdleTimeout(idleTimeout);
- scenario.client.start();
+ scenario.startServer(new EmptyServerHandler());
+ scenario.startClient(httpClient -> httpClient.setIdleTimeout(idleTimeout));
// Make a first request to open a connection.
ContentResponse response = scenario.client.newRequest(scenario.newURI()).send();
diff --git a/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/HttpClientTest.java b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/HttpClientTest.java
index 97eeb3727e8c..fc5934c7aeb5 100644
--- a/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/HttpClientTest.java
+++ b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/HttpClientTest.java
@@ -18,15 +18,6 @@
package org.eclipse.jetty.http.client;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.containsString;
-import static org.junit.jupiter.api.Assertions.assertArrayEquals;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.junit.jupiter.api.Assertions.fail;
-
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
@@ -63,6 +54,15 @@
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ArgumentsSource;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsString;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
public class HttpClientTest extends AbstractTest
{
@Override
@@ -453,7 +453,9 @@ public void handle(String target, Request baseRequest, HttpServletRequest reques
public void testConnectionListener(Transport transport) throws Exception
{
init(transport);
- scenario.start(new EmptyServerHandler());
+ scenario.startServer(new EmptyServerHandler());
+ long idleTimeout = 1000;
+ scenario.startClient(httpClient -> httpClient.setIdleTimeout(idleTimeout));
CountDownLatch openLatch = new CountDownLatch(1);
CountDownLatch closeLatch = new CountDownLatch(1);
@@ -472,9 +474,6 @@ public void onClosed(org.eclipse.jetty.io.Connection connection)
}
});
- long idleTimeout = 1000;
- scenario.client.setIdleTimeout(idleTimeout);
-
ContentResponse response = scenario.client.newRequest(scenario.newURI())
.scheme(scenario.getScheme())
.timeout(5, TimeUnit.SECONDS)
diff --git a/tests/test-http-client-transport/src/test/resources/jetty-logging.properties b/tests/test-http-client-transport/src/test/resources/jetty-logging.properties
index 1047ac8946d1..914cac87711b 100644
--- a/tests/test-http-client-transport/src/test/resources/jetty-logging.properties
+++ b/tests/test-http-client-transport/src/test/resources/jetty-logging.properties
@@ -2,6 +2,6 @@ org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.StdErrLog
#org.eclipse.jetty.LEVEL=DEBUG
#org.eclipse.jetty.client.LEVEL=DEBUG
#org.eclipse.jetty.http2.LEVEL=DEBUG
-#org.eclipse.jetty.http2.hpack.LEVEL=INFO
+org.eclipse.jetty.http2.hpack.LEVEL=INFO
#org.eclipse.jetty.http2.client.LEVEL=DEBUG
#org.eclipse.jetty.io.LEVEL=DEBUG
From 32a6d837395d8f13773a357a30983a97ce6dcc23 Mon Sep 17 00:00:00 2001
From: Simone Bordet
Date: Fri, 18 Jan 2019 08:48:27 +0100
Subject: [PATCH 2/4] Issue #132 - ClientConnector abstraction.
Added name to default executor and scheduler after review.
Signed-off-by: Simone Bordet
---
.../main/java/org/eclipse/jetty/io/ClientConnector.java | 8 ++++++--
1 file changed, 6 insertions(+), 2 deletions(-)
diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ClientConnector.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ClientConnector.java
index 5bb069124fb9..540c6eeecdab 100644
--- a/jetty-io/src/main/java/org/eclipse/jetty/io/ClientConnector.java
+++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ClientConnector.java
@@ -167,9 +167,13 @@ public void setBindAddress(SocketAddress bindAddress)
protected void doStart() throws Exception
{
if (executor == null)
- setExecutor(new QueuedThreadPool());
+ {
+ QueuedThreadPool clientThreads = new QueuedThreadPool();
+ clientThreads.setName(String.format("client-pool@%x", hashCode()));
+ setExecutor(clientThreads);
+ }
if (scheduler == null)
- setScheduler(new ScheduledExecutorScheduler());
+ setScheduler(new ScheduledExecutorScheduler(String.format("client-scheduler@%x", hashCode()), false));
if (byteBufferPool == null)
setByteBufferPool(new MappedByteBufferPool());
if (sslContextFactory == null)
From e135de98fab9776d4c68f7a51a99896fbc33e70d Mon Sep 17 00:00:00 2001
From: Simone Bordet
Date: Tue, 22 Jan 2019 17:33:29 +0100
Subject: [PATCH 3/4] Issue #132 - ClientConnector abstraction.
Rewrote HttpClientTransportOverUnixSockets in light of ClientConnector.
Signed-off-by: Simone Bordet
---
.../AbstractConnectorHttpClientTransport.java | 16 ++
.../http/HttpClientTransportOverHTTP.java | 15 --
.../http/HttpClientTransportOverFCGI.java | 14 --
.../org/eclipse/jetty/io/ClientConnector.java | 38 ++--
.../jetty/unixsocket/UnixSocketEndPoint.java | 15 +-
.../HttpClientTransportOverUnixSockets.java | 176 ++++++------------
.../jetty/unixsocket/UnixSocketTest.java | 4 +-
7 files changed, 103 insertions(+), 175 deletions(-)
diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectorHttpClientTransport.java b/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectorHttpClientTransport.java
index 0c234daa160f..efa92c222fa8 100644
--- a/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectorHttpClientTransport.java
+++ b/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectorHttpClientTransport.java
@@ -18,12 +18,14 @@
package org.eclipse.jetty.client;
+import java.io.IOException;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.Map;
import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.io.ClientConnector;
+import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
@@ -75,4 +77,18 @@ public void connect(InetSocketAddress address, Map context)
context.put(ClientConnector.CONNECTION_PROMISE_CONTEXT_KEY, new Promise.Wrapper<>(promise));
connector.connect(address, context);
}
+
+ @Override
+ public org.eclipse.jetty.io.Connection newConnection(EndPoint endPoint, Map context) throws IOException
+ {
+ HttpDestination destination = (HttpDestination)context.get(HTTP_DESTINATION_CONTEXT_KEY);
+ @SuppressWarnings("unchecked")
+ Promise promise = (Promise)context.get(HTTP_CONNECTION_PROMISE_CONTEXT_KEY);
+ org.eclipse.jetty.io.Connection connection = newHttpConnection(endPoint, destination, promise);
+ if (LOG.isDebugEnabled())
+ LOG.debug("Created {}", connection);
+ return customize(connection, context);
+ }
+
+ protected abstract org.eclipse.jetty.io.Connection newHttpConnection(EndPoint endPoint, HttpDestination destination, Promise promise);
}
diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpClientTransportOverHTTP.java b/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpClientTransportOverHTTP.java
index 9ef6bd7c49ef..52e0c350596e 100644
--- a/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpClientTransportOverHTTP.java
+++ b/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpClientTransportOverHTTP.java
@@ -18,9 +18,6 @@
package org.eclipse.jetty.client.http;
-import java.io.IOException;
-import java.util.Map;
-
import org.eclipse.jetty.client.AbstractConnectorHttpClientTransport;
import org.eclipse.jetty.client.DuplexConnectionPool;
import org.eclipse.jetty.client.HttpDestination;
@@ -58,18 +55,6 @@ public HttpDestination newHttpDestination(Origin origin)
return new HttpDestinationOverHTTP(getHttpClient(), origin);
}
- @Override
- public org.eclipse.jetty.io.Connection newConnection(EndPoint endPoint, Map context) throws IOException
- {
- HttpDestination destination = (HttpDestination)context.get(HTTP_DESTINATION_CONTEXT_KEY);
- @SuppressWarnings("unchecked")
- Promise promise = (Promise)context.get(HTTP_CONNECTION_PROMISE_CONTEXT_KEY);
- HttpConnectionOverHTTP connection = newHttpConnection(endPoint, destination, promise);
- if (LOG.isDebugEnabled())
- LOG.debug("Created {}", connection);
- return customize(connection, context);
- }
-
protected HttpConnectionOverHTTP newHttpConnection(EndPoint endPoint, HttpDestination destination, Promise promise)
{
return new HttpConnectionOverHTTP(endPoint, destination, promise);
diff --git a/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/http/HttpClientTransportOverFCGI.java b/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/http/HttpClientTransportOverFCGI.java
index 2c095ee2dab4..455b8db66cc4 100644
--- a/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/http/HttpClientTransportOverFCGI.java
+++ b/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/http/HttpClientTransportOverFCGI.java
@@ -18,8 +18,6 @@
package org.eclipse.jetty.fcgi.client.http;
-import java.util.Map;
-
import org.eclipse.jetty.client.AbstractConnectorHttpClientTransport;
import org.eclipse.jetty.client.DuplexConnectionPool;
import org.eclipse.jetty.client.HttpClient;
@@ -76,18 +74,6 @@ public HttpDestination newHttpDestination(Origin origin)
return new HttpDestinationOverFCGI(getHttpClient(), origin);
}
- @Override
- public org.eclipse.jetty.io.Connection newConnection(EndPoint endPoint, Map context)
- {
- HttpDestination destination = (HttpDestination)context.get(HTTP_DESTINATION_CONTEXT_KEY);
- @SuppressWarnings("unchecked")
- Promise promise = (Promise)context.get(HTTP_CONNECTION_PROMISE_CONTEXT_KEY);
- HttpConnectionOverFCGI connection = newHttpConnection(endPoint, destination, promise);
- if (LOG.isDebugEnabled())
- LOG.debug("Created {}", connection);
- return customize(connection, context);
- }
-
protected HttpConnectionOverFCGI newHttpConnection(EndPoint endPoint, HttpDestination destination, Promise promise)
{
return new HttpConnectionOverFCGI(endPoint, destination, promise);
diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ClientConnector.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ClientConnector.java
index 540c6eeecdab..78cd26cfaba4 100644
--- a/jetty-io/src/main/java/org/eclipse/jetty/io/ClientConnector.java
+++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ClientConnector.java
@@ -18,6 +18,7 @@
package org.eclipse.jetty.io;
+import java.io.Closeable;
import java.io.IOException;
import java.net.SocketAddress;
import java.net.SocketException;
@@ -251,20 +252,8 @@ public void connect(SocketAddress address, Map context)
// exception is being thrown, so we attempt to provide a better error message.
if (x.getClass() == SocketException.class)
x = new SocketException("Could not connect to " + address).initCause(x);
-
- try
- {
- if (channel != null)
- channel.close();
- }
- catch (IOException xx)
- {
- LOG.ignore(xx);
- }
- finally
- {
- connectFailed(x, context);
- }
+ safeClose(channel);
+ connectFailed(x, context);
}
}
@@ -273,7 +262,6 @@ public void accept(SocketChannel channel, Map context)
try
{
context.put(ClientConnector.CLIENT_CONNECTOR_CONTEXT_KEY, this);
-
if (!channel.isConnected())
throw new IllegalStateException("SocketChannel must be connected");
configure(channel);
@@ -284,17 +272,31 @@ public void accept(SocketChannel channel, Map context)
{
if (LOG.isDebugEnabled())
LOG.debug("Could not accept {}", channel);
+ safeClose(channel);
Promise> promise = (Promise>)context.get(CONNECTION_PROMISE_CONTEXT_KEY);
promise.failed(failure);
}
}
+ protected void safeClose(Closeable closeable)
+ {
+ try
+ {
+ if (closeable != null)
+ closeable.close();
+ }
+ catch (Throwable x)
+ {
+ LOG.ignore(x);
+ }
+ }
+
protected void configure(SocketChannel channel) throws IOException
{
channel.socket().setTcpNoDelay(true);
}
- private void connectFailed(Throwable failure, Map context)
+ protected void connectFailed(Throwable failure, Map context)
{
if (LOG.isDebugEnabled())
LOG.debug("Could not connect to {}", context.get(SOCKET_ADDRESS_CONTEXT_KEY));
@@ -302,9 +304,9 @@ private void connectFailed(Throwable failure, Map context)
promise.failed(failure);
}
- private class ClientSelectorManager extends SelectorManager
+ protected class ClientSelectorManager extends SelectorManager
{
- private ClientSelectorManager(Executor executor, Scheduler scheduler, int selectors)
+ protected ClientSelectorManager(Executor executor, Scheduler scheduler, int selectors)
{
super(executor, scheduler, selectors);
}
diff --git a/jetty-unixsocket/src/main/java/org/eclipse/jetty/unixsocket/UnixSocketEndPoint.java b/jetty-unixsocket/src/main/java/org/eclipse/jetty/unixsocket/UnixSocketEndPoint.java
index 3556b73c3908..57af91a5110b 100644
--- a/jetty-unixsocket/src/main/java/org/eclipse/jetty/unixsocket/UnixSocketEndPoint.java
+++ b/jetty-unixsocket/src/main/java/org/eclipse/jetty/unixsocket/UnixSocketEndPoint.java
@@ -20,31 +20,25 @@
import java.io.IOException;
import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
+import jnr.unixsocket.UnixSocketChannel;
import org.eclipse.jetty.io.ChannelEndPoint;
-import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.io.ManagedSelector;
-import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Scheduler;
-import jnr.unixsocket.UnixSocketChannel;
-
public class UnixSocketEndPoint extends ChannelEndPoint
{
private static final Logger LOG = Log.getLogger(UnixSocketEndPoint.class);
- private static final Logger CEPLOG = Log.getLogger(ChannelEndPoint.class);
-
private final UnixSocketChannel _channel;
-
+
public UnixSocketEndPoint(UnixSocketChannel channel, ManagedSelector selector, SelectionKey key, Scheduler scheduler)
{
- super(channel,selector,key,scheduler);
- _channel=channel;
+ super(channel, selector, key, scheduler);
+ _channel = channel;
}
@Override
@@ -59,7 +53,6 @@ public InetSocketAddress getRemoteAddress()
return null;
}
-
@Override
protected void doShutdownOutput()
{
diff --git a/jetty-unixsocket/src/main/java/org/eclipse/jetty/unixsocket/client/HttpClientTransportOverUnixSockets.java b/jetty-unixsocket/src/main/java/org/eclipse/jetty/unixsocket/client/HttpClientTransportOverUnixSockets.java
index aa2c77463a82..be94307618e2 100644
--- a/jetty-unixsocket/src/main/java/org/eclipse/jetty/unixsocket/client/HttpClientTransportOverUnixSockets.java
+++ b/jetty-unixsocket/src/main/java/org/eclipse/jetty/unixsocket/client/HttpClientTransportOverUnixSockets.java
@@ -19,25 +19,28 @@
package org.eclipse.jetty.unixsocket.client;
import java.io.IOException;
+import java.net.ConnectException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
+import java.net.SocketAddress;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Map;
+import java.util.concurrent.Executor;
import jnr.enxio.channels.NativeSelectorProvider;
import jnr.unixsocket.UnixSocketAddress;
import jnr.unixsocket.UnixSocketChannel;
-import org.eclipse.jetty.client.AbstractHttpClientTransport;
+import org.eclipse.jetty.client.AbstractConnectorHttpClientTransport;
import org.eclipse.jetty.client.DuplexConnectionPool;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.HttpDestination;
import org.eclipse.jetty.client.Origin;
import org.eclipse.jetty.client.http.HttpConnectionOverHTTP;
import org.eclipse.jetty.client.http.HttpDestinationOverHTTP;
-import org.eclipse.jetty.io.Connection;
+import org.eclipse.jetty.io.ClientConnector;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.ManagedSelector;
import org.eclipse.jetty.io.SelectorManager;
@@ -45,22 +48,21 @@
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
+import org.eclipse.jetty.util.thread.Scheduler;
// TODO: this class needs a thorough review.
-public class HttpClientTransportOverUnixSockets extends AbstractHttpClientTransport
+public class HttpClientTransportOverUnixSockets extends AbstractConnectorHttpClientTransport
{
private static final Logger LOG = Log.getLogger(HttpClientTransportOverUnixSockets.class);
- private String _unixSocket;
- private SelectorManager selectorManager;
-
- private UnixSocketChannel channel;
-
public HttpClientTransportOverUnixSockets(String unixSocket)
{
- if (unixSocket == null)
- throw new IllegalArgumentException("Unix socket file cannot be null");
- this._unixSocket = unixSocket;
+ this(new UnixSocketClientConnector(unixSocket));
+ }
+
+ private HttpClientTransportOverUnixSockets(ClientConnector connector)
+ {
+ super(connector);
setConnectionPoolFactory(destination ->
{
HttpClient httpClient = getHttpClient();
@@ -69,136 +71,80 @@ public HttpClientTransportOverUnixSockets(String unixSocket)
});
}
- @Override
- protected void doStart() throws Exception
- {
- HttpClient httpClient = getHttpClient();
- selectorManager = new UnixSocketSelectorManager(httpClient, 1);
- selectorManager.setConnectTimeout(httpClient.getConnectTimeout());
- addBean(selectorManager);
- super.doStart();
- }
-
- @Override
- protected void doStop() throws Exception
- {
- super.doStop();
- try
- {
- if (channel != null)
- channel.close();
- }
- catch (IOException xx)
- {
- LOG.ignore(xx);
- }
- }
-
@Override
public HttpDestination newHttpDestination(Origin origin)
{
return new HttpDestinationOverHTTP(getHttpClient(), origin);
}
- @Override
- public void connect(InetSocketAddress address, Map context)
- {
- try
- {
- InetAddress inet = address.getAddress();
- if (!inet.isLoopbackAddress() && !inet.isLinkLocalAddress() && !inet.isSiteLocalAddress())
- throw new IOException("UnixSocket cannot connect to " + address.getHostString());
-
- // Open a unix socket
- UnixSocketAddress unixAddress = new UnixSocketAddress(this._unixSocket);
- channel = UnixSocketChannel.open(unixAddress);
-
- HttpDestination destination = (HttpDestination)context.get(HTTP_DESTINATION_CONTEXT_KEY);
- HttpClient client = destination.getHttpClient();
-
- configure(client, channel);
-
- channel.configureBlocking(false);
- selectorManager.accept(channel, context);
- }
- // Must catch all exceptions, since some like
- // UnresolvedAddressException are not IOExceptions.
- catch (Throwable x)
- {
- try
- {
- if (channel != null)
- channel.close();
- }
- catch (IOException xx)
- {
- LOG.ignore(xx);
- }
- finally
- {
- connectFailed(context, x);
- }
- }
- }
-
- @Override
- public Connection newConnection(EndPoint endPoint, Map context)
- {
- HttpDestination destination = (HttpDestination)context.get(HTTP_DESTINATION_CONTEXT_KEY);
- @SuppressWarnings("unchecked")
- Promise promise = (Promise)context.get(HTTP_CONNECTION_PROMISE_CONTEXT_KEY);
- HttpConnectionOverHTTP connection = newHttpConnection(endPoint, destination, promise);
- if (LOG.isDebugEnabled())
- LOG.debug("Created {}", connection);
- return customize(connection, context);
- }
-
protected HttpConnectionOverHTTP newHttpConnection(EndPoint endPoint, HttpDestination destination, Promise promise)
{
return new HttpConnectionOverHTTP(endPoint, destination, promise);
}
- protected void configure(HttpClient client, SocketChannel channel) throws IOException
+ private static class UnixSocketClientConnector extends ClientConnector
{
- channel.socket().setTcpNoDelay(client.isTCPNoDelay());
- }
+ private final String unixSocket;
- public class UnixSocketSelectorManager extends SelectorManager
- {
- protected UnixSocketSelectorManager(HttpClient client, int selectors)
+ private UnixSocketClientConnector(String unixSocket)
{
- super(client.getExecutor(), client.getScheduler(), selectors);
+ this.unixSocket = unixSocket;
}
@Override
- protected Selector newSelector() throws IOException
+ protected SelectorManager newSelectorManager()
{
- return NativeSelectorProvider.getInstance().openSelector();
+ return new UnixSocketSelectorManager(getExecutor(), getScheduler(), getSelectors());
}
@Override
- protected EndPoint newEndPoint(SelectableChannel channel, ManagedSelector selector, SelectionKey key)
+ public void connect(SocketAddress address, Map context)
{
- UnixSocketEndPoint endPoint = new UnixSocketEndPoint((UnixSocketChannel)channel, selector, key, getScheduler());
- endPoint.setIdleTimeout(getHttpClient().getIdleTimeout());
- return endPoint;
+ InetSocketAddress socketAddress = (InetSocketAddress)address;
+ InetAddress inetAddress = socketAddress.getAddress();
+ if (inetAddress.isLoopbackAddress() || inetAddress.isLinkLocalAddress() || inetAddress.isSiteLocalAddress())
+ {
+ SocketChannel channel = null;
+ try
+ {
+ UnixSocketAddress unixAddress = new UnixSocketAddress(unixSocket);
+ channel = UnixSocketChannel.open(unixAddress);
+ if (LOG.isDebugEnabled())
+ LOG.debug("Created {} for {}", channel, unixAddress);
+ accept(channel, context);
+ }
+ catch (Throwable x)
+ {
+ safeClose(channel);
+ connectFailed(x, context);
+ }
+ }
+ else
+ {
+ connectFailed(new ConnectException("UnixSocket cannot connect to " + socketAddress.getHostString()), context);
+ }
}
- @Override
- public Connection newConnection(SelectableChannel channel, EndPoint endPoint, Object attachment) throws IOException
+ private class UnixSocketSelectorManager extends ClientSelectorManager
{
- @SuppressWarnings("unchecked")
- Map context = (Map)attachment;
- HttpDestination destination = (HttpDestination)context.get(HTTP_DESTINATION_CONTEXT_KEY);
- return destination.getClientConnectionFactory().newConnection(endPoint, context);
- }
+ private UnixSocketSelectorManager(Executor executor, Scheduler scheduler, int selectors)
+ {
+ super(executor, scheduler, selectors);
+ }
- @Override
- protected void connectionFailed(SelectableChannel channel, Throwable x, Object attachment)
- {
- @SuppressWarnings("unchecked")
- Map context = (Map)attachment;
- connectFailed(context, x);
+ @Override
+ protected Selector newSelector() throws IOException
+ {
+ return NativeSelectorProvider.getInstance().openSelector();
+ }
+
+ @Override
+ protected EndPoint newEndPoint(SelectableChannel channel, ManagedSelector selector, SelectionKey key)
+ {
+ UnixSocketEndPoint endPoint = new UnixSocketEndPoint((UnixSocketChannel)channel, selector, key, getScheduler());
+ endPoint.setIdleTimeout(getIdleTimeout().toMillis());
+ return endPoint;
+ }
}
}
}
diff --git a/jetty-unixsocket/src/test/java/org/eclipse/jetty/unixsocket/UnixSocketTest.java b/jetty-unixsocket/src/test/java/org/eclipse/jetty/unixsocket/UnixSocketTest.java
index 3481fcb0daf3..b7b8dddf8c93 100644
--- a/jetty-unixsocket/src/test/java/org/eclipse/jetty/unixsocket/UnixSocketTest.java
+++ b/jetty-unixsocket/src/test/java/org/eclipse/jetty/unixsocket/UnixSocketTest.java
@@ -20,6 +20,7 @@
import java.io.IOException;
import java.io.InputStream;
+import java.net.ConnectException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
@@ -146,7 +147,6 @@ public void testNotLocal() throws Exception
httpClient.start();
ExecutionException e = assertThrows(ExecutionException.class, () -> httpClient.newRequest("http://google.com").send());
- assertThat(e.getCause(), instanceOf(IOException.class));
- assertThat(e.getCause().getMessage(), containsString("UnixSocket cannot connect to google.com"));
+ assertThat(e.getCause(), instanceOf(ConnectException.class));
}
}
From 20f438d43a2f1690a81560c27d0376d731600771 Mon Sep 17 00:00:00 2001
From: Simone Bordet
Date: Tue, 22 Jan 2019 17:44:06 +0100
Subject: [PATCH 4/4] Issue #132 - ClientConnector abstraction.
Reverted refactoring of newConnection() to avoid
to bind the class to a too specific abstract method.
Signed-off-by: Simone Bordet
---
.../AbstractConnectorHttpClientTransport.java | 16 ----------------
.../client/http/HttpClientTransportOverHTTP.java | 15 +++++++++++++++
.../client/http/HttpClientTransportOverFCGI.java | 15 +++++++++++++++
.../HttpClientTransportOverUnixSockets.java | 13 +++++++++++++
4 files changed, 43 insertions(+), 16 deletions(-)
diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectorHttpClientTransport.java b/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectorHttpClientTransport.java
index efa92c222fa8..0c234daa160f 100644
--- a/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectorHttpClientTransport.java
+++ b/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectorHttpClientTransport.java
@@ -18,14 +18,12 @@
package org.eclipse.jetty.client;
-import java.io.IOException;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.Map;
import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.io.ClientConnector;
-import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
@@ -77,18 +75,4 @@ public void connect(InetSocketAddress address, Map context)
context.put(ClientConnector.CONNECTION_PROMISE_CONTEXT_KEY, new Promise.Wrapper<>(promise));
connector.connect(address, context);
}
-
- @Override
- public org.eclipse.jetty.io.Connection newConnection(EndPoint endPoint, Map context) throws IOException
- {
- HttpDestination destination = (HttpDestination)context.get(HTTP_DESTINATION_CONTEXT_KEY);
- @SuppressWarnings("unchecked")
- Promise promise = (Promise)context.get(HTTP_CONNECTION_PROMISE_CONTEXT_KEY);
- org.eclipse.jetty.io.Connection connection = newHttpConnection(endPoint, destination, promise);
- if (LOG.isDebugEnabled())
- LOG.debug("Created {}", connection);
- return customize(connection, context);
- }
-
- protected abstract org.eclipse.jetty.io.Connection newHttpConnection(EndPoint endPoint, HttpDestination destination, Promise promise);
}
diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpClientTransportOverHTTP.java b/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpClientTransportOverHTTP.java
index 52e0c350596e..ddfacc5f67a4 100644
--- a/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpClientTransportOverHTTP.java
+++ b/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpClientTransportOverHTTP.java
@@ -18,6 +18,9 @@
package org.eclipse.jetty.client.http;
+import java.io.IOException;
+import java.util.Map;
+
import org.eclipse.jetty.client.AbstractConnectorHttpClientTransport;
import org.eclipse.jetty.client.DuplexConnectionPool;
import org.eclipse.jetty.client.HttpDestination;
@@ -55,6 +58,18 @@ public HttpDestination newHttpDestination(Origin origin)
return new HttpDestinationOverHTTP(getHttpClient(), origin);
}
+ @Override
+ public org.eclipse.jetty.io.Connection newConnection(EndPoint endPoint, Map context) throws IOException
+ {
+ HttpDestination destination = (HttpDestination)context.get(HTTP_DESTINATION_CONTEXT_KEY);
+ @SuppressWarnings("unchecked")
+ Promise promise = (Promise)context.get(HTTP_CONNECTION_PROMISE_CONTEXT_KEY);
+ org.eclipse.jetty.io.Connection connection = newHttpConnection(endPoint, destination, promise);
+ if (LOG.isDebugEnabled())
+ LOG.debug("Created {}", connection);
+ return customize(connection, context);
+ }
+
protected HttpConnectionOverHTTP newHttpConnection(EndPoint endPoint, HttpDestination destination, Promise promise)
{
return new HttpConnectionOverHTTP(endPoint, destination, promise);
diff --git a/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/http/HttpClientTransportOverFCGI.java b/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/http/HttpClientTransportOverFCGI.java
index 455b8db66cc4..5a641c149998 100644
--- a/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/http/HttpClientTransportOverFCGI.java
+++ b/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/http/HttpClientTransportOverFCGI.java
@@ -18,6 +18,9 @@
package org.eclipse.jetty.fcgi.client.http;
+import java.io.IOException;
+import java.util.Map;
+
import org.eclipse.jetty.client.AbstractConnectorHttpClientTransport;
import org.eclipse.jetty.client.DuplexConnectionPool;
import org.eclipse.jetty.client.HttpClient;
@@ -74,6 +77,18 @@ public HttpDestination newHttpDestination(Origin origin)
return new HttpDestinationOverFCGI(getHttpClient(), origin);
}
+ @Override
+ public org.eclipse.jetty.io.Connection newConnection(EndPoint endPoint, Map context) throws IOException
+ {
+ HttpDestination destination = (HttpDestination)context.get(HTTP_DESTINATION_CONTEXT_KEY);
+ @SuppressWarnings("unchecked")
+ Promise promise = (Promise)context.get(HTTP_CONNECTION_PROMISE_CONTEXT_KEY);
+ org.eclipse.jetty.io.Connection connection = newHttpConnection(endPoint, destination, promise);
+ if (LOG.isDebugEnabled())
+ LOG.debug("Created {}", connection);
+ return customize(connection, context);
+ }
+
protected HttpConnectionOverFCGI newHttpConnection(EndPoint endPoint, HttpDestination destination, Promise promise)
{
return new HttpConnectionOverFCGI(endPoint, destination, promise);
diff --git a/jetty-unixsocket/src/main/java/org/eclipse/jetty/unixsocket/client/HttpClientTransportOverUnixSockets.java b/jetty-unixsocket/src/main/java/org/eclipse/jetty/unixsocket/client/HttpClientTransportOverUnixSockets.java
index be94307618e2..36ea14e3bae9 100644
--- a/jetty-unixsocket/src/main/java/org/eclipse/jetty/unixsocket/client/HttpClientTransportOverUnixSockets.java
+++ b/jetty-unixsocket/src/main/java/org/eclipse/jetty/unixsocket/client/HttpClientTransportOverUnixSockets.java
@@ -38,6 +38,7 @@
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.HttpDestination;
import org.eclipse.jetty.client.Origin;
+import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.client.http.HttpConnectionOverHTTP;
import org.eclipse.jetty.client.http.HttpDestinationOverHTTP;
import org.eclipse.jetty.io.ClientConnector;
@@ -77,6 +78,18 @@ public HttpDestination newHttpDestination(Origin origin)
return new HttpDestinationOverHTTP(getHttpClient(), origin);
}
+ @Override
+ public org.eclipse.jetty.io.Connection newConnection(EndPoint endPoint, Map context) throws IOException
+ {
+ HttpDestination destination = (HttpDestination)context.get(HTTP_DESTINATION_CONTEXT_KEY);
+ @SuppressWarnings("unchecked")
+ Promise promise = (Promise)context.get(HTTP_CONNECTION_PROMISE_CONTEXT_KEY);
+ org.eclipse.jetty.io.Connection connection = newHttpConnection(endPoint, destination, promise);
+ if (LOG.isDebugEnabled())
+ LOG.debug("Created {}", connection);
+ return customize(connection, context);
+ }
+
protected HttpConnectionOverHTTP newHttpConnection(EndPoint endPoint, HttpDestination destination, Promise promise)
{
return new HttpConnectionOverHTTP(endPoint, destination, promise);