Skip to content

Commit b46250d

Browse files
authored
Update azure-core-http-vertx to work with Vert.x 5.0 (#45567)
Update azure-core-http-vertx to work with Vert.x 5.0
1 parent 1ec8fd0 commit b46250d

File tree

15 files changed

+150
-493
lines changed

15 files changed

+150
-493
lines changed

common/perf-test-core/pom.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@
5757
<include>com.beust:jcommander:[1.82]</include> <!-- {x-include-update;com.beust:jcommander;external_dependency} -->
5858

5959
<include>io.projectreactor:reactor-core:[3.4.41]</include> <!-- {x-include-update;io.projectreactor:reactor-core;external_dependency} -->
60-
<include>io.vertx:vertx-codegen:[4.5.13]</include> <!-- {x-include-update;io.vertx:vertx-codegen;external_dependency} -->
60+
<include>io.vertx:vertx-codegen:[4.5.15]</include> <!-- {x-include-update;io.vertx:vertx-codegen;external_dependency} -->
6161
</includes>
6262
</bannedDependencies>
6363
</rules>
@@ -95,7 +95,7 @@
9595
<dependency>
9696
<groupId>io.vertx</groupId>
9797
<artifactId>vertx-codegen</artifactId>
98-
<version>4.5.13</version> <!-- {x-version-update;io.vertx:vertx-codegen;external_dependency} -->
98+
<version>4.5.15</version> <!-- {x-version-update;io.vertx:vertx-codegen;external_dependency} -->
9999
<scope>provided</scope>
100100
</dependency>
101101
<dependency>

eng/versioning/external_dependencies.txt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,8 +63,8 @@ io.netty:netty-transport-native-unix-common;4.1.118.Final
6363
io.netty:netty-transport-native-kqueue;4.1.118.Final
6464
io.projectreactor.netty:reactor-netty-http;1.0.48
6565
io.projectreactor:reactor-core;3.4.41
66-
io.vertx:vertx-codegen;4.5.13
67-
io.vertx:vertx-core;4.5.13
66+
io.vertx:vertx-codegen;4.5.15
67+
io.vertx:vertx-core;4.5.15
6868
javax.websocket:javax.websocket-api;1.1
6969
org.apache.commons:commons-compress;1.26.0
7070
org.apache.ant:ant;1.10.14

sdk/core/azure-core-http-vertx/CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010

1111
### Other Changes
1212

13+
- Few small changes to implementation to improve support for Vert.x 5.x.
14+
1315
## 1.0.3 (2025-03-10)
1416

1517
### Other Changes

sdk/core/azure-core-http-vertx/pom.xml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -85,14 +85,14 @@
8585
<dependency>
8686
<groupId>io.vertx</groupId>
8787
<artifactId>vertx-codegen</artifactId>
88-
<version>4.5.13</version> <!-- {x-version-update;io.vertx:vertx-codegen;external_dependency} -->
88+
<version>4.5.15</version> <!-- {x-version-update;io.vertx:vertx-codegen;external_dependency} -->
8989
<scope>provided</scope>
9090
</dependency>
9191

9292
<dependency>
9393
<groupId>io.vertx</groupId>
9494
<artifactId>vertx-core</artifactId>
95-
<version>4.5.13</version> <!-- {x-version-update;io.vertx:vertx-core;external_dependency} -->
95+
<version>4.5.15</version> <!-- {x-version-update;io.vertx:vertx-core;external_dependency} -->
9696
</dependency>
9797

9898
<!-- test dependencies on azure-core, because we want to run tests inherited from this module using Vert.x Web Client -->
@@ -175,8 +175,8 @@
175175
<rules>
176176
<bannedDependencies>
177177
<includes>
178-
<include>io.vertx:vertx-codegen:[4.5.13]</include> <!-- {x-include-update;io.vertx:vertx-codegen;external_dependency} -->
179-
<include>io.vertx:vertx-core:[4.5.13]</include> <!-- {x-include-update;io.vertx:vertx-core;external_dependency} -->
178+
<include>io.vertx:vertx-codegen:[4.5.15]</include> <!-- {x-include-update;io.vertx:vertx-codegen;external_dependency} -->
179+
<include>io.vertx:vertx-core:[4.5.15]</include> <!-- {x-include-update;io.vertx:vertx-core;external_dependency} -->
180180
</includes>
181181
</bannedDependencies>
182182
</rules>

sdk/core/azure-core-http-vertx/src/main/java/com/azure/core/http/vertx/VertxHttpClient.java

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,10 @@
2424
import com.azure.core.util.Contexts;
2525
import com.azure.core.util.ProgressReporter;
2626
import com.azure.core.util.logging.ClientLogger;
27-
import io.netty.buffer.Unpooled;
2827
import io.vertx.core.Future;
2928
import io.vertx.core.Promise;
3029
import io.vertx.core.buffer.Buffer;
30+
import io.vertx.core.http.HttpClientOptions;
3131
import io.vertx.core.http.HttpClientRequest;
3232
import io.vertx.core.http.HttpClientResponse;
3333
import io.vertx.core.http.HttpMethod;
@@ -50,15 +50,17 @@ class VertxHttpClient implements HttpClient {
5050
private static final ClientLogger LOGGER = new ClientLogger(VertxHttpClient.class);
5151

5252
final io.vertx.core.http.HttpClient client;
53+
final HttpClientOptions buildOptions;
5354
private final Duration responseTimeout;
5455

5556
/**
5657
* Constructs a {@link VertxHttpClient}.
5758
*
5859
* @param client The Vert.x {@link io.vertx.core.http.HttpClient}
5960
*/
60-
VertxHttpClient(io.vertx.core.http.HttpClient client, Duration responseTimeout) {
61+
VertxHttpClient(io.vertx.core.http.HttpClient client, HttpClientOptions buildOptions, Duration responseTimeout) {
6162
this.client = Objects.requireNonNull(client, "client cannot be null");
63+
this.buildOptions = buildOptions;
6264
this.responseTimeout = responseTimeout;
6365
}
6466

@@ -117,7 +119,7 @@ private CompletableFuture<HttpResponse> sendInternal(HttpRequest request, Contex
117119

118120
// Create the Promise that will be returned. Promise.promise() is an uncompleted Promise.
119121
Promise<HttpResponse> promise = Promise.promise();
120-
client.request(options, requestResult -> {
122+
client.request(options).andThen(requestResult -> {
121123
if (requestResult.failed()) {
122124
promise.fail(requestResult.cause());
123125
return;
@@ -175,7 +177,6 @@ private CompletableFuture<HttpResponse> sendInternal(HttpRequest request, Contex
175177
return promise.future().toCompletionStage().toCompletableFuture();
176178
}
177179

178-
@SuppressWarnings("deprecation")
179180
private void sendBody(ContextView contextView, HttpRequest azureRequest, ProgressReporter progressReporter,
180181
HttpClientRequest vertxRequest, Promise<HttpResponse> promise) {
181182
BinaryData body = azureRequest.getBodyAsBinaryData();
@@ -188,14 +189,18 @@ private void sendBody(ContextView contextView, HttpRequest azureRequest, Progres
188189
|| bodyContent instanceof ByteBufferContent
189190
|| bodyContent instanceof StringContent
190191
|| bodyContent instanceof SerializableContent) {
192+
// This cannot produce a NullPointerException for the BinaryDataContent types that trigger this as they
193+
// are in-memory and have a length.
191194
long contentLength = bodyContent.getLength();
192-
vertxRequest.send(Buffer.buffer(Unpooled.wrappedBuffer(bodyContent.toByteBuffer())))
195+
vertxRequest.send(Buffer.buffer(bodyContent.toBytes()))
193196
.onSuccess(ignored -> reportProgress(contentLength, progressReporter))
194197
.onFailure(promise::fail);
195198
} else {
196199
// Right now both Flux<ByteBuffer> and InputStream bodies are being handled reactively.
197200
azureRequest.getBody()
198-
.subscribe(new VertxRequestWriteSubscriber(vertxRequest, promise, progressReporter, contextView));
201+
.subscribe(new VertxRequestWriteSubscriber(vertxRequest::exceptionHandler,
202+
vertxRequest::drainHandler, vertxRequest::write, vertxRequest::writeQueueFull,
203+
vertxRequest::reset, vertxRequest::end, promise, progressReporter, contextView));
199204
}
200205
}
201206
}

sdk/core/azure-core-http-vertx/src/main/java/com/azure/core/http/vertx/VertxHttpClientBuilder.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -254,7 +254,7 @@ public HttpClient build() {
254254
}
255255

256256
io.vertx.core.http.HttpClient client = configuredVertx.createHttpClient(buildOptions);
257-
return new VertxHttpClient(client, getTimeout(this.responseTimeout, getDefaultResponseTimeout()));
257+
return new VertxHttpClient(client, buildOptions, getTimeout(this.responseTimeout, getDefaultResponseTimeout()));
258258
}
259259

260260
static Vertx getVertx(Iterator<VertxProvider> iterator) {
@@ -320,7 +320,7 @@ private static Runnable getVertxCloseRunnable(Vertx vertxToClose) {
320320
return () -> {
321321
CountDownLatch latch = new CountDownLatch(1);
322322
if (vertxToClose != null) {
323-
vertxToClose.close(event -> {
323+
vertxToClose.close().andThen(event -> {
324324
if (event.failed() && event.cause() != null) {
325325
LOGGER.logThrowableAsError(event.cause());
326326
}

sdk/core/azure-core-http-vertx/src/main/java/com/azure/core/http/vertx/implementation/VertxHttpResponse.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ private Flux<ByteBuffer> streamResponseBody() {
5353
public void close() {
5454
HttpClientResponse vertxHttpResponse = getVertxHttpResponse();
5555
if (vertxHttpResponse != null && !closed) {
56-
vertxHttpResponse.netSocket().close(ignored -> closed = true);
56+
vertxHttpResponse.netSocket().close().andThen(ignored -> closed = true);
5757
}
5858
}
5959
}

sdk/core/azure-core-http-vertx/src/main/java/com/azure/core/http/vertx/implementation/VertxRequestWriteSubscriber.java

Lines changed: 43 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,11 @@
44
package com.azure.core.http.vertx.implementation;
55

66
import com.azure.core.http.HttpResponse;
7+
import com.azure.core.util.FluxUtil;
78
import com.azure.core.util.ProgressReporter;
89
import com.azure.core.util.logging.ClientLogger;
9-
import io.netty.buffer.Unpooled;
10+
import io.vertx.core.Future;
11+
import io.vertx.core.Handler;
1012
import io.vertx.core.Promise;
1113
import io.vertx.core.buffer.Buffer;
1214
import io.vertx.core.http.HttpClientRequest;
@@ -18,6 +20,10 @@
1820
import reactor.util.context.ContextView;
1921

2022
import java.nio.ByteBuffer;
23+
import java.util.function.BiConsumer;
24+
import java.util.function.Consumer;
25+
import java.util.function.Function;
26+
import java.util.function.Supplier;
2127

2228
/**
2329
* Subscriber that writes a stream of {@link ByteBuffer ByteBuffers} to a {@link HttpClientRequest Vert.x request}.
@@ -26,7 +32,10 @@
2632
public final class VertxRequestWriteSubscriber implements Subscriber<ByteBuffer> {
2733
private static final ClientLogger LOGGER = new ClientLogger(VertxRequestWriteSubscriber.class);
2834

29-
private final HttpClientRequest request;
35+
private final Function<Buffer, Future<Void>> writeHandler;
36+
private final Supplier<Boolean> isWriteQueueFull;
37+
private final BiConsumer<Long, Throwable> reset;
38+
private final Supplier<Future<Void>> end;
3039
private final Promise<HttpResponse> promise;
3140
private final ProgressReporter progressReporter;
3241
private final ContextView contextView;
@@ -41,14 +50,28 @@ public final class VertxRequestWriteSubscriber implements Subscriber<ByteBuffer>
4150
* Creates a new {@link VertxRequestWriteSubscriber} that writes a stream of {@link ByteBuffer ByteBuffers} to a
4251
* {@link HttpClientRequest Vert.x request}.
4352
*
44-
* @param request The {@link HttpClientRequest Vert.x request} to write to.
53+
* @param exceptionHandlerUpdater A {@link Handler} that updates the
54+
* {@link HttpClientRequest#exceptionHandler(Handler)} to be {@link #onError(Throwable)}.
55+
* @param drainHandlerUpdater A {@link Handler} that updates the {@link HttpClientRequest#drainHandler(Handler)}
56+
* to be {@link #requestNext()}.
57+
* @param writeHandler A {@link Function} that will call {@link HttpClientRequest}.
58+
* @param isWriteQueueFull A {@link Supplier} that will call {@link HttpClientRequest#writeQueueFull()}.
59+
* @param reset A {@link BiConsumer} that will call {@link HttpClientRequest#reset(long, Throwable)}.
60+
* @param end A {@link Supplier} that will call {@link HttpClientRequest#end()}.
4561
* @param promise The {@link MonoSink} to emit the {@link HttpResponse response} to.
4662
* @param progressReporter The {@link ProgressReporter} to report progress to.
4763
* @param contextView The {@link ContextView} to use when dropping errors.
4864
*/
49-
public VertxRequestWriteSubscriber(HttpClientRequest request, Promise<HttpResponse> promise,
50-
ProgressReporter progressReporter, ContextView contextView) {
51-
this.request = request.exceptionHandler(this::onError).drainHandler(ignored -> requestNext());
65+
public VertxRequestWriteSubscriber(Consumer<Handler<Throwable>> exceptionHandlerUpdater,
66+
Consumer<Handler<Void>> drainHandlerUpdater, Function<Buffer, Future<Void>> writeHandler,
67+
Supplier<Boolean> isWriteQueueFull, BiConsumer<Long, Throwable> reset, Supplier<Future<Void>> end,
68+
Promise<HttpResponse> promise, ProgressReporter progressReporter, ContextView contextView) {
69+
exceptionHandlerUpdater.accept(this::onError);
70+
drainHandlerUpdater.accept(ignored -> requestNext());
71+
this.writeHandler = writeHandler;
72+
this.isWriteQueueFull = isWriteQueueFull;
73+
this.reset = reset;
74+
this.end = end;
5275
this.promise = promise;
5376
this.progressReporter = progressReporter;
5477
this.contextView = contextView;
@@ -81,25 +104,24 @@ public void onNext(ByteBuffer bytes) {
81104
}
82105
}
83106

84-
@SuppressWarnings("deprecation")
85107
private void write(ByteBuffer bytes) {
86-
int remaining = bytes.remaining();
87-
request.write(Buffer.buffer(Unpooled.wrappedBuffer(bytes)), result -> {
108+
final int remaining = bytes.remaining();
109+
writeHandler.apply(Buffer.buffer(FluxUtil.byteBufferToArray(bytes))).onComplete(result -> {
88110
State state = this.state;
89111
if (state == State.WRITING) {
90112
this.state = State.UNINITIALIZED;
91113
}
92114

93115
if (result.succeeded()) {
94-
if (progressReporter != null) {
95-
progressReporter.reportProgress(remaining);
96-
}
97116
if (state == State.WRITING) {
98-
if (!request.writeQueueFull()) {
117+
if (remaining > 0 && progressReporter != null) {
118+
progressReporter.reportProgress(remaining);
119+
}
120+
if (!isWriteQueueFull.get()) {
99121
requestNext();
100122
}
101123
} else if (state == State.COMPLETE) {
102-
endRequest();
124+
endRequest(remaining);
103125
} else if (state == State.ERROR) {
104126
resetRequest(error);
105127
}
@@ -177,7 +199,7 @@ private void resetRequest(Throwable throwable) {
177199
+ "completed successfully.", throwable);
178200
}
179201
}
180-
request.reset(0, throwable);
202+
reset.accept(0L, throwable);
181203
}
182204

183205
@Override
@@ -192,12 +214,15 @@ public void onComplete() {
192214

193215
this.state = State.COMPLETE;
194216
if (state != State.WRITING) {
195-
endRequest();
217+
endRequest(0);
196218
}
197219
}
198220

199-
private void endRequest() {
200-
request.end().onFailure(promise::fail);
221+
private void endRequest(int finishingWriteSize) {
222+
if (finishingWriteSize > 0 && progressReporter != null) {
223+
progressReporter.reportProgress(finishingWriteSize);
224+
}
225+
end.get().onFailure(promise::fail);
201226
}
202227

203228
private enum State {

0 commit comments

Comments
 (0)