|
33 | 33 | import com.google.gson.Gson;
|
34 | 34 | import com.google.gson.JsonArray;
|
35 | 35 | import com.google.gson.JsonObject;
|
| 36 | +import io.netty.buffer.ByteBuf; |
| 37 | +import io.netty.channel.EventLoopGroup; |
| 38 | +import io.netty.util.concurrent.DefaultThreadFactory; |
36 | 39 | import java.io.BufferedReader;
|
37 | 40 | import java.io.IOException;
|
38 | 41 | import java.io.InputStream;
|
|
79 | 82 | import org.apache.pulsar.client.api.PulsarClient;
|
80 | 83 | import org.apache.pulsar.client.api.PulsarClientException;
|
81 | 84 | import org.apache.pulsar.client.api.SubscriptionType;
|
| 85 | +import org.apache.pulsar.client.impl.ConnectionPool; |
| 86 | +import org.apache.pulsar.client.impl.PulsarServiceNameResolver; |
82 | 87 | import org.apache.pulsar.client.impl.auth.AuthenticationTls;
|
| 88 | +import org.apache.pulsar.client.impl.conf.ClientConfigurationData; |
83 | 89 | import org.apache.pulsar.common.naming.NamespaceBundle;
|
84 | 90 | import org.apache.pulsar.common.naming.TopicName;
|
85 | 91 | import org.apache.pulsar.common.policies.data.BundlesData;
|
86 | 92 | import org.apache.pulsar.common.policies.data.LocalPolicies;
|
87 | 93 | import org.apache.pulsar.common.policies.data.SubscriptionStats;
|
88 | 94 | import org.apache.pulsar.common.policies.data.TopicStats;
|
| 95 | +import org.apache.pulsar.common.protocol.Commands; |
| 96 | +import org.apache.pulsar.common.util.netty.EventLoopUtil; |
89 | 97 | import org.testng.Assert;
|
90 | 98 | import org.testng.annotations.AfterClass;
|
91 | 99 | import org.testng.annotations.BeforeClass;
|
@@ -879,42 +887,114 @@ public void testTlsAuthUseTrustCert() throws Exception {
|
879 | 887 | */
|
880 | 888 | @Test
|
881 | 889 | public void testLookupThrottlingForClientByClient() throws Exception {
|
| 890 | + // This test looks like it could be flakey, if the broker responds |
| 891 | + // quickly enough, there may never be concurrency in requests |
882 | 892 | final String topicName = "persistent://prop/ns-abc/newTopic";
|
883 | 893 |
|
884 |
| - @Cleanup |
885 |
| - PulsarClient pulsarClient = PulsarClient.builder() |
886 |
| - .serviceUrl(pulsar.getBrokerServiceUrl()) |
887 |
| - .statsInterval(0, TimeUnit.SECONDS) |
888 |
| - .maxConcurrentLookupRequests(1) |
889 |
| - .maxLookupRequests(2) |
890 |
| - .build(); |
| 894 | + PulsarServiceNameResolver resolver = new PulsarServiceNameResolver(); |
| 895 | + resolver.updateServiceUrl(pulsar.getBrokerServiceUrl()); |
| 896 | + ClientConfigurationData conf = new ClientConfigurationData(); |
| 897 | + conf.setConcurrentLookupRequest(1); |
| 898 | + conf.setMaxLookupRequest(2); |
| 899 | + |
| 900 | + EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(20, false, |
| 901 | + new DefaultThreadFactory("test-pool", Thread.currentThread().isDaemon())); |
| 902 | + long reqId = 0xdeadbeef; |
| 903 | + try (ConnectionPool pool = new ConnectionPool(conf, eventLoop)) { |
| 904 | + // for PMR |
| 905 | + // 2 lookup will succeed |
| 906 | + long reqId1 = reqId++; |
| 907 | + ByteBuf request1 = Commands.newPartitionMetadataRequest(topicName, reqId1); |
| 908 | + CompletableFuture<?> f1 = pool.getConnection(resolver.resolveHost()) |
| 909 | + .thenCompose(clientCnx -> clientCnx.newLookup(request1, reqId1)); |
| 910 | + |
| 911 | + long reqId2 = reqId++; |
| 912 | + ByteBuf request2 = Commands.newPartitionMetadataRequest(topicName, reqId2); |
| 913 | + CompletableFuture<?> f2 = pool.getConnection(resolver.resolveHost()) |
| 914 | + .thenCompose(clientCnx -> clientCnx.newLookup(request2, reqId2)); |
| 915 | + |
| 916 | + f1.get(); |
| 917 | + f2.get(); |
| 918 | + |
| 919 | + // 3 lookup will fail |
| 920 | + long reqId3 = reqId++; |
| 921 | + ByteBuf request3 = Commands.newPartitionMetadataRequest(topicName, reqId3); |
| 922 | + f1 = pool.getConnection(resolver.resolveHost()) |
| 923 | + .thenCompose(clientCnx -> clientCnx.newLookup(request3, reqId3)); |
| 924 | + |
| 925 | + long reqId4 = reqId++; |
| 926 | + ByteBuf request4 = Commands.newPartitionMetadataRequest(topicName, reqId4); |
| 927 | + f2 = pool.getConnection(resolver.resolveHost()) |
| 928 | + .thenCompose(clientCnx -> clientCnx.newLookup(request4, reqId4)); |
| 929 | + |
| 930 | + long reqId5 = reqId++; |
| 931 | + ByteBuf request5 = Commands.newPartitionMetadataRequest(topicName, reqId5); |
| 932 | + CompletableFuture<?> f3 = pool.getConnection(resolver.resolveHost()) |
| 933 | + .thenCompose(clientCnx -> clientCnx.newLookup(request5, reqId5)); |
891 | 934 |
|
892 |
| - // 2 lookup will success. |
893 |
| - try { |
894 |
| - CompletableFuture<Consumer<byte[]>> consumer1 = pulsarClient.newConsumer().topic(topicName).subscriptionName("mysub1").subscribeAsync(); |
895 |
| - CompletableFuture<Consumer<byte[]>> consumer2 = pulsarClient.newConsumer().topic(topicName).subscriptionName("mysub2").subscribeAsync(); |
| 935 | + try { |
| 936 | + f1.get(); |
| 937 | + f2.get(); |
| 938 | + f3.get(); |
| 939 | + fail("At least one should fail"); |
| 940 | + } catch (ExecutionException e) { |
| 941 | + Throwable rootCause = e; |
| 942 | + while (rootCause instanceof ExecutionException) { |
| 943 | + rootCause = rootCause.getCause(); |
| 944 | + } |
| 945 | + if (!(rootCause instanceof |
| 946 | + org.apache.pulsar.client.api.PulsarClientException.TooManyRequestsException)) { |
| 947 | + throw e; |
| 948 | + } |
| 949 | + } |
896 | 950 |
|
897 |
| - consumer1.get().close(); |
898 |
| - consumer2.get().close(); |
899 |
| - } catch (Exception e) { |
900 |
| - fail("Subscribe should success with 2 requests"); |
901 |
| - } |
| 951 | + // for Lookup |
| 952 | + // 2 lookup will succeed |
| 953 | + long reqId6 = reqId++; |
| 954 | + ByteBuf request6 = Commands.newLookup(topicName, true, reqId6); |
| 955 | + f1 = pool.getConnection(resolver.resolveHost()) |
| 956 | + .thenCompose(clientCnx -> clientCnx.newLookup(request6, reqId6)); |
| 957 | + |
| 958 | + long reqId7 = reqId++; |
| 959 | + ByteBuf request7 = Commands.newLookup(topicName, true, reqId7); |
| 960 | + f2 = pool.getConnection(resolver.resolveHost()) |
| 961 | + .thenCompose(clientCnx -> clientCnx.newLookup(request7, reqId7)); |
| 962 | + |
| 963 | + f1.get(); |
| 964 | + f2.get(); |
| 965 | + |
| 966 | + // 3 lookup will fail |
| 967 | + long reqId8 = reqId++; |
| 968 | + ByteBuf request8 = Commands.newLookup(topicName, true, reqId8); |
| 969 | + f1 = pool.getConnection(resolver.resolveHost()) |
| 970 | + .thenCompose(clientCnx -> clientCnx.newLookup(request8, reqId8)); |
| 971 | + |
| 972 | + long reqId9 = reqId++; |
| 973 | + ByteBuf request9 = Commands.newLookup(topicName, true, reqId9); |
| 974 | + f2 = pool.getConnection(resolver.resolveHost()) |
| 975 | + .thenCompose(clientCnx -> clientCnx.newLookup(request9, reqId9)); |
| 976 | + |
| 977 | + long reqId10 = reqId++; |
| 978 | + ByteBuf request10 = Commands.newLookup(topicName, true, reqId10); |
| 979 | + f3 = pool.getConnection(resolver.resolveHost()) |
| 980 | + .thenCompose(clientCnx -> clientCnx.newLookup(request10, reqId10)); |
902 | 981 |
|
903 |
| - // 3 lookup will fail |
904 |
| - try { |
905 |
| - CompletableFuture<Consumer<byte[]>> consumer1 = pulsarClient.newConsumer().topic(topicName).subscriptionName("mysub11").subscribeAsync(); |
906 |
| - CompletableFuture<Consumer<byte[]>> consumer2 = pulsarClient.newConsumer().topic(topicName).subscriptionName("mysub22").subscribeAsync(); |
907 |
| - CompletableFuture<Consumer<byte[]>> consumer3 = pulsarClient.newConsumer().topic(topicName).subscriptionName("mysub33").subscribeAsync(); |
908 |
| - |
909 |
| - consumer1.get().close(); |
910 |
| - consumer2.get().close(); |
911 |
| - consumer3.get().close(); |
912 |
| - fail("It should fail as throttling should only receive 2 requests"); |
913 |
| - } catch (Exception e) { |
914 |
| - if (!(e.getCause() instanceof |
915 |
| - org.apache.pulsar.client.api.PulsarClientException.TooManyRequestsException)) { |
916 |
| - fail("Subscribe should fail with TooManyRequestsException"); |
| 982 | + try { |
| 983 | + f1.get(); |
| 984 | + f2.get(); |
| 985 | + f3.get(); |
| 986 | + fail("At least one should fail"); |
| 987 | + } catch (ExecutionException e) { |
| 988 | + Throwable rootCause = e; |
| 989 | + while (rootCause instanceof ExecutionException) { |
| 990 | + rootCause = rootCause.getCause(); |
| 991 | + } |
| 992 | + if (!(rootCause instanceof |
| 993 | + org.apache.pulsar.client.api.PulsarClientException.TooManyRequestsException)) { |
| 994 | + throw e; |
| 995 | + } |
917 | 996 | }
|
| 997 | + |
918 | 998 | }
|
919 | 999 | }
|
920 | 1000 |
|
|
0 commit comments