feat(storage): implement read pruning by vnode #2882
Conversation
I would prefer to have a new set of interfaces (`scan_with_vnode`) instead of changing the existing ones. We should migrate little by little.
And I think the vnode information should be recorded on `Keyspace` (`Keyspace::new_with_vnode`) instead of being passed everywhere.
Indeed. I'll fix this.
The vnodes that one executor owns are likely to change when the cluster scales in or out. Then we'll have to maintain the vnode info in a multi-version way in the keyspace.
If there's scale-in or scale-out, the executor will be re-created. 😇🥰
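To illustrate the `Keyspace::new_with_vnode` idea suggested above, here is a minimal standalone sketch. The struct fields, constructor names, and types are assumptions for illustration, not the actual RisingWave API:

```rust
/// Assumed alias; the real project defines its own `VirtualNode` type.
type VirtualNode = u16;

/// Hypothetical sketch: record the vnode on the keyspace at construction
/// time, so reads can consult `self.vnode` instead of threading the vnode
/// through every call site.
struct Keyspace {
    prefix: Vec<u8>,
    vnode: Option<VirtualNode>,
}

impl Keyspace {
    fn new(prefix: Vec<u8>) -> Self {
        Self { prefix, vnode: None }
    }

    /// The `new_with_vnode` constructor suggested in the review.
    fn new_with_vnode(prefix: Vec<u8>, vnode: VirtualNode) -> Self {
        Self { prefix, vnode: Some(vnode) }
    }
}

fn main() {
    let ks = Keyspace::new_with_vnode(b"table_1".to_vec(), 7);
    assert_eq!(ks.vnode, Some(7));
    assert_eq!(ks.prefix, b"table_1".to_vec());
    assert_eq!(Keyspace::new(b"table_1".to_vec()).vnode, None);
}
```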
Codecov Report
```diff
@@            Coverage Diff             @@
##             main    #2882      +/-   ##
==========================================
- Coverage   73.47%   73.42%   -0.05%
==========================================
  Files         736      736
  Lines      100716   101010     +294
==========================================
+ Hits        73997    74163     +166
- Misses      26719    26847     +128
```
An executor can only use the latest version of its vnodes to query data. If a re-created executor wants to query data written before scaling, it will use the wrong set of vnodes and thus get incorrect results.
New executors will always need to include their previous vnodes. We will need a separate barrier to notify compaction completion and update their vnodes.
If a fragment scales out from a parallelism of 5 to 10, the number of vnodes owned by one parallel unit will inevitably halve (since the total number of vnodes is invariant). How can we ensure that new executors always include their previous vnodes?
I think I get what you mean: new executor vnode set = union of previous vnode set and current vnode set, until all relevant compactions are done, right?
Well, my fault, please ignore my comments.
This should never happen. Executors will only read data belonging to their own distribution. During scale-out, executors will operate on a completely different set of keys, so they will not query data written before scaling.
And we do not need to include the previous vnodes.
```rust
        &'a self,
        key: &'a [u8],
        epoch: u64,
        _vnode: Option<VirtualNode>,
```
Considering there might not be such a "PointGet" operator, I think the type of vnodes should also be `Vec<VirtualNode>`. Nevermind, it's not a big problem.
I'd prefer always using `&'a VirtualNode`, so that it will function efficiently even when the vnode mapping is large.
> I'd prefer always using `&'a VirtualNode`, so that it will function efficiently even when the vnode mapping is large.

Will `Option<&'a VirtualNode>` still cause some overhead due to the construction of the `Option`?
Nope, it has exactly the same value size as `&'a VirtualNode`.
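This can be verified with a quick standalone check (not from the PR): Rust's niche optimization lets `Option<&T>` use the null pointer as the `None` value, so it is exactly pointer-sized, the same as a bare reference.

```rust
use std::mem::size_of;

fn main() {
    // `Option<&T>` uses the null pointer as the `None` niche,
    // so wrapping a reference in `Option` adds no size overhead.
    assert_eq!(size_of::<Option<&u64>>(), size_of::<&u64>());
    assert_eq!(size_of::<Option<&u64>>(), size_of::<usize>());
    println!("Option<&u64> occupies {} bytes", size_of::<Option<&u64>>());
}
```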
src/storage/src/keyspace.rs
Outdated
```rust
pub async fn get_with_vnode(
    &self,
    key: impl AsRef<[u8]>,
    epoch: u64,
    vnode: VirtualNode,
) -> StorageResult<Option<Bytes>> {
    self.store
        .get(&self.prefixed_key(key), epoch, Some(vnode))
        .await
}
```
This function uses the given `vnode` instead of `self.vnode`. In what scenario should it be used?
It is used when an executor does point-get with vnode.
I think point-get can already be optimized by the bloom filter (only a 0.01 false positive rate currently). Maybe we don't need the vnode for it. But it would also be okay to use the vnode for sanity checks -- e.g. executors should not point-get keys outside their vnode range.
LGTM
```diff
@@ -230,15 +231,15 @@ mod tests {
         vec![DataType::Int64].into(),
     );
     assert!(!managed_state.is_dirty());
     let columns = vec![
```
Accidentally reverted the change?
```diff
@@ -170,16 +185,34 @@ impl<S: StateStore> CellBasedTable<S> {

     pub async fn get_row_by_scan(&self, pk: &Row, epoch: u64) -> StorageResult<Option<Row>> {
         // get row by state_store scan
         let vnode = self
```
Why does `CellBasedTable` need to compute the vnode? I think this should be provided by the executors creating the `CellBasedTable`.
It's just that the cell-based table provides such an interface: `async fn batch_write_rows_inner<const WITH_VALUE_META: bool>`, which I think indicates whether to compute the value meta in the cell-based table. 🤔
Value meta needs to be computed on writes, of course. But for reads, isn't it true that all executors and their state table objects already have value meta assigned to them? For both point-get and scan, we should use the vnodes provided by executors to do the filtering, instead of computing them.
So we should use the same set of vnodes for pruning regardless of the type of read operation (point-get or range-scan). Will this lead to any inefficiency that could be avoided (e.g. fewer SSTs being pruned out) when we do a point-get? cc. @fuyufjh
For reads performed on a single vnode (e.g. we know the dist key beforehand), I think computing the vnode on the fly makes sense. In other cases, I think we should just use the vnodes of the executor, which should be set up on `CellBasedTable` initialization.
src/storage/src/keyspace.rs
Outdated
```rust
    vnode: VirtualNode,
) -> StorageResult<Option<Bytes>> {
    // Construct vnode bitmap.
    let mut bitmap_inner = [0; VNODE_BITMAP_LEN];
```
This code seems to appear in multiple places. Is it possible to have a `VNodeBitmap::new(vnode, table_id)` and let the caller provide a `VNodeBitmap`?
`VNodeBitmap` is actually a proto type. Maybe we should define a non-proto type for it.
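A minimal sketch of what such a non-proto wrapper might look like. The constant value, field names, and constructor here are assumptions based on the snippet above, not the actual RisingWave API:

```rust
/// Assumed bitmap length in bytes; the real constant lives in the project.
const VNODE_BITMAP_LEN: usize = 256;

type VirtualNode = u16;

/// Hypothetical non-proto vnode bitmap, constructed once by the caller
/// instead of being rebuilt inline at every read call site.
struct VNodeBitmapSet {
    table_id: u32,
    bitmap: [u8; VNODE_BITMAP_LEN],
}

impl VNodeBitmapSet {
    /// Build a bitmap with a single vnode set, mirroring the inline
    /// construction shown in the diff.
    fn new_single(table_id: u32, vnode: VirtualNode) -> Self {
        let mut bitmap = [0u8; VNODE_BITMAP_LEN];
        bitmap[vnode as usize / 8] |= 1 << (vnode as usize % 8);
        Self { table_id, bitmap }
    }

    fn contains(&self, vnode: VirtualNode) -> bool {
        self.bitmap[vnode as usize / 8] & (1 << (vnode as usize % 8)) != 0
    }
}

fn main() {
    let set = VNodeBitmapSet::new_single(42, 100);
    assert!(set.contains(100));
    assert!(!set.contains(101));
    println!("table {}: vnode 100 is set", set.table_id);
}
```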
```diff
@@ -170,16 +185,34 @@ impl<S: StateStore> CellBasedTable<S> {

     pub async fn get_row_by_scan(&self, pk: &Row, epoch: u64) -> StorageResult<Option<Row>> {
         // get row by state_store scan
         let vnode = self
```
> For reads performed on a single vnode (e.g. we know the dist key beforehand), I think computing the vnode on the fly makes sense. In other cases, I think we should just use the vnodes of the executor, which should be set up on `CellBasedTable` initialization.

I believe the bloom filter can already achieve a relatively low false positive rate. I would prefer using the executor-provided vnodes in all cases.
Correct me if I am wrong: on second thought, I think there is no case where we don't know the dist key beforehand. Therefore, we should always compute and provide a single vnode to the read interface.
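The point that a known dist key determines a single vnode can be illustrated with a standalone sketch. The hash function and vnode count here are assumptions for illustration; the project's actual hashing scheme may differ:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Assumed total number of virtual nodes (invariant across scaling).
const VNODE_COUNT: u64 = 2048;

/// Map a distribution key to its vnode. This only illustrates that a
/// known dist key deterministically yields exactly one vnode, which a
/// point-get can then pass down for read pruning.
fn vnode_of(dist_key: &[u8]) -> u64 {
    let mut hasher = DefaultHasher::new();
    dist_key.hash(&mut hasher);
    hasher.finish() % VNODE_COUNT
}

fn main() {
    let v = vnode_of(b"user_42");
    // Deterministic: the same dist key always maps to the same vnode.
    assert_eq!(v, vnode_of(b"user_42"));
    assert!(v < VNODE_COUNT);
    println!("dist key maps to vnode {v}");
}
```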
What's changed and what's your intention?
Summarize your change
After this PR gets merged, read pruning by vnode will work properly in both point-get and range-scan.
Limitations
Read pruning does NOT work in batch executors yet. This will be implemented in the future.
Checklist