Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Broker] waitingCursors potential heap memory leak #13939

Merged
merged 2 commits into from
Feb 21, 2022

Conversation

gaozhangmin
Copy link
Contributor

@gaozhangmin gaozhangmin commented Jan 25, 2022

Motivation

org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl#waitingCursors has memory leak problem, reproduced steps:

  1. Create a consumer
  2. Consumers is closed due to some error, without consuming any message.
  3. Day by day, it will cause broker heap memory leak problem.

Reproduce test: I don't know why getWaitingCursorsCount aways 1 less than for loop count.

@Test
    public void testSub() throws Exception {
        final String topicName = "persistent://prop/ns-abc/stuckSubscriptionTopic";
        for (int i=0;i<10;i++) {
            Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(topicName)
                    .subscriptionType(SubscriptionType.Failover).subscriptionName("test" + i).subscribe();
            consumer.close();
        }
        PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();
        ManagedLedgerImpl ml = (ManagedLedgerImpl)(topicRef.ledger);
        assertEquals(9, ml.getWaitingCursorsCount());
    }

Detail:

  1. When create consumer and connection opens.
    if (!(firstTimeConnect && hasParentConsumer) && conf.getReceiverQueueSize() != 0) {
    increaseAvailablePermits(cnx, conf.getReceiverQueueSize());
    }
  2. Cursor was added to waitingCursors by org.apache.bookkeeper.mledger.impl.ManagedCursorImpl#checkForNewEntries
    private void checkForNewEntries(OpReadEntry op, ReadEntriesCallback callback, Object ctx) {
    try {
    if (log.isDebugEnabled()) {
    log.debug("[{}] [{}] Re-trying the read at position {}", ledger.getName(), name, op.readPosition);
    }
    if (!hasMoreEntries()) {
    if (log.isDebugEnabled()) {
    log.debug("[{}] [{}] Still no entries available. Register for notification", ledger.getName(),
    name);
    }
    // Let the managed ledger know we want to be notified whenever a new entry is published
    ledger.waitingCursors.add(this);
    } else {
    if (log.isDebugEnabled()) {
    log.debug("[{}] [{}] Skip notification registering since we do have entries available",
    ledger.getName(), name);
    }
    }
  3. But cursor were only removed when there were new messages. if consumer close without consuming messages, The cursor was not removed permantly.
    org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl#notifyCursors
    void notifyCursors() {
    while (true) {
    final ManagedCursorImpl waitingCursor = waitingCursors.poll();
    if (waitingCursor == null) {
    break;
    }
    executor.execute(safeRun(waitingCursor::notifyEntriesAvailable));
    }
    }

Modifications

1、remove cursor when consumer closed.

Documentation

Check the box below or label this PR directly (if you have committer privilege).

Need to update docs?

  • doc-required

    (If you need help on updating docs, create a doc issue)

  • no-need-doc

    (Please explain why)

  • doc

    (If this PR contains doc changes)

@github-actions
Copy link

@gaozhangmin:Thanks for your contribution. For this PR, do we need to update docs?
(The PR template contains info about doc, which helps others know more about the changes. Can you provide doc-related info in this and future PR descriptions? Thanks)

@github-actions
Copy link

@gaozhangmin:Thanks for providing doc info!

@github-actions github-actions bot added the doc-not-needed Your PR changes do not impact docs label Jan 25, 2022
@merlimat
Copy link
Contributor

@gaozhangmin I'm not sure I see the case where the cursor is added multiple times:

ledger.getScheduledExecutor()
                        .schedule(() -> checkForNewEntries(op, callback, ctx),
                                config.getNewEntriesCheckDelayInMillis(), TimeUnit.MILLISECONDS);

This is called only the first time, as a kind of optimization to avoid inserting in the queue for cases where consumer and producers are very close. After the first time is unsuccessful, then we insert in the queue and wait for the notification. There shouldn't be more than 1 entry per cursor in that queue.

@gaozhangmin
Copy link
Contributor Author

@gaozhangmin I'm not sure I see the case where the cursor is added multiple times:

ledger.getScheduledExecutor()
                        .schedule(() -> checkForNewEntries(op, callback, ctx),
                                config.getNewEntriesCheckDelayInMillis(), TimeUnit.MILLISECONDS);

This is called only the first time, as a kind of optimization to avoid inserting in the queue for cases where consumer and producers are very close. After the first time is unsuccessful, then we insert in the queue and wait for the notification. There shouldn't be more than 1 entry per cursor in that queue.

@merlimat You can verify by this test

@Test
    public void testSub() throws Exception {
        final String topicName = "persistent://prop/ns-abc/stuckSubscriptionTopic";
        for (int i=0;i<10000;i++) {
            Consumer<String> consumer1 = pulsarClient.newConsumer(Schema.STRING).topic(topicName)
                    .subscriptionType(SubscriptionType.Failover).subscriptionName("test").subscribe();
            consumer1.close();
        }
        PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();
        ManagedLedgerImpl ml = (ManagedLedgerImpl)(topicRef.ledger);
        System.out.println(ml.getWaitingCursorsCount());

    }

@liudezhi2098
Copy link
Contributor

This maybe will not cause memory leak, it's not a periodic task. constantly creating and shutting down consumers can lead to this problem.

ledger.getScheduledExecutor()
                        .schedule(() -> checkForNewEntries(op, callback, ctx),
                                config.getNewEntriesCheckDelayInMillis(), TimeUnit.MILLISECONDS);

@gaozhangmin gaozhangmin force-pushed the fix-memory-leak-waitingcursor branch 2 times, most recently from 4834395 to 96bc879 Compare January 26, 2022 10:27
@gaozhangmin gaozhangmin force-pushed the fix-memory-leak-waitingcursor branch 2 times, most recently from 9018954 to 718120c Compare February 14, 2022 05:27
@gaozhangmin
Copy link
Contributor Author

@gaozhangmin
Copy link
Contributor Author

/pulsarbot run-failure-checks

@hangc0276
Copy link
Contributor

@liudezhi2098 Please help review this Pr, thanks a lot.

@gaozhangmin gaozhangmin force-pushed the fix-memory-leak-waitingcursor branch 3 times, most recently from 121f0b3 to 16ff5c3 Compare February 15, 2022 09:48
@gaozhangmin gaozhangmin force-pushed the fix-memory-leak-waitingcursor branch from 16ff5c3 to 878bd6b Compare February 15, 2022 10:38
@gaozhangmin
Copy link
Contributor Author

@merlimat PTAL

…ateSubscriptionTest.java

Co-authored-by: lipenghui <penghui@apache.org>
@gaozhangmin
Copy link
Contributor Author

/pulsarbot run-failure-checks

@gaozhangmin
Copy link
Contributor Author

/pulsarbot run-failure-checks

@eolivelli eolivelli merged commit 478fd36 into apache:master Feb 21, 2022
michaeljmarshall pushed a commit that referenced this pull request Feb 23, 2022
@michaeljmarshall michaeljmarshall added the cherry-picked/branch-2.8 Archived: 2.8 is end of life label Feb 23, 2022
@codelipenghui codelipenghui modified the milestones: 2.11.0, 2.10.0 Feb 25, 2022
codelipenghui pushed a commit that referenced this pull request Feb 25, 2022
nicoloboschi pushed a commit to nicoloboschi/pulsar that referenced this pull request Mar 1, 2022
gaoran10 pushed a commit that referenced this pull request Mar 1, 2022
@gaoran10 gaoran10 added the cherry-picked/branch-2.9 Archived: 2.9 is end of life label Mar 2, 2022
Nicklee007 pushed a commit to Nicklee007/pulsar that referenced this pull request Apr 20, 2022
@lhotari lhotari mentioned this pull request Feb 29, 2024
2 tasks
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area/broker cherry-picked/branch-2.8 Archived: 2.8 is end of life cherry-picked/branch-2.9 Archived: 2.9 is end of life doc-not-needed Your PR changes do not impact docs release/2.8.3 release/2.9.2
Projects
None yet
Development

Successfully merging this pull request may close these issues.

10 participants