From 5ed7dacf2629b2fa0f3b2051ff6d4ee6f81e497c Mon Sep 17 00:00:00 2001
From: Renjie Liu
Date: Tue, 21 Nov 2023 18:51:02 +0800
Subject: [PATCH 1/5] RFC: Decouple iceberg sink commit from risingwave checkpoint
---
 rfcs/0078-iceberg-sink-decouple-checkpoint.md | 70 +++++++++++++++++++
 .../write_parquet_per_cp.svg | 34 +++++++++
 .../write_row_group_per_cp.svg | 66 +++++++++++++++++
 3 files changed, 170 insertions(+)
 create mode 100644 rfcs/0078-iceberg-sink-decouple-checkpoint.md
 create mode 100644 rfcs/images/0078-iceberg-sink-decouple-checkpoint/write_parquet_per_cp.svg
 create mode 100644 rfcs/images/0078-iceberg-sink-decouple-checkpoint/write_row_group_per_cp.svg

diff --git a/rfcs/0078-iceberg-sink-decouple-checkpoint.md b/rfcs/0078-iceberg-sink-decouple-checkpoint.md
new file mode 100644
index 00000000..eb62120e
--- /dev/null
+++ b/rfcs/0078-iceberg-sink-decouple-checkpoint.md
@@ -0,0 +1,70 @@
+---
+feature: Decouple iceberg sink commit from risingwave checkpoint
+authors:
+- "liurenjie1024"
+ start_date: "2023/11/20"
+---
+
+# RFC: Decouple iceberg sink commit from risingwave checkpoint
+
+## Motivation
+
+Currently, the iceberg sink commit is coupled with the risingwave checkpoint, i.e. we commit changes to the iceberg table whenever a risingwave checkpoint is triggered. During our talks with some lakehouse users, we found that they want to commit changes to the iceberg table less frequently than the risingwave checkpoint. For example, they want to do checkpoints every few seconds, while commits to the iceberg table should be much less frequent, e.g. every 5 to 10 minutes. Frequently committing changes to the iceberg table will create too many small files, which will cause performance issues when querying or maintaining the iceberg table. In this proposal we want to give users a way to control the frequency of committing changes to the iceberg table so that it's more flexible.
+
+## Design
+
+### Syntax
+
+We will add a new option `commit_interval` to the iceberg sink, i.e. the time between two commits. By default, it's the same as the `checkpoint_interval` of risingwave. If users want to commit changes to the iceberg table less frequently than the risingwave checkpoint, they can set `commit_interval` to a larger value. It's worth discussing whether we should require `commit_interval` to be an exact multiple of `checkpoint_interval`.
+
+1. We enforce that `commit_interval` is an exact multiple of `checkpoint_interval`. In this case, risingwave should report an error when creating the iceberg sink if this is violated. For example, if `checkpoint_interval` is 1 minute and the user sets `commit_interval` to `2m 30s`, then risingwave should report an error.
+2. We don't enforce that, but we ceil `commit_interval` to the nearest multiple of `checkpoint_interval`. For example, if `checkpoint_interval` is 1 minute and the user sets `commit_interval` to `2m 30s`, then risingwave will round it up to `3m` and report a warning to the user.
+
+Personally I'm in favor of the first option, which will not introduce any surprises to users.
+
+### Implementation
+
+To simplify failure recovery, we still commit changes to the iceberg table when a risingwave checkpoint is triggered, but we don't need to do so at every checkpoint. In the following example, let's assume that $ commit\_interval \div checkpoint\_interval $ is $5$.
+
+#### Approach 1
+
+Modify the sink log writer to trigger a sink checkpoint every 5 checkpoints.
+
+Pros of this approach:
+
+1. Easier to implement (see the sketch below).
+2. Easier to apply to more sinks (such as delta lake).
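+
+Below is a minimal sketch of the counter logic behind this approach, assuming hypothetical `LogEvent`/`SinkEvent` types rather than RisingWave's actual log store API: every checkpoint barrier except the N-th is downgraded to a plain barrier before it reaches the sink writer.
+
+```rust
+/// Illustrative log store event; not RisingWave's real type.
+enum LogEvent {
+    Chunk(Vec<u8>),
+    Barrier { is_checkpoint: bool },
+}
+
+/// What the sink writer is asked to do for each event.
+#[derive(Debug)]
+enum SinkEvent {
+    Write(Vec<u8>),
+    Barrier,
+    Commit, // triggers an iceberg commit
+}
+
+/// Downgrades all but every `commit_every`-th checkpoint barrier to a plain barrier.
+struct CommitThrottler {
+    commit_every: u32, // commit_interval / checkpoint_interval, e.g. 5
+    seen_checkpoints: u32,
+}
+
+impl CommitThrottler {
+    fn new(commit_every: u32) -> Self {
+        Self { commit_every, seen_checkpoints: 0 }
+    }
+
+    fn on_event(&mut self, event: LogEvent) -> SinkEvent {
+        match event {
+            LogEvent::Chunk(data) => SinkEvent::Write(data),
+            LogEvent::Barrier { is_checkpoint: false } => SinkEvent::Barrier,
+            LogEvent::Barrier { is_checkpoint: true } => {
+                self.seen_checkpoints += 1;
+                if self.seen_checkpoints % self.commit_every == 0 {
+                    SinkEvent::Commit
+                } else {
+                    SinkEvent::Barrier
+                }
+            }
+        }
+    }
+}
+
+fn main() {
+    let mut throttler = CommitThrottler::new(5);
+    // Only the 5th checkpoint barrier becomes a commit.
+    for i in 1..=5 {
+        let decision = throttler.on_event(LogEvent::Barrier { is_checkpoint: true });
+        println!("checkpoint {i}: {decision:?}");
+    }
+}
+```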
+
+Cons of this approach:
+
+1. This will increase failure recovery time for the iceberg sink. For example, when the commit interval is set to 30 minutes and the sink fails in the 29th minute, we will need to replay all data for the first 29 minutes.
+
+#### Approach 2
+
+In this approach we don't make any modification to the sink log writer, but modify the iceberg sink instead. Instead of committing all data files to the iceberg table at every checkpoint, we flush data into parquet files and save the flushed file paths into the state table. The following graph illustrates the case:
+
+![Write parquet every checkpoint](images/0078-iceberg-sink-decouple-checkpoint/write_parquet_per_cp.svg)
+
+While this method is simple enough, we may still experience small-file problems if we checkpoint frequently. We can further decouple flushing parquet files from checkpoints. Instead of flushing parquet files at every checkpoint, we flush parquet row groups. The following diagram illustrates the process:
+
+![Write row group every checkpoint](images/0078-iceberg-sink-decouple-checkpoint/write_row_group_per_cp.svg)
+
+There are chances we don't even need to flush a row group. For example, we could save the record sequence id of the current row in the log store to skip flushing a row group, but I don't want to introduce too much dependency on such characteristics of the log store, which would make things more complicated. One row group only adds a record to parquet's `FileMetaData`, and it has no impact on other readers of the parquet file.
+
+The content of `SerializedFileWriter` can be found [here](https://docs.rs/parquet/latest/src/parquet/file/writer.rs.html#128-140); essentially it's the row group metadata and some statistics information. We need to fork and maintain `parquet` to extract them.
+
+The states in the sink executor are essentially maps from `(executor id, partition value)` to `filename list`. When handling scaling in/out of actors, we should do a commit to the iceberg table. There are other possible solutions to handle this better, but given that this doesn't happen frequently, I think it's acceptable without introducing too much complexity.
+
+As for how to append to a parquet file after closing it, we take different approaches for different file systems. If the file system supports an `append` operation, it would be quite easy for our implementation. For s3, we are supposed to use [multipart upload](https://docs.aws.amazon.com/AmazonS3/latest/userguide/mpuoverview.html) to simulate this process.
+
+Pros of this approach:
+
+1. It enables faster failure recovery, i.e. we only need to replay the data of a single checkpoint.
+2. It enables faster checkpoints without hurting iceberg performance.
+3. It can be easily applied to other lakehouse formats (such as delta lake).
+
+Cons of this approach:
+
+1. Compared with approach 1, the engineering effort and complexity are much higher.
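+
+To make the per-executor bookkeeping above concrete, here is a minimal sketch of the state that approach 2 has to persist, using hypothetical type and field names rather than RisingWave's actual structures: file paths flushed at checkpoints, keyed by `(executor id, partition value)`, and drained only when the iceberg commit finally happens.
+
+```rust
+use std::collections::HashMap;
+
+/// Hypothetical key: which executor wrote the file, for which iceberg partition.
+#[derive(Clone, Hash, PartialEq, Eq)]
+struct StateKey {
+    executor_id: u64,
+    partition_value: String,
+}
+
+/// Files flushed at checkpoints but not yet committed to the iceberg table.
+#[derive(Default)]
+struct PendingFiles {
+    files: HashMap<StateKey, Vec<String>>,
+}
+
+impl PendingFiles {
+    /// Called at every checkpoint: remember the files flushed since the last barrier.
+    fn on_checkpoint(&mut self, key: StateKey, flushed: Vec<String>) {
+        self.files.entry(key).or_default().extend(flushed);
+    }
+
+    /// Called at every N-th checkpoint (or on scale in/out): hand everything to the
+    /// sink coordinator for the actual iceberg commit and clear the local state.
+    fn drain_for_commit(&mut self) -> Vec<(StateKey, Vec<String>)> {
+        self.files.drain().collect()
+    }
+}
+
+fn main() {
+    let mut state = PendingFiles::default();
+    let key = StateKey { executor_id: 1, partition_value: "dt=2023-11-20".to_string() };
+    state.on_checkpoint(key.clone(), vec!["1.parquet".to_string()]);
+    state.on_checkpoint(key, vec!["2.parquet".to_string()]);
+    // At commit time both files are appended to the iceberg table in one snapshot.
+    let committed = state.drain_for_commit();
+    assert_eq!(committed[0].1, vec!["1.parquet".to_string(), "2.parquet".to_string()]);
+}
+```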
+
+## Discussion
diff --git a/rfcs/images/0078-iceberg-sink-decouple-checkpoint/write_parquet_per_cp.svg b/rfcs/images/0078-iceberg-sink-decouple-checkpoint/write_parquet_per_cp.svg
new file mode 100644
index 00000000..046c467b
--- /dev/null
+++ b/rfcs/images/0078-iceberg-sink-decouple-checkpoint/write_parquet_per_cp.svg
@@ -0,0 +1,34 @@
+[SVG diagram "Write parquet every checkpoint": sink executors flush numbered parquet files (1.parquet ... 9.parquet) at each checkpoint (checkpoint1: 10s ... checkpoint6: 60s) and hand them to the sink coordinator. Only garbled label text survived extraction, so the SVG markup is omitted here.]
\ No newline at end of file
diff --git a/rfcs/images/0078-iceberg-sink-decouple-checkpoint/write_row_group_per_cp.svg b/rfcs/images/0078-iceberg-sink-decouple-checkpoint/write_row_group_per_cp.svg
new file mode 100644
index 00000000..17b0bfca
--- /dev/null
+++ b/rfcs/images/0078-iceberg-sink-decouple-checkpoint/write_row_group_per_cp.svg
@@ -0,0 +1,66 @@
+[SVG diagram "Write row group every checkpoint": sink executors flush parquet row groups (Row Group 1..3) through a `SerializedFileWriter` at each checkpoint (checkpoint1: 10s ... checkpoint6: 60s), producing `*.parquet.part` and `*.parquet.footer.part` pieces that the sink coordinator assembles into 1.parquet / 2.parquet. Only garbled label text survived extraction, so the SVG markup is omitted here.]
\ No newline at end of file

From 538ede20f59dae7936bf98b48f6fb58178cf5011 Mon Sep 17 00:00:00 2001
From: Renjie Liu
Date: Thu, 23 Nov 2023 11:08:55 +0800
Subject: [PATCH 2/5] Fix according discussion
---
 rfcs/0078-iceberg-sink-decouple-checkpoint.md | 67 +++++++++++++++++--
 1 file changed, 62 insertions(+), 5 deletions(-)

diff --git a/rfcs/0078-iceberg-sink-decouple-checkpoint.md b/rfcs/0078-iceberg-sink-decouple-checkpoint.md
index eb62120e..d737ea31 100644
--- a/rfcs/0078-iceberg-sink-decouple-checkpoint.md
+++ b/rfcs/0078-iceberg-sink-decouple-checkpoint.md
@@ -1,7 +1,7 @@
---
feature: Decouple
iceberg sink commit from risingwave checkpoint authors: -- "liurenjie1024" +- "Renjie Liu, Yiming Wen" start_date: "2023/11/20" --- @@ -9,7 +9,7 @@ authors: ## Motivation -Currently, the iceberg sink commit is coupled with risingwave checkpoint, e.g. we commit changes to iceberg table when risingwave checkpoint is triggered. During our talk with some lakehouse user, we found that they want to commit changes to iceberg table less frequently than risingwave checkpoint. For example, they want to do checkpoint every several seconds, while commit changes to iceberg table should be much less frequent, e.g. every 5 to 10 minutes. Frequently commit changes to iceberg table will cause too many small files in iceberg table, which will cause performance issue when querying or maintaining iceberg table. In this proposal we want to allow user to have some way to control the frequency of committing changes to iceberg table so that it's more flexible. +Currently, the iceberg sink commit is coupled with risingwave checkpoint, e.g. we commit changes to iceberg table when risingwave checkpoint is triggered. During our talk with some lakehouse user, we found that they want to commit changes to iceberg table less frequently than risingwave checkpoint. For example, they want to do checkpoint every several seconds for to get fresh view of materialized view, while commit changes to iceberg table should be much less frequent, e.g. every 5 to 10 minutes. Frequently commit changes to iceberg table will cause too many small files in iceberg table, which will cause performance issue when querying or maintaining iceberg table. In this proposal we want to allow user to have some way to control the frequency of committing changes to iceberg table so that it's more flexible. ## Design @@ -28,7 +28,64 @@ To simplify failure recovery, we still need to commit changes to iceberg table w #### Approach 1 -Modify sink log writer to trigger sink checkpoint every 5 checkpoint. +Modify sink log writer to trigger sink checkpoint every 5 checkpoint. Following diagram illustrates the process: + +```mermaid +sequenceDiagram + +participant LogStore +participant LogReader +participant SinkWriter + +Note right of SinkWriter: Failure recovery point from last checkpoint + +LogStore ->> LogReader: Chunk +LogReader ->> SinkWriter: Chunk +LogStore ->> LogReader: Barrier +LogReader ->> SinkWriter: Barrier +rect rgb(191, 223, 255) +LogStore ->> LogReader: Checkpoint +LogReader ->> SinkWriter: Barrier +end + +LogStore ->> LogReader: Chunk +LogReader ->> SinkWriter: Chunk +LogStore ->> LogReader: Barrier +LogReader ->> SinkWriter: Barrier +rect rgb(191, 223, 255) +LogStore ->> LogReader: Checkpoint +LogReader ->> SinkWriter: Barrier +end + +LogStore ->> LogReader: Chunk +LogReader ->> SinkWriter: Chunk +LogStore ->> LogReader: Barrier +LogReader ->> SinkWriter: Barrier +rect rgb(191, 223, 255) +LogStore ->> LogReader: Checkpoint +LogReader ->> SinkWriter: Barrier +end + +LogStore ->> LogReader: Chunk +LogReader ->> SinkWriter: Chunk +LogStore ->> LogReader: Barrier +LogReader ->> SinkWriter: Barrier +rect rgb(191, 223, 255) +LogStore ->> LogReader: Checkpoint +LogReader ->> SinkWriter: Barrier +end + +LogStore ->> LogReader: Chunk +LogReader ->> SinkWriter: Chunk +LogStore ->> LogReader: Barrier +LogReader ->> SinkWriter: Barrier +rect rgb(200, 150, 255) +LogStore ->> LogReader: Checkpoint +LogReader ->> SinkWriter: Checkpoint +end +``` + +To avoid dead lock caused by some misconfiguration, e.g. 
the allowed number of in-flight checkpoints, we need to enable partial checkpoints of the log store.

Pros of this approach:

@@ -53,9 +110,9 @@ There are chances we don't even need to flush row group. For example we can save

The content of `SerializedFileWriter` can be found [here](https://docs.rs/parquet/latest/src/parquet/file/writer.rs.html#128-140); essentially it's the row group metadata and some statistics information. We need to fork and maintain `parquet` to extract them.

-The states in the sink executor are essentially maps from `(executor id, partition value)` to `filename list`. When handling scaling in/out of actors, we should do a commit to the iceberg table. There are other possible solutions to handle this better, but given that this doesn't happen frequently, I think it's acceptable without introducing too much complexity.
+The states in the sink executor are essentially maps from `(executor id, partition value)` to `filename list`. When handling scaling in/out of actors, we should do a commit to the iceberg table. There are other possible solutions to handle this better, but given that this doesn't happen frequently, I think it's acceptable without introducing too much complexity. After we decouple the sink from the mv using the log store, it's no longer easy to maintain a state table for sink executors, so we need to submit these states to the coordinator at every checkpoint and store them in the metastore. The worst-case number of state entries is: $ executor\_num * partition\_num\_of\_iceberg * 5 $.

-As for how to append to a parquet file after closing it, we take different approaches for different file systems. If the file system supports an `append` operation, it would be quite easy for our implementation. For s3, we are supposed to use [multipart upload](https://docs.aws.amazon.com/AmazonS3/latest/userguide/mpuoverview.html) to simulate this process.
+As for how to append to a parquet file after closing it, we need to rely on [multipart upload](https://docs.aws.amazon.com/AmazonS3/latest/userguide/mpuoverview.html), i.e. we concatenate the different row groups into one parquet file when doing the actual commit. This is supported in almost all systems, even in hdfs 3, but not available in hdfs 2.

Pros of this approach:

From 6e0d162dbbcab063f35c466815e3fc78c67a0f31 Mon Sep 17 00:00:00 2001
From: Renjie Liu
Date: Thu, 23 Nov 2023 11:12:40 +0800
Subject: [PATCH 3/5] Try fix math
---
 rfcs/0078-iceberg-sink-decouple-checkpoint.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/rfcs/0078-iceberg-sink-decouple-checkpoint.md b/rfcs/0078-iceberg-sink-decouple-checkpoint.md
index d737ea31..3c917b78 100644
--- a/rfcs/0078-iceberg-sink-decouple-checkpoint.md
+++ b/rfcs/0078-iceberg-sink-decouple-checkpoint.md
@@ -24,7 +24,7 @@ Personally I'm in favor of first option, which will not introduce any surprise t

### Implementation

-To simplify failure recovery, we still commit changes to the iceberg table when a risingwave checkpoint is triggered, but we don't need to do so at every checkpoint. In the following example, let's assume that $ commit\_interval \div checkpoint\_interval $ is $5$.
+To simplify failure recovery, we still commit changes to the iceberg table when a risingwave checkpoint is triggered, but we don't need to do so at every checkpoint. In the following example, let's assume that $commit\_interval \div checkpoint\_interval$ is $5$.
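+
+As a concrete instance of this assumption (assuming an illustrative 50s commit interval, with the 10s checkpoint cadence shown in the figures):
+
+$commit\_interval \div checkpoint\_interval = 50s \div 10s = 5$
+
+so only every fifth checkpoint barrier results in an iceberg commit.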
#### Approach 1

From bb796e88e6a518a6dfe4511b1a50aa70522c8c7c Mon Sep 17 00:00:00 2001
From: Renjie Liu
Date: Thu, 23 Nov 2023 11:14:05 +0800
Subject: [PATCH 4/5] Fix another equation
---
 rfcs/0078-iceberg-sink-decouple-checkpoint.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/rfcs/0078-iceberg-sink-decouple-checkpoint.md b/rfcs/0078-iceberg-sink-decouple-checkpoint.md
index 3c917b78..4bc5b735 100644
--- a/rfcs/0078-iceberg-sink-decouple-checkpoint.md
+++ b/rfcs/0078-iceberg-sink-decouple-checkpoint.md
@@ -110,7 +110,7 @@ There are chances we don't even need to flush row group. For example we can save

The content of `SerializedFileWriter` can be found [here](https://docs.rs/parquet/latest/src/parquet/file/writer.rs.html#128-140); essentially it's the row group metadata and some statistics information. We need to fork and maintain `parquet` to extract them.

-The states in the sink executor are essentially maps from `(executor id, partition value)` to `filename list`. When handling scaling in/out of actors, we should do a commit to the iceberg table. There are other possible solutions to handle this better, but given that this doesn't happen frequently, I think it's acceptable without introducing too much complexity. After we decouple the sink from the mv using the log store, it's no longer easy to maintain a state table for sink executors, so we need to submit these states to the coordinator at every checkpoint and store them in the metastore. The worst-case number of state entries is: $ executor\_num * partition\_num\_of\_iceberg * 5 $.
+The states in the sink executor are essentially maps from `(executor id, partition value)` to `filename list`. When handling scaling in/out of actors, we should do a commit to the iceberg table. There are other possible solutions to handle this better, but given that this doesn't happen frequently, I think it's acceptable without introducing too much complexity. After we decouple the sink from the mv using the log store, it's no longer easy to maintain a state table for sink executors, so we need to submit these states to the coordinator at every checkpoint and store them in the metastore. The worst-case number of state entries is: $executor\_num * partition\_num\_of\_iceberg * 5$.

As for how to append to a parquet file after closing it, we need to rely on [multipart upload](https://docs.aws.amazon.com/AmazonS3/latest/userguide/mpuoverview.html), i.e. we concatenate the different row groups into one parquet file when doing the actual commit. This is supported in almost all systems, even in hdfs 3, but not available in hdfs 2.

From b097c04e45b24a1d6e8328ddfdfdbcbbd488a9b8 Mon Sep 17 00:00:00 2001
From: Eric Fu
Date: Tue, 21 May 2024 11:04:25 +0800
Subject: [PATCH 5/5] Update 0078-iceberg-sink-decouple-checkpoint.md
---
 rfcs/0078-iceberg-sink-decouple-checkpoint.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/rfcs/0078-iceberg-sink-decouple-checkpoint.md b/rfcs/0078-iceberg-sink-decouple-checkpoint.md
index 4bc5b735..0592a32a 100644
--- a/rfcs/0078-iceberg-sink-decouple-checkpoint.md
+++ b/rfcs/0078-iceberg-sink-decouple-checkpoint.md
@@ -2,7 +2,7 @@
feature: Decouple iceberg sink commit from risingwave checkpoint
authors:
- "Renjie Liu, Yiming Wen"
- start_date: "2023/11/20"
+start_date: "2023/11/20"
---

# RFC: Decouple iceberg sink commit from risingwave checkpoint