Skip to content

Comments

KAFKA-7000: update assignment in Consumer#position#5142

Closed
vvcephei wants to merge 2 commits intoapache:trunkfrom
vvcephei:KAFKA-7000-update-assignment-in-position
Closed

KAFKA-7000: update assignment in Consumer#position#5142
vvcephei wants to merge 2 commits intoapache:trunkfrom
vvcephei:KAFKA-7000-update-assignment-in-position

Conversation

@vvcephei
Copy link
Contributor

@vvcephei vvcephei commented Jun 5, 2018

Call ConsumerCoordinator.poll in Consumer.position to ensure we have
updated assignment metadata before potentially throwing an exception regarding
our assignment.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@vvcephei
Copy link
Contributor Author

vvcephei commented Jun 5, 2018

@guozhangwang @hachikuji ,
Do you mind taking a look at this?

It's the root cause of #5107 failing system tests.
More generally, this fixes a requirement to call poll(0) after subscribing, when there otherwise shouldn't be one.

@mjsax mjsax added the consumer label Jun 5, 2018
@ijuma ijuma requested a review from hachikuji June 5, 2018 23:54
@guozhangwang
Copy link
Contributor

I've synced with @vvcephei that leads to this JIRA. Compared with committed(), position() does not try to make sure coordinator is ready. So without poll() call the sequence subscribe(), position() will fail, but subscribe(), committed() will not. I think it is better to not enforce users calling poll() in between, since the new API can not pass in 0

@vvcephei
Copy link
Contributor Author

vvcephei commented Jun 6, 2018

Test results were gone by the time I checked...
Retest this, please.

@vvcephei
Copy link
Contributor Author

vvcephei commented Jun 6, 2018

The failing test is a result of this change: The test calls position within a consumer rebalance listener, resulting in an infinite regression. Here's part of the stack trace:

...
	  at kafka.api.PlaintextConsumerTest$$anon$3.onPartitionsRevoked(PlaintextConsumerTest.scala:193)
	  at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinPrepare(ConsumerCoordinator.java:464)
	  at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:387)
	  at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:331)
	  at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:330)
	  at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1614)
	  at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1581)
	  at kafka.api.PlaintextConsumerTest$$anon$3.onPartitionsRevoked(PlaintextConsumerTest.scala:193)
	  at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinPrepare(ConsumerCoordinator.java:464)
	  at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:387)
	  at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:331)
	  at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:330)
	  at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1215)
	  at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1178)
	  at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1112)
	  at kafka.api.PlaintextConsumerTest.testMaxPollIntervalMsDelayInRevocation(PlaintextConsumerTest.scala:208)

I have verified that the exact same thing happens if you call poll() inside the listener. Is this something to be concerned about in the library or should I just modify the test to break the recurson?

Thoughts, @guozhangwang ?

@vvcephei
Copy link
Contributor Author

vvcephei commented Jun 6, 2018

I have updated the test to avoid the recursion. I think the test still evaluates the same condition, namely that the commit occurs and succeeds.

@vvcephei
Copy link
Contributor Author

vvcephei commented Jun 6, 2018

Note to self: This will need a cherry-pick PR to 2.0 branch.

@asfgit
Copy link

asfgit commented Jun 6, 2018

SUCCESS
9190 tests run, 8 skipped, 0 failed.
--none--

@vvcephei
Copy link
Contributor Author

vvcephei commented Jun 7, 2018

Jenkins is acting wacky. Both the builds passed.

@vvcephei
Copy link
Contributor Author

vvcephei commented Jun 7, 2018

After some discussion with @guozhangwang, it seems like this isn't desirable after all. Our use case was to subscribe and then ask for position without ever polling. In actuality, we could make do by asking for the committed offsets, which are provided by both the Consumer and the AdminClient. We went with the AdminClient.

@vvcephei vvcephei closed this Jun 7, 2018
@vvcephei vvcephei deleted the KAFKA-7000-update-assignment-in-position branch June 7, 2018 22:28
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants