diff --git a/ballista/rust/core/src/serde/physical_plan/from_proto.rs b/ballista/rust/core/src/serde/physical_plan/from_proto.rs
index 522bac2e2ca4..6aa0fa111921 100644
--- a/ballista/rust/core/src/serde/physical_plan/from_proto.rs
+++ b/ballista/rust/core/src/serde/physical_plan/from_proto.rs
@@ -35,6 +35,7 @@ use datafusion::catalog::catalog::{
     CatalogList, CatalogProvider, MemoryCatalogList, MemoryCatalogProvider,
 };
 use datafusion::datasource::datasource::Statistics;
+use datafusion::datasource::object_store::ObjectStoreRegistry;
 use datafusion::datasource::FilePartition;
 use datafusion::execution::context::{
     ExecutionConfig, ExecutionContextState, ExecutionProps,
@@ -655,6 +656,7 @@ impl TryFrom<&protobuf::PhysicalExprNode> for Arc<dyn PhysicalExpr> {
             aggregate_functions: Default::default(),
             config: ExecutionConfig::new(),
             execution_props: ExecutionProps::new(),
+            object_store_registry: Arc::new(ObjectStoreRegistry::new()),
         };
 
         let fun_expr = functions::create_physical_fun(
diff --git a/datafusion/Cargo.toml b/datafusion/Cargo.toml
index 86eb64cf873e..23a6d72e5713 100644
--- a/datafusion/Cargo.toml
+++ b/datafusion/Cargo.toml
@@ -58,7 +58,7 @@ chrono = "0.4"
 async-trait = "0.1.41"
 futures = "0.3"
 pin-project-lite= "^0.2.0"
-tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync"] }
+tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync", "fs"] }
 tokio-stream = "0.1"
 log = "^0.4"
 md-5 = { version = "^0.9.1", optional = true }
diff --git a/datafusion/src/datasource/mod.rs b/datafusion/src/datasource/mod.rs
index d5e29522f6ba..df3328ec81c8 100644
--- a/datafusion/src/datasource/mod.rs
+++ b/datafusion/src/datasource/mod.rs
@@ -22,6 +22,7 @@ pub mod datasource;
 pub mod empty;
 pub mod json;
 pub mod memory;
+pub mod object_store;
 pub mod parquet;
 
 pub use self::csv::{CsvFile, CsvReadOptions};
diff --git a/datafusion/src/datasource/object_store/local.rs b/datafusion/src/datasource/object_store/local.rs
new file mode 100644
index 000000000000..2b27f6c8f993
--- /dev/null
+++ b/datafusion/src/datasource/object_store/local.rs
@@ -0,0 +1,177 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Object store that represents the Local File System.
+
+use std::fs::Metadata;
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use futures::{stream, AsyncRead, StreamExt};
+
+use crate::datasource::object_store::{
+    FileMeta, FileMetaStream, ListEntryStream, ObjectReader, ObjectStore,
+};
+use crate::error::DataFusionError;
+use crate::error::Result;
+
+/// Local File System as Object Store.
+#[derive(Debug)]
+pub struct LocalFileSystem;
+
+#[async_trait]
+impl ObjectStore for LocalFileSystem {
+    async fn list_file(&self, prefix: &str) -> Result<FileMetaStream> {
+        list_all(prefix.to_owned()).await
+    }
+
+    async fn list_dir(
+        &self,
+        _prefix: &str,
+        _delimiter: Option<String>,
+    ) -> Result<ListEntryStream> {
+        todo!()
+    }
+
+    fn file_reader(&self, file: FileMeta) -> Result<Arc<dyn ObjectReader>> {
+        Ok(Arc::new(LocalFileReader::new(file)?))
+    }
+}
+
+struct LocalFileReader {
+    file: FileMeta,
+}
+
+impl LocalFileReader {
+    fn new(file: FileMeta) -> Result<Self> {
+        Ok(Self { file })
+    }
+}
+
+#[async_trait]
+impl ObjectReader for LocalFileReader {
+    async fn chunk_reader(
+        &self,
+        _start: u64,
+        _length: usize,
+    ) -> Result<Arc<dyn AsyncRead>> {
+        todo!()
+    }
+
+    fn length(&self) -> u64 {
+        self.file.size
+    }
+}
+
+async fn list_all(prefix: String) -> Result<FileMetaStream> {
+    fn get_meta(path: String, metadata: Metadata) -> FileMeta {
+        FileMeta {
+            path,
+            last_modified: metadata.modified().map(chrono::DateTime::from).ok(),
+            size: metadata.len(),
+        }
+    }
+
+    async fn find_files_in_dir(
+        path: String,
+        to_visit: &mut Vec<String>,
+    ) -> Result<Vec<FileMeta>> {
+        let mut dir = tokio::fs::read_dir(path).await?;
+        let mut files = Vec::new();
+
+        while let Some(child) = dir.next_entry().await? {
+            if let Some(child_path) = child.path().to_str() {
+                let metadata = child.metadata().await?;
+                if metadata.is_dir() {
+                    to_visit.push(child_path.to_string());
+                } else {
+                    files.push(get_meta(child_path.to_owned(), metadata))
+                }
+            } else {
+                return Err(DataFusionError::Plan("Invalid path".to_string()));
+            }
+        }
+        Ok(files)
+    }
+
+    let prefix_meta = tokio::fs::metadata(&prefix).await?;
+    let prefix = prefix.to_owned();
+    if prefix_meta.is_file() {
+        Ok(Box::pin(stream::once(async move {
+            Ok(get_meta(prefix, prefix_meta))
+        })))
+    } else {
+        let result = stream::unfold(vec![prefix], move |mut to_visit| async move {
+            match to_visit.pop() {
+                None => None,
+                Some(path) => {
+                    let file_stream = match find_files_in_dir(path, &mut to_visit).await {
+                        Ok(files) => stream::iter(files).map(Ok).left_stream(),
+                        Err(e) => stream::once(async { Err(e) }).right_stream(),
+                    };
+
+                    Some((file_stream, to_visit))
+                }
+            }
+        })
+        .flatten();
+        Ok(Box::pin(result))
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use futures::StreamExt;
+    use std::collections::HashSet;
+    use std::fs::create_dir;
+    use std::fs::File;
+    use tempfile::tempdir;
+
+    #[tokio::test]
+    async fn test_recursive_listing() -> Result<()> {
+        // tmp/a.txt
+        // tmp/x/b.txt
+        // tmp/y/c.txt
+        let tmp = tempdir()?;
+        let x_path = tmp.path().join("x");
+        let y_path = tmp.path().join("y");
+        let a_path = tmp.path().join("a.txt");
+        let b_path = x_path.join("b.txt");
+        let c_path = y_path.join("c.txt");
+        create_dir(&x_path)?;
+        create_dir(&y_path)?;
+        File::create(&a_path)?;
+        File::create(&b_path)?;
+        File::create(&c_path)?;
+
+        let mut all_files = HashSet::new();
+        let mut files = list_all(tmp.path().to_str().unwrap().to_string()).await?;
+        while let Some(file) = files.next().await {
+            let file = file?;
+            assert_eq!(file.size, 0);
+            all_files.insert(file.path);
+        }
+
+        assert_eq!(all_files.len(), 3);
+        assert!(all_files.contains(a_path.to_str().unwrap()));
+        assert!(all_files.contains(b_path.to_str().unwrap()));
+        assert!(all_files.contains(c_path.to_str().unwrap()));
+
+        Ok(())
+    }
+}
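Both `list_dir` and `chunk_reader` are deliberately left as `todo!()` in this patch. As a rough illustration only (not part of the PR), a first cut of `chunk_reader` could buffer the requested range in memory and serve it through `futures::io::Cursor`, which satisfies the `Arc<dyn AsyncRead>` return type; the blocking `std::fs` calls and the `chunk_reader_sketch` helper name are assumptions of this sketch, and a real implementation would stream from disk instead:

    use std::io::{Read, Seek, SeekFrom};
    use std::sync::Arc;

    use futures::AsyncRead;

    use datafusion::error::Result;

    // Hypothetical sketch: load the byte range [start, start + length) into a
    // buffer, then expose it through an in-memory reader that implements
    // futures::io::AsyncRead.
    async fn chunk_reader_sketch(
        path: &str,
        start: u64,
        length: usize,
    ) -> Result<Arc<dyn AsyncRead>> {
        let mut file = std::fs::File::open(path)?;
        file.seek(SeekFrom::Start(start))?;
        let mut buf = vec![0u8; length];
        file.read_exact(&mut buf)?;
        Ok(Arc::new(futures::io::Cursor::new(buf)))
    }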
diff --git a/datafusion/src/datasource/object_store/mod.rs b/datafusion/src/datasource/object_store/mod.rs
new file mode 100644
index 000000000000..fd25fd43a2e7
--- /dev/null
+++ b/datafusion/src/datasource/object_store/mod.rs
@@ -0,0 +1,151 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Object Store abstracts access to an underlying file/object storage.
+
+pub mod local;
+
+use std::collections::HashMap;
+use std::fmt::Debug;
+use std::pin::Pin;
+use std::sync::{Arc, RwLock};
+
+use async_trait::async_trait;
+use futures::{AsyncRead, Stream};
+
+use local::LocalFileSystem;
+
+use crate::error::{DataFusionError, Result};
+use chrono::Utc;
+
+/// Object Reader for one file in an object store
+#[async_trait]
+pub trait ObjectReader {
+    /// Get a reader for the byte range `[start, start + length)` of the file, asynchronously
+    async fn chunk_reader(&self, start: u64, length: usize)
+        -> Result<Arc<dyn AsyncRead>>;
+
+    /// Get the total length of the file
+    fn length(&self) -> u64;
+}
+
+/// Represents a file or a prefix that may require further resolution
+#[derive(Debug)]
+pub enum ListEntry {
+    /// File metadata
+    FileMeta(FileMeta),
+    /// Prefix to be further resolved during partition discovery
+    Prefix(String),
+}
+
+/// File metadata obtained from an object store
+#[derive(Debug)]
+pub struct FileMeta {
+    /// Path of the file
+    pub path: String,
+    /// Last time the file was modified, in UTC
+    pub last_modified: Option<chrono::DateTime<Utc>>,
+    /// Total file size in bytes
+    pub size: u64,
+}
+
+/// Stream of files listed from an object store
+pub type FileMetaStream =
+    Pin<Box<dyn Stream<Item = Result<FileMeta>> + Send + Sync + 'static>>;
+
+/// Stream of list entries obtained from an object store
+pub type ListEntryStream =
+    Pin<Box<dyn Stream<Item = Result<ListEntry>> + Send + Sync + 'static>>;
+
+/// An `ObjectStore` abstracts access to an underlying file/object storage.
+/// It maps strings (e.g. URLs, filesystem paths, etc.) to sources of bytes
+#[async_trait]
+pub trait ObjectStore: Sync + Send + Debug {
+    /// Returns all the files under the path `prefix`
+    async fn list_file(&self, prefix: &str) -> Result<FileMetaStream>;
+
+    /// Returns all the files in `prefix` if the `prefix` is already a leaf dir,
+    /// or all paths between the `prefix` and the first occurrence of the `delimiter` if it is provided
+    async fn list_dir(
+        &self,
+        prefix: &str,
+        delimiter: Option<String>,
+    ) -> Result<ListEntryStream>;
+
+    /// Get an object reader for one file
+    fn file_reader(&self, file: FileMeta) -> Result<Arc<dyn ObjectReader>>;
+}
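To make the trait surface concrete, here is a minimal sketch of a third-party `ObjectStore` written against the definitions above; the `EmptyStore` type and its behavior are hypothetical, not part of this patch. It lists nothing and refuses to produce readers, but it compiles against the trait exactly as declared:

    use std::sync::Arc;

    use async_trait::async_trait;
    use futures::stream;

    use datafusion::datasource::object_store::{
        FileMeta, FileMetaStream, ListEntryStream, ObjectReader, ObjectStore,
    };
    use datafusion::error::{DataFusionError, Result};

    /// Hypothetical store that holds no objects.
    #[derive(Debug)]
    struct EmptyStore;

    #[async_trait]
    impl ObjectStore for EmptyStore {
        async fn list_file(&self, _prefix: &str) -> Result<FileMetaStream> {
            // No files exist under any prefix.
            Ok(Box::pin(stream::empty()))
        }

        async fn list_dir(
            &self,
            _prefix: &str,
            _delimiter: Option<String>,
        ) -> Result<ListEntryStream> {
            Ok(Box::pin(stream::empty()))
        }

        fn file_reader(&self, file: FileMeta) -> Result<Arc<dyn ObjectReader>> {
            Err(DataFusionError::NotImplemented(format!(
                "no reader available for {}",
                file.path
            )))
        }
    }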
+static LOCAL_SCHEME: &str = "file";
+
+/// A registry of all the object stores available at runtime, keyed by a scheme for each store.
+/// This allows the user to extend DataFusion with different storage systems such as S3 or HDFS
+/// and query data inside these systems.
+pub struct ObjectStoreRegistry {
+    /// A map from scheme to the object store that serves list / read operations for that store
+    pub object_stores: RwLock<HashMap<String, Arc<dyn ObjectStore>>>,
+}
+
+impl ObjectStoreRegistry {
+    /// Create a registry that object stores can be registered into.
+    /// A [`LocalFileSystem`] store is registered by default so that local files can be read natively.
+    pub fn new() -> Self {
+        let mut map: HashMap<String, Arc<dyn ObjectStore>> = HashMap::new();
+        map.insert(LOCAL_SCHEME.to_string(), Arc::new(LocalFileSystem));
+
+        Self {
+            object_stores: RwLock::new(map),
+        }
+    }
+
+    /// Adds a new store to this registry.
+    /// If a store was already registered for the same scheme, it is replaced in the registry and returned.
+    pub fn register_store(
+        &self,
+        scheme: String,
+        store: Arc<dyn ObjectStore>,
+    ) -> Option<Arc<dyn ObjectStore>> {
+        let mut stores = self.object_stores.write().unwrap();
+        stores.insert(scheme, store)
+    }
+
+    /// Get the store registered for the given scheme
+    pub fn get(&self, scheme: &str) -> Option<Arc<dyn ObjectStore>> {
+        let stores = self.object_stores.read().unwrap();
+        stores.get(scheme).cloned()
+    }
+
+    /// Get a suitable store for the URI based on its scheme. For example:
+    /// a URI with scheme `file`, or with no scheme at all, will return the default `LocalFileSystem` store,
+    /// while a URI with scheme `s3` will return the S3 store if it is registered.
+    pub fn get_by_uri(&self, uri: &str) -> Result<Arc<dyn ObjectStore>> {
+        if let Some((scheme, _)) = uri.split_once(':') {
+            let stores = self.object_stores.read().unwrap();
+            stores
+                .get(&*scheme.to_lowercase())
+                .map(Clone::clone)
+                .ok_or_else(|| {
+                    DataFusionError::Internal(format!(
+                        "No suitable object store found for {}",
+                        scheme
+                    ))
+                })
+        } else {
+            Ok(Arc::new(LocalFileSystem))
+        }
+    }
+}
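A short usage sketch of the registry's scheme resolution, assuming code inside a function that returns `Result`; the `S3FileSystem` store is hypothetical and not part of this patch:

    use std::sync::Arc;

    use datafusion::datasource::object_store::ObjectStoreRegistry;

    let registry = ObjectStoreRegistry::new();

    // A bare path has no scheme, so it falls back to LocalFileSystem.
    let local = registry.get_by_uri("/tmp/data/part-0.parquet")?;

    // Schemes are lowercased before lookup, so "FILE://..." hits the "file" entry.
    let also_local = registry.get_by_uri("FILE:///tmp/data/part-0.parquet")?;

    // An unregistered scheme yields DataFusionError::Internal ...
    assert!(registry.get_by_uri("s3://bucket/key").is_err());

    // ... until a store is registered for it (S3FileSystem is hypothetical).
    registry.register_store("s3".to_owned(), Arc::new(S3FileSystem::new()));
    let s3 = registry.get_by_uri("s3://bucket/key")?;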
diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs
index 2e6a7a4f7012..54c7ce8be280 100644
--- a/datafusion/src/execution/context.rs
+++ b/datafusion/src/execution/context.rs
@@ -49,6 +49,7 @@ use crate::catalog::{
     ResolvedTableReference, TableReference,
 };
 use crate::datasource::csv::CsvFile;
+use crate::datasource::object_store::{ObjectStore, ObjectStoreRegistry};
 use crate::datasource::parquet::ParquetTable;
 use crate::datasource::TableProvider;
 use crate::error::{DataFusionError, Result};
@@ -164,6 +165,7 @@ impl ExecutionContext {
                 aggregate_functions: HashMap::new(),
                 config,
                 execution_props: ExecutionProps::new(),
+                object_store_registry: Arc::new(ObjectStoreRegistry::new()),
             })),
         }
     }
@@ -363,6 +365,29 @@ impl ExecutionContext {
         self.state.lock().unwrap().catalog_list.catalog(name)
     }
 
+    /// Registers an object store under its scheme, using a custom `ObjectStore`, so that
+    /// an external file system or object storage system can be queried through this context.
+    ///
+    /// Returns the `ObjectStore` previously registered for this scheme, if any
+    pub fn register_object_store(
+        &self,
+        scheme: impl Into<String>,
+        object_store: Arc<dyn ObjectStore>,
+    ) -> Option<Arc<dyn ObjectStore>> {
+        let scheme = scheme.into();
+
+        self.state
+            .lock()
+            .unwrap()
+            .object_store_registry
+            .register_store(scheme, object_store)
+    }
+
+    /// Retrieves an `ObjectStore` instance by scheme
+    pub fn object_store(&self, scheme: &str) -> Option<Arc<dyn ObjectStore>> {
+        self.state.lock().unwrap().object_store_registry.get(scheme)
+    }
+
     /// Registers a table using a custom `TableProvider` so that
     /// it can be referenced from SQL statements executed against this
     /// context.
@@ -849,6 +874,8 @@ pub struct ExecutionContextState {
     pub config: ExecutionConfig,
     /// Execution properties
     pub execution_props: ExecutionProps,
+    /// Object stores that are registered with the context
+    pub object_store_registry: Arc<ObjectStoreRegistry>,
 }
 
 impl ExecutionProps {
@@ -876,6 +903,7 @@ impl ExecutionContextState {
             aggregate_functions: HashMap::new(),
             config: ExecutionConfig::new(),
             execution_props: ExecutionProps::new(),
+            object_store_registry: Arc::new(ObjectStoreRegistry::new()),
         }
     }
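Putting it together at the `ExecutionContext` level, registration and lookup both go by scheme. A hedged sketch, assuming an async function returning `Result` and a hypothetical `HdfsStore` implementation of `ObjectStore`:

    use std::sync::Arc;

    use datafusion::execution::context::ExecutionContext;
    use futures::StreamExt;

    let ctx = ExecutionContext::new();

    // Register a hypothetical HDFS-backed ObjectStore under the "hdfs" scheme.
    ctx.register_object_store("hdfs", Arc::new(HdfsStore::new()));

    // Retrieve it later by scheme and list files through the common interface.
    if let Some(store) = ctx.object_store("hdfs") {
        let mut files = store.list_file("hdfs://namenode/warehouse/t1").await?;
        while let Some(file) = files.next().await {
            println!("found {}", file?.path);
        }
    }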