Skip to content

Commit

Permalink
Promote WebClient interceptor to public API.
Browse files Browse the repository at this point in the history
Motivation:

WebClient interceptor API is still internal and has proven to be stable

Changes:

Move WebClient interceptor to public API.
  • Loading branch information
vietj committed Nov 19, 2024
1 parent c425122 commit 21b253e
Show file tree
Hide file tree
Showing 8 changed files with 245 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@
import io.vertx.codegen.annotations.Fluent;
import io.vertx.codegen.annotations.GenIgnore;
import io.vertx.codegen.annotations.VertxGen;
import io.vertx.core.Handler;
import io.vertx.ext.auth.User;
import io.vertx.ext.auth.authentication.Credentials;
import io.vertx.ext.auth.oauth2.OAuth2Auth;
import io.vertx.ext.web.client.impl.HttpContext;
import io.vertx.ext.web.client.impl.Oauth2WebClientAware;

/**
Expand Down Expand Up @@ -64,6 +66,10 @@ static OAuth2WebClient create(WebClient webClient, OAuth2Auth oAuth2Auth, OAuth2
@GenIgnore(GenIgnore.PERMITTED_TYPE)
OAuth2WebClient withCredentials(Credentials credentials);

@GenIgnore
@Override
OAuth2WebClient addInterceptor(Handler<HttpContext<?>> interceptor);

/**
* Get the authenticated user (if any) that is associated with this client.
* @return the current user associated with this client or null if no user is associated
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@
*/
package io.vertx.ext.web.client;

import io.vertx.codegen.annotations.GenIgnore;
import io.vertx.codegen.annotations.VertxGen;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpClient;
Expand All @@ -26,6 +28,7 @@
import io.vertx.core.internal.http.HttpClientInternal;
import io.vertx.core.net.ClientSSLOptions;
import io.vertx.core.net.SocketAddress;
import io.vertx.ext.web.client.impl.HttpContext;
import io.vertx.ext.web.client.impl.WebClientBase;
import io.vertx.uritemplate.UriTemplate;

Expand Down Expand Up @@ -775,6 +778,22 @@ default HttpRequest<Buffer> headAbs(UriTemplate absoluteURI) {
return requestAbs(HttpMethod.HEAD, absoluteURI);
}

/**
* Add interceptor in the chain.
* <p/>
* The interceptor can maintain per request state with {@link HttpContext#get(String)}/{@link HttpContext#set(String, Object)}.
* <p/>
* A request/response can be processed several times (in case of retry) and thus they should use the per request state
* to ensure an operation is not done twice.
* <p/>
* This API is internal.
*
* @param interceptor the interceptor to add, must not be null
* @return a reference to this, so the API can be used fluently
*/
@GenIgnore
WebClient addInterceptor(Handler<HttpContext<?>> interceptor);

/**
* <p>Update the client with new SSL {@code options}, the update happens if the options object is valid and different
* from the existing options object.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
import io.vertx.codegen.annotations.Fluent;
import io.vertx.codegen.annotations.GenIgnore;
import io.vertx.codegen.annotations.VertxGen;
import io.vertx.core.Handler;
import io.vertx.ext.web.client.impl.HttpContext;
import io.vertx.ext.web.client.impl.WebClientSessionAware;
import io.vertx.ext.web.client.spi.CookieStore;

Expand Down Expand Up @@ -135,4 +137,8 @@ static WebClientSession create(WebClient webClient, CookieStore cookieStore) {
*/
@GenIgnore(PERMITTED_TYPE)
CookieStore cookieStore();

@GenIgnore
@Override
WebClientSession addInterceptor(Handler<HttpContext<?>> interceptor);
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import io.vertx.ext.web.client.OAuth2WebClient;
import io.vertx.ext.web.client.OAuth2WebClientOptions;

