Skip to content
This repository has been archived by the owner on Apr 1, 2024. It is now read-only.

Commit

Permalink
[fix][flaky-test]ManagedCursorMetricsTest.testCursorReadWriteMetrics (a…
Browse files Browse the repository at this point in the history
  • Loading branch information
poorbarcode authored Aug 12, 2022
1 parent 89e82a2 commit e0ff3d7
Showing 1 changed file with 21 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import org.apache.bookkeeper.client.LedgerHandle;
Expand Down Expand Up @@ -197,9 +198,19 @@ public void testManagedCursorMetrics() throws Exception {
admin.topics().delete(topicName, true);
}

private ManagedCursorMXBean getManagedCursorMXBean(String topicName, String subscriptionName)
throws ExecutionException, InterruptedException {
final PersistentSubscription persistentSubscription =
(PersistentSubscription) pulsar.getBrokerService()
.getTopic(topicName, false).get().get().getSubscription(subscriptionName);
final ManagedCursorImpl managedCursor = (ManagedCursorImpl) persistentSubscription.getCursor();
return managedCursor.getStats();
}

@Test
public void testCursorReadWriteMetrics() throws Exception {
final String subName = "read-write";
final String subName1 = "read-write-sub-1";
final String subName2 = "read-write-sub-2";
final String topicName = "persistent://my-namespace/use/my-ns/read-write";
final int messageSize = 10;

Expand All @@ -216,15 +227,15 @@ public void testCursorReadWriteMetrics() throws Exception {
.topic(topicName)
.subscriptionType(SubscriptionType.Shared)
.ackTimeout(1, TimeUnit.SECONDS)
.subscriptionName(subName)
.subscriptionName(subName1)
.subscribe();

@Cleanup
Consumer<byte[]> consumer2 = pulsarClient.newConsumer()
.topic(topicName)
.subscriptionType(SubscriptionType.Shared)
.ackTimeout(1, TimeUnit.SECONDS)
.subscriptionName(subName + "-2")
.subscriptionName(subName2)
.subscribe();

@Cleanup
Expand All @@ -241,6 +252,13 @@ public void testCursorReadWriteMetrics() throws Exception {
consumer2.acknowledge(consumer.receive().getMessageId());
}
}

// Wait for persistent cursor meta.
ManagedCursorMXBean cursorMXBean1 = getManagedCursorMXBean(topicName, subName1);
ManagedCursorMXBean cursorMXBean2 = getManagedCursorMXBean(topicName, subName2);
Awaitility.await().until(() -> cursorMXBean1.getWriteCursorLedgerLogicalSize() > 0);
Awaitility.await().until(() -> cursorMXBean2.getWriteCursorLedgerLogicalSize() > 0);

metricsList = metrics.generate();
Assert.assertEquals(metricsList.size(), 2);
Assert.assertNotEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_writeLedgerSize"), 0L);
Expand Down

0 comments on commit e0ff3d7

Please sign in to comment.