
RFC: Decouple iceberg sink commit from risingwave checkpoint #78

Merged · 5 commits · May 21, 2024

Conversation

liurenjie1024 (Contributor) commented Nov 21, 2023

@liurenjie1024 liurenjie1024 marked this pull request as draft November 21, 2023 10:51
@liurenjie1024 liurenjie1024 marked this pull request as ready for review November 23, 2023 03:09

![Write parquet every checkpoint](images/0078-iceberg-sink-decouple-checkpoint/write_parquet_per_cp.svg)

While this method is simple enough, we may still experience small-file problems if we checkpoint frequently. We can further decouple flushing parquet files from checkpoints: instead of flushing whole parquet files at every checkpoint, we flush parquet row groups. The following diagram illustrates the process:

![Write row group every checkpoint](images/0078-iceberg-sink-decouple-checkpoint/write_row_group_per_cp.svg)
A reviewer commented:
This can work only if the row group boundary can be completely controlled by ourselves. In other words, we must be able to close a row group immediately when the iceberg sink receives the checkpoint barrier. Is this already supported in icelake?

liurenjie1024 (author) replied:



There are cases where we don't even need to flush a row group. For example, we could save the record sequence id of the current row in the log store and skip flushing the row group entirely, but I don't want to introduce too much dependency on such characteristics of the log store and complicate things. An extra row group only adds a record to parquet's `FileMetaData`, and it has no impact on other readers of the parquet file.
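For concreteness, here is a minimal sketch of forcing a row-group boundary at a checkpoint barrier with the Rust `parquet` crate's `ArrowWriter` (a generic illustration, not icelake's actual API; `flush()` closes the in-progress row group without closing the file):

```rust
use std::fs::File;
use std::sync::Arc;

use arrow::array::Int64Array;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use parquet::arrow::ArrowWriter;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
    let file = File::create("data.parquet")?;
    let mut writer = ArrowWriter::try_new(file, schema.clone(), None)?;

    // Between checkpoints: writes are buffered in the current row group.
    let batch = RecordBatch::try_new(
        schema,
        vec![Arc::new(Int64Array::from(vec![1, 2, 3]))],
    )?;
    writer.write(&batch)?;

    // On a checkpoint barrier: close the current row group without
    // closing the file. Each flush only adds one row-group entry to
    // the file's FileMetaData.
    writer.flush()?;

    // ... more batches and more checkpoint-triggered flushes ...

    // Once the file is large enough, close it to write the footer and
    // record its path for the next iceberg commit.
    writer.close()?;
    Ok(())
}
```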
A reviewer commented, quoting the RFC:

> save the flushed file paths into state table.

I think we can record the current write position in the state table instead of the log store.

liurenjie1024 (author) replied:

Yes, we will commit it to the meta store.
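As a sketch of what that persisted state could hold (field names are hypothetical, not RisingWave's actual schema), the sink would record the flushed-but-uncommitted file paths atomically with each checkpoint:

```rust
use serde::{Deserialize, Serialize};

/// Hypothetical per-sink state, persisted atomically with each
/// RisingWave checkpoint (e.g. via the state table / meta store).
#[derive(Serialize, Deserialize)]
struct IcebergSinkState {
    /// Parquet files flushed to storage but not yet committed to the
    /// iceberg table; replayed into the next iceberg commit on recovery.
    pending_data_files: Vec<String>,
    /// Epoch of the last checkpoint covered by `pending_data_files`.
    last_checkpointed_epoch: u64,
    /// Epoch of the last successful iceberg commit; data up to this
    /// point is already visible to iceberg readers.
    last_committed_epoch: u64,
}
```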


1. This will increase failure recovery time for the iceberg sink. For example, if the commit interval is set to 30 minutes and the sink fails at the 29th minute, we will need to replay all data from the first 29 minutes.

#### Approach 2
A reviewer (member) commented:

I feel this approach will be much more complicated if we want to support updates/deletes on the iceberg sink. Say a deleted row is in a previous row group instead of a previous iceberg version; how do we handle that?

liurenjie1024 (author) replied:

It's a little more complicated, but not by much. We can discuss this further when we decide to take approach 2.

ZENOTME commented Nov 28, 2023, quoting the RFC:

> In this approach we don't modify the sink log writer, but modify the iceberg sink. Instead of committing all data files to the iceberg table at every checkpoint, we flush data into parquet files and save the flushed file paths into the state table. The following graph illustrates the case:

In this case, maybe we don't need to flush data into parquet files at every checkpoint, to avoid small files. 🤔
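For illustration, a hedged sketch of the barrier-handling loop for the quoted approach (all names hypothetical; `flush_parquet_file` and `commit_to_iceberg` stand in for the real write and commit paths, not icelake's actual API): the sink flushes at every checkpoint but commits to the iceberg table only once per interval:

```rust
/// Hypothetical committer for approach 1.
struct IcebergSink {
    pending_files: Vec<String>,
    checkpoints_since_commit: u32,
    commit_every_n_checkpoints: u32,
}

impl IcebergSink {
    fn on_checkpoint_barrier(&mut self, epoch: u64) -> anyhow::Result<()> {
        // 1. Make buffered data durable and remember the file path.
        //    (The path list is persisted in the state table as part of
        //    this same checkpoint, so recovery can re-commit it.)
        let path = self.flush_parquet_file(epoch)?;
        self.pending_files.push(path);
        self.checkpoints_since_commit += 1;

        // 2. Commit to iceberg only once per interval, producing far
        //    fewer snapshots than one per checkpoint.
        if self.checkpoints_since_commit >= self.commit_every_n_checkpoints {
            let files = std::mem::take(&mut self.pending_files);
            self.commit_to_iceberg(&files)?;
            self.checkpoints_since_commit = 0;
        }
        Ok(())
    }

    fn flush_parquet_file(&mut self, _epoch: u64) -> anyhow::Result<String> {
        unimplemented!("upload buffered rows as a parquet file; return its path")
    }

    fn commit_to_iceberg(&self, _files: &[String]) -> anyhow::Result<()> {
        unimplemented!("append the data files to the iceberg table in one transaction")
    }
}
```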


Pros of this approach:

1. Easier to implement.

A reviewer commented:

This is a key benefit of this solution. I suggest adding fewer optimizations at this point. We need to test the stability against real-life workloads, so please don't over-design.

liurenjie1024 (author) replied:

Agree, let's solve the most urgent problem first, and refine it when necessary.

liurenjie1024 (author) commented:

Conclusion: we will take approach 1.

fuyufjh merged commit e90b4f9 into main on May 21, 2024.