Skip to content

Commit 5d9b48b

Browse files
committed
use AsyncHttpClient to perform http request, avoiding introduce other dependency
1 parent f5f162b commit 5d9b48b

File tree

3 files changed

+65
-45
lines changed

3 files changed

+65
-45
lines changed

pulsar-client/pom.xml

-5
Original file line numberDiff line numberDiff line change
@@ -114,11 +114,6 @@
114114
<artifactId>gson</artifactId>
115115
</dependency>
116116

117-
<dependency>
118-
<groupId>org.apache.httpcomponents</groupId>
119-
<artifactId>httpclient</artifactId>
120-
</dependency>
121-
122117
<!--Schema dependencies-->
123118

124119
<dependency>

pulsar-client/src/main/java/org/apache/pulsar/client/impl/ControlledClusterFailover.java

+59-34
Original file line numberDiff line numberDiff line change
@@ -21,74 +21,88 @@
2121
import static org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
2222
import com.fasterxml.jackson.core.JsonProcessingException;
2323
import com.fasterxml.jackson.databind.ObjectMapper;
24+
import com.google.common.annotations.VisibleForTesting;
2425
import com.google.common.base.Strings;
26+
import io.netty.handler.codec.http.HttpRequest;
27+
import io.netty.handler.codec.http.HttpResponse;
2528
import io.netty.util.concurrent.DefaultThreadFactory;
2629
import java.io.IOException;
30+
import java.net.InetSocketAddress;
31+
import java.nio.charset.StandardCharsets;
2732
import java.util.Map;
2833
import java.util.Objects;
34+
import java.util.concurrent.ExecutionException;
2935
import java.util.concurrent.Executors;
3036
import java.util.concurrent.ScheduledExecutorService;
3137
import java.util.concurrent.TimeUnit;
3238
import lombok.Data;
3339
import lombok.NonNull;
3440
import lombok.extern.slf4j.Slf4j;
35-
import org.apache.http.HttpEntity;
36-
import org.apache.http.HttpHeaders;
37-
import org.apache.http.client.ResponseHandler;
38-
import org.apache.http.client.methods.HttpUriRequest;
39-
import org.apache.http.client.methods.RequestBuilder;
40-
import org.apache.http.impl.client.CloseableHttpClient;
41-
import org.apache.http.impl.client.HttpClients;
42-
import org.apache.http.util.EntityUtils;
41+
import org.apache.pulsar.PulsarVersion;
4342
import org.apache.pulsar.client.api.Authentication;
4443
import org.apache.pulsar.client.api.AuthenticationFactory;
4544
import org.apache.pulsar.client.api.ControlledClusterFailoverBuilder;
4645
import org.apache.pulsar.client.api.PulsarClient;
4746
import org.apache.pulsar.client.api.ServiceUrlProvider;
4847
import org.apache.pulsar.common.util.ObjectMapperFactory;
48+
import org.asynchttpclient.AsyncHttpClient;
49+
import org.asynchttpclient.AsyncHttpClientConfig;
50+
import org.asynchttpclient.BoundRequestBuilder;
51+
import org.asynchttpclient.DefaultAsyncHttpClient;
52+
import org.asynchttpclient.DefaultAsyncHttpClientConfig;
53+
import org.asynchttpclient.Request;
54+
import org.asynchttpclient.Response;
55+
import org.asynchttpclient.channel.DefaultKeepAliveStrategy;
4956
import org.checkerframework.checker.nullness.qual.Nullable;
5057

