Skip to content

Commit

Permalink
Issue #132 - ClientConnector abstraction.
Browse files Browse the repository at this point in the history
Introduced ClientConnector and refactored HttpClient transports,
removing duplicated code that was connect() to a remote host.

Refactored also HTTP2Client to reference ClientConnector.

Refactored tests accordingly to the changes introduced in the
implementations.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
  • Loading branch information
sbordet committed Jan 17, 2019
1 parent 2f6d0b3 commit 6c4ee08
Show file tree
Hide file tree
Showing 36 changed files with 789 additions and 764 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,9 @@
import org.eclipse.jetty.io.ClientConnectionFactory;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.NegotiatingClientConnection;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;

public class ALPNClientConnection extends NegotiatingClientConnection
{
private static final Logger LOG = Log.getLogger(ALPNClientConnection.class);

private final List<String> protocols;

public ALPNClientConnection(EndPoint endPoint, Executor executor, ClientConnectionFactory connectionFactory, SSLEngine sslEngine, Map<String, Object> context, List<String> protocols)
Expand All @@ -49,9 +45,9 @@ public List<String> getProtocols()

public void selected(String protocol)
{
if (protocol==null || !protocols.contains(protocol))
if (protocol == null || !protocols.contains(protocol))
close();
else
super.completed();
completed();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

package org.eclipse.jetty.alpn.client;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
Expand Down Expand Up @@ -96,7 +95,7 @@ public ALPNClientConnectionFactory(Executor executor, ClientConnectionFactory co
}

@Override
public Connection newConnection(EndPoint endPoint, Map<String, Object> context) throws IOException
public Connection newConnection(EndPoint endPoint, Map<String, Object> context)
{
SSLEngine engine = (SSLEngine)context.get(SslClientConnectionFactory.SSL_ENGINE_CONTEXT_KEY);
for (Client processor : processors)
Expand Down
1 change: 1 addition & 0 deletions jetty-alpn/jetty-alpn-java-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
<artifactId>jetty-alpn-client</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.eclipse.jetty.http2</groupId>
<artifactId>http2-client</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,22 +35,19 @@
import org.eclipse.jetty.util.FuturePromise;
import org.eclipse.jetty.util.Jetty;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.ssl.SslContextFactory;

public class JDK9HTTP2Client
{
public static void main(String[] args) throws Exception
{
HTTP2Client client = new HTTP2Client();
SslContextFactory sslContextFactory = new SslContextFactory();
client.addBean(sslContextFactory);
client.start();

String host = "webtide.com";
int port = 443;

FuturePromise<Session> sessionPromise = new FuturePromise<>();
client.connect(sslContextFactory, new InetSocketAddress(host, port), new Session.Listener.Adapter(), sessionPromise);
client.connect(client.getClientConnector().getSslContextFactory(), new InetSocketAddress(host, port), new Session.Listener.Adapter(), sessionPromise);
Session session = sessionPromise.get(5, TimeUnit.SECONDS);

HttpFields requestFields = new HttpFields();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,7 @@

package org.eclipse.jetty.alpn.java.server;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
Expand All @@ -31,7 +27,6 @@
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLParameters;
import javax.net.ssl.SSLSocket;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

Expand All @@ -45,8 +40,12 @@
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;

public class JDK9ALPNTest
{
private Server server;
Expand All @@ -66,6 +65,13 @@ public void startServer(Handler handler) throws Exception
server.start();
}

@AfterEach
public void stopServer() throws Exception
{
if (server != null)
server.stop();
}

private SslContextFactory newSslContextFactory()
{
SslContextFactory sslContextFactory = new SslContextFactory();
Expand All @@ -84,7 +90,7 @@ public void testClientNotSupportingALPNServerSpeaksDefaultProtocol() throws Exce
startServer(new AbstractHandler.ErrorDispatchHandler()
{
@Override
protected void doNonErrorHandle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
protected void doNonErrorHandle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response)
{
baseRequest.setHandled(true);
}
Expand Down Expand Up @@ -126,7 +132,7 @@ public void testClientSupportingALPNServerSpeaksNegotiatedProtocol() throws Exce
startServer(new AbstractHandler.ErrorDispatchHandler()
{
@Override
protected void doNonErrorHandle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
protected void doNonErrorHandle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response)
{
baseRequest.setHandled(true);
}
Expand Down Expand Up @@ -163,6 +169,5 @@ protected void doNonErrorHandle(String target, Request baseRequest, HttpServletR
break;
}
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,169 +18,61 @@

package org.eclipse.jetty.client;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.SocketException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.time.Duration;
import java.util.Map;

import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.ManagedSelector;
import org.eclipse.jetty.io.SelectorManager;
import org.eclipse.jetty.io.SocketChannelEndPoint;
import org.eclipse.jetty.io.ssl.SslClientConnectionFactory;
import org.eclipse.jetty.io.ClientConnector;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;

@ManagedObject
public abstract class AbstractConnectorHttpClientTransport extends AbstractHttpClientTransport
{
private final int selectors;
private SelectorManager selectorManager;
private final ClientConnector connector;

protected AbstractConnectorHttpClientTransport(int selectors)
protected AbstractConnectorHttpClientTransport(ClientConnector connector)
{
this.selectors = selectors;
this.connector = connector;
addBean(connector);
}

public ClientConnector getClientConnector()
{
return connector;
}

@ManagedAttribute(value = "The number of selectors", readonly = true)
public int getSelectors()
{
return selectors;
return connector.getSelectors();
}

@Override
protected void doStart() throws Exception
{
HttpClient httpClient = getHttpClient();
selectorManager = newSelectorManager(httpClient);
selectorManager.setConnectTimeout(httpClient.getConnectTimeout());
addBean(selectorManager);
connector.setBindAddress(httpClient.getBindAddress());
connector.setByteBufferPool(httpClient.getByteBufferPool());
connector.setConnectBlocking(httpClient.isConnectBlocking());
connector.setConnectTimeout(Duration.ofMillis(httpClient.getConnectTimeout()));
connector.setExecutor(httpClient.getExecutor());
connector.setIdleTimeout(Duration.ofMillis(httpClient.getIdleTimeout()));
connector.setScheduler(httpClient.getScheduler());
connector.setSslContextFactory(httpClient.getSslContextFactory());
super.doStart();
}

@Override
protected void doStop() throws Exception
{
super.doStop();
removeBean(selectorManager);
}

@Override
public void connect(InetSocketAddress address, Map<String, Object> context)
{
SocketChannel channel = null;
try
{
channel = SocketChannel.open();
HttpDestination destination = (HttpDestination)context.get(HTTP_DESTINATION_CONTEXT_KEY);
HttpClient client = destination.getHttpClient();
SocketAddress bindAddress = client.getBindAddress();
if (bindAddress != null)
channel.bind(bindAddress);
configure(client, channel);

context.put(SslClientConnectionFactory.SSL_PEER_HOST_CONTEXT_KEY, destination.getHost());
context.put(SslClientConnectionFactory.SSL_PEER_PORT_CONTEXT_KEY, destination.getPort());

boolean connected = true;
if (client.isConnectBlocking())
{
channel.socket().connect(address, (int)client.getConnectTimeout());
channel.configureBlocking(false);
}
else
{
channel.configureBlocking(false);
connected = channel.connect(address);
}
if (connected)
selectorManager.accept(channel, context);
else
selectorManager.connect(channel, context);
}
// Must catch all exceptions, since some like
// UnresolvedAddressException are not IOExceptions.
catch (Throwable x)
{
// If IPv6 is not deployed, a generic SocketException "Network is unreachable"
// exception is being thrown, so we attempt to provide a better error message.
if (x.getClass() == SocketException.class)
x = new SocketException("Could not connect to " + address).initCause(x);

try
{
if (channel != null)
channel.close();
}
catch (IOException xx)
{
LOG.ignore(xx);
}
finally
{
connectFailed(context, x);
}
}
}

protected void connectFailed(Map<String, Object> context, Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug("Could not connect to {}", context.get(HTTP_DESTINATION_CONTEXT_KEY));
HttpDestination destination = (HttpDestination)context.get(HTTP_DESTINATION_CONTEXT_KEY);
context.put(ClientConnector.CLIENT_CONNECTION_FACTORY_CONTEXT_KEY, destination.getClientConnectionFactory());
@SuppressWarnings("unchecked")
Promise<Connection> promise = (Promise<Connection>)context.get(HTTP_CONNECTION_PROMISE_CONTEXT_KEY);
promise.failed(x);
}

protected void configure(HttpClient client, SocketChannel channel) throws IOException
{
channel.socket().setTcpNoDelay(client.isTCPNoDelay());
}

protected SelectorManager newSelectorManager(HttpClient client)
{
return new ClientSelectorManager(client, getSelectors());
}

protected class ClientSelectorManager extends SelectorManager
{
private final HttpClient client;

protected ClientSelectorManager(HttpClient client, int selectors)
{
super(client.getExecutor(), client.getScheduler(), selectors);
this.client = client;
}

@Override
protected EndPoint newEndPoint(SelectableChannel channel, ManagedSelector selector, SelectionKey key)
{
SocketChannelEndPoint endp = new SocketChannelEndPoint(channel, selector, key, getScheduler());
endp.setIdleTimeout(client.getIdleTimeout());
return endp;
}

@Override
public org.eclipse.jetty.io.Connection newConnection(SelectableChannel channel, EndPoint endPoint, Object attachment) throws IOException
{
@SuppressWarnings("unchecked")
Map<String, Object> context = (Map<String, Object>)attachment;
HttpDestination destination = (HttpDestination)context.get(HTTP_DESTINATION_CONTEXT_KEY);
return destination.getClientConnectionFactory().newConnection(endPoint, context);
}

@Override
protected void connectionFailed(SelectableChannel channel, Throwable x, Object attachment)
{
@SuppressWarnings("unchecked")
Map<String, Object> context = (Map<String, Object>)attachment;
connectFailed(context, x);
}
context.put(ClientConnector.CONNECTION_PROMISE_CONTEXT_KEY, new Promise.Wrapper<>(promise));
connector.connect(address, context);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@

package org.eclipse.jetty.client;

import java.util.Map;

import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.component.ContainerLifeCycle;
import org.eclipse.jetty.util.log.Log;
Expand Down Expand Up @@ -53,4 +57,13 @@ public void setConnectionPoolFactory(ConnectionPool.Factory factory)
{
this.factory = factory;
}

protected void connectFailed(Map<String, Object> context, Throwable failure)
{
if (LOG.isDebugEnabled())
LOG.debug("Could not connect to {}", context.get(HTTP_DESTINATION_CONTEXT_KEY));
@SuppressWarnings("unchecked")
Promise<Connection> promise = (Promise<Connection>)context.get(HTTP_CONNECTION_PROMISE_CONTEXT_KEY);
promise.failed(failure);
}
}
Loading

0 comments on commit 6c4ee08

Please sign in to comment.