Skip to content

Commit

Permalink
Implement get_tree() feature (#905)
Browse files Browse the repository at this point in the history
Handle the get_tree() gRPC request in the CAS server.
Directories are traversed in FIFO (first-in, first-out) order, i.e. with a
breadth-first search (BFS), using a `VecDeque` as the queue.
Paging is supported: `page_token` in `GetTreeRequest` identifies the
directory at which the returned results start, and `next_page_token` in
`GetTreeResponse` is the value to pass as `page_token` in the next
request.
  • Loading branch information
aleksdmladenovic authored May 16, 2024
1 parent ba7ed50 commit ae44878
Show file tree
Hide file tree
Showing 2 changed files with 310 additions and 7 deletions.
76 changes: 69 additions & 7 deletions nativelink-service/src/cas_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,24 +12,25 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::collections::{HashMap, VecDeque};
use std::pin::Pin;
use std::sync::Arc;

use bytes::Bytes;
use futures::stream::{FuturesUnordered, Stream};
use futures::TryStreamExt;
use nativelink_config::cas_server::{CasStoreConfig, InstanceName};
use nativelink_error::{error_if, make_err, make_input_err, Code, Error, ResultExt};
use nativelink_error::{error_if, make_input_err, Code, Error, ResultExt};
use nativelink_proto::build::bazel::remote::execution::v2::content_addressable_storage_server::{
ContentAddressableStorage, ContentAddressableStorageServer as Server,
};
use nativelink_proto::build::bazel::remote::execution::v2::{
batch_read_blobs_response, batch_update_blobs_response, compressor, BatchReadBlobsRequest,
BatchReadBlobsResponse, BatchUpdateBlobsRequest, BatchUpdateBlobsResponse,
BatchReadBlobsResponse, BatchUpdateBlobsRequest, BatchUpdateBlobsResponse, Directory,
FindMissingBlobsRequest, FindMissingBlobsResponse, GetTreeRequest, GetTreeResponse,
};
use nativelink_proto::google::rpc::Status as GrpcStatus;
use nativelink_store::ac_utils::get_and_decode_digest;
use nativelink_store::grpc_store::GrpcStore;
use nativelink_store::store_manager::StoreManager;
use nativelink_util::common::DigestInfo;
Expand Down Expand Up @@ -238,10 +239,71 @@ impl CasServer {
.into_inner();
return Ok(Response::new(Box::pin(stream)));
}
Err(make_err!(
Code::Unimplemented,
"get_tree is not implemented"
))
let store_pin = Pin::new(store.as_ref());
let root_digest: DigestInfo = inner_request
.root_digest
.err_tip(|| "Expected root_digest to exist in GetTreeRequest")?
.try_into()
.err_tip(|| "In GetTreeRequest::root_digest")?;

let mut deque: VecDeque<DigestInfo> = VecDeque::new();
let mut directories: Vec<Directory> = Vec::new();
// `page_token` holds the `{hash_str}-{size_bytes}` of the first directory digest to return for the current request.
let mut page_token_parts = inner_request.page_token.split("-");
let page_token_digest = DigestInfo::try_new(
page_token_parts
.next()
.err_tip(|| "Failed to parse `hash_str` in `page_token`")?,
page_token_parts
.next()
.err_tip(|| "Failed to parse `size_bytes` in `page_token`")?
.parse::<i64>()
.err_tip(|| "Failed to parse `size_bytes` as i64")?,
)
.err_tip(|| "Failed to parse `page_token` as `Digest` in `GetTreeRequest`")?;
let page_size = inner_request.page_size;
// If `page_size` is 0, paging is not necessary.
let mut page_token_matched = page_size == 0;
deque.push_back(root_digest);

while !deque.is_empty() {
let digest: DigestInfo = deque.pop_front().err_tip(|| "In VecDeque::pop_front")?;
let directory = get_and_decode_digest::<Directory>(store_pin, &digest)
.await
.err_tip(|| "Converting digest to Directory")?;
if digest == page_token_digest {
page_token_matched = true;
}
for directory in &directory.directories {
let digest: DigestInfo = directory
.digest
.clone()
.err_tip(|| "Expected Digest to exist in Directory::directories::digest")?
.try_into()
.err_tip(|| "In Directory::file::digest")?;
deque.push_back(digest);
}
if page_token_matched {
directories.push(directory);
if directories.len() as i32 == page_size {
break;
}
}
}
// `next_page_token` holds the `{hash_str}-{size_bytes}` of the first directory digest for the next request.
// It is an empty string when the end of the directory tree has been reached.
let next_page_token: String = if let Some(value) = deque.front() {
format!("{}-{}", value.hash_str(), value.size_bytes)
} else {
String::new()
};

Ok(Response::new(Box::pin(futures::stream::once(async {
Ok(GetTreeResponse {
directories,
next_page_token,
})
}))))
}
}

