diff --git a/crates/aws/src/credentials.rs b/crates/aws/src/credentials.rs
index 2a10d2825b..06fbf24654 100644
--- a/crates/aws/src/credentials.rs
+++ b/crates/aws/src/credentials.rs
@@ -4,7 +4,7 @@
 use std::collections::HashMap;
 use std::str::FromStr;
 use std::sync::Arc;
-use std::time::{Duration, SystemTime};
+use std::time::SystemTime;
 
 use aws_config::default_provider::credentials::DefaultCredentialsChain;
 use aws_config::meta::credentials::CredentialsProviderChain;
diff --git a/crates/core/src/delta_datafusion/expr.rs b/crates/core/src/delta_datafusion/expr.rs
index 54cebcfbd2..b1fe1f9472 100644
--- a/crates/core/src/delta_datafusion/expr.rs
+++ b/crates/core/src/delta_datafusion/expr.rs
@@ -28,7 +28,8 @@ use arrow_schema::{DataType, Field};
 use chrono::{DateTime, NaiveDate};
 use datafusion::execution::context::SessionState;
 use datafusion::execution::session_state::SessionStateBuilder;
-use datafusion::functions_array::make_array::MakeArray;
+use datafusion::functions_nested::make_array::MakeArray;
+use datafusion::functions_nested::planner::{FieldAccessPlanner, NestedFunctionPlanner};
 use datafusion_common::Result as DFResult;
 use datafusion_common::{config::ConfigOptions, DFSchema, Result, ScalarValue, TableReference};
 use datafusion_expr::expr::InList;
@@ -104,6 +105,7 @@ impl ScalarUDFImpl for MakeParquetArray {
             data_type = arg.data_type();
         }
 
