-
Notifications
You must be signed in to change notification settings - Fork 589
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(sink): support async for bigquery sink #17488
Conversation
recopver
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually we can avoid spawning a tokio task to poll the response stream. We can implement our own BigQueryLogSinker
, which polls the log_reader and the response_stream with a select
.
src/connector/src/sink/big_query.rs
Outdated
let (expect_offset, mut rx) = self.client.get_subscribe(); | ||
let future = Box::pin(async move { | ||
loop { | ||
match rx.recv().await { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think instead you can create a oneshot channel for each AppendRowsRequest so that you won't need this shared broadcast channel. When the spawned worker knows that a request is handled, it notifies the rx with the oneshot tx, and here the future can be simply the rx.
let append_req = AppendRowsRequest { | ||
write_stream: write_stream.clone(), | ||
offset: None, | ||
trace_id: Uuid::new_v4().hyphenated().to_string(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you tracked the trace_id
locally instead of incrementing the self.offset
? I think in the response stream, it should return the same trace_id in the corresponding response.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Tried it, but there is no ID in the returned message
Since each row of bg can't exceed 10MB, new logic is added to split the size of each row |
src/connector/src/sink/big_query.rs
Outdated
while resp_num > 1 { | ||
self.offset_queue.push_back(None); | ||
resp_num -= 1; | ||
} | ||
self.offset_queue.push_back(Some(offset)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why inserting resp_num - 1
None
s and 1 Some(offset)
? Could you please add some comments here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because if a chunk is too large, we will split it into multiple async tasks, so we need to wait until all the tasks are finished before we can truncate, here none means that he is only the middle async task is completed, to the chunk before you can truncate.
And i will add comments.
src/connector/src/sink/big_query.rs
Outdated
|
||
fn default_retry_times() -> usize { | ||
5 | ||
pub async fn wait_next_offset(&mut self) -> Result<Option<TruncateOffset>> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess async fn next_offset
is enough to indicate that the function call may need to be "waited".
src/connector/src/sink/big_query.rs
Outdated
if let Some(Some(TruncateOffset::Barrier { .. })) = self.offset_queue.front() { | ||
return Ok(self.offset_queue.pop_front().unwrap()); | ||
} | ||
self.resp_stream | ||
.next() | ||
.await | ||
.ok_or_else(|| SinkError::BigQuery(anyhow::anyhow!("end of stream")))??; | ||
self.offset_queue.pop_front().ok_or_else(|| { | ||
SinkError::BigQuery(anyhow::anyhow!( | ||
"should have pending chunk offset when we receive new response" | ||
)) | ||
}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why we can only directly pop and return if the front of queue is TruncateOffset::Barrier
? Is it possible to have other variants of TruncateOffset
in the queue? And what happen if there are?
Could you please add some comments here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since we don't go to the bg sink to create an async task when we receive a barrier, there's no need to wait here for the
And i will add comments.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since we don't go to the bg sink to create an async task when we receive a barrier, there's no need to wait here for the
Can't quite get it. Could you elaborate in proper English?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are 3 cases:
- Some(barrier), since we won't be sending a barrier to bigquery, there is no need to wait for the. But we will record the barriers in the log store, so we need to truncate.
- Some(chunk), since we send a chunk to bigquery and record it in the log store, so we need to truncate and wait for resp.
- None, It means that we split a large chunk of RW into multiple requests for bigquery (for this chunk, in offset_queue, we set it to none , none ... chunk). So we need to wait for all the none and chunk resps and truncate in Some(chunk)
} | ||
pub struct BigQueryLogSinker { | ||
writer: BigQuerySinkWriter, | ||
bigquery_future_manager: BigQueryFutureManager, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not future_manager
? we don't need to add so many prefixes in private fields/types. And I guess it may not be so suitable to call it "manager" here.
Also, do we really need a BigQueryFutureManager
type here? It seems don't "manage" anything, we still directly access bigquery_future_manager.offset_queue
below😂
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For async sink, we have a FutureManager to manage the asyn tasks and truncate our log store chunk after the async tasks are done. this BigQueryFutureManager has the same tasks as FutureManager, just customized for bg sink
src/connector/src/sink/big_query.rs
Outdated
}) | ||
let mut client = conn.conn(); | ||
|
||
let (tx, rx) = mpsc::channel(BIGQUERY_SEND_FUTURE_BUFFER_MAX_SIZE); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can use bounded channel here since we already limit the queue size in the select!
src/connector/src/sink/big_query.rs
Outdated
.map_err(|e| SinkError::BigQuery(e.into()))? | ||
.into_inner(); | ||
loop { | ||
if let Some(append_rows_response) = resp_stream |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When we reach the end of the resp_stream
, we will keep yielding (). This is not correct. Instead of using if let Some(...)
, we'd better just resp_stream.message().await.ok_or_else(|| ...)
to turn None
into an end-of-stream error.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It doesn't return () at the end of the stream, after receiving every reply, if there are no errors, it returns a meaningless (),
The goal is to return that a message was received with no errors
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I know the purpose of returning a meaningless ()
. What I meant was that, when resp_stream.message()
returns None, we should break the loop with an EndOfStream
error, or just simply break the loop, instead of ignoring it.
In current code, after resp_stream returns None, an ()
will still be yielded, and later when polled again, and resp_stream returns None again, and an ()
is still yielded, and this will be repeated endlessly over and over again, while the external code is not aware that the response stream has actually stopped.
src/connector/src/sink/big_query.rs
Outdated
if let Some(Some(TruncateOffset::Barrier { .. })) = self.offset_queue.front() { | ||
return Ok(self.offset_queue.pop_front().unwrap()); | ||
} | ||
self.resp_stream |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here the correctness depends on that the number of non-barrier items (either None or Some(TruncateOffset::Chunk)) in the queue is the same as the inflight request. However, in the implementation, it's possible to have 0 resp_num
when we write a chunk. For this chunk, there is no inflight request, but it will have an item in the queue, which causes inconsistency between the number of queue items and inflight requests.
I think instead of using none to represent that a chunk is split into multiple requests, we'd better store (TruncateOffset, remaining_resp_num) as the queue item. Code will be like the following
if let Some((offset, remaining_resp_num)) = self.offset_queue.front_mut() {
if *remaining_resp_num == 0 {
return Ok(self.offset_queue.pop_front().unwrap().0);
}
while *remaining_resp_num > 0 {
self.resp_stream.next().await...??;
*remaining_resp_num -= 1;
}
} else {
return pending().await;
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Rest LGTM
src/bench/sink_bench/sink_option.yml
Outdated
@@ -100,8 +100,8 @@ Starrocks: | |||
BigQuery: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Revert the change.
fn default_max_batch_rows() -> usize { | ||
1024 | ||
struct BigQueryFutureManager { | ||
// `offset_queue` holds the Some corresponding to each future. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please update the comment.
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
in #17383 (comment)
We found that waiting to write data can block for a long time, affecting throughput.
So we support async for bigquery sink
bench with this pr:
avg: 108025 rows/s
p90: 116736 rows/s
p95: 116736 rows/s
p99: 118784 rows/s
Checklist
./risedev check
(or alias,./risedev c
)Documentation
Release note
If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.