
[KAFKA-6730] Simplify State Store Recovery #5013

Merged: mjsax merged 11 commits into apache:trunk from ConcurrencyPractitioner:KAFKA-6730 on Jun 5, 2018

Conversation

@ConcurrencyPractitioner
Contributor

No description provided.

@ConcurrencyPractitioner
Contributor Author

@mjsax One last push. :)
Thanks for your patience.

@mjsax
Member

mjsax commented May 16, 2018

\cc @bbejeck @vvcephei

@mjsax (Member) left a comment

Thanks for the update. We are getting close :)

I think we can still improve updating the end-offsets. Let's see what Guozhang thinks.

assertThat(callbackTwo.restored.size(), equalTo(3));
}

@Ignore
Member

I think we can remove the ignored tests completely. \cc @bbejeck


final Set<TopicPartition> restoringPartitions = new HashSet<>(needsRestoring.keySet());
try {
final ConsumerRecords<byte[], byte[]> allRecords = poll(restoreConsumer, 10);
Member

Line 73 above: we can remove the whole JavaDoc comment because this method does not throw TaskMigratedException any longer.

final ConsumerRecords<byte[], byte[]> allRecords = poll(restoreConsumer, 10);
for (final TopicPartition partition : restoringPartitions) {
restorePartition(allRecords, partition, active.restoringTaskFor(partition));
if (!needsRestoring.isEmpty()) {
Member

IMHO, we should add a second check to make sure updatedEndOffsets is set only once:

if (!needsRestoring.isEmpty() && updatedEndOffsets == null) {

Additionally, we should set updatedEndOffsets = null in Line 117 below. (Wondering if this is good enough, or if we need to reset updatedEndOffsets = null somewhere else, too. If we are in the middle of a restore and a rebalance happens, we get new partitions and need to refresh the corresponding endOffsets...) \cc @guozhangwang WDYT?
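Taken together, a minimal sketch of these two suggestions (assuming the restore() structure from this PR, with updatedEndOffsets as a nullable field) would be:

    if (!needsRestoring.isEmpty() && updatedEndOffsets == null) {
        // fetch the end offsets only once per restore cycle
        updatedEndOffsets = restoreConsumer.endOffsets(needsRestoring.keySet());
    }

    // ... restore partitions ...

    if (needsRestoring.isEmpty()) {
        restoreConsumer.unsubscribe();
        updatedEndOffsets = null; // reset so the next rebalance refreshes the end offsets
    }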

Contributor

@mjsax What you suggested sounds right to me.

Contributor Author

Hi @mjsax. When I added the check as you suggested, I got the following:

org.apache.kafka.streams.processor.internals.StoreChangelogReaderTest > shouldRestorePartitionsRegisteredPostInitialization FAILED
    java.lang.NullPointerException
        at org.apache.kafka.streams.processor.internals.StoreChangelogReader.processNext(StoreChangelogReader.java:266)
        at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:92)
        at org.apache.kafka.streams.processor.internals.StoreChangelogReaderTest.shouldRestorePartitionsRegisteredPostInitialization(StoreChangelogReaderTest.java:374)

Do you know the cause of this failure? I suggested a possible cause in #4901, although that might not be the precise source of the error.

@ConcurrencyPractitioner
Contributor Author

I have found that if we remove the check (updatedEndOffsets == null), the problem goes away. To reiterate, I think we need to continuously update the end offsets to resolve the corner case covered by the failing unit test.

@guozhangwang
Contributor

@ConcurrencyPractitioner Could you provide more detailed reasoning for why the end offsets could change after we fetch them at the beginning of the restoration? The goal of https://issues.apache.org/jira/browse/KAFKA-6730 itself is that, since we now call poll() on each main-loop iteration during restoration, a rebalance should be detected before the end offset has been incremented by other threads. If you have observed from the failing unit tests that this is still not the case, we need to reconsider this JIRA.

@ConcurrencyPractitioner
Contributor Author

@guozhangwang In the test, I found that end offsets were updated twice for what appear to be two distinct topic partitions. In all honesty, I do not know myself why the tests are failing. Do you have any ideas?

@mjsax
Member

mjsax commented May 17, 2018

One thing: we need to set updatedEndOffsets = null in Line 112:

if (needsRestoring.isEmpty()) {
    restoreConsumer.unsubscribe();
    updatedEndOffsets = null; // add this
}

Not sure if this fixes the error.

@mjsax
Member

mjsax commented May 18, 2018

@ConcurrencyPractitioner I had a look into the test, and the issue is the test setup itself. The mock-consumer setup is split into two parts:

setupConsumer(1, topicPartition);
consumer.updateEndOffsets(Collections.singletonMap(topicPartition, 10L));
changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, false, "storeName"));

and later

setupConsumer(3, postInitialization);
consumer.updateBeginningOffsets(Collections.singletonMap(postInitialization, 0L));
consumer.updateEndOffsets(Collections.singletonMap(postInitialization, 3L));

Thus, when restore is called the first time, the mock consumer does not know about the other partition -- a real consumer, however, would. We therefore need to change the test setup so that the mocked consumer knows the end-offsets for all partitions from the beginning.
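A hypothetical version of that setup change (note the PR ultimately went a different route, refreshing end-offsets lazily instead) would register both end offsets up front:

    // hypothetical test fix: tell the mock consumer about both partitions'
    // end offsets from the start, mirroring what a real consumer would know
    consumer.updateEndOffsets(Collections.singletonMap(topicPartition, 10L));
    consumer.updateEndOffsets(Collections.singletonMap(postInitialization, 3L));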

@ConcurrencyPractitioner
Contributor Author

@mjsax Correct me if I am wrong, but here is my current idea of what is happening. When the first restore is called, updatedEndOffsets has only one entry stored in it (topicPartition, 10). Then we set up a second partition. At this point, this topic partition, postInitialization, is added to the consumer along with its respective offsets. However, this is where we encounter a problem. Due to the current design, restoreConsumer#endOffsets is queried exactly once for every StoreChangelogReader object that is created, so updatedEndOffsets cannot be updated a second time. And as you mentioned, the mock consumer does not know about the new topic partition that is added after the first restore was called. So all we have to do, in my opinion, is to call restoreConsumer#endOffsets again if and only if more topic partitions that need restoring have been added. I think this way, we will not encounter this problem.
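A minimal sketch of that idea (names follow the PR; the surrounding restore() loop is assumed):

    // query end offsets only for partitions that need restoring but have
    // no cached end offset yet
    final Set<TopicPartition> remainingPartitions = new HashSet<>(needsRestoring.keySet());
    remainingPartitions.removeAll(updatedEndOffsets.keySet());
    if (!remainingPartitions.isEmpty()) {
        updatedEndOffsets.putAll(restoreConsumer.endOffsets(remainingPartitions));
    }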

@ConcurrencyPractitioner
Contributor Author

Note that the solution I am proposing would allow the mock consumer to learn about partitions it had missed.

@mjsax
Member

mjsax commented May 19, 2018

@ConcurrencyPractitioner Your observation sounds correct to me. I think we need to update the endOffsets each time there are new partitions to be initialized:

if (!needsInitializing.isEmpty()) {
    initialize();
}

Does this make sense?

@ConcurrencyPractitioner
Contributor Author

Hi @mjsax, any comments?

@mjsax (Member) left a comment

Build fails with a compile error. Can you please rebase this PR?

if (!needsRestoring.isEmpty()) {
final Set<TopicPartition> remainingPartitions = new HashSet<>(needsRestoring.keySet());
remainingPartitions.removeAll(updatedEndOffsets.keySet());
updatedEndOffsets.putAll(restoreConsumer.endOffsets(remainingPartitions));
Member

I think we can move this into the block above (lines 70 to 72) and simplify:

if (!needsInitializing.isEmpty()) {
    initialize();
    updatedEndOffsets.putAll(restoreConsumer.endOffsets(needsInitializing.keySet()));
}

final StateRestorer restorer = stateRestorers.get(partition);
final long pos = processNext(records.records(partition), restorer, updatedEndOffsets.get(partition));
restorer.setRestoredOffset(pos);
if (restorer.hasCompleted(pos, updatedEndOffsets.get(partition))) {
Member

We can also remove the entry from updatedEndOffsets if this is true.
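For reference, a later revision of this PR implements the removal roughly as follows:

    if (restorer.hasCompleted(pos, updatedEndOffsets.get(partition))) {
        restorer.restoreDone();
        // the cached end offset is no longer needed once restore completes
        updatedEndOffsets.remove(partition);
        completedPartitions.add(partition);
    }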

@ConcurrencyPractitioner
Contributor Author

Hi @mjsax, it appears that needsInitializing is distinct from needsRestoring. When I tested your approach, it didn't work because the topic partitions in needsInitializing are different from the ones we actually need to restore.

@ConcurrencyPractitioner
Contributor Author

Only needsRestoring contains the needed partitions, not needsInitializing.

@mjsax
Member

mjsax commented May 22, 2018

I see. Guess we need to update the end-offsets before the call to initialize(), because the new partitions are moved from needsInitializing to needsRestoring within initialize().

@ConcurrencyPractitioner
Contributor Author

ConcurrencyPractitioner commented May 22, 2018

Hi @mjsax, when I updated the end offsets before initialize() is called, I got the following test error:

org.apache.kafka.streams.processor.internals.StoreChangelogReaderTest > shouldThrowExceptionIfConsumerHasCurrentSubscription FAILED
    java.lang.IllegalStateException: The partition sometopic-0 does not have an end offset.
        at org.apache.kafka.clients.consumer.MockConsumer.endOffsets(MockConsumer.java:377)
        at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:71)
        at org.apache.kafka.streams.processor.internals.StoreChangelogReaderTest.shouldThrowExceptionIfConsumerHasCurrentSubscription(StoreChangelogReaderTest.java:109)

As you can see, some partitions found in needsInitializing do not have an end offset, probably because they have not been assigned one yet.

@ConcurrencyPractitioner
Contributor Author

ConcurrencyPractitioner commented May 24, 2018

Hi @mjsax, any other comments that we need to address?

@ConcurrencyPractitioner
Contributor Author

ConcurrencyPractitioner commented May 29, 2018

@mjsax If you have time, could you tell me exactly what this PR is lacking?

@mjsax
Member

mjsax commented May 29, 2018

@ConcurrencyPractitioner The feature freeze deadline does not affect this PR, because it's an internal change. The code freeze deadline is in 2 weeks, so we still have time. Reviewing now though :)

@mjsax (Member) left a comment

Call for second review @guozhangwang @bbejeck @vvcephei

@@ -81,10 +78,25 @@ public Collection<TopicPartition> restore(final RestoringTasks active) {

final Set<TopicPartition> restoringPartitions = new HashSet<>(needsRestoring.keySet());
Member

I think we can remove this variable to simplify the code further.

remainingPartitions.removeAll(updatedEndOffsets.keySet());
updatedEndOffsets.putAll(restoreConsumer.endOffsets(remainingPartitions));
final ConsumerRecords<byte[], byte[]> records = restoreConsumer.poll(10);
final Iterator<TopicPartition> iterator = restoringPartitions.iterator();
Member

Replace restoringPartitions with needsRestoring.keySet() to get rid of the unnecessary variable.

restorePartition(allRecords, partition, active.restoringTaskFor(partition));
final Set<TopicPartition> remainingPartitions = new HashSet<>(needsRestoring.keySet());
remainingPartitions.removeAll(updatedEndOffsets.keySet());
updatedEndOffsets.putAll(restoreConsumer.endOffsets(remainingPartitions));
Member

This seems to be correct. Still wondering why you did not move this code into

if (!needsInitializing.isEmpty()) {
    initialize();
    // put the three lines here
}

I think that remainingPartitions can only contain data if !needsInitializing.isEmpty() is true -- thus, there is no need to execute the code in each iteration. Or am I missing something?

Contributor Author

Sorry about my delay. It should be fine now.

@ConcurrencyPractitioner (Contributor Author) Jun 1, 2018

@mjsax Actually, needsInitializing can still be empty while remainingPartitions would still have partitions which need to be added from needsRestoring. I think the main reason is that needsInitializing and needsRestoring are completely independent of one another. If one does not contain partitions, that does not necessarily affect the other. In other words, as long as needsRestoring has partitions which need to be restored, we will probably need to check for partitions every time -- regardless of what needsInitializing contains.

Member

I cannot follow. Not sure what I am missing. From my understanding, the flow is as follows: (1) after new partitions are assigned, they are added to needsInitializing. (2) When partitions/tasks are initialized, it is checked whether they need restoring: (2a) if they need restoring, they are removed from needsInitializing and added to needsRestoring; (2b) if they don't need restoring, they are only removed from needsInitializing.

We need to maintain updatedEndOffsets only for partitions that need restoring. And we need to get the endOffset for those partitions only once. Thus, new end-offsets only need to be added after new partitions are assigned, i.e., when needsInitializing.isEmpty() is false.

Actually, needsInitializing can still be empty, but remainingPartitions would still have partitions which needs to be added from needsRestoring

Why? Those partitions should have been added to updatedEndOffsets after they got assigned initially.

I think the main reason is that needsInitializing and needsRestoring are completely independent of one another.

From my understanding, they are not independent. Each partition is first in needsInitializing and might be "moved" from there to needsRestoring.

In other words, as long as needsRestoring has partitions which needs to be restored, we will probably need to check for partitions every time -- regardless of what needsInitializing contains

Why? The end-offset should not change, and thus, for each partition that is moved from needsInitializing to needsRestoring, we only add the end-offset to updatedEndOffsets once.

Please let me know what I am missing.
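A compact sketch of the flow described above (a sketch only, assuming the restore() structure from this PR):

    public Collection<TopicPartition> restore(final RestoringTasks active) {
        if (!needsInitializing.isEmpty()) {
            // moves restorable partitions from needsInitializing to needsRestoring
            initialize();
            // fetch end offsets once, only for the newly restoring partitions
            final Set<TopicPartition> newPartitions = new HashSet<>(needsRestoring.keySet());
            newPartitions.removeAll(updatedEndOffsets.keySet());
            updatedEndOffsets.putAll(restoreConsumer.endOffsets(newPartitions));
        }
        // ... poll and restore the partitions in needsRestoring ...
    }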

Contributor Author

Oh, I see what happened. Sorry, this was my bad. I was adding the end offsets to updatedEndOffsets before initialize() was called inside the conditional. Consequently, I never detected any change in needsRestoring.

@ConcurrencyPractitioner
Contributor Author

Hi @mjsax, I think this should be all of it.

@mjsax (Member) left a comment

Thanks for the update. One more comment.

if (restorer.hasCompleted(pos, updatedEndOffsets.get(partition))) {
restorer.restoreDone();
updatedEndOffsets.remove(partition);
completedPartitions.add(partition);
Member

One more thing -- missed this before: can't we remove completedPartitions and call iterator.remove() instead?

Maybe we need to change the iterator to iterate over the HashMap instead of the "key-set" of the HashMap though.
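A rough sketch of that idea, iterating over the map's entry set so the iterator itself can drop completed partitions (assuming needsRestoring maps each partition to its StateRestorer, as elsewhere in this class; remaining bookkeeping is omitted):

    final Iterator<Map.Entry<TopicPartition, StateRestorer>> iterator = needsRestoring.entrySet().iterator();
    while (iterator.hasNext()) {
        final Map.Entry<TopicPartition, StateRestorer> entry = iterator.next();
        final TopicPartition partition = entry.getKey();
        final StateRestorer restorer = entry.getValue();
        final long pos = processNext(records.records(partition), restorer, updatedEndOffsets.get(partition));
        restorer.setRestoredOffset(pos);
        if (restorer.hasCompleted(pos, updatedEndOffsets.get(partition))) {
            restorer.restoreDone();
            updatedEndOffsets.remove(partition);
            iterator.remove(); // drop the completed partition in place; no completedPartitions set needed
        }
    }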

@guozhangwang
Contributor

retest this please

@guozhangwang (Contributor) left a comment

Made my pass; other than @mjsax's comment it LGTM.

@ConcurrencyPractitioner
Contributor Author

@mjsax Sorry about the delay. Fixed now.

@bbejeck
Member

bbejeck commented Jun 5, 2018

retest this please

@bbejeck (Member) left a comment

Thanks for the patch, LGTM.

@mjsax (Member) left a comment

LGTM. Thanks for the patch and for being patient!

@mjsax merged commit ba0ebca into apache:trunk on Jun 5, 2018
ijuma added a commit to edoardocomar/kafka that referenced this pull request Jun 6, 2018
…grained-acl-create-topics

* apache-github/trunk:
  KAFKA-5588: Remove deprecated --new-consumer tools option (apache#5097)
  MINOR: Fix for the location of the trogdor.sh executable file in the documentation. (apache#5040)
  KAFKA-6997: Exclude test-sources.jar when $INCLUDE_TEST_JARS is FALSE
  MINOR: docs should point to latest version (apache#5132)
  KAFKA-6981: Move the error handling configuration properties into the ConnectorConfig and SinkConnectorConfig classes (KIP-298)
  [KAFKA-6730] Simplify State Store Recovery (apache#5013)
  MINOR: Rename package `internal` to `internals` for consistency (apache#5137)
  KAFKA-6704: InvalidStateStoreException from IQ when StreamThread closes store (apache#4801)
  MINOR: Add missing configs for resilience settings
  MINOR: Add regression tests for KTable mapValues and filter (apache#5134)
  KAFKA-6750: Add listener name to authentication context (KIP-282) (apache#4829)
  KAFKA-3665: Enable TLS hostname verification by default (KIP-294) (apache#4956)
  KAFKA-6938: Add documentation for accessing Headers on Kafka Streams Processor API (apache#5128)
  KAFKA-6813: return to double-counting for count topology names (apache#5075)
  KAFKA-5919; Adding checks on "version" field for tools using it
  MINOR: Remove deprecated KafkaStreams constructors in docs (apache#5118)
ijuma added a commit to big-andy-coates/kafka that referenced this pull request Jun 6, 2018
…refix

* apache-github/trunk:
  KAFKA-6726: Fine Grained ACL for CreateTopics (KIP-277) (apache#4795)
  KAFKA-5588: Remove deprecated --new-consumer tools option (apache#5097)
  MINOR: Fix for the location of the trogdor.sh executable file in the documentation. (apache#5040)
  KAFKA-6997: Exclude test-sources.jar when $INCLUDE_TEST_JARS is FALSE
  MINOR: docs should point to latest version (apache#5132)
  KAFKA-6981: Move the error handling configuration properties into the ConnectorConfig and SinkConnectorConfig classes (KIP-298)
  [KAFKA-6730] Simplify State Store Recovery (apache#5013)
  MINOR: Rename package `internal` to `internals` for consistency (apache#5137)
  KAFKA-6704: InvalidStateStoreException from IQ when StreamThread closes store (apache#4801)
  MINOR: Add missing configs for resilience settings
  MINOR: Add regression tests for KTable mapValues and filter (apache#5134)
  KAFKA-6750: Add listener name to authentication context (KIP-282) (apache#4829)
  KAFKA-3665: Enable TLS hostname verification by default (KIP-294) (apache#4956)
  KAFKA-6938: Add documentation for accessing Headers on Kafka Streams Processor API (apache#5128)
  KAFKA-6813: return to double-counting for count topology names (apache#5075)
  KAFKA-5919; Adding checks on "version" field for tools using it
  MINOR: Remove deprecated KafkaStreams constructors in docs (apache#5118)
ying-zheng pushed a commit to ying-zheng/kafka that referenced this pull request Jul 6, 2018
Reviewer: Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <guozhang@confluent.io>, Bill Bejeck <bill@confluent.io>
