Skip to content

Commit

Permalink
[improve][client] Add maxConnectionsPerHost and connectionMaxIdleSeco…
Browse files Browse the repository at this point in the history
…nds to PulsarAdminBuilder (apache#22541)

(cherry picked from commit 3e7dbb4)
(cherry picked from commit 4c480fd)
lhotari authored and srinath-ctds committed Aug 12, 2024
1 parent 5149aef commit 62fbcc9
Showing 24 changed files with 687 additions and 230 deletions.
20 changes: 11 additions & 9 deletions distribution/server/src/assemble/LICENSE.bin.txt
Original file line number Diff line number Diff line change
@@ -499,6 +499,8 @@ The Apache Software License, Version 2.0
- io.reactivex.rxjava3-rxjava-3.0.1.jar
* RoaringBitmap
- org.roaringbitmap-RoaringBitmap-1.2.0.jar
* Spotify completable-futures
- com.spotify-completable-futures-0.3.6.jar

BSD 3-clause "New" or "Revised" License
* Google auth library
@@ -543,15 +545,15 @@ CDDL-1.1 -- ../licenses/LICENSE-CDDL-1.1.txt
- org.glassfish.hk2-osgi-resource-locator-1.0.3.jar
- org.glassfish.hk2.external-aopalliance-repackaged-2.6.1.jar
* Jersey
- org.glassfish.jersey.containers-jersey-container-servlet-2.41.jar
- org.glassfish.jersey.containers-jersey-container-servlet-core-2.41.jar
- org.glassfish.jersey.core-jersey-client-2.41.jar
- org.glassfish.jersey.core-jersey-common-2.41.jar
- org.glassfish.jersey.core-jersey-server-2.41.jar
- org.glassfish.jersey.ext-jersey-entity-filtering-2.41.jar
- org.glassfish.jersey.media-jersey-media-json-jackson-2.41.jar
- org.glassfish.jersey.media-jersey-media-multipart-2.41.jar
- org.glassfish.jersey.inject-jersey-hk2-2.41.jar
- org.glassfish.jersey.containers-jersey-container-servlet-2.42.jar
- org.glassfish.jersey.containers-jersey-container-servlet-core-2.42.jar
- org.glassfish.jersey.core-jersey-client-2.42.jar
- org.glassfish.jersey.core-jersey-common-2.42.jar
- org.glassfish.jersey.core-jersey-server-2.42.jar
- org.glassfish.jersey.ext-jersey-entity-filtering-2.42.jar
- org.glassfish.jersey.media-jersey-media-json-jackson-2.42.jar
- org.glassfish.jersey.media-jersey-media-multipart-2.42.jar
- org.glassfish.jersey.inject-jersey-hk2-2.42.jar
* Mimepull -- org.jvnet.mimepull-mimepull-1.9.15.jar

Eclipse Distribution License 1.0 -- ../licenses/LICENSE-EDL-1.0.txt
13 changes: 7 additions & 6 deletions distribution/shell/src/assemble/LICENSE.bin.txt
Original file line number Diff line number Diff line change
@@ -409,6 +409,7 @@ The Apache Software License, Version 2.0
* Apache Avro
- avro-1.11.3.jar
- avro-protobuf-1.11.3.jar
* Spotify completable-futures -- completable-futures-0.3.6.jar

BSD 3-clause "New" or "Revised" License
* JSR305 -- jsr305-3.0.2.jar -- ../licenses/LICENSE-JSR305.txt
@@ -438,12 +439,12 @@ CDDL-1.1 -- ../licenses/LICENSE-CDDL-1.1.txt
- aopalliance-repackaged-2.6.1.jar
- osgi-resource-locator-1.0.3.jar
* Jersey
- jersey-client-2.41.jar
- jersey-common-2.41.jar
- jersey-entity-filtering-2.41.jar
- jersey-media-json-jackson-2.41.jar
- jersey-media-multipart-2.41.jar
- jersey-hk2-2.41.jar
- jersey-client-2.42.jar
- jersey-common-2.42.jar
- jersey-entity-filtering-2.42.jar
- jersey-media-json-jackson-2.42.jar
- jersey-media-multipart-2.42.jar
- jersey-hk2-2.42.jar
* Mimepull -- mimepull-1.9.15.jar

Eclipse Distribution License 1.0 -- ../licenses/LICENSE-EDL-1.0.txt
9 changes: 8 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
@@ -147,7 +147,7 @@ flexible messaging model and an intuitive client API.</description>
<netty-iouring.version>0.0.21.Final</netty-iouring.version>
<jetty.version>9.4.54.v20240208</jetty.version>
<conscrypt.version>2.5.2</conscrypt.version>
<jersey.version>2.41</jersey.version>
<jersey.version>2.42</jersey.version>
<athenz.version>1.10.50</athenz.version>
<prometheus.version>0.16.0</prometheus.version>
<vertx.version>4.5.8</vertx.version>
@@ -255,6 +255,7 @@ flexible messaging model and an intuitive client API.</description>
<disruptor.version>3.4.3</disruptor.version>
<zstd-jni.version>1.5.2-3</zstd-jni.version>
<netty-reactive-streams.version>2.0.6</netty-reactive-streams.version>
<completable-futures.version>0.3.6</completable-futures.version>
<failsafe.version>3.3.2</failsafe.version>

<!-- test dependencies -->
@@ -646,6 +647,12 @@ flexible messaging model and an intuitive client API.</description>
<version>${bookkeeper.version}</version>
</dependency>

<dependency>
<groupId>com.spotify</groupId>
<artifactId>completable-futures</artifactId>
<version>${completable-futures.version}</version>
</dependency>

<dependency>
<groupId>org.rocksdb</groupId>
<artifactId>rocksdbjni</artifactId>
Original file line number Diff line number Diff line change
@@ -327,4 +327,30 @@ PulsarAdminBuilder authentication(String authPluginClassName, Map<String, String
*/
PulsarAdminBuilder setContextClassLoader(ClassLoader clientBuilderClassLoader);


/**
* Configures the maximum number of connections that the client library will establish with a single host.
* <p>
* By default, the connection pool maintains up to 16 connections to a single host. This method allows you to
* modify this default behavior and limit the number of connections.
* <p>
* This setting can be useful in scenarios where you want to limit the resources used by the client library,
* or control the level of parallelism for operations so that a single client does not overwhelm
* the Pulsar cluster with too many concurrent connections.
*
* @param maxConnectionsPerHost the maximum number of connections to establish per host. Set to <= 0 to disable
* the limit.
* @return the PulsarAdminBuilder instance, allowing for method chaining
*/
PulsarAdminBuilder maxConnectionsPerHost(int maxConnectionsPerHost);

/**
* Sets the maximum idle time for a pooled connection. If a connection is idle for more than the specified
* amount of seconds, it will be released back to the connection pool.
* Defaults to 25 seconds.
*
* @param connectionMaxIdleSeconds the maximum idle time, in seconds, for a pooled connection
* @return the PulsarAdminBuilder instance
*/
PulsarAdminBuilder connectionMaxIdleSeconds(int connectionMaxIdleSeconds);
}
5 changes: 5 additions & 0 deletions pulsar-client-admin-shaded/pom.xml
Original file line number Diff line number Diff line change
@@ -123,6 +123,7 @@
<include>com.google.protobuf:protobuf-java</include>
<include>com.google.guava:guava</include>
<include>com.google.code.gson:gson</include>
<include>com.spotify:completable-futures</include>
<include>com.fasterxml.jackson.*:*</include>
<include>io.netty:*</include>
<include>io.netty.incubator:*</include>
@@ -192,6 +193,10 @@
<exclude>com.google.protobuf.*</exclude>
</excludes>
</relocation>
<relocation>
<pattern>com.spotify.futures</pattern>
<shadedPattern>org.apache.pulsar.shade.com.spotify.futures</shadedPattern>
</relocation>
<relocation>
<pattern>com.fasterxml.jackson</pattern>
<shadedPattern>org.apache.pulsar.shade.com.fasterxml.jackson</shadedPattern>
Original file line number Diff line number Diff line change
@@ -22,7 +22,6 @@
import static org.asynchttpclient.Dsl.post;
import static org.asynchttpclient.Dsl.put;
import com.google.gson.Gson;
import io.netty.handler.codec.http.HttpHeaders;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
@@ -41,6 +40,7 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.admin.Functions;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.internal.http.AsyncHttpRequestExecutor;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.functions.FunctionDefinition;
@@ -54,10 +54,8 @@
import org.apache.pulsar.common.policies.data.FunctionStats;
import org.apache.pulsar.common.policies.data.FunctionStatsImpl;
import org.apache.pulsar.common.policies.data.FunctionStatus;
import org.asynchttpclient.AsyncHandler;
import org.asynchttpclient.AsyncHttpClient;
import org.asynchttpclient.AsyncCompletionHandlerBase;
import org.asynchttpclient.HttpResponseBodyPart;
import org.asynchttpclient.HttpResponseStatus;
import org.asynchttpclient.RequestBuilder;
import org.asynchttpclient.request.body.multipart.ByteArrayPart;
import org.asynchttpclient.request.body.multipart.FilePart;
@@ -70,12 +68,14 @@
public class FunctionsImpl extends ComponentResource implements Functions {

private final WebTarget functions;
private final AsyncHttpClient asyncHttpClient;
private final AsyncHttpRequestExecutor asyncHttpRequestExecutor;

public FunctionsImpl(WebTarget web, Authentication auth, AsyncHttpClient asyncHttpClient, long requestTimeoutMs) {
public FunctionsImpl(WebTarget web, Authentication auth,
AsyncHttpRequestExecutor asyncHttpRequestExecutor,
long requestTimeoutMs) {
super(auth, requestTimeoutMs);
this.functions = web.path("/admin/v3/functions");
this.asyncHttpClient = asyncHttpClient;
this.asyncHttpRequestExecutor = asyncHttpRequestExecutor;
}

@Override
@@ -171,8 +171,7 @@ public CompletableFuture<Void> createFunctionAsync(FunctionConfig functionConfig
// If the function code is built in, we don't need to submit here
builder.addBodyPart(new FilePart("data", new File(fileName), MediaType.APPLICATION_OCTET_STREAM));
}
asyncHttpClient.executeRequest(addAuthHeaders(functions, builder).build())
.toCompletableFuture()
asyncHttpRequestExecutor.executeRequest(addAuthHeaders(functions, builder).build())
.thenAccept(response -> {
if (response.getStatusCode() < 200 || response.getStatusCode() >= 300) {
future.completeExceptionally(
@@ -263,8 +262,7 @@ public CompletableFuture<Void> updateFunctionAsync(
builder.addBodyPart(new FilePart("data", new File(fileName), MediaType.APPLICATION_OCTET_STREAM));
}

asyncHttpClient.executeRequest(addAuthHeaders(functions, builder).build())
.toCompletableFuture()
asyncHttpRequestExecutor.executeRequest(addAuthHeaders(functions, builder).build())
.thenAccept(response -> {
if (response.getStatusCode() < 200 || response.getStatusCode() >= 300) {
future.completeExceptionally(
@@ -464,7 +462,7 @@ public CompletableFuture<Void> uploadFunctionAsync(String sourceFile, String pat
.addBodyPart(new FilePart("data", new File(sourceFile), MediaType.APPLICATION_OCTET_STREAM))
.addBodyPart(new StringPart("path", path, MediaType.TEXT_PLAIN));

asyncHttpClient.executeRequest(addAuthHeaders(functions, builder).build()).toCompletableFuture()
asyncHttpRequestExecutor.executeRequest(addAuthHeaders(functions, builder).build())
.thenAccept(response -> {
if (response.getStatusCode() < 200 || response.getStatusCode() >= 300) {
future.completeExceptionally(
@@ -543,55 +541,31 @@ private CompletableFuture<Void> downloadFileAsync(String destinationPath, WebTar

RequestBuilder builder = get(target.getUri().toASCIIString());

CompletableFuture<HttpResponseStatus> statusFuture =
asyncHttpClient.executeRequest(addAuthHeaders(functions, builder).build(),
new AsyncHandler<HttpResponseStatus>() {
private HttpResponseStatus status;

@Override
public State onStatusReceived(HttpResponseStatus responseStatus) throws Exception {
status = responseStatus;
if (status.getStatusCode() != Response.Status.OK.getStatusCode()) {
return State.ABORT;
}
return State.CONTINUE;
}

@Override
public State onHeadersReceived(HttpHeaders headers) throws Exception {
return State.CONTINUE;
}
CompletableFuture<org.asynchttpclient.Response> responseFuture =
asyncHttpRequestExecutor.executeRequest(addAuthHeaders(functions, builder).build(),
() -> new AsyncCompletionHandlerBase() {

@Override
public State onBodyPartReceived(HttpResponseBodyPart bodyPart) throws Exception {
os.write(bodyPart.getBodyByteBuffer());
return State.CONTINUE;
}
});

@Override
public HttpResponseStatus onCompleted() throws Exception {
return status;
}

@Override
public void onThrowable(Throwable t) {
}
}).toCompletableFuture();

statusFuture
.whenComplete((status, throwable) -> {
responseFuture
.whenComplete((response, throwable) -> {
try {
os.close();
} catch (IOException e) {
future.completeExceptionally(getApiException(e));
}
})
.thenAccept(status -> {
if (status.getStatusCode() < 200 || status.getStatusCode() >= 300) {
.thenAccept(response -> {
if (response.getStatusCode() < 200 || response.getStatusCode() >= 300) {
future.completeExceptionally(
getApiException(Response
.status(status.getStatusCode())
.entity(status.getStatusText())
.status(response.getStatusCode())
.entity(response.getStatusText())
.build()));
} else {
future.complete(null);
@@ -700,7 +674,7 @@ public CompletableFuture<Void> putFunctionStateAsync(
.path("state").path(state.getKey()).getUri().toASCIIString());
builder.addBodyPart(new StringPart("state", objectWriter()
.writeValueAsString(state), MediaType.APPLICATION_JSON));
asyncHttpClient.executeRequest(addAuthHeaders(functions, builder).build())
asyncHttpRequestExecutor.executeRequest(addAuthHeaders(functions, builder).build())
.toCompletableFuture()
.thenAccept(response -> {
if (response.getStatusCode() < 200 || response.getStatusCode() >= 300) {
@@ -740,7 +714,7 @@ public CompletableFuture<Void> updateOnWorkerLeaderAsync(String tenant, String n
.addBodyPart(new ByteArrayPart("functionMetaData", functionMetaData))
.addBodyPart(new StringPart("delete", Boolean.toString(delete)));

asyncHttpClient.executeRequest(addAuthHeaders(functions, builder).build())
asyncHttpRequestExecutor.executeRequest(addAuthHeaders(functions, builder).build())
.toCompletableFuture()
.thenAccept(response -> {
if (response.getStatusCode() < 200 || response.getStatusCode() >= 300) {
Original file line number Diff line number Diff line change
@@ -20,7 +20,6 @@

import static org.asynchttpclient.Dsl.get;
import com.google.gson.Gson;
import io.netty.handler.codec.http.HttpHeaders;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
@@ -36,15 +35,14 @@
import javax.ws.rs.core.Response;
import org.apache.pulsar.client.admin.Packages;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.internal.http.AsyncHttpRequestExecutor;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.packages.management.core.common.PackageMetadata;
import org.apache.pulsar.packages.management.core.common.PackageName;
import org.asynchttpclient.AsyncHandler;
import org.asynchttpclient.AsyncHttpClient;
import org.asynchttpclient.AsyncCompletionHandlerBase;
import org.asynchttpclient.Dsl;
import org.asynchttpclient.HttpResponseBodyPart;
import org.asynchttpclient.HttpResponseStatus;
import org.asynchttpclient.RequestBuilder;
import org.asynchttpclient.request.body.multipart.FilePart;
import org.asynchttpclient.request.body.multipart.StringPart;
@@ -55,11 +53,12 @@
public class PackagesImpl extends ComponentResource implements Packages {

private final WebTarget packages;
private final AsyncHttpClient httpClient;
private final AsyncHttpRequestExecutor asyncHttpRequestExecutor;

public PackagesImpl(WebTarget webTarget, Authentication auth, AsyncHttpClient client, long requestTimeoutMs) {
public PackagesImpl(WebTarget webTarget, Authentication auth, AsyncHttpRequestExecutor asyncHttpRequestExecutor,
long requestTimeoutMs) {
super(auth, requestTimeoutMs);
this.httpClient = client;
this.asyncHttpRequestExecutor = asyncHttpRequestExecutor;
this.packages = webTarget.path("/admin/v3/packages");
}

@@ -98,7 +97,7 @@ public CompletableFuture<Void> uploadAsync(PackageMetadata metadata, String pack
.post(packages.path(PackageName.get(packageName).toRestPath()).getUri().toASCIIString())
.addBodyPart(new FilePart("file", new File(path), MediaType.APPLICATION_OCTET_STREAM))
.addBodyPart(new StringPart("metadata", new Gson().toJson(metadata), MediaType.APPLICATION_JSON));
httpClient.executeRequest(addAuthHeaders(packages, builder).build())
asyncHttpRequestExecutor.executeRequest(addAuthHeaders(packages, builder).build())
.toCompletableFuture()
.thenAccept(response -> {
if (response.getStatusCode() < 200 || response.getStatusCode() >= 300) {
@@ -138,55 +137,30 @@ public CompletableFuture<Void> downloadAsync(String packageName, String path) {
FileChannel os = new FileOutputStream(destinyPath.toFile()).getChannel();
RequestBuilder builder = get(webTarget.getUri().toASCIIString());

CompletableFuture<HttpResponseStatus> statusFuture =
httpClient.executeRequest(addAuthHeaders(webTarget, builder).build(),
new AsyncHandler<HttpResponseStatus>() {
private HttpResponseStatus status;
CompletableFuture<org.asynchttpclient.Response> responseFuture =
asyncHttpRequestExecutor.executeRequest(addAuthHeaders(webTarget, builder).build(),
() -> new AsyncCompletionHandlerBase() {

@Override
public State onStatusReceived(HttpResponseStatus httpResponseStatus) throws Exception {
status = httpResponseStatus;
if (status.getStatusCode() != Response.Status.OK.getStatusCode()) {
return State.ABORT;
@Override
public State onBodyPartReceived(HttpResponseBodyPart bodyPart) throws Exception {
os.write(bodyPart.getBodyByteBuffer());
return State.CONTINUE;
}
return State.CONTINUE;
}

@Override
public State onHeadersReceived(HttpHeaders httpHeaders) throws Exception {
return State.CONTINUE;
}

@Override
public State onBodyPartReceived(HttpResponseBodyPart httpResponseBodyPart) throws Exception {
os.write(httpResponseBodyPart.getBodyByteBuffer());
return State.CONTINUE;
}

@Override
public void onThrowable(Throwable throwable) {
// we don't need to handle that throwable and use the returned future to handle it.
}

@Override
public HttpResponseStatus onCompleted() throws Exception {
return status;
}
}).toCompletableFuture();
statusFuture
.whenComplete((status, throwable) -> {
});
responseFuture
.whenComplete((response, throwable) -> {
try {
os.close();
} catch (IOException e) {
future.completeExceptionally(getApiException(throwable));
}
})
.thenAccept(status -> {
if (status.getStatusCode() < 200 || status.getStatusCode() >= 300) {
.thenAccept(response -> {
if (response.getStatusCode() < 200 || response.getStatusCode() >= 300) {
future.completeExceptionally(
getApiException(Response
.status(status.getStatusCode())
.entity(status.getStatusText())
.status(response.getStatusCode())
.entity(response.getStatusText())
.build()));
} else {
future.complete(null);
Original file line number Diff line number Diff line change
@@ -46,6 +46,7 @@ public PulsarAdmin build() throws PulsarClientException {

public PulsarAdminBuilderImpl() {
this.conf = new ClientConfigurationData();
this.conf.setConnectionsPerBroker(16);
}

private PulsarAdminBuilderImpl(ClientConfigurationData conf) {
@@ -61,6 +62,15 @@ public PulsarAdminBuilder clone() {
public PulsarAdminBuilder loadConf(Map<String, Object> config) {
conf = ConfigurationDataUtils.loadData(config, conf, ClientConfigurationData.class);
setAuthenticationFromPropsIfAvailable(conf);
// in ClientConfigurationData, the maxConnectionsPerHost maps to connectionsPerBroker
if (config.containsKey("maxConnectionsPerHost")) {
Object maxConnectionsPerHostObj = config.get("maxConnectionsPerHost");
if (maxConnectionsPerHostObj instanceof Integer) {
maxConnectionsPerHost((Integer) maxConnectionsPerHostObj);
} else {
maxConnectionsPerHost(Integer.parseInt(maxConnectionsPerHostObj.toString()));
}
}
return this;
}

@@ -227,4 +237,18 @@ public PulsarAdminBuilder setContextClassLoader(ClassLoader clientBuilderClassLo
this.clientBuilderClassLoader = clientBuilderClassLoader;
return this;
}

@Override
public PulsarAdminBuilder maxConnectionsPerHost(int maxConnectionsPerHost) {
// reuse the same configuration as the client, however for the admin client, the connection
// is usually established to a cluster address and not to a broker address
this.conf.setConnectionsPerBroker(maxConnectionsPerHost);
return this;
}

@Override
public PulsarAdminBuilder connectionMaxIdleSeconds(int connectionMaxIdleSeconds) {
this.conf.setConnectionMaxIdleSeconds(connectionMaxIdleSeconds);
return this;
}
}
Original file line number Diff line number Diff line change
@@ -168,13 +168,13 @@ public PulsarAdminImpl(String serviceUrl, ClientConfigurationData clientConfigDa
this.nonPersistentTopics = new NonPersistentTopicsImpl(root, auth, requestTimeoutMs);
this.resourceQuotas = new ResourceQuotasImpl(root, auth, requestTimeoutMs);
this.lookups = new LookupImpl(root, auth, useTls, requestTimeoutMs, topics);
this.functions = new FunctionsImpl(root, auth, asyncHttpConnector.getHttpClient(), requestTimeoutMs);
this.sources = new SourcesImpl(root, auth, asyncHttpConnector.getHttpClient(), requestTimeoutMs);
this.sinks = new SinksImpl(root, auth, asyncHttpConnector.getHttpClient(), requestTimeoutMs);
this.functions = new FunctionsImpl(root, auth, asyncHttpConnector, requestTimeoutMs);
this.sources = new SourcesImpl(root, auth, asyncHttpConnector, requestTimeoutMs);
this.sinks = new SinksImpl(root, auth, asyncHttpConnector, requestTimeoutMs);
this.worker = new WorkerImpl(root, auth, requestTimeoutMs);
this.schemas = new SchemasImpl(root, auth, requestTimeoutMs);
this.bookies = new BookiesImpl(root, auth, requestTimeoutMs);
this.packages = new PackagesImpl(root, auth, asyncHttpConnector.getHttpClient(), requestTimeoutMs);
this.packages = new PackagesImpl(root, auth, asyncHttpConnector, requestTimeoutMs);
this.transactions = new TransactionsImpl(root, auth, requestTimeoutMs);

if (originalCtxLoader != null) {
Original file line number Diff line number Diff line change
@@ -34,13 +34,13 @@
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.Sink;
import org.apache.pulsar.client.admin.Sinks;
import org.apache.pulsar.client.admin.internal.http.AsyncHttpRequestExecutor;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.common.functions.UpdateOptions;
import org.apache.pulsar.common.functions.UpdateOptionsImpl;
import org.apache.pulsar.common.io.ConnectorDefinition;
import org.apache.pulsar.common.io.SinkConfig;
import org.apache.pulsar.common.policies.data.SinkStatus;
import org.asynchttpclient.AsyncHttpClient;
import org.asynchttpclient.RequestBuilder;
import org.asynchttpclient.request.body.multipart.FilePart;
import org.asynchttpclient.request.body.multipart.StringPart;
@@ -51,12 +51,13 @@
public class SinksImpl extends ComponentResource implements Sinks, Sink {

private final WebTarget sink;
private final AsyncHttpClient asyncHttpClient;
private final AsyncHttpRequestExecutor asyncHttpRequestExecutor;

public SinksImpl(WebTarget web, Authentication auth, AsyncHttpClient asyncHttpClient, long requestTimeoutMs) {
public SinksImpl(WebTarget web, Authentication auth, AsyncHttpRequestExecutor asyncHttpRequestExecutor,
long requestTimeoutMs) {
super(auth, requestTimeoutMs);
this.sink = web.path("/admin/v3/sink");
this.asyncHttpClient = asyncHttpClient;
this.asyncHttpRequestExecutor = asyncHttpRequestExecutor;
}

@Override
@@ -145,7 +146,7 @@ public CompletableFuture<Void> createSinkAsync(SinkConfig sinkConfig, String fil
// If the function code is built in, we don't need to submit here
builder.addBodyPart(new FilePart("data", new File(fileName), MediaType.APPLICATION_OCTET_STREAM));
}
asyncHttpClient.executeRequest(addAuthHeaders(sink, builder).build())
asyncHttpRequestExecutor.executeRequest(addAuthHeaders(sink, builder).build())
.toCompletableFuture()
.thenAccept(response -> {
if (response.getStatusCode() < 200 || response.getStatusCode() >= 300) {
@@ -233,7 +234,7 @@ public CompletableFuture<Void> updateSinkAsync(
// If the function code is built in, we don't need to submit here
builder.addBodyPart(new FilePart("data", new File(fileName), MediaType.APPLICATION_OCTET_STREAM));
}
asyncHttpClient.executeRequest(addAuthHeaders(sink, builder).build())
asyncHttpRequestExecutor.executeRequest(addAuthHeaders(sink, builder).build())
.toCompletableFuture()
.thenAccept(response -> {
if (response.getStatusCode() < 200 || response.getStatusCode() >= 300) {
Original file line number Diff line number Diff line change
@@ -33,13 +33,13 @@
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.Source;
import org.apache.pulsar.client.admin.Sources;
import org.apache.pulsar.client.admin.internal.http.AsyncHttpRequestExecutor;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.common.functions.UpdateOptions;
import org.apache.pulsar.common.functions.UpdateOptionsImpl;
import org.apache.pulsar.common.io.ConnectorDefinition;
import org.apache.pulsar.common.io.SourceConfig;
import org.apache.pulsar.common.policies.data.SourceStatus;
import org.asynchttpclient.AsyncHttpClient;
import org.asynchttpclient.RequestBuilder;
import org.asynchttpclient.request.body.multipart.FilePart;
import org.asynchttpclient.request.body.multipart.StringPart;
@@ -50,12 +50,13 @@
public class SourcesImpl extends ComponentResource implements Sources, Source {

private final WebTarget source;
private final AsyncHttpClient asyncHttpClient;
private final AsyncHttpRequestExecutor asyncHttpRequestExecutor;

public SourcesImpl(WebTarget web, Authentication auth, AsyncHttpClient asyncHttpClient, long requestTimeoutMs) {
public SourcesImpl(WebTarget web, Authentication auth, AsyncHttpRequestExecutor asyncHttpRequestExecutor,
long requestTimeoutMs) {
super(auth, requestTimeoutMs);
this.source = web.path("/admin/v3/source");
this.asyncHttpClient = asyncHttpClient;
this.asyncHttpRequestExecutor = asyncHttpRequestExecutor;
}

@Override
@@ -124,7 +125,7 @@ public CompletableFuture<Void> createSourceAsync(SourceConfig sourceConfig, Stri
// If the function code is built in, we don't need to submit here
builder.addBodyPart(new FilePart("data", new File(fileName), MediaType.APPLICATION_OCTET_STREAM));
}
asyncHttpClient.executeRequest(addAuthHeaders(source, builder).build())
asyncHttpRequestExecutor.executeRequest(addAuthHeaders(source, builder).build())
.toCompletableFuture()
.thenAccept(response -> {
if (response.getStatusCode() < 200 || response.getStatusCode() >= 300) {
@@ -202,7 +203,7 @@ public CompletableFuture<Void> updateSourceAsync(
// If the function code is built in, we don't need to submit here
builder.addBodyPart(new FilePart("data", new File(fileName), MediaType.APPLICATION_OCTET_STREAM));
}
asyncHttpClient.executeRequest(addAuthHeaders(source, builder).build())
asyncHttpRequestExecutor.executeRequest(addAuthHeaders(source, builder).build())
.toCompletableFuture()
.thenAccept(response -> {
if (response.getStatusCode() < 200 || response.getStatusCode() >= 300) {

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.admin.internal.http;

import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import org.asynchttpclient.AsyncHandler;
import org.asynchttpclient.Request;
import org.asynchttpclient.Response;

/**
* Interface for executing HTTP requests asynchronously.
* This is used internally in the Pulsar Admin client for executing HTTP requests that by-pass the Jersey client
* and use the AsyncHttpClient API directly.
*/
public interface AsyncHttpRequestExecutor {
/**
* Execute the given HTTP request asynchronously.
*
* @param request the HTTP request to execute
* @return a future that will be completed with the HTTP response
*/
CompletableFuture<Response> executeRequest(Request request);
/**
* Execute the given HTTP request asynchronously.
*
* @param request the HTTP request to execute
* @param handlerSupplier a supplier for the async handler to use for the request
* @return a future that will be completed with the HTTP response
*/
CompletableFuture<Response> executeRequest(Request request, Supplier<AsyncHandler<Response>> handlerSupplier);
}
Original file line number Diff line number Diff line change
@@ -65,13 +65,15 @@ public void testGetPropertiesFromConf() throws Exception {
config.put("autoCertRefreshSeconds", 20);
config.put("connectionTimeoutMs", 30);
config.put("readTimeoutMs", 40);
config.put("maxConnectionsPerHost", 50);
PulsarAdminBuilder adminBuilder = PulsarAdmin.builder().loadConf(config);
PulsarAdminImpl admin = (PulsarAdminImpl) adminBuilder.build();
ClientConfigurationData clientConfigData = admin.getClientConfigData();
Assert.assertEquals(clientConfigData.getRequestTimeoutMs(), 10);
Assert.assertEquals(clientConfigData.getAutoCertRefreshSeconds(), 20);
Assert.assertEquals(clientConfigData.getConnectionTimeoutMs(), 30);
Assert.assertEquals(clientConfigData.getReadTimeoutMs(), 40);
Assert.assertEquals(clientConfigData.getConnectionsPerBroker(), 50);
}

@Test
Original file line number Diff line number Diff line change
@@ -20,23 +20,34 @@

import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
import static com.github.tomakehurst.wiremock.client.WireMock.get;
import static com.github.tomakehurst.wiremock.client.WireMock.post;
import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import com.github.tomakehurst.wiremock.WireMockServer;
import com.github.tomakehurst.wiremock.common.FileSource;
import com.github.tomakehurst.wiremock.core.WireMockConfiguration;
import com.github.tomakehurst.wiremock.extension.Parameters;
import com.github.tomakehurst.wiremock.extension.ResponseTransformer;
import com.github.tomakehurst.wiremock.stubbing.Scenario;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.Cleanup;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.util.FutureUtil;
import org.asynchttpclient.Request;
import org.asynchttpclient.RequestBuilder;
import org.asynchttpclient.Response;
import org.glassfish.jersey.client.ClientConfig;
import org.glassfish.jersey.client.ClientRequest;
@@ -52,10 +63,74 @@

public class AsyncHttpConnectorTest {
WireMockServer server;
ConcurrencyTestTransformer concurrencyTestTransformer = new ConcurrencyTestTransformer();

private static class CopyRequestBodyToResponseBodyTransformer extends ResponseTransformer {
@Override
public com.github.tomakehurst.wiremock.http.Response transform(
com.github.tomakehurst.wiremock.http.Request request,
com.github.tomakehurst.wiremock.http.Response response, FileSource fileSource, Parameters parameters) {
return com.github.tomakehurst.wiremock.http.Response.Builder.like(response)
.body(request.getBodyAsString())
.build();
}

@Override
public String getName() {
return "copy-body";
}

@Override
public boolean applyGlobally() {
return false;
}
}

private static class ConcurrencyTestTransformer extends ResponseTransformer {
private static final long DELAY_MS = 100;
private final AtomicInteger concurrencyCounter = new AtomicInteger(0);
private final AtomicInteger maxConcurrency = new AtomicInteger(0);

@Override
public com.github.tomakehurst.wiremock.http.Response transform(
com.github.tomakehurst.wiremock.http.Request request,
com.github.tomakehurst.wiremock.http.Response response, FileSource fileSource, Parameters parameters) {
int currentCounter = concurrencyCounter.incrementAndGet();
maxConcurrency.updateAndGet(v -> Math.max(v, currentCounter));
try {
try {
Thread.sleep(DELAY_MS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return com.github.tomakehurst.wiremock.http.Response.Builder.like(response)
.body(String.valueOf(currentCounter))
.build();
} finally {
concurrencyCounter.decrementAndGet();
}
}

public int getMaxConcurrency() {
return maxConcurrency.get();
}

@Override
public String getName() {
return "concurrency-test";
}

@Override
public boolean applyGlobally() {
return false;
}
}

@BeforeClass(alwaysRun = true)
void beforeClass() throws IOException {
server = new WireMockServer(WireMockConfiguration.wireMockConfig()
.extensions(new CopyRequestBodyToResponseBodyTransformer(), concurrencyTestTransformer)
.containerThreads(100)
.port(0));
server.start();
}
@@ -137,4 +212,129 @@ public void failure(Throwable failure) {
assertEquals(scenarioState, "next");
assertTrue(future.isCompletedExceptionally());
}

@Test
void testMaxRedirects() {
// Redirect to itself to test max redirects
server.stubFor(get(urlEqualTo("/admin/v2/clusters"))
.willReturn(aResponse()
.withStatus(301)
.withHeader("Location", "http://localhost:" + server.port() + "/admin/v2/clusters")));

ClientConfigurationData conf = new ClientConfigurationData();
conf.setServiceUrl("http://localhost:" + server.port());

@Cleanup
AsyncHttpConnector connector = new AsyncHttpConnector(5000, 5000,
5000, 0, conf);

Request request = new RequestBuilder("GET")
.setUrl("http://localhost:" + server.port() + "/admin/v2/clusters")
.build();

try {
connector.executeRequest(request).get();
fail();
} catch (ExecutionException e) {
assertTrue(e.getCause() instanceof AsyncHttpConnector.MaxRedirectException);
} catch (InterruptedException e) {
fail();
}
}

@Test
void testRelativeRedirect() throws ExecutionException, InterruptedException {
doTestRedirect("path2");
}

@Test
void testAbsoluteRedirect() throws ExecutionException, InterruptedException {
doTestRedirect("/path2");
}

@Test
void testUrlRedirect() throws ExecutionException, InterruptedException {
doTestRedirect("http://localhost:" + server.port() + "/path2");
}

private void doTestRedirect(String location) throws InterruptedException, ExecutionException {
server.stubFor(get(urlEqualTo("/path1"))
.willReturn(aResponse()
.withStatus(301)
.withHeader("Location", location)));

server.stubFor(get(urlEqualTo("/path2"))
.willReturn(aResponse()
.withBody("OK")));

ClientConfigurationData conf = new ClientConfigurationData();
conf.setServiceUrl("http://localhost:" + server.port());

@Cleanup
AsyncHttpConnector connector = new AsyncHttpConnector(5000, 5000,
5000, 0, conf);

Request request = new RequestBuilder("GET")
.setUrl("http://localhost:" + server.port() + "/path1")
.build();

Response response = connector.executeRequest(request).get();
assertEquals(response.getResponseBody(), "OK");
}

@Test
void testRedirectWithBody() throws ExecutionException, InterruptedException {
server.stubFor(post(urlEqualTo("/path1"))
.willReturn(aResponse()
.withStatus(307)
.withHeader("Location", "/path2")));

server.stubFor(post(urlEqualTo("/path2"))
.willReturn(aResponse()
.withTransformers("copy-body")));

ClientConfigurationData conf = new ClientConfigurationData();
conf.setServiceUrl("http://localhost:" + server.port());

@Cleanup
AsyncHttpConnector connector = new AsyncHttpConnector(5000, 5000,
5000, 0, conf);

Request request = new RequestBuilder("POST")
.setUrl("http://localhost:" + server.port() + "/path1")
.setBody("Hello world!")
.build();

Response response = connector.executeRequest(request).get();
assertEquals(response.getResponseBody(), "Hello world!");
}

@Test
void testMaxConnections() throws ExecutionException, InterruptedException {
server.stubFor(post(urlEqualTo("/concurrency-test"))
.willReturn(aResponse()
.withTransformers("concurrency-test")));

ClientConfigurationData conf = new ClientConfigurationData();
int maxConnections = 10;
conf.setConnectionsPerBroker(maxConnections);
conf.setServiceUrl("http://localhost:" + server.port());

@Cleanup
AsyncHttpConnector connector = new AsyncHttpConnector(5000, 5000,
5000, 0, conf);

Request request = new RequestBuilder("POST")
.setUrl("http://localhost:" + server.port() + "/concurrency-test")
.build();

List<CompletableFuture<Response>> futures = new ArrayList<>();
for (int i = 0; i < 100; i++) {
futures.add(connector.executeRequest(request));
}
FutureUtil.waitForAll(futures).get();
int maxConcurrency = concurrencyTestTransformer.getMaxConcurrency();
assertTrue(maxConcurrency > maxConnections / 2 && maxConcurrency <= maxConnections,
"concurrency didn't get limited as expected (max: " + maxConcurrency + ")");
}
}
5 changes: 5 additions & 0 deletions pulsar-client-all/pom.xml
Original file line number Diff line number Diff line change
@@ -167,6 +167,7 @@
<include>com.google.errorprone:*</include>
<include>com.google.j2objc:*</include>
<include>com.google.code.gson:gson</include>
<include>com.spotify:completable-futures</include>
<include>com.fasterxml.jackson.*:*</include>
<include>io.netty:netty</include>
<include>io.netty:netty-all</include>
@@ -243,6 +244,10 @@
<exclude>com.google.protobuf.*</exclude>
</excludes>
</relocation>
<relocation>
<pattern>com.spotify.futures</pattern>
<shadedPattern>org.apache.pulsar.shade.com.spotify.futures</shadedPattern>
</relocation>
<relocation>
<pattern>com.fasterxml.jackson</pattern>
<shadedPattern>org.apache.pulsar.shade.com.fasterxml.jackson</shadedPattern>
Original file line number Diff line number Diff line change
@@ -128,6 +128,8 @@ public interface ClientBuilder extends Serializable, Cloneable {

/**
* Release the connection if it is not used for more than {@param connectionMaxIdleSeconds} seconds.
* Defaults to 25 seconds.
*
* @return the client builder instance
*/
ClientBuilder connectionMaxIdleSeconds(int connectionMaxIdleSeconds);
5 changes: 5 additions & 0 deletions pulsar-client-shaded/pom.xml
Original file line number Diff line number Diff line change
@@ -145,6 +145,7 @@
<include>com.google.errorprone:*</include>
<include>com.google.j2objc:*</include>
<include>com.google.code.gson:gson</include>
<include>com.spotify:completable-futures</include>
<include>com.fasterxml.jackson.*:*</include>
<include>io.netty:*</include>
<include>io.netty.incubator:*</include>
@@ -204,6 +205,10 @@
<exclude>com.google.protobuf.*</exclude>
</excludes>
</relocation>
<relocation>
<pattern>com.spotify.futures</pattern>
<shadedPattern>org.apache.pulsar.shade.com.spotify.futures</shadedPattern>
</relocation>
<relocation>
<pattern>com.fasterxml.jackson</pattern>
<shadedPattern>org.apache.pulsar.shade.com.fasterxml.jackson</shadedPattern>
Original file line number Diff line number Diff line change
@@ -61,7 +61,7 @@

public class ConnectionPool implements AutoCloseable {

public static final int IDLE_DETECTION_INTERVAL_SECONDS_MIN = 60;
public static final int IDLE_DETECTION_INTERVAL_SECONDS_MIN = 15;

protected final ConcurrentHashMap<InetSocketAddress, ConcurrentMap<Integer, CompletableFuture<ClientCnx>>> pool;

Original file line number Diff line number Diff line change
@@ -19,6 +19,7 @@
package org.apache.pulsar.client.impl.conf;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import io.swagger.annotations.ApiModelProperty;
import java.io.Serializable;
import java.net.InetSocketAddress;
@@ -45,6 +46,7 @@
@Data
@NoArgsConstructor
@AllArgsConstructor
@JsonIgnoreProperties(ignoreUnknown = true)
public class ClientConfigurationData implements Serializable, Cloneable {
private static final long serialVersionUID = 1L;

@@ -130,7 +132,7 @@ public class ClientConfigurationData implements Serializable, Cloneable {
value = "Release the connection if it is not used for more than [connectionMaxIdleSeconds] seconds. "
+ "If [connectionMaxIdleSeconds] < 0, disabled the feature that auto release the idle connections"
)
private int connectionMaxIdleSeconds = 180;
private int connectionMaxIdleSeconds = 25;

@ApiModelProperty(
name = "useTcpNoDelay",
Original file line number Diff line number Diff line change
@@ -109,7 +109,7 @@ public void testConnectionMaxIdleSeconds() throws Exception {
PulsarClient.builder().connectionMaxIdleSeconds(60);
// test config not correct.
try {
PulsarClient.builder().connectionMaxIdleSeconds(30);
PulsarClient.builder().connectionMaxIdleSeconds(14);
fail();
} catch (IllegalArgumentException e){
}
5 changes: 5 additions & 0 deletions pulsar-common/pom.xml
Original file line number Diff line number Diff line change
@@ -206,6 +206,11 @@
<artifactId>protobuf-java</artifactId>
</dependency>

<dependency>
<groupId>com.spotify</groupId>
<artifactId>completable-futures</artifactId>
</dependency>

<!-- test -->
<dependency>
<groupId>org.bouncycastle</groupId>
20 changes: 11 additions & 9 deletions pulsar-sql/presto-distribution/LICENSE
Original file line number Diff line number Diff line change
@@ -479,6 +479,8 @@ The Apache Software License, Version 2.0
- hppc-0.9.1.jar
* RoaringBitmap
- RoaringBitmap-1.2.0.jar
* Spotify completable-futures
- completable-futures-0.3.6.jar

Protocol Buffers License
* Protocol Buffers
@@ -535,15 +537,15 @@ CDDL-1.1 -- licenses/LICENSE-CDDL-1.1.txt
- aopalliance-repackaged-2.6.1.jar
* Jersey
- jaxrs-213.jar
- jersey-client-2.41.jar
- jersey-common-2.41.jar
- jersey-container-servlet-2.41.jar
- jersey-container-servlet-core-2.41.jar
- jersey-entity-filtering-2.41.jar
- jersey-hk2-2.41.jar
- jersey-media-json-jackson-2.41.jar
- jersey-media-multipart-2.41.jar
- jersey-server-2.41.jar
- jersey-client-2.42.jar
- jersey-common-2.42.jar
- jersey-container-servlet-2.42.jar
- jersey-container-servlet-core-2.42.jar
- jersey-entity-filtering-2.42.jar
- jersey-hk2-2.42.jar
- jersey-media-json-jackson-2.42.jar
- jersey-media-multipart-2.42.jar
- jersey-server-2.42.jar
* JAXB
- jaxb-api-2.3.1.jar
- jaxb-runtime-2.3.4.jar
2 changes: 1 addition & 1 deletion pulsar-sql/presto-distribution/pom.xml
Original file line number Diff line number Diff line change
@@ -33,7 +33,7 @@

<properties>
<skipBuildDistribution>false</skipBuildDistribution>
<jersey.version>2.41</jersey.version>
<jersey.version>2.42</jersey.version>
<objenesis.version>2.6</objenesis.version>
<objectsize.version>0.0.12</objectsize.version>
<maven.version>3.0.5</maven.version>

0 comments on commit 62fbcc9

Please sign in to comment.