
record segment and blockfile apis
sanketkedia committed Jul 22, 2024
1 parent 28b3739 commit 59fbfd2
Showing 7 changed files with 139 additions and 6 deletions.
25 changes: 25 additions & 0 deletions rust/worker/src/blockstore/arrow/blockfile.rs
@@ -10,6 +10,7 @@ use crate::blockstore::key::KeyWrapper;
use crate::blockstore::BlockfileError;
use crate::errors::ErrorCodes;
use crate::{blockstore::key::CompositeKey, errors::ChromaError};
use futures::future::join_all;
use parking_lot::Mutex;
use std::mem::transmute;
use std::{collections::HashMap, sync::Arc};
@@ -274,6 +275,30 @@ impl<'me, K: ArrowReadableKey<'me> + Into<KeyWrapper>, V: ArrowReadableValue<'me>>
None
}

pub(super) async fn load_blocks(&self, block_ids: Vec<Uuid>) -> () {
let mut futures = Vec::new();
for block_id in block_ids {
futures.push(self.get_block(block_id));
}
join_all(futures).await;
}

pub(crate) async fn load_blocks_for_keys(&self, prefixes: &Vec<&str>, keys: &Vec<K>) -> () {
let mut composite_keys = Vec::new();
let mut prefix_iter = prefixes.iter();
let mut key_iter = keys.iter();
while let Some(prefix) = prefix_iter.next() {
if let Some(key) = key_iter.next() {
let composite_key = CompositeKey::new(prefix.to_string(), key.clone());
composite_keys.push(composite_key);
}
}
let target_block_ids = self
.sparse_index
.get_all_target_block_ids(&mut composite_keys);
self.load_blocks(target_block_ids).await;
}

pub(crate) async fn get(&'me self, prefix: &str, key: K) -> Result<V, Box<dyn ChromaError>> {
let search_key = CompositeKey::new(prefix.to_string(), key.clone());
let target_block_id = self.sparse_index.get_target_block_id(&search_key);
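The new load_blocks issues all of its block fetches at once and awaits them together, so cache misses overlap instead of running back to back. A minimal, self-contained sketch of that pattern (fetch_block is a hypothetical stand-in for the reader's get_block; the futures and tokio crates are assumed as dependencies):

use futures::future::join_all;

// Hypothetical stand-in for ArrowBlockfileReader::get_block: on a miss it
// would read the block from storage and populate the block cache.
async fn fetch_block(block_id: u32) -> usize {
    block_id as usize
}

// Mirror of load_blocks: collect one future per block, then await them all
// so the fetches run concurrently. Only the caching side effect matters.
async fn load_blocks(block_ids: Vec<u32>) {
    let futures: Vec<_> = block_ids.into_iter().map(fetch_block).collect();
    join_all(futures).await;
}

#[tokio::main]
async fn main() {
    load_blocks(vec![1, 2, 3]).await;
}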
6 changes: 3 additions & 3 deletions rust/worker/src/blockstore/arrow/provider.rs
@@ -19,7 +19,7 @@ use crate::{
};
use async_trait::async_trait;
use core::panic;
use futures::StreamExt;
use futures::{future::join_all, StreamExt};
use thiserror::Error;
use tracing::{Instrument, Span};
use uuid::Uuid;
@@ -218,7 +218,7 @@ impl BlockManager {
).await;
match stream {
Ok(mut bytes) => {
let read_block_span = tracing::trace_span!(parent: Span::current(), "BlockManager read bytes to end");
let read_block_span = tracing::trace_span!(parent: Span::current(), "BlockManager read bytes to end for block get");
let buf = read_block_span.in_scope(|| async {
let mut buf: Vec<u8> = Vec::new();
while let Some(res) = bytes.next().await {
@@ -243,7 +243,7 @@ impl BlockManager {
return None;
}
};
tracing::info!("Read {:?} bytes from s3", buf.len());
tracing::info!("Read {:?} bytes from s3 for block get", buf.len());
let deserialization_span = tracing::trace_span!(parent: Span::current(), "BlockManager deserialize block");
let block = deserialization_span.in_scope(|| Block::from_bytes(&buf, *id));
match block {
43 changes: 42 additions & 1 deletion rust/worker/src/blockstore/arrow/sparse_index.rs
@@ -2,7 +2,7 @@ use crate::blockstore::key::{CompositeKey, KeyWrapper};
use crate::errors::ChromaError;
use core::panic;
use parking_lot::Mutex;
use std::collections::{BTreeMap, HashMap};
use std::collections::{BTreeMap, HashMap, HashSet};
use std::fmt::Debug;
use std::sync::Arc;
use uuid::Uuid;
@@ -99,6 +99,47 @@ impl SparseIndex {
reverse.insert(block_id, SparseIndexDelimiter::Start);
}

pub(super) fn get_all_target_block_ids(
&self,
search_keys: &mut Vec<CompositeKey>,
) -> Vec<Uuid> {
// Sort so that we can search in one iteration.
search_keys.sort();
let mut result_uuids = Vec::new();
let forward = self.forward.lock();
let mut curr_iter = forward.iter();
let mut next_iter = forward.iter().skip(1);
let mut search_iter = search_keys.iter().peekable();
while let Some((curr_key, curr_block_id)) = curr_iter.next() {
let search_key = match search_iter.peek() {
Some(key) => SparseIndexDelimiter::Key((**key).clone()),
None => {
break;
}
};
if let Some((next_key, _)) = next_iter.next() {
if search_key >= *curr_key && search_key < *next_key {
result_uuids.push(*curr_block_id);
// Move forward all search keys that match this block.
search_iter.next();
while let Some(key) = search_iter.peek() {
let search_key = SparseIndexDelimiter::Key((**key).clone());
if search_key >= *curr_key && search_key < *next_key {
search_iter.next();
} else {
break;
}
}
}
} else {
// last block. All the remaining keys should be satisfied by this.
result_uuids.push(*curr_block_id);
break;
}
}
result_uuids
}

pub(super) fn get_target_block_id(&self, search_key: &CompositeKey) -> Uuid {
let forward = self.forward.lock();
let mut iter_curr = forward.iter();
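get_all_target_block_ids relies on both the sparse index and the search keys being in ascending order: one forward sweep assigns every key to the block whose [start, next_start) range covers it, and each block id is emitted at most once however many keys it covers. A simplified sketch of that sweep using integer keys and numeric block ids (illustrative types, not the repository's CompositeKey and Uuid):

use std::collections::BTreeMap;

// index maps each block's start key to its block id; a block covers
// [start, next_start), and the last block covers everything after its start.
fn all_target_block_ids(index: &BTreeMap<i32, u32>, mut keys: Vec<i32>) -> Vec<u32> {
    keys.sort();
    let entries: Vec<(i32, u32)> = index.iter().map(|(k, v)| (*k, *v)).collect();
    let mut out = Vec::new();
    let mut k = 0; // cursor into the sorted search keys
    for (i, (start, block_id)) in entries.iter().enumerate() {
        if k == keys.len() {
            break;
        }
        match entries.get(i + 1) {
            Some((next_start, _)) => {
                // Consume every pending key that falls inside this block's range.
                let mut matched = false;
                while k < keys.len() && keys[k] >= *start && keys[k] < *next_start {
                    matched = true;
                    k += 1;
                }
                if matched {
                    out.push(*block_id);
                }
            }
            None => {
                // Last block: all remaining keys land here.
                out.push(*block_id);
                break;
            }
        }
    }
    out
}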
9 changes: 9 additions & 0 deletions rust/worker/src/blockstore/types.rs
@@ -350,4 +350,13 @@ impl<
BlockfileReader::ArrowBlockfileReader(reader) => reader.id(),
}
}

pub(crate) async fn load_blocks_for_keys(&self, prefixes: &Vec<&str>, keys: &Vec<K>) -> () {
match self {
BlockfileReader::MemoryBlockfileReader(reader) => unimplemented!(),
BlockfileReader::ArrowBlockfileReader(reader) => {
reader.load_blocks_for_keys(prefixes, keys).await
}
}
}
}
21 changes: 20 additions & 1 deletion rust/worker/src/execution/operators/merge_knn_results.rs
@@ -7,6 +7,7 @@ use crate::{
};
use async_trait::async_trait;
use thiserror::Error;
use tracing::{Instrument, Span};

#[derive(Debug)]
pub struct MergeKnnResultsOperator {}
@@ -95,7 +96,25 @@ impl Operator<MergeKnnResultsOperatorInput, MergeKnnResultsOperatorOutput>
if input.include_vectors {
hnsw_result_vectors = Some(Vec::new());
}

// Prefetch data.
// Note: This will be moved to an operator.
let offset_ids = input
.hnsw_result_offset_ids
.iter()
.map(|x| *x as u32)
.collect();
reader
.prefetch_id_to_user_id(&offset_ids)
.instrument(
tracing::trace_span!(parent: Span::current(), "[MergeKnnResultsOperator] Prefetch id to user id"),
)
.await;
reader
.prefetch_id_to_data(&offset_ids)
.instrument(
tracing::trace_span!(parent: Span::current(), "[MergeKnnResultsOperator] Prefetch id to data"),
)
.await;
for offset_id in &input.hnsw_result_offset_ids {
let user_id = reader.get_user_id_for_offset_id(*offset_id as u32).await;
match user_id {
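Both this operator and MergeMetadataResultsOperator below now warm the record segment's caches with one batched prefetch before the per-id lookups, so each await inside the loop becomes a cache hit rather than its own storage round trip. A toy sketch of the prefetch-then-read pattern (mock reader and storage, not the repository's API; assumes the futures and tokio crates):

use futures::future::join_all;
use std::collections::HashMap;

async fn fetch_from_storage(id: u32) -> String {
    format!("record-{id}") // stand-in for an S3/block read
}

struct MockReader {
    cache: HashMap<u32, String>,
}

impl MockReader {
    // Batched warm-up: fetch all requested ids concurrently into the cache.
    async fn prefetch(&mut self, ids: &[u32]) {
        let values = join_all(ids.iter().map(|id| fetch_from_storage(*id))).await;
        self.cache.extend(ids.iter().copied().zip(values));
    }

    // Point lookup: served from cache when prefetched, storage otherwise.
    async fn get(&self, id: u32) -> String {
        match self.cache.get(&id) {
            Some(v) => v.clone(),
            None => fetch_from_storage(id).await,
        }
    }
}

#[tokio::main]
async fn main() {
    let mut reader = MockReader { cache: HashMap::new() };
    let ids = [7u32, 8, 9];
    reader.prefetch(&ids).await; // one concurrent batch up front
    for id in &ids {
        let _value = reader.get(*id).await; // cache hits inside the loop
    }
}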
18 changes: 18 additions & 0 deletions rust/worker/src/execution/operators/merge_metadata_results.rs
@@ -243,6 +243,24 @@ impl Operator<MergeMetadataResultsOperatorInput, MergeMetadataResultsOperatorOutput>
}
}
};
// Prefetch data.
// Note: This will be moved to an operator.
let merged_ids_copy = merged_ids.clone();
let mut ids_to_hydrate = Vec::new();
for merged_id in merged_ids_copy {
if visited_ids.contains(&merged_id) {
continue;
}
ids_to_hydrate.push(merged_id);
}
record_segment_reader
.prefetch_id_to_data(&ids_to_hydrate)
.instrument(tracing::trace_span!(parent: Span::current(), "[MergeMetadataResults] Prefetch id to data"))
.await;
record_segment_reader
.prefetch_id_to_user_id(&ids_to_hydrate)
.instrument(tracing::trace_span!(parent: Span::current(), "[MergeMetadataResults] Prefetch id to user id"))
.await;
for merged_id in merged_ids {
// Skip already taken from log.
if visited_ids.contains(&merged_id) {
23 changes: 22 additions & 1 deletion rust/worker/src/segment/record_segment.rs
@@ -1,7 +1,9 @@
use super::types::{MaterializedLogRecord, SegmentWriter};
use super::{DataRecord, SegmentFlusher};
use crate::blockstore::arrow::types::ArrowReadableKey;
use crate::blockstore::key::KeyWrapper;
use crate::blockstore::provider::{BlockfileProvider, CreateError, OpenError};
use crate::blockstore::{BlockfileFlusher, BlockfileReader, BlockfileWriter};
use crate::blockstore::{BlockfileFlusher, BlockfileReader, BlockfileWriter, Key};
use crate::errors::{ChromaError, ErrorCodes};
use crate::execution::data::data_chunk::Chunk;
use crate::types::{MaterializedLogOperation, Operation, Segment, SegmentType};
@@ -811,4 +813,23 @@ impl RecordSegmentReader<'_> {
pub(crate) async fn count(&self) -> Result<usize, Box<dyn ChromaError>> {
self.id_to_data.count().await
}

pub(crate) async fn prefetch_id_to_data(&self, keys: &Vec<u32>) -> () {
let prefixes = vec![""; keys.len()];
self.id_to_data.load_blocks_for_keys(&prefixes, keys).await
}

pub(crate) async fn prefetch_user_id_to_id(&self, keys: &Vec<&str>) -> () {
let prefixes = vec![""; keys.len()];
self.user_id_to_id
.load_blocks_for_keys(&prefixes, keys)
.await
}

pub(crate) async fn prefetch_id_to_user_id(&self, keys: &Vec<u32>) -> () {
let prefixes = vec![""; keys.len()];
self.id_to_user_id
.load_blocks_for_keys(&prefixes, keys)
.await
}
}
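All three prefetch helpers fabricate one empty prefix per key (vec![""; keys.len()]) because the record segment's blockfiles are written under a single empty prefix; the real keying lives in the offset ids and user ids themselves. An illustrative call-site fragment, not compilable on its own, assuming a constructed RecordSegmentReader named reader and using the getter that appears in the MergeKnnResultsOperator change above:

// Hypothetical call site: warm the offset-id -> user-id blockfile once, then
// do the point reads, which should now hit cached blocks instead of storage.
let offset_ids: Vec<u32> = vec![10, 11, 12];
reader.prefetch_id_to_user_id(&offset_ids).await;
for id in &offset_ids {
    let user_id = reader.get_user_id_for_offset_id(*id).await;
    // match on user_id here, as the operator code above does
}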
