Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][broker] Fix entry filter feature for the non-persistent topic #20141

Merged

Conversation

gaoran10
Copy link
Contributor

@gaoran10 gaoran10 commented Apr 19, 2023

Motivation

The non-persistent topic can't support the entry filter feature, because the entry list of the non-persistent topic is a SingletonList, it doesn't support the set method. We'll encounter this exception when using the entry filter feature on a non-persistent topic.

2023-04-19T23:52:11,689 - INFO  - [pulsar-io-83-8:EntryFilterTest@40] - filterEntry for {}
2023-04-19T23:52:11,690 - INFO  - [pulsar-io-83-8:EntryFilterTest@54] - metadata {} key REJECT debug '-' outcome REJECT
2023-04-19T23:52:11,690 - WARN  - [pulsar-io-83-8:ServerCnx@406] - [/127.0.0.1:58969] Got exception java.lang.UnsupportedOperationException
	at java.base/java.util.AbstractList.set(AbstractList.java:136)
	at org.apache.pulsar.broker.service.AbstractBaseDispatcher.filterEntriesForConsumer(AbstractBaseDispatcher.java:153)
	at org.apache.pulsar.broker.service.AbstractBaseDispatcher.filterEntriesForConsumer(AbstractBaseDispatcher.java:100)
	at org.apache.pulsar.broker.service.nonpersistent.NonPersistentDispatcherSingleActiveConsumer.sendMessages(NonPersistentDispatcherSingleActiveConsumer.java:60)
	at org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic.lambda$publishMessage$2(NonPersistentTopic.java:204)
	at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.forEach(ConcurrentOpenHashMap.java:554)
	at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.forEach(ConcurrentOpenHashMap.java:277)
	at org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic.publishMessage(NonPersistentTopic.java:197)
	at org.apache.pulsar.broker.service.Producer.publishMessageToTopic(Producer.java:281)
	at org.apache.pulsar.broker.service.Producer.publishMessage(Producer.java:194)
	at org.apache.pulsar.broker.service.ServerCnx.handleSend(ServerCnx.java:1733)
	at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:222)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)

AbstractBaseDispatcher.java

...

if (filterResult == EntryFilter.FilterResult.REJECT) {
    entriesToFiltered.add(entry.getPosition());
    entries.set(i, null);
    // FilterResult will be always `ACCEPTED` when there is No Filter
    // dont need to judge whether `hasFilter` is true or not.
    this.filterRejectedMsgs.add(entryMsgCnt);
    filteredEntryCount++;
    filteredMessageCount += entryMsgCnt;
    filteredBytesCount += metadataAndPayload.readableBytes();
    entry.release();
    continue;
} else if (filterResult == EntryFilter.FilterResult.RESCHEDULE) {
    entriesToRedeliver.add((PositionImpl) entry.getPosition());
    entries.set(i, null);
    // FilterResult will be always `ACCEPTED` when there is No Filter
    // dont need to judge whether `hasFilter` is true or not.
    this.filterRescheduledMsgs.add(entryMsgCnt);
    filteredEntryCount++;
    filteredMessageCount += entryMsgCnt;
    filteredBytesCount += metadataAndPayload.readableBytes();
    entry.release();
    continue;
}

...

Modifications

Use method Arrays.asList(entry) instead of the method Collections.singletonList(entry).

Verifying this change

Adjust existing tests.

Does this pull request potentially affect one of the following parts:

If the box was checked, please highlight the changes

  • Dependencies (add or upgrade a dependency)
  • The public API
  • The schema
  • The default values of configurations
  • The threading model
  • The binary protocol
  • The REST endpoints
  • The admin CLI options
  • The metrics
  • Anything that affects deployment

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository: gaoran10#27

@github-actions github-actions bot added the doc-not-needed Your PR changes do not impact docs label Apr 19, 2023
@gaoran10 gaoran10 self-assigned this Apr 19, 2023
Copy link
Contributor

@315157973 315157973 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

8 participants