diff --git a/daft/daft.pyi b/daft/daft.pyi index 58e7f05bdb..d5ab46cb89 100644 --- a/daft/daft.pyi +++ b/daft/daft.pyi @@ -156,14 +156,40 @@ class PartitionSpec: scheme: PartitionScheme num_partitions: int by: list[PyExpr] + scheme_config: RangeConfig | HashConfig | RandomConfig | UnknownConfig - def __init__( - self, scheme: PartitionScheme = PartitionScheme.Unknown, num_partitions: int = 0, by: list[PyExpr] | None = None - ): ... + @staticmethod + def range(by: list[PyExpr], num_partitions: int, descending: list[bool]) -> PartitionSpec: ... + @staticmethod + def hash(by: list[PyExpr], num_partitions: int) -> PartitionSpec: ... + @staticmethod + def random(num_partitions: int) -> PartitionSpec: ... + @staticmethod + def unknown(num_partitions: int) -> PartitionSpec: ... def __eq__(self, other: PartitionSpec) -> bool: ... # type: ignore[override] def __ne__(self, other: PartitionSpec) -> bool: ... # type: ignore[override] def __str__(self) -> str: ... +class RangeConfig: + """Configuration of a range partitioning.""" + + descending: list[bool] + +class HashConfig: + """Configuration of a hash partitioning.""" + + ... + +class RandomConfig: + """Configuration of a random partitioning.""" + + ... + +class UnknownConfig: + """Configuration of an unknown partitioning.""" + + ... + class ResourceRequest: """ Resource request for a query fragment task. 
diff --git a/src/daft-plan/src/builder.rs b/src/daft-plan/src/builder.rs index 30a84c9c82..6c4b434c11 100644 --- a/src/daft-plan/src/builder.rs +++ b/src/daft-plan/src/builder.rs @@ -4,6 +4,7 @@ use crate::{ logical_ops, logical_plan::LogicalPlan, optimization::Optimizer, + partitioning::PartitionSchemeConfig, planner::plan, sink_info::{OutputFileInfo, SinkInfo}, source_info::{ @@ -162,13 +163,13 @@ impl LogicalPlanBuilder { &self, num_partitions: Option, partition_by: Vec, - scheme: PartitionScheme, + scheme_config: PartitionSchemeConfig, ) -> DaftResult { let logical_plan: LogicalPlan = logical_ops::Repartition::try_new( self.plan.clone(), num_partitions, partition_by, - scheme, + scheme_config, )? .into(); Ok(logical_plan.into()) @@ -380,17 +381,48 @@ impl PyLogicalPlanBuilder { pub fn repartition( &self, + py: Python<'_>, partition_by: Vec, scheme: PartitionScheme, num_partitions: Option, + scheme_config: Option, ) -> PyResult { let partition_by_exprs: Vec = partition_by .iter() .map(|expr| expr.clone().into()) .collect(); + let partition_scheme_config = match scheme { + PartitionScheme::Range => { + if let Some(scheme_config) = scheme_config { + PartitionSchemeConfig::Range(scheme_config.extract(py)?) + } else { + return Err(DaftError::ValueError( + "Must provide a scheme config with ascending/descending list if repartitioning by range.".to_string(), + ).into()); + } + } + PartitionScheme::Hash => PartitionSchemeConfig::Hash( + scheme_config + .map(|c| c.extract(py)) + .transpose()? + .unwrap_or_default(), + ), + PartitionScheme::Random => PartitionSchemeConfig::Random( + scheme_config + .map(|c| c.extract(py)) + .transpose()? + .unwrap_or_default(), + ), + PartitionScheme::Unknown => PartitionSchemeConfig::Unknown( + scheme_config + .map(|c| c.extract(py)) + .transpose()? + .unwrap_or_default(), + ), + }; Ok(self .builder - .repartition(num_partitions, partition_by_exprs, scheme)? + .repartition(num_partitions, partition_by_exprs, partition_scheme_config)? 
.into()) } diff --git a/src/daft-plan/src/lib.rs b/src/daft-plan/src/lib.rs index 9901a3ff0d..1d4057827f 100644 --- a/src/daft-plan/src/lib.rs +++ b/src/daft-plan/src/lib.rs @@ -28,7 +28,7 @@ use daft_scan::{ }; pub use join::{JoinStrategy, JoinType}; pub use logical_plan::LogicalPlan; -pub use partitioning::{PartitionScheme, PartitionSpec}; +pub use partitioning::{PartitionScheme, PartitionSchemeConfig, PartitionSpec}; pub use physical_plan::PhysicalPlanScheduler; pub use resource_request::ResourceRequest; pub use source_info::{FileInfo, FileInfos}; @@ -36,6 +36,8 @@ pub use source_info::{FileInfo, FileInfos}; #[cfg(feature = "python")] use daft_scan::storage_config::PythonStorageConfig; #[cfg(feature = "python")] +use partitioning::PyPartitionSpec; +#[cfg(feature = "python")] use pyo3::prelude::*; #[cfg(feature = "python")] @@ -46,7 +48,7 @@ pub fn register_modules(_py: Python, parent: &PyModule) -> PyResult<()> { parent.add_class::()?; parent.add_class::()?; parent.add_class::()?; - parent.add_class::()?; + parent.add_class::()?; parent.add_class::()?; parent.add_class::()?; parent.add_class::()?; diff --git a/src/daft-plan/src/logical_ops/repartition.rs b/src/daft-plan/src/logical_ops/repartition.rs index 56739a0203..807f64b6d7 100644 --- a/src/daft-plan/src/logical_ops/repartition.rs +++ b/src/daft-plan/src/logical_ops/repartition.rs @@ -4,7 +4,7 @@ use common_error::{DaftError, DaftResult}; use daft_dsl::Expr; use itertools::Itertools; -use crate::{LogicalPlan, PartitionScheme}; +use crate::{partitioning::PartitionSchemeConfig, LogicalPlan}; #[derive(Clone, Debug, PartialEq, Eq, Hash)] pub struct Repartition { @@ -12,7 +12,7 @@ pub struct Repartition { pub input: Arc, pub num_partitions: Option, pub partition_by: Vec, - pub scheme: PartitionScheme, + pub scheme_config: PartitionSchemeConfig, } impl Repartition { @@ -20,9 +20,9 @@ impl Repartition { input: Arc, num_partitions: Option, partition_by: Vec, - scheme: PartitionScheme, + scheme_config: 
PartitionSchemeConfig, ) -> DaftResult { - if matches!(scheme, PartitionScheme::Range) { + if matches!(scheme_config, PartitionSchemeConfig::Range(_)) { return Err(DaftError::ValueError( "Repartitioning with the Range partition scheme is not supported.".to_string(), )); @@ -31,13 +31,22 @@ impl Repartition { input, num_partitions, partition_by, - scheme, + scheme_config, }) } pub fn multiline_display(&self) -> Vec { let mut res = vec![]; - res.push(format!("Repartition: Scheme = {:?}", self.scheme)); + let scheme_config = self.scheme_config.multiline_display(); + res.push(format!( + "Repartition: Scheme = {}{}", + self.scheme_config.var_name(), + if scheme_config.is_empty() { + "".to_string() + } else { + format!("({})", scheme_config.join(", ")) + } + )); res.push(format!( "Number of partitions = {}", self.num_partitions diff --git a/src/daft-plan/src/logical_plan.rs b/src/daft-plan/src/logical_plan.rs index 5db0c5fee7..c3b8d989f8 100644 --- a/src/daft-plan/src/logical_plan.rs +++ b/src/daft-plan/src/logical_plan.rs @@ -159,7 +159,7 @@ impl LogicalPlan { Self::Limit(Limit { limit, eager, .. }) => Self::Limit(Limit::new(input.clone(), *limit, *eager)), Self::Explode(Explode { to_explode, .. }) => Self::Explode(Explode::try_new(input.clone(), to_explode.clone()).unwrap()), Self::Sort(Sort { sort_by, descending, .. }) => Self::Sort(Sort::try_new(input.clone(), sort_by.clone(), descending.clone()).unwrap()), - Self::Repartition(Repartition { num_partitions, partition_by, scheme, .. }) => Self::Repartition(Repartition::try_new(input.clone(), *num_partitions, partition_by.clone(), scheme.clone()).unwrap()), + Self::Repartition(Repartition { num_partitions, partition_by, scheme_config: scheme, .. 
}) => Self::Repartition(Repartition::try_new(input.clone(), *num_partitions, partition_by.clone(), scheme.clone()).unwrap()), Self::Distinct(_) => Self::Distinct(Distinct::new(input.clone())), Self::Aggregate(Aggregate { aggregations, groupby, ..}) => Self::Aggregate(Aggregate::try_new(input.clone(), aggregations.clone(), groupby.clone()).unwrap()), Self::Sink(Sink { sink_info, .. }) => Self::Sink(Sink::try_new(input.clone(), sink_info.clone()).unwrap()), diff --git a/src/daft-plan/src/optimization/rules/drop_repartition.rs b/src/daft-plan/src/optimization/rules/drop_repartition.rs index 1059b58eda..f0449d6a8b 100644 --- a/src/daft-plan/src/optimization/rules/drop_repartition.rs +++ b/src/daft-plan/src/optimization/rules/drop_repartition.rs @@ -58,7 +58,7 @@ mod tests { Optimizer, }, test::dummy_scan_node, - LogicalPlan, PartitionScheme, + LogicalPlan, PartitionSchemeConfig, }; /// Helper that creates an optimizer with the DropRepartition rule registered, optimizes @@ -94,8 +94,16 @@ mod tests { Field::new("a", DataType::Int64), Field::new("b", DataType::Utf8), ]) - .repartition(Some(10), vec![col("a")], PartitionScheme::Hash)? - .repartition(Some(5), vec![col("a")], PartitionScheme::Hash)? + .repartition( + Some(10), + vec![col("a")], + PartitionSchemeConfig::Hash(Default::default()), + )? + .repartition( + Some(5), + vec![col("a")], + PartitionSchemeConfig::Hash(Default::default()), + )? 
.build(); let expected = "\ Repartition: Scheme = Hash, Number of partitions = 5, Partition by = col(a)\ diff --git a/src/daft-plan/src/optimization/rules/push_down_filter.rs b/src/daft-plan/src/optimization/rules/push_down_filter.rs index 3f70af7e0f..c51de47691 100644 --- a/src/daft-plan/src/optimization/rules/push_down_filter.rs +++ b/src/daft-plan/src/optimization/rules/push_down_filter.rs @@ -293,7 +293,7 @@ mod tests { Optimizer, }, test::{dummy_scan_node, dummy_scan_operator_node}, - JoinType, LogicalPlan, PartitionScheme, + JoinType, LogicalPlan, PartitionSchemeConfig, }; /// Helper that creates an optimizer with the PushDownFilter rule registered, optimizes @@ -492,7 +492,11 @@ mod tests { Field::new("a", DataType::Int64), Field::new("b", DataType::Utf8), ]) - .repartition(Some(1), vec![col("a")], PartitionScheme::Hash)? + .repartition( + Some(1), + vec![col("a")], + PartitionSchemeConfig::Hash(Default::default()), + )? .filter(col("a").lt(&lit(2)))? .build(); let expected = "\ diff --git a/src/daft-plan/src/optimization/rules/push_down_limit.rs b/src/daft-plan/src/optimization/rules/push_down_limit.rs index 5d975b8b8c..dab79c3607 100644 --- a/src/daft-plan/src/optimization/rules/push_down_limit.rs +++ b/src/daft-plan/src/optimization/rules/push_down_limit.rs @@ -135,7 +135,7 @@ mod tests { dummy_scan_node, dummy_scan_node_with_pushdowns, dummy_scan_operator_node_with_pushdowns, }, - LogicalPlan, PartitionScheme, + LogicalPlan, PartitionSchemeConfig, }; /// Helper that creates an optimizer with the PushDownLimit rule registered, optimizes @@ -315,7 +315,11 @@ mod tests { Field::new("a", DataType::Int64), Field::new("b", DataType::Utf8), ]) - .repartition(Some(1), vec![col("a")], PartitionScheme::Hash)? + .repartition( + Some(1), + vec![col("a")], + PartitionSchemeConfig::Hash(Default::default()), + )? .limit(5, false)? 
.build(); let expected = "\ diff --git a/src/daft-plan/src/partitioning.rs b/src/daft-plan/src/partitioning.rs index a61d3d5694..05e4f66801 100644 --- a/src/daft-plan/src/partitioning.rs +++ b/src/daft-plan/src/partitioning.rs @@ -1,4 +1,7 @@ -use std::fmt::{Display, Formatter}; +use std::{ + fmt::{Display, Formatter}, + sync::Arc, +}; use daft_dsl::Expr; @@ -10,8 +13,8 @@ use serde::{Deserialize, Serialize}; use { daft_dsl::python::PyExpr, pyo3::{ - pyclass, pyclass::CompareOp, pymethods, types::PyBytes, PyObject, PyResult, PyTypeInfo, - Python, ToPyObject, + pyclass, pyclass::CompareOp, pymethods, types::PyBytes, IntoPy, PyObject, PyResult, + PyTypeInfo, Python, ToPyObject, }, }; @@ -34,23 +37,189 @@ impl Display for PartitionScheme { } } -/// Partition specification: scheme, number of partitions, partition column. +/// Partition scheme for Daft DataFrame. #[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)] +pub enum PartitionSchemeConfig { + Range(RangeConfig), + Hash(HashConfig), + Random(RandomConfig), + Unknown(UnknownConfig), +} + +impl PartitionSchemeConfig { + pub fn var_name(&self) -> &'static str { + match self { + Self::Range(_) => "Range", + Self::Hash(_) => "Hash", + Self::Random(_) => "Random", + Self::Unknown(_) => "Unknown", + } + } + + pub fn to_scheme(&self) -> PartitionScheme { + match self { + Self::Range(_) => PartitionScheme::Range, + Self::Hash(_) => PartitionScheme::Hash, + Self::Random(_) => PartitionScheme::Random, + Self::Unknown(_) => PartitionScheme::Unknown, + } + } + + pub fn multiline_display(&self) -> Vec { + match self { + Self::Range(conf) => conf.multiline_display(), + Self::Hash(conf) => conf.multiline_display(), + Self::Random(conf) => conf.multiline_display(), + Self::Unknown(conf) => conf.multiline_display(), + } + } +} + +impl Default for PartitionSchemeConfig { + fn default() -> Self { + Self::Unknown(Default::default()) + } +} + +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Hash)] 
+#[cfg_attr(feature = "python", pyclass(module = "daft.daft", get_all))] +pub struct RangeConfig { + pub descending: Vec, +} + +impl RangeConfig { + pub fn new_internal(descending: Vec) -> Self { + Self { descending } + } + + pub fn multiline_display(&self) -> Vec { + vec![self.descending.iter().join(", ")] + } +} + +#[cfg(feature = "python")] +#[pymethods] +impl RangeConfig { + /// Create a config for range partitioning. + #[new] + fn new(descending: Vec) -> Self { + Self::new_internal(descending) + } +} +impl_bincode_py_state_serialization!(RangeConfig); + +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Hash)] +#[cfg_attr(feature = "python", pyclass(module = "daft.daft"))] +pub struct HashConfig {} + +impl HashConfig { + pub fn new_internal() -> Self { + Self {} + } + + pub fn multiline_display(&self) -> Vec { + vec![] + } +} + +impl Default for HashConfig { + fn default() -> Self { + Self::new_internal() + } +} + +#[cfg(feature = "python")] +#[pymethods] +impl HashConfig { + /// Create a config for hash partitioning. + #[new] + fn new() -> Self { + Self::new_internal() + } +} + +impl_bincode_py_state_serialization!(HashConfig); + +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Hash)] #[cfg_attr(feature = "python", pyclass(module = "daft.daft"))] +pub struct RandomConfig {} + +impl RandomConfig { + pub fn new_internal() -> Self { + Self {} + } + + pub fn multiline_display(&self) -> Vec { + vec![] + } +} + +impl Default for RandomConfig { + fn default() -> Self { + Self::new_internal() + } +} + +#[cfg(feature = "python")] +#[pymethods] +impl RandomConfig { + /// Create a config for random partitioning. 
+ #[new] + fn new() -> Self { + Self::new_internal() + } +} + +impl_bincode_py_state_serialization!(RandomConfig); + +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Hash)] +#[cfg_attr(feature = "python", pyclass(module = "daft.daft"))] +pub struct UnknownConfig {} + +impl UnknownConfig { + pub fn new_internal() -> Self { + Self {} + } + + pub fn multiline_display(&self) -> Vec { + vec![] + } +} + +impl Default for UnknownConfig { + fn default() -> Self { + Self::new_internal() + } +} + +#[cfg(feature = "python")] +#[pymethods] +impl UnknownConfig { + /// Create a config for unknown partitioning. + #[new] + fn new() -> Self { + Self::new_internal() + } +} + +impl_bincode_py_state_serialization!(UnknownConfig); + +/// Partition specification: scheme config, number of partitions, partition column. +#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)] pub struct PartitionSpec { - pub scheme: PartitionScheme, + pub scheme_config: PartitionSchemeConfig, pub num_partitions: usize, pub by: Option>, } impl PartitionSpec { - pub fn new_internal( - scheme: PartitionScheme, + pub fn new( + scheme_config: PartitionSchemeConfig, num_partitions: usize, by: Option>, ) -> Self { Self { - scheme, + scheme_config, num_partitions, by, } @@ -58,7 +227,15 @@ impl PartitionSpec { pub fn multiline_display(&self) -> Vec { let mut res = vec![]; - res.push(format!("Scheme = {}", self.scheme)); + res.push(format!("Scheme = {}", self.scheme_config.to_scheme())); + let scheme_config = self.scheme_config.multiline_display(); + if !scheme_config.is_empty() { + res.push(format!( + "{} config = {}", + self.scheme_config.var_name(), + scheme_config.join(", ") + )); + } res.push(format!("Num partitions = {}", self.num_partitions)); if let Some(ref by) = self.by { res.push(format!( @@ -72,56 +249,129 @@ impl PartitionSpec { impl Default for PartitionSpec { fn default() -> Self { - Self { - scheme: PartitionScheme::Unknown, - num_partitions: 1, - by: None, - } + 
Self::new(PartitionSchemeConfig::Unknown(Default::default()), 1, None) } } +/// Python-exposed partition specification for a Daft DataFrame. +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(transparent)] +#[cfg_attr( + feature = "python", + pyclass(module = "daft.daft", name = "PartitionSpec") +)] +pub struct PyPartitionSpec(Arc); + #[cfg(feature = "python")] #[pymethods] -impl PartitionSpec { - #[new] - #[pyo3(signature = (scheme=PartitionScheme::Unknown, num_partitions=0usize, by=None))] - pub fn new(scheme: PartitionScheme, num_partitions: usize, by: Option>) -> Self { - Self::new_internal( - scheme, - num_partitions, - by.map(|v| v.iter().map(|e| e.clone().into()).collect()), +impl PyPartitionSpec { + /// Create a range partitioning spec. + #[staticmethod] + fn range(by: Vec, num_partitions: usize, descending: Vec) -> Self { + Self( + PartitionSpec::new( + PartitionSchemeConfig::Range(RangeConfig::new_internal(descending)), + num_partitions, + Some(by.iter().map(|e| e.clone().into()).collect()), + ) + .into(), + ) + } + + /// Create a hash partitioning spec. + #[staticmethod] + fn hash(by: Vec, num_partitions: usize) -> Self { + Self( + PartitionSpec::new( + PartitionSchemeConfig::Hash(Default::default()), + num_partitions, + Some(by.iter().map(|e| e.clone().into()).collect()), + ) + .into(), + ) + } + + /// Create a random partitioning spec. + #[staticmethod] + fn random(num_partitions: usize) -> Self { + Self( + PartitionSpec::new( + PartitionSchemeConfig::Random(Default::default()), + num_partitions, + None, + ) + .into(), + ) + } + + /// Create an unknown partitioning spec. 
+ #[staticmethod] + fn unknown(num_partitions: usize) -> Self { + Self( + PartitionSpec::new( + PartitionSchemeConfig::Unknown(Default::default()), + num_partitions, + None, + ) + .into(), ) } #[getter] pub fn get_scheme(&self) -> PyResult { - Ok(self.scheme.clone()) + Ok(self.0.scheme_config.to_scheme()) } #[getter] pub fn get_num_partitions(&self) -> PyResult { - Ok(self.num_partitions as i64) + Ok(self.0.num_partitions as i64) } #[getter] pub fn get_by(&self) -> PyResult>> { Ok(self + .0 .by .as_ref() .map(|v| v.iter().map(|e| e.clone().into()).collect())) } + /// Get the underlying partitioning scheme config. + #[getter] + fn get_scheme_config(&self, py: Python) -> PyObject { + use PartitionSchemeConfig::*; + + match &self.0.scheme_config { + Range(config) => config.clone().into_py(py), + Hash(config) => config.clone().into_py(py), + Random(config) => config.clone().into_py(py), + Unknown(config) => config.clone().into_py(py), + } + } + + pub fn __str__(&self) -> PyResult { + Ok(format!("{:?}", self.0)) + } + fn __richcmp__(&self, other: &Self, op: CompareOp) -> bool { match op { - CompareOp::Eq => self == other, + CompareOp::Eq => self.0 == other.0, CompareOp::Ne => !self.__richcmp__(other, CompareOp::Eq), _ => unimplemented!("not implemented"), } } +} - pub fn __str__(&self) -> PyResult { - Ok(format!("{:?}", self)) +impl_bincode_py_state_serialization!(PyPartitionSpec); + +impl From for Arc { + fn from(partition_spec: PyPartitionSpec) -> Self { + partition_spec.0 } } -impl_bincode_py_state_serialization!(PartitionSpec); +impl From> for PyPartitionSpec { + fn from(partition_spec: Arc) -> Self { + Self(partition_spec) + } +} diff --git a/src/daft-plan/src/physical_ops/explode.rs b/src/daft-plan/src/physical_ops/explode.rs index e9751d688a..b0e6308ddc 100644 --- a/src/daft-plan/src/physical_ops/explode.rs +++ b/src/daft-plan/src/physical_ops/explode.rs @@ -4,7 +4,7 @@ use common_error::DaftResult; use daft_dsl::{optimization::get_required_columns, Expr}; use 
itertools::Itertools; -use crate::{physical_plan::PhysicalPlanRef, PartitionScheme, PartitionSpec}; +use crate::{physical_plan::PhysicalPlanRef, PartitionSpec}; use serde::{Deserialize, Serialize}; #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] @@ -29,12 +29,12 @@ impl Explode { input_pspec: Arc, to_explode: &Vec, ) -> Arc { - use crate::PartitionScheme::*; - match input_pspec.scheme { + use crate::PartitionSchemeConfig::*; + match input_pspec.scheme_config { // If the scheme is vacuous, the result partiiton spec is the same. - Random | Unknown => input_pspec.clone(), + Random(_) | Unknown(_) => input_pspec.clone(), // Otherwise, need to reevaluate the partition scheme for each expression. - Range | Hash => { + Range(_) | Hash(_) => { let required_cols_for_pspec = input_pspec .by .as_ref() @@ -48,8 +48,8 @@ impl Explode { let newname = expr.name().unwrap().to_string(); // if we clobber one of the required columns for the pspec, invalidate it. if required_cols_for_pspec.contains(&newname) { - return PartitionSpec::new_internal( - PartitionScheme::Unknown, + return PartitionSpec::new( + Unknown(Default::default()), input_pspec.num_partitions, None, ) @@ -82,7 +82,7 @@ mod tests { use daft_core::{datatypes::Field, DataType}; use daft_dsl::{col, Expr}; - use crate::{planner::plan, test::dummy_scan_node, PartitionScheme, PartitionSpec}; + use crate::{planner::plan, test::dummy_scan_node, PartitionSchemeConfig, PartitionSpec}; /// do not destroy the partition spec. #[test] @@ -97,15 +97,18 @@ mod tests { .repartition( Some(3), vec![Expr::Column("a".into())], - PartitionScheme::Hash, + PartitionSchemeConfig::Hash(Default::default()), )? .explode(vec![col("b")])? 
.build(); let physical_plan = plan(&logical_plan, cfg)?; - let expected_pspec = - PartitionSpec::new_internal(PartitionScheme::Hash, 3, Some(vec![col("a")])); + let expected_pspec = PartitionSpec::new( + PartitionSchemeConfig::Hash(Default::default()), + 3, + Some(vec![col("a")]), + ); assert_eq!( expected_pspec, @@ -128,14 +131,15 @@ mod tests { .repartition( Some(3), vec![Expr::Column("a".into()), Expr::Column("b".into())], - PartitionScheme::Hash, + PartitionSchemeConfig::Hash(Default::default()), )? .explode(vec![col("b")])? .build(); let physical_plan = plan(&logical_plan, cfg)?; - let expected_pspec = PartitionSpec::new_internal(PartitionScheme::Unknown, 3, None); + let expected_pspec = + PartitionSpec::new(PartitionSchemeConfig::Unknown(Default::default()), 3, None); assert_eq!( expected_pspec, diff --git a/src/daft-plan/src/physical_ops/fanout.rs b/src/daft-plan/src/physical_ops/fanout.rs index 5e1d315cb4..7f1bdae0a0 100644 --- a/src/daft-plan/src/physical_ops/fanout.rs +++ b/src/daft-plan/src/physical_ops/fanout.rs @@ -62,25 +62,35 @@ pub struct FanoutByRange { pub input: PhysicalPlanRef, pub num_partitions: usize, pub sort_by: Vec, + pub descending: Vec, } impl FanoutByRange { #[allow(dead_code)] - pub(crate) fn new(input: PhysicalPlanRef, num_partitions: usize, sort_by: Vec) -> Self { + pub(crate) fn new( + input: PhysicalPlanRef, + num_partitions: usize, + sort_by: Vec, + descending: Vec, + ) -> Self { Self { input, num_partitions, sort_by, + descending, } } pub fn multiline_display(&self) -> Vec { let mut res = vec![]; res.push(format!("FanoutByRange: {}", self.num_partitions)); - res.push(format!( - "Sort by = {}", - self.sort_by.iter().map(|e| e.to_string()).join(", ") - )); + let pairs = self + .sort_by + .iter() + .zip(self.descending.iter()) + .map(|(sb, d)| format!("({}, {})", sb, if *d { "descending" } else { "ascending" },)) + .join(", "); + res.push(format!("Sort by = {}", pairs,)); res } } diff --git 
a/src/daft-plan/src/physical_ops/project.rs b/src/daft-plan/src/physical_ops/project.rs index cfe6003b3f..5fba44d476 100644 --- a/src/daft-plan/src/physical_ops/project.rs +++ b/src/daft-plan/src/physical_ops/project.rs @@ -5,7 +5,10 @@ use daft_dsl::Expr; use indexmap::IndexMap; use itertools::Itertools; -use crate::{physical_plan::PhysicalPlanRef, PartitionScheme, PartitionSpec, ResourceRequest}; +use crate::{ + partitioning::PartitionSchemeConfig, physical_plan::PhysicalPlanRef, PartitionSpec, + ResourceRequest, +}; use serde::{Deserialize, Serialize}; #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] @@ -40,12 +43,12 @@ impl Project { // Given an input partition spec, and a new projection, // produce the new partition spec. - use crate::PartitionScheme::*; - match input_pspec.scheme { + use crate::partitioning::PartitionSchemeConfig::*; + match input_pspec.scheme_config { // If the scheme is vacuous, the result partiiton spec is the same. - Random | Unknown => input_pspec.clone(), + Random(_) | Unknown(_) => input_pspec.clone(), // Otherwise, need to reevaluate the partition scheme for each expression. - Range | Hash => { + Range(_) | Hash(_) => { // See what columns the projection directly translates into new columns. 
let mut old_colname_to_new_colname = IndexMap::new(); for expr in projection { @@ -69,16 +72,16 @@ impl Project { .collect::, _>>(); maybe_new_pspec.map_or_else( |()| { - PartitionSpec::new_internal( - PartitionScheme::Unknown, + PartitionSpec::new( + PartitionSchemeConfig::Unknown(Default::default()), input_pspec.num_partitions, None, ) .into() }, |new_pspec: Vec| { - PartitionSpec::new_internal( - input_pspec.scheme.clone(), + PartitionSpec::new( + input_pspec.scheme_config.clone(), input_pspec.num_partitions, Some(new_pspec), ) @@ -231,7 +234,9 @@ mod tests { use daft_dsl::{col, lit, Expr}; use rstest::rstest; - use crate::{planner::plan, test::dummy_scan_node, PartitionScheme, PartitionSpec}; + use crate::{ + partitioning::PartitionSchemeConfig, planner::plan, test::dummy_scan_node, PartitionSpec, + }; /// Test that projections preserving column inputs, even through aliasing, /// do not destroy the partition spec. @@ -251,15 +256,18 @@ mod tests { .repartition( Some(3), vec![Expr::Column("a".into()), Expr::Column("b".into())], - PartitionScheme::Hash, + PartitionSchemeConfig::Hash(Default::default()), )? .project(expressions, Default::default())? .build(); let physical_plan = plan(&logical_plan, cfg)?; - let expected_pspec = - PartitionSpec::new_internal(PartitionScheme::Hash, 3, Some(vec![col("aa"), col("b")])); + let expected_pspec = PartitionSpec::new( + PartitionSchemeConfig::Hash(Default::default()), + 3, + Some(vec![col("aa"), col("b")]), + ); assert_eq!( expected_pspec, @@ -281,6 +289,8 @@ mod tests { )] projection: Vec, ) -> DaftResult<()> { + use crate::partitioning::PartitionSchemeConfig; + let cfg = DaftExecutionConfig::default().into(); let logical_plan = dummy_scan_node(vec![ Field::new("a", DataType::Int64), @@ -290,14 +300,15 @@ mod tests { .repartition( Some(3), vec![Expr::Column("a".into()), Expr::Column("b".into())], - PartitionScheme::Hash, + PartitionSchemeConfig::Hash(Default::default()), )? .project(projection, Default::default())? 
.build(); let physical_plan = plan(&logical_plan, cfg)?; - let expected_pspec = PartitionSpec::new_internal(PartitionScheme::Unknown, 3, None); + let expected_pspec = + PartitionSpec::new(PartitionSchemeConfig::Unknown(Default::default()), 3, None); assert_eq!( expected_pspec, physical_plan.partition_spec().as_ref().clone() @@ -321,15 +332,18 @@ mod tests { .repartition( Some(3), vec![Expr::Column("a".into()), Expr::Column("b".into())], - PartitionScheme::Hash, + PartitionSchemeConfig::Hash(Default::default()), )? .project(expressions, Default::default())? .build(); let physical_plan = plan(&logical_plan, cfg)?; - let expected_pspec = - PartitionSpec::new_internal(PartitionScheme::Hash, 3, Some(vec![col("a"), col("b")])); + let expected_pspec = PartitionSpec::new( + PartitionSchemeConfig::Hash(Default::default()), + 3, + Some(vec![col("a"), col("b")]), + ); assert_eq!( expected_pspec, diff --git a/src/daft-plan/src/physical_plan.rs b/src/daft-plan/src/physical_plan.rs index fdad24498c..e306d1e593 100644 --- a/src/daft-plan/src/physical_plan.rs +++ b/src/daft-plan/src/physical_plan.rs @@ -1,6 +1,7 @@ #[cfg(feature = "python")] use { crate::{ + partitioning::PyPartitionSpec, sink_info::OutputFileInfo, source_info::{FileInfos, InMemoryInfo, LegacyExternalInfo}, }, @@ -26,7 +27,12 @@ use daft_core::impl_bincode_py_state_serialization; use serde::{Deserialize, Serialize}; use std::{cmp::max, sync::Arc}; -use crate::{display::TreeDisplay, physical_ops::*, PartitionScheme, PartitionSpec}; +use crate::{ + display::TreeDisplay, + partitioning::{PartitionSchemeConfig, RangeConfig}, + physical_ops::*, + PartitionSpec, +}; pub(crate) type PhysicalPlanRef = Arc; @@ -86,8 +92,13 @@ impl PhysicalPlan { input.partition_spec().clone() } - Self::Sort(Sort { input, sort_by, .. }) => PartitionSpec::new_internal( - PartitionScheme::Range, + Self::Sort(Sort { + input, + sort_by, + descending, + .. 
+ }) => PartitionSpec::new( + PartitionSchemeConfig::Range(RangeConfig::new_internal(descending.clone())), input.partition_spec().num_partitions, Some(sort_by.clone()), ) @@ -95,23 +106,31 @@ impl PhysicalPlan { Self::Split(Split { output_num_partitions, .. - }) => { - PartitionSpec::new_internal(PartitionScheme::Unknown, *output_num_partitions, None) - .into() - } - Self::Coalesce(Coalesce { num_to, .. }) => { - PartitionSpec::new_internal(PartitionScheme::Unknown, *num_to, None).into() - } + }) => PartitionSpec::new( + PartitionSchemeConfig::Unknown(Default::default()), + *output_num_partitions, + None, + ) + .into(), + Self::Coalesce(Coalesce { num_to, .. }) => PartitionSpec::new( + PartitionSchemeConfig::Unknown(Default::default()), + *num_to, + None, + ) + .into(), Self::Flatten(Flatten { input }) => input.partition_spec(), - Self::FanoutRandom(FanoutRandom { num_partitions, .. }) => { - PartitionSpec::new_internal(PartitionScheme::Random, *num_partitions, None).into() - } + Self::FanoutRandom(FanoutRandom { num_partitions, .. }) => PartitionSpec::new( + PartitionSchemeConfig::Random(Default::default()), + *num_partitions, + None, + ) + .into(), Self::FanoutByHash(FanoutByHash { num_partitions, partition_by, .. - }) => PartitionSpec::new_internal( - PartitionScheme::Hash, + }) => PartitionSpec::new( + PartitionSchemeConfig::Hash(Default::default()), *num_partitions, Some(partition_by.clone()), ) @@ -119,9 +138,10 @@ impl PhysicalPlan { Self::FanoutByRange(FanoutByRange { num_partitions, sort_by, + descending, .. 
- }) => PartitionSpec::new_internal( - PartitionScheme::Range, + }) => PartitionSpec::new( + PartitionSchemeConfig::Range(RangeConfig::new_internal(descending.clone())), *num_partitions, Some(sort_by.clone()), ) @@ -132,18 +152,19 @@ impl PhysicalPlan { if input_partition_spec.num_partitions == 1 { input_partition_spec.clone() } else if groupby.is_empty() { - PartitionSpec::new_internal(PartitionScheme::Unknown, 1, None).into() + PartitionSpec::new(PartitionSchemeConfig::Unknown(Default::default()), 1, None) + .into() } else { - PartitionSpec::new_internal( - PartitionScheme::Hash, + PartitionSpec::new( + PartitionSchemeConfig::Hash(Default::default()), input.partition_spec().num_partitions, Some(groupby.clone()), ) .into() } } - Self::Concat(Concat { input, other }) => PartitionSpec::new_internal( - PartitionScheme::Unknown, + Self::Concat(Concat { input, other }) => PartitionSpec::new( + PartitionSchemeConfig::Unknown(Default::default()), input.partition_spec().num_partitions + other.partition_spec().num_partitions, None, ) @@ -164,8 +185,8 @@ impl PhysicalPlan { // TODO(Clark): Consolidate this logic with the planner logic when we push the partition spec // to be an entirely planner-side concept. 1 => input_partition_spec, - num_partitions => PartitionSpec::new_internal( - PartitionScheme::Hash, + num_partitions => PartitionSpec::new( + PartitionSchemeConfig::Hash(Default::default()), num_partitions, Some(left_on.clone()), ) @@ -180,8 +201,11 @@ impl PhysicalPlan { right, left_on, .. - }) => PartitionSpec::new_internal( - PartitionScheme::Range, + }) => PartitionSpec::new( + // TODO(Clark): Propagate descending vec once our sort-merge join supports more than all-ascending order. + PartitionSchemeConfig::Range(RangeConfig::new_internal( + std::iter::repeat(false).take(left_on.len()).collect(), + )), max( left.partition_spec().num_partitions, right.partition_spec().num_partitions, @@ -328,7 +352,7 @@ impl PhysicalPlan { Self::Flatten(..) 
=> Self::Flatten(Flatten::new(input.clone())), Self::FanoutRandom(FanoutRandom { num_partitions, .. }) => Self::FanoutRandom(FanoutRandom::new(input.clone(), *num_partitions)), Self::FanoutByHash(FanoutByHash { num_partitions, partition_by, .. }) => Self::FanoutByHash(FanoutByHash::new(input.clone(), *num_partitions, partition_by.clone())), - Self::FanoutByRange(FanoutByRange { num_partitions, sort_by, .. }) => Self::FanoutByRange(FanoutByRange::new(input.clone(), *num_partitions, sort_by.clone())), + Self::FanoutByRange(FanoutByRange { num_partitions, sort_by, descending, .. }) => Self::FanoutByRange(FanoutByRange::new(input.clone(), *num_partitions, sort_by.clone(), descending.clone())), Self::ReduceMerge(..) => Self::ReduceMerge(ReduceMerge::new(input.clone())), Self::Aggregate(Aggregate { aggregations, groupby, ..}) => Self::Aggregate(Aggregate::new(input.clone(), aggregations.clone(), groupby.clone())), Self::TabularWriteParquet(TabularWriteParquet { schema, file_info, .. }) => Self::TabularWriteParquet(TabularWriteParquet::new(schema.clone(), file_info.clone(), input.clone())), @@ -459,10 +483,10 @@ pub struct PhysicalPlanScheduler { #[pymethods] impl PhysicalPlanScheduler { pub fn num_partitions(&self) -> PyResult { - self.plan.partition_spec().get_num_partitions() + Ok(self.plan.partition_spec().num_partitions as i64) } - pub fn partition_spec(&self) -> PyResult { - Ok(self.plan.partition_spec().as_ref().clone()) + pub fn partition_spec(&self) -> PyResult { + Ok(Arc::new(self.plan.partition_spec().as_ref().clone()).into()) } pub fn repr_ascii(&self, simple: bool) -> PyResult { diff --git a/src/daft-plan/src/planner.rs b/src/daft-plan/src/planner.rs index 5e55477ea8..e5d999b2a3 100644 --- a/src/daft-plan/src/planner.rs +++ b/src/daft-plan/src/planner.rs @@ -21,11 +21,12 @@ use crate::logical_ops::{ Sort as LogicalSort, Source, }; use crate::logical_plan::LogicalPlan; +use crate::partitioning::PartitionSchemeConfig; use crate::physical_plan::PhysicalPlan; use 
crate::sink_info::{OutputFileInfo, SinkInfo}; use crate::source_info::{ExternalInfo as ExternalSourceInfo, LegacyExternalInfo, SourceInfo}; +use crate::FileFormat; use crate::{physical_ops::*, JoinStrategy, PartitionSpec}; -use crate::{FileFormat, PartitionScheme}; #[cfg(feature = "python")] use crate::physical_ops::InMemoryScan; @@ -45,8 +46,8 @@ pub fn plan(logical_plan: &LogicalPlan, cfg: Arc) -> DaftRe .. }, )) => { - let partition_spec = Arc::new(PartitionSpec::new_internal( - PartitionScheme::Unknown, + let partition_spec = Arc::new(PartitionSpec::new( + PartitionSchemeConfig::Unknown(Default::default()), file_infos.len(), None, )); @@ -100,8 +101,8 @@ pub fn plan(logical_plan: &LogicalPlan, cfg: Arc) -> DaftRe ); let scan_tasks = scan_tasks.collect::>>()?; if scan_tasks.is_empty() { - let partition_spec = Arc::new(PartitionSpec::new_internal( - PartitionScheme::Unknown, + let partition_spec = Arc::new(PartitionSpec::new( + PartitionSchemeConfig::Unknown(Default::default()), 1, None, )); @@ -111,8 +112,8 @@ pub fn plan(logical_plan: &LogicalPlan, cfg: Arc) -> DaftRe partition_spec, ))) } else { - let partition_spec = Arc::new(PartitionSpec::new_internal( - PartitionScheme::Unknown, + let partition_spec = Arc::new(PartitionSpec::new( + PartitionSchemeConfig::Unknown(Default::default()), scan_tasks.len(), None, )); @@ -128,8 +129,12 @@ pub fn plan(logical_plan: &LogicalPlan, cfg: Arc) -> DaftRe let scan = PhysicalPlan::InMemoryScan(InMemoryScan::new( mem_info.source_schema.clone(), mem_info.clone(), - PartitionSpec::new(PartitionScheme::Unknown, mem_info.num_partitions, None) - .into(), + PartitionSpec::new( + PartitionSchemeConfig::Unknown(Default::default()), + mem_info.num_partitions, + None, + ) + .into(), )); Ok(scan) } @@ -197,66 +202,68 @@ pub fn plan(logical_plan: &LogicalPlan, cfg: Arc) -> DaftRe input, num_partitions, partition_by, - scheme, + scheme_config, }) => { // Below partition-dropping optimization assumes we are NOT repartitioning using a 
range partitioning scheme. // A range repartitioning of an existing range-partitioned DataFrame is only redundant if the partition boundaries // are consistent, which is only the case if boundary sampling is deterministic within a query. - assert!(!matches!(scheme, PartitionScheme::Range)); + assert!(!matches!(scheme_config, PartitionSchemeConfig::Range(_))); let input_physical = plan(input, cfg)?; let input_partition_spec = input_physical.partition_spec(); let input_num_partitions = input_partition_spec.num_partitions; let num_partitions = num_partitions.unwrap_or(input_num_partitions); // Partition spec after repartitioning. - let repartitioned_partition_spec = PartitionSpec::new_internal( - scheme.clone(), + let repartitioned_partition_spec = PartitionSpec::new( + scheme_config.clone(), num_partitions, Some(partition_by.clone()), ); // Drop the repartition if the output of the repartition would yield the same partitioning as the input. if (input_num_partitions == 1 && num_partitions == 1) // Simple split/coalesce repartition to the same # of partitions is a no-op, no matter the upstream partitioning scheme. - || (num_partitions == input_num_partitions && matches!(scheme, PartitionScheme::Unknown)) + || (num_partitions == input_num_partitions && matches!(scheme_config, PartitionSchemeConfig::Unknown(_))) // Repartitioning to the same partition spec as the input is always a no-op. || (&repartitioned_partition_spec == input_partition_spec.as_ref()) { return Ok(input_physical); } let input_physical = Arc::new(input_physical); - let repartitioned_plan = match scheme { - PartitionScheme::Unknown => match num_partitions.cmp(&input_num_partitions) { - Ordering::Greater => { - // Split input partitions into num_partitions. - let split_op = PhysicalPlan::Split(Split::new( - input_physical, - input_num_partitions, - num_partitions, - )); - PhysicalPlan::Flatten(Flatten::new(split_op.into())) - } - Ordering::Less => { - // Coalesce input partitions into num_partitions. 
- PhysicalPlan::Coalesce(Coalesce::new( - input_physical, - input_num_partitions, - num_partitions, - )) - } - Ordering::Equal => { - // # of output partitions == # of input partitions; this should have already short-circuited with - // a repartition drop above. - unreachable!("Simple repartitioning with same # of output partitions as the input; this should have been dropped.") + let repartitioned_plan = match scheme_config { + PartitionSchemeConfig::Unknown(_) => { + match num_partitions.cmp(&input_num_partitions) { + Ordering::Greater => { + // Split input partitions into num_partitions. + let split_op = PhysicalPlan::Split(Split::new( + input_physical, + input_num_partitions, + num_partitions, + )); + PhysicalPlan::Flatten(Flatten::new(split_op.into())) + } + Ordering::Less => { + // Coalesce input partitions into num_partitions. + PhysicalPlan::Coalesce(Coalesce::new( + input_physical, + input_num_partitions, + num_partitions, + )) + } + Ordering::Equal => { + // # of output partitions == # of input partitions; this should have already short-circuited with + // a repartition drop above. 
+ unreachable!("Simple repartitioning with same # of output partitions as the input; this should have been dropped.") + } } - }, - PartitionScheme::Random => { + } + PartitionSchemeConfig::Random(_) => { let split_op = PhysicalPlan::FanoutRandom(FanoutRandom::new( input_physical, num_partitions, )); PhysicalPlan::ReduceMerge(ReduceMerge::new(split_op.into())) } - PartitionScheme::Hash => { + PartitionSchemeConfig::Hash(_) => { let split_op = PhysicalPlan::FanoutByHash(FanoutByHash::new( input_physical, num_partitions, @@ -264,7 +271,9 @@ pub fn plan(logical_plan: &LogicalPlan, cfg: Arc) -> DaftRe )); PhysicalPlan::ReduceMerge(ReduceMerge::new(split_op.into())) } - PartitionScheme::Range => unreachable!("Repartitioning by range is not supported"), + PartitionSchemeConfig::Range(_) => { + unreachable!("Repartitioning by range is not supported") + } }; Ok(repartitioned_plan) } @@ -551,13 +560,13 @@ pub fn plan(logical_plan: &LogicalPlan, cfg: Arc) -> DaftRe let left_pspec = left_physical.partition_spec(); let right_pspec = right_physical.partition_spec(); let num_partitions = max(left_pspec.num_partitions, right_pspec.num_partitions); - let new_left_hash_pspec = Arc::new(PartitionSpec::new_internal( - PartitionScheme::Hash, + let new_left_hash_pspec = Arc::new(PartitionSpec::new( + PartitionSchemeConfig::Hash(Default::default()), num_partitions, Some(left_on.clone()), )); - let new_right_hash_pspec = Arc::new(PartitionSpec::new_internal( - PartitionScheme::Hash, + let new_right_hash_pspec = Arc::new(PartitionSpec::new( + PartitionSchemeConfig::Hash(Default::default()), num_partitions, Some(right_on.clone()), )); @@ -567,26 +576,38 @@ pub fn plan(logical_plan: &LogicalPlan, cfg: Arc) -> DaftRe // Left-side of join is considered to be sort-partitioned on the join key if it is sort-partitioned on a // sequence of expressions that has the join key as a prefix. 
- let is_left_sort_partitioned = matches!(left_pspec.scheme, PartitionScheme::Range) - && left_pspec - .by - .as_ref() - .map(|e| { - e.len() >= left_on.len() - && e.iter().zip(left_on.iter()).all(|(e1, e2)| e1 == e2) - }) - .unwrap_or(false); + let is_left_sort_partitioned = + if let PartitionSchemeConfig::Range(range_config) = &left_pspec.scheme_config { + left_pspec + .by + .as_ref() + .map(|e| { + e.len() >= left_on.len() + && e.iter().zip(left_on.iter()).all(|(e1, e2)| e1 == e2) + }) + .unwrap_or(false) + // TODO(Clark): Add support for descending sort orders. + && range_config.descending.iter().all(|v| !*v) + } else { + false + }; // Right-side of join is considered to be sort-partitioned on the join key if it is sort-partitioned on a // sequence of expressions that has the join key as a prefix. - let is_right_sort_partitioned = matches!(right_pspec.scheme, PartitionScheme::Range) - && right_pspec - .by - .as_ref() - .map(|e| { - e.len() >= right_on.len() - && e.iter().zip(right_on.iter()).all(|(e1, e2)| e1 == e2) - }) - .unwrap_or(false); + let is_right_sort_partitioned = + if let PartitionSchemeConfig::Range(range_config) = &right_pspec.scheme_config { + right_pspec + .by + .as_ref() + .map(|e| { + e.len() >= right_on.len() + && e.iter().zip(right_on.iter()).all(|(e1, e2)| e1 == e2) + }) + .unwrap_or(false) + // TODO(Clark): Add support for descending sort orders. + && range_config.descending.iter().all(|v| !*v) + } else { + false + }; // For broadcast joins, ensure that the left side of the join is the smaller side. let (smaller_size_bytes, left_is_larger) = match ( @@ -775,7 +796,7 @@ mod tests { use crate::physical_plan::PhysicalPlan; use crate::planner::plan; - use crate::{test::dummy_scan_node, PartitionScheme}; + use crate::test::dummy_scan_node; /// Tests that planner drops a simple Repartition (e.g. df.into_partitions()) the child already has the desired number of partitions. 
/// @@ -788,7 +809,7 @@ mod tests { Field::new("a", DataType::Int64), Field::new("b", DataType::Utf8), ]) - .repartition(Some(10), vec![], PartitionScheme::Unknown)? + .repartition(Some(10), vec![], Default::default())? .filter(col("a").lt(&lit(2)))?; assert_eq!( plan(builder.build().as_ref(), cfg.clone())? @@ -797,7 +818,7 @@ mod tests { 10 ); let logical_plan = builder - .repartition(Some(10), vec![], PartitionScheme::Unknown)? + .repartition(Some(10), vec![], Default::default())? .build(); let physical_plan = plan(logical_plan.as_ref(), cfg.clone())?; // Check that the last repartition was dropped (the last op should be the filter). @@ -823,7 +844,7 @@ mod tests { 1 ); let logical_plan = builder - .repartition(Some(1), vec![col("a")], PartitionScheme::Hash)? + .repartition(Some(1), vec![col("a")], Default::default())? .build(); let physical_plan = plan(logical_plan.as_ref(), cfg.clone())?; assert_matches!(physical_plan, PhysicalPlan::TabularScanJson(_)); @@ -840,9 +861,9 @@ mod tests { Field::new("a", DataType::Int64), Field::new("b", DataType::Utf8), ]) - .repartition(Some(10), vec![col("a")], PartitionScheme::Hash)? + .repartition(Some(10), vec![col("a")], Default::default())? .filter(col("a").lt(&lit(2)))? - .repartition(Some(10), vec![col("a")], PartitionScheme::Hash)? + .repartition(Some(10), vec![col("a")], Default::default())? .build(); let physical_plan = plan(logical_plan.as_ref(), cfg)?; // Check that the last repartition was dropped (the last op should be the filter). @@ -860,12 +881,12 @@ mod tests { Field::new("a", DataType::Int64), Field::new("b", DataType::Int64), ]) - .repartition(Some(10), vec![col("a")], PartitionScheme::Hash)? + .repartition(Some(10), vec![col("a")], Default::default())? .aggregate( vec![Expr::Agg(AggExpr::Sum(col("a").into()))], vec![col("b")], )? - .repartition(Some(10), vec![col("b")], PartitionScheme::Hash)? + .repartition(Some(10), vec![col("b")], Default::default())? 
.build(); let physical_plan = plan(logical_plan.as_ref(), cfg)?; // Check that the last repartition was dropped (the last op should be a projection for a multi-partition aggregation).