diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedCursorMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedCursorMetricsTest.java index fe63942fd133b..72435aa265e54 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedCursorMetricsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedCursorMetricsTest.java @@ -21,6 +21,7 @@ import java.util.ArrayList; import java.util.List; import java.util.UUID; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import lombok.Cleanup; import org.apache.bookkeeper.client.LedgerHandle; @@ -197,9 +198,19 @@ public void testManagedCursorMetrics() throws Exception { admin.topics().delete(topicName, true); } + private ManagedCursorMXBean getManagedCursorMXBean(String topicName, String subscriptionName) + throws ExecutionException, InterruptedException { + final PersistentSubscription persistentSubscription = + (PersistentSubscription) pulsar.getBrokerService() + .getTopic(topicName, false).get().get().getSubscription(subscriptionName); + final ManagedCursorImpl managedCursor = (ManagedCursorImpl) persistentSubscription.getCursor(); + return managedCursor.getStats(); + } + @Test public void testCursorReadWriteMetrics() throws Exception { - final String subName = "read-write"; + final String subName1 = "read-write-sub-1"; + final String subName2 = "read-write-sub-2"; final String topicName = "persistent://my-namespace/use/my-ns/read-write"; final int messageSize = 10; @@ -216,7 +227,7 @@ public void testCursorReadWriteMetrics() throws Exception { .topic(topicName) .subscriptionType(SubscriptionType.Shared) .ackTimeout(1, TimeUnit.SECONDS) - .subscriptionName(subName) + .subscriptionName(subName1) .subscribe(); @Cleanup @@ -224,7 +235,7 @@ public void testCursorReadWriteMetrics() throws Exception { .topic(topicName) .subscriptionType(SubscriptionType.Shared) .ackTimeout(1, TimeUnit.SECONDS) - .subscriptionName(subName + "-2") + .subscriptionName(subName2) .subscribe(); @Cleanup @@ -241,6 +252,13 @@ public void testCursorReadWriteMetrics() throws Exception { consumer2.acknowledge(consumer.receive().getMessageId()); } } + + // Wait for persistent cursor meta. + ManagedCursorMXBean cursorMXBean1 = getManagedCursorMXBean(topicName, subName1); + ManagedCursorMXBean cursorMXBean2 = getManagedCursorMXBean(topicName, subName2); + Awaitility.await().until(() -> cursorMXBean1.getWriteCursorLedgerLogicalSize() > 0); + Awaitility.await().until(() -> cursorMXBean2.getWriteCursorLedgerLogicalSize() > 0); + metricsList = metrics.generate(); Assert.assertEquals(metricsList.size(), 2); Assert.assertNotEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_writeLedgerSize"), 0L);