Skip to content

Commit a2442fc

Browse files
ivankellyIvan Kelly
authored andcommitted
PIP-91: Separate lookup timeout from operation timeout (apache#11627)
* PIP-91: Separate lookup timeout from operation timeout This patch contains a number of changes. TooManyRequests is retried for partition metadata and lookups Lookup timeout configuration has been added. By default it matches operation timeout. Partition metadata timeout calculation has been fixed to calculate the elapsed time correctly. Small refactor on broker construction to allow a mocked ServerCnx implementation for testing. Unfortunately, the test takes over 50 seconds, but this is unavoidable due to the fact that we're working with timeouts here. PulsarClientExceptions have been reworked to contain more context (remote/local/reqid) and any previous exceptions which may have occurred triggering retries. The previous exceptions must be manually recorded, so this only applies to lookups on the consumer side for now. * Fixup for test failures BrokerClientIntegrationTest#testCloseConnectionOnBrokerRejected was depending on the fact that TooManyRequests was previously fatal for partition metadata request. Now that it retries, that test was failing. It's a bad test anyhow, depending on thread interactions and whatnot. I've rewritten it to use the ServerCnx mock. It now actually tests for the thing it should, that clients close the connection after the max rejects. The schema tests were failing because they expected a certain exception message which has been extended. I changes endsWith to contains. I also added Producer retries similiar to the Consumer ones. I was going to do as a followon PR, but decided to put in this one. Co-authored-by: Ivan Kelly <ikelly@splunk.com>
1 parent a5fbe5b commit a2442fc

File tree

20 files changed

+801
-212
lines changed

20 files changed

+801
-212
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java

+7-2
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
2525
import static org.apache.pulsar.broker.resourcegroup.ResourceUsageTransportManager.DISABLE_RESOURCE_USAGE_TRANSPORT_MANAGER;
2626
import static org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl.TRANSACTION_LOG_PREFIX;
27+
import com.google.common.annotations.VisibleForTesting;
2728
import com.google.common.collect.ImmutableMap;
2829
import com.google.common.collect.Lists;
2930
import com.google.common.collect.Maps;
@@ -241,7 +242,7 @@ public class PulsarService implements AutoCloseable {
241242
private ProtocolHandlers protocolHandlers = null;
242243

243244
private final ShutdownService shutdownService;
244-
private final EventLoopGroup ioEventLoopGroup;
245+
protected final EventLoopGroup ioEventLoopGroup;
245246

246247
private MetricsGenerator metricsGenerator;
247248

@@ -658,7 +659,7 @@ public void start() throws PulsarServerException {
658659
config, localMetadataStore, getZkClient(), bkClientFactory, ioEventLoopGroup
659660
);
660661

661-
this.brokerService = new BrokerService(this, ioEventLoopGroup);
662+
this.brokerService = newBrokerService(this);
662663

663664
// Start load management service (even if load balancing is disabled)
664665
this.loadManager.set(LoadManager.create(this));
@@ -1678,4 +1679,8 @@ private static boolean isTransactionSystemTopic(TopicName topicName) {
16781679
|| topic.endsWith(MLPendingAckStore.PENDING_ACK_STORE_SUFFIX);
16791680
}
16801681

1682+
@VisibleForTesting
1683+
protected BrokerService newBrokerService(PulsarService pulsar) throws Exception {
1684+
return new BrokerService(pulsar, ioEventLoopGroup);
1685+
}
16811686
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java

+11-2
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
2727
import static org.apache.pulsar.common.events.EventsTopicNames.checkTopicIsEventsNames;
2828
import com.fasterxml.jackson.core.type.TypeReference;
29+
import com.google.common.annotations.VisibleForTesting;
2930
import com.google.common.collect.Lists;
3031
import com.google.common.collect.Maps;
3132
import com.google.common.collect.Queues;
@@ -254,6 +255,7 @@ public class BrokerService implements Closeable {
254255
@Getter
255256
private final BundlesQuotas bundlesQuotas;
256257

258+
private PulsarChannelInitializer.Factory pulsarChannelInitFactory = PulsarChannelInitializer.DEFAULT_FACTORY;
257259
private Channel listenChannel;
258260
private Channel listenChannelTls;
259261

@@ -410,7 +412,8 @@ public void start() throws Exception {
410412

411413
ServiceConfiguration serviceConfig = pulsar.getConfiguration();
412414

413-
bootstrap.childHandler(new PulsarChannelInitializer(pulsar, false));
415+
bootstrap.childHandler(
416+
pulsarChannelInitFactory.newPulsarChannelInitializer(pulsar, false));
414417

415418
Optional<Integer> port = serviceConfig.getBrokerServicePort();
416419
if (port.isPresent()) {
@@ -427,7 +430,8 @@ public void start() throws Exception {
427430
Optional<Integer> tlsPort = serviceConfig.getBrokerServicePortTls();
428431
if (tlsPort.isPresent()) {
429432
ServerBootstrap tlsBootstrap = bootstrap.clone();
430-
tlsBootstrap.childHandler(new PulsarChannelInitializer(pulsar, true));
433+
tlsBootstrap.childHandler(
434+
pulsarChannelInitFactory.newPulsarChannelInitializer(pulsar, true));
431435
try {
432436
listenChannelTls = tlsBootstrap.bind(new InetSocketAddress(
433437
pulsar.getBindAddress(), tlsPort.get())).sync()
@@ -2647,4 +2651,9 @@ public void resumedConnections(int numberOfConnections) {
26472651
public long getPausedConnections() {
26482652
return pausedConnections.longValue();
26492653
}
2654+
2655+
@VisibleForTesting
2656+
public void setPulsarChannelInitializerFactory(PulsarChannelInitializer.Factory factory) {
2657+
this.pulsarChannelInitFactory = factory;
2658+
}
26502659
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java

+14-1
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import static org.apache.bookkeeper.util.SafeRunnable.safeRun;
2222
import com.github.benmanes.caffeine.cache.Cache;
2323
import com.github.benmanes.caffeine.cache.Caffeine;
24+
import com.google.common.annotations.VisibleForTesting;
2425
import io.netty.channel.ChannelInitializer;
2526
import io.netty.channel.socket.SocketChannel;
2627
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
@@ -129,7 +130,7 @@ protected void initChannel(SocketChannel ch) throws Exception {
129130
// ServerCnx ends up reading higher number of messages and broker can not throttle the messages by disabling
130131
// auto-read.
131132
ch.pipeline().addLast("flowController", new FlowControlHandler());
132-
ServerCnx cnx = new ServerCnx(pulsar);
133+
ServerCnx cnx = newServerCnx(pulsar);
133134
ch.pipeline().addLast("handler", cnx);
134135

135136
connections.put(ch.remoteAddress(), cnx);
@@ -144,4 +145,16 @@ private void refreshAuthenticationCredentials() {
144145
}
145146
});
146147
}
148+
149+
@VisibleForTesting
150+
protected ServerCnx newServerCnx(PulsarService pulsar) throws Exception {
151+
return new ServerCnx(pulsar);
152+
}
153+
154+
public interface Factory {
155+
PulsarChannelInitializer newPulsarChannelInitializer(PulsarService pulsar, boolean enableTLS) throws Exception;
156+
}
157+
158+
public static final Factory DEFAULT_FACTORY =
159+
(pulsar, tls) -> new PulsarChannelInitializer(pulsar, tls);
147160
}

pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java

+5-2
Original file line numberDiff line numberDiff line change
@@ -280,13 +280,12 @@ protected void customizeNewPulsarAdminBuilder(PulsarAdminBuilder pulsarAdminBuil
280280
}
281281

282282
protected PulsarService startBroker(ServiceConfiguration conf) throws Exception {
283-
284283
return startBrokerWithoutAuthorization(conf);
285284
}
286285

287286
protected PulsarService startBrokerWithoutAuthorization(ServiceConfiguration conf) throws Exception {
288287
conf.setBrokerShutdownTimeoutMs(0L);
289-
PulsarService pulsar = spy(new PulsarService(conf));
288+
PulsarService pulsar = spy(newPulsarService(conf));
290289
setupBrokerMocks(pulsar);
291290
beforePulsarStartMocks(pulsar);
292291
pulsar.start();
@@ -295,6 +294,10 @@ protected PulsarService startBrokerWithoutAuthorization(ServiceConfiguration con
295294
return pulsar;
296295
}
297296

297+
protected PulsarService newPulsarService(ServiceConfiguration conf) throws Exception {
298+
return new PulsarService(conf);
299+
}
300+
298301
protected void setupBrokerMocks(PulsarService pulsar) throws Exception {
299302
// Override default providers with mocked ones
300303
doReturn(mockZooKeeperClientFactory).when(pulsar).getZooKeeperClientFactory();

pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LeaderElectionServiceTest.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,9 @@ private void checkLookupException(String tenant, String namespace, PulsarClient
129129
.create();
130130
} catch (PulsarClientException t) {
131131
Assert.assertTrue(t instanceof PulsarClientException.LookupException);
132-
Assert.assertEquals(t.getMessage(), "java.lang.IllegalStateException: The leader election has not yet been completed!");
132+
Assert.assertTrue(
133+
t.getMessage().contains(
134+
"java.lang.IllegalStateException: The leader election has not yet been completed!"));
133135
}
134136
}
135137

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java

+110-30
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,9 @@
3333
import com.google.gson.Gson;
3434
import com.google.gson.JsonArray;
3535
import com.google.gson.JsonObject;
36+
import io.netty.buffer.ByteBuf;
37+
import io.netty.channel.EventLoopGroup;
38+
import io.netty.util.concurrent.DefaultThreadFactory;
3639
import java.io.BufferedReader;
3740
import java.io.IOException;
3841
import java.io.InputStream;
@@ -79,13 +82,18 @@
7982
import org.apache.pulsar.client.api.PulsarClient;
8083
import org.apache.pulsar.client.api.PulsarClientException;
8184
import org.apache.pulsar.client.api.SubscriptionType;
85+
import org.apache.pulsar.client.impl.ConnectionPool;
86+
import org.apache.pulsar.client.impl.PulsarServiceNameResolver;
8287
import org.apache.pulsar.client.impl.auth.AuthenticationTls;
88+
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
8389
import org.apache.pulsar.common.naming.NamespaceBundle;
8490
import org.apache.pulsar.common.naming.TopicName;
8591
import org.apache.pulsar.common.policies.data.BundlesData;
8692
import org.apache.pulsar.common.policies.data.LocalPolicies;
8793
import org.apache.pulsar.common.policies.data.SubscriptionStats;
8894
import org.apache.pulsar.common.policies.data.TopicStats;
95+
import org.apache.pulsar.common.protocol.Commands;
96+
import org.apache.pulsar.common.util.netty.EventLoopUtil;
8997
import org.testng.Assert;
9098
import org.testng.annotations.AfterClass;
9199
import org.testng.annotations.BeforeClass;
@@ -879,42 +887,114 @@ public void testTlsAuthUseTrustCert() throws Exception {
879887
*/
880888
@Test
881889
public void testLookupThrottlingForClientByClient() throws Exception {
890+
// This test looks like it could be flakey, if the broker responds
891+
// quickly enough, there may never be concurrency in requests
882892
final String topicName = "persistent://prop/ns-abc/newTopic";
883893

884-
@Cleanup
885-
PulsarClient pulsarClient = PulsarClient.builder()
886-
.serviceUrl(pulsar.getBrokerServiceUrl())
887-
.statsInterval(0, TimeUnit.SECONDS)
888-
.maxConcurrentLookupRequests(1)
889-
.maxLookupRequests(2)
890-
.build();
894+
PulsarServiceNameResolver resolver = new PulsarServiceNameResolver();
895+
resolver.updateServiceUrl(pulsar.getBrokerServiceUrl());
896+
ClientConfigurationData conf = new ClientConfigurationData();
897+
conf.setConcurrentLookupRequest(1);
898+
conf.setMaxLookupRequest(2);
899+
900+
EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(20, false,
901+
new DefaultThreadFactory("test-pool", Thread.currentThread().isDaemon()));
902+
long reqId = 0xdeadbeef;
903+
try (ConnectionPool pool = new ConnectionPool(conf, eventLoop)) {
904+
// for PMR
905+
// 2 lookup will succeed
906+
long reqId1 = reqId++;
907+
ByteBuf request1 = Commands.newPartitionMetadataRequest(topicName, reqId1);
908+
CompletableFuture<?> f1 = pool.getConnection(resolver.resolveHost())
909+
.thenCompose(clientCnx -> clientCnx.newLookup(request1, reqId1));
910+
911+
long reqId2 = reqId++;
912+
ByteBuf request2 = Commands.newPartitionMetadataRequest(topicName, reqId2);
913+
CompletableFuture<?> f2 = pool.getConnection(resolver.resolveHost())
914+
.thenCompose(clientCnx -> clientCnx.newLookup(request2, reqId2));
915+
916+
f1.get();
917+
f2.get();
918+
919+
// 3 lookup will fail
920+
long reqId3 = reqId++;
921+
ByteBuf request3 = Commands.newPartitionMetadataRequest(topicName, reqId3);
922+
f1 = pool.getConnection(resolver.resolveHost())
923+
.thenCompose(clientCnx -> clientCnx.newLookup(request3, reqId3));
924+
925+
long reqId4 = reqId++;
926+
ByteBuf request4 = Commands.newPartitionMetadataRequest(topicName, reqId4);
927+
f2 = pool.getConnection(resolver.resolveHost())
928+
.thenCompose(clientCnx -> clientCnx.newLookup(request4, reqId4));
929+
930+
long reqId5 = reqId++;
931+
ByteBuf request5 = Commands.newPartitionMetadataRequest(topicName, reqId5);
932+
CompletableFuture<?> f3 = pool.getConnection(resolver.resolveHost())
933+
.thenCompose(clientCnx -> clientCnx.newLookup(request5, reqId5));
891934

892-
// 2 lookup will success.
893-
try {
894-
CompletableFuture<Consumer<byte[]>> consumer1 = pulsarClient.newConsumer().topic(topicName).subscriptionName("mysub1").subscribeAsync();
895-
CompletableFuture<Consumer<byte[]>> consumer2 = pulsarClient.newConsumer().topic(topicName).subscriptionName("mysub2").subscribeAsync();
935+
try {
936+
f1.get();
937+
f2.get();
938+
f3.get();
939+
fail("At least one should fail");
940+
} catch (ExecutionException e) {
941+
Throwable rootCause = e;
942+
while (rootCause instanceof ExecutionException) {
943+
rootCause = rootCause.getCause();
944+
}
945+
if (!(rootCause instanceof
946+
org.apache.pulsar.client.api.PulsarClientException.TooManyRequestsException)) {
947+
throw e;
948+
}
949+
}
896950

897-
consumer1.get().close();
898-
consumer2.get().close();
899-
} catch (Exception e) {
900-
fail("Subscribe should success with 2 requests");
901-
}
951+
// for Lookup
952+
// 2 lookup will succeed
953+
long reqId6 = reqId++;
954+
ByteBuf request6 = Commands.newLookup(topicName, true, reqId6);
955+
f1 = pool.getConnection(resolver.resolveHost())
956+
.thenCompose(clientCnx -> clientCnx.newLookup(request6, reqId6));
957+
958+
long reqId7 = reqId++;
959+
ByteBuf request7 = Commands.newLookup(topicName, true, reqId7);
960+
f2 = pool.getConnection(resolver.resolveHost())
961+
.thenCompose(clientCnx -> clientCnx.newLookup(request7, reqId7));
962+
963+
f1.get();
964+
f2.get();
965+
966+
// 3 lookup will fail
967+
long reqId8 = reqId++;
968+
ByteBuf request8 = Commands.newLookup(topicName, true, reqId8);
969+
f1 = pool.getConnection(resolver.resolveHost())
970+
.thenCompose(clientCnx -> clientCnx.newLookup(request8, reqId8));
971+
972+
long reqId9 = reqId++;
973+
ByteBuf request9 = Commands.newLookup(topicName, true, reqId9);
974+
f2 = pool.getConnection(resolver.resolveHost())
975+
.thenCompose(clientCnx -> clientCnx.newLookup(request9, reqId9));
976+
977+
long reqId10 = reqId++;
978+
ByteBuf request10 = Commands.newLookup(topicName, true, reqId10);
979+
f3 = pool.getConnection(resolver.resolveHost())
980+
.thenCompose(clientCnx -> clientCnx.newLookup(request10, reqId10));
902981

903-
// 3 lookup will fail
904-
try {
905-
CompletableFuture<Consumer<byte[]>> consumer1 = pulsarClient.newConsumer().topic(topicName).subscriptionName("mysub11").subscribeAsync();
906-
CompletableFuture<Consumer<byte[]>> consumer2 = pulsarClient.newConsumer().topic(topicName).subscriptionName("mysub22").subscribeAsync();
907-
CompletableFuture<Consumer<byte[]>> consumer3 = pulsarClient.newConsumer().topic(topicName).subscriptionName("mysub33").subscribeAsync();
908-
909-
consumer1.get().close();
910-
consumer2.get().close();
911-
consumer3.get().close();
912-
fail("It should fail as throttling should only receive 2 requests");
913-
} catch (Exception e) {
914-
if (!(e.getCause() instanceof
915-
org.apache.pulsar.client.api.PulsarClientException.TooManyRequestsException)) {
916-
fail("Subscribe should fail with TooManyRequestsException");
982+
try {
983+
f1.get();
984+
f2.get();
985+
f3.get();
986+
fail("At least one should fail");
987+
} catch (ExecutionException e) {
988+
Throwable rootCause = e;
989+
while (rootCause instanceof ExecutionException) {
990+
rootCause = rootCause.getCause();
991+
}
992+
if (!(rootCause instanceof
993+
org.apache.pulsar.client.api.PulsarClientException.TooManyRequestsException)) {
994+
throw e;
995+
}
917996
}
997+
918998
}
919999
}
9201000

0 commit comments

Comments
 (0)