
Flink 1.17: Support Partition Commit notification #7638

Closed

Conversation

hililiwei
Contributor

@hililiwei hililiwei commented May 18, 2023

Co-authored-by: quanyingxue yxuequan@163.com

Partition Commit

The Iceberg Flink writer’s default data commit relies on checkpoints. When a checkpoint completes, the newly written data is committed to the metadata, regardless of which partition the new data belongs to. However, after a partition has been fully written, it is often necessary to notify downstream applications, for example by adding the partition to the metadata or by writing a _SUCCESS file into the partition directory (even when data is stored in object storage, downstream applications may still rely on this file as a flag to drive the progress of the overall job flow).

The current default commit mode, which depends on checkpoints, can be understood as using processing time to decide when to commit table partitions (this mode still has room for optimization, because developers may want to decouple it from the checkpoint cycle). The partition commit strategy in this PR, by contrast, can be understood as using event time to decide whether to commit table partitions.

  • Policy: how to commit a partition. Built-in policies cover the default commit and writing success files; you can also implement your own policy, for example to merge small files.

NOTE: Partition Commit only works in dynamic partition inserting.

Partition commit trigger

To define when to commit a partition, the following partition commit trigger options are provided:

| Option | Required | Default | Type | Description |
| --- | --- | --- | --- | --- |
| sink.partition-commit.enabled | optional | false | Boolean | Set to true to commit partitions according to the time extracted from partition values and the watermark. |
| sink.partition-commit.delay | optional | 0 s | Duration | The partition will not be committed until this delay has passed. For a daily partition this should be '1 d'; for an hourly partition, '1 h'. |
| sink.partition-commit.watermark-time-zone | optional | UTC | String | The time zone used to parse the long watermark value into a TIMESTAMP value; the parsed watermark timestamp is compared with the partition time to decide whether the partition should be committed. This option only takes effect when sink.partition-commit.enabled is 'true'. If it is not configured correctly, e.g. the source rowtime is defined on a TIMESTAMP_LTZ column but this option is left unset, users may only see partitions committed after a few hours. The default value 'UTC' means the watermark is defined on a TIMESTAMP column or not defined at all. If the watermark is defined on a TIMESTAMP_LTZ column, the time zone of the watermark is the session time zone. The option value is either a full name such as 'America/Los_Angeles' or a custom time zone id such as 'GMT-08:00'. |

Partitions are committed according to the time extracted from partition values and the watermark. This requires that your job generates watermarks and that the table is partitioned by time, for example with hourly or daily partitions.

If you want to let downstream applications see a partition only when its data is complete, your job generates watermarks, and the time can be extracted from the partition values, set:

  • 'sink.partition-commit.enabled'='true'
  • 'sink.partition-commit.delay'='1h' ('1h' if your partition is an hourly partition; adjust it to your partition granularity). This is the most accurate way to commit partitions, and it tries to ensure that committed partitions contain data that is as complete as possible.

Late data processing: when a record arrives that belongs to a partition that has already been committed, it is still written into that partition, and the commit of that partition is then triggered again.
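
As a rough sketch of the trigger condition described above (illustrative only, not the PR's actual implementation; the class and method names are made up), a partition is considered committable once the watermark has passed the partition time plus the configured delay:

import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneId;

// Illustrative sketch of the event-time commit condition: commit a partition
// once watermark >= partition time + delay. Names are hypothetical.
public class PartitionCommitCheck {

  static boolean readyToCommit(
      long watermarkMillis, LocalDateTime partitionTime, Duration delay, ZoneId watermarkZone) {
    long commitTimeMillis =
        partitionTime.plus(delay).atZone(watermarkZone).toInstant().toEpochMilli();
    return watermarkMillis >= commitTimeMillis;
  }

  public static void main(String[] args) {
    // Hourly partition dt=2022-01-01, hour=01 with 'sink.partition-commit.delay'='1h':
    LocalDateTime partitionTime = LocalDateTime.of(2022, 1, 1, 1, 0, 0);
    long watermark =
        LocalDateTime.of(2022, 1, 1, 2, 0, 0).atZone(ZoneId.of("UTC")).toInstant().toEpochMilli();
    // The watermark has reached 02:00:00, so the 01 hour partition can be committed.
    System.out.println(readyToCommit(watermark, partitionTime, Duration.ofHours(1), ZoneId.of("UTC")));
  }
}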

Partition Time Extractor

Time extractors define how to extract time from partition values.

| Option | Required | Default | Type | Description |
| --- | --- | --- | --- | --- |
| partition.time-extractor.timestamp-pattern | optional | (none) | String | The pattern used to build a legal timestamp from partition fields. By default, 'yyyy-MM-dd hh:mm:ss' is read from the first field. If the timestamp should be extracted from a single partition field 'dt', configure '$dt'. If it should be extracted from multiple partition fields, say 'year', 'month', 'day' and 'hour', configure '$year-$month-$day $hour:00:00'. If it should be extracted from two partition fields 'dt' and 'hour', configure '$dt $hour:00:00'. |
| partition.time-extractor.timestamp-formatter | optional | yyyy-MM-dd HH:mm:ss | String | The formatter that converts the partition timestamp string value (produced by 'partition.time-extractor.timestamp-pattern') into a timestamp. For example, if the partition timestamp is extracted from multiple partition fields, say 'year', 'month' and 'day', you can configure 'partition.time-extractor.timestamp-pattern' to '$year$month$day' and 'partition.time-extractor.timestamp-formatter' to 'yyyyMMdd'. The default formatter is 'yyyy-MM-dd HH:mm:ss'. The timestamp-formatter is compatible with Java's DateTimeFormatter. |

The default extractor is based on a timestamp pattern composed of your partition fields.
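
To make the pattern and formatter concrete, here is a small illustrative sketch (plain Java, not the connector's extractor code) of how partition values for 'dt' and 'hour' resolve to a timestamp with the pattern '$dt $hour:00:00' and the default formatter:

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Illustrates the pattern/formatter behaviour only; not the actual extractor implementation.
public class TimestampPatternExample {
  public static void main(String[] args) {
    String dt = "2022-01-01"; // value of partition field 'dt'
    String hour = "01";       // value of partition field 'hour'

    // 'partition.time-extractor.timestamp-pattern' = '$dt $hour:00:00'
    String timestampString = dt + " " + hour + ":00:00";

    // 'partition.time-extractor.timestamp-formatter' = 'yyyy-MM-dd HH:mm:ss' (the default)
    DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
    LocalDateTime partitionTime = LocalDateTime.parse(timestampString, formatter);

    System.out.println(partitionTime); // 2022-01-01T01:00
  }
}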

Partition Commit Policy

The partition commit policy defines what action is taken when partitions are committed.

| Option | Required | Default | Type | Description |
| --- | --- | --- | --- | --- |
| sink.partition-commit.policy.kind | optional | default | String | The policy to commit a partition, i.e. to notify downstream applications that the partition has finished writing and is ready to be read. default: add the partition to the metadata. success-file: add a '_SUCCESS' file to the partition directory. custom: use a policy class to create a commit policy. Multiple policies can be configured, e.g. 'default,success-file'. |
| sink.partition-commit.policy.class | optional | (none) | String | The partition commit policy class implementing the PartitionCommitPolicy interface. Only used with the custom commit policy. |
| sink.partition-commit.success-file.name | optional | _SUCCESS | String | The file name for the success-file partition commit policy; the default is '_SUCCESS'. |

You can extend the commit policies with your own implementation. A custom commit policy implementation looks like:

public class CustomCommitPolicy implements PartitionCommitPolicy {

  @Override
  public void commit(Table table, PartitionKey partitionKey) {
    // custom actions to run when the partition is committed
  }
}
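
For a concrete (hypothetical) illustration, a custom policy could simply record or announce each committed partition so that an external scheduler can react to it. The sketch below assumes the PartitionCommitPolicy interface proposed in this PR; the class name LoggingCommitPolicy and the use of SLF4J logging are illustrative choices, not part of the PR:

import org.apache.iceberg.PartitionKey;
import org.apache.iceberg.Table;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical custom policy: logs every committed partition. In practice the
// body could issue a remote API call, emit an event, or compact small files.
public class LoggingCommitPolicy implements PartitionCommitPolicy {

  private static final Logger LOG = LoggerFactory.getLogger(LoggingCommitPolicy.class);

  @Override
  public void commit(Table table, PartitionKey partitionKey) {
    LOG.info("Partition {} of table {} committed", partitionKey, table.name());
  }
}

To use such a policy, 'sink.partition-commit.policy.kind' would be set to 'custom' and 'sink.partition-commit.policy.class' to the fully qualified class name.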

Full-example

CREATE TABLE kafka_table (
  id STRING,
  name STRING,
  ts TIMESTAMP(3),
  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (...);

CREATE TABLE iceberg_table (
  id STRING,
  name STRING,
  dt STRING,
  `hour` STRING
) PARTITIONED BY (dt, `hour`) WITH (
  'format-version'='2',
  'sink.partition-commit.enabled'='true',
  'sink.partition-commit.delay'='1h',
  'sink.partition-commit.policy.kind'='success-file',
  'partition.time-extractor.timestamp-pattern'='$dt $hour:00:00',
  'partition.time-extractor.timestamp-formatter'='yyyy-MM-dd HH:mm:ss'
);

INSERT INTO iceberg_table 
SELECT 
    id,
    name, 
    DATE_FORMAT(ts, 'yyyy-MM-dd'),
    DATE_FORMAT(ts, 'HH') 
FROM kafka_table;

-- streaming read with Iceberg streaming scan options
SELECT * FROM iceberg_table /*+ OPTIONS('streaming'='true', 'monitor-interval'='3s')*/ ;

To do

  • When a checkpoint is triggered, store the unclosed writers in the state

@hililiwei hililiwei changed the title Flink: Support Partition Commit for append Flink 1.17: Support Partition Commit for append table May 18, 2023
@hililiwei hililiwei changed the title Flink 1.17: Support Partition Commit for append table Flink 1.17: Support Partition Commit for append data May 18, 2023
@stevenzwu
Contributor

stevenzwu commented May 18, 2023

@hililiwei I thought PR #6253 is less intrusive than this PR and can achieve similar goals. The approach in this PR introduces a lot of complexity. We had some discussion before in PR #6253; we should probably start some high-level discussions on the direction. Can you list the pros and cons of the two approaches? Maybe create a design doc that describes the problems we are trying to solve, the different approaches, and their pros and cons.

@hililiwei
Contributor Author

hililiwei commented May 19, 2023

@stevenzwu Thank you for taking the time to review. The purpose of this PR is to implement a partition commit policy based on Iceberg tables, which has the following differences and advantages compared to proposal #6253:

• Proposal #6253 only writes the watermark of the Flink job to the table’s summary, rather than actually committing partitions. This makes it impossible for downstream applications to directly determine which partitions are visible; they need to calculate it themselves from the watermark and each partition's time. Moreover, the watermark value may decrease after a Flink job restart or data replay, and without this PR, partitions that were already visible could become invisible again, which is unacceptable in a production environment. This PR commits partitions based on the watermark and the event time of the partition, so downstream applications can directly see which partitions are available without extra calculation and judgment. This improves the query efficiency and experience of downstream applications. In scenarios such as BI and ad hoc queries, it also avoids the problem of developers forgetting to filter data based on the watermark; not every developer knows that they need to use the watermark to filter data, which greatly increases the chance of reading incomplete data.

• This PR allows users to customize the partition commit policy, which can perform high-level custom operations when committing partitions. In some scenarios, downstream applications need to not only process newly committed partitions but also handle late-arriving data for old partitions, and perform custom operations on the table, such as remote API calls, event notifications, data deletion, or even file merging. This is very useful for table management and task flow customization. Users can choose the appropriate commit strategy for their own business needs and scenarios, achieving more flexible and efficient data processing. Our internal business developers have used this feature to build custom commit policies, which has brought us a lot of convenience and advantages.

• This PR maintains high compatibility with the Flink ecosystem. It uses the _SUCCESS file as a marker to indicate partition commit (https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/table/filesystem/#partition-commit), which makes it easy to migrate stream tables from Filesystem/Hive to Iceberg without changing the original logic and process. This can reduce migration costs and risks, and improve migration efficiency and stability.

Thank you again for your review and feedback.

@stevenzwu
Contributor

stevenzwu commented May 19, 2023

the watermark value may decrease after a Flink job restart or data replay

This can be fixed by checkpointing the watermark that is written to the snapshot summary, or, during restore, by having the committer retrieve the latest committed watermark.

This makes it impossible for downstream applications to directly determine which partitions are visible; they need to calculate it themselves from the watermark and each partition's time.

I agree that a little bit of logic is needed to determine which partitions have complete data based on the published watermark in snapshot summary.

This PR maintains high compatibility with the Flink ecosystem. It uses the _SUCCESS file as a marker to indicate partition commit (https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/table/filesystem/#partition-commit),

I am not sure this is a fair comparison. The Flink filesystem connector stores files on a distributed file system (like S3) directly; there is no table format abstraction, hence a success file is the only option.

Can you also explain what "partition commit" means, exactly?

sink.partition-commit.success-file.name

Where are those success files stored? How do downstream consumers find them?

How does this work with entropy enabled?
https://iceberg.apache.org/docs/latest/aws/#object-store-file-layout

@hililiwei
Contributor Author

hililiwei commented May 19, 2023

Can you also explain what "partition commit" means, exactly?

Partition commit means that, in the current job, we consider the data of a partition to be ready and able to be opened up to downstream applications. Here we can define what actions to perform when committing a partition: add the partition to the metadata, create a marker file, or perform some other custom operation.

For example, for a table partitioned by hour, when the event-time watermark reaches 02:00:00 we consider the data of the 01 hour partition complete and usable by downstream applications, so we commit this partition to the metadata (meaning we commit the data of this partition to the Iceberg table and create a new snapshot). At that point we can, for example, make a remote API call to notify the task flow that it can start the Spark batch step for the 01 hour partition.

@hililiwei
Contributor Author

hililiwei commented May 19, 2023

This can be fixed by checkpointing the watermark that is written to the snapshot summary, or, during restore, by having the committer retrieve the latest committed watermark.

I agree that a little bit of logic is needed to determine which partitions have complete data based on the published watermark in snapshot summary.

Of course, we can store the watermark, but this does not fundamentally solve the data visibility problem. Writing the watermark to the Iceberg table is simple, but it increases the complexity of the whole task chain. And in many scenarios, the downstream side has no suitable place to implement this processing logic, for example ad hoc queries or OLAP queries.

'Determine which partitions have complete data': isn't this exactly what Flink (with watermarks) is better at and should do? When the data of a partition is complete, Flink commits this partition and makes it visible to the downstream.

I am not sure this is a fair comparison. The Flink filesystem connector stores files on a distributed file system (like S3) directly; there is no table format abstraction, hence a success file is the only option.

Regarding _SUCCESS, it is just a built-in partition commit policy. And it is not only the filesystem connector; Hive also supports this, see: https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/hive/hive_read_write/#writing

@hililiwei
Contributor Author

I am not sure this is a fair comparison. The Flink filesystem connector stores files on a distributed file system (like S3) directly; there is no table format abstraction, hence a success file is the only option.

When we process data, we often have not only streaming jobs, but also many Spark batch jobs behind the streaming jobs. Flink+Hive+Spark is a very common combination. Similarly, when we switch Hive to Iceberg, we need the streaming job to tell the scheduling system when to start the Spark task. We use FileSystem very rarely.

@hililiwei
Contributor Author

Where are those success files stored? How do downstream consumers find them?

This file is stored under the partition directory, such as /path/iceberg_table/data/dt=20220101/_SUCCESS

How does this work with entropy enabled?
https://iceberg.apache.org/docs/latest/aws/#object-store-file-layout

The same way as on HDFS. We use the following method to create this file:

    // Resolve the marker file location through the table's LocationProvider,
    // then create an empty file via the table's FileIO.
    String successFileCommitPath =
        table.locationProvider().newDataLocation(table.spec(), partitionKey, successFileName);

    try (FileIO io = table.io()) {
      io.newOutputFile(successFileCommitPath).createOrOverwrite().close();
    } catch (IOException e) {
      throw new RuntimeException(e);
    }

I may not have fully understood what you mean.

@stevenzwu
Contributor

@hililiwei can you put everything in a quip doc? then we can send an email to dev@ and seek broader feedback.

@chenjunjiedada
Collaborator

I concur with @stevenzwu on the necessity for a design document that delves into the issue and potential solutions. The stream-to-batch use case is prevalent among data lake users, and it warrants exploring diverse approaches. We can foster community consensus on the most suitable solution, paving the way for the subsequent coding phase.

@stevenzwu
Contributor

stevenzwu commented May 20, 2023

s3://my-table-data-bucket/my_ns.db/my_table/data/dt=20220101/_SUCCESS

With entropy enabled, the file path will look like the following; note the random prefix 2d3905f8 in the path.
https://iceberg.apache.org/docs/latest/aws/#object-store-file-layout

s3://my-table-data-bucket/2d3905f8/my_ns.db/my_table/data/dt=20220101/_SUCCESS

The success file pattern assumes a well known / predefined path, which may not be true with S3 object store.

Hive is not much different from a distributed file system (like HDFS), since it assumes a file layout following a folder pattern.

@hililiwei
Contributor Author

@hililiwei can you put everything in a quip doc? then we can send an email to dev@ and seek broader feedback.

https://docs.google.com/document/d/1Sobv8XbvsyPzHi1YWy_jSet1Wy7smXKDKeQrNZSFYCg

@stevenzwu
Contributor

@hililiwei can you start a discussion thread on dev@iceberg? A lot more people follow dev@ discussions than PR reviews/discussions.

@hililiwei
Contributor Author

@hililiwei can you start a discussion thread on dev@iceberg? A lot more people follow dev@ discussions than PR reviews/discussions.

Ok, I thought maybe we could try to get rid of the SUCCESS file part and talk about partition commits on the dev mailing list first?

@hililiwei hililiwei changed the title Flink 1.17: Support Partition Commit for append data Flink 1.17: Support Partition Commit notification Jun 16, 2023
@hililiwei hililiwei closed this Jun 13, 2024