Skip to content

Commit

Permalink
Make root configurable in FS FileIO and remove default_table_root_loc…
Browse files Browse the repository at this point in the history
…ation from Catalog
  • Loading branch information
fqaiser94 committed Jul 25, 2024
1 parent a1dcb4a commit 807dd4c
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 17 deletions.
20 changes: 10 additions & 10 deletions crates/catalog/inmemory/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,14 @@ use crate::namespace_state::NamespaceState;
pub struct InMemoryCatalog {
root_namespace_state: Mutex<NamespaceState>,
file_io: FileIO,
default_table_root_location: String,
}

impl InMemoryCatalog {
/// Creates an in-memory catalog.
pub fn new(file_io: FileIO, default_table_root_location: String) -> Self {
pub fn new(file_io: FileIO) -> Self {
Self {
root_namespace_state: Mutex::new(NamespaceState::new()),
file_io,
default_table_root_location,
}
}
}
Expand Down Expand Up @@ -169,8 +167,7 @@ impl Catalog for InMemoryCatalog {
Some(location) => (table_creation, location),
None => {
let location = format!(
"{}/{}/{}",
self.default_table_root_location,
"{}/{}",
table_ident.namespace().join("/"),
table_ident.name()
);
Expand Down Expand Up @@ -272,6 +269,7 @@ impl Catalog for InMemoryCatalog {
#[cfg(test)]
mod tests {
use iceberg::io::FileIOBuilder;
use iceberg::io::ROOT_LOCATION;
use iceberg::spec::{NestedField, PartitionSpec, PrimitiveType, Schema, SortOrder, Type};
use std::collections::HashSet;
use std::hash::Hash;
Expand All @@ -282,11 +280,13 @@ mod tests {

fn new_inmemory_catalog() -> impl Catalog {
let tmp_dir = TempDir::new().unwrap();
let default_table_root_location = tmp_dir.path().to_str().unwrap().to_string();

let file_io = FileIOBuilder::new_fs_io().build().unwrap();
let root_location = tmp_dir.path().to_str().unwrap().to_string();
let file_io = FileIOBuilder::new_fs_io()
.with_prop(ROOT_LOCATION, root_location)
.build()
.unwrap();

InMemoryCatalog::new(file_io, default_table_root_location)
InMemoryCatalog::new(file_io)
}

async fn create_namespace<C: Catalog>(catalog: &C, namespace_ident: &NamespaceIdent) {
Expand Down Expand Up @@ -999,7 +999,7 @@ mod tests {
&catalog.load_table(&expected_table_ident).await.unwrap(),
&expected_table_ident,
&simple_table_schema(),
)
);
}

#[tokio::test]
Expand Down
2 changes: 1 addition & 1 deletion crates/iceberg/src/io/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,4 @@ pub use storage_s3::*;
#[cfg(feature = "storage-fs")]
mod storage_fs;
#[cfg(feature = "storage-fs")]
use storage_fs::*;
pub use storage_fs::*;
22 changes: 16 additions & 6 deletions crates/iceberg/src/io/storage_fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,17 @@ use opendal::{Operator, Scheme};
use std::collections::HashMap;
use std::fmt::{Debug, Formatter};

/// Property root location
pub const ROOT_LOCATION: &str = "root";

/// # TODO
///
/// opendal has a plan to introduce native config support.
/// We manually parse the config here and those code will be finally removed.
#[derive(Default, Clone)]
pub(crate) struct FsConfig {}
pub(crate) struct FsConfig {
root_location: String,
}

impl Debug for FsConfig {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
Expand All @@ -35,15 +40,20 @@ impl Debug for FsConfig {

impl FsConfig {
/// Decode from iceberg props.
pub fn new(_: HashMap<String, String>) -> Self {
Self::default()
pub fn new(m: HashMap<String, String>) -> Self {
let root_location = match m.get(ROOT_LOCATION) {
Some(root_location) => root_location.clone(),
None => "/".to_string(),
};

Self { root_location }
}

/// Build new opendal operator from give path.
/// Build new opendal operator from given path.
///
/// fs always build from `/`
/// fs builds from `/` by default
pub fn build(&self, _: &str) -> Result<Operator> {
let m = HashMap::from_iter([("root".to_string(), "/".to_string())]);
let m = HashMap::from_iter([(ROOT_LOCATION.to_string(), self.root_location.clone())]);
Ok(Operator::via_map(Scheme::Fs, m)?)
}
}

0 comments on commit 807dd4c

Please sign in to comment.