-
Notifications
You must be signed in to change notification settings - Fork 216
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(storage): support scan row handler only #447
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The storage part looks generally good (and I have some new ideas for that), but the binder part doesn't looks perfect in join statements.
The previous plan (note that it includes a bug in #388):
> create table t1(v1 int not null, v2 int);
created
> create table t2(v3 int, v4 varchar)
created
> explain select count(*) from t1 join t2 on v2=v3;
PhysicalProjection: exprs [InputRef #0]
PhysicalSimpleAgg: 1 agg calls
PhysicalHashJoin: op Inner, left_index 0, right_index 0, predicate: Bool(true) (const)
PhysicalTableScan: table #8, columns [1, 0], with_row_handler: false, is_sorted: false, expr: None
PhysicalTableScan: table #9, columns [0], with_row_handler: false, is_sorted: false, expr: None
The new plan:
> explain select count(*) from t1 join t2 on v2=v3;
PhysicalProjection: exprs [InputRef #0]
PhysicalSimpleAgg: 1 agg calls
PhysicalHashJoin: op Inner, left_index 0, right_index 0, predicate: Bool(true) (const)
PhysicalTableScan: table #8, columns [1], with_row_handler: true, is_sorted: false, expr: None
PhysicalTableScan: table #9, columns [0], with_row_handler: true, is_sorted: false, expr: None
Indeed, if we are joining two tables, we do not need to scan the row handler -- we will always scan the join key.
From my perspective, there are two ways to support scanning row handlers only.
- Add a
RowHandlerIterator
struct, and treat it in the same way as user column iterator. - Or we can store the
current_row_id
variable inRowSetIterator
, and use this row id tosequence
the row handler iterators.
Thanks for your contribution!
if row_handler_count == column_refs.len() { | ||
panic!("no user column") | ||
} | ||
let only_scan_row_handler = row_handler_count == column_refs.len(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I prefer to treat row handler column in the same way as user columns...
.indexes() | ||
.iter() | ||
.map(|x| x.row_count) | ||
.sum::<u32>() as usize; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
summing the total row doesn't look like a trivial operation. There could be thousands of blocks in a single column. I think we can use the original fetch_size
for row handlers. Most executors prefer to consume chunks little by little, instead of getting them all at once.
Also, the expected_size
of next_batch
function requires that next_batch
cannot return elements more than expected_size
. This seems to violate the rule.
Thanks for your contribution! But I can't simply determine the correctness of your code :) |
The logic in RowSetIterator is correct (from my perspective). Adding some unit tests might also help. |
Sorry for the late reply, I had added the |
Thanks a lot! The |
Signed-off-by: Fedomn <fedomn.ma@gmail.com>
Signed-off-by: Fedomn <fedomn.ma@gmail.com>
Some explanations about these changes for easier understanding:
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks a lot for your contribution! But IMO, this implementation doesn't look very clear to me. The main problem is with the commit 7fbbf70, which implements the RowHandlerColumnIterator
in a complex way. You may apply the following idea to implement the RowHandlerColumnIterator
.
- The
RowHandlerColumnIterator
doesn't need to have correspondingblock
and the filter-scan logic -- it is a very simple iterator that returns i64 value in an incremental order. - And therefore, there should be no change on
concrete_column_iterator
... We just need to simply implement a new RowHandlerColumnIterator.
An example:
pub struct RowHandlerColumnIterator {
rowset_id: usize,
row_count: usize,
current_row_id: usize,
}
impl RowHandlerColumnIterator {
fn new(rowset_id: usize, first_row: usize) { ... }
}
Then, we can implement
impl ColumnIterator<I64Array> for RowHandlerIterator {
type NextFuture<'a> = impl Future<Output = StorageResult<Option<(u32, I64Array)>>> + 'a;
fn next_batch(&mut self, expected_size: Option<usize>) -> Self::NextFuture<'_> {
async move { /* produce a I64Array like the original `sequence` method, and take `expected_size` into account. */ }
}
fn fetch_hint(&self) -> usize {
// simply return the remaining rows of the column
}
fn fetch_current_row_id(&self) -> u32 {
self.current_row_id
}
fn skip(&mut self, cnt: usize) {
self.current_row_id += cnt;
}
}
Then, everything should be set -- we can use the RowHandlerIterator
like other user columns. No need to implement a separate block iterator for row handler :)
For the commit with binder ede0762, LGTM, good work!
... the RowSet handler doesn't even need to know about how other columns are indexed -- it can return all its information without any I/O, so we can set its fetch_hint to the remaining items (instead of the remaining items in columns[0]'s block) |
Thanks a lot. I will apply these feedbacks. |
Sorry for my clumsiness, I still have some questions for constructing
|
Oops, I missed this point! I think there are multiple approaches to solve this...
I think the first approach (which is what you've already done days ago) looks like a good way. Instead of computing total row count in
risinglight/src/storage/secondary/rowset/rowset_iterator.rs Lines 104 to 118 in de55dfe
Therefore, If you feel there's difficulty implementing this, feel free to comment in this PR, or ping me on Slack. Thanks! |
Signed-off-by: Fedomn <fedomn.ma@gmail.com>
Signed-off-by: Fedomn <fedomn.ma@gmail.com>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, for the current implementation! For the remaining bugs, you may create a new issue (and send PR if you have time), thanks!
delete from t where v = 7 | ||
|
||
query I | ||
select count(*) from t where v > 5 |
This comment was marked as resolved.
This comment was marked as resolved.
Sorry, something went wrong.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
PhysicalProjection: exprs [InputRef #0]
PhysicalSimpleAgg: 1 agg calls
PhysicalTableScan: table #9, columns [0], with_row_handler: true, is_sorted: false, expr: Gt(InputRef #0, Int32(5) (const))
The plan is not optimal. As we already have column 0 scanned, we don't need row handler.
We can refine the binder logic later to handle this case, and get this PR merged at first. Thanks!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Try to close #421, but I'm not sure my solution is right. I split two cases in rowset_iterator:
for the first part, I keep the original logic which is RowCount calculated by common_chunk_range
for the second part, I add a new condition branch that returns the first column total row_count directly
Signed-off-by: Fedomn fedomn.ma@gmail.com