
Commit

Merge pull request #751 from splitgraph/wire-up-iceberg
Add table format parameter to clade
gruuya authored Nov 29, 2024
2 parents 7d6db29 + 3cd6626 commit ed4d45f
Showing 30 changed files with 187 additions and 86 deletions.
1 change: 1 addition & 0 deletions Cargo.lock


1 change: 1 addition & 0 deletions Cargo.toml
@@ -101,6 +101,7 @@ deltalake = { git = "https://github.com/splitgraph/delta-rs", branch = "fix-deci
futures = "0.3"
hex = ">=0.4.0"

iceberg = { git = "https://github.com/splitgraph/iceberg-rust", rev = "eeca14d13b23f2a92e57f503c141a860a3407226" }
iceberg-datafusion = { git = "https://github.com/splitgraph/iceberg-rust", rev = "eeca14d13b23f2a92e57f503c141a860a3407226" }
indexmap = "2.6.0"
itertools = { workspace = true }
7 changes: 7 additions & 0 deletions clade/proto/schema.proto
@@ -13,6 +13,13 @@ message TableObject {
string path = 2;
// Storage location identifier
optional string store = 4;
// Table format
TableFormat format = 5;
}

enum TableFormat {
DELTA = 0;
ICEBERG = 1;
}

// A single root storage location, hosting many individual tables
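Because proto3 enum fields default to `0`, making `DELTA = 0` keeps the change backward compatible: any existing catalog response that omits `format` still resolves to a Delta table. On the Rust side the prost-generated `format` field is an `i32`, set via `.into()`. A minimal sketch of describing an Iceberg table with the new field, using hypothetical names and paths (mirroring the fixtures further down):

```rust
use clade::schema::{TableFormat, TableObject};

// Hypothetical entry: for Iceberg, `path` points at a concrete
// *.metadata.json file rather than at a table root directory.
let table = TableObject {
    name: "events".to_string(),
    path: "iceberg/default.db/events/metadata/00001-abc.metadata.json"
        .to_string(),
    store: Some("minio".to_string()),
    format: TableFormat::Iceberg.into(),
};
```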
115 changes: 83 additions & 32 deletions src/catalog/metastore.rs
@@ -2,7 +2,7 @@ use crate::catalog::external::ExternalStore;
use crate::catalog::repository::RepositoryStore;
use crate::catalog::{
CatalogError, CatalogResult, CatalogStore, CreateFunctionError, FunctionStore,
SchemaStore, TableStore,
SchemaStore, TableStore, DEFAULT_SCHEMA,
};

use crate::object_store::factory::ObjectStoreFactory;
@@ -13,21 +13,25 @@ use crate::wasm_udf::data_types::{
CreateFunctionDataType, CreateFunctionDetails, CreateFunctionLanguage,
CreateFunctionVolatility,
};
use clade::schema::{SchemaObject, TableObject};
use clade::schema::{SchemaObject, TableFormat, TableObject};
use dashmap::DashMap;
use datafusion::catalog_common::memory::MemorySchemaProvider;
use datafusion::datasource::TableProvider;

use super::empty::EmptyStore;
use crate::catalog::memory::MemoryStore;
use crate::object_store::utils::object_store_opts_to_file_io_props;
use deltalake::DeltaTable;
use futures::{stream, StreamExt, TryStreamExt};
use iceberg::io::FileIO;
use iceberg::table::StaticTable;
use iceberg::TableIdent;
use iceberg_datafusion::IcebergTableProvider;
use std::collections::HashMap;
use std::str::FromStr;
use std::sync::Arc;
use url::Url;

use super::empty::EmptyStore;

// Root URL for a storage location alongside client connection options
type LocationAndOptions = (String, HashMap<String, String>);

@@ -166,35 +170,82 @@ impl Metastore {
// delta tables present in the database. The real fix for this is to make DF use `TableSource`
// for the information schema, and then implement `TableSource` for `DeltaTable` in delta-rs.

let table_log_store = match table.store {
// Use the provided customized location
Some(name) => {
let (location, this_store_options) = store_options
.get(&name)
.ok_or(CatalogError::Generic {
reason: format!("Object store with name {name} not found"),
})?
.clone();

self.object_stores
.get_log_store_for_table(
Url::parse(&location)?,
this_store_options,
table.path,
)
.await?
match TableFormat::try_from(table.format).map_err(|e| CatalogError::Generic {
reason: format!("Unrecognized table format id {}: {e}", table.format),
})? {
TableFormat::Delta => {
let table_log_store = match table.store {
// Use the provided customized location
Some(name) => {
let (location, this_store_options) = store_options
.get(&name)
.ok_or(CatalogError::Generic {
reason: format!(
"Object store with name {name} not found"
),
})?
.clone();

self.object_stores
.get_log_store_for_table(
Url::parse(&location)?,
this_store_options,
table.path,
)
.await?
}
// Use the configured, default, object store
None => self
.object_stores
.get_default_log_store(&table.path)
.ok_or(CatalogError::NoTableStoreInInlineMetastore {
name: table.name.clone(),
})?,
};

let delta_table = DeltaTable::new(table_log_store, Default::default());
Ok((Arc::from(table.name), Arc::new(delta_table) as _))
}
// Use the configured, default, object store
None => self
.object_stores
.get_default_log_store(&table.path)
.ok_or(CatalogError::NoTableStoreInInlineMetastore {
name: table.name.clone(),
})?,
};

let delta_table = DeltaTable::new(table_log_store, Default::default());
Ok((Arc::from(table.name), Arc::new(delta_table) as _))
TableFormat::Iceberg => {
let (location, file_io) = match table.store {
Some(name) => {
let (location, this_store_options) = store_options
.get(&name)
.ok_or(CatalogError::Generic {
reason: format!(
"Object store with name {name} not found"
),
})?
.clone();

let file_io_props = object_store_opts_to_file_io_props(&this_store_options);
let file_io = FileIO::from_path(&location)?.with_props(file_io_props).build()?;
(location, file_io)
}
None => return Err(CatalogError::Generic {
reason: "Iceberg tables must pass FileIO props as object store options".to_string(),
}),
};

// Create the full path to table metadata by combining the object store location and
// relative table metadata path
let absolute_path = format!(
"{}/{}",
location.trim_end_matches("/"),
table.path.trim_start_matches("/")
);
let iceberg_table = StaticTable::from_metadata_file(
&absolute_path,
TableIdent::from_strs(vec![DEFAULT_SCHEMA, &table.name])?,
file_io,
)
.await?
.into_table();
let table_provider =
IcebergTableProvider::try_new_from_table(iceberg_table).await?;
Ok((Arc::from(table.name), Arc::new(table_provider) as _))
}
}
}

pub async fn build_functions(
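Taken on its own, the new Iceberg branch boils down to the flow below. This is a minimal sketch rather than the exact code: the helper name and the `"public"`/`"my_table"` identifiers are hypothetical stand-ins (the real code uses `DEFAULT_SCHEMA` and the catalog-supplied table name), and all error plumbing is reduced to `iceberg::Error`.

```rust
use std::collections::HashMap;
use std::sync::Arc;

use datafusion::datasource::TableProvider;
use iceberg::io::FileIO;
use iceberg::table::StaticTable;
use iceberg::TableIdent;
use iceberg_datafusion::IcebergTableProvider;

async fn load_iceberg_provider(
    location: &str,      // object store root, e.g. "s3://bucket/prefix"
    metadata_path: &str, // relative path to a *.metadata.json file
    file_io_props: HashMap<String, String>,
) -> Result<Arc<dyn TableProvider>, iceberg::Error> {
    // Build a FileIO from the converted object store options.
    let file_io = FileIO::from_path(location)?
        .with_props(file_io_props)
        .build()?;

    // Join the store root and the relative metadata path, tolerating
    // stray slashes on either side.
    let absolute_path = format!(
        "{}/{}",
        location.trim_end_matches('/'),
        metadata_path.trim_start_matches('/')
    );

    // Load a static (read-only) table straight from the metadata file,
    // bypassing any Iceberg catalog.
    let table = StaticTable::from_metadata_file(
        &absolute_path,
        TableIdent::from_strs(vec!["public", "my_table"])?,
        file_io,
    )
    .await?
    .into_table();

    let provider = IcebergTableProvider::try_new_from_table(table).await?;
    Ok(Arc::new(provider))
}
```

Note the design choice: since the metastore only needs to read the table, `StaticTable` avoids wiring up a full Iceberg catalog, which is why the `path` in the clade response must point at a specific metadata file rather than at the table root.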
3 changes: 3 additions & 0 deletions src/catalog/mod.rs
@@ -91,6 +91,9 @@ pub enum CatalogError {

#[error("No inline metastore passed in")]
NoInlineMetastore,

#[error("Failed constructing an Iceberg table: {0}")]
IcebergError(#[from] iceberg::Error),
}

/// Implement a global converter into a DataFusionError from the catalog error type.
3 changes: 2 additions & 1 deletion src/catalog/repository.rs
@@ -5,7 +5,7 @@ use async_trait::async_trait;
use itertools::Itertools;
use uuid::Uuid;

use clade::schema::{ListSchemaResponse, SchemaObject, TableObject};
use clade::schema::{ListSchemaResponse, SchemaObject, TableFormat, TableObject};

use crate::catalog::{
CatalogError, CatalogResult, CatalogStore, FunctionStore, SchemaStore, TableStore,
@@ -133,6 +133,7 @@ impl SchemaStore for RepositoryStore {
name: name.clone(),
path: uuid.to_string(),
store: None,
format: TableFormat::Delta.into(),
})
} else {
None
34 changes: 34 additions & 0 deletions src/object_store/utils.rs
@@ -1,6 +1,10 @@
use futures::TryFutureExt;
use iceberg::io::{S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY};
use object_store::aws::AmazonS3ConfigKey;
use object_store::Error;
use std::collections::HashMap;
use std::path::Path as StdPath;
use std::str::FromStr;
use tokio::fs::{copy, create_dir_all, remove_file, rename};
use tracing::debug;

@@ -44,3 +48,33 @@ pub async fn fast_upload(from: &StdPath, to: String) -> object_store::Result<(),
Ok(())
}
}

// Go through all known keys for object store and convert them to corresponding file_io ones.
//
// For now only converts S3 keys.
// TODO: At some point this should be redundant, since there is an OpenDAL adapter for object_store,
// https://github.com/apache/iceberg-rust/issues/172
pub fn object_store_opts_to_file_io_props(
opts: &HashMap<String, String>,
) -> HashMap<String, String> {
let mut props = HashMap::new();

for (key, val) in opts.iter() {
let key = match AmazonS3ConfigKey::from_str(key) {
Ok(AmazonS3ConfigKey::AccessKeyId) => S3_ACCESS_KEY_ID,
Ok(AmazonS3ConfigKey::SecretAccessKey) => S3_SECRET_ACCESS_KEY,
Ok(AmazonS3ConfigKey::Region) => S3_REGION,
Ok(AmazonS3ConfigKey::Endpoint) => S3_ENDPOINT,
_ => key, // for now just propagate any non-matched keys
};

props.insert(key.to_string(), val.clone());
}

// FileIO requires the region prop even when the S3 store doesn't (e.g. MinIO)
props
.entry(S3_REGION.to_string())
.or_insert("dummy-region".to_string());

props
}
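A quick usage sketch, assuming `object_store_opts_to_file_io_props` is in scope and using made-up MinIO-style credentials:

```rust
use std::collections::HashMap;

use iceberg::io::{S3_ACCESS_KEY_ID, S3_REGION};

fn main() {
    // Key names on the left are ones AmazonS3ConfigKey parses.
    let opts = HashMap::from([
        ("access_key_id".to_string(), "minioadmin".to_string()),
        ("secret_access_key".to_string(), "minioadmin".to_string()),
        ("endpoint".to_string(), "http://127.0.0.1:9000".to_string()),
    ]);

    let props = object_store_opts_to_file_io_props(&opts);

    // S3 keys get renamed to their FileIO equivalents...
    assert_eq!(
        props.get(S3_ACCESS_KEY_ID).map(String::as_str),
        Some("minioadmin")
    );
    // ...and a placeholder region is injected because none was supplied.
    assert_eq!(
        props.get(S3_REGION).map(String::as_str),
        Some("dummy-region")
    );
}
```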
2 changes: 1 addition & 1 deletion src/provider.rs
@@ -85,7 +85,7 @@ impl SchemaProvider for SeafowlSchema {
let mut delta_table = match self.tables.get(name) {
None => return Ok(None),
Some(table) => match table.as_any().downcast_ref::<DeltaTable>() {
// This shouldn't happen since we store only DeltaTable's in the map
// Different table format, e.g. Iceberg
None => return Ok(Some(table.clone())),
Some(delta_table) => {
if delta_table.version() != -1 {
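The downcast now doubles as format dispatch: a provider that isn't a `DeltaTable` (e.g. the new `IcebergTableProvider`) is returned unchanged, while Delta tables still get the lazy version check. A minimal sketch of the pattern:

```rust
use std::sync::Arc;

use datafusion::datasource::TableProvider;
use deltalake::DeltaTable;

// `None` no longer signals a bug here; it simply means a non-Delta
// provider, which the schema provider serves as-is.
fn as_delta(table: &Arc<dyn TableProvider>) -> Option<&DeltaTable> {
    table.as_any().downcast_ref::<DeltaTable>()
}
```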
21 changes: 9 additions & 12 deletions tests/clade/query.rs
@@ -18,23 +18,20 @@ async fn test_basic_select(#[case] table: &str, #[case] object_store: bool) -> (
let _r = context.metastore.schemas.list(DEFAULT_DB).await;

let plan = context
.plan_query(&format!("SELECT * FROM {table} ORDER BY value"))
.plan_query(&format!("SELECT * FROM {table} ORDER BY key"))
.await
.unwrap();
let results = context.collect(plan).await.unwrap();

let expected = [
"+-------+------+-------+-----+",
"| value | year | month | day |",
"+-------+------+-------+-----+",
"| 1 | 2020 | 1 | 1 |",
"| 2 | 2020 | 2 | 3 |",
"| 3 | 2020 | 2 | 5 |",
"| 4 | 2021 | 4 | 5 |",
"| 5 | 2021 | 12 | 4 |",
"| 6 | 2021 | 12 | 20 |",
"| 7 | 2021 | 12 | 20 |",
"+-------+------+-------+-----+",
"+-----+-------+",
"| key | value |",
"+-----+-------+",
"| 1 | one |",
"| 2 | two |",
"| 3 | three |",
"| 4 | four |",
"+-----+-------+",
];
assert_batches_eq!(expected, &results);
}

One file was deleted and several binary test data files changed; their contents are not shown.
3 changes: 3 additions & 0 deletions tests/data/delta/_delta_log/00000000000000000000.json
@@ -0,0 +1,3 @@
{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}
{"metaData":{"id":"b4d25be5-de67-4fca-8f6a-9aa18eeb141e","name":"test","description":"Created by Seafowl 0.5.8","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"key\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"value\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"createdTime":1732876066617,"configuration":{}}}
{"commitInfo":{"timestamp":1732876066618,"operation":"CREATE TABLE","operationParameters":{"protocol":"{\"minReaderVersion\":1,\"minWriterVersion\":2}","mode":"ErrorIfExists","location":"file:///Users/gruuya/Splitgraph/seafowl-data/2f6abe4f-c07e-43fa-a98d-6200a34380dd","metadata":"{\"configuration\":{},\"createdTime\":1732876066617,\"description\":\"Created by Seafowl 0.5.8\",\"format\":{\"options\":{},\"provider\":\"parquet\"},\"id\":\"b4d25be5-de67-4fca-8f6a-9aa18eeb141e\",\"name\":\"test\",\"partitionColumns\":[],\"schemaString\":\"{\\\"type\\\":\\\"struct\\\",\\\"fields\\\":[{\\\"name\\\":\\\"key\\\",\\\"type\\\":\\\"integer\\\",\\\"nullable\\\":true,\\\"metadata\\\":{}},{\\\"name\\\":\\\"value\\\",\\\"type\\\":\\\"string\\\",\\\"nullable\\\":true,\\\"metadata\\\":{}}]}\"}"},"clientVersion":"delta-rs.0.22.0"}}
2 changes: 2 additions & 0 deletions tests/data/delta/_delta_log/00000000000000000001.json
@@ -0,0 +1,2 @@
{"add":{"path":"part-00000-3d8342be-6309-42be-8450-54a5310148ac-c000.snappy.parquet","partitionValues":{},"size":768,"modificationTime":1732876066644,"dataChange":true,"stats":"{\"numRecords\":2,\"minValues\":{\"key\":1,\"value\":\"one\"},\"maxValues\":{\"key\":2,\"value\":\"two\"},\"nullCount\":{\"value\":0,\"key\":0}}","tags":null,"deletionVector":null,"baseRowId":null,"defaultRowCommitVersion":null,"clusteringProvider":null}}
{"commitInfo":{"timestamp":1732876066645,"operation":"WRITE","operationParameters":{"mode":"Append"},"clientVersion":"delta-rs.0.22.0"}}
2 changes: 2 additions & 0 deletions tests/data/delta/_delta_log/00000000000000000002.json
@@ -0,0 +1,2 @@
{"add":{"path":"part-00000-05b2b8d1-53c2-4bf1-91d6-020a60069f85-c000.snappy.parquet","partitionValues":{},"size":780,"modificationTime":1732876070309,"dataChange":true,"stats":"{\"numRecords\":2,\"minValues\":{\"key\":3,\"value\":\"four\"},\"maxValues\":{\"key\":4,\"value\":\"three\"},\"nullCount\":{\"key\":0,\"value\":0}}","tags":null,"deletionVector":null,"baseRowId":null,"defaultRowCommitVersion":null,"clusteringProvider":null}}
{"commitInfo":{"timestamp":1732876070311,"operation":"WRITE","operationParameters":{"mode":"Append"},"clientVersion":"delta-rs.0.22.0"}}
Binary data files not shown.
25 changes: 19 additions & 6 deletions tests/fixtures.rs
@@ -1,4 +1,6 @@
use clade::schema::{ListSchemaResponse, SchemaObject, StorageLocation, TableObject};
use clade::schema::{
ListSchemaResponse, SchemaObject, StorageLocation, TableFormat, TableObject,
};
use object_store::aws::AmazonS3ConfigKey;
use object_store::gcp::GoogleConfigKey;
use object_store::ClientConfigKey;
@@ -25,15 +27,17 @@ pub fn fake_gcs_creds() -> String {
pub fn schemas(include_file_without_store: bool) -> ListSchemaResponse {
let mut local_schema_tables = vec![TableObject {
name: "file_with_store".to_string(),
path: "delta-0.8.0-partitioned".to_string(),
path: "delta".to_string(),
store: Some("local_fs".to_string()),
format: TableFormat::Delta.into(),
}];

if include_file_without_store {
local_schema_tables.push(TableObject {
name: "file".to_string(),
path: "delta-0.8.0-partitioned".to_string(),
path: "delta".to_string(),
store: None,
format: TableFormat::Delta.into(),
})
}

@@ -48,22 +52,31 @@ pub fn schemas(include_file_without_store: bool) -> ListSchemaResponse {
tables: vec![
TableObject {
name: "minio".to_string(),
path: "test-data/delta-0.8.0-partitioned".to_string(),
path: "test-data/delta".to_string(),
store: Some("minio".to_string()),
format: TableFormat::Delta.into(),
},
TableObject {
name: "minio_prefix".to_string(),
path: "delta-0.8.0-partitioned".to_string(),
path: "delta".to_string(),
store: Some("minio-prefix".to_string()),
format: TableFormat::Delta.into(),
},
TableObject {
name: "iceberg".to_string(),
path: "iceberg/default.db/iceberg_table/metadata/00001-f394d7ec-944b-432d-a44f-78b5ec95aae2.metadata.json".to_string(),
store: Some("minio-prefix".to_string()),
format: TableFormat::Iceberg.into(),
},
],
},
SchemaObject {
name: "gcs".to_string(),
tables: vec![TableObject {
name: "fake".to_string(),
path: "delta-0.8.0-partitioned".to_string(),
path: "delta".to_string(),
store: Some("fake-gcs".to_string()),
format: TableFormat::Delta.into(),
}],
},
],