Skip to content

Commit

Permalink
Minor changes (#9674)
Browse files Browse the repository at this point in the history
  • Loading branch information
mustafasrepo authored Mar 18, 2024
1 parent c0a21b2 commit 35ff7a6
Showing 1 changed file with 22 additions and 20 deletions.
42 changes: 22 additions & 20 deletions datafusion/physical-expr/src/window/nth_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,31 +225,38 @@ impl PartitionEvaluator for NthValueEvaluator {
}

// Extract valid indices if ignoring nulls.
let (slice, valid_indices) = if self.ignore_nulls {
let valid_indices = if self.ignore_nulls {
// Calculate valid indices, inside the window frame boundaries
let slice = arr.slice(range.start, n_range);
let valid_indices =
slice.nulls().unwrap().valid_indices().collect::<Vec<_>>();
let valid_indices = slice
.nulls()
.map(|nulls| {
nulls
.valid_indices()
// Add offset `range.start` to valid indices, to point correct index in the original arr.
.map(|idx| idx + range.start)
.collect::<Vec<_>>()
})
.unwrap_or_default();
if valid_indices.is_empty() {
return ScalarValue::try_from(arr.data_type());
}
(Some(slice), Some(valid_indices))
Some(valid_indices)
} else {
(None, None)
None
};
match self.state.kind {
NthValueKind::First => {
if let Some(slice) = &slice {
let valid_indices = valid_indices.unwrap();
ScalarValue::try_from_array(slice, valid_indices[0])
if let Some(valid_indices) = &valid_indices {
ScalarValue::try_from_array(arr, valid_indices[0])
} else {
ScalarValue::try_from_array(arr, range.start)
}
}
NthValueKind::Last => {
if let Some(slice) = &slice {
let valid_indices = valid_indices.unwrap();
if let Some(valid_indices) = &valid_indices {
ScalarValue::try_from_array(
slice,
arr,
valid_indices[valid_indices.len() - 1],
)
} else {
Expand All @@ -264,15 +271,11 @@ impl PartitionEvaluator for NthValueEvaluator {
if index >= n_range {
// Outside the range, return NULL:
ScalarValue::try_from(arr.data_type())
} else if self.ignore_nulls {
let valid_indices = valid_indices.unwrap();
} else if let Some(valid_indices) = valid_indices {
if index >= valid_indices.len() {
return ScalarValue::try_from(arr.data_type());
}
ScalarValue::try_from_array(
&slice.unwrap(),
valid_indices[index],
)
ScalarValue::try_from_array(&arr, valid_indices[index])
} else {
ScalarValue::try_from_array(arr, range.start + index)
}
Expand All @@ -282,14 +285,13 @@ impl PartitionEvaluator for NthValueEvaluator {
if n_range < reverse_index {
// Outside the range, return NULL:
ScalarValue::try_from(arr.data_type())
} else if self.ignore_nulls {
let valid_indices = valid_indices.unwrap();
} else if let Some(valid_indices) = valid_indices {
if reverse_index > valid_indices.len() {
return ScalarValue::try_from(arr.data_type());
}
let new_index =
valid_indices[valid_indices.len() - reverse_index];
ScalarValue::try_from_array(&slice.unwrap(), new_index)
ScalarValue::try_from_array(&arr, new_index)
} else {
ScalarValue::try_from_array(
arr,
Expand Down

0 comments on commit 35ff7a6

Please sign in to comment.