Flink: The Data Skew Problem on FlinkSink #4228

Closed

Conversation


@zhengchar zhengchar commented Feb 25, 2022

Hi,

I tried to load 1 TB of data from TiDB into Iceberg with Flink. The Iceberg table is partitioned into 128 buckets.

I found a data skew problem in the FlinkSQL IcebergWriter stage. We set the parallelism of this stage to 128, but only 49 task managers received data to process; the others finished almost immediately.

For a bucket-partitioned table, Flink distributes the data with a 'keyBy' operator, and that hash-based policy can cause data skew. I wrote a custom partition function that distributes each table partition's data evenly across task managers.


In my testing (batch mode, without the deletion process), this function gives every task manager data to process and cuts the load time from 96 min to 38 min at parallelism 64.

@zhengchar zhengchar changed the title The Data Skew Problem on FlinkSink Flink: The Data Skew Problem on FlinkSink Feb 25, 2022
@stevenzwu
Contributor

There is a recent Slack thread about the same issue, where hash distribution leads to skewed shuffling (for bucketing partitions and probably other partition specs too): https://apache-iceberg.slack.com/archives/C025PH0G1D4/p1645676203340179

I don't necessarily agree with the solution provided in this PR. We could provide a more general solution in FlinkSink.

For bucketing partitions, we can implement a custom partitioner that shuffles data by the bucketing value to the downstream tasks/channels: bucket(key) % downstream operator parallelism.
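
A minimal sketch of that idea in the Flink Scala API (illustrative only: the object name, the string-keyed bucket computation, and the fixed NumBuckets value are assumptions, not code from this PR):

import java.nio.charset.StandardCharsets
import com.google.common.hash.Hashing
import org.apache.flink.api.common.functions.Partitioner

object BucketShuffle {
  // Assumed to match the table's bucket[N] partition spec.
  val NumBuckets = 128
  private val murmur3 = Hashing.murmur3_32()

  // For a string partition key, Iceberg's bucket value is roughly
  // (murmur3(utf8(key)) & Int.MaxValue) % numBuckets.
  def icebergBucket(key: String): Int =
    (murmur3.hashString(key, StandardCharsets.UTF_8).asInt() & Integer.MAX_VALUE) % NumBuckets

  // bucket(key) % downstream parallelism: all rows of one bucket go to one
  // channel, and buckets spread roughly evenly over channels instead of
  // colliding through String#hashCode.
  val bucketPartitioner: Partitioner[Int] = new Partitioner[Int] {
    override def partition(bucketId: Int, numPartitions: Int): Int =
      bucketId % numPartitions
  }
}

A stream could then be pre-shuffled with something like rowStream.partitionCustom(BucketShuffle.bucketPartitioner, row => BucketShuffle.icebergBucket(keyOf(row))), where rowStream and keyOf are placeholders for the user's pipeline, and the sink configured with DistributionMode.NONE so it does not shuffle again.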

I don't know if it makes sense to add a bucketing enum value to DistributionMode, or whether we should make this config specific to FlinkSink.

cc @openinx @rdblue @kbendick

@zhengchar
Author

There is a recent Slack thread about the same issue, where hash distribution leads to skewed shuffling (for bucketing partitions and probably other partition specs too): https://apache-iceberg.slack.com/archives/C025PH0G1D4/p1645676203340179

I don't necessarily agree with the solution provided in this PR. We could provide a more general solution in FlinkSink.

For bucketing partitions, we can implement a custom partitioner that shuffles data by the bucketing value to the downstream tasks/channels: bucket(key) % downstream operator parallelism.

I don't know if it makes sense to add a bucketing enum value to DistributionMode, or whether we should make this config specific to FlinkSink.

cc @openinx @rdblue @kbendick

Hi Steven,

Glad to be having this discussion with you!

I think there are three key aspects to this problem:

  • First, each partition should be written by only one task, to avoid the small-files problem.
  • Second, data is currently shuffled by the bucket partition key.
  • Third, bucket is a hash-based policy, but 'truncate' and 'identity' partitions have no hash attribute.

I agree with you that we need a new method that maps partition data to tasks evenly, rather than a hash function. A new KeySelector logic could apply to the bucket partition case, but it would not fit other partition specs (identity or truncate); or should we make this config apply only to FlinkSink for the 'bucket' partition spec?

BTW, could you please help add me to the Slack discussion mentioned above? Thanks very much!

@stevenzwu
Contributor

@zhengchar please follow the instructions for a Slack invite: https://iceberg.apache.org/community/

@kbendick
Contributor

There is a recent Slack thread about the same issue, where hash distribution leads to skewed shuffling (for bucketing partitions and probably other partition specs too): https://apache-iceberg.slack.com/archives/C025PH0G1D4/p1645676203340179

I don't necessarily agree with the solution provided in this PR. We could provide a more general solution in FlinkSink.

For bucketing partitions, we can implement a custom partitioner that shuffles data by the bucketing value to the downstream tasks/channels: bucket(key) % downstream operator parallelism.

I don't know if it makes sense to add a bucketing enum value to DistributionMode, or whether we should make this config specific to FlinkSink.

cc @openinx @rdblue @kbendick

I agree, @stevenzwu - I don't necessarily think this is the appropriate solution to the concern either. Please do join the Iceberg Slack, @zhengchar - as Steven mentioned, this has come up very recently there.

I wouldn't add something as specific as bucketing to DistributionMode (at least as proposed here). I would probably start by adding a config to the FlinkSink and then try to solve the problem somewhat more generally with respect to partitioning and required distribution - possibly making that opt-in / opt-out via the Flink sink config in case it's not an issue for some users. A DistributionMode of bucketing seems a little too situationally specific given the preconditions outlined here (only a bucketing partition is used, etc.). I agree with the idea of a custom partitioner to pre-shuffle the data by some (potentially configurable) factor of the downstream parallelism (be that tasks or channels).

I'd love to continue the discussion on Slack (or on the dev list, but Slack is great for casual async discussion and the Iceberg Slack is very active).

@openinx
Member

openinx commented Mar 1, 2022

Thanks @zhengchar for bringing this interesting issue here, and thanks @stevenzwu and @kbendick for providing the slack context.

In essence, the current PartitionKeySelector generates a String key composed from a bucket integer value with a narrow range (usually [0, bucketNum)). Then the keyBy operator hashes that key again, essentially String#hashCode % parallelism, to choose the preferred downstream channel.

Because the value range of the first hash is so narrow, the second hash produces a large number of collisions, so many buckets end up on the same channels while other channels receive no data.
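
A rough way to see the effect (Flink's real keyBy routing goes through murmur-hashed key groups rather than a plain hashCode modulo, so the exact numbers differ from the 49-of-128 reported above, but the narrow key range clusters the same way):

object SkewIllustration extends App {
  val numBuckets = 128
  val parallelism = 128
  // Simulate the second hash: bucket id -> String key -> hashCode % parallelism.
  val channels = (0 until numBuckets)
    .map(b => Math.floorMod(b.toString.hashCode, parallelism))
    .toSet
  // Collisions mean fewer than `parallelism` distinct channels receive any data.
  println(s"$numBuckets buckets map to ${channels.size} of $parallelism channels")
}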

@openinx
Member

openinx commented Mar 1, 2022

I agree with @stevenzwu that we need a general solution to fix this data skew issue.

@openinx openinx added this to the Iceberg 0.14.0 Release milestone Mar 1, 2022
@openinx
Member

openinx commented Mar 1, 2022

I just copied the solution from Slack here for future reference:

https://apache-iceberg.slack.com/archives/C025PH0G1D4/p1646033885013799?thread_ts=1645676203.340179&cid=C025PH0G1D4

hi @Steven Wu @Kyle Bendickson, thanks for your help. It really helped me a lot.
I wrote a custom partitioner that is consistent with Iceberg's bucketing algorithm,
so I disabled the shuffling (DistributionMode.HASH). It can still handle small files.

import java.nio.charset.StandardCharsets
import com.google.common.hash.Hashing
import org.apache.flink.api.common.functions.Partitioner
import org.apache.iceberg.DistributionMode
import org.apache.iceberg.flink.sink.FlinkSink

// Custom partitioner from the Slack thread: murmur3-hash the key string (the same
// hash Iceberg's bucket transform uses) and mod by the downstream parallelism.
val finalStream = kafkaStream.partitionCustom(new Partitioner[String]() {
  val hash = Hashing.murmur3_32()
  override def partition(key: String, numPartitions: Int): Int = {
    val res = hash.hashString(key, StandardCharsets.UTF_8).asInt()
    (Integer.MAX_VALUE & res) % numPartitions
  }
}, value => value.getField(0).toString)

// Hash distribution is disabled; the stream was already partitioned above.
FlinkSink.forRow(finalStream.javaStream, tableSchema)
  .tableLoader(loader)
  .writeParallelism(sinkParallelism)
  .distributionMode(DistributionMode.NONE)
  .build()

@zhengchar
Author

I just copied the solution from Slack here for future reference:

https://apache-iceberg.slack.com/archives/C025PH0G1D4/p1646033885013799?thread_ts=1645676203.340179&cid=C025PH0G1D4

hi @Steven Wu @Kyle Bendickson, thanks for your help. It really helped me a lot.
I wrote a custom partitioner that is consistent with Iceberg's bucketing algorithm,
so I disabled the shuffling (DistributionMode.HASH). It can still handle small files.

import java.nio.charset.StandardCharsets
import com.google.common.hash.Hashing
import org.apache.flink.api.common.functions.Partitioner
import org.apache.iceberg.DistributionMode
import org.apache.iceberg.flink.sink.FlinkSink

// Custom partitioner from the Slack thread: murmur3-hash the key string (the same
// hash Iceberg's bucket transform uses) and mod by the downstream parallelism.
val finalStream = kafkaStream.partitionCustom(new Partitioner[String]() {
  val hash = Hashing.murmur3_32()
  override def partition(key: String, numPartitions: Int): Int = {
    val res = hash.hashString(key, StandardCharsets.UTF_8).asInt()
    (Integer.MAX_VALUE & res) % numPartitions
  }
}, value => value.getField(0).toString)

// Hash distribution is disabled; the stream was already partitioned above.
FlinkSink.forRow(finalStream.javaStream, tableSchema)
  .tableLoader(loader)
  .writeParallelism(sinkParallelism)
  .distributionMode(DistributionMode.NONE)
  .build()

Hi @openinx,

Thanks for your explanation.

I have tried the solution above, but I found the data skew problem is still there in the IcebergWriter stage.

In my opinion, there are two points:

  • We need a method that distributes data to the different tasks evenly, and I think the HASH method is not a good fit. So I tried calculating a weight value for every partition and then mapping each partition to a task by that weight (see the sketch after this list).
  • If mode 'NONE' is used, I think one task will handle data for multiple partitions; during the pos/eq deletion procedure this can cause OOM and may also lead to the small-files problem. This is also a resource distribution problem.
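
One way the weight idea could be realized (a sketch only, assuming per-partition weights such as estimated row counts are known up front; this is not the PR's implementation) is a greedy least-loaded assignment:

import scala.collection.mutable

object WeightedAssignment {
  // Heaviest partitions first, each to the task with the smallest current load,
  // so every task receives work and the loads stay balanced.
  def assign(partitionWeights: Map[Int, Long], numTasks: Int): Map[Int, Int] = {
    val load = mutable.ArrayBuffer.fill(numTasks)(0L)
    partitionWeights.toSeq
      .sortBy { case (_, w) => -w }
      .map { case (partition, weight) =>
        val task = load.indices.minBy(load)
        load(task) += weight
        partition -> task
      }
      .toMap
  }
}

The resulting partition-to-task map could then back a custom Partitioner whose partition() simply looks up the precomputed task for a row's partition.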

@stevenzwu
Contributor

@zhengchar We all agree that HASH distribution is not a good fit for bucketing tables. In the sample code from the Slack thread, hash distribution is disabled and a custom partitioner is used to shuffle the data matching the bucketing function in Iceberg.

How is the output/destination table partitioned? I'm trying to understand why the custom partitioner doesn't work for your scenario.

@rdblue
Contributor

rdblue commented Mar 4, 2022

After talking with Steven, I think we should go ahead and detect the case where we can use bucket values for distribution directly. There should be no need to add a mode to the table property for that.
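
A sketch of what that detection might look like (the object and method names are illustrative, and `spec` is assumed to be the destination table's PartitionSpec; this is not code from this PR):

import org.apache.iceberg.PartitionSpec

object BucketDetection {
  // Only shuffle by bucket values when the spec is purely bucket-partitioned;
  // otherwise keep the existing distribution behavior.
  def canShuffleByBucket(spec: PartitionSpec): Boolean =
    !spec.fields().isEmpty() &&
      spec.fields().stream().allMatch(f => f.transform().toString.startsWith("bucket["))
}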

@zhengchar
Author

@zhengchar We all agree that HASH distribution is not a good fit for bucketing tables. In the sample code from the Slack thread, hash distribution is disabled and a custom partitioner is used to shuffle the data matching the bucketing function in Iceberg.

How is the output/destination table partitioned? I'm trying to understand why the custom partitioner doesn't work for your scenario.

Hi Steven,

As described above, my destination table is just a bucket[64] table.

@stevenzwu
Contributor

@zhengchar can you share your code snippet? I thought the custom partitioner (with NONE distribution mode) should work for your case as well. I'm not saying it is the general solution; I'm trying to understand why it doesn't work for you, or whether there are other unique conditions in your use case that we missed.

@stevenzwu
Contributor

Closing this obsolete PR as it is superseded by PR #7161.
