Avoid re-computing hashes in HashJoin and GroupByHash #17625

@adriangb

Currently these operators compute the hashes of the keys at least twice, once in RepartitionExec and once in HashJoinExec:

CREATE EXTERNAL TABLE customer STORED AS PARQUET LOCATION 'benchmarks/data/tpch_sf10/customer/';
CREATE EXTERNAL TABLE orders STORED AS PARQUET LOCATION 'benchmarks/data/tpch_sf10/orders/';

EXPLAIN
SELECT *
FROM customer
JOIN orders on c_custkey = o_custkey
WHERE c_phone = '25-989-741-2988';

-- +---------------+------------------------------------------------------------+
-- | plan_type     | plan                                                       |
-- +---------------+------------------------------------------------------------+
-- | physical_plan | ┌───────────────────────────┐                              |
-- |               | │    CoalesceBatchesExec    │                              |
-- |               | │    --------------------   │                              |
-- |               | │     target_batch_size:    │                              |
-- |               | │            8192           │                              |
-- |               | └─────────────┬─────────────┘                              |
-- |               | ┌─────────────┴─────────────┐                              |
-- |               | │        HashJoinExec       │                              |
-- |               | │    --------------------   │                              |
-- |               | │            on:            ├──────────────┐               |
-- |               | │  (c_custkey = o_custkey)  │              │               |
-- |               | └─────────────┬─────────────┘              │               |
-- |               | ┌─────────────┴─────────────┐┌─────────────┴─────────────┐ |
-- |               | │    CoalesceBatchesExec    ││    CoalesceBatchesExec    │ |
-- |               | │    --------------------   ││    --------------------   │ |
-- |               | │     target_batch_size:    ││     target_batch_size:    │ |
-- |               | │            8192           ││            8192           │ |
-- |               | └─────────────┬─────────────┘└─────────────┬─────────────┘ |
-- |               | ┌─────────────┴─────────────┐┌─────────────┴─────────────┐ |
-- |               | │      RepartitionExec      ││      RepartitionExec      │ |
-- |               | │    --------------------   ││    --------------------   │ |
-- |               | │ partition_count(in->out): ││ partition_count(in->out): │ |
-- |               | │          12 -> 12         ││          12 -> 12         │ |
-- |               | │                           ││                           │ |
-- |               | │    partitioning_scheme:   ││    partitioning_scheme:   │ |
-- |               | │  Hash([c_custkey@0], 12)  ││  Hash([o_custkey@1], 12)  │ |
-- |               | └─────────────┬─────────────┘└─────────────┬─────────────┘ |
-- |               | ┌─────────────┴─────────────┐┌─────────────┴─────────────┐ |
-- |               | │    CoalesceBatchesExec    ││       DataSourceExec      │ |
-- |               | │    --------------------   ││    --------------------   │ |
-- |               | │     target_batch_size:    ││         files: 23         │ |
-- |               | │            8192           ││      format: parquet      │ |
-- |               | │                           ││      predicate: true      │ |
-- |               | └─────────────┬─────────────┘└───────────────────────────┘ |
-- |               | ┌─────────────┴─────────────┐                              |
-- |               | │         FilterExec        │                              |
-- |               | │    --------------------   │                              |
-- |               | │         predicate:        │                              |
-- |               | │ c_phone = 25-989-741-2988 │                              |
-- |               | └─────────────┬─────────────┘                              |
-- |               | ┌─────────────┴─────────────┐                              |
-- |               | │       DataSourceExec      │                              |
-- |               | │    --------------------   │                              |
-- |               | │         files: 23         │                              |
-- |               | │      format: parquet      │                              |
-- |               | │                           │                              |
-- |               | │         predicate:        │                              |
-- |               | │ c_phone = 25-989-741-2988 │                              |
-- |               | └───────────────────────────┘                              |
-- |               |                                                            |
-- +---------------+------------------------------------------------------------+
-- 1 row(s) fetched. 
-- Elapsed 0.019 seconds.

The proposal is to rewrite the plan into something like:

|               | ┌─────────────┴─────────────┐                              |
|               | │        HashJoinExec       │                              |
|               | │    --------------------   │                              |
|               | │            on:            ├──────────────┐               |
|               | │  (c_custkey = o_custkey)  │              │               |
|               | └─────────────┬─────────────┘              │               |
|               | ┌─────────────┴─────────────┐┌─────────────┴─────────────┐ |
|               | │    CoalesceBatchesExec    ││    CoalesceBatchesExec    │ |
|               | │    --------------------   ││    --------------------   │ |
|               | │     target_batch_size:    ││     target_batch_size:    │ |
|               | │            8192           ││            8192           │ |
|               | └─────────────┬─────────────┘└─────────────┬─────────────┘ |
|               | ┌─────────────┴─────────────┐┌─────────────┴─────────────┐ |
|               | │      RepartitionExec      ││      RepartitionExec      │ |
|               | │    --------------------   ││    --------------------   │ |
|               | │ partition_count(in->out): ││ partition_count(in->out): │ |
|               | │          12 -> 12         ││          12 -> 12         │ |
|               | │                           ││                           │ |
|               | │    partitioning_scheme:   ││    partitioning_scheme:   │ |
|               | │  List(c_custkey, 12)      ││  List(o_custkey, 12)      │ |
|               | └─────────────┬─────────────┘└─────────────┬─────────────┘ |
|               | ┌─────────────┴─────────────┐┌─────────────┴─────────────┐ |
|               | │       ProjectionExec      ││       ProjectionExec      │ |
|               | │    --------------------   ││    --------------------   │ |
|               | │       expr:               ││       expr:               │ |
|               | │  *, hash_part(c_custkey)  ││  *, hash_part(o_custkey)  │ |
|               | └─────────────┬─────────────┘└─────────────┬─────────────┘ |

The idea is that an optimizer rule injects an extra column holding the partition calculation for the join key, added via a projection. The RepartitionExec then uses this column for repartitioning instead of hashing the key itself, and the HashJoinExec uses the same column to look up which build-side hash table to use, then drops the column before emitting batches to its parent node.

Since we push projections down we may even be able to push this projection down all the way into the scan, which ties in with #17599.
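
To make the data flow concrete, here is a minimal sketch in plain Rust (standard library only, not DataFusion's actual APIs). Everything in it is hypothetical and named for illustration: `Row`, `key_hash`, `project_with_hash`, `repartition`, `hash_join_partition`, and `IdentityHasher` are stand-ins for the projection, RepartitionExec, and HashJoinExec steps. The key is hashed exactly once in the projection step; the later steps only read the stored value.

```rust
use std::collections::HashMap;
use std::collections::hash_map::DefaultHasher;
use std::hash::{BuildHasherDefault, Hash, Hasher};

/// A row plus the injected column. `key_hash` plays the role of the
/// hypothetical `hash_part(...)` projection expression.
#[derive(Debug, Clone, PartialEq)]
pub struct Row {
    pub key: i64,
    pub payload: String,
    pub key_hash: u64,
}

/// Pass-through hasher so the map does not hash the precomputed value again.
#[derive(Default)]
pub struct IdentityHasher(u64);

impl Hasher for IdentityHasher {
    fn finish(&self) -> u64 {
        self.0
    }
    fn write(&mut self, _bytes: &[u8]) {
        unreachable!("only u64 keys are used with this hasher");
    }
    fn write_u64(&mut self, v: u64) {
        self.0 = v;
    }
}

type PrehashedMap<V> = HashMap<u64, V, BuildHasherDefault<IdentityHasher>>;

/// "Projection" step: the only place where the key is actually hashed.
pub fn project_with_hash(rows: Vec<(i64, String)>) -> Vec<Row> {
    rows.into_iter()
        .map(|(key, payload)| {
            let mut h = DefaultHasher::new();
            key.hash(&mut h);
            Row { key, payload, key_hash: h.finish() }
        })
        .collect()
}

/// "Repartition" step: routes rows by reading the stored hash column.
pub fn repartition(rows: Vec<Row>, n: usize) -> Vec<Vec<Row>> {
    let mut parts: Vec<Vec<Row>> = vec![Vec::new(); n];
    for row in rows {
        let p = (row.key_hash % n as u64) as usize;
        parts[p].push(row);
    }
    parts
}

/// "Join" step for one partition: builds and probes with the stored hash,
/// compares real keys to resolve collisions, and drops the hash column
/// from the output.
pub fn hash_join_partition(build: &[Row], probe: &[Row]) -> Vec<(i64, String, String)> {
    let mut table: PrehashedMap<Vec<&Row>> = PrehashedMap::default();
    for row in build {
        table.entry(row.key_hash).or_default().push(row);
    }
    let mut out = Vec::new();
    for p in probe {
        if let Some(candidates) = table.get(&p.key_hash) {
            for b in candidates {
                if b.key == p.key {
                    out.push((b.key, b.payload.clone(), p.payload.clone()));
                }
            }
        }
    }
    out
}
```

One caveat with this sketch: every row in a partition shares the same value of `key_hash % n`, so reusing the identical hash for both routing and the join table reduces the variety of hash values the table sees. A real implementation would likely need to account for that, for example by deriving the table index from different bits of the hash.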
