Skip to content

Commit

Permalink
Review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
sanketkedia committed Jul 22, 2024
1 parent 59fbfd2 commit 2b42032
Show file tree
Hide file tree
Showing 4 changed files with 3 additions and 39 deletions.
1 change: 1 addition & 0 deletions rust/worker/src/blockstore/arrow/blockfile.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,7 @@ impl<'me, K: ArrowReadableKey<'me> + Into<KeyWrapper>, V: ArrowReadableValue<'me
}

pub(super) async fn load_blocks(&self, block_ids: Vec<Uuid>) -> () {
// TODO: These need to be tasks enqueued onto dispatcher.
let mut futures = Vec::new();
for block_id in block_ids {
futures.push(self.get_block(block_id));
Expand Down
4 changes: 2 additions & 2 deletions rust/worker/src/blockstore/arrow/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ impl BlockManager {
).await;
match stream {
Ok(mut bytes) => {
let read_block_span = tracing::trace_span!(parent: Span::current(), "BlockManager read bytes to end for block get");
let read_block_span = tracing::trace_span!(parent: Span::current(), "BlockManager read bytes to end");
let buf = read_block_span.in_scope(|| async {
let mut buf: Vec<u8> = Vec::new();
while let Some(res) = bytes.next().await {
Expand All @@ -243,7 +243,7 @@ impl BlockManager {
return None;
}
};
tracing::info!("Read {:?} bytes from s3 for block get", buf.len());
tracing::info!("Read {:?} bytes from s3", buf.len());
let deserialization_span = tracing::trace_span!(parent: Span::current(), "BlockManager deserialize block");
let block = deserialization_span.in_scope(|| Block::from_bytes(&buf, *id));
match block {
Expand Down
19 changes: 0 additions & 19 deletions rust/worker/src/execution/operators/merge_knn_results.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,25 +96,6 @@ impl Operator<MergeKnnResultsOperatorInput, MergeKnnResultsOperatorOutput>
if input.include_vectors {
hnsw_result_vectors = Some(Vec::new());
}
// Prefetch data.
// Note: This will be moved to an operator.
let offset_ids = input
.hnsw_result_offset_ids
.iter()
.map(|x| *x as u32)
.collect();
reader
.prefetch_id_to_user_id(&offset_ids)
.instrument(
tracing::trace_span!(parent: Span::current(), "[MergeKnnResultsOperator] Prefetch id to user id"),
)
.await;
reader
.prefetch_id_to_data(&offset_ids)
.instrument(
tracing::trace_span!(parent: Span::current(), "[MergeKnnResultsOperator] Prefetch id to data"),
)
.await;
for offset_id in &input.hnsw_result_offset_ids {
let user_id = reader.get_user_id_for_offset_id(*offset_id as u32).await;
match user_id {
Expand Down
18 changes: 0 additions & 18 deletions rust/worker/src/execution/operators/merge_metadata_results.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,24 +243,6 @@ impl Operator<MergeMetadataResultsOperatorInput, MergeMetadataResultsOperatorOut
}
}
};
// Prefetch data.
// Note: This will be moved to an operator.
let merged_ids_copy = merged_ids.clone();
let mut ids_to_hydrate = Vec::new();
for merged_id in merged_ids_copy {
if visited_ids.contains(&merged_id) {
continue;
}
ids_to_hydrate.push(merged_id);
}
record_segment_reader
.prefetch_id_to_data(&ids_to_hydrate)
.instrument(tracing::trace_span!(parent: Span::current(), "[MergeMetadataResults] Prefetch id to data"))
.await;
record_segment_reader
.prefetch_id_to_user_id(&ids_to_hydrate)
.instrument(tracing::trace_span!(parent: Span::current(), "[MergeMetadataResults] Prefetch id to user id"))
.await;
for merged_id in merged_ids {
// Skip already taken from log.
if visited_ids.contains(&merged_id) {
Expand Down

0 comments on commit 2b42032

Please sign in to comment.