Skip to content

Commit

Permalink
Fixes #11031 - HttpClient should expose Connection/EndPoint used by H… (
Browse files Browse the repository at this point in the history
#11033)

Introduced method Request.getConnection() to expose the Connection after at the request begin event.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
  • Loading branch information
sbordet authored Dec 13, 2023
1 parent fcf50ff commit efda646
Show file tree
Hide file tree
Showing 15 changed files with 202 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

package org.eclipse.jetty.client;

import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.io.CyclicTimeouts;
import org.eclipse.jetty.util.thread.AutoLock;
Expand Down Expand Up @@ -53,7 +54,7 @@ public boolean associate(HttpExchange exchange)
{
boolean result = false;
boolean abort = true;
try (AutoLock l = _lock.lock())
try (AutoLock ignored = _lock.lock())
{
if (_exchange == null)
{
Expand All @@ -64,12 +65,14 @@ public boolean associate(HttpExchange exchange)
}
}

HttpRequest request = exchange.getRequest();
if (abort)
{
exchange.getRequest().abort(new UnsupportedOperationException("Pipelined requests not supported"));
request.abort(new UnsupportedOperationException("Pipelined requests not supported"));
}
else
{
request.setConnection(getConnection());
if (LOG.isDebugEnabled())
LOG.debug("{} associated {} to {}", exchange, result, this);
}
Expand All @@ -80,7 +83,7 @@ public boolean associate(HttpExchange exchange)
public boolean disassociate(HttpExchange exchange)
{
boolean result = false;
try (AutoLock l = _lock.lock())
try (AutoLock ignored = _lock.lock())
{
HttpExchange existing = _exchange;
_exchange = null;
Expand All @@ -98,12 +101,14 @@ public boolean disassociate(HttpExchange exchange)

public HttpExchange getHttpExchange()
{
try (AutoLock l = _lock.lock())
try (AutoLock ignored = _lock.lock())
{
return _exchange;
}
}

protected abstract Connection getConnection();

@Override
public long getExpireNanoTime()
{
Expand Down
13 changes: 13 additions & 0 deletions jetty-client/src/main/java/org/eclipse/jetty/client/HttpProxy.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.URI;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -277,6 +278,18 @@ private ProxyConnection(Destination destination, Connection connection, Promise<
this.promise = promise;
}

@Override
public SocketAddress getLocalSocketAddress()
{
return connection.getLocalSocketAddress();
}

@Override
public SocketAddress getRemoteSocketAddress()
{
return connection.getRemoteSocketAddress();
}

@Override
public void send(Request request, Response.CompleteListener listener)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.util.function.LongConsumer;
import java.util.function.Supplier;

import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.client.api.ContentProvider;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.api.Request;
Expand Down Expand Up @@ -69,6 +70,7 @@ public class HttpRequest implements Request
private final AtomicReference<Throwable> aborted = new AtomicReference<>();
private final HttpClient client;
private final HttpConversation conversation;
private Connection connection;
private String scheme;
private String host;
private int port;
Expand Down Expand Up @@ -162,6 +164,17 @@ public HttpConversation getConversation()
return conversation;
}

@Override
public Connection getConnection()
{
return connection;
}

void setConnection(Connection connection)
{
this.connection = connection;
}

@Override
public String getScheme()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package org.eclipse.jetty.client.api;

import java.io.Closeable;
import java.net.SocketAddress;

import org.eclipse.jetty.util.Promise;

Expand Down Expand Up @@ -46,4 +47,20 @@ public interface Connection extends Closeable
* @see #close()
*/
boolean isClosed();

/**
* @return the local socket address associated with the connection

Check warning on line 52 in jetty-client/src/main/java/org/eclipse/jetty/client/api/Connection.java

View check run for this annotation

Webtide Jenkins / Static Analysis jdk17

MissingSummary

NORMAL: A summary fragment is required; consider using the value of the @return block as a summary fragment instead.
Raw output
Did you mean: <pre><code>*Returns the local socket address associated with the connection.</code></pre><p><a href="https://google.github.io/styleguide/javaguide.html#s7.2-summary-fragment">See ErrorProne documentation.</a></p>
*/
default SocketAddress getLocalSocketAddress()
{
return null;
}

/**
* @return the remote socket address associated with the connection

Check warning on line 60 in jetty-client/src/main/java/org/eclipse/jetty/client/api/Connection.java

View check run for this annotation

Webtide Jenkins / Static Analysis jdk17

MissingSummary

NORMAL: A summary fragment is required; consider using the value of the @return block as a summary fragment instead.
Raw output
Did you mean: <pre><code>*Returns the remote socket address associated with the connection.</code></pre><p><a href="https://google.github.io/styleguide/javaguide.html#s7.2-summary-fragment">See ErrorProne documentation.</a></p>
*/
default SocketAddress getRemoteSocketAddress()
{
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,22 @@
*/
public interface Request
{
/**
* <p>Returns the connection associated with this request.</p>
* <p>The connection is available only starting from the
* {@link #onRequestBegin(BeginListener) request begin} event,
* when a connection is associated with the request to be sent,
* otherwise {@code null} is returned.</p>
*
* @return the connection associated with this request,
* or {@code null} if there is no connection associated
* with this request
*/
default Connection getConnection()
{
return null;
}

/**
* @return the URI scheme of this request, such as "http" or "https"

Check warning on line 70 in jetty-client/src/main/java/org/eclipse/jetty/client/api/Request.java

View check run for this annotation

Webtide Jenkins / Static Analysis jdk17

MissingSummary

NORMAL: A summary fragment is required; consider using the value of the @return block as a summary fragment instead.
Raw output
Did you mean: <pre><code>*Returns the URI scheme of this request, such as &quot;http&quot; or &quot;https&quot;.</code></pre><p><a href="https://google.github.io/styleguide/javaguide.html#s7.2-summary-fragment">See ErrorProne documentation.</a></p>
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import org.eclipse.jetty.client.HttpChannel;
import org.eclipse.jetty.client.HttpExchange;
import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.http.HttpFields;
Expand Down Expand Up @@ -55,6 +56,12 @@ protected HttpReceiverOverHTTP newHttpReceiver()
return new HttpReceiverOverHTTP(this);
}

@Override
protected Connection getConnection()
{
return connection;
}

@Override
protected HttpSenderOverHTTP getHttpSender()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

package org.eclipse.jetty.client.http;

import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousCloseException;
import java.util.Collections;
Expand Down Expand Up @@ -100,6 +101,18 @@ public HttpDestination getHttpDestination()
return delegate.getHttpDestination();
}

@Override
public SocketAddress getLocalSocketAddress()
{
return delegate.getLocalSocketAddress();
}

@Override
public SocketAddress getRemoteSocketAddress()
{
return delegate.getRemoteSocketAddress();
}

@Override
public long getBytesIn()
{
Expand Down Expand Up @@ -285,6 +298,18 @@ protected Iterator<HttpChannel> getHttpChannels()
return Collections.<HttpChannel>singleton(channel).iterator();
}

@Override
public SocketAddress getLocalSocketAddress()
{
return getEndPoint().getLocalSocketAddress();
}

@Override
public SocketAddress getRemoteSocketAddress()
{
return getEndPoint().getRemoteSocketAddress();
}

@Override
public SendFailure send(HttpExchange exchange)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.http.HttpClientTransportOverHTTP;
import org.eclipse.jetty.client.http.HttpConnectionOverHTTP;
import org.eclipse.jetty.client.util.AsyncRequestContent;
Expand Down Expand Up @@ -68,26 +69,16 @@ public void testFailureBeforeRequestCommit() throws Exception
{
startServer(new EmptyServerHandler());

final AtomicReference<HttpConnectionOverHTTP> connectionRef = new AtomicReference<>();
client = new HttpClient(new HttpClientTransportOverHTTP(1)
{
@Override
public org.eclipse.jetty.io.Connection newConnection(EndPoint endPoint, Map<String, Object> context) throws IOException
{
HttpConnectionOverHTTP connection = (HttpConnectionOverHTTP)super.newConnection(endPoint, context);
connectionRef.set(connection);
return connection;
}
});
client = new HttpClient(new HttpClientTransportOverHTTP(1));
client.start();

assertThrows(ExecutionException.class, () ->
client.newRequest("localhost", connector.getLocalPort())
.onRequestHeaders(request -> connectionRef.get().getEndPoint().close())
.timeout(5, TimeUnit.SECONDS)
.send());
Request request = client.newRequest("localhost", connector.getLocalPort())
.onRequestHeaders(r -> r.getConnection().close())
.timeout(5, TimeUnit.SECONDS);
assertThrows(ExecutionException.class, request::send);

DuplexConnectionPool connectionPool = (DuplexConnectionPool)connectionRef.get().getHttpDestination().getConnectionPool();
HttpDestination destination = (HttpDestination)client.resolveDestination(request);
DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool();
assertEquals(0, connectionPool.getConnectionCount());
assertEquals(0, connectionPool.getActiveConnections().size());
assertEquals(0, connectionPool.getIdleConnections().size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.eclipse.jetty.client.HttpExchange;
import org.eclipse.jetty.client.HttpReceiver;
import org.eclipse.jetty.client.HttpSender;
import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.fcgi.generator.Flusher;
import org.eclipse.jetty.fcgi.generator.Generator;
Expand All @@ -43,7 +44,7 @@ public class HttpChannelOverFCGI extends HttpChannel
private int request;
private HttpVersion version;

public HttpChannelOverFCGI(final HttpConnectionOverFCGI connection, Flusher flusher, long idleTimeout)
public HttpChannelOverFCGI(HttpConnectionOverFCGI connection, Flusher flusher, long idleTimeout)
{
super(connection.getHttpDestination());
this.connection = connection;
Expand All @@ -63,6 +64,12 @@ void setRequest(int request)
this.request = request;
}

@Override
protected Connection getConnection()
{
return connection;
}

@Override
protected HttpSender getHttpSender()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package org.eclipse.jetty.fcgi.client.http;

import java.io.EOFException;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousCloseException;
import java.util.Collections;
Expand Down Expand Up @@ -87,6 +88,18 @@ public HttpDestination getHttpDestination()
return destination;
}

@Override
public SocketAddress getLocalSocketAddress()
{
return delegate.getLocalSocketAddress();
}

@Override
public SocketAddress getRemoteSocketAddress()
{
return delegate.getRemoteSocketAddress();
}

protected Flusher getFlusher()
{
return flusher;
Expand Down Expand Up @@ -319,7 +332,7 @@ private void failAndClose(Throwable failure)

private int acquireRequest()
{
try (AutoLock l = lock.lock())
try (AutoLock ignored = lock.lock())
{
int last = requests.getLast();
int request = last + 1;
Expand All @@ -330,7 +343,7 @@ private int acquireRequest()

private void releaseRequest(int request)
{
try (AutoLock l = lock.lock())
try (AutoLock ignored = lock.lock())
{
requests.removeFirstOccurrence(request);
}
Expand Down Expand Up @@ -373,6 +386,18 @@ protected Iterator<HttpChannel> getHttpChannels()
return channel == null ? Collections.emptyIterator() : Collections.singleton(channel).iterator();
}

@Override
public SocketAddress getLocalSocketAddress()
{
return getEndPoint().getLocalSocketAddress();
}

@Override
public SocketAddress getRemoteSocketAddress()
{
return getEndPoint().getRemoteSocketAddress();
}

@Override
public SendFailure send(HttpExchange exchange)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.eclipse.jetty.client.HttpExchange;
import org.eclipse.jetty.client.HttpReceiver;
import org.eclipse.jetty.client.HttpSender;
import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.http2.ErrorCode;
import org.eclipse.jetty.http2.HTTP2Channel;
Expand Down Expand Up @@ -67,6 +68,12 @@ public Stream.Listener getStreamListener()
return listener;
}

@Override
protected Connection getConnection()
{
return connection;
}

@Override
protected HttpSender getHttpSender()
{
Expand Down
Loading

0 comments on commit efda646

Please sign in to comment.