chore: df 36 upgrade #2853

Merged 6 commits on Apr 8, 2024
2,419 changes: 1,318 additions & 1,101 deletions Cargo.lock

Large diffs are not rendered by default.

8 changes: 4 additions & 4 deletions Cargo.toml
@@ -18,9 +18,9 @@ wildcard_imports = "deny"

[workspace.dependencies]
clap = { version = "4.5.4", features = ["derive"] }
datafusion = { version = "35.0.0", features = ["avro"] }
datafusion = { version = "36.0.0", features = ["avro"] }
arrow-flight = { version = "50.0.0", features = ["flight-sql-experimental"] }
datafusion-proto = { version = "35.0.0" }
datafusion-proto = { version = "36.0.0" }
reqwest = { version = "0.11.27", default-features = false, features = [
"json",
"rustls-tls",
@@ -43,6 +43,6 @@ tracing = "0.1"
url = "2.5.0"

[workspace.dependencies.deltalake]
git = "https://github.com/delta-io/delta-rs.git"
rev = "993e2c202936719855f8831513bcbab1b9930b94"
git = "https://github.com/GlareDB/delta-rs.git"
rev = "94773cb304ebc5eaa48d7540eb01cdf08f8b401f"
features = ["s3", "gcs", "azure", "datafusion"]
7 changes: 6 additions & 1 deletion crates/datafusion_ext/src/planner/expr/mod.rs
@@ -792,8 +792,13 @@ impl<'a, S: AsyncContextProvider> SqlQueryPlanner<'a, S> {
self.sql_expr_to_logical_expr(*right, schema, planner_context)
.await?,
);
+let stride = Box::new(Expr::Literal(ScalarValue::Int64(Some(1))));

-GetFieldAccess::ListRange { start, stop }
+GetFieldAccess::ListRange {
+start,
+stop,
+stride,
+}
}
_ => GetFieldAccess::ListIndex {
key: Box::new(
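
DataFusion 36 adds a `stride` field to `GetFieldAccess::ListRange`, so the planner now fills it with a literal `1` for plain `arr[start:stop]` slices. A minimal sketch of the kind of expression this produces — not code from the PR, and assuming DataFusion 36 re-exports `GetIndexedField`/`GetFieldAccess` with public fields under `datafusion::logical_expr`:

```rust
use datafusion::logical_expr::{col, Expr, GetFieldAccess, GetIndexedField};
use datafusion::scalar::ScalarValue;

// Roughly `arr[1:3]`: DataFusion 36 requires an explicit stride, so a
// literal 1 is supplied, matching the planner change above.
fn slice_expr() -> Expr {
    Expr::GetIndexedField(GetIndexedField {
        expr: Box::new(col("arr")),
        field: GetFieldAccess::ListRange {
            start: Box::new(Expr::Literal(ScalarValue::Int64(Some(1)))),
            stop: Box::new(Expr::Literal(ScalarValue::Int64(Some(3)))),
            stride: Box::new(Expr::Literal(ScalarValue::Int64(Some(1)))),
        },
    })
}

fn main() {
    println!("{}", slice_expr());
}
```
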
2 changes: 2 additions & 0 deletions crates/datafusion_ext/src/planner/mod.rs
@@ -324,6 +324,8 @@ impl<'a, S: AsyncContextProvider> SqlQueryPlanner<'a, S> {
| SQLDataType::Int64
| SQLDataType::Float64
| SQLDataType::Struct(_)
+| SQLDataType::JSONB
+| SQLDataType::Unspecified
=> not_impl_err!(
"Unsupported SQL type {sql_type:?}"
),
2 changes: 1 addition & 1 deletion crates/datasources/Cargo.toml
@@ -70,7 +70,7 @@ tiberius = { version = "0.12.2", default-features = false, features = [
"rustls",
"chrono",
] }
-lance = { git = "https://github.com/GlareDB/lance", rev = "de6df70d9c5d95a4818b8799c23e3d1ad649bc1d" }
+lance = { git = "https://github.com/GlareDB/lance", branch = "df36" }
bson = "2.9.0"
scylla = { version = "0.12.0" }
glob = "0.3.1"
18 changes: 13 additions & 5 deletions crates/datasources/src/mongodb/mod.rs
@@ -86,7 +86,6 @@ impl ParseOptionValue<MongoDbProtocol> for OptionValue {
}
}


impl Display for MongoDbProtocol {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let s = match self {
@@ -462,15 +461,24 @@ fn df_to_bson(val: ScalarValue) -> Result<Bson, ExtensionError> {
ScalarValue::UInt64(v) => Ok(Bson::Int64(i64::try_from(v.unwrap_or_default()).unwrap())),
ScalarValue::Float32(v) => Ok(Bson::Double(f64::from(v.unwrap_or_default()))),
ScalarValue::Float64(v) => Ok(Bson::Double(v.unwrap_or_default())),
-ScalarValue::Struct(v, f) => {
+ScalarValue::Struct(sa) => {
let mut doc = RawDocumentBuf::new();
-for (key, value) in f.into_iter().zip(v.unwrap_or_default().into_iter()) {
+let fields = sa.fields();
+let columns = sa.columns();
+for (field, column) in fields.iter().zip(columns.iter()) {
+if column.len() != 1 {
+return Err(ExtensionError::String(
+"Struct column should have only one row".to_string(),
+));
+}
+let sv = ScalarValue::try_from_array(column, 0).unwrap();
doc.append(
-key.name(),
-RawBson::try_from(df_to_bson(value)?)
+field.name(),
+RawBson::try_from(df_to_bson(sv)?)
.map_err(|e| DataFusionError::External(Box::new(e)))?,
);
}

Ok(Bson::Document(
doc.to_document()
.map_err(|e| DataFusionError::External(Box::new(e)))?,
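
The change above tracks DataFusion 36's new representation of struct scalars: `ScalarValue::Struct` now wraps a one-row `StructArray` rather than a `(Option<Vec<ScalarValue>>, Fields)` pair, so the BSON conversion walks `fields()`/`columns()` and recovers each value with `ScalarValue::try_from_array`. A standalone sketch of that extraction pattern; the struct construction and field names are illustrative, not from the PR:

```rust
use std::sync::Arc;

use datafusion::arrow::array::{ArrayRef, Int64Array, StringArray, StructArray};
use datafusion::arrow::datatypes::{DataType, Field};
use datafusion::scalar::ScalarValue;

fn main() {
    // Build a single-row struct, the shape ScalarValue::Struct now carries.
    let sa = StructArray::from(vec![
        (
            Arc::new(Field::new("city", DataType::Utf8, true)),
            Arc::new(StringArray::from(vec!["Oslo"])) as ArrayRef,
        ),
        (
            Arc::new(Field::new("population", DataType::Int64, true)),
            Arc::new(Int64Array::from(vec![709_000_i64])) as ArrayRef,
        ),
    ]);

    // Mirror the conversion loop: each column must hold exactly one row,
    // and that row is pulled back out as a ScalarValue per field.
    for (field, column) in sa.fields().iter().zip(sa.columns().iter()) {
        assert_eq!(column.len(), 1);
        let sv = ScalarValue::try_from_array(column, 0).unwrap();
        println!("{} = {}", field.name(), sv);
    }
}
```
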
10 changes: 6 additions & 4 deletions crates/datasources/src/native/access.rs
@@ -16,6 +16,7 @@ use datafusion::physical_plan::projection::ProjectionExec;
use datafusion::physical_plan::{ExecutionPlan, Statistics};
use datafusion::prelude::Expr;
use datafusion_ext::metrics::ReadOnlyDataSourceMetricsExecAdapter;
+use deltalake::delta_datafusion::DataFusionMixins;
use deltalake::kernel::{ArrayType, DataType as DeltaDataType};
use deltalake::logstore::{default_logstore, logstores, LogStore, LogStoreFactory};
use deltalake::operations::create::CreateBuilder;
@@ -97,6 +98,7 @@ impl LogStoreFactory for FakeStoreFactory {
/// DeltaField represents data types as stored in Delta Lake, with additional
/// metadata for indicating the 'real' (original) type, for cases when
/// downcasting occurs.
+#[derive(Debug)]
struct DeltaField {
data_type: DeltaDataType,
metadata: Option<HashMap<String, Value>>,
@@ -111,10 +113,9 @@ fn arrow_to_delta_safe(arrow_type: &DataType) -> DeltaResult<DeltaField> {
(&DataType::Timestamp(TimeUnit::Microsecond, tz.clone())).try_into()?;
let mut metadata = HashMap::new();
metadata.insert("arrow_type".to_string(), json!(dtype));

Ok(DeltaField {
data_type: delta_type,
-metadata: None,
+metadata: Some(metadata),
})
}
dtype @ DataType::FixedSizeList(fld, _) => {
@@ -216,7 +217,6 @@ impl NativeTableStorage {

for col in &opts.columns {
let delta_col = arrow_to_delta_safe(&col.arrow_type)?;

builder = builder.with_column(
col.name.clone(),
delta_col.data_type,
@@ -225,8 +225,9 @@
);
}

+let delta_table = builder.await?;
// TODO: Partitioning
-NativeTable::new(builder.await?)
+NativeTable::new(delta_table)
};

Ok(tbl)
@@ -356,6 +357,7 @@ impl NativeTable {
} else {
SaveMode::Append
};

let store = self.delta.log_store();
let snapshot = self.delta.state.clone();
Arc::new(NativeTableInsertExec::new(
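
The `arrow_to_delta_safe` fix restores the behaviour described in the `DeltaField` doc comment: a timestamp column is stored as Delta microseconds and the pre-downcast Arrow type is kept in field metadata under `arrow_type` — previously the map was built and then dropped by passing `None`. A small illustration of that metadata, assuming the arrow `DataType` serde support the crate already relies on for `json!(dtype)`:

```rust
use std::collections::HashMap;

use datafusion::arrow::datatypes::{DataType, TimeUnit};
use serde_json::{json, Value};

fn main() {
    // The column arrives as a nanosecond timestamp, which Delta Lake
    // cannot store; it is written out as microseconds instead.
    let original = DataType::Timestamp(TimeUnit::Nanosecond, None);
    let stored_as = DataType::Timestamp(TimeUnit::Microsecond, None);
    assert_ne!(original, stored_as);

    // Record the 'real' type so readers can restore it. This is the map
    // that is now actually attached via `Some(metadata)`.
    let mut metadata: HashMap<String, Value> = HashMap::new();
    metadata.insert("arrow_type".to_string(), json!(original));

    println!("{}", metadata["arrow_type"]);
}
```
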
60 changes: 54 additions & 6 deletions crates/datasources/src/native/insert.rs
@@ -1,11 +1,14 @@
use std::any::Any;
use std::sync::Arc;

-use datafusion::arrow::datatypes::SchemaRef;
+use datafusion::arrow::datatypes::{DataType, SchemaRef};
+use datafusion::common::ToDFSchema;
use datafusion::error::{DataFusionError, Result as DataFusionResult};
use datafusion::execution::context::SessionState;
use datafusion::execution::TaskContext;
-use datafusion::physical_expr::PhysicalSortExpr;
+use datafusion::logical_expr::{ident, Cast, Expr};
+use datafusion::physical_expr::{create_physical_expr, PhysicalSortExpr};
+use datafusion::physical_plan::projection::ProjectionExec;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion::physical_plan::{
DisplayAs,
@@ -16,6 +19,7 @@ use datafusion::physical_plan::{
SendableRecordBatchStream,
Statistics,
};
+use deltalake::kernel::StructField;
use deltalake::logstore::LogStore;
use deltalake::operations::write::WriteBuilder;
use deltalake::protocol::SaveMode;
@@ -86,7 +90,7 @@ impl ExecutionPlan for NativeTableInsertExec {
input: children[0].clone(),
store: self.store.clone(),
snapshot: self.snapshot.clone(),
-save_mode: self.save_mode.clone(),
+save_mode: self.save_mode,
}))
}

@@ -107,16 +111,60 @@
context.session_config().clone(),
context.runtime_env(),
);

+let schema = self.input.schema();
+let fields = schema.fields().clone();
+let input_dfschema = schema.to_dfschema()?;
+// delta-rs does not support all data types, so we need to check if the input schema
+// contains any unsupported data types.
+// If it does, we need to cast them to supported data types.
+let mut contains_unsupported_fields = false;
+let projections = fields
+.iter()
+.map(|field| {
+let e = if StructField::try_from(field.as_ref()).is_ok() {
+ident(field.name())
+} else {
+contains_unsupported_fields = true;

+match field.data_type() {
+DataType::Timestamp(_, _) => Expr::Cast(Cast {
+expr: Box::new(ident(field.name())),
+data_type: DataType::Timestamp(
+datafusion::arrow::datatypes::TimeUnit::Microsecond,
+None,
+),
+}),

+dtype => {
+return Err(DataFusionError::Execution(format!(
+"Unsupported data type {:?} for field {}",
+dtype,
+field.name()
+)))
+}
+}
+};
+let e = create_physical_expr(&e, &input_dfschema, state.execution_props()).unwrap();
+Ok((e, field.name().clone()))
+})
+.collect::<DataFusionResult<Vec<_>>>()?;

+let input = if contains_unsupported_fields {
+Arc::new(ProjectionExec::try_new(projections, self.input.clone())?)
+} else {
+self.input.clone()
+};

// Allows writing multiple output partitions from the input execution
// plan.
//
// TODO: Possibly try avoiding cloning the snapshot.
let builder = WriteBuilder::new(self.store.clone(), Some(self.snapshot.clone()))
.with_input_session_state(state)
-.with_save_mode(self.save_mode.clone())
-.with_input_execution_plan(self.input.clone());
+.with_save_mode(self.save_mode)
+.with_input_execution_plan(input.clone());

-let input = self.input.clone();
let output = futures::stream::once(async move {
let _ = builder
.await
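
The new insert path checks every input field with `StructField::try_from` and, for types delta-rs cannot store, projects a cast in front of the write — currently timestamps, which are narrowed to timezone-less microseconds. A logical-level sketch of that per-field rule, using a hypothetical `cast_if_needed` helper rather than the PR's physical-expression code:

```rust
use datafusion::arrow::datatypes::{DataType, Field, Schema, TimeUnit};
use datafusion::logical_expr::{ident, Cast, Expr};

// Pass supported columns through untouched; rewrite timestamps to the
// microsecond, timezone-less form the Delta writer accepts.
fn cast_if_needed(field: &Field) -> Expr {
    match field.data_type() {
        DataType::Timestamp(_, _) => Expr::Cast(Cast {
            expr: Box::new(ident(field.name())),
            data_type: DataType::Timestamp(TimeUnit::Microsecond, None),
        }),
        _ => ident(field.name()),
    }
}

fn main() {
    let schema = Schema::new(vec![
        Field::new("ts", DataType::Timestamp(TimeUnit::Nanosecond, None), true),
        Field::new("value", DataType::Int64, true),
    ]);

    // One projection expression per input field, in schema order.
    let projections: Vec<Expr> = schema
        .fields()
        .iter()
        .map(|f| cast_if_needed(f))
        .collect();

    for expr in &projections {
        println!("{expr}");
    }
}
```
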
3 changes: 2 additions & 1 deletion crates/datasources/src/object_store/mod.rs
@@ -324,7 +324,8 @@ impl TableProvider for ObjStoreTableProvider {
})
.boxed()
.buffered(ctx.config_options().execution.meta_fetch_concurrency);
-let (files, statistics) = get_statistics_with_limit(files, self.schema(), limit).await?;
+let (files, statistics) =
+get_statistics_with_limit(files, self.schema(), limit, true).await?;

// If there are no files, return an empty exec plan.
if files.is_empty() {