Skip to content
This repository has been archived by the owner on Apr 1, 2024. It is now read-only.

ISSUE-16421: Shared subscription + Batch messages: consumerFlow trigger reading the same messages from storage #4484

Open
sijie opened this issue Jul 6, 2022 · 0 comments

Comments

@sijie
Copy link
Member

sijie commented Jul 6, 2022

Original Issue: apache#16421


I am doing some testing on Shared subscription and Batch messages with the current Pulsar master.

The behaviour that I am observing is that when you have Batch messages the Consumer is sending flow control messages for more messages that it can handle.

This is how to reproduce the problem:

  • write 100.000 messages using batching
  • start a Consumer with a Shared subscription (from the beginning of the topic)
  • you will see that the PersistentDispatcherMultipleConsumers consumerFlow trigger the read of many messages

This is happening because consumerFlow calls readMoreEntries() and readMoreEntries() sees that there are messages to be re-delivered, because the consumer still haven't acknowledged them.

This is turn requests the ManagedCursor to read the data from storage.

I have observed this behaviour while working on offloader performances, but it also happens with regular BK based ledgers.

This simple test case reproduces the problem, I append it to this test https://github.com/apache/pulsar/blob/1ba180cbc7490eff6ac6d3a78d61ce7919236c95/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/CreateSubscriptionTest.java#L66

  @Test
   public void testConsumerFlowOnSharedSubscription() throws Exception {
       String topic = "persistent://my-property/my-ns/topic" + UUID.randomUUID();
       admin.topics().createNonPartitionedTopic(topic);
       String subName = "my-sub";
       int numMessages = 20_000;
       final CountDownLatch count = new CountDownLatch(numMessages);
       try (Consumer<byte[]> consumer = pulsarClient.newConsumer()
               .subscriptionMode(SubscriptionMode.Durable)
               .subscriptionType(SubscriptionType.Shared)
               .topic(topic)
               .subscriptionName(subName)
               .messageListener(new MessageListener<byte[]>() {
                   @Override
                   public void received(Consumer<byte[]> consumer, Message<byte[]> msg) {
                       //log.info("received {} - {}", msg, count.getCount());
                       consumer.acknowledgeAsync(msg);
                       count.countDown();
                   }
               })
               .subscribe();
            Producer<byte[]> producer = pulsarClient
               .newProducer()
               .blockIfQueueFull(true)
               .enableBatching(true)
               .topic(topic)
                    .create()) {
           consumer.pause();
           byte[] message = "foo".getBytes(StandardCharsets.UTF_8);
           List<CompletableFuture<?>> futures = new ArrayList<>();
           for (int i = 0; i < numMessages; i++) {
               futures.add(producer.sendAsync(message).whenComplete( (id,e) -> {
                   if (e != null) {
                       log.error("error", e);
                   }
               }));
               if (futures.size() == 1000) {
                   FutureUtil.waitForAll(futures).get();
                   futures.clear();
               }
           }
           producer.flush();
           consumer.resume();
           assertTrue(count.await(20, TimeUnit.SECONDS));
       }

   }
@sijie sijie added the type/bug label Jul 6, 2022
@sijie sijie added the Stale label Aug 7, 2022
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Projects
None yet
Development

No branches or pull requests

1 participant