Skip to content

Commit 2b07114

Browse files
committed
Materialize dictionaries in group keys (#7647)
Given that group keys inherently have few repeated values, especially when grouping on a single column, the use of dictionary encoding is unlikely to be yielding significant returns
1 parent 58483fb commit 2b07114

File tree

4 files changed

+39
-38
lines changed

4 files changed

+39
-38
lines changed

datafusion/core/tests/path_partition.rs

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -168,9 +168,9 @@ async fn parquet_distinct_partition_col() -> Result<()> {
168168
assert_eq!(min_limit, resulting_limit);
169169

170170
let s = ScalarValue::try_from_array(results[0].column(1), 0)?;
171-
let month = match extract_as_utf(&s) {
172-
Some(month) => month,
173-
s => panic!("Expected month as Dict(_, Utf8) found {s:?}"),
171+
let month = match s {
172+
ScalarValue::Utf8(Some(month)) => month,
173+
s => panic!("Expected month as Utf8 found {s:?}"),
174174
};
175175

176176
let sql_on_partition_boundary = format!(
@@ -191,15 +191,6 @@ async fn parquet_distinct_partition_col() -> Result<()> {
191191
Ok(())
192192
}
193193

194-
fn extract_as_utf(v: &ScalarValue) -> Option<String> {
195-
if let ScalarValue::Dictionary(_, v) = v {
196-
if let ScalarValue::Utf8(v) = v.as_ref() {
197-
return v.clone();
198-
}
199-
}
200-
None
201-
}
202-
203194
#[tokio::test]
204195
async fn csv_filter_with_file_col() -> Result<()> {
205196
let ctx = SessionContext::new();

datafusion/physical-plan/src/aggregates/group_values/row.rs

Lines changed: 4 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -17,22 +17,18 @@
1717

1818
use crate::aggregates::group_values::GroupValues;
1919
use ahash::RandomState;
20-
use arrow::compute::cast;
2120
use arrow::record_batch::RecordBatch;
2221
use arrow::row::{RowConverter, Rows, SortField};
23-
use arrow_array::{Array, ArrayRef};
24-
use arrow_schema::{DataType, SchemaRef};
22+
use arrow_array::ArrayRef;
23+
use arrow_schema::SchemaRef;
2524
use datafusion_common::hash_utils::create_hashes;
26-
use datafusion_common::{DataFusionError, Result};
25+
use datafusion_common::Result;
2726
use datafusion_execution::memory_pool::proxy::{RawTableAllocExt, VecAllocExt};
2827
use datafusion_physical_expr::EmitTo;
2928
use hashbrown::raw::RawTable;
3029

3130
/// A [`GroupValues`] making use of [`Rows`]
3231
pub struct GroupValuesRows {
33-
/// The output schema
34-
schema: SchemaRef,
35-
3632
/// Converter for the group values
3733
row_converter: RowConverter,
3834

@@ -79,7 +75,6 @@ impl GroupValuesRows {
7975
let map = RawTable::with_capacity(0);
8076

8177
Ok(Self {
82-
schema,
8378
row_converter,
8479
map,
8580
map_size: 0,
@@ -170,7 +165,7 @@ impl GroupValues for GroupValuesRows {
170165
.take()
171166
.expect("Can not emit from empty rows");
172167

173-
let mut output = match emit_to {
168+
let output = match emit_to {
174169
EmitTo::All => {
175170
let output = self.row_converter.convert_rows(&group_values)?;
176171
group_values.clear();
@@ -203,20 +198,6 @@ impl GroupValues for GroupValuesRows {
203198
}
204199
};
205200

206-
// TODO: Materialize dictionaries in group keys (#7647)
207-
for (field, array) in self.schema.fields.iter().zip(&mut output) {
208-
let expected = field.data_type();
209-
if let DataType::Dictionary(_, v) = expected {
210-
let actual = array.data_type();
211-
if v.as_ref() != actual {
212-
return Err(DataFusionError::Internal(format!(
213-
"Converted group rows expected dictionary of {v} got {actual}"
214-
)));
215-
}
216-
*array = cast(array.as_ref(), expected)?;
217-
}
218-
}
219-
220201
self.group_values = Some(group_values);
221202
Ok(output)
222203
}

datafusion/physical-plan/src/aggregates/mod.rs

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ use crate::{
3838
use arrow::array::ArrayRef;
3939
use arrow::datatypes::{Field, Schema, SchemaRef};
4040
use arrow::record_batch::RecordBatch;
41+
use arrow_schema::DataType;
4142
use datafusion_common::stats::Precision;
4243
use datafusion_common::{not_impl_err, plan_err, DataFusionError, Result};
4344
use datafusion_execution::TaskContext;
@@ -286,6 +287,9 @@ pub struct AggregateExec {
286287
limit: Option<usize>,
287288
/// Input plan, could be a partial aggregate or the input to the aggregate
288289
pub input: Arc<dyn ExecutionPlan>,
290+
/// Original aggregation schema, could be different from `schema` before dictionary group
291+
/// keys get materialized
292+
original_schema: SchemaRef,
289293
/// Schema after the aggregate is applied
290294
schema: SchemaRef,
291295
/// Input schema before any aggregation is applied. For partial aggregate this will be the
@@ -469,15 +473,19 @@ impl AggregateExec {
469473
input: Arc<dyn ExecutionPlan>,
470474
input_schema: SchemaRef,
471475
) -> Result<Self> {
472-
let schema = create_schema(
476+
let original_schema = create_schema(
473477
&input.schema(),
474478
&group_by.expr,
475479
&aggr_expr,
476480
group_by.contains_null(),
477481
mode,
478482
)?;
479483

480-
let schema = Arc::new(schema);
484+
let schema = Arc::new(materialize_dict_group_keys(
485+
&original_schema,
486+
group_by.expr.len(),
487+
));
488+
let original_schema = Arc::new(original_schema);
481489
// Reset ordering requirement to `None` if aggregator is not order-sensitive
482490
order_by_expr = aggr_expr
483491
.iter()
@@ -552,6 +560,7 @@ impl AggregateExec {
552560
filter_expr,
553561
order_by_expr,
554562
input,
563+
original_schema,
555564
schema,
556565
input_schema,
557566
projection_mapping,
@@ -973,6 +982,24 @@ fn create_schema(
973982
Ok(Schema::new(fields))
974983
}
975984

985+
/// returns schema with dictionary group keys materialized as their value types
986+
/// The actual conversion happens in `RowConverter` and we don't do unnecessary
987+
/// conversion back into dictionaries
988+
fn materialize_dict_group_keys(schema: &Schema, group_count: usize) -> Schema {
989+
let fields = schema
990+
.fields
991+
.iter()
992+
.enumerate()
993+
.map(|(i, field)| match field.data_type() {
994+
DataType::Dictionary(_, value_data_type) if i < group_count => {
995+
Field::new(field.name(), *value_data_type.clone(), field.is_nullable())
996+
}
997+
_ => Field::clone(field),
998+
})
999+
.collect::<Vec<_>>();
1000+
Schema::new(fields)
1001+
}
1002+
9761003
fn group_schema(schema: &Schema, group_count: usize) -> SchemaRef {
9771004
let group_fields = schema.fields()[0..group_count].to_vec();
9781005
Arc::new(Schema::new(group_fields))

datafusion/physical-plan/src/aggregates/row_hash.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -324,7 +324,9 @@ impl GroupedHashAggregateStream {
324324
.map(create_group_accumulator)
325325
.collect::<Result<_>>()?;
326326

327-
let group_schema = group_schema(&agg_schema, agg_group_by.expr.len());
327+
// we need to use original schema so RowConverter in group_values below
328+
// will do the proper conversion of dictionaries into value types
329+
let group_schema = group_schema(&agg.original_schema, agg_group_by.expr.len());
328330
let spill_expr = group_schema
329331
.fields
330332
.into_iter()

0 commit comments

Comments
 (0)