Skip to content

Commit

Permalink
Start adding iceberg
Browse files Browse the repository at this point in the history
  • Loading branch information
matthewmturner committed Nov 25, 2024
1 parent acc5aca commit 07d4cdc
Show file tree
Hide file tree
Showing 11 changed files with 880 additions and 64 deletions.
851 changes: 814 additions & 37 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ env_logger = "0.11.5"
futures = "0.3.30"
http = "1"
http-body = "1"
iceberg-datafusion = { git = "https://github.com/apache/iceberg-rust", rev = "16f9411dd3897134a401ece97d73cd33d6790bff", optional = true }
itertools = "0.13.0"
lazy_static = "1.4.0"
log = "0.4.22"
Expand Down Expand Up @@ -64,6 +65,7 @@ url = "2.5.2"
[features]
default = ["functions-parquet"]
deltalake = ["dep:deltalake"]
iceberg = ["dep:iceberg-datafusion"]
flightsql = ["dep:arrow-flight", "dep:tonic"]
experimental-flightsql-server = ["flightsql"]
s3 = ["object_store/aws", "url"]
Expand Down
5 changes: 3 additions & 2 deletions src/execution/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ impl std::fmt::Debug for ExecutionContext {

impl ExecutionContext {
/// Construct a new `ExecutionContext` with the specified configuration
pub fn try_new(config: &ExecutionConfig, app_type: AppType) -> Result<Self> {
pub async fn try_new(config: &ExecutionConfig, app_type: AppType) -> Result<Self> {
let mut builder = DftSessionStateBuilder::new();
let mut executor = None;
match app_type {
Expand All @@ -97,8 +97,9 @@ impl ExecutionContext {
}
}
let extensions = enabled_extensions();

for extension in &extensions {
builder = extension.register(config, builder)?;
builder = extension.register(config.clone(), builder).await?;
}

let state = builder.build()?;
Expand Down
13 changes: 0 additions & 13 deletions src/execution/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@ pub use stats::{collect_plan_io_stats, ExecutionStats};
#[cfg(feature = "flightsql")]
use self::flightsql::{FlightSQLClient, FlightSQLContext};
use self::local::ExecutionContext;
use crate::config::AppConfig;
use color_eyre::eyre::Result;
use datafusion::prelude::*;

pub enum AppType {
Expand All @@ -58,17 +56,6 @@ impl AppExecution {
}
}

pub fn try_new_from_config(config: AppConfig, app_type: AppType) -> Result<Self> {
let context = ExecutionContext::try_new(&config.execution, app_type)?;
#[cfg(feature = "flightsql")]
let flightsql_context = FlightSQLContext::new(config.flightsql);
Ok(Self {
context,
#[cfg(feature = "flightsql")]
flightsql_context,
})
}

pub fn execution_ctx(&self) -> &ExecutionContext {
&self.context
}
Expand Down
3 changes: 1 addition & 2 deletions src/extensions/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,9 @@ impl Debug for DftSessionStateBuilder {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("DftSessionStateBuilder")
.field("session_config", &self.session_config)
//.field("table_factories", &self.table_factories)
.field(
"table_factories",
&"TODO TableFactoryDoes not implement Debug",
&"TODO TableFactory does not implement Debug",
)
.field("runtime_env", &self.runtime_env)
.finish()
Expand Down
2 changes: 1 addition & 1 deletion src/extensions/deltalake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ impl DeltaLakeExtension {
impl Extension for DeltaLakeExtension {
fn register(
&self,
_config: &ExecutionConfig,
_config: ExecutionConfig,
builder: DftSessionStateBuilder,
) -> datafusion_common::Result<DftSessionStateBuilder> {
Ok(builder.with_table_factory("DELTATABLE", Arc::new(DeltaTableFactory {})))
Expand Down
5 changes: 2 additions & 3 deletions src/extensions/functions_json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,11 @@ impl JsonFunctionsExtension {
}

impl Extension for JsonFunctionsExtension {
fn register(
async fn register(
&self,
_config: &ExecutionConfig,
_config: ExecutionConfig,
builder: DftSessionStateBuilder,
) -> datafusion_common::Result<DftSessionStateBuilder> {
//
Ok(builder)
}

Expand Down
44 changes: 44 additions & 0 deletions src/extensions/iceberg.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! DeltaLake integration: [DeltaLakeExtension]
use crate::config::ExecutionConfig;
use crate::extensions::{DftSessionStateBuilder, Extension};
use iceberg_datafusion::{IcebergCatalogProvider, IcebergTableProviderFactory};
use std::sync::Arc;

#[derive(Debug, Default)]
pub struct IcebergExtension {}

impl IcebergExtension {
pub fn new() -> Self {
Self {}
}
}

impl Extension for IcebergExtension {
async fn register(
&self,
_config: ExecutionConfig,
builder: DftSessionStateBuilder,
) -> datafusion_common::Result<DftSessionStateBuilder> {
Ok(builder.with_table_factory("ICEBERG", Arc::new(IcebergTableProviderFactory {})));

let catalog_provider = IcebergCatalogProvider::try_new(catalog).await?;
}
}
9 changes: 7 additions & 2 deletions src/extensions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,19 @@ mod builder;
mod deltalake;
#[cfg(feature = "functions-json")]
mod functions_json;
#[cfg(feature = "iceberg")]
mod iceberg;
#[cfg(feature = "s3")]
mod s3;

pub use builder::DftSessionStateBuilder;

#[async_trait::async_trait]
pub trait Extension: Debug {
/// Registers this extension with the DataFusion [`SessionStateBuilder`]
fn register(
async fn register(
&self,
_config: &ExecutionConfig,
_config: ExecutionConfig,
_builder: DftSessionStateBuilder,
) -> Result<DftSessionStateBuilder>;

Expand All @@ -55,6 +58,8 @@ pub fn enabled_extensions() -> Vec<Box<dyn Extension>> {
Box::new(s3::AwsS3Extension::new()),
#[cfg(feature = "deltalake")]
Box::new(deltalake::DeltaLakeExtension::new()),
#[cfg(feature = "iceberg")]
Box::new(iceberg::IcebergExtension::new()),
#[cfg(feature = "functions-json")]
Box::new(functions_json::JsonFunctionsExtension::new()),
]
Expand Down
2 changes: 1 addition & 1 deletion src/extensions/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ impl AwsS3Extension {
impl Extension for AwsS3Extension {
fn register(
&self,
config: &ExecutionConfig,
config: ExecutionConfig,
mut builder: DftSessionStateBuilder,
) -> datafusion_common::Result<DftSessionStateBuilder> {
let Some(object_store_config) = &config.object_store else {
Expand Down
8 changes: 5 additions & 3 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ async fn app_entry_point(cli: DftArgs, state: AppState<'_>) -> Result<()> {
info!("Starting FlightSQL server on {}", DEFAULT_SERVER_ADDRESS);
let state = state::initialize(cli.config_path());
let execution_ctx =
ExecutionContext::try_new(&state.config.execution, AppType::FlightSQLServer)?;
ExecutionContext::try_new(&state.config.execution, AppType::FlightSQLServer).await?;
if cli.run_ddl {
execution_ctx.execute_ddl().await;
}
Expand All @@ -75,7 +75,8 @@ async fn app_entry_point(cli: DftArgs, state: AppState<'_>) -> Result<()> {
// CLI mode: executing commands from files or CLI arguments
if !cli.files.is_empty() || !cli.commands.is_empty() {
env_logger::init();
let execution_ctx = ExecutionContext::try_new(&state.config.execution, AppType::Cli)?;
let execution_ctx =
ExecutionContext::try_new(&state.config.execution, AppType::Cli).await?;
#[allow(unused_mut)]
let mut app_execution = AppExecution::new(execution_ctx);
#[cfg(feature = "flightsql")]
Expand All @@ -95,7 +96,8 @@ async fn app_entry_point(cli: DftArgs, state: AppState<'_>) -> Result<()> {
// TUI mode: running the TUI
telemetry::initialize_logs()?; // use alternate logging for TUI
let state = state::initialize(cli.config_path());
let execution_ctx = ExecutionContext::try_new(&state.config.execution, AppType::Tui)?;
let execution_ctx =
ExecutionContext::try_new(&state.config.execution, AppType::Tui).await?;
let app_execution = AppExecution::new(execution_ctx);
let app = App::new(state, cli, app_execution);
app.run_app().await?;
Expand Down

0 comments on commit 07d4cdc

Please sign in to comment.