+        #[allow(deprecated)]
         match self.actual.invoke(args)? {
             ColumnarValue::Scalar(ScalarValue::List(df_array)) => {
                 let field = Arc::new(Field::new("element", data_type, true));
@@ -126,7 +128,7 @@ impl ScalarUDFImpl for MakeParquetArray {
     }
 
     fn invoke_no_args(&self, number_rows: usize) -> Result<ColumnarValue> {
-        self.actual.invoke_no_args(number_rows)
+        self.actual.invoke_batch(&[], number_rows)
     }
 
     fn aliases(&self) -> &[String] {
@@ -142,9 +144,7 @@ impl ScalarUDFImpl for MakeParquetArray {
     }
 }
 
-use datafusion::functions_array::planner::{FieldAccessPlanner, NestedFunctionPlanner};
-
-/// This exists becxause the NestedFunctionPlanner _not_ the UserDefinedFunctionPlanner handles the
+/// This exists because the NestedFunctionPlanner, _not_ the UserDefinedFunctionPlanner, handles the
 /// insertion of "make_array" which is used to turn [100] into List<field=element, values=[100]>
 ///
 /// **screaming intensifies**
@@ -567,8 +567,8 @@ impl<'a> fmt::Display for ScalarValueFormat<'a> {
 #[cfg(test)]
 mod test {
     use arrow_schema::DataType as ArrowDataType;
-    use datafusion::functions_array::expr_fn::cardinality;
     use datafusion::functions_nested::expr_ext::{IndexAccessor, SliceAccessor};
+    use datafusion::functions_nested::expr_fn::cardinality;
     use datafusion::prelude::SessionContext;
     use datafusion_common::{Column, ScalarValue, ToDFSchema};
     use datafusion_expr::expr::ScalarFunction;
diff --git a/crates/core/src/writer/stats.rs b/crates/core/src/writer/stats.rs
index ae763f7b72..984754d510 100644
--- a/crates/core/src/writer/stats.rs
+++ b/crates/core/src/writer/stats.rs
@@ -252,9 +252,9 @@ impl StatsScalar {
         macro_rules! get_stat {
             ($val: expr) => {
                 if use_min {
-                    *$val.min()
+                    *$val.min_opt().unwrap()
                 } else {
-                    *$val.max()
+                    *$val.max_opt().unwrap()
                 }
             };
         }
@@ -304,10 +304,11 @@ impl StatsScalar {
             (Statistics::Double(v), _) => Ok(Self::Float64(get_stat!(v))),
             (Statistics::ByteArray(v), logical_type) => {
                 let bytes = if use_min {
-                    v.min_bytes()
+                    v.min_bytes_opt()
                 } else {
-                    v.max_bytes()
-                };
+                    v.max_bytes_opt()
+                }
+                .unwrap_or_default();
                 match logical_type {
                     None => Ok(Self::Bytes(bytes.to_vec())),
                     Some(LogicalType::String) => {
@@ -326,10 +327,11 @@ impl StatsScalar {
             }
             (Statistics::FixedLenByteArray(v), Some(LogicalType::Decimal { scale, precision })) => {
                 let val = if use_min {
-                    v.min_bytes()
+                    v.min_bytes_opt()
                 } else {
-                    v.max_bytes()
-                };
+                    v.max_bytes_opt()
+                }
+                .unwrap_or_default();
 
                 let val = if val.len() <= 16 {
                     i128::from_be_bytes(sign_extend_be(val)) as f64
@@ -356,10 +358,11 @@ impl StatsScalar {
             }
             (Statistics::FixedLenByteArray(v), Some(LogicalType::Uuid)) => {
                 let val = if use_min {
-                    v.min_bytes()
+                    v.min_bytes_opt()
                 } else {
-                    v.max_bytes()
-                };
+                    v.max_bytes_opt()
+                }
+                .unwrap_or_default();
 
                 if val.len() != 16 {
                     return Err(DeltaWriterError::StatsParsingFailed {
@@ -432,8 +435,8 @@ struct AggregatedStats {
 impl From<(&Statistics, &Option<LogicalType>)> for AggregatedStats {
     fn from(value: (&Statistics, &Option<LogicalType>)) -> Self {
         let (stats, logical_type) = value;
-        let null_count = stats.null_count();
-        if stats.has_min_max_set() {
+        let null_count = stats.null_count_opt().unwrap_or_default();
+        if stats.min_bytes_opt().is_some() && stats.max_bytes_opt().is_some() {
             let min = StatsScalar::try_from_stats(stats, logical_type, true).ok();
             let max = StatsScalar::try_from_stats(stats, logical_type, false).ok();
             Self {
diff --git a/crates/test/src/datafusion.rs b/crates/test/src/datafusion.rs
index f6357ab3b7..5ca73a742e 100644
--- a/crates/test/src/datafusion.rs
+++ b/crates/test/src/datafusion.rs
@@ -7,7 +7,7 @@ use std::sync::Arc;
 
 pub fn context_with_delta_table_factory() -> SessionContext {
     let cfg = RuntimeConfig::new();
-    let env = RuntimeEnv::new(cfg).unwrap();
+    let env = RuntimeEnv::try_new(cfg).unwrap();
     let ses = SessionConfig::new();
     let mut state = SessionStateBuilder::new()
         .with_config(ses)
diff --git a/python/src/features.rs b/python/src/features.rs
index 155f7aa365..400cb91b33 100644
--- a/python/src/features.rs
+++ b/python/src/features.rs
@@ -2,8 +2,8 @@ use deltalake::kernel::TableFeatures as KernelTableFeatures;
 use pyo3::pyclass;
 
 /// High level table features
-#[pyclass]
-#[derive(Clone)]
+#[pyclass(eq, eq_int)]
+#[derive(Clone, PartialEq)]
 pub enum TableFeatures {
     /// Mapping of one column to another
     ColumnMapping,
diff --git a/python/src/filesystem.rs b/python/src/filesystem.rs
index 453d05e480..116b1b0cf1 100644
--- a/python/src/filesystem.rs
+++ b/python/src/filesystem.rs
@@ -503,10 +503,12 @@ impl ObjectInputFile {
         Err(PyNotImplementedError::new_err("'truncate' not implemented"))
     }
 
+    #[pyo3(signature = (_size=None))]
     fn readline(&self, _size: Option<i64>) -> PyResult<()> {
         Err(PyNotImplementedError::new_err("'readline' not implemented"))
     }
 
+    #[pyo3(signature = (_hint=None))]
     fn readlines(&self, _hint: Option<i64>) -> PyResult<()> {
         Err(PyNotImplementedError::new_err(
             "'readlines' not implemented",
@@ -666,10 +668,12 @@ impl ObjectOutputStream {
         Err(PyNotImplementedError::new_err("'truncate' not implemented"))
     }
 
+    #[pyo3(signature = (_size=None))]
     fn readline(&self, _size: Option<i64>) -> PyResult<()> {
         Err(PyNotImplementedError::new_err("'readline' not implemented"))
     }
 
+    #[pyo3(signature = (_hint=None))]
     fn readlines(&self, _hint: Option<i64>) -> PyResult<()> {
         Err(PyNotImplementedError::new_err(
             "'readlines' not implemented",
diff --git a/python/src/lib.rs b/python/src/lib.rs
index 5a9a3ce237..304faa00c7 100644
--- a/python/src/lib.rs
+++ b/python/src/lib.rs
@@ -287,6 +287,7 @@ impl RawDeltaTable {
         })
     }
 
+    #[pyo3(signature = (partition_filters=None))]
     pub fn files(
         &self,
         py: Python,
@@ -316,6 +317,7 @@ impl RawDeltaTable {
         })
     }
 
+    #[pyo3(signature = (partition_filters=None))]
     pub fn file_uris(
         &self,
         partition_filters: Option<Vec<(PyBackedStr, PyBackedStr, PartitionFilterValue)>>,
@@ -828,6 +830,7 @@ impl RawDeltaTable {
     }
 
     /// Run the History command on the Delta Table: Returns provenance information, including the operation, user, and so on, for each write to a table.
+    #[pyo3(signature = (limit=None))]
     pub fn history(&mut self, limit: Option<usize>) -> PyResult<Vec<String>> {
         let history = rt()
             .block_on(self._table.history(limit))
@@ -845,6 +848,7 @@ impl RawDeltaTable {
             .map_err(PythonError::from)?)
     }
 
+    #[pyo3(signature = (schema, partition_filters=None))]
     pub fn dataset_partitions<'py>(
         &mut self,
         py: Python<'py>,
@@ -876,6 +880,7 @@ impl RawDeltaTable {
             .collect()
     }
 
+    #[pyo3(signature = (partitions_filters=None))]
     fn get_active_partitions<'py>(
         &self,
         partitions_filters: Option<Vec<(PyBackedStr, PyBackedStr, PartitionFilterValue)>>,
@@ -969,6 +974,7 @@ impl RawDeltaTable {
     }
 
     #[allow(clippy::too_many_arguments)]
+    #[pyo3(signature = (add_actions, mode, partition_by, schema, partitions_filters=None, commit_properties=None, post_commithook_properties=None))]
     fn create_write_transaction(
         &mut self,
         py: Python,
@@ -1431,7 +1437,7 @@ fn scalar_to_py<'py>(value: &Scalar, py_date: &Bound<'py, PyAny>) -> PyResult<Bo
         Double(val) => val.to_object(py),
         Timestamp(_) => {
             // We need to manually append 'Z' add to end so that pyarrow can cast the
-            // the scalar value to pa.timestamp("us","UTC")
+            // scalar value to pa.timestamp("us","UTC")
             let value = value.serialize();
             format!("{}Z", value).to_object(py)
         }
@@ -1453,7 +1459,7 @@ fn scalar_to_py<'py>(value: &Scalar, py_date: &Bound<'py, PyAny>) -> PyResult<Bo
             }
             py_struct.to_object(py)
         }
-        Array(val) => todo!("how should this be converted!"),
+        Array(_val) => todo!("how should this be converted!"),
     };
 
     Ok(val.into_bound(py))
@@ -1747,6 +1753,7 @@ pub struct PyCommitProperties {
 
 #[pyfunction]
 #[allow(clippy::too_many_arguments)]
+#[pyo3(signature = (table_uri, data, mode, table=None, schema_mode=None, partition_by=None, predicate=None, target_file_size=None, name=None, description=None, configuration=None, storage_options=None, writer_properties=None, commit_properties=None, post_commithook_properties=None))]
 fn write_to_deltalake(
     py: Python,
     table_uri: String,
@@ -1828,6 +1835,7 @@ fn write_to_deltalake(
 
 #[pyfunction]
 #[allow(clippy::too_many_arguments)]
+#[pyo3(signature = (table_uri, schema, partition_by, mode, raise_if_key_not_exists, name=None, description=None, configuration=None, storage_options=None, custom_metadata=None))]
 fn create_deltalake(
     py: Python,
     table_uri: String,
@@ -1884,6 +1892,7 @@ fn create_deltalake(
 
 #[pyfunction]
 #[allow(clippy::too_many_arguments)]
+#[pyo3(signature = (table_uri, schema, add_actions, _mode, partition_by, name=None, description=None, configuration=None, storage_options=None, custom_metadata=None))]
 fn write_new_deltalake(
     py: Python,
     table_uri: String,
@@ -1938,6 +1947,7 @@ fn write_new_deltalake(
 
 #[pyfunction]
 #[allow(clippy::too_many_arguments)]
+#[pyo3(signature = (uri, partition_schema=None, partition_strategy=None, name=None, description=None, configuration=None, storage_options=None, custom_metadata=None))]
 fn convert_to_deltalake(
     py: Python,
     uri: String,
@@ -1992,6 +2002,7 @@ fn convert_to_deltalake(
 }
 
 #[pyfunction]
+#[pyo3(signature = (table=None, configuration=None))]
 fn get_num_idx_cols_and_stats_columns(
     table: Option<&RawDeltaTable>,
     configuration: Option<HashMap<String, Option<String>>>,