This repository has been archived by the owner on Apr 1, 2024. It is now read-only.
forked from apache/pulsar
Bump pulsar version to 2.9.3.21 #5356
Draft
streamnativebot wants to merge 162 commits into branch-2.9 from branch-2.9.3.21
…pache#18074) (cherry picked from commit 26b47ff)
…pache#12037)
In load and performance testing, there's a need to simulate production use cases and production workloads. For this purpose, it would be useful to share the thread pools used by Pulsar client instances, so that a large number of Pulsar clients can run in a single JVM without the overhead of many threads. In the current solution, it's already possible to share the EventLoopGroup and HashedWheelTimer instances. The solution for sharing the thread pools for the external / internal executors was missing. This PR adds support for that.

Example usage:
```java
// shared thread pool related resources
ExecutorProvider internalExecutorProvider = new ExecutorProvider(8, "shared-internal-executor");
ExecutorProvider externalExecutorProvider = new ExecutorProvider(8, "shared-external-executor");
Timer sharedTimer = new HashedWheelTimer(getThreadFactory("shared-pulsar-timer"), 1, TimeUnit.MILLISECONDS);
EventLoopGroup sharedEventLoopGroup = new EpollEventLoopGroup();

// example of creating a client which uses the shared thread pools
PulsarClientImpl client = PulsarClientImpl.builder().conf(conf)
        .internalExecutorProvider(internalExecutorProvider)
        .externalExecutorProvider(externalExecutorProvider)
        .timer(sharedTimer)
        .eventLoopGroup(sharedEventLoopGroup)
        .build();
```
It seems that this would also improve the performance of the Pulsar Proxy, since the proxy currently creates new thread pools for every client connection: https://github.com/apache/pulsar/blob/af63e96d4aaa0ae4c4086583aa4f9b1edd72279b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java#L445-L451

An optimization was added in apache#9802 for sharing the timer, but it would be useful to also share the internal / external executors.
(cherry picked from commit 4591a21)
…ce of message consumption (apache#16236)
The Scheduled Executor doesn't work very efficiently because each task is first added to a DelayedQueue (a priority queue), even when using the `.execute()` method without any scheduling delay.

<img width="1845" alt="image" src="https://user-images.githubusercontent.com/12592133/175871343-ecda138f-43a2-472e-ac42-8efdefb58810.png">
<img width="1848" alt="image" src="https://user-images.githubusercontent.com/12592133/175871415-3d8d9fbd-f140-4a4b-a78d-306c1ec9673c.png">

Profile result: [perf_consumer_0.html.txt](https://github.com/apache/pulsar/files/8989093/perf_consumer_0.html.txt)

Running a performance test for single topic max message read rate test:
```
bin/pulsar-perf consume test -q 1000000 -p 100000000
bin/pulsar-perf produce test -r 1000000 -s 1 -mk random -o 10000 -threads 2
```
Without this PR (2.10.1):
```
Profiling started
2022-06-27T13:44:01,183+0800 [main] INFO org.apache.pulsar.testclient.PerformanceConsumer - Throughput received: 23919664 msg --- 265702.851 msg/s --- 2.027 Mbit/s --- Latency: mean: 49430.572 ms - med: 49406 - 95pct: 52853 - 99pct: 53024 - 99.9pct: 53053 - 99.99pct: 53056 - Max: 53057
2022-06-27T13:44:11,196+0800 [main] INFO org.apache.pulsar.testclient.PerformanceConsumer - Throughput received: 26690802 msg --- 276759.125 msg/s --- 2.112 Mbit/s --- Latency: mean: 56106.186 ms - med: 56000 - 95pct: 59289 - 99pct: 59985 - 99.9pct: 60037 - 99.99pct: 60042 - Max: 60042
2022-06-27T13:44:21,216+0800 [main] INFO org.apache.pulsar.testclient.PerformanceConsumer - Throughput received: 28788693 msg --- 209467.861 msg/s --- 1.598 Mbit/s --- Latency: mean: 63523.143 ms - med: 63580 - 95pct: 67202 - 99pct: 67523 - 99.9pct: 67547 - 99.99pct: 67548 - Max: 67548
2022-06-27T13:44:31,233+0800 [main] INFO org.apache.pulsar.testclient.PerformanceConsumer - Throughput received: 31255365 msg --- 246190.932 msg/s --- 1.878 Mbit/s --- Latency: mean: 71152.370 ms - med: 71058 - 95pct: 74555 - 99pct: 74806 - 99.9pct: 74842 - 99.99pct: 74847 - Max: 74847
2022-06-27T13:44:41,247+0800 [main] INFO org.apache.pulsar.testclient.PerformanceConsumer - Throughput received: 33606630 msg --- 234769.313 msg/s --- 1.791 Mbit/s --- Latency: mean: 78636.478 ms - med: 78724 - 95pct: 81694 - 99pct: 82090 - 99.9pct: 82279 - 99.99pct: 82285 - Max: 82286
```
With this PR:
```
Profiling started
2022-06-27T13:56:20,426+0800 [main] INFO org.apache.pulsar.testclient.PerformanceConsumer - Throughput received: 431272207 msg --- 1079360.516 msg/s --- 8.235 Mbit/s --- Latency: mean: 272.645 ms - med: 334 - 95pct: 470 - 99pct: 510 - 99.9pct: 522 - 99.99pct: 523 - Max: 524
2022-06-27T13:56:30,438+0800 [main] INFO org.apache.pulsar.testclient.PerformanceConsumer - Throughput received: 441292346 msg --- 1000645.852 msg/s --- 7.634 Mbit/s --- Latency: mean: 15.512 ms - med: 14 - 95pct: 29 - 99pct: 39 - 99.9pct: 54 - 99.99pct: 55 - Max: 55
2022-06-27T13:56:40,450+0800 [main] INFO org.apache.pulsar.testclient.PerformanceConsumer - Throughput received: 451303308 msg --- 999973.040 msg/s --- 7.629 Mbit/s --- Latency: mean: 18.265 ms - med: 14 - 95pct: 53 - 99pct: 98 - 99.9pct: 174 - 99.99pct: 176 - Max: 177
2022-06-27T13:56:50,462+0800 [main] INFO org.apache.pulsar.testclient.PerformanceConsumer - Throughput received: 461308082 msg --- 999309.458 msg/s --- 7.624 Mbit/s --- Latency: mean: 14.728 ms - med: 14 - 95pct: 18 - 99pct: 41 - 99.9pct: 50 - 99.99pct: 51 - Max: 52
2022-06-27T13:57:00,475+0800 [main] INFO org.apache.pulsar.testclient.PerformanceConsumer - Throughput received: 471327606 msg --- 1000738.584 msg/s --- 7.635 Mbit/s --- Latency: mean: 21.291 ms - med: 16 - 95pct: 52 - 99pct: 61 - 99.9pct: 65 - 99.99pct: 66 - Max: 66
```
Profile result with this PR: [perf_consumer_1.html.txt](https://github.com/apache/pulsar/files/8989095/perf_consumer_1.html.txt)
- Change internal executor and external executor to normal executor service
- Added a new ScheduledExecutorProvider to handle the scheduled tasks.
(cherry picked from commit 96237a9)
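The split described in apache#16236 can be sketched roughly as follows: tasks without a delay go to a plain `ExecutorService` (FIFO queue), and only delayed tasks go to a `ScheduledExecutorService`. The class `SplitExecutors` and its methods are hypothetical names for illustration; this is not Pulsar's actual `ExecutorProvider` or `ScheduledExecutorProvider` code.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper for illustration only.
final class SplitExecutors implements AutoCloseable {
    // Plain executor: tasks are queued in a FIFO LinkedBlockingQueue.
    private final ExecutorService immediate = Executors.newSingleThreadExecutor();
    // Scheduled executor: every task passes through a delay-ordered queue.
    private final ScheduledExecutorService scheduled =
            Executors.newSingleThreadScheduledExecutor();

    // Tasks with no delay bypass the scheduled executor entirely.
    void execute(Runnable task) {
        immediate.execute(task);
    }

    // Only tasks that really need a delay use the scheduled executor.
    ScheduledFuture<?> schedule(Runnable task, long delay, TimeUnit unit) {
        return scheduled.schedule(task, delay, unit);
    }

    @Override
    public void close() {
        immediate.shutdown();
        scheduled.shutdown();
    }

    // Runs one immediate and one delayed task; returns how many tasks ran.
    static int demo() {
        AtomicInteger ran = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(2);
        try (SplitExecutors executors = new SplitExecutors()) {
            executors.execute(() -> { ran.incrementAndGet(); done.countDown(); });
            executors.schedule(() -> { ran.incrementAndGet(); done.countDown(); },
                    10, TimeUnit.MILLISECONDS);
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return ran.get();
    }
}
```

The design point is only the routing: the hot path (`execute`) never touches the delay-ordered queue.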
(cherry picked from commit a32edc7)
…th more than one IO thread (apache#16336) (cherry picked from commit bdda1eb)
… issue. (apache#18454) (cherry picked from commit 7712aa3)
…ite (apache#12788)
### Motivation
When we're doing a write to the store from outside the `MetadataCache`, we immediately invalidate the cache to ensure read-after-write consistency through the cache. The only issue is that the invalidation does not trigger a reload of the value. Instead, it relies on the next call to `cache.get()`, which will see the cache miss and load the new value into the cache. This means that calls to `cache.getIfCached()`, which does not trigger a cache load, will keep seeing the key as missing.
### Modification
Ensure we're calling refresh on the cache so that the value is automatically reloaded in the background and `getIfCached()` eventually returns the new value, even if there are no calls to `cache.get()`.
(cherry picked from commit 2bc4499)
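To make the difference between invalidation and refresh concrete, here is a minimal sketch with a toy cache. `ToyLoadingCache` is a hypothetical stand-in, not Pulsar's `MetadataCache`, and its `refresh` reloads synchronously for simplicity, whereas the commit describes a background reload.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Toy loading cache for illustration only.
final class ToyLoadingCache<K, V> {
    private final Map<K, V> cached = new ConcurrentHashMap<>();
    private final Function<K, V> loader; // reads the value from the backing store

    ToyLoadingCache(Function<K, V> loader) {
        this.loader = loader;
    }

    // Loads the value on a cache miss.
    V get(K key) {
        return cached.computeIfAbsent(key, loader);
    }

    // Never triggers a load: only returns what is already cached.
    Optional<V> getIfCached(K key) {
        return Optional.ofNullable(cached.get(key));
    }

    // Drops the entry; nothing is reloaded until the next get().
    void invalidate(K key) {
        cached.remove(key);
    }

    // Reloads the entry so getIfCached() can see the new value.
    void refresh(K key) {
        V value = loader.apply(key);
        if (value == null) {
            cached.remove(key);
        } else {
            cached.put(key, value);
        }
    }
}
```

After `invalidate`, a reader that only uses `getIfCached` sees an empty result until someone calls `get`; after `refresh`, it sees the new value directly.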
Co-authored-by: Jiang Haiting <jianghaiting@didichuxing.com> (cherry picked from commit 2b939b7)
This issue was triggered by apache#13880, which doesn't handle the NPE [here](https://github.com/apache/pulsar/pull/13880/files#diff-66aeb65a64cbe7c541f013ae807c5bcbeab567bef77706c7ff2e0cbfe0d77eb1R3502); see the [diff code](https://github.com/apache/pulsar/pull/13880/files#diff-66aeb65a64cbe7c541f013ae807c5bcbeab567bef77706c7ff2e0cbfe0d77eb1L3489) in apache#13880. (cherry picked from commit a3c8525)
(cherry picked from commit 2e16b43)
…close topic. (apache#15811) (cherry picked from commit e8ee996)
…ge ack owner (apache#16245)
### Motivation
The broker doesn't need to go through all the consumers to get the ack owner consumer. Instead, it should check the current consumer first. If the pending acks of the current consumer don't contain the ack position, it goes through all the consumers to find the owner consumer.
(cherry picked from commit 68484f9)
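The lookup order described above could look roughly like this. `AckOwnerLookup` and `ToyConsumer` are hypothetical names, positions are simplified to `long` values, and returning `null` when no owner is found is an assumption of this sketch, not the broker's documented behavior.

```java
import java.util.List;
import java.util.Set;

// Illustrative sketch only; not the broker's Consumer class.
final class AckOwnerLookup {
    // Hypothetical minimal consumer that only tracks pending ack positions.
    static final class ToyConsumer {
        final String name;
        final Set<Long> pendingAcks;

        ToyConsumer(String name, Set<Long> pendingAcks) {
            this.name = name;
            this.pendingAcks = pendingAcks;
        }
    }

    static ToyConsumer findOwner(ToyConsumer current, List<ToyConsumer> all, long position) {
        // Fast path: the acking consumer usually owns the position.
        if (current.pendingAcks.contains(position)) {
            return current;
        }
        // Slow path: scan the other consumers of the subscription.
        for (ToyConsumer candidate : all) {
            if (candidate != current && candidate.pendingAcks.contains(position)) {
                return candidate;
            }
        }
        return null; // assumption: no owner found
    }
}
```

With many consumers on a subscription, the fast path avoids a full scan for the common case where a consumer acknowledges its own messages.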
…pache#16243)
### Motivation
When creating many consumers (> 10000), the IO threads stay in the BLOCKED state for a long time, which affects message publishing and subsequent consumer creation.
```
"pulsar-io-15-24" #195 prio=5 os_prio=31 cpu=15744.67ms elapsed=272.18s tid=0x00007faaa7183400 nid=0x19c03 waiting for monitor entry [0x0000700019642000]
   java.lang.Thread.State: BLOCKED (on object monitor)
    at org.apache.pulsar.broker.service.persistent.PersistentSubscription.lambda$addConsumer$2(PersistentSubscription.java:207)
    - waiting to lock <0x0000100015823488> (a org.apache.pulsar.broker.service.persistent.PersistentSubscription)
    at org.apache.pulsar.broker.service.persistent.PersistentSubscription$$Lambda$984/0x000000080136d898.apply(Unknown Source)
    at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187)
    at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309)
    at org.apache.pulsar.broker.service.persistent.PersistentSubscription.addConsumer(PersistentSubscription.java:206)
    at org.apache.pulsar.broker.service.AbstractTopic.addConsumerToSubscription(AbstractTopic.java:513)
    at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$internalSubscribe$15(PersistentTopic.java:782)
    at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$983/0x000000080136cd28.apply(Unknown Source)
    at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187)
    at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309)
    at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$internalSubscribe$17(PersistentTopic.java:777)
    at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$982/0x000000080136cae0.apply(Unknown Source)
    at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187)
    at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309)
    at org.apache.pulsar.broker.service.persistent.PersistentTopic.internalSubscribe(PersistentTopic.java:698)
    at org.apache.pulsar.broker.service.persistent.PersistentTopic.subscribe(PersistentTopic.java:674)
    at org.apache.pulsar.broker.service.ServerCnx.lambda$handleSubscribe$12(ServerCnx.java:1078)
    at org.apache.pulsar.broker.service.ServerCnx$$Lambda$869/0x0000000801316630.apply(Unknown Source)
    at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187)
    at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309)
    at org.apache.pulsar.broker.service.ServerCnx.lambda$handleSubscribe$15(ServerCnx.java:1042)
    at org.apache.pulsar.broker.service.ServerCnx$$Lambda$860/0x000000080130f970.apply(Unknown Source)
    at java.util.concurrent.CompletableFuture.uniApplyNow(java.base@17.0.3/CompletableFuture.java:684)
    at java.util.concurrent.CompletableFuture.uniApplyStage(java.base@17.0.3/CompletableFuture.java:662)
    at java.util.concurrent.CompletableFuture.thenApply(java.base@17.0.3/CompletableFuture.java:2168)
    at org.apache.pulsar.broker.service.ServerCnx.handleSubscribe(ServerCnx.java:984)
    at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:229)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at io.netty.handler.flow.FlowControlHandler.dequeue(FlowControlHandler.java:200)
    at io.netty.handler.flow.FlowControlHandler.channelRead(FlowControlHandler.java:162)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327)
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:314)
    at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:435)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:279)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:995)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(java.base@17.0.3/Thread.java:833)
```
```
"pulsar-io-15-8" #157 prio=5 os_prio=31 cpu=10573.05ms elapsed=314.91s tid=0x00007faa9bf6e800 nid=0x17507 runnable [0x00007000171d5000]
   java.lang.Thread.State: RUNNABLE
    at java.util.TimSort.countRunAndMakeAscending(java.base@17.0.3/TimSort.java:360)
    at java.util.TimSort.sort(java.base@17.0.3/TimSort.java:234)
    at java.util.Arrays.sort(java.base@17.0.3/Arrays.java:1307)
    at java.util.concurrent.CopyOnWriteArrayList.sortRange(java.base@17.0.3/CopyOnWriteArrayList.java:896)
    at java.util.concurrent.CopyOnWriteArrayList.sort(java.base@17.0.3/CopyOnWriteArrayList.java:888)
    - locked <0x00001000158237d8> (a java.lang.Object)
    at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.addConsumer(PersistentDispatcherMultipleConsumers.java:159)
    - locked <0x0000100015830888> (a org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers)
    at org.apache.pulsar.broker.service.persistent.PersistentSubscription.lambda$addConsumer$2(PersistentSubscription.java:287)
    - locked <0x0000100015823488> (a org.apache.pulsar.broker.service.persistent.PersistentSubscription)
    at org.apache.pulsar.broker.service.persistent.PersistentSubscription$$Lambda$984/0x000000080136d898.apply(Unknown Source)
    at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187)
    at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309)
    at org.apache.pulsar.broker.service.persistent.PersistentSubscription.addConsumer(PersistentSubscription.java:206)
    at org.apache.pulsar.broker.service.AbstractTopic.addConsumerToSubscription(AbstractTopic.java:513)
    at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$internalSubscribe$15(PersistentTopic.java:782)
    at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$983/0x000000080136cd28.apply(Unknown Source)
    at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187)
    at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309)
    at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$internalSubscribe$17(PersistentTopic.java:777)
    at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$982/0x000000080136cae0.apply(Unknown Source)
    at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187)
    at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309)
    at org.apache.pulsar.broker.service.persistent.PersistentTopic.internalSubscribe(PersistentTopic.java:698)
    at org.apache.pulsar.broker.service.persistent.PersistentTopic.subscribe(PersistentTopic.java:674)
    at org.apache.pulsar.broker.service.ServerCnx.lambda$handleSubscribe$12(ServerCnx.java:1078)
    at org.apache.pulsar.broker.service.ServerCnx$$Lambda$869/0x0000000801316630.apply(Unknown Source)
    at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187)
    at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309)
    at org.apache.pulsar.broker.service.ServerCnx.lambda$handleSubscribe$15(ServerCnx.java:1042)
    at org.apache.pulsar.broker.service.ServerCnx$$Lambda$860/0x000000080130f970.apply(Unknown Source)
    at java.util.concurrent.CompletableFuture.uniApplyNow(java.base@17.0.3/CompletableFuture.java:684)
    at java.util.concurrent.CompletableFuture.uniApplyStage(java.base@17.0.3/CompletableFuture.java:662)
    at java.util.concurrent.CompletableFuture.thenApply(java.base@17.0.3/CompletableFuture.java:2168)
    at org.apache.pulsar.broker.service.ServerCnx.handleSubscribe(ServerCnx.java:984)
    at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:229)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at io.netty.handler.flow.FlowControlHandler.dequeue(FlowControlHandler.java:200)
    at io.netty.handler.flow.FlowControlHandler.channelRead(FlowControlHandler.java:162)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327)
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:314)
    at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:435)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:279)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:995)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(java.base@17.0.3/Thread.java:833)
```
### Modification
- Sort the consumer list only if the new consumer has a higher priority than the last element in the consumer list. This avoids the sort operation for all consumers without a priority level (the client side always passes 0 if the priority level is absent).

(cherry picked from commit 291fedc)
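A minimal sketch of the "sort only when needed" idea from the modification above. `PriorityConsumerList` is a hypothetical class that stores only priority levels, and it assumes that a lower number means a higher priority; the real dispatcher code is not reproduced here.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Illustrative sketch only; not PersistentDispatcherMultipleConsumers.
final class PriorityConsumerList {
    private final List<Integer> priorityLevels = new CopyOnWriteArrayList<>();
    private int sortCount; // how many times a sort was actually needed

    synchronized void add(int priorityLevel) {
        // Assumption: lower level = higher priority, list kept ascending.
        // Appending keeps the order unless the new level is lower than the tail.
        boolean outOfOrder = !priorityLevels.isEmpty()
                && priorityLevel < priorityLevels.get(priorityLevels.size() - 1);
        priorityLevels.add(priorityLevel);
        if (outOfOrder) {
            priorityLevels.sort(Comparator.naturalOrder());
            sortCount++;
        }
    }

    synchronized int sortCount() {
        return sortCount;
    }

    List<Integer> snapshot() {
        return new ArrayList<>(priorityLevels);
    }
}
```

When every consumer uses the default level 0, `add` never sorts, so adding a consumer no longer costs a sort of the whole list while holding the lock.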
…xception (apache#17512)
* [fix][tiered-storage] Don't cleanup data when offload met BadVersion
---
*Motivation*
There are two cases that cause the offloaded data to be cleaned up. One is hitting an offload conflict exception; the other is `completeLedgerInfoForOffloaded` reaching the max retry time and throwing ZooKeeper exceptions. We retry the ZooKeeper operation on a connection loss exception. We should be careful with this exception, because we may lose data if the metadata update actually succeeded. When a MetaStore exception happens, we cannot be sure whether the metadata update failed. Because we retry on connection loss, it is possible to get a BadVersion or other exception after retrying. So we don't clean up the data if this happens.
*Modification*
- don't delete data if a meta store exception occurs
* log an error when skipping the deletion
* improve logs

(cherry picked from commit c2588ba)
…#17548) link apache#17548
### Motivation
Currently, delayed delivery and transaction messages cannot be used together. When a transaction message is sent with a delay and the transaction is committed, the message is immediately received by consumers. Code example:
```
@Test
public void testDelayedTransactionMessages() throws Exception {
    String topic = NAMESPACE1 + "/testDelayedTransactionMessages";

    @Cleanup
    Consumer<String> sharedConsumer = pulsarClient.newConsumer(Schema.STRING)
            .topic(topic)
            .subscriptionName("shared-sub")
            .subscriptionType(SubscriptionType.Shared)
            .subscribe();

    @Cleanup
    Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
            .topic(topic)
            .enableBatching(false)
            .create();

    Transaction transaction = pulsarClient.newTransaction()
            .withTransactionTimeout(10, TimeUnit.SECONDS).build().get();

    // send delayed messages
    for (int i = 0; i < 10; i++) {
        producer.newMessage(transaction)
                .value("msg-" + i)
                .deliverAfter(5, TimeUnit.SECONDS)
                .sendAsync();
    }
    producer.flush();
    transaction.commit().get();

    Message<String> msg = sharedConsumer.receive(1, TimeUnit.SECONDS);
    // before this PR, msg is not null here
    assertNull(msg);
}
```
This PR enables clients to send delayed messages with transactions.
### Modifications
Allow transaction messages to be put in `trackDelayedDelivery` so that clients can send delayed messages within a transaction. Note that when the dispatcher sends transaction messages to consumers, it should follow changes of `MaxReadPosition` (for background on `MaxReadPosition`, see https://github.com/streamnative/community/blob/master/rfc/rfcs/0003-transaction-buffer-design.md). Because of maxReadPosition, the dispatch of a transaction message depends on whether the previous transaction message has completed. This can extend the delay, but never shorten it.
### Verifying this change
Add the test.

(cherry picked from commit 1246d79)
…ntry (apache#18305) (cherry picked from commit 79a3f85)
…multiple messages (apache#18238) (cherry picked from commit 67a3de7)
(cherry picked from commit c732852)
(cherry picked from commit 39270f0)
(cherry picked from commit 882fcfb)
(cherry picked from commit 628e760)
…e failed (apache#17652) (cherry picked from commit dc54997)
…pache#17687)
* Fix parsing partitionedKey with Base64 encode issue.
* release the buf
* fix checkstyle issue.

(cherry picked from commit f3cc107)
…nnel inactive (apache#17856)
### Motivation
https://github.com/apache/pulsar/blob/b89c1451551a6bbe681465726906a2e61c9d8a69/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java#L282-L297
The `pendingLookupRequestSemaphore` leaks when the channel becomes inactive: the `LookUpRequestSemaphore` permits are not released when the requests are removed from `pendingRequests`.
### Modifications
We can't easily release the semaphore in `channelInactive`, because `pendingRequests` contains more than just `LookUpRequest`s. So release the semaphore on connectionException.
### Verifying this change
Add a unit test case to cover this change.
### Documentation
- [ ] `doc-required` (Your PR needs to update docs and you will update later)
- [x] `doc-not-needed` bug fixes, no doc needed
- [ ] `doc` (Your PR contains doc changes)
- [ ] `doc-complete` (Docs have been already added)

(cherry picked from commit b451880)
apache#18219)
### Motivation
https://github.com/apache/pulsar/blob/b061c6ac5833c21e483368febebd0d30679a35e1/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java#L748-L774
The `pendingLookupRequestSemaphore` leaks in `handleError`: the `LookUpRequestSemaphore` permits are not released when the requests are removed from `pendingRequests`. Related PR: apache#17856
### Modifications
We can't easily release the semaphore in `handleError`, because `pendingRequests` contains more than just `LookUpRequest`s. So release the semaphore on LookupException.
### Verifying this change
Add a unit test case to cover this change.
### Documentation
- [ ] `doc-required` (Your PR needs to update docs and you will update later)
- [x] `doc-not-needed` bug fixes, no doc needed
- [ ] `doc` (Your PR contains doc changes)
- [ ] `doc-complete` (Docs have been already added)

(cherry picked from commit fad3ccc)
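The leak pattern in apache#17856 and apache#18219 is a permit acquired per lookup that is only given back on the success path. A minimal sketch of one way to avoid it, tying the release to completion of the response future: `LookupLimiter` is a hypothetical class, and this is a simplification rather than the exact change made in `ClientCnx`, which releases the permit when the request fails with the specific exception.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;

// Illustrative sketch only; not the ClientCnx implementation.
final class LookupLimiter {
    private final Semaphore permits;

    LookupLimiter(int maxPendingLookups) {
        this.permits = new Semaphore(maxPendingLookups);
    }

    // Wraps a pending lookup response so that the permit is always returned.
    CompletableFuture<String> lookup(CompletableFuture<String> response) {
        if (!permits.tryAcquire()) {
            CompletableFuture<String> rejected = new CompletableFuture<>();
            rejected.completeExceptionally(
                    new IllegalStateException("too many pending lookups"));
            return rejected;
        }
        // Release on success and on failure (e.g. connection closed).
        return response.whenComplete((result, error) -> permits.release());
    }

    int availablePermits() {
        return permits.availablePermits();
    }
}
```

If the release happened only in the success callback, every lookup that failed because the channel went inactive would permanently consume one permit.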
* Support LocalDateTime Conversion
* move `TimestampMicrosConversion` to correct line

(cherry picked from commit b31c5a6)
During a schema update, a `ledgerHandle` is created and data is written to BK. After that, the `ledgerHandle` is no longer useful and no other object holds a reference to it. The `ledgerHandle` will be reclaimed by GC, but it also holds external connections, which will leak. https://github.com/apache/pulsar/blob/40b9d7ea50cef54becb09f2543193e08375abe0b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java#L452-L456
After the schema is updated, close the `ledgerHandle`, just like schema-read: https://github.com/apache/pulsar/blob/40b9d7ea50cef54becb09f2543193e08375abe0b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java#L519-L525 (cherry picked from commit 2620450)
(cherry picked from commit 16adb61)
…mes topics in one broker (apache#17526) (cherry picked from commit 260f5c6)
…eliveryTracker (apache#18000) (cherry picked from commit 44ae348)
(cherry picked from commit b0213b2)
…letedPosition (apache#14985) (cherry picked from commit 9b36dcd)
* Extracted interface for EntryCacheManager
* Fixed references
* added more methods to the interface
* Fixed mocked test
* Removed unused import
* Fixed wrong casting in reflection access

(cherry picked from commit c7faf62)
…#17248) Fixes apache#16584 With the `RangeCache`, it is hard to reason about its behavior other than cache hits/misses or the cache's size hitting the limit and triggering a size based eviction. This PR adds 3 new metrics to help provide additional insight into the cache's behavior. It adds `pulsar_ml_cache_inserted_entries_total`, `pulsar_ml_cache_evicted_entries_total`, and `pulsar_ml_cache_entries`. * Add new metrics for cache insertion, eviction, and current number of entries. * Add new methods to the `ManagedLedgerFactoryMXBean` interface. * Update several method return values in the `RangeCache`. * Update tests. This change is covered by modified tests that already existed. There is a breaking change to the `RangeCache` class for the `clear` and the `evictLEntriesBeforeTimestamp` methods. The previous result was a `long`, and now it is a `Pair<Integer, Long>`. The new result matches the same style as `evictLeastAccessedEntries`. Given that this class is only meant for use within the broker, I think it is reasonable to break these methods. I will send a note to the mailing list. - [x] `doc` (cherry picked from commit e3b2540)
…pache#17045) (cherry picked from commit e0ff3d7)
…pache#14488) (cherry picked from commit 3619edc)
…sors (apache#17273)
* [fix][broker] Fix broken build caused by conflict between apache#17195 and apache#16605
  - apache#17195 changed the method signature that apache#16605 depended upon
* [fix][broker] Keep sorted list of cursors ordered by read position of active cursors when cacheEvictionByMarkDeletedPosition=false

  Fixes apache#16054
  - calculate the sorted list of when a read position gets updated
  - this resolves apache#9958 in a proper way
  - apache#12045 broke the caching solution as explained in apache#16054
  - remove invalid tests
  - fix tests
  - add more tests to handle corner cases
* Address review comment
* Handle durable & non-durable in the correct way
* Fix cache tests since now entries get evicted reactively
* Address review comment about method names
* Change signature for add method so that position must be passed
  - this is more consistent with cursorUpdated method where the position is passed
* Update javadoc for ManagedCursorContainer
* Address review comment
* Simplify ManagedCursorContainer
* Clarify javadoc
* Ensure that cursors are tracked by making sure that initial position isn't null unintentionally
* Prevent race in updating activeCursors

(cherry picked from commit 856ef15)
…mport (apache#18793) ### Motivation fix cherry-pick apache#17609 import fix cherry-pick apache#17957 import fix cherry-pick apache#16878 lose problem fix cherry-pick apache#17503 problem
…pache#18237) (cherry picked from commit d612858)
(cherry picked from commit 94a6d36)
…sages (apache#17256) (cherry picked from commit 5d6a88e)
…apache#18486) apache#18454 fixed the potential message loss when a batched message is redelivered and one single message of the batch is added to the ACK tracker. However, it also leads to a potential message duplication, see the `testConsumerDedup` test modified by apache#18454. The root cause is that single messages will still be passed into the `isDuplicated` method in `receiveIndividualMessagesFromBatch`. However, in this case, the `MessageId` is a `BatchedMessageIdImpl`, while the `MessageId` in `lastCumulativeAck` or `pendingIndividualAcks` are `MessageIdImpl` implementations. Validate the class type in `isDuplicated` and convert a `BatchedMessageIdImpl` to `MessageIdImpl`. Then revert the unnecessary changes in apache#18454. `ConsumerRedeliveryTest#testAckNotSent` is added to verify it works. The duplication could still happen when batch index ACK is enabled. Because even after the ACK tracker is flushed, if only parts of a batched message are not acknowledged, the whole batched message would still be redelivered. I will open another PR to fix it. <!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. --> - [ ] `doc` <!-- Your PR contains doc changes. Please attach the local preview screenshots (run `sh start.sh` at `pulsar/site2/website`) to your PR description, or else your PR might not get merged. --> - [ ] `doc-required` <!-- Your PR changes impact docs and you will update later --> - [x] `doc-not-needed` <!-- Your PR changes do not impact docs --> - [ ] `doc-complete` <!-- Docs have been already added --> PR in forked repository: BewareMyPower#8 (cherry picked from commit be1d07e)
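The class mismatch described in apache#18486 can be illustrated with toy id types. `EntryId` and `BatchEntryId` below are hypothetical stand-ins for `MessageIdImpl` and `BatchedMessageIdImpl`, and their `equals` implementations are an assumption of this sketch, chosen so that a batched id never equals a plain id.

```java
import java.util.Objects;
import java.util.Set;

// Illustrative sketch only; not the Pulsar client's ACK tracker.
final class DedupCheck {
    // Toy entry-level id (ledger + entry).
    static class EntryId {
        final long ledgerId;
        final long entryId;

        EntryId(long ledgerId, long entryId) {
            this.ledgerId = ledgerId;
            this.entryId = entryId;
        }

        @Override
        public boolean equals(Object o) {
            if (o == null || getClass() != o.getClass()) {
                return false;
            }
            EntryId other = (EntryId) o;
            return ledgerId == other.ledgerId && entryId == other.entryId;
        }

        @Override
        public int hashCode() {
            return Objects.hash(ledgerId, entryId);
        }
    }

    // Toy batched id: same entry, plus an index inside the batch.
    static final class BatchEntryId extends EntryId {
        final int batchIndex;

        BatchEntryId(long ledgerId, long entryId, int batchIndex) {
            super(ledgerId, entryId);
            this.batchIndex = batchIndex;
        }

        @Override
        public boolean equals(Object o) {
            return super.equals(o) && batchIndex == ((BatchEntryId) o).batchIndex;
        }

        @Override
        public int hashCode() {
            return Objects.hash(ledgerId, entryId, batchIndex);
        }
    }

    // Converts a batched id to its entry-level id before the lookup.
    static boolean isDuplicated(EntryId id, Set<EntryId> acked) {
        EntryId key = (id instanceof BatchEntryId)
                ? new EntryId(id.ledgerId, id.entryId)
                : id;
        return acked.contains(key);
    }
}
```

Without the conversion, `acked.contains(batchedId)` misses even though the whole entry was acknowledged, which is the kind of mismatch that lets a redelivered batch through as duplicates.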
Co-authored-by: congbobo184 <congbobo184@github.com>
…e namespace by force) (apache#18803)

### Motivation

Cherry-pick apache#18307 for release 2.9.4.

### Modifications

Cherry-pick apache#18307 for release 2.9.4.
Co-authored-by: congbobo184 <congbobo184@github.com>
… deleted topic (apache#18824)

Fix the uncompleted future when getting the topic policies of a deleted topic.

https://github.com/apache/pulsar/blob/30b52a1ac11b4be485258140a167b5e635586a36/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java#L513-L535

Call `future.complete(null);` when `msg.getValue() == null`.

(cherry picked from commit b311961)
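As a rough illustration of that fix, the sketch below completes the caller's future on the null-value branch as well. `TopicPolicyLookupSketch` and `onPolicyEvent` are hypothetical names, and a plain `String` stands in for the policy payload; this is not the code in `SystemTopicBasedTopicPoliciesService`.

```java
import java.util.concurrent.CompletableFuture;

final class TopicPolicyLookupSketch {
    // Hypothetical handler for one event read from the policy topic.
    // A null value models the event left behind by a deleted topic.
    static void onPolicyEvent(String value, CompletableFuture<String> future) {
        if (value == null) {
            // Complete with null instead of returning silently; otherwise
            // anyone waiting on the future would wait forever.
            future.complete(null);
            return;
        }
        future.complete(value);
    }
}
```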
…pache#18823) (apache#18831)

cherry-pick apache#18823

### Motivation

If users set a topic policy for a system topic and then delete this system topic, the topic policy should be deleted as well.

### Modification

Only the change_events topic does not need to clear topic policies.

### Matching PR in forked repository

PR in forked repository: liangyepianzhou#16
(cherry picked from commit a951549)
…ckInitializedBefore` failed (apache#18859) (cherry picked from commit 1be5a69)
apache#18943)

When `MLPendingAckStoreProvider` initializes the PendingAckStore and getting the ManagedLedger config throws an exception, the exception is not handled. As a result, `pendingAckStoreFuture` can never complete, and topic unload uses this future to close the pendingAck.

https://github.com/apache/pulsar/blob/3011946a5c3b64ed7c08b6bfb1f6492f8aaaca9c/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreProvider.java#L114-L115

- When getting the ManagedLedger config fails, `pendingAckStoreFuture` will `completeExceptionally()`.
- When the pendingAckStore init fails, closing the pendingAckHandle succeeds directly.

Mock getting the ManagedLedger config to throw an exception, then unload can succeed.

(cherry picked from commit 1d9956c)
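The two behaviors above can be sketched with plain `CompletableFuture` calls. `PendingAckInitSketch`, `newStore`, and `close` are hypothetical names, and a `Supplier<String>` stands in for the ManagedLedger config lookup; the real provider and handle are more involved.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

final class PendingAckInitSketch {
    // Hypothetical store creation: a failing config lookup must fail the
    // future instead of leaving it incomplete.
    static CompletableFuture<String> newStore(Supplier<String> configSupplier) {
        CompletableFuture<String> storeFuture = new CompletableFuture<>();
        try {
            String config = configSupplier.get();
            storeFuture.complete("store(" + config + ")");
        } catch (RuntimeException e) {
            storeFuture.completeExceptionally(e);
        }
        return storeFuture;
    }

    // Hypothetical close: if the store never initialized, there is nothing to
    // close, so the returned future completes normally either way.
    static CompletableFuture<Void> close(CompletableFuture<String> storeFuture) {
        return storeFuture.<Void>handle((store, error) -> null);
    }
}
```

Because `close` is chained on a future that always completes, an unload that waits on it cannot hang in this sketch.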
apache#18970)

### Motivation

Since PR apache#18833 cannot be cherry-picked to `branch-2.9`, create a separate PR.

#### Context for Transaction Buffer

- If `transactionCoordinatorEnabled` is turned on, a `TransactionBuffer` is initialized when a user topic is created.
- The `TransactionBuffer` reads the aborted logs of transactions from the topic `__transaction_buffer_snapshot`; this process is called `recovery`.
- During recovery, the snapshot ledger is read via a `Reader`, which works like this:

```
while (reader.hasMessageAvailable()){
    reader.readNext();
}
```

#### Context for Compaction

- After [pip-14](https://github.com/apache/pulsar/wiki/PIP-14:-Topic-compaction), a consumer that enables the read-compacted feature reads messages from the compacted topic instead of the original topic if the compaction task is done, and reads messages from the original topic if the compaction task is not done.
- If the data of the last message with key k sent to a topic is null, the compactor marks all messages for that key as deleted.

#### Issue

There is a race condition: after `reader.hasMessageAvailable` has been executed, the following messages may be deleted by the compaction task, so the next read blocks because there are no messages left to read.

----

### Modifications

- If this issue is hit, run the recovery again.

----

#### Why not just let the client try to load the topic again to retry the recovery?

If the topic load fails, the client receives an error response. This is a failure that we can handle ourselves, so users should not have to see it.
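One way to picture the "recover again" approach is a bounded retry loop around the read loop. This is only a sketch under assumptions: `SnapshotReader`, `readNextOrNull` (returning `null` when no message arrives in time), and `maxAttempts` are hypothetical and do not mirror the actual change.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

final class RecoverySketch {
    // Hypothetical reader abstraction for the snapshot topic.
    interface SnapshotReader {
        boolean hasMessageAvailable();

        // Returns null if no message arrived within the reader's timeout.
        String readNextOrNull();
    }

    // Restart recovery with a fresh reader whenever a read stalls after
    // hasMessageAvailable() reported that a message exists.
    static List<String> recover(Supplier<SnapshotReader> readerFactory, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            SnapshotReader reader = readerFactory.get();
            List<String> recovered = new ArrayList<>();
            boolean stalled = false;
            while (reader.hasMessageAvailable()) {
                String message = reader.readNextOrNull();
                if (message == null) {
                    // The message was likely removed (for example by
                    // compaction) between the check and the read.
                    stalled = true;
                    break;
                }
                recovered.add(message);
            }
            if (!stalled) {
                return recovered;
            }
        }
        throw new IllegalStateException("recovery did not finish after " + maxAttempts + " attempts");
    }
}
```

Discarding the partial result and starting over keeps the recovered state consistent with a single pass over the topic.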
(cherry picked from commit e07b67f)
…rCnx (apache#18987)

In the `PulsarDecoder`, we use a single `BaseCommand` object and overwrite it for each incoming protocol message. As a result, it is not safe to publish any references to a proto command to other threads.

Here is the single `BaseCommand`:

https://github.com/apache/pulsar/blob/82237d3684fe506bcb6426b3b23f413422e6e4fb/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java#L99

Here is the method call that resets the object:

https://github.com/apache/pulsar/blob/82237d3684fe506bcb6426b3b23f413422e6e4fb/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java#L114

Note that the call to `parseFrom` first calls `clear()`, which resets all values on the object.

This PR copies relevant values or objects into other variables.

* Replace `command` with `tcId` since the latter is a final variable meant to be published to another thread.
* Move logic to copy certain command fields to earlier in method for `handleSubscribe`
* Copy `ack` object to new `CommandAck` when there is a broker interceptor. Note that copying this command is likely somewhat costly, so we only do it when there is an interceptor configured.

This is a trivial change that is already covered by tests.

- [x] `doc-not-needed`

This is an internal change.

PR in forked repository: michaeljmarshall#8

(cherry picked from commit a408e9e)
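The hazard and the copy-first remedy can be shown with a small single-threaded sketch, where a deferred task stands in for work handed to another thread. `Command`, `unsafeTask`, and `safeTask` are hypothetical; the real `BaseCommand` is a generated protocol class.

```java
import java.util.function.LongSupplier;

final class ReusedCommandSketch {
    // Hypothetical reusable command, overwritten for every incoming frame.
    static final class Command {
        long requestId;

        void parseFrom(long newRequestId) {
            requestId = 0L;           // models clear()
            requestId = newRequestId; // models reading the next frame
        }
    }

    // Unsafe: the deferred task keeps a reference to the shared object and
    // sees whatever the decoder wrote last.
    static LongSupplier unsafeTask(Command command) {
        return () -> command.requestId;
    }

    // Safe: the field is copied into a final local before the task is created.
    static LongSupplier safeTask(Command command) {
        final long requestId = command.requestId;
        return () -> requestId;
    }
}
```

If the command is parsed again before the tasks run, `safeTask` still reports the request id it was created with, while `unsafeTask` reports the id of the later frame.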
…broker interceptors (apache#18997) (cherry picked from commit 1154d0a) (cherry picked from commit b75f068) (cherry picked from commit cc781f7)
This is a PR created by snbot to trigger the check suite in each repository.