[controller] Fix mismatch between hybrid version partition count and real-time partition count #1338

Draft: wants to merge 5 commits into base: main

@sushantmane sushantmane commented Nov 22, 2024

Ensure real-time topic partition count matches hybrid version partition count

This fix addresses an issue where the real-time topic partition count did not align with the hybrid version
partition count, causing errors during hybrid store ingestion. The issue occurred in the following scenario:

  1. Create a store with 1 partition.
  2. Perform a batch push, creating a batch version with 1 partition.
  3. Update the store to 3 partitions and convert it to a hybrid store.
  4. Start real-time writes using push type STREAM.
  5. Perform a full push to create a hybrid version with 3 partitions. This push fails because, after the topic
    switch, real-time consumers cannot find partitions 2 and 3 due to the real-time topic having only 1 partition.

Root Cause:

  • In step 4, if the real-time topic did not exist, it was created with a partition count derived from the largest
    existing version (the batch version with 1 partition), which led to the mismatch.
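
To make the root cause concrete, here is a minimal sketch of the described behavior. The class and method names are hypothetical and are not taken from the Venice code base; the fallback to the store-level count when no version exists is also an assumption.

```java
import java.util.List;

// Hypothetical illustration only; not the actual controller code.
final class RtPartitionCountSketch {
  // Behavior described in the root cause: the real-time topic partition count
  // is taken from the largest existing version. The fallback to the store-level
  // count when there are no versions is an assumption made for this sketch.
  static int legacyRtPartitionCount(List<Integer> versionPartitionCounts, int storePartitionCount) {
    return versionPartitionCounts.stream()
        .mapToInt(Integer::intValue)
        .max()
        .orElse(storePartitionCount);
  }

  // Number of partitions a hybrid version expects but the real-time topic lacks.
  static int missingPartitionCount(int rtPartitionCount, int hybridVersionPartitionCount) {
    return Math.max(0, hybridVersionPartitionCount - rtPartitionCount);
  }
}
```

With the scenario above (one batch version with 1 partition, store updated to 3 partitions), `legacyRtPartitionCount(Arrays.asList(1), 3)` yields 1, so a later hybrid version with 3 partitions is missing 2 real-time partitions.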

Solution in this PR:

  • STREAM push type is now disallowed if there is no online hybrid version.
  • If an online hybrid version exists, it ensures the real-time topic partition count matches the hybrid version
    partition count.
  • The requestTopicForPushing method no longer creates a real-time topic if it does not already exist.
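
The three rules above could be sketched roughly as follows. This is not the code in this PR: the class, method, and parameter names are hypothetical, and treating a missing real-time topic or a partition count mismatch as a hard error is an assumption about how "ensures" is enforced.

```java
import java.util.OptionalInt;

// Hypothetical illustration of the checks; not the actual requestTopicForPushing logic.
final class StreamPushCheckSketch {
  // Returns the real-time topic partition count if a STREAM push may proceed.
  // An empty OptionalInt stands for "no online hybrid version" or "no real-time topic".
  static int validateStreamPush(OptionalInt onlineHybridVersionPartitions, OptionalInt rtTopicPartitions) {
    if (!onlineHybridVersionPartitions.isPresent()) {
      throw new IllegalStateException("STREAM push is not allowed without an online hybrid version");
    }
    if (!rtTopicPartitions.isPresent()) {
      // Assumption: the request path rejects the push instead of creating the topic.
      throw new IllegalStateException("Real-time topic does not exist and is not created here");
    }
    int expected = onlineHybridVersionPartitions.getAsInt();
    int actual = rtTopicPartitions.getAsInt();
    if (expected != actual) {
      throw new IllegalStateException(
          "Real-time topic has " + actual + " partitions but the hybrid version has " + expected);
    }
    return actual;
  }
}
```

In the failing scenario, `validateStreamPush(OptionalInt.empty(), OptionalInt.empty())` would be rejected at step 4, before any real-time topic with the wrong partition count could be created.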

Misc:

  • Refactor CreationVersion::requestTopicForPushing to make it easier to unit test
  • Add similar checks for the incremental push job type
  • Fix flaky blob transfer tests
  • Stop creating real-time topics for region system stores like meta and PS3 in the parent region
  • Change logging around TS to make it easier to search

How was this PR tested?

WIP

Does this PR introduce any user-facing changes?

  • No. You can skip the rest of this section.
  • Yes. Make sure to explain your proposed changes and call out the behavior change.

…rtition count

This fix addresses an issue where the real-time topic partition count did not align with the hybrid version
partition count, causing errors during hybrid store operations. The issue occurred in the following scenario:

1. Create a store with 1 partition.
2. Perform a batch push, creating a batch version with 1 partition.
3. Update the store to 3 partitions and convert it to a hybrid store.
4. Start real-time writes using push type STREAM.
5. Perform a full push to create a hybrid version with 3 partitions. This push fails because, after the topic
   switch, real-time consumers cannot find partitions 2 and 3 due to the real-time topic having only 1 partition.

Root Cause:
- In step 4, if the real-time topic did not exist, it was created with a partition count derived from the largest
  existing version (batch version with 1 partition), leading to a mismatch.

Solution:
- STREAM push type is now disallowed if there is no online hybrid version.
- If an online hybrid version exists, it ensures the real-time topic partition count matches the hybrid version
  partition count.
- The `requestTopicForPushing` method no longer creates a real-time topic if it does not already exist.

This ensures consistency between the real-time topic and hybrid versions, preventing errors during hybrid store operations.
…inc-pushes in child regions when getting version for inc push
@sushantmane sushantmane marked this pull request as draft November 22, 2024 19:51
@sushantmane

Moving to draft as I would like to move topic creation into addVersion and see how that works out.

@@ -834,6 +834,7 @@ private boolean canSwitchToLeaderTopic(PartitionConsumptionState pcs) {
private boolean isLocalVersionTopicPartitionFullyConsumed(PartitionConsumptionState pcs) {
long localVTOff = pcs.getLatestProcessedLocalVersionTopicOffset();
long localVTEndOffset = getTopicPartitionEndOffSet(localKafkaServer, versionTopic, pcs.getPartition());

Contributor Author

I suspect there is a bug in this function. I need to think it through.

this.pushJobId = pushJobId;
}

public static PushType extractPushType(String pushTypeString) {

In the PushType enum itself, we already have a valueOf(int). Would it be better to add this method there so that all the valueOf variants are in one place?

Also, this method simply catches an IllegalArgumentException and throws a new IllegalArgumentException with a more informative error message. Catching exceptions is expensive, so maybe inside the PushType enum you could pre-build a static map keyed by push type string; if the input string does not match any entry, you can throw an IllegalArgumentException.

Just my two cents.
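
A minimal sketch of the suggested approach, assuming a simplified stand-in enum. `PushTypeSketch`, its constants, and `fromString` are hypothetical and do not reproduce the real PushType definition; the case-insensitive, whitespace-trimming lookup is also an assumption.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

// Hypothetical stand-in for the real PushType enum.
enum PushTypeSketch {
  BATCH, STREAM, INCREMENTAL;

  // Built once at class initialization, so lookups never rely on catching an exception.
  private static final Map<String, PushTypeSketch> BY_NAME;

  static {
    Map<String, PushTypeSketch> byName = new HashMap<>();
    for (PushTypeSketch type : values()) {
      byName.put(type.name(), type);
    }
    BY_NAME = Collections.unmodifiableMap(byName);
  }

  // Assumption: lookup ignores case and surrounding whitespace.
  static PushTypeSketch fromString(String pushTypeString) {
    PushTypeSketch type =
        pushTypeString == null ? null : BY_NAME.get(pushTypeString.trim().toUpperCase(Locale.ROOT));
    if (type == null) {
      throw new IllegalArgumentException(
          "Unknown push type: " + pushTypeString + ". Valid values: " + BY_NAME.keySet());
    }
    return type;
  }
}
```

With this shape, `PushTypeSketch.fromString("stream")` returns `STREAM`, and an unknown string fails with a single exception that lists the valid values.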


private static void verifyPartitioner(PartitionerConfig storePartitionerConfig, Set<String> partitionersFromRequest) {
// If partitioners are provided, check if the store partitioner is in the list
if (partitionersFromRequest != null && !partitionersFromRequest.isEmpty()

Is the isEmpty test here necessary? If the set is empty, your third test, contains(xxx), will return false.
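
Whether the isEmpty test is redundant depends on how the third condition is written, and that condition is not visible in the excerpt above. The sketch below assumes it is a negated `contains` (reject when the store partitioner is not in the provided set); the class and method names are hypothetical.

```java
import java.util.Set;

// Hypothetical illustration; the real third condition is not shown in the diff excerpt.
final class PartitionerCheckSketch {
  // Variant with the isEmpty guard: an empty set means "no partitioners provided".
  static boolean rejectsWithEmptyCheck(String storePartitioner, Set<String> requested) {
    return requested != null && !requested.isEmpty() && !requested.contains(storePartitioner);
  }

  // Variant without the guard: contains() returns false on an empty set,
  // so the negated test is true and an empty set is rejected.
  static boolean rejectsWithoutEmptyCheck(String storePartitioner, Set<String> requested) {
    return requested != null && !requested.contains(storePartitioner);
  }
}
```

Under that assumption the two variants differ only for an empty set: the guarded one accepts it and the unguarded one rejects it. If the third test were a plain, non-negated `contains`, the guard would be redundant, as the comment suggests.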


// If Version partition count different from calculated partition count use the version count as store count
// may have been updated later.
if (version.getPartitionCount() != response.getPartitions()) {

Should we log the partition count change here so that we don't have to scratch our heads during future troubleshooting?
