-
Notifications
You must be signed in to change notification settings - Fork 222
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: add standalone shuffle for transformed ivf-pq vectors file #2670
Conversation
@@ -470,6 +603,8 @@ impl IvfShuffler { | |||
"Chunk loaded into memory and sorted, writing to disk at {}", | |||
path | |||
); | |||
|
|||
// TODO: The result can be lance v1 or v2. Currently it is v1 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Currently the sorted files are written using v1, this should be updated eventually (for this reason the current test is also simplified because we would have to read the lance v1 file in python. So instead we just check if the file is empty or not)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like some tests are failing
while let Some(batch) = stream.next().await { | ||
let batch = batch?; | ||
let part_ids: &UInt32Array = batch | ||
.column_by_name(PART_ID_COLUMN) | ||
.expect("Partition ID column not found") | ||
.as_primitive(); | ||
part_ids.values().iter().for_each(|part_id| { | ||
partition_sizes[*part_id as usize] += 1; | ||
}); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems like this could be potentially pulled out of the if statement if you changed it to:
let stream = if ...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Both streams have different types...one is BufferUnordered<Map<Iter<Range<usize>>, impl FnMut(usize) -> impl Future<Output = Result<RecordBatch, Error>>>>
and the other is Pin<Box<dyn RecordBatchStream>>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you box the first one? Just add .boxed()
to it and it should also be Pin<Box<dyn RecordBatchStream>>
(I think, might be wrong)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I tried, but it's still different - Pin<Box<dyn futures::Stream<Item = Result<arrow_array::RecordBatch, lance_core::Error>> + std::marker::Send>>
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #2670 +/- ##
==========================================
- Coverage 79.65% 79.56% -0.09%
==========================================
Files 226 226
Lines 66245 66329 +84
Branches 66245 66329 +84
==========================================
+ Hits 52766 52776 +10
- Misses 10387 10451 +64
- Partials 3092 3102 +10
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A few more suggestions but we can go ahead and move forward.
…#2681) Follow up of PRs #2566, #2657 and #2670 The whole pipeline can help users transform, shuffle, and load IVF-PQ indices in separate, standalone steps (can be useful when building indices for large datasets) First, the user runs `transform_vectors()` separately on selected fragments Then, they run `shuffle_transformed_vectors()` on each transformed result Finally, they load all the shuffled vectors and commit the index into the dataset using `load_shuffled_vectors()`
Makes shuffle a standalone operation (shuffles the transformed ivf-pq indices by partition id and saves the files on disk). Uses the v2 reader internally during the shuffling process.