Expand Down
241 changes: 241 additions & 0 deletions nativelink-service/tests/cas_server_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use nativelink_proto::build::bazel::remote::execution::v2::content_addressable_s
use nativelink_proto::build::bazel::remote::execution::v2::{compressor, digest_function, Digest};
use nativelink_proto::google::rpc::Status as GrpcStatus;
use nativelink_service::cas_server::CasServer;
use nativelink_store::ac_utils::serialize_and_upload_message;
use nativelink_store::default_store_factory::store_factory;
use nativelink_store::store_manager::StoreManager;
use nativelink_util::common::DigestInfo;
Expand Down Expand Up @@ -324,6 +325,246 @@ mod batch_read_blobs {
}
}

mod get_tree {
use futures::StreamExt;
use nativelink_proto::build::bazel::remote::execution::v2::{
digest_function, Directory, DirectoryNode, GetTreeRequest, GetTreeResponse, NodeProperties,
};
use nativelink_util::digest_hasher::DigestHasherFunc;
use nativelink_util::store_trait::Store;
use pretty_assertions::assert_eq; // Must be declared in every module.
use prost_types::Timestamp;

use super::*;

/// Everything `setup_directory_structure` produces, bundled so the tests can
/// destructure exactly the pieces they need.
struct SetupDirectoryResult {
// The root `Directory`, whose `directories` field lists every sub-directory.
root_directory: Directory,
// Digest returned by the store upload of `root_directory`.
root_directory_digest_info: DigestInfo,
// The sub-directories, in the order they were created and uploaded.
sub_directories: Vec<Directory>,
// Digests of `sub_directories`, index-aligned with that vector.
sub_directory_digest_infos: Vec<DigestInfo>,
}
/// Uploads a two-level directory tree to `store_pinned` for the `get_tree` tests.
///
/// Five empty sub-directories are serialized and uploaded first; they differ
/// only in `node_properties.mtime.seconds` (0 through 4), presumably so that
/// each one hashes to a distinct digest. A root directory referencing all of
/// them as `sub_directory_{i}` is uploaded afterwards.
///
/// Returns the uploaded messages together with the digests reported by the
/// uploads, or the first upload error encountered.
async fn setup_directory_structure(
    store_pinned: Pin<&dyn Store>,
) -> Result<SetupDirectoryResult, Error> {
    // Number of sub-directories placed under the root.
    const SUB_DIRECTORIES_LENGTH: i32 = 5;

    let mut sub_directories: Vec<Directory> = Vec::new();
    let mut sub_directory_digest_infos: Vec<DigestInfo> = Vec::new();
    let mut child_nodes: Vec<DirectoryNode> = Vec::new();

    for index in 0..SUB_DIRECTORIES_LENGTH {
        let mtime = Timestamp {
            seconds: i64::from(index),
            nanos: 0,
        };
        let node_properties = NodeProperties {
            properties: vec![],
            mtime: Some(mtime),
            unix_mode: Some(0o755),
        };
        let child: Directory = Directory {
            files: vec![],
            directories: vec![],
            symlinks: vec![],
            node_properties: Some(node_properties),
        };

        let child_digest: DigestInfo = serialize_and_upload_message(
            &child,
            store_pinned,
            &mut DigestHasherFunc::Sha256.hasher(),
        )
        .await?;

        // Keep the three vectors index-aligned: one entry each per child.
        sub_directory_digest_infos.push(child_digest);
        child_nodes.push(DirectoryNode {
            name: format!("sub_directory_{index}"),
            digest: Some(child_digest.into()),
        });
        sub_directories.push(child);
    }

    // The root carries no files of its own, only links to the children.
    let root_directory: Directory = Directory {
        files: vec![],
        directories: child_nodes,
        symlinks: vec![],
        node_properties: None,
    };
    let root_directory_digest_info: DigestInfo = serialize_and_upload_message(
        &root_directory,
        store_pinned,
        &mut DigestHasherFunc::Sha256.hasher(),
    )
    .await?;

    Ok(SetupDirectoryResult {
        root_directory,
        root_directory_digest_info,
        sub_directories,
        sub_directory_digest_infos,
    })
}

// With `page_size` set to 0 paging is disabled, so a single response must
// contain the root directory followed by every sub-directory, and the
// `next_page_token` must be empty.
#[nativelink_test]
async fn get_tree_read_directories_without_paging() -> Result<(), Box<dyn std::error::Error>> {
    let store_manager = make_store_manager().await?;
    let cas_server = make_cas_server(&store_manager)?;
    let store_owned = store_manager.get_store("main_cas").unwrap();
    let store_pinned = Pin::new(store_owned.as_ref());

    // Upload the directory tree the request will walk.
    let SetupDirectoryResult {
        root_directory,
        root_directory_digest_info,
        sub_directories,
        sub_directory_digest_infos: _,
    } = setup_directory_structure(store_pinned).await?;

    // Ask for the whole tree at once, starting from the root.
    let request = GetTreeRequest {
        instance_name: INSTANCE_NAME.to_string(),
        page_size: 0,
        page_token: format!(
            "{}-{}",
            root_directory_digest_info.hash_str(),
            root_directory_digest_info.size_bytes
        ),
        root_digest: Some(root_directory_digest_info.into()),
        digest_function: digest_function::Value::Sha256.into(),
    };
    let raw_response = cas_server.get_tree(Request::new(request)).await;
    assert!(raw_response.is_ok());

    let responses = raw_response
        .unwrap()
        .into_inner()
        .map(|item| item.unwrap())
        .collect::<Vec<_>>()
        .await;

    // Expected order: the root first, then the sub-directories as created.
    let mut expected_directories = vec![root_directory];
    expected_directories.extend(sub_directories);

    assert_eq!(
        responses,
        vec![GetTreeResponse {
            directories: expected_directories,
            next_page_token: String::new()
        }]
    );

    Ok(())
}

#[nativelink_test]
async fn get_tree_read_directories_with_paging() -> Result<(), Box<dyn std::error::Error>> {
let store_manager = make_store_manager().await?;
let cas_server = make_cas_server(&store_manager)?;
let store_owned = store_manager.get_store("main_cas").unwrap();
let store_pinned = Pin::new(store_owned.as_ref());

// Setup directory structure.
let SetupDirectoryResult {
root_directory,
root_directory_digest_info,
sub_directories,
sub_directory_digest_infos,
} = setup_directory_structure(store_pinned).await?;

// Must work when paging is enabled ( `page_size` is 2 ).
// First, it reads `root_directory` and `sub_directory[0]`.
// Then, it reads `sub_directory[1]` and `sub_directory[2]`.
// Finally, it reads `sub_directory[3]` and `sub_directory[4]`.
let raw_response = cas_server
.get_tree(Request::new(GetTreeRequest {
instance_name: INSTANCE_NAME.to_string(),
page_size: 2,
page_token: format!(
"{}-{}",
root_directory_digest_info.hash_str(),
root_directory_digest_info.size_bytes
),
root_digest: Some(root_directory_digest_info.into()),
digest_function: digest_function::Value::Sha256.into(),
}))
.await;
assert!(raw_response.is_ok());
assert_eq!(
raw_response
.unwrap()
.into_inner()
.filter_map(|x| async move { Some(x.unwrap()) })
.collect::<Vec<_>>()
.await,
vec![GetTreeResponse {
directories: vec![root_directory.clone(), sub_directories[0].clone()],
next_page_token: format!(
"{}-{}",
sub_directory_digest_infos[1].hash_str(),
sub_directory_digest_infos[1].size_bytes
),
}]
);
let raw_response = cas_server
.get_tree(Request::new(GetTreeRequest {
instance_name: INSTANCE_NAME.to_string(),
page_size: 2,
page_token: format!(
"{}-{}",
sub_directory_digest_infos[1].hash_str(),
sub_directory_digest_infos[1].size_bytes
),
root_digest: Some(root_directory_digest_info.into()),
digest_function: digest_function::Value::Sha256.into(),
}))
.await;
assert!(raw_response.is_ok());
assert_eq!(
raw_response
.unwrap()
.into_inner()
.filter_map(|x| async move { Some(x.unwrap()) })
.collect::<Vec<_>>()
.await,
vec![GetTreeResponse {
directories: vec![sub_directories[1].clone(), sub_directories[2].clone()],
next_page_token: format!(
"{}-{}",
sub_directory_digest_infos[3].hash_str(),
sub_directory_digest_infos[3].size_bytes
),
}]
);
let raw_response = cas_server
.get_tree(Request::new(GetTreeRequest {
instance_name: INSTANCE_NAME.to_string(),
page_size: 2,
page_token: format!(
"{}-{}",
sub_directory_digest_infos[3].hash_str(),
sub_directory_digest_infos[3].size_bytes
),
root_digest: Some(root_directory_digest_info.into()),
digest_function: digest_function::Value::Sha256.into(),
}))
.await;
assert!(raw_response.is_ok());
assert_eq!(
raw_response
.unwrap()
.into_inner()
.filter_map(|x| async move { Some(x.unwrap()) })
.collect::<Vec<_>>()
.await,
vec![GetTreeResponse {
directories: vec![sub_directories[3].clone(), sub_directories[4].clone()],
next_page_token: String::new(),
}]
);

Ok(())
}
}
#[cfg(test)]
mod end_to_end {
use nativelink_proto::build::bazel::remote::execution::v2::{
Expand Down

0 comments on commit ae44878

Please sign in to comment.