Skip to content

Commit

Permalink
refactor: Use Column for the {try,}_apply_columns{_par,} function…
Browse files Browse the repository at this point in the history
…s on `DataFrame` (#19683)
  • Loading branch information
coastalwhite authored Nov 7, 2024
1 parent 8335f75 commit 3cdb7c2
Show file tree
Hide file tree
Showing 7 changed files with 93 additions and 71 deletions.
2 changes: 1 addition & 1 deletion crates/polars-core/src/chunked_array/ops/fill_null.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ fn fill_with_gather<F: Fn(&Bitmap) -> Vec<IdxSize>>(

let idx = bits_to_idx(validity);

Ok(unsafe { s.take_unchecked_from_slice(&idx) })
Ok(unsafe { s.take_slice_unchecked(&idx) })
}

fn fill_forward_gather(s: &Series) -> PolarsResult<Series> {
Expand Down
49 changes: 41 additions & 8 deletions crates/polars-core/src/frame/column/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -531,9 +531,38 @@ impl Column {
match self {
Self::Series(s) => unsafe { s.take_unchecked(indices) }.into(),
Self::Partitioned(s) => {
unsafe { s.as_materialized_series().take_unchecked(indices) }.into()
let s = s.as_materialized_series();
unsafe { s.take_unchecked(indices) }.into()
},
Self::Scalar(s) => {
let idxs_length = indices.len();
let idxs_null_count = indices.null_count();

let scalar = ScalarColumn::from_single_value_series(
s.as_single_value_series().take_unchecked(&IdxCa::new(
indices.name().clone(),
&[0][..s.len().min(1)],
)),
idxs_length,
);

// We need to make sure that null values in `idx` become null values in the result
if idxs_null_count == 0 {
scalar.into_column()
} else if idxs_null_count == idxs_length {
scalar.into_nulls().into_column()
} else {
let validity = indices.rechunk_validity();
let series = scalar.take_materialized_series();
let name = series.name().clone();
let dtype = series.dtype().clone();
let mut chunks = series.into_chunks();
assert_eq!(chunks.len(), 1);
chunks[0] = chunks[0].with_validity(validity);
unsafe { Series::from_chunks_and_dtype_unchecked(name, chunks, &dtype) }
.into_column()
}
},
Self::Scalar(s) => s.resize(indices.len()).into(),
}
}
/// # Safety
Expand All @@ -543,13 +572,17 @@ impl Column {
debug_assert!(check_bounds(indices, self.len() as IdxSize).is_ok());

match self {
Self::Series(s) => unsafe { s.take_unchecked_from_slice(indices) }.into(),
Self::Partitioned(s) => unsafe {
s.as_materialized_series()
.take_unchecked_from_slice(indices)
}
Self::Series(s) => unsafe { s.take_slice_unchecked(indices) }.into(),
Self::Partitioned(s) => {
let s = s.as_materialized_series();
unsafe { s.take_slice_unchecked(indices) }.into()
},
Self::Scalar(s) => ScalarColumn::from_single_value_series(
s.as_single_value_series()
.take_slice_unchecked(&[0][..s.len().min(1)]),
indices.len(),
)
.into(),
Self::Scalar(s) => s.resize(indices.len()).into(),
}
}

Expand Down
7 changes: 6 additions & 1 deletion crates/polars-core/src/frame/column/scalar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ impl ScalarColumn {
/// This will panic if the value cannot be made static or if the series has length `0`.
pub fn from_single_value_series(series: Series, length: usize) -> Self {
debug_assert!(series.len() <= 1);
debug_assert!(length > 0 || series.is_empty());
debug_assert!(!series.is_empty() || length == 0);

let value = series.get(0).map_or(AnyValue::Null, |av| av.into_static());
let value = Scalar::new(series.dtype().clone(), value);
Expand Down Expand Up @@ -279,6 +279,11 @@ impl ScalarColumn {
self.clone()
}
}

/// Convert this `ScalarColumn` into one whose scalar value is null.
///
/// Only the stored scalar is replaced with `AnyValue::Null`; the column's
/// length is untouched (and presumably its name/dtype — confirm `Scalar::update`
/// semantics).
pub fn into_nulls(mut self) -> Self {
self.scalar.update(AnyValue::Null);
self
}
}

impl IntoColumn for ScalarColumn {
Expand Down
66 changes: 17 additions & 49 deletions crates/polars-core/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,48 +206,29 @@ impl DataFrame {
}

// Reduce monomorphization.
pub fn _apply_columns(&self, func: &(dyn Fn(&Series) -> Series)) -> Vec<Column> {
self.materialized_column_iter()
.map(func)
.map(Column::from)
.collect()
fn try_apply_columns(
&self,
func: &(dyn Fn(&Column) -> PolarsResult<Column> + Send + Sync),
) -> PolarsResult<Vec<Column>> {
self.columns.iter().map(func).collect()
}

// Reduce monomorphization.
pub fn _apply_columns_par(
&self,
func: &(dyn Fn(&Series) -> Series + Send + Sync),
) -> Vec<Column> {
POOL.install(|| {
self.par_materialized_column_iter()
.map(func)
.map(Column::from)
.collect()
})
/// Apply `func` to every column sequentially and collect the results.
///
/// Takes `func` as a `&dyn Fn` (rather than a generic parameter) to reduce
/// monomorphization, matching the sibling `_apply_columns_par` /
/// `try_apply_columns` helpers.
pub fn _apply_columns(&self, func: &(dyn Fn(&Column) -> Column)) -> Vec<Column> {
self.columns.iter().map(func).collect()
}

// Reduce monomorphization.
fn try_apply_columns_par(
&self,
func: &(dyn Fn(&Series) -> PolarsResult<Series> + Send + Sync),
func: &(dyn Fn(&Column) -> PolarsResult<Column> + Send + Sync),
) -> PolarsResult<Vec<Column>> {
POOL.install(|| {
self.par_materialized_column_iter()
.map(func)
.map(|s| s.map(Column::from))
.collect()
})
POOL.install(|| self.columns.par_iter().map(func).collect())
}

// Reduce monomorphization.
fn try_apply_columns(
pub fn _apply_columns_par(
&self,
func: &(dyn Fn(&Series) -> PolarsResult<Series> + Send + Sync),
) -> PolarsResult<Vec<Column>> {
self.materialized_column_iter()
.map(func)
.map(|s| s.map(Column::from))
.collect()
func: &(dyn Fn(&Column) -> Column + Send + Sync),
) -> Vec<Column> {
POOL.install(|| self.columns.par_iter().map(func).collect())
}

/// Get the index of the column.
Expand Down Expand Up @@ -565,13 +546,7 @@ impl DataFrame {
/// Aggregate all the chunks in the DataFrame to a single chunk in parallel.
/// This may lead to more peak memory consumption.
pub fn as_single_chunk_par(&mut self) -> &mut Self {
if self.columns.iter().any(|c| {
if let Column::Series(s) = c {
s.n_chunks() > 1
} else {
false
}
}) {
if self.columns.iter().any(|c| c.n_chunks() > 1) {
self.columns = self._apply_columns_par(&|s| s.rechunk());
}
self
Expand Down Expand Up @@ -1896,12 +1871,9 @@ impl DataFrame {
/// The indices must be in-bounds.
pub unsafe fn take_unchecked_impl(&self, idx: &IdxCa, allow_threads: bool) -> Self {
let cols = if allow_threads {
POOL.install(|| self._apply_columns_par(&|s| s.take_unchecked(idx)))
POOL.install(|| self._apply_columns_par(&|c| c.take_unchecked(idx)))
} else {
self.materialized_column_iter()
.map(|s| s.take_unchecked(idx))
.map(Column::from)
.collect()
self._apply_columns(&|s| s.take_unchecked(idx))
};
unsafe { DataFrame::new_no_checks(idx.len(), cols) }
}
Expand All @@ -1914,10 +1886,7 @@ impl DataFrame {
let cols = if allow_threads {
POOL.install(|| self._apply_columns_par(&|s| s.take_slice_unchecked(idx)))
} else {
self.materialized_column_iter()
.map(|s| s.take_slice_unchecked(idx))
.map(Column::from)
.collect()
self._apply_columns(&|s| s.take_slice_unchecked(idx))
};
unsafe { DataFrame::new_no_checks(idx.len(), cols) }
}
Expand Down Expand Up @@ -2567,7 +2536,6 @@ impl DataFrame {
if offset == 0 && length == self.height() {
return self.clone();
}
// @scalar-opt
let columns = self._apply_columns_par(&|s| s.slice(offset, length));
unsafe { DataFrame::new_no_checks(length, columns) }
}
Expand Down
8 changes: 0 additions & 8 deletions crates/polars-core/src/series/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -723,14 +723,6 @@ impl Series {
}
}

/// Take by index if ChunkedArray contains a single chunk.
///
/// # Safety
/// This doesn't check any bounds. Null validity is checked.
pub unsafe fn take_unchecked_from_slice(&self, idx: &[IdxSize]) -> Series {
self.take_slice_unchecked(idx)
}

/// Traverse and collect every nth element in a new array.
pub fn gather_every(&self, n: usize, offset: usize) -> Series {
let idx = ((offset as IdxSize)..self.len() as IdxSize)
Expand Down
16 changes: 12 additions & 4 deletions crates/polars-core/src/series/series_trait.rs
Original file line number Diff line number Diff line change
Expand Up @@ -287,19 +287,27 @@ pub trait SeriesTrait:
/// Filter by boolean mask. This operation clones data.
fn filter(&self, _filter: &BooleanChunked) -> PolarsResult<Series>;

/// Take by index. This operation is clone.
/// Take from `self` at the indexes given by `idx`.
///
/// Null values in `idx` become null values in the output array.
///
/// This operation is clone.
fn take(&self, _indices: &IdxCa) -> PolarsResult<Series>;

/// Take by index.
/// Take from `self` at the indexes given by `idx`.
///
/// Null values in `idx` become null values in the output array.
///
/// # Safety
/// This doesn't check any bounds.
unsafe fn take_unchecked(&self, _idx: &IdxCa) -> Series;

/// Take by index. This operation is clone.
/// Take from `self` at the indexes given by `idx`.
///
/// This operation is clone.
fn take_slice(&self, _indices: &[IdxSize]) -> PolarsResult<Series>;

/// Take by index.
/// Take from `self` at the indexes given by `idx`.
///
/// # Safety
/// This doesn't check any bounds.
Expand Down
16 changes: 16 additions & 0 deletions crates/polars-ops/src/chunked_array/gather/chunked.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,22 @@ fn prepare_series(s: &Series) -> Cow<Series> {
phys
}

/// `TakeChunked` for `Column` delegates to the materialized `Series`.
///
/// The `@scalar-opt` markers note that scalar/partitioned columns are first
/// materialized into a full `Series` here, so a cheaper scalar-aware gather is
/// a possible future optimization.
impl TakeChunked for Column {
unsafe fn take_chunked_unchecked(&self, by: &[ChunkId], sorted: IsSorted) -> Self {
// @scalar-opt
// Materialize, gather on the Series, then rewrap as a Column.
let s = self.as_materialized_series();
let s = unsafe { s.take_chunked_unchecked(by, sorted) };
s.into_column()
}

unsafe fn take_opt_chunked_unchecked(&self, by: &[ChunkId]) -> Self {
// @scalar-opt
// Same delegation as above, for the nullable-index variant.
let s = self.as_materialized_series();
let s = unsafe { s.take_opt_chunked_unchecked(by) };
s.into_column()
}
}

impl TakeChunked for Series {
unsafe fn take_chunked_unchecked(&self, by: &[ChunkId], sorted: IsSorted) -> Self {
let phys = prepare_series(self);
Expand Down

0 comments on commit 3cdb7c2

Please sign in to comment.