Skip to content

Commit

Permalink
[fix][broker] Fix entry filter feature for the non-persistent topic (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
gaoran10 authored Apr 20, 2023
1 parent 00dc7a0 commit 575cf23
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import io.netty.buffer.ByteBuf;
import io.netty.util.concurrent.FastThreadLocal;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -199,7 +199,8 @@ public void publishMessage(ByteBuf data, PublishContext callback) {
// entry internally retains data so, duplicateBuffer should be release here
duplicateBuffer.release();
if (subscription.getDispatcher() != null) {
subscription.getDispatcher().sendMessages(Collections.singletonList(entry));
// Dispatcher needs to call the set method to support entry filter feature.
subscription.getDispatcher().sendMessages(Arrays.asList(entry));
} else {
// it happens when subscription is created but dispatcher is not created as consumer is not added
// yet
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.broker.service.Dispatcher;
import org.apache.pulsar.broker.service.EntryFilterSupport;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.testcontext.PulsarTestContext;
Expand Down Expand Up @@ -286,10 +287,16 @@ public void testFilter() throws Exception {

}

@DataProvider(name = "topicProvider")
public Object[][] topicProvider() {
return new Object[][]{
{"persistent://prop/ns-abc/topic" + UUID.randomUUID()},
{"non-persistent://prop/ns-abc/topic" + UUID.randomUUID()},
};
}

@Test
public void testFilteredMsgCount() throws Throwable {
String topic = "persistent://prop/ns-abc/topic" + UUID.randomUUID();
@Test(dataProvider = "topicProvider")
public void testFilteredMsgCount(String topic) throws Throwable {
String subName = "sub";

try (Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
Expand All @@ -298,7 +305,7 @@ public void testFilteredMsgCount() throws Throwable {
.subscriptionName(subName).subscribe()) {

// mock entry filters
PersistentSubscription subscription = (PersistentSubscription) pulsar.getBrokerService()
Subscription subscription = pulsar.getBrokerService()
.getTopicReference(topic).get().getSubscription(subName);
Dispatcher dispatcher = subscription.getDispatcher();
Field field = EntryFilterSupport.class.getDeclaredField("entryFilters");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,21 +211,22 @@ public void testSubscriptionStats(final String topic, final String subName, bool
hasFilterField.set(dispatcher, true);
}

for (int i = 0; i < 100; i++) {
producer.newMessage().property("ACCEPT", " ").value(UUID.randomUUID().toString()).send();
}
for (int i = 0; i < 100; i++) {
int rejectedCount = 100;
int acceptCount = 100;
int scheduleCount = 100;
for (int i = 0; i < rejectedCount; i++) {
producer.newMessage().property("REJECT", " ").value(UUID.randomUUID().toString()).send();
}
for (int i = 0; i < 100; i++) {
for (int i = 0; i < acceptCount; i++) {
producer.newMessage().property("ACCEPT", " ").value(UUID.randomUUID().toString()).send();
}
for (int i = 0; i < scheduleCount; i++) {
producer.newMessage().property("RESCHEDULE", " ").value(UUID.randomUUID().toString()).send();
}

for (;;) {
Message<String> message = consumer.receive(10, TimeUnit.SECONDS);
if (message == null) {
break;
}
for (int i = 0; i < acceptCount; i++) {
Message<String> message = consumer.receive(1, TimeUnit.SECONDS);
Assert.assertNotNull(message);
consumer.acknowledge(message);
}

Expand Down Expand Up @@ -263,12 +264,12 @@ public void testSubscriptionStats(final String topic, final String subName, bool
.mapToDouble(m-> m.value).sum();

if (setFilter) {
Assert.assertEquals(filterAccepted, 100);
if (isPersistent) {
Assert.assertEquals(filterRejected, 100);
// Only works on the test, if there are some markers, the filterProcessCount will be not equal with rejectedCount + rescheduledCount + acceptCount
Assert.assertEquals(throughFilter, filterAccepted + filterRejected + filterRescheduled, 0.01 * throughFilter);
}
Assert.assertEquals(filterAccepted, acceptCount);
Assert.assertEquals(filterRejected, rejectedCount);
// Only works on the test, if there are some markers,
// the filterProcessCount will be not equal with rejectedCount + rescheduledCount + acceptCount
Assert.assertEquals(throughFilter,
filterAccepted + filterRejected + filterRescheduled, 0.01 * throughFilter);
} else {
Assert.assertEquals(throughFilter, 0D);
Assert.assertEquals(filterAccepted, 0D);
Expand All @@ -282,19 +283,20 @@ public void testSubscriptionStats(final String topic, final String subName, bool
Assert.assertEquals(rescheduledMetrics.size(), 0);
}

testSubscriptionStatsAdminApi(topic, subName, setFilter);
testSubscriptionStatsAdminApi(topic, subName, setFilter, acceptCount, rejectedCount);
}

private void testSubscriptionStatsAdminApi(String topic, String subName, boolean setFilter) throws Exception {
private void testSubscriptionStatsAdminApi(String topic, String subName, boolean setFilter,
int acceptCount, int rejectedCount) throws Exception {
boolean persistent = TopicName.get(topic).isPersistent();
TopicStats topicStats = admin.topics().getStats(topic);
SubscriptionStats stats = topicStats.getSubscriptions().get(subName);
Assert.assertNotNull(stats);

if (setFilter) {
Assert.assertEquals(stats.getFilterAcceptedMsgCount(), 100);
Assert.assertEquals(stats.getFilterAcceptedMsgCount(), acceptCount);
if (persistent) {
Assert.assertEquals(stats.getFilterRejectedMsgCount(), 100);
Assert.assertEquals(stats.getFilterRejectedMsgCount(), rejectedCount);
// Only works on the test, if there are some markers, the filterProcessCount will be not equal with rejectedCount + rescheduledCount + acceptCount
Assert.assertEquals(stats.getFilterProcessedMsgCount(),
stats.getFilterAcceptedMsgCount() + stats.getFilterRejectedMsgCount()
Expand Down

0 comments on commit 575cf23

Please sign in to comment.