chore: fixed a bunch of warnings and deprecations #3020

Merged 1 commit on Nov 22, 2024
2 changes: 1 addition & 1 deletion crates/aws/src/credentials.rs
@@ -4,7 +4,7 @@
use std::collections::HashMap;
use std::str::FromStr;
use std::sync::Arc;
-use std::time::{Duration, SystemTime};
+use std::time::SystemTime;

use aws_config::default_provider::credentials::DefaultCredentialsChain;
use aws_config::meta::credentials::CredentialsProviderChain;
12 changes: 6 additions & 6 deletions crates/core/src/delta_datafusion/expr.rs
@@ -28,7 +28,8 @@ use arrow_schema::{DataType, Field};
use chrono::{DateTime, NaiveDate};
use datafusion::execution::context::SessionState;
use datafusion::execution::session_state::SessionStateBuilder;
-use datafusion::functions_array::make_array::MakeArray;
+use datafusion::functions_nested::make_array::MakeArray;
+use datafusion::functions_nested::planner::{FieldAccessPlanner, NestedFunctionPlanner};
use datafusion_common::Result as DFResult;
use datafusion_common::{config::ConfigOptions, DFSchema, Result, ScalarValue, TableReference};
use datafusion_expr::expr::InList;
@@ -104,6 +105,7 @@ impl ScalarUDFImpl for MakeParquetArray {
data_type = arg.data_type();
}

+#[allow(deprecated)]
match self.actual.invoke(args)? {
ColumnarValue::Scalar(ScalarValue::List(df_array)) => {
let field = Arc::new(Field::new("element", data_type, true));
@@ -126,7 +128,7 @@
}

fn invoke_no_args(&self, number_rows: usize) -> Result<ColumnarValue> {
-self.actual.invoke_no_args(number_rows)
+self.actual.invoke_batch(&[], number_rows)
}

fn aliases(&self) -> &[String] {
@@ -142,9 +144,7 @@ impl ScalarUDFImpl for MakeParquetArray {
}
}

-use datafusion::functions_array::planner::{FieldAccessPlanner, NestedFunctionPlanner};
-
-/// This exists becxause the NestedFunctionPlanner _not_ the UserDefinedFunctionPlanner handles the
+/// This exists because the NestedFunctionPlanner, _not_ the UserDefinedFunctionPlanner, handles the
/// insertion of "make_array" which is used to turn [100] into List<field=element, values=[100]>
///
/// **screaming intensifies**
@@ -567,8 +567,8 @@ impl<'a> fmt::Display for ScalarValueFormat<'a> {
#[cfg(test)]
mod test {
use arrow_schema::DataType as ArrowDataType;
-use datafusion::functions_array::expr_fn::cardinality;
use datafusion::functions_nested::expr_ext::{IndexAccessor, SliceAccessor};
+use datafusion::functions_nested::expr_fn::cardinality;
use datafusion::prelude::SessionContext;
use datafusion_common::{Column, ScalarValue, ToDFSchema};
use datafusion_expr::expr::ScalarFunction;
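The expr.rs changes track DataFusion's rename of the `functions_array` module to `functions_nested`; only the import paths move. A minimal sketch of the updated imports, assuming the same items are re-exported unchanged under the new module:

```rust
// Old, deprecated module path:
// use datafusion::functions_array::make_array::MakeArray;

// New paths used in this PR:
use datafusion::functions_nested::make_array::MakeArray;
use datafusion::functions_nested::planner::{FieldAccessPlanner, NestedFunctionPlanner};
use datafusion::functions_nested::expr_fn::cardinality;
```

The `invoke_no_args` override is likewise rewritten in terms of `invoke_batch(&[], number_rows)`, since the zero-argument entry point is deprecated upstream.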
29 changes: 16 additions & 13 deletions crates/core/src/writer/stats.rs
@@ -252,9 +252,9 @@ impl StatsScalar {
macro_rules! get_stat {
($val: expr) => {
if use_min {
-*$val.min()
+*$val.min_opt().unwrap()
} else {
-*$val.max()
+*$val.max_opt().unwrap()
}
};
}
@@ -304,10 +304,11 @@ impl StatsScalar {
(Statistics::Double(v), _) => Ok(Self::Float64(get_stat!(v))),
(Statistics::ByteArray(v), logical_type) => {
let bytes = if use_min {
-v.min_bytes()
+v.min_bytes_opt()
} else {
-v.max_bytes()
-};
+v.max_bytes_opt()
+}
+.unwrap_or_default();
match logical_type {
None => Ok(Self::Bytes(bytes.to_vec())),
Some(LogicalType::String) => {
@@ -326,10 +327,11 @@
}
(Statistics::FixedLenByteArray(v), Some(LogicalType::Decimal { scale, precision })) => {
let val = if use_min {
-v.min_bytes()
+v.min_bytes_opt()
} else {
-v.max_bytes()
-};
+v.max_bytes_opt()
+}
+.unwrap_or_default();

let val = if val.len() <= 16 {
i128::from_be_bytes(sign_extend_be(val)) as f64
@@ -356,10 +358,11 @@
}
(Statistics::FixedLenByteArray(v), Some(LogicalType::Uuid)) => {
let val = if use_min {
-v.min_bytes()
+v.min_bytes_opt()
} else {
-v.max_bytes()
-};
+v.max_bytes_opt()
+}
+.unwrap_or_default();

if val.len() != 16 {
return Err(DeltaWriterError::StatsParsingFailed {
@@ -432,8 +435,8 @@ struct AggregatedStats {
impl From<(&Statistics, &Option<LogicalType>)> for AggregatedStats {
fn from(value: (&Statistics, &Option<LogicalType>)) -> Self {
let (stats, logical_type) = value;
-let null_count = stats.null_count();
-if stats.has_min_max_set() {
+let null_count = stats.null_count_opt().unwrap_or_default();
+if stats.min_bytes_opt().is_some() && stats.max_bytes_opt().is_some() {
let min = StatsScalar::try_from_stats(stats, logical_type, true).ok();
let max = StatsScalar::try_from_stats(stats, logical_type, false).ok();
Self {
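The stats.rs changes follow the parquet crate's move from panicking accessors to `Option`-returning ones. A minimal sketch of the pattern, assuming `stats` is the same `Statistics` value used above and that the `_opt` accessors behave as shown in the diff (import path assumed from the parquet crate):

```rust
use parquet::file::statistics::Statistics;

/// Hypothetical helper for illustration only: null count and min/max
/// presence via the non-deprecated accessors.
fn summarize(stats: &Statistics) -> (u64, bool) {
    // `null_count_opt` replaces `null_count`; a missing count defaults to 0 here.
    let null_count = stats.null_count_opt().unwrap_or_default();
    // `has_min_max_set` is deprecated; presence is now checked per accessor.
    let has_min_max = stats.min_bytes_opt().is_some() && stats.max_bytes_opt().is_some();
    (null_count, has_min_max)
}
```

The `.unwrap()` inside `get_stat!` preserves the existing assumption that min/max statistics are present at those call sites.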
2 changes: 1 addition & 1 deletion crates/test/src/datafusion.rs
@@ -7,7 +7,7 @@ use std::sync::Arc;

pub fn context_with_delta_table_factory() -> SessionContext {
let cfg = RuntimeConfig::new();
-let env = RuntimeEnv::new(cfg).unwrap();
+let env = RuntimeEnv::try_new(cfg).unwrap();
let ses = SessionConfig::new();
let mut state = SessionStateBuilder::new()
.with_config(ses)
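`RuntimeEnv::new` is deprecated in favour of the fallible `RuntimeEnv::try_new`; the test helper above only swaps the constructor. A minimal sketch, with the import path assumed (the helper in the diff relies on its existing imports):

```rust
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};

// Build a runtime environment via the non-deprecated, fallible constructor.
fn build_runtime_env() -> RuntimeEnv {
    RuntimeEnv::try_new(RuntimeConfig::new()).expect("default runtime config should be valid")
}
```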
4 changes: 2 additions & 2 deletions python/src/features.rs
@@ -2,8 +2,8 @@ use deltalake::kernel::TableFeatures as KernelTableFeatures;
use pyo3::pyclass;

/// High level table features
-#[pyclass]
-#[derive(Clone)]
+#[pyclass(eq, eq_int)]
+#[derive(Clone, PartialEq)]
pub enum TableFeatures {
/// Mapping of one column to another
ColumnMapping,
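The `#[pyclass(eq, eq_int)]` and `PartialEq` additions address a PyO3 0.22+ deprecation: Python-exposed simple enums must now opt in explicitly to equality comparisons. A minimal sketch of the pattern with a hypothetical enum:

```rust
use pyo3::pyclass;

/// Illustrative only: a simple enum exposed to Python. `eq` generates
/// `__eq__`, `eq_int` allows comparison against plain integers, and the
/// `PartialEq` derive supplies the Rust-side implementation both rely on.
#[pyclass(eq, eq_int)]
#[derive(Clone, PartialEq)]
pub enum ExampleFeature {
    FeatureA,
    FeatureB,
}
```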
4 changes: 4 additions & 0 deletions python/src/filesystem.rs
@@ -503,10 +503,12 @@ impl ObjectInputFile {
Err(PyNotImplementedError::new_err("'truncate' not implemented"))
}

+#[pyo3(signature = (_size=None))]
fn readline(&self, _size: Option<i64>) -> PyResult<()> {
Err(PyNotImplementedError::new_err("'readline' not implemented"))
}

+#[pyo3(signature = (_hint=None))]
fn readlines(&self, _hint: Option<i64>) -> PyResult<()> {
Err(PyNotImplementedError::new_err(
"'readlines' not implemented",
@@ -666,10 +668,12 @@ impl ObjectOutputStream {
Err(PyNotImplementedError::new_err("'truncate' not implemented"))
}

+#[pyo3(signature = (_size=None))]
fn readline(&self, _size: Option<i64>) -> PyResult<()> {
Err(PyNotImplementedError::new_err("'readline' not implemented"))
}

+#[pyo3(signature = (_hint=None))]
fn readlines(&self, _hint: Option<i64>) -> PyResult<()> {
Err(PyNotImplementedError::new_err(
"'readlines' not implemented",
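The new `#[pyo3(signature = ...)]` attributes resolve another PyO3 0.22 deprecation: `Option` parameters no longer get an implicit `None` default, so each method states it explicitly. A minimal sketch with a stand-in class:

```rust
use pyo3::exceptions::PyNotImplementedError;
use pyo3::prelude::*;

/// Illustrative stand-in for the file-like wrappers above.
#[pyclass]
struct FileLikeExample;

#[pymethods]
impl FileLikeExample {
    /// Declaring `_size=None` keeps the Python-visible default explicit.
    #[pyo3(signature = (_size=None))]
    fn readline(&self, _size: Option<i64>) -> PyResult<()> {
        Err(PyNotImplementedError::new_err("'readline' not implemented"))
    }
}
```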
15 changes: 13 additions & 2 deletions python/src/lib.rs
@@ -287,6 +287,7 @@ impl RawDeltaTable {
})
}

+#[pyo3(signature = (partition_filters=None))]
pub fn files(
&self,
py: Python,
@@ -316,6 +317,7 @@ impl RawDeltaTable {
})
}

+#[pyo3(signature = (partition_filters=None))]
pub fn file_uris(
&self,
partition_filters: Option<Vec<(PyBackedStr, PyBackedStr, PartitionFilterValue)>>,
@@ -828,6 +830,7 @@ impl RawDeltaTable {
}

/// Run the History command on the Delta Table: Returns provenance information, including the operation, user, and so on, for each write to a table.
+#[pyo3(signature = (limit=None))]
pub fn history(&mut self, limit: Option<usize>) -> PyResult<Vec<String>> {
let history = rt()
.block_on(self._table.history(limit))
@@ -845,6 +848,7 @@
.map_err(PythonError::from)?)
}

+#[pyo3(signature = (schema, partition_filters=None))]
pub fn dataset_partitions<'py>(
&mut self,
py: Python<'py>,
@@ -876,6 +880,7 @@ impl RawDeltaTable {
.collect()
}

+#[pyo3(signature = (partitions_filters=None))]
fn get_active_partitions<'py>(
&self,
partitions_filters: Option<Vec<(PyBackedStr, PyBackedStr, PartitionFilterValue)>>,
@@ -969,6 +974,7 @@ impl RawDeltaTable {
}

#[allow(clippy::too_many_arguments)]
+#[pyo3(signature = (add_actions, mode, partition_by, schema, partitions_filters=None, commit_properties=None, post_commithook_properties=None))]
fn create_write_transaction(
&mut self,
py: Python,
@@ -1431,7 +1437,7 @@ fn scalar_to_py<'py>(value: &Scalar, py_date: &Bound<'py, PyAny>) -> PyResult<Bo
Double(val) => val.to_object(py),
Timestamp(_) => {
// We need to manually append 'Z' add to end so that pyarrow can cast the
-// the scalar value to pa.timestamp("us","UTC")
+// scalar value to pa.timestamp("us","UTC")
let value = value.serialize();
format!("{}Z", value).to_object(py)
}
@@ -1453,7 +1459,7 @@
}
py_struct.to_object(py)
}
-Array(val) => todo!("how should this be converted!"),
+Array(_val) => todo!("how should this be converted!"),
};

Ok(val.into_bound(py))
@@ -1747,6 +1753,7 @@ pub struct PyCommitProperties {

#[pyfunction]
#[allow(clippy::too_many_arguments)]
+#[pyo3(signature = (table_uri, data, mode, table=None, schema_mode=None, partition_by=None, predicate=None, target_file_size=None, name=None, description=None, configuration=None, storage_options=None, writer_properties=None, commit_properties=None, post_commithook_properties=None))]
fn write_to_deltalake(
py: Python,
table_uri: String,
@@ -1828,6 +1835,7 @@ fn write_to_deltalake(

#[pyfunction]
#[allow(clippy::too_many_arguments)]
+#[pyo3(signature = (table_uri, schema, partition_by, mode, raise_if_key_not_exists, name=None, description=None, configuration=None, storage_options=None, custom_metadata=None))]
fn create_deltalake(
py: Python,
table_uri: String,
@@ -1884,6 +1892,7 @@ fn create_deltalake(

#[pyfunction]
#[allow(clippy::too_many_arguments)]
+#[pyo3(signature = (table_uri, schema, add_actions, _mode, partition_by, name=None, description=None, configuration=None, storage_options=None, custom_metadata=None))]
fn write_new_deltalake(
py: Python,
table_uri: String,
@@ -1938,6 +1947,7 @@ fn write_new_deltalake(

#[pyfunction]
#[allow(clippy::too_many_arguments)]
+#[pyo3(signature = (uri, partition_schema=None, partition_strategy=None, name=None, description=None, configuration=None, storage_options=None, custom_metadata=None))]
fn convert_to_deltalake(
py: Python,
uri: String,
@@ -1992,6 +2002,7 @@ fn convert_to_deltalake(
}

#[pyfunction]
+#[pyo3(signature = (table=None, configuration=None))]
fn get_num_idx_cols_and_stats_columns(
table: Option<&RawDeltaTable>,
configuration: Option<HashMap<String, Option<String>>>,
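The same explicit-default rule applies to free functions, which is why every `#[pyfunction]` with trailing optional arguments gains a signature attribute. A minimal sketch with hypothetical names:

```rust
use pyo3::prelude::*;

/// Illustrative only: the declared `limit=None` mirrors the Python-visible default.
#[pyfunction]
#[pyo3(signature = (limit=None))]
fn history_example(limit: Option<usize>) -> PyResult<usize> {
    Ok(limit.unwrap_or(0))
}
```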