
Dont require all consumers drained #485

Merged: eapache merged 3 commits into master from dont-require-all-consumers-drained on Aug 4, 2015

Conversation

@eapache eapache commented Jul 16, 2015

If a partitionConsumer fills up and is not being drained (or is being drained slowly), remove its subscription until it can proceed again, so that it does not block other partitions which may still be making progress.

@Shopify/kafka @horkhe

@eapache eapache force-pushed the dont-require-all-consumers-drained branch from 6216a3f to cdb278c on July 27, 2015 at 13:44
child.messages <- msg
}
child.broker.input <- child
continue feederLoop
Contributor

Can you explain this section a bit more? Can the second child.messages <- msg block as well, or am I understanding this incorrectly?

Contributor Author

This is happening in its own goroutine. If the timeout triggers, we send a message back to the brokerConsumer telling it to take us out of the pool, and then we feed the rest of the messages to the user at our leisure (so yes, the second message send probably blocks until the user catches up, but it doesn't block any other partitions).

Once we've flushed all our messages, we put ourselves back in the pool.
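
To make that flow concrete, here is a small, self-contained sketch of the pattern described above: try to hand each message to the user, and on a timeout ask the broker-side goroutine to drop the subscription, finish delivering the batch at the user's pace, then re-subscribe. The types and channel names below (broker, partition, expire, input, feed) are invented for illustration and are not Sarama's actual declarations.

```go
package main

import (
	"fmt"
	"time"
)

// Toy types for illustration only; not Sarama's actual structs.
type broker struct {
	input  chan *partition // (re-)subscribe a partition
	expire chan *partition // drop a slow partition's subscription
}

type partition struct {
	messages chan string // the channel the user reads from
	broker   *broker
}

// feed tries to hand each message to the user; if the user stalls for longer
// than maxWait, it asks the broker to drop the subscription, delivers the rest
// of the batch at the user's pace (blocking only this goroutine), and then
// re-subscribes.
func (p *partition) feed(batch []string, maxWait time.Duration) {
	for i, msg := range batch {
		select {
		case p.messages <- msg:
			// user is keeping up; stay subscribed
		case <-time.After(maxWait):
			p.broker.expire <- p // take us out of the pool
			for _, m := range batch[i:] {
				p.messages <- m // may block, but other partitions keep flowing
			}
			p.broker.input <- p // put ourselves back in the pool
			return
		}
	}
}

func main() {
	b := &broker{input: make(chan *partition, 1), expire: make(chan *partition, 1)}
	p := &partition{messages: make(chan string, 1), broker: b}

	go p.feed([]string{"m1", "m2", "m3"}, 50*time.Millisecond)

	<-b.expire // the slow reader below caused an unsubscribe
	for i := 0; i < 3; i++ {
		time.Sleep(100 * time.Millisecond) // simulate a slow user
		fmt.Println(<-p.messages)
	}
	<-b.input // the partition re-subscribed after flushing its batch
}
```

In the diff quoted above, the `child.broker.input <- child` / `continue feederLoop` lines play the role of the final re-subscription in this sketch.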

Contributor

So, if the messages never get drained by the consumer (because its consuming goroutine shut down for some reason), we will end up with a stuck goroutine?

I think that's OK in that scenario, just want to understand the code.

Contributor Author

Yup. It will still shut down if/when Close is called, though, since that drains any outstanding messages.

@wvanbergen

I think it's doable to add a test for this: consume 2 partitions, drain only one of them.
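
A rough sketch of what such a test could look like, using Sarama's public consumer API. The broker address, topic name, buffer size, and message count are placeholders, and this is not the actual test that #492 introduces; it also assumes a topic whose two partitions both already contain messages.

```go
package sarama_test

import (
	"testing"
	"time"

	"github.com/Shopify/sarama" // import path at the time of this PR
)

func TestSlowPartitionDoesNotBlockOthers(t *testing.T) {
	config := sarama.NewConfig()
	config.ChannelBufferSize = 4 // small buffer so the undrained partition fills up quickly

	consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, config)
	if err != nil {
		t.Fatal(err)
	}
	defer consumer.Close()

	drained, err := consumer.ConsumePartition("test_topic", 0, sarama.OffsetOldest)
	if err != nil {
		t.Fatal(err)
	}
	defer drained.Close()

	// Subscribe to partition 1 but never read from it.
	undrained, err := consumer.ConsumePartition("test_topic", 1, sarama.OffsetOldest)
	if err != nil {
		t.Fatal(err)
	}
	defer undrained.Close() // Close drains whatever was never read

	// Partition 0 should keep making progress even though partition 1's
	// buffer has filled up and its subscription has been parked.
	for i := 0; i < 100; i++ {
		select {
		case <-drained.Messages():
		case <-time.After(10 * time.Second):
			t.Fatalf("partition 0 stalled after %d messages", i)
		}
	}
}
```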

horkhe commented Jul 27, 2015

@eapache @wvanbergen There is such a test case; it is about to be introduced in #492, but it is skipped there.

eapache commented Jul 27, 2015

OK, please re-review this PR as it should now also fix all the race conditions that the previous changes introduced.

If it looks good, I'll merge it and then you can rebase #492 without the skipped test.


fetchSize int32
offset int64
highWaterMarkOffset int64
}

var errTimedOut = errors.New("timed out feeding messages to the user") // not user-facing
Contributor

Capitalize error message?


Contributor

Hmmm, all the configuration errors (see config.go above) are capitalized.

Contributor Author

good point, I've submitted a PR to fix them :)
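
For context on the style question, a minimal illustration of the two conventions being discussed; the names here are made up for this example, not Sarama's actual declarations. Standard Go style prefers the lowercase form because error strings usually end up embedded in larger messages.

```go
package main

import (
	"errors"
	"fmt"
)

// Lowercase, per the usual Go convention for error strings.
var errTimedOut = errors.New("timed out feeding messages to the user")

// Capitalized, matching the style the reviewer observed in config.go.
var ErrInvalidConfig = errors.New("Invalid configuration")

func main() {
	// The lowercase form reads naturally when wrapped in a larger message;
	// the capitalized one lands a capital letter mid-sentence.
	fmt.Printf("shutting down: %v\n", errTimedOut)
	fmt.Printf("shutting down: %v\n", ErrInvalidConfig)
}
```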

@wvanbergen

I think this looks OK. @horkhe any final comments?

@eapache eapache mentioned this pull request Jul 29, 2015

Take the previous refactor to its logical conclusion by handling *all* the
error logic in the brokerConsumer, not the responseFeeder. This fixes the race
to close the dying channel (since the brokerConsumer can just close the trigger
instead as it has ownership).

At the same time, refactor `updateSubscriptionCache` into `handleResponses`, and
inline the "new subscriptions" bit into the main loop; otherwise we end up
processing the previous iteration's results at the very beginning of the next
iteration, rather than at the very end of the current one.

Prep for unblocking consumers that are not being drained

If a partitionConsumer fills up and is not being drained (or is taking a long
time) remove its subscription until it can proceed again in order to not block
other partitions which may still be making progress.
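
As a side note on the "race to close the dying channel" fix in the first commit message: the general rule it leans on is that only the goroutine that owns a channel should close it, while any number of goroutines may safely receive from it. A generic illustration of that rule follows; the names (trigger, worker) are invented here and are not Sarama's.

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	trigger := make(chan struct{})
	var wg sync.WaitGroup

	// Several workers may observe the trigger...
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			<-trigger // safe: receiving from a closed channel never panics
			fmt.Println("worker", id, "shutting down")
		}(i)
	}

	// ...but only the single owning goroutine ever closes it. If multiple
	// goroutines raced to close(trigger), the second close would panic,
	// which is the kind of race the commit resolves by giving the
	// brokerConsumer sole ownership of the channel it closes.
	close(trigger)
	wg.Wait()
}
```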
@eapache eapache force-pushed the dont-require-all-consumers-drained branch from f7da387 to 292f3b0 on July 29, 2015 at 18:57
eapache added a commit that referenced this pull request Aug 4, 2015
@eapache eapache merged commit e1729d6 into master Aug 4, 2015
@eapache eapache deleted the dont-require-all-consumers-drained branch August 4, 2015 13:24