Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue #132 - ClientConnector abstraction. #3267

Merged
merged 4 commits into from
Jan 31, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,9 @@
import org.eclipse.jetty.io.ClientConnectionFactory;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.NegotiatingClientConnection;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;

public class ALPNClientConnection extends NegotiatingClientConnection
{
private static final Logger LOG = Log.getLogger(ALPNClientConnection.class);

private final List<String> protocols;

public ALPNClientConnection(EndPoint endPoint, Executor executor, ClientConnectionFactory connectionFactory, SSLEngine sslEngine, Map<String, Object> context, List<String> protocols)
Expand All @@ -49,9 +45,9 @@ public List<String> getProtocols()

public void selected(String protocol)
{
if (protocol==null || !protocols.contains(protocol))
if (protocol == null || !protocols.contains(protocol))
close();
else
super.completed();
completed();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

package org.eclipse.jetty.alpn.client;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
Expand Down Expand Up @@ -96,7 +95,7 @@ public ALPNClientConnectionFactory(Executor executor, ClientConnectionFactory co
}

@Override
public Connection newConnection(EndPoint endPoint, Map<String, Object> context) throws IOException
public Connection newConnection(EndPoint endPoint, Map<String, Object> context)
{
SSLEngine engine = (SSLEngine)context.get(SslClientConnectionFactory.SSL_ENGINE_CONTEXT_KEY);
for (Client processor : processors)
Expand Down
1 change: 1 addition & 0 deletions jetty-alpn/jetty-alpn-java-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
<artifactId>jetty-alpn-client</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.eclipse.jetty.http2</groupId>
<artifactId>http2-client</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,22 +35,19 @@
import org.eclipse.jetty.util.FuturePromise;
import org.eclipse.jetty.util.Jetty;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.ssl.SslContextFactory;

public class JDK9HTTP2Client
{
public static void main(String[] args) throws Exception
{
HTTP2Client client = new HTTP2Client();
SslContextFactory sslContextFactory = new SslContextFactory();
client.addBean(sslContextFactory);
client.start();

String host = "webtide.com";
int port = 443;

FuturePromise<Session> sessionPromise = new FuturePromise<>();
client.connect(sslContextFactory, new InetSocketAddress(host, port), new Session.Listener.Adapter(), sessionPromise);
client.connect(client.getClientConnector().getSslContextFactory(), new InetSocketAddress(host, port), new Session.Listener.Adapter(), sessionPromise);
Session session = sessionPromise.get(5, TimeUnit.SECONDS);

HttpFields requestFields = new HttpFields();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,7 @@

package org.eclipse.jetty.alpn.java.server;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
Expand All @@ -31,7 +27,6 @@
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLParameters;
import javax.net.ssl.SSLSocket;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

Expand All @@ -45,8 +40,12 @@
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;

public class JDK9ALPNTest
{
private Server server;
Expand All @@ -66,6 +65,13 @@ public void startServer(Handler handler) throws Exception
server.start();
}

@AfterEach
public void stopServer() throws Exception
{
if (server != null)
server.stop();
}

private SslContextFactory newSslContextFactory()
{
SslContextFactory sslContextFactory = new SslContextFactory();
Expand All @@ -84,7 +90,7 @@ public void testClientNotSupportingALPNServerSpeaksDefaultProtocol() throws Exce
startServer(new AbstractHandler.ErrorDispatchHandler()
{
@Override
protected void doNonErrorHandle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
protected void doNonErrorHandle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response)
{
baseRequest.setHandled(true);
}
Expand Down Expand Up @@ -126,7 +132,7 @@ public void testClientSupportingALPNServerSpeaksNegotiatedProtocol() throws Exce
startServer(new AbstractHandler.ErrorDispatchHandler()
{
@Override
protected void doNonErrorHandle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
protected void doNonErrorHandle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response)
{
baseRequest.setHandled(true);
}
Expand Down Expand Up @@ -163,6 +169,5 @@ protected void doNonErrorHandle(String target, Request baseRequest, HttpServletR
break;
}
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,169 +18,61 @@

package org.eclipse.jetty.client;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.SocketException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.time.Duration;
import java.util.Map;

import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.ManagedSelector;
import org.eclipse.jetty.io.SelectorManager;
import org.eclipse.jetty.io.SocketChannelEndPoint;
import org.eclipse.jetty.io.ssl.SslClientConnectionFactory;
import org.eclipse.jetty.io.ClientConnector;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;

@ManagedObject
public abstract class AbstractConnectorHttpClientTransport extends AbstractHttpClientTransport
{
private final int selectors;
private SelectorManager selectorManager;
private final ClientConnector connector;

protected AbstractConnectorHttpClientTransport(int selectors)
protected AbstractConnectorHttpClientTransport(ClientConnector connector)
{
this.selectors = selectors;
this.connector = connector;
addBean(connector);
}

public ClientConnector getClientConnector()
{
return connector;
}

@ManagedAttribute(value = "The number of selectors", readonly = true)
public int getSelectors()
{
return selectors;
return connector.getSelectors();
}

@Override
protected void doStart() throws Exception
{
HttpClient httpClient = getHttpClient();
selectorManager = newSelectorManager(httpClient);
selectorManager.setConnectTimeout(httpClient.getConnectTimeout());
addBean(selectorManager);
connector.setBindAddress(httpClient.getBindAddress());
connector.setByteBufferPool(httpClient.getByteBufferPool());
connector.setConnectBlocking(httpClient.isConnectBlocking());
connector.setConnectTimeout(Duration.ofMillis(httpClient.getConnectTimeout()));
connector.setExecutor(httpClient.getExecutor());
connector.setIdleTimeout(Duration.ofMillis(httpClient.getIdleTimeout()));
connector.setScheduler(httpClient.getScheduler());
connector.setSslContextFactory(httpClient.getSslContextFactory());
super.doStart();
}

@Override
protected void doStop() throws Exception
{
super.doStop();
removeBean(selectorManager);
}

@Override
public void connect(InetSocketAddress address, Map<String, Object> context)
{
SocketChannel channel = null;
try
{
channel = SocketChannel.open();
HttpDestination destination = (HttpDestination)context.get(HTTP_DESTINATION_CONTEXT_KEY);
HttpClient client = destination.getHttpClient();
SocketAddress bindAddress = client.getBindAddress();
if (bindAddress != null)
channel.bind(bindAddress);
configure(client, channel);

context.put(SslClientConnectionFactory.SSL_PEER_HOST_CONTEXT_KEY, destination.getHost());
context.put(SslClientConnectionFactory.SSL_PEER_PORT_CONTEXT_KEY, destination.getPort());

boolean connected = true;
if (client.isConnectBlocking())
{
channel.socket().connect(address, (int)client.getConnectTimeout());
channel.configureBlocking(false);
}
else
{
channel.configureBlocking(false);
connected = channel.connect(address);
}
if (connected)
selectorManager.accept(channel, context);
else
selectorManager.connect(channel, context);
}
// Must catch all exceptions, since some like
// UnresolvedAddressException are not IOExceptions.
catch (Throwable x)
{
// If IPv6 is not deployed, a generic SocketException "Network is unreachable"
// exception is being thrown, so we attempt to provide a better error message.
if (x.getClass() == SocketException.class)
x = new SocketException("Could not connect to " + address).initCause(x);

try
{
if (channel != null)
channel.close();
}
catch (IOException xx)
{
LOG.ignore(xx);
}
finally
{
connectFailed(context, x);
}
}
}

protected void connectFailed(Map<String, Object> context, Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug("Could not connect to {}", context.get(HTTP_DESTINATION_CONTEXT_KEY));
HttpDestination destination = (HttpDestination)context.get(HTTP_DESTINATION_CONTEXT_KEY);
context.put(ClientConnector.CLIENT_CONNECTION_FACTORY_CONTEXT_KEY, destination.getClientConnectionFactory());
@SuppressWarnings("unchecked")
Promise<Connection> promise = (Promise<Connection>)context.get(HTTP_CONNECTION_PROMISE_CONTEXT_KEY);
promise.failed(x);
}

protected void configure(HttpClient client, SocketChannel channel) throws IOException
{
channel.socket().setTcpNoDelay(client.isTCPNoDelay());
}

protected SelectorManager newSelectorManager(HttpClient client)
{
return new ClientSelectorManager(client, getSelectors());
}

protected class ClientSelectorManager extends SelectorManager
{
private final HttpClient client;

protected ClientSelectorManager(HttpClient client, int selectors)
{
super(client.getExecutor(), client.getScheduler(), selectors);
this.client = client;
}

@Override
protected EndPoint newEndPoint(SelectableChannel channel, ManagedSelector selector, SelectionKey key)
{
SocketChannelEndPoint endp = new SocketChannelEndPoint(channel, selector, key, getScheduler());
endp.setIdleTimeout(client.getIdleTimeout());
return endp;
}

@Override
public org.eclipse.jetty.io.Connection newConnection(SelectableChannel channel, EndPoint endPoint, Object attachment) throws IOException
{
@SuppressWarnings("unchecked")
Map<String, Object> context = (Map<String, Object>)attachment;
HttpDestination destination = (HttpDestination)context.get(HTTP_DESTINATION_CONTEXT_KEY);
return destination.getClientConnectionFactory().newConnection(endPoint, context);
}

@Override
protected void connectionFailed(SelectableChannel channel, Throwable x, Object attachment)
{
@SuppressWarnings("unchecked")
Map<String, Object> context = (Map<String, Object>)attachment;
connectFailed(context, x);
}
context.put(ClientConnector.CONNECTION_PROMISE_CONTEXT_KEY, new Promise.Wrapper<>(promise));
connector.connect(address, context);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@

package org.eclipse.jetty.client;

import java.util.Map;

import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.component.ContainerLifeCycle;
import org.eclipse.jetty.util.log.Log;
Expand Down Expand Up @@ -53,4 +57,13 @@ public void setConnectionPoolFactory(ConnectionPool.Factory factory)
{
this.factory = factory;
}

protected void connectFailed(Map<String, Object> context, Throwable failure)
{
if (LOG.isDebugEnabled())
LOG.debug("Could not connect to {}", context.get(HTTP_DESTINATION_CONTEXT_KEY));
@SuppressWarnings("unchecked")
Promise<Connection> promise = (Promise<Connection>)context.get(HTTP_CONNECTION_PROMISE_CONTEXT_KEY);
promise.failed(failure);
}
}
Loading