diff --git a/datafusion/core/src/physical_plan/aggregates/row_hash.rs b/datafusion/core/src/physical_plan/aggregates/row_hash.rs
index 03c75e75b8e5..3cc24425435b 100644
--- a/datafusion/core/src/physical_plan/aggregates/row_hash.rs
+++ b/datafusion/core/src/physical_plan/aggregates/row_hash.rs
@@ -335,24 +335,22 @@ impl GroupedHashAggregateStream {
         create_hashes(group_values, &self.random_state, &mut batch_hashes)?;

         let AggregationState {
-            map: row_map,
-            group_states: row_group_states,
-            ..
+            map, group_states, ..
         } = &mut self.aggr_state;

         for (row, hash) in batch_hashes.into_iter().enumerate() {
-            let entry = row_map.get_mut(hash, |(_hash, group_idx)| {
+            let entry = map.get_mut(hash, |(_hash, group_idx)| {
                 // verify that a group that we are inserting with hash is
                 // actually the same key value as the group in
                 // existing_idx (aka group_values @ row)
-                let group_state = &row_group_states[*group_idx];
+                let group_state = &group_states[*group_idx];
                 group_rows.row(row) == group_state.group_by_values.row()
             });

             match entry {
                 // Existing entry for this group value
                 Some((_hash, group_idx)) => {
-                    let group_state = &mut row_group_states[*group_idx];
+                    let group_state = &mut group_states[*group_idx];

                     // 1.3
                     if group_state.indices.is_empty() {
@@ -375,7 +373,7 @@ impl GroupedHashAggregateStream {
                         accumulator_set,
                         indices: vec![row as u32], // 1.3
                     };
-                    let group_idx = row_group_states.len();
+                    let group_idx = group_states.len();

                     // NOTE: do NOT include the `GroupState` struct size in here because this is captured by
                     // `group_states` (see allocation down below)
@@ -395,13 +393,13 @@ impl GroupedHashAggregateStream {
                         .sum::<usize>();

                     // for hasher function, use precomputed hash value
-                    row_map.insert_accounted(
+                    map.insert_accounted(
                         (hash, group_idx),
                         |(hash, _group_index)| *hash,
                         allocated,
                     );

-                    row_group_states.push_accounted(group_state, allocated);
+                    group_states.push_accounted(group_state, allocated);

                     groups_with_rows.push(group_idx);
                 }
@@ -411,6 +409,7 @@ impl GroupedHashAggregateStream {
     }

     // Update the accumulator results, according to row_aggr_state.
+    #[allow(clippy::too_many_arguments)]
     fn update_accumulators(
         &mut self,
         groups_with_rows: &[usize],
@@ -668,8 +667,8 @@ impl GroupedHashAggregateStream {
         for (field_idx, field) in output_fields[start..end].iter().enumerate() {
             let current = match self.mode {
                 AggregateMode::Partial => ScalarValue::iter_to_array(
-                    group_state_chunk.iter().map(|row_group_state| {
-                        row_group_state.accumulator_set[idx]
+                    group_state_chunk.iter().map(|group_state| {
+                        group_state.accumulator_set[idx]
                             .state()
                             .map(|v| v[field_idx].clone())
                             .expect("Unexpected accumulator state in hash aggregate")
@@ -676,9 +675,9 @@ impl GroupedHashAggregateStream {
                     }),
                 ),
                 AggregateMode::Final | AggregateMode::FinalPartitioned => {
                     ScalarValue::iter_to_array(group_state_chunk.iter().map(
-                        |row_group_state| {
-                            row_group_state.accumulator_set[idx].evaluate().expect(
+                        |group_state| {
+                            group_state.accumulator_set[idx].evaluate().expect(
                                 "Unexpected accumulator state in hash aggregate",
                             )
                         },
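Aside on the pattern this diff touches: the renamed `map` is a hashbrown `RawTable<(u64, usize)>` probed with hashes precomputed by `create_hashes`, and `group_states` is a parallel `Vec` holding each group's key and accumulators; the table's equality closure chases `group_idx` back into that vector to confirm a candidate entry really is the same group key. Below is a minimal sketch of that lookup-or-insert shape, not DataFusion's actual code: it assumes hashbrown with the `raw` feature (as pre-0.15 versions expose `RawTable`), uses a plain `String` key where DataFusion stores `group_by_values`, a fixed hash where `create_hashes` would run, and plain `RawTable::insert` in place of DataFusion's memory-accounted `insert_accounted`. `GroupState` and `lookup_or_create` here are illustrative names, not items from the codebase.

use hashbrown::raw::RawTable;

// Simplified stand-in for DataFusion's per-group state; the real
// `GroupState` also holds `group_by_values`, the accumulator set, and
// the row indices for the current batch.
struct GroupState {
    key: String,
}

// Look up the group for (hash, key), creating it on a miss, and return
// its index into `group_states`.
fn lookup_or_create(
    map: &mut RawTable<(u64, usize)>,
    group_states: &mut Vec<GroupState>,
    hash: u64,
    key: &str,
) -> usize {
    // Probe with the precomputed hash; the closure resolves collisions
    // by comparing the candidate entry's key with the probe key.
    let entry = map.get_mut(hash, |(_hash, group_idx)| {
        group_states[*group_idx].key == key
    });

    match entry {
        // Existing entry for this group value.
        Some((_hash, group_idx)) => *group_idx,
        // New group: append its state and index it under the hash.
        None => {
            let group_idx = group_states.len();
            // For the hasher function, reuse the hash value stored in
            // each entry rather than rehashing the key on growth.
            map.insert(hash, (hash, group_idx), |(hash, _group_idx)| *hash);
            group_states.push(GroupState { key: key.to_owned() });
            group_idx
        }
    }
}

fn main() {
    let mut map = RawTable::new();
    let mut group_states = Vec::new();

    // A fixed hash keeps the sketch self-contained; DataFusion derives
    // these from `create_hashes` over the group-by columns.
    let a = lookup_or_create(&mut map, &mut group_states, 42, "x");
    let b = lookup_or_create(&mut map, &mut group_states, 42, "x");
    assert_eq!(a, b); // the same key resolves to the same group index
}

Storing the hash alongside the group index is what lets the hasher closure simply return the cached value, so a table resize never has to re-hash the group keys themselves.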