5158
@Slf4j
5259
public class ControlledClusterFailover implements ServiceUrlProvider {
60+
private static final int DEFAULT_CONNECT_TIMEOUT_IN_SECONDS = 10;
61+
private static final int DEFAULT_READ_TIMEOUT_IN_SECONDS = 30;
62+
private static final int DEFAULT_MAX_REDIRECTS = 20;
63+
5364
private PulsarClientImpl pulsarClient;
5465
private volatile String currentPulsarServiceUrl;
5566
private volatile ControlledConfiguration currentControlledConfiguration;
5667
private final ScheduledExecutorService executor;
5768
private final long interval;
5869
private ObjectMapper objectMapper = null;
59-
private final CloseableHttpClient httpClient;
60-
private final HttpUriRequest request;
61-
private final ResponseHandler<String> responseHandler;
70+
private final AsyncHttpClient httpClient;
71+
private final BoundRequestBuilder requestBuilder;
6272

6373
private ControlledClusterFailover(ControlledClusterFailoverBuilderImpl builder) throws IOException {
6474
this.currentPulsarServiceUrl = builder.defaultServiceUrl;
6575
this.interval = builder.interval;
6676
this.executor = Executors.newSingleThreadScheduledExecutor(
6777
new DefaultThreadFactory("pulsar-service-provider"));
68-
this.httpClient = HttpClients.custom().build();
6978

70-
RequestBuilder requestBuilder = RequestBuilder.get()
71-
.setUri(builder.urlProvider)
72-
.setHeader(HttpHeaders.CONTENT_TYPE, "application/json");
79+
this.httpClient = buildHttpClient();
80+
this.requestBuilder = httpClient.prepareGet(builder.urlProvider)
81+
.addHeader("Accept", "application/json");
7382

7483
if (builder.header != null && !builder.header.isEmpty()) {
75-
builder.header.forEach(requestBuilder::setHeader);
84+
builder.header.forEach(requestBuilder::addHeader);
7685
}
77-
this.request = requestBuilder.build();
78-
responseHandler = httpResponse -> {
79-
int status = httpResponse.getStatusLine().getStatusCode();
80-
if (status >= 200 && status < 300) {
81-
HttpEntity entity = httpResponse.getEntity();
82-
return entity != null ? EntityUtils.toString(entity) : null;
83-
} else {
84-
log.warn("Unexpected response status: {}", status);
85-
return null;
86-
}
87-
};
8886
}
8987

90-
public HttpUriRequest getRequest() {
91-
return this.request;
88+
private AsyncHttpClient buildHttpClient() {
89+
DefaultAsyncHttpClientConfig.Builder confBuilder = new DefaultAsyncHttpClientConfig.Builder();
90+
confBuilder.setFollowRedirect(true);
91+
confBuilder.setMaxRedirects(DEFAULT_MAX_REDIRECTS);
92+
confBuilder.setConnectTimeout(DEFAULT_CONNECT_TIMEOUT_IN_SECONDS * 1000);
93+
confBuilder.setReadTimeout(DEFAULT_READ_TIMEOUT_IN_SECONDS * 1000);
94+
confBuilder.setUserAgent(String.format("Pulsar-Java-v%s", PulsarVersion.getVersion()));
95+
confBuilder.setKeepAliveStrategy(new DefaultKeepAliveStrategy() {
96+
@Override
97+
public boolean keepAlive(InetSocketAddress remoteAddress, Request ahcRequest,
98+
HttpRequest request, HttpResponse response) {
99+
// Close connection upon a server error or per HTTP spec
100+
return (response.status().code() / 100 != 5)
101+
&& super.keepAlive(remoteAddress, ahcRequest, request, response);
102+
}
103+
});
104+
AsyncHttpClientConfig config = confBuilder.build();
105+
return new DefaultAsyncHttpClient(config);
92106
}
93107

94108
@Override
@@ -139,15 +153,26 @@ public String getCurrentPulsarServiceUrl() {
139153
return this.currentPulsarServiceUrl;
140154
}
141155

156+
@VisibleForTesting
157+
protected BoundRequestBuilder getRequestBuilder() {
158+
return this.requestBuilder;
159+
}
160+
142161
protected ControlledConfiguration fetchControlledConfiguration() throws IOException {
143162
// call the service to get service URL
144163
try {
145-
String jsonStr = httpClient.execute(request, responseHandler);
146-
return getObjectMapper().readValue(jsonStr, ControlledConfiguration.class);
147-
} catch (IOException e) {
148-
log.warn("Failed to fetch controlled configuration. ", e);
149-
return null;
164+
Response response = requestBuilder.execute().get();
165+
int statusCode = response.getStatusCode();
166+
if (statusCode == 200) {
167+
String content = response.getResponseBody(StandardCharsets.UTF_8);
168+
return getObjectMapper().readValue(content, ControlledConfiguration.class);
169+
}
170+
log.warn("Failed to fetch controlled configuration, status code: {}", statusCode);
171+
} catch (InterruptedException | ExecutionException e) {
172+
log.error("Failed to fetch controlled configuration ", e);
150173
}
174+
175+
return null;
151176
}
152177

153178
private ObjectMapper getObjectMapper() {

pulsar-client/src/test/java/org/apache/pulsar/client/impl/ControlledClusterFailoverTest.java

+6-6
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,9 @@
2222
import java.util.HashMap;
2323
import java.util.Map;
2424
import java.util.concurrent.TimeUnit;
25-
import org.apache.http.client.methods.HttpUriRequest;
2625
import org.apache.pulsar.client.api.Authentication;
2726
import org.apache.pulsar.client.api.ServiceUrlProvider;
27+
import org.asynchttpclient.Request;
2828
import org.awaitility.Awaitility;
2929
import org.mockito.Mockito;
3030
import org.powermock.api.mockito.PowerMockito;
@@ -36,7 +36,7 @@ public class ControlledClusterFailoverTest {
3636
@Test
3737
public void testBuildControlledClusterFailoverInstance() throws IOException {
3838
String defaultServiceUrl = "pulsar://localhost:6650";
39-
String urlProvider = "http://localhost:8080";
39+
String urlProvider = "http://localhost:8080/test";
4040
String keyA = "key-a";
4141
String valueA = "value-a";
4242
String keyB = "key-b";
@@ -52,14 +52,14 @@ public void testBuildControlledClusterFailoverInstance() throws IOException {
5252
.build();
5353

5454
ControlledClusterFailover controlledClusterFailover = (ControlledClusterFailover) provider;
55-
HttpUriRequest request = controlledClusterFailover.getRequest();
55+
Request request = controlledClusterFailover.getRequestBuilder().build();
5656

5757
Assert.assertTrue(provider instanceof ControlledClusterFailover);
5858
Assert.assertEquals(defaultServiceUrl, provider.getServiceUrl());
5959
Assert.assertEquals(defaultServiceUrl, controlledClusterFailover.getCurrentPulsarServiceUrl());
60-
Assert.assertEquals(urlProvider, request.getURI().toString());
61-
Assert.assertEquals(request.getFirstHeader(keyA).getValue(), valueA);
62-
Assert.assertEquals(request.getFirstHeader(keyB).getValue(), valueB);
60+
Assert.assertEquals(urlProvider, request.getUri().toUrl());
61+
Assert.assertEquals(request.getHeaders().get(keyA), valueA);
62+
Assert.assertEquals(request.getHeaders().get(keyB), valueB);
6363
}
6464

6565
@Test

0 commit comments

Comments
 (0)