Fix 100% CPU usage when starting multiple ChangeStreams #181

Open
wants to merge 4 commits into
base: main
from

Conversation

jmoghisi
Contributor

@jmoghisi jmoghisi commented Mar 24, 2021

We have observed a bug with mongo-java-server that causes 100% CPU usage when starting multiple ChangeStreams.

The fix is to introduce a small delay before returning documents from the OpLog Cursor. This also emulates real behaviour in which a ChangeStream cursor returns only after a timeout or once data has been received from all shards.

The change also includes:

  • Refactoring ChangeStream-specific behaviour into its own Cursor class.
  • Introducing a TailableCursor interface in preparation for supporting Tailable Cursors.

@jmoghisi jmoghisi force-pushed the multiple_change_streams branch from 104a1a6 to 1fc4959 Compare March 24, 2021 12:53
@jmoghisi jmoghisi force-pushed the multiple_change_streams branch 2 times, most recently from 9ef5712 to 9be4583 Compare April 6, 2021 19:59
@jmoghisi
Contributor Author

jmoghisi commented Apr 7, 2021

@bwaldvogel please can you check this pull request.

@bwaldvogel
Owner

@jmoghisi: Yes, as you probably saw, I’ve started reviewing this PR and I've cherry-picked a couple of commits on master.
I’ll continue when time permits…

@jmoghisi
Contributor Author

jmoghisi commented Apr 7, 2021

Apologies, I missed that. Thanks for checking.

@jmoghisi jmoghisi force-pushed the multiple_change_streams branch from 9be4583 to 0ea8ad1 Compare April 8, 2021 23:01

import de.bwaldvogel.mongo.oplog.OplogPosition;

public interface TailableCursor extends Cursor {
Owner

@bwaldvogel bwaldvogel Apr 11, 2021

Is the TailableCursor refactoring relevant for the bugfix?
If not, please keep it out of this PR and we could discuss the refactoring in a follow-up discussion.

Contributor Author

it's not, you are right, I will raise a separate PR for tailable cursors.


@Override
public OplogPosition getPosition() {
return null;
Owner

Is this ever supposed to be invoked?
If not, wouldn’t it be cleaner to throw an UnsupportedOperationException?

OTOH, if it’s never supposed to be invoked, to me this fact suggests that the TailableCursor refactoring might not be a good idea after all.

Contributor Author

it is meant to be invoked. I'll move it to a separate PR that requires it.

// emulates real ChangeStream behaviour of waiting for all shards to provide data
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
// ignore
Owner

It’s almost never okay to just ignore an InterruptedException.

Contributor Author

ok, will fix
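
One common way to address the point about swallowed interrupts is to restore the thread's interrupt flag in the catch block, so that code further up the call stack can still react to it. The sketch below is illustrative only: the class and method names (`InterruptAwareDelay`, `pause`) are hypothetical and are not taken from this PR.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of mongo-java-server.
final class InterruptAwareDelay {

    /**
     * Sleeps for the given number of milliseconds.
     * Returns true if the full delay elapsed, false if the thread was interrupted.
     */
    static boolean pause(long millis) {
        try {
            TimeUnit.MILLISECONDS.sleep(millis);
            return true;
        } catch (InterruptedException e) {
            // Catching the exception clears the interrupt flag; set it again
            // so callers can observe that an interrupt was requested.
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        Thread.currentThread().interrupt(); // simulate a pending interrupt
        boolean completed = pause(100);
        System.out.println("completed=" + completed
            + ", interrupted=" + Thread.currentThread().isInterrupted());
    }
}
```

Whether to restore the flag or to propagate the exception depends on the caller; propagating is usually preferable when the method signature allows it.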

try {
// artificial delay to avoid 100% CPU usage when starting multiple ChangeStreams
// emulates real ChangeStream behaviour of waiting for all shards to provide data
TimeUnit.MILLISECONDS.sleep(100);
Owner

I doubt that this kind of artificial delay is a good implementation.

Contributor Author

what would you suggest as an alternative? the problem we have observed is that without this delay the clients are constantly polling and causing 100% CPU usage. our tests do not complete and time out. adding the delay resolves the issue.

note, the production MongoDB behaviour has a delay/timeout after waiting for updates from other shards.
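
The thread does not settle on an alternative to the fixed sleep. One pattern that avoids both busy polling and an unconditional delay is a timed wait on a condition: a reader blocks until a writer signals new data or until a timeout elapses. The sketch below assumes a hypothetical `WaitableLog` class; it is not mongo-java-server's cursor or oplog API, and how such a wait would fit into the actual code base is left open.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical append-only log; NOT part of mongo-java-server.
final class WaitableLog<T> {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition appended = lock.newCondition();
    private final List<T> entries = new ArrayList<>();

    void append(T entry) {
        lock.lock();
        try {
            entries.add(entry);
            appended.signalAll(); // wake every waiting reader
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns all entries at index >= position. Blocks until at least one such
     * entry exists or the timeout elapses; on timeout the result is empty.
     */
    List<T> readFrom(int position, long timeout, TimeUnit unit) throws InterruptedException {
        long remainingNanos = unit.toNanos(timeout);
        lock.lock();
        try {
            while (entries.size() <= position) {
                if (remainingNanos <= 0L) {
                    return new ArrayList<>();
                }
                // awaitNanos returns an estimate of the time still left to wait
                remainingNanos = appended.awaitNanos(remainingNanos);
            }
            return new ArrayList<>(entries.subList(position, entries.size()));
        } finally {
            lock.unlock();
        }
    }
}

final class WaitableLogDemo {
    public static void main(String[] args) throws Exception {
        WaitableLog<String> log = new WaitableLog<>();
        Thread writer = new Thread(() -> log.append("insert _id: 1"));
        writer.start();
        // Returns as soon as the writer has appended, not after the full timeout.
        System.out.println(log.readFrom(0, 5, TimeUnit.SECONDS));
        // Nothing exists at position 1: the reader idles without spinning,
        // then receives an empty list once the timeout has elapsed.
        System.out.println(log.readFrom(1, 100, TimeUnit.MILLISECONDS));
        writer.join();
    }
}
```

With this shape, an idle reader costs no CPU while waiting, and a reader with pending data is not delayed at all.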


// give time for all ChangeStream Publishers to be subscribed to
// todo: expose API to get cursors from Backend and wait until 'changeStreamCount' cursors
TimeUnit.SECONDS.sleep(5);
Owner

A required sleep in a test screams for "it will break eventually". I’m sorry, but IMO it’s not acceptable to merge such a test.
Typically one uses something like a CyclicBarrier to make such concurrency tests fully deterministic and get rid of sleeps. Please also see my other comments.

Contributor Author

@jmoghisi jmoghisi Apr 14, 2021

I'll address and make the test more deterministic.
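
The CyclicBarrier suggestion can be sketched as follows: every subscriber thread reaches the barrier once it has subscribed, and the test thread only proceeds to the inserts after all of them have arrived. This is a minimal illustration under stated assumptions, not the test from this PR: `BarrierStartDemo` is a hypothetical name, and the `events.add(...)` calls are stand-ins for opening a change stream and for `collection.insertOne(...)`.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; the event strings are placeholders for real work.
final class BarrierStartDemo {

    /** Runs workerCount subscribers and returns the recorded events in order. */
    static List<String> run(int workerCount) throws Exception {
        List<String> events = new CopyOnWriteArrayList<>();
        // One party per worker plus one for the coordinating (test) thread.
        CyclicBarrier ready = new CyclicBarrier(workerCount + 1);
        ExecutorService pool = Executors.newFixedThreadPool(workerCount);
        try {
            for (int i = 0; i < workerCount; i++) {
                int id = i;
                pool.submit(() -> {
                    events.add("subscribed-" + id);    // stand-in for opening a change stream
                    ready.await(10, TimeUnit.SECONDS); // signal readiness
                    return null;
                });
            }
            ready.await(10, TimeUnit.SECONDS); // returns once every worker has subscribed
            events.add("insert");              // stand-in for collection.insertOne(...)
        } finally {
            pool.shutdown();
            pool.awaitTermination(10, TimeUnit.SECONDS);
        }
        return events;
    }

    public static void main(String[] args) throws Exception {
        // "insert" is always the last element, regardless of thread scheduling.
        System.out.println(run(3));
    }
}
```

The timeouts on `await` only bound how long a broken test can hang; they do not influence the ordering, which is what distinguishes this from a sleep.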

@@ -456,4 +463,67 @@ public void testOplogShouldFilterNamespaceOnChangeStreams() throws Exception {
return subscriber.values().get(0);
}

@Test
@Disabled
Owner

Why is the test @Disabled? The test does not fail for me.

Actually I’m not able to grasp what this test is trying to test/show.
The intensive use of RxJava doesn’t necessarily make the test easier to understand. I’m not even sure what happens if the test breaks in the middle. Which code takes care of cleaning up potentially remaining subscriptions?

If I understood the basic idea correctly, it should be as simple as starting one or two threads that subscribe to a change stream and then inserting documents in the test "main" thread.
The thread sleeps can then usually be avoided, for example by using a CyclicBarrier, to make the test fully deterministic.
However, the test should somehow explain/show where the 100% CPU usage happens…

Contributor Author

@jmoghisi jmoghisi Apr 14, 2021

the test is enabled in a later commit that contains the fix.

you have understood the idea of the test correctly. we start a number of change streams, insert a number of documents, and then assert that all of the watches saw the same items emitted within a sensible timeout.

we are maintaining an internal fork of the library with this fix. we are finding that some of our unit tests fail without it. the problem is more acute as the number of change streams increases, especially on resource-constrained hardware, e.g. busy CI servers. I will take another pass at this test to ensure it always fails without the fix.

I can see high CPU usage when running the test without the fix and see it drop significantly when the delay is added.

the tear down is handled by the Rx Test Subscriber which cancels all the subscriptions. I'll refactor to remove the Thread sleep.
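
The test idea described in this reply (several watchers, a few inserts, then an assertion that every watcher saw the same items within a timeout) can be expressed with plain threads and a CountDownLatch. The sketch below is an assumption-laden illustration: `FanOutCheck` is a hypothetical name, and the in-memory queues merely stand in for change streams, so it shows the shape of the assertion rather than the behaviour of mongo-java-server.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical helper; the queues are placeholders for real change streams.
final class FanOutCheck {

    /** Returns true if every watcher received exactly the given items, in order, within the timeout. */
    static boolean allWatchersSeeSameItems(int watcherCount, List<String> items, long timeoutSeconds)
            throws InterruptedException {
        List<BlockingQueue<String>> queues = new ArrayList<>();
        List<List<String>> seen = new ArrayList<>();
        CountDownLatch done = new CountDownLatch(watcherCount);

        for (int i = 0; i < watcherCount; i++) {
            BlockingQueue<String> queue = new LinkedBlockingQueue<>();
            List<String> received = new ArrayList<>();
            queues.add(queue);
            seen.add(received);
            Thread watcher = new Thread(() -> {
                try {
                    while (received.size() < items.size()) {
                        received.add(queue.take()); // blocks without spinning
                    }
                    done.countDown();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            watcher.setDaemon(true); // a stuck watcher must not keep the JVM alive
            watcher.start();
        }

        for (String item : items) { // stand-in for the inserts
            for (BlockingQueue<String> queue : queues) {
                queue.add(item);
            }
        }

        if (!done.await(timeoutSeconds, TimeUnit.SECONDS)) {
            return false; // at least one watcher did not finish in time
        }
        for (List<String> received : seen) {
            if (!received.equals(items)) {
                return false;
            }
        }
        return true;
    }
}
```

The latch gives the "within a sensible timeout" part of the assertion without any sleep: the call returns as soon as the last watcher finishes, and only fails after the timeout if one never does.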

Flowable.fromPublisher(asyncCollection.insertOne(json("_id: 2, bu: 'abc'"))),
Flowable.fromPublisher(asyncCollection.insertOne(json("_id: 3, bu: 'xyz'"))),
Flowable.fromPublisher(asyncCollection.insertOne(json("_id: 4, bu: 'abc'")))
).test().awaitDone(15, TimeUnit.SECONDS).assertComplete();
Owner

Why are those five lines of reactive code better than just

collection.insertOne(json("_id: 2, bu: 'abc'"));
collection.insertOne(json("_id: 3, bu: 'xyz'"));
collection.insertOne(json("_id: 4, bu: 'abc'"));

Contributor Author

they're not, see other reply. I'll refactor.

@Disabled
public void testMultipleChangeStreams() throws InterruptedException {
Flowable.fromPublisher(asyncCollection.insertOne(json("_id: 1")))
.test().awaitDone(5, TimeUnit.SECONDS).assertComplete();
Owner

How is this reactive code better than:

collection.insertOne(json("_id: 1"));

Contributor Author

it's not, I did not want to mix both styles in the same test, but happy to revise if you prefer that.

2 participants