From f3836a53122e86f2b73c25557deaa5b800e488e9 Mon Sep 17 00:00:00 2001 From: comphead Date: Wed, 6 Mar 2024 08:25:31 -0800 Subject: [PATCH] Reduce casts for LEAD/LAG (#9468) * Reduce casts for LEAD/LAG * uncomment test * fix test * test --- .../physical-expr/src/window/lead_lag.rs | 29 ++++++++++--------- 1 file changed, 16 insertions(+), 13 deletions(-) diff --git a/datafusion/physical-expr/src/window/lead_lag.rs b/datafusion/physical-expr/src/window/lead_lag.rs index 1d6cfc6b0418..e496c7343f7c 100644 --- a/datafusion/physical-expr/src/window/lead_lag.rs +++ b/datafusion/physical-expr/src/window/lead_lag.rs @@ -236,20 +236,21 @@ impl PartitionEvaluator for WindowShiftEvaluator { values: &[ArrayRef], range: &Range<usize>, ) -> Result<ScalarValue> { - // TODO: try to get rid of i64 usize conversion // TODO: do not recalculate default value every call - // TODO: support LEAD mode for IGNORE NULLS let array = &values[0]; let dtype = array.data_type(); - let len = array.len() as i64; + let len = array.len(); + // LAG mode - let mut idx = if self.is_lag() { - range.end as i64 - self.shift_offset - 1 + let i = if self.is_lag() { + (range.end as i64 - self.shift_offset - 1) as usize } else { // LEAD mode - range.start as i64 - self.shift_offset + (range.start as i64 - self.shift_offset) as usize }; + let mut idx: Option<usize> = if i < len { Some(i) } else { None }; + // LAG with IGNORE NULLS calculated as the current row index - offset, but only for non-NULL rows // If current row index points to NULL value the row is NOT counted if self.ignore_nulls && self.is_lag() { @@ -257,9 +258,9 @@ impl PartitionEvaluator for WindowShiftEvaluator { // Find the nonNULL row index that shifted by offset comparing to current row index idx = if self.non_null_offsets.len() == self.shift_offset as usize { let total_offset: usize = self.non_null_offsets.iter().sum(); - (range.end - 1 - total_offset) as i64 + Some(range.end - 1 - total_offset) } else { - -1 + None }; // Keep track of offset values between non-null 
entries @@ -296,7 +297,7 @@ impl PartitionEvaluator for WindowShiftEvaluator { break; } } - } else if range.end < len as usize && array.is_valid(range.end) { + } else if range.end < len && array.is_valid(range.end) { // Update `non_null_offsets` with the new end data. if array.is_valid(range.end) { // When non-null, append a new offset. @@ -312,9 +313,9 @@ impl PartitionEvaluator for WindowShiftEvaluator { idx = if self.non_null_offsets.len() >= non_null_row_count { let total_offset: usize = self.non_null_offsets.iter().take(non_null_row_count).sum(); - (range.start + total_offset) as i64 + Some(range.start + total_offset) } else { - -1 + None }; // Prune `self.non_null_offsets` from the start. so that at next iteration // start of the `self.non_null_offsets` matches with current row. @@ -331,10 +332,12 @@ impl PartitionEvaluator for WindowShiftEvaluator { // - index is out of window bounds // OR // - ignore nulls mode and current value is null and is within window bounds - if idx < 0 || idx >= len || (self.ignore_nulls && array.is_null(idx as usize)) { + // .unwrap() is safe here as there is a none check in front + #[allow(clippy::unnecessary_unwrap)] + if idx.is_none() || (self.ignore_nulls && array.is_null(idx.unwrap())) { get_default_value(self.default_value.as_ref(), dtype) } else { - ScalarValue::try_from_array(array, idx as usize) + ScalarValue::try_from_array(array, idx.unwrap()) } }