Skip to content

Commit 3b385a9

Browse files
marvinlanhenkeliurenjie1024viirya
authored andcommitted
Basic Integration with Datafusion (apache#324)
* chore: basic structure * feat: add IcebergCatalogProvider * feat: add IcebergSchemaProvider * feat: add IcebergTableProvider * chore: add integration test infr * fix: remove old test * chore: update crate structure * fix: remove workspace dep * refactor: use try_join_all * chore: remove feature flag * chore: rename package * chore: update readme * feat: add TableType * fix: import + async_trait * fix: imports + async_trait * chore: remove feature flag * fix: cargo sort * refactor: CatalogProvider `fn try_new` * refactor: SchemaProvider `fn try_new` * chore: update docs * chore: update docs * chore: update doc * feat: impl `fn schema` on TableProvider * chore: rename ArrowSchema * refactor: remove DashMap * feat: add basic IcebergTableScan * chore: fix docs * chore: add comments * fix: clippy * fix: typo * fix: license * chore: update docs * chore: move derive stmt * fix: collect into hashmap * chore: use DFResult * Update crates/integrations/datafusion/README.md Co-authored-by: Liang-Chi Hsieh <viirya@gmail.com> --------- Co-authored-by: Renjie Liu <liurenjie2008@gmail.com> Co-authored-by: Liang-Chi Hsieh <viirya@gmail.com>
1 parent ec5b6a7 commit 3b385a9

File tree

15 files changed

+889
-1
lines changed

15 files changed

+889
-1
lines changed

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ members = [
2121
"crates/catalog/*",
2222
"crates/examples",
2323
"crates/iceberg",
24+
"crates/integrations/*",
2425
"crates/test_utils",
2526
]
2627

@@ -56,6 +57,7 @@ fnv = "1"
5657
futures = "0.3"
5758
iceberg = { version = "0.2.0", path = "./crates/iceberg" }
5859
iceberg-catalog-rest = { version = "0.2.0", path = "./crates/catalog/rest" }
60+
iceberg-catalog-hms = { version = "0.2.0", path = "./crates/catalog/hms" }
5961
itertools = "0.12"
6062
lazy_static = "1"
6163
log = "^0.4"

crates/iceberg/src/table.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ use futures::AsyncReadExt;
2525
use typed_builder::TypedBuilder;
2626

2727
/// Table represents a table in the catalog.
28-
#[derive(TypedBuilder, Debug)]
28+
#[derive(TypedBuilder, Debug, Clone)]
2929
pub struct Table {
3030
file_io: FileIO,
3131
#[builder(default, setter(strip_option, into))]
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
[package]
19+
name = "iceberg-datafusion"
20+
version = { workspace = true }
21+
edition = { workspace = true }
22+
homepage = { workspace = true }
23+
rust-version = { workspace = true }
24+
25+
categories = ["database"]
26+
description = "Apache Iceberg Datafusion Integration"
27+
repository = { workspace = true }
28+
license = { workspace = true }
29+
keywords = ["iceberg", "integrations", "datafusion"]
30+
31+
[dependencies]
32+
anyhow = { workspace = true }
33+
async-trait = { workspace = true }
34+
datafusion = { version = "37.0.0" }
35+
futures = { workspace = true }
36+
iceberg = { workspace = true }
37+
log = { workspace = true }
38+
tokio = { workspace = true }
39+
40+
[dev-dependencies]
41+
iceberg-catalog-hms = { workspace = true }
42+
iceberg_test_utils = { path = "../../test_utils", features = ["tests"] }
43+
port_scanner = { workspace = true }
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
<!--
2+
~ Licensed to the Apache Software Foundation (ASF) under one
3+
~ or more contributor license agreements. See the NOTICE file
4+
~ distributed with this work for additional information
5+
~ regarding copyright ownership. The ASF licenses this file
6+
~ to you under the Apache License, Version 2.0 (the
7+
~ "License"); you may not use this file except in compliance
8+
~ with the License. You may obtain a copy of the License at
9+
~
10+
~ http://www.apache.org/licenses/LICENSE-2.0
11+
~
12+
~ Unless required by applicable law or agreed to in writing,
13+
~ software distributed under the License is distributed on an
14+
~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
~ KIND, either express or implied. See the License for the
16+
~ specific language governing permissions and limitations
17+
~ under the License.
18+
-->
19+
20+
# Apache Iceberg DataFusion Integration
21+
22+
This crate contains the integration of Apache DataFusion and Apache Iceberg.
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use std::{any::Any, collections::HashMap, sync::Arc};
19+
20+
use datafusion::catalog::{schema::SchemaProvider, CatalogProvider};
21+
use futures::future::try_join_all;
22+
use iceberg::{Catalog, NamespaceIdent, Result};
23+
24+
use crate::schema::IcebergSchemaProvider;
25+
26+
/// Provides an interface to manage and access multiple schemas
27+
/// within an Iceberg [`Catalog`].
28+
///
29+
/// Acts as a centralized catalog provider that aggregates
30+
/// multiple [`SchemaProvider`], each associated with distinct namespaces.
31+
pub struct IcebergCatalogProvider {
32+
/// A `HashMap` where keys are namespace names
33+
/// and values are dynamic references to objects implementing the
34+
/// [`SchemaProvider`] trait.
35+
schemas: HashMap<String, Arc<dyn SchemaProvider>>,
36+
}
37+
38+
impl IcebergCatalogProvider {
39+
/// Asynchronously tries to construct a new [`IcebergCatalogProvider`]
40+
/// using the given client to fetch and initialize schema providers for
41+
/// each namespace in the Iceberg [`Catalog`].
42+
///
43+
/// This method retrieves the list of namespace names
44+
/// attempts to create a schema provider for each namespace, and
45+
/// collects these providers into a `HashMap`.
46+
pub async fn try_new(client: Arc<dyn Catalog>) -> Result<Self> {
47+
// TODO:
48+
// Schemas and providers should be cached and evicted based on time
49+
// As of right now; schemas might become stale.
50+
let schema_names: Vec<_> = client
51+
.list_namespaces(None)
52+
.await?
53+
.iter()
54+
.flat_map(|ns| ns.as_ref().clone())
55+
.collect();
56+
57+
let providers = try_join_all(
58+
schema_names
59+
.iter()
60+
.map(|name| {
61+
IcebergSchemaProvider::try_new(
62+
client.clone(),
63+
NamespaceIdent::new(name.clone()),
64+
)
65+
})
66+
.collect::<Vec<_>>(),
67+
)
68+
.await?;
69+
70+
let schemas: HashMap<String, Arc<dyn SchemaProvider>> = schema_names
71+
.into_iter()
72+
.zip(providers.into_iter())
73+
.map(|(name, provider)| {
74+
let provider = Arc::new(provider) as Arc<dyn SchemaProvider>;
75+
(name, provider)
76+
})
77+
.collect();
78+
79+
Ok(IcebergCatalogProvider { schemas })
80+
}
81+
}
82+
83+
impl CatalogProvider for IcebergCatalogProvider {
84+
fn as_any(&self) -> &dyn Any {
85+
self
86+
}
87+
88+
fn schema_names(&self) -> Vec<String> {
89+
self.schemas.keys().cloned().collect()
90+
}
91+
92+
fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
93+
self.schemas.get(name).cloned()
94+
}
95+
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use anyhow::anyhow;
19+
use iceberg::{Error, ErrorKind};
20+
21+
/// Converts a datafusion error into an iceberg error.
22+
pub fn from_datafusion_error(error: datafusion::error::DataFusionError) -> Error {
23+
Error::new(
24+
ErrorKind::Unexpected,
25+
"Operation failed for hitting datafusion error".to_string(),
26+
)
27+
.with_source(anyhow!("datafusion error: {:?}", error))
28+
}
29+
/// Converts an iceberg error into a datafusion error.
30+
pub fn to_datafusion_error(error: Error) -> datafusion::error::DataFusionError {
31+
datafusion::error::DataFusionError::External(error.into())
32+
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
mod catalog;
19+
pub use catalog::*;
20+
21+
mod error;
22+
pub use error::*;
23+
24+
mod physical_plan;
25+
mod schema;
26+
mod table;
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
pub(crate) mod scan;

0 commit comments

Comments
 (0)