public class Oauth2WebClientAware extends WebClientBase implements OAuth2WebClient {
public class Oauth2WebClientAware extends WebClientBase<Oauth2WebClientAware> implements OAuth2WebClient {

private final OAuth2Auth oauth2Auth;
private final OAuth2WebClientOptions option;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
/**
* @author <a href="mailto:julien@julienviet.com">Julien Viet</a>
*/
public class WebClientBase implements WebClientInternal {
public class WebClientBase<C extends WebClientBase<C>> implements WebClientInternal {

final HttpClient client;
final WebClientOptions options;
Expand Down Expand Up @@ -152,13 +152,13 @@ public HttpRequest<Buffer> requestAbs(HttpMethod method, SocketAddress serverAdd
}

@Override
public WebClientInternal addInterceptor(Handler<HttpContext<?>> interceptor) {
public C addInterceptor(Handler<HttpContext<?>> interceptor) {
// If a web client is constructed using another client, interceptors could get added twice.
if (interceptors.stream().anyMatch(i -> i.getClass() == interceptor.getClass())) {
throw new IllegalStateException(String.format("Client already contains a %s interceptor", interceptor.getClass()));
}
interceptors.add(interceptor);
return this;
return (C) this;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,6 @@ public interface WebClientInternal extends WebClient {

<T> HttpContext<T> createContext(ContextInternal context);

/**
* Add interceptor in the chain.
* <p/>
* The interceptor can maintain per request state with {@link HttpContext#get(String)}/{@link HttpContext#set(String, Object)}.
* <p/>
* A request/response can be processed several times (in case of retry) and thus they should use the per request state
* to ensure an operation is not done twice.
* <p/>
* This API is internal.
*
* @param interceptor the interceptor to add, must not be null
* @return a reference to this, so the API can be used fluently
*/
@Override
WebClientInternal addInterceptor(Handler<HttpContext<?>> interceptor);

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
/**
* @author <a href="mailto:tommaso.nolli@gmail.com">Tommaso Nolli</a>
*/
public class WebClientSessionAware extends WebClientBase implements WebClientSession {
public class WebClientSessionAware extends WebClientBase<WebClientSessionAware> implements WebClientSession {

private final CookieStore cookieStore;
private final CacheStore cacheStore;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
package io.vertx.ext.web.client.tests;

import io.vertx.core.*;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.*;
import io.vertx.core.net.JdkSSLEngineOptions;
import io.vertx.core.net.KeyCertOptions;
import io.vertx.core.net.TrustOptions;
import io.vertx.ext.web.client.WebClient;
import io.vertx.ext.web.client.WebClientOptions;
import io.vertx.test.tls.Cert;
import io.vertx.test.tls.Trust;

import java.util.*;
import java.util.Timer;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;

import static io.vertx.core.Future.all;
import static io.vertx.core.Future.await;
import static java.util.concurrent.ThreadLocalRandom.current;
import static java.util.concurrent.TimeUnit.SECONDS;

public class VerticleUndeployTest {

private static final Vertx vertx = Vertx.vertx(new VertxOptions().setPreferNativeTransport(true));

final static Buffer b_12000 = Buffer.buffer();
final static Buffer b_1024 = Buffer.buffer();

static {
for (int i = 0;i < 12_000;i++) {
b_12000.appendByte((byte) 65);
}
for (int i = 0;i < 1_024;i++) {
b_1024.appendByte((byte) 65);
}
}

public static void main(String[] args) throws ExecutionException, InterruptedException {
vertx.exceptionHandler(Throwable::printStackTrace);
startServer();

//setup scanner, 1 deploys verticles, 2 undeploys
Timer timer = new Timer();
TimerTask timerTask = null;
Scanner scanner = new Scanner(System.in);
while (!Thread.currentThread().isInterrupted()) {
try {
switch (scanner.nextInt()) {
case 1:
startAll();
break;
case 2:
stopAll();
break;
case 3: {
if(timerTask == null) {
timerTask = new TimerTask() {
@Override
public void run() {
System.out.println("Test starting");
startAll();
try {
Thread.sleep(ThreadLocalRandom.current().nextInt(1250, 5000));
stopAll();
Thread.sleep(5000);
} catch (Exception e) {
System.out.println("Test failed: " + e.getMessage());
}
}
};
timer.schedule(timerTask, 0, 10_000);
} else {
System.out.println("Stopping test");
timerTask.cancel();
timerTask = null;
}
}
}
} catch (Exception e) {
//ignore
}
}
}

public static void startAll() {
for (int i = 0; i < 1000; i++) {
vertx.deployVerticle(new TestHttpVerticle(), new DeploymentOptions().setThreadingModel(ThreadingModel.VIRTUAL_THREAD)).await();
}
}

public static void stopAll() throws ExecutionException, InterruptedException {
List<Future<Void>> futs = vertx.deploymentIDs().stream().map(vertx::undeploy).collect(Collectors.toList());
all(futs).await();
}

private static void startServer() throws ExecutionException, InterruptedException {
vertx.createHttpServer(createHttp2ServerOptions(8080, "0.0.0.0"))
.requestHandler(req -> {
req.body().onComplete(ar -> {
// System.out.println("Request received "+req.connection() + " "+ar.result().length());
req.response().end(b_1024);
});
})
.listen()
.toCompletionStage()
.toCompletableFuture()
.get();
}

private static class TestHttpVerticle extends AbstractVerticle {
private HttpClient client;
private boolean isStopped;

@Override
public void start() throws Exception {
this.client = this.vertx.createHttpClient(createHttp2ClientOptions());
this.context.runOnContext(v -> loopAwait());
}

private void loopAwait() {
while (!this.isStopped) {
try {
HttpClientRequest req = await(this.client.request(HttpMethod.POST, 8080, "localhost", "/"));
req.idleTimeout(10_000);
await(req.end(b_12000));
HttpClientResponse resp = await(req.response());
Buffer body = await(resp.body());
sleep();
} catch (Throwable e) {
String msg = e.getMessage() == null ? e.getClass().getName() : e.getMessage();
// System.out.println("Failed to get response " + msg);
}
}
}

private void sleep() {
Promise<Void> p = Promise.promise();
this.vertx.setTimer(current().nextInt(400, 6000), x -> p.tryComplete());
await(p.future());
}

@Override
public void stop(Promise<Void> stopPromise) throws Exception {
this.isStopped = true;
this.client.close().onComplete(stopPromise);
}
}

private static class TestWebClientVerticle extends AbstractVerticle {
private WebClient client;
private boolean isStopped;

@Override
public void start() throws Exception {
this.client = WebClient.create(this.vertx, new WebClientOptions(createHttp2ClientOptions()).setShared(false));
this.context.runOnContext(v -> loopAwait());
}

private void loop() {
try {
this.client.getAbs("https://localhost:8080").timeout(10_000).sendBuffer(b_12000).onComplete(ar -> {
if (!ar.succeeded()) {
System.out.println("Failed to get response " + ar.cause().getMessage());
}
this.vertx.setTimer(current().nextInt(400, 6000), x -> loop());
});
} catch (Exception e) {
System.out.println("Failed to loop request: "+ e.getMessage());
}
}

private void loopAwait() {
while (!this.isStopped) {
try {
await(this.client.postAbs("https://localhost:8080").timeout(10_000).sendBuffer(b_12000));
sleep();
} catch (Exception e) {
String msg = e.getMessage() == null ? e.getClass().getName() : e.getMessage();
// System.out.println("Failed to get response " + msg);
}
}
}

private void sleep() {
Promise<Void> p = Promise.promise();
this.vertx.setTimer(current().nextInt(400, 6000), x -> p.tryComplete());
await(p.future());
}

@Override
public void stop() throws Exception {
this.isStopped = true;
Optional.of(this.client).ifPresent(WebClient::close);
// System.out.println("Verticle stopped");
}
}

public static HttpServerOptions createHttp2ServerOptions(int port, String host) {
return (new HttpServerOptions()).setPort(port).setHost(host).setSslEngineOptions(new JdkSSLEngineOptions()).setUseAlpn(true).setSsl(true).addEnabledCipherSuite("TLS_RSA_WITH_AES_128_CBC_SHA").setKeyCertOptions((KeyCertOptions) Cert.SERVER_JKS.get());
}

public static HttpClientOptions createHttp2ClientOptions() {
return (new HttpClientOptions()).setSslEngineOptions(new JdkSSLEngineOptions()).setUseAlpn(true).setSsl(true).setTrustOptions((TrustOptions) Trust.SERVER_JKS.get()).setProtocolVersion(HttpVersion.HTTP_2);
}
}

0 comments on commit 21b253e

Please sign in to comment.