
feat(sink): enhance more sinks to support commit_checkpoint_interval #17383

Closed · fuyufjh opened this issue on Jun 21, 2024 · 10 comments
fuyufjh (Member) commented on Jun 21, 2024

Is your feature request related to a problem? Please describe.

Many OLAP systems, including BigQuery, ClickHouse, and Iceberg, perform poorly when data is inserted in small batches. Currently, constrained by our checkpointing interval, RisingWave can only batch data within about 1 second. With sink parallelism in the tens, this approach results in poor performance.

Describe the solution you'd like

In the Iceberg sink we have already introduced an option commit_checkpoint_interval.

// Commit every n (> 0) checkpoints; if n is not set, we commit on every checkpoint.
#[serde(default, deserialize_with = "deserialize_optional_u64_from_string")]
pub commit_checkpoint_interval: Option<u64>,

Based on the log store, it can batch data over a wider range, spanning multiple checkpoint epochs.
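
For readers less familiar with the mechanism, here is a minimal sketch of the idea (this is not the actual RisingWave implementation; `BatchingSink`, `Chunk`, and `commit_downstream` are hypothetical names): a log-store-decoupled sink buffers data across checkpoints and only commits to the external system every `commit_checkpoint_interval` checkpoint barriers.

```rust
/// Hypothetical stand-in for a stream chunk of rows.
struct Chunk;

/// A minimal sketch, not the real RisingWave sink: buffer data across
/// checkpoints and commit downstream only every N checkpoint barriers.
struct BatchingSink {
    commit_checkpoint_interval: u64, // N; 1 means commit on every checkpoint
    pending_checkpoints: u64,        // checkpoint barriers seen since the last commit
    buffer: Vec<Chunk>,              // data buffered since the last commit
}

impl BatchingSink {
    fn on_chunk(&mut self, chunk: Chunk) {
        self.buffer.push(chunk);
    }

    /// Called on each checkpoint barrier. Returns true if the buffered data
    /// was committed, i.e. the log store may be truncated up to this epoch.
    fn on_checkpoint(&mut self) -> bool {
        self.pending_checkpoints += 1;
        if self.pending_checkpoints >= self.commit_checkpoint_interval {
            self.commit_downstream(); // one large batch instead of many ~1 s batches
            self.pending_checkpoints = 0;
            return true;
        }
        false
    }

    fn commit_downstream(&mut self) {
        // Write `self.buffer` to BigQuery/ClickHouse/Iceberg, then clear it.
        self.buffer.clear();
    }
}
```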

Describe alternatives you've considered

NA

Additional context

Today I ran into a case of poor performance in the BigQuery sink. We may start from there.

github-actions bot added this to the release-1.10 milestone on Jun 21, 2024
xxhZs (Contributor) commented on Jun 21, 2024

Similar issue: #17223

wenym1 (Contributor) commented on Jun 21, 2024

We can run some benchmarks to test how the throughput is affected when we commit more data at once in a batch. We can test BigQuery and ClickHouse first. cc @xxhZs

wcy-fdu (Contributor) commented on Jun 21, 2024

The same issue also needs to be considered on the file sink side. Currently the file sink can only write a file when a checkpoint arrives. Although Flink also forces a flush when a checkpoint arrives, its checkpoint frequency is very low, so the impact is not significant. For this type of sink, decoupling the sink is essentially a way of batch-writing files.
For example, Flink rolls a file when any of the following holds (a rough sketch follows the list):

  • It contains at least 15 minutes' worth of data
  • It hasn't received new records for the last 5 minutes
  • The file size has reached 1 GB (after writing the last record)
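
To make the analogy concrete, here is a rough Rust sketch of such a rolling policy, mirroring the three conditions above (the names `RollingPolicy`, `InProgressFile`, and `should_roll` are hypothetical; this is neither Flink's nor RisingWave's API):

```rust
use std::time::{Duration, Instant};

/// Hypothetical rolling policy mirroring the three conditions listed above.
struct RollingPolicy {
    rollover_interval: Duration,   // e.g. 15 minutes of data in the file
    inactivity_interval: Duration, // e.g. 5 minutes without new records
    max_part_size: u64,            // e.g. 1 GiB
}

/// State of the file currently being written.
struct InProgressFile {
    created_at: Instant,
    last_write_at: Instant,
    size_bytes: u64,
}

impl RollingPolicy {
    /// Roll (close and finalize) the file if any of the three conditions holds.
    fn should_roll(&self, file: &InProgressFile, now: Instant) -> bool {
        now.duration_since(file.created_at) >= self.rollover_interval
            || now.duration_since(file.last_write_at) >= self.inactivity_interval
            || file.size_bytes >= self.max_part_size
    }
}
```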

xxhZs (Contributor) commented on Jun 24, 2024

> We can run some benchmarks to test how the throughput is affected when we commit more data at once in a batch. We can test BigQuery and ClickHouse first. cc @xxhZs

After running the bench, I found that the throughput with the option on and off is basically the same.

xxhZs (Contributor) commented on Jun 24, 2024

And this PR also has a bench test on SR: #16816

wenym1 (Contributor) commented on Jun 24, 2024

> We can run some benchmarks to test how the throughput is affected when we commit more data at once in a batch. We can test BigQuery and ClickHouse first. cc @xxhZs

> After running the bench, I found that the throughput with the option on and off is basically the same.

@xxhZs Could you paste some concrete numbers about the throughput and the experiment settings?

xxhZs (Contributor) commented on Jun 25, 2024

We used the sink bench tool and ran each test for 300 s. The results:
BigQuery:

| commit_checkpoint_interval | avg | p90 | p95 | p99 |
|---|---|---|---|---|
| 1 | 105691 | 112191 | 115124 | 119399 |
| 10 | 101587 | 116913 | 118546 | 122634 |
| 50 | 104062 | 113777 | 116502 | 119873 |

ClickHouse:

| commit_checkpoint_interval | avg | p90 | p95 | p99 |
|---|---|---|---|---|
| 1 | 259874 | 282974 | 288061 | 301896 |
| 10 | 262768 | 281498 | 288191 | 296637 |
| 50 | 265784 | 289081 | 295188 | 315546 |

I also found that our default bigquery.max_batch_rows was too small. After increasing it, the throughput was much higher.

wenym1 (Contributor) commented on Jun 25, 2024

> I also found that our default bigquery.max_batch_rows was too small. After increasing it, the throughput was much higher.

Interesting. What's the new size, and how much does the throughput increase? If the throughput can be greatly increased simply by raising this batch size, I suspect most of the time is spent waiting for responses on the response stream. Can you try asynchronously waiting for the responses from the stream and doing the truncation there? That way we won't be blocked waiting for a response and may get better throughput.
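
A minimal sketch of what this could look like, assuming a tokio runtime (the names `append_rows`, `LogTruncator`, and the channel layout are hypothetical, not the real BigQuery client or RisingWave log-store API): the write path issues append requests without awaiting them and hands the acknowledgement futures to a background task, which awaits them off the write path and performs the truncation.

```rust
use std::time::Duration;
use tokio::sync::{mpsc, oneshot};

/// Hypothetical handle used to truncate the sink log store.
struct LogTruncator;
impl LogTruncator {
    fn truncate_up_to(&self, epoch: u64) {
        println!("truncated log store up to epoch {epoch}");
    }
}

/// Background task: awaits (possibly slow) acknowledgements off the write
/// path and truncates the log store as they arrive.
async fn response_worker(
    mut pending: mpsc::Receiver<(u64, oneshot::Receiver<()>)>,
    truncator: LogTruncator,
) {
    while let Some((epoch, ack)) = pending.recv().await {
        if ack.await.is_ok() {
            truncator.truncate_up_to(epoch);
        }
    }
}

/// Stand-in for issuing an append request; returns a channel that is
/// completed when the remote acknowledgement arrives.
fn append_rows(_epoch: u64) -> oneshot::Receiver<()> {
    let (tx, rx) = oneshot::channel();
    tokio::spawn(async move {
        tokio::time::sleep(Duration::from_millis(50)).await; // simulate a slow ack
        let _ = tx.send(());
    });
    rx
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel(64);
    tokio::spawn(response_worker(rx, LogTruncator));

    for epoch in 1..=3u64 {
        let ack = append_rows(epoch); // does not block on the response
        tx.send((epoch, ack)).await.unwrap();
    }
    tokio::time::sleep(Duration::from_millis(200)).await; // let the worker drain
}
```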

Besides, we currently create an AppendRowsRequestRows for each stream chunk, so a single write may issue several write requests. Have you tried combining all rows that are going to be written into a single AppendRowsRequestRows?

And what is the parallelism in the test?

xxhZs (Contributor) commented on Jun 26, 2024

A more detailed bench for the BigQuery sink:
| async | max_batch_rows | commit_checkpoint_interval | avg (rows/s) | p90 | p95 | p99 |
|---|---|---|---|---|---|---|
| no | 1024 | 1 | 4593 | 6144 | 6144 | 6144 |
| yes | 1024 | 1 | 4377 | 6144 | 6144 | 6144 |
| no | 10240 | 1 | 18202 | 20480 | 22528 | 28672 |
| yes | 10240 | 1 | 33255 | 40960 | 40960 | 42112 |
| no | 100000000000 | 1 | 60873 | 117888 | 122880 | 126976 |
| yes | 100000000000 | 1 | 118708 | 260096 | 260096 | 284672 |
| no | 100000000000 | 10 | 57629 | 118382 | 120832 | 126482 |
| yes | 100000000000 | 10 | 108911 | 116736 | 120832 | 268288 |
| no | 100000000000 | 50 | 60745 | 114688 | 118784 | 122880 |
| yes | 100000000000 | 50 | 96614 | 106496 | 110592 | 116736 |

(all throughput numbers in rows/s)
Without async, the average throughput is lower because an oversized batch waits a long time while it is being written to BigQuery, resulting in a throughput of 0 for that period.

fuyufjh (Member, Author) commented on Jul 10, 2024

Tracked in #17095

fuyufjh closed this as not planned on Jul 10, 2024