Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][broker] Fix compaction subscription acknowledge Marker msg issue. #16205

Merged

Conversation

Technoboy-
Copy link
Contributor

Motivation

06:03:58.778 [broker-topic-workers-OrderedScheduler-4-0] ERROR org.apache.bookkeeper.common.util.SafeRunnable - Unexpected throwable caught
java.lang.IllegalArgumentException: null
        at com.google.common.base.Preconditions.checkArgument(Preconditions.java:128)
        at org.apache.pulsar.broker.service.persistent.CompactorSubscription.acknowledgeMessage(CompactorSubscription.java:61) 
        at org.apache.pulsar.broker.service.AbstractBaseDispatcher.filterEntriesForConsumer(AbstractBaseDispatcher.java:154) 
        at org.apache.pulsar.broker.service.AbstractBaseDispatcher.filterEntriesForConsumer(AbstractBaseDispatcher.java:103)
        at org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer.internalReadEntriesComplete(PersistentDispatcherSingleActiveConsumer.java:203) 
        at org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer.lambda$readEntriesComplete$1(PersistentDispatcherSingleActiveConsumer.java:146) 
        at org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32) 
        at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36) 
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
        at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) [?:?]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) 
        at java.lang.Thread.run(Thread.java:829) [?:?]

If the topic enabled transaction or replicated cluster, the original topic will write some Marker msg. Then if enabled compaction, it will trigger CompactionSubscription to acknowledge failure due to inconsistent AckType:

line-178 will ack as Individual:

} else if (msgMetadata == null || Markers.isServerOnlyMarker(msgMetadata)) {
PositionImpl pos = (PositionImpl) entry.getPosition();
// Message metadata was corrupted or the messages was a server-only marker
if (Markers.isReplicatedSubscriptionSnapshotMarker(msgMetadata)) {
processReplicatedSubscriptionSnapshot(pos, metadataAndPayload);
}
entries.set(i, null);
entry.release();
subscription.acknowledgeMessage(Collections.singletonList(pos), AckType.Individual,
Collections.emptyMap());
continue;
} else if (msgMetadata.hasDeliverAtTime()

But CompactorSubscription only supports Cumulative :

public void acknowledgeMessage(List<Position> positions, AckType ackType, Map<String, Long> properties) {
checkArgument(ackType == AckType.Cumulative);
checkArgument(positions.size() == 1);
checkArgument(properties.containsKey(Compactor.COMPACTED_TOPIC_LEDGER_PROPERTY));
long compactedLedgerId = properties.get(Compactor.COMPACTED_TOPIC_LEDGER_PROPERTY);

Above is the root cause.

Then if occurs this error, it may cause the broker OOM, because some entries are not released.

Modification

  • Check subscription type when acknowledge.

Verifying this change

  • Add a new test to cover this change.

Documentation

  • doc-not-needed
    (Please explain why)

@Technoboy- Technoboy- marked this pull request as ready for review June 24, 2022 01:45
@Technoboy- Technoboy- self-assigned this Jun 24, 2022
@github-actions github-actions bot added the doc-not-needed Your PR changes do not impact docs label Jun 24, 2022
@Technoboy- Technoboy- added type/bug The PR fixed a bug or issue reported a bug area/broker and removed doc-not-needed Your PR changes do not impact docs labels Jun 24, 2022
@Technoboy- Technoboy- added this to the 2.11.0 milestone Jun 24, 2022
@Technoboy- Technoboy- added the doc-not-needed Your PR changes do not impact docs label Jun 24, 2022
@apache apache deleted a comment from github-actions bot Jun 24, 2022
@apache apache deleted a comment from github-actions bot Jun 24, 2022
Copy link
Member

@mattisonchao mattisonchao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's a great catch!

Left some questions would you like to answer?

@HQebupt
Copy link
Contributor

HQebupt commented Jun 26, 2022

Nice catch! 👍

@codelipenghui codelipenghui merged commit 8e0cd9c into apache:master Jun 26, 2022
@codelipenghui codelipenghui changed the title Fix compaction subscription acknowledge Marker msg issue. [fix][broker] Fix compaction subscription acknowledge Marker msg issue. Jun 26, 2022
codelipenghui pushed a commit that referenced this pull request Jun 28, 2022
@mattisonchao mattisonchao added the cherry-picked/branch-2.9 Archived: 2.9 is end of life label Jul 2, 2022
nicoloboschi pushed a commit to datastax/pulsar that referenced this pull request Jul 4, 2022
…e. (apache#16205)

(cherry picked from commit 8e0cd9c)
(cherry picked from commit 8856606)
@BewareMyPower
Copy link
Contributor

@Technoboy- Could you help cherry-pick this PR to branch-2.8? It's weird that the createNonPartitionedTopic call failed in my local env.

12:37:49.693 [AsyncHttpClient-43-1] WARN  org.apache.pulsar.client.admin.internal.BaseResource - [http://localhost:53870/admin/persistent/my-property/use/my-ns/testWriteMarker-caf36fc4-1e8c-45f8-a49f-674b1d145a19] Failed to perform http put request: javax.ws.rs.NotAllowedException: HTTP 405 Method Not Allowed

org.apache.pulsar.client.admin.PulsarAdminException$NotAllowedException: HTTP 405 Method Not Allowed

	at org.apache.pulsar.client.admin.internal.BaseResource.getApiException(BaseResource.java:232)
	at org.apache.pulsar.client.admin.internal.BaseResource$1.failed(BaseResource.java:130)

@BewareMyPower
Copy link
Contributor

Move the release/2.8.4 label to #16918

@Technoboy- Technoboy- deleted the fix-compaction-subscription-ack-issue branch August 10, 2022 05:52
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area/broker cherry-picked/branch-2.9 Archived: 2.9 is end of life cherry-picked/branch-2.10 doc-not-needed Your PR changes do not impact docs release/2.9.4 release/2.10.2 type/bug The PR fixed a bug or issue reported a bug
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants