Enable Parallel Hash Left Anti Semi (Not-In) Join (parallel-oblivious).
This is a parallel-oblivious parallel hash join: each worker processes the
inner-side table redundantly and builds its own private hash table, rather
than sharing a single hash table.

We benefit from parallelism when the outer table is large and the inner
table is relatively small.

See [1] below for example DDL and DML.
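
To make the mechanics concrete, here is a self-contained toy in C -- not the actual executor code, and every identifier in it is invented for illustration. Each "worker" repeats the build phase over the whole inner side, then probes only its own slice of the outer side, which is exactly the duplicated-inner / divided-outer shape of a parallel-oblivious hash join:

/*
 * Toy model of a parallel-oblivious hash anti join (illustration only).
 * Build: every worker hashes the FULL inner side (duplicated work).
 * Probe: each worker handles only its share of the outer side.
 * Real NOT IN also has NULL semantics (one inner NULL filters every
 * row) -- covered by the t3_null regression test below, omitted here.
 * Compile with: cc lasj_toy.c -o lasj_toy
 */
#include <stdbool.h>
#include <stdio.h>

#define NWORKERS 2
#define OUTER_N  10
#define INNER_N  3
#define HASHSZ   16

static const int outer_vals[OUTER_N] = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
static const int inner_vals[INNER_N] = {2, 3, 4};

int
main(void)
{
	long		sum = 0;

	for (int w = 0; w < NWORKERS; w++)
	{
		/* Private per-worker "hash table" (a toy bitmap, no collision
		 * handling) built over the whole inner side. */
		bool		present[HASHSZ] = {false};

		for (int i = 0; i < INNER_N; i++)
			present[inner_vals[i] % HASHSZ] = true;

		/* Worker w probes every NWORKERS-th outer row, standing in for
		 * the row division a Parallel Seq Scan performs. */
		for (int i = w; i < OUTER_N; i += NWORKERS)
			if (!present[outer_vals[i] % HASHSZ])	/* anti join keeps misses */
				sum += outer_vals[i];
	}
	printf("sum = %ld\n", sum);	/* 1+5+6+7+8+9+10 = 46 */
	return 0;
}

A parallel-aware hash join would instead share one hash table among the workers; as the joinpath.c hunk below shows, this patch keeps that path disabled for Not-In joins.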

Non-parallel plan:
explain(analyze, costs off) select sum(t1.c1) from t1 where c1 not in (select c1 from t2);
                                                       QUERY PLAN
------------------------------------------------------------------------------------------------------------------------
 Finalize Aggregate (actual time=1808.872..1808.875 rows=1 loops=1)
   ->  Gather Motion 3:1  (slice1; segments: 3) (actual time=1745.235..1808.858 rows=3 loops=1)
         ->  Partial Aggregate (actual time=1808.622..1808.625 rows=1 loops=1)
               ->  Hash Left Anti Semi (Not-In) Join (actual time=2.890..1583.005 rows=1667434 loops=1)
                     Hash Cond: (t1.c1 = t2.c1)
                     Extra Text: (seg2)   Hash chain length 1.0 avg, 2 max, using 1199 of 524288 buckets.
                     ->  Seq Scan on t1 (actual time=0.355..678.531 rows=1667832 loops=1)
                     ->  Hash (actual time=2.068..2.069 rows=1200 loops=1)
                           Buckets: 524288  Batches: 1  Memory Usage: 4139kB
                           ->  Broadcast Motion 3:3  (slice2; segments: 3) (actual time=1.476..1.772 rows=1200 loops=1)
                                 ->  Seq Scan on t2 (actual time=0.356..0.499 rows=407 loops=1)
 Planning Time: 0.454 ms
   (slice0)    Executor memory: 124K bytes.
   (slice1)    Executor memory: 4443K bytes avg x 3x(0) workers, 4443K bytes max (seg0).  Work_mem: 4139K bytes max.
   (slice2)    Executor memory: 262K bytes avg x 3x(0) workers, 262K bytes max (seg0).
 Memory used:  128000kB
 Optimizer: Postgres query optimizer
 Execution Time: 1809.517 ms
(18 rows)
Time: 1810.827 ms (00:01.811)

Parallel plan:
explain(analyze, costs off) select sum(t1.c1) from t1 where c1 not in (select c1 from t2);
                                                       QUERY PLAN
------------------------------------------------------------------------------------------------------------------------
 Finalize Aggregate (actual time=758.707..758.710 rows=1 loops=1)
   ->  Gather Motion 6:1  (slice1; segments: 6) (actual time=668.747..758.685 rows=6 loops=1)
         ->  Partial Aggregate (actual time=752.479..752.483 rows=1 loops=1)
               ->  Hash Left Anti Semi (Not-In) Join (actual time=3.010..565.127 rows=833732 loops=1)
                     Hash Cond: (t1.c1 = t2.c1)
                     Extra Text: (seg2)   Hash chain length 1.0 avg, 2 max, using 1199 of 524288 buckets.
                     ->  Parallel Seq Scan on t1 (actual time=0.368..231.049 rows=833932 loops=1)
                     ->  Hash (actual time=2.148..2.149 rows=1200 loops=1)
                           Buckets: 524288  Batches: 1  Memory Usage: 4139kB
                           ->  Broadcast Motion 3:6  (slice2; segments: 3) (actual time=0.203..1.779 rows=1200 loops=1)
                                 ->  Seq Scan on t2 (actual time=0.361..0.499 rows=407 loops=1)
 Planning Time: 0.470 ms
   (slice0)    Executor memory: 124K bytes.
   (slice1)    Executor memory: 4483K bytes avg x 6x(0) workers, 4483K bytes max (seg0).  Work_mem: 4139K bytes max.
   (slice2)    Executor memory: 262K bytes avg x 3x(0) workers, 262K bytes max (seg0).
 Memory used:  128000kB
 Optimizer: Postgres query optimizer
 Execution Time: 759.440 ms
(18 rows)
Time: 760.874 ms
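
Comparing the two runs: the Gather Motion widens from 3:1 to 6:1 (two workers per segment), the per-process outer scan halves from 1,667,832 to 833,932 rows, and each of the six processes still builds the full 1,200-row hash table (4,139 kB apiece, so total hash memory doubles versus the three copies before). Elapsed time drops from 1809.517 ms to 759.440 ms, roughly a 2.4x speedup.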

[1] Example:
create table t1(c1 int, c2 int) using ao_row distributed by (c1);
create table t2(c1 int, c2 int) using ao_row distributed by (c1);
set enable_parallel = on;
set gp_appendonly_insert_files = 2;
set gp_appendonly_insert_files_tuples_range = 100;
set max_parallel_workers_per_gather = 2;
insert into t1 select i, i from generate_series(1, 5000000) i;
insert into t2 select i+1, i from generate_series(1, 1200) i;
analyze t1;
analyze t2;
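
A few words on the settings (my reading, not spelled out in the commit): enable_parallel turns on the parallel query feature, and max_parallel_workers_per_gather = 2 is what widens the Gather Motion from 3:1 to 6:1 above (2 workers on each of the 3 segments). The two gp_appendonly_insert_files settings spread the inserted rows across multiple append-optimized segment files, giving a Parallel Seq Scan independent files to distribute among workers.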

Authored-by: Zhang Mingli avamingli@gmail.com
avamingli committed Aug 9, 2023
1 parent 4a9c401 commit a5eca8b
Showing 3 changed files with 109 additions and 1 deletion.
2 changes: 1 addition & 1 deletion src/backend/optimizer/path/joinpath.c
@@ -2299,7 +2299,6 @@ hash_inner_and_outer(PlannerInfo *root,
 		save_jointype != JOIN_UNIQUE_OUTER &&
 		save_jointype != JOIN_FULL &&
 		save_jointype != JOIN_RIGHT &&
-		save_jointype != JOIN_LASJ_NOTIN &&
 		save_jointype != JOIN_DEDUP_SEMI &&
 		save_jointype != JOIN_DEDUP_SEMI_REVERSE &&
 		outerrel->partial_pathlist != NIL &&
@@ -2319,6 +2318,7 @@
 	 */
 	if (innerrel->partial_pathlist != NIL &&
 		save_jointype != JOIN_UNIQUE_INNER &&
+		save_jointype != JOIN_LASJ_NOTIN &&
 		enable_parallel_hash)
 	{
 		cheapest_partial_inner =
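
Reading the two hunks together: the first stops excluding JOIN_LASJ_NOTIN from the parallel-oblivious case (a partial path on the outer side, private hash tables), while the second adds it to the exclusions for the parallel-aware case (a partial path on the inner side, shared hash table). The sketch below restates just those two gates as a runnable C snippet; the helper names allows_partial_outer/allows_partial_inner are made up, and the real conditions also test things like joinrel->consider_parallel and the partial path lists:

/* Hedged restatement of the two gates this patch touches in
 * hash_inner_and_outer(); illustration, not planner code. */
#include <stdbool.h>
#include <stdio.h>

typedef enum
{
	JOIN_INNER, JOIN_FULL, JOIN_RIGHT,
	JOIN_UNIQUE_OUTER, JOIN_UNIQUE_INNER,
	JOIN_LASJ_NOTIN, JOIN_DEDUP_SEMI, JOIN_DEDUP_SEMI_REVERSE
} JoinType;

/* Parallel-oblivious hash join: after the patch, Not-In is allowed. */
static bool
allows_partial_outer(JoinType jt)
{
	return jt != JOIN_UNIQUE_OUTER &&
		   jt != JOIN_FULL &&
		   jt != JOIN_RIGHT &&
		   jt != JOIN_DEDUP_SEMI &&
		   jt != JOIN_DEDUP_SEMI_REVERSE;
}

/* Parallel-aware (shared hash table): the patch keeps Not-In excluded. */
static bool
allows_partial_inner(JoinType jt)
{
	return jt != JOIN_UNIQUE_INNER &&
		   jt != JOIN_LASJ_NOTIN;
}

int
main(void)
{
	printf("LASJ_NOTIN: oblivious=%d aware=%d\n",
		   allows_partial_outer(JOIN_LASJ_NOTIN),	/* 1 */
		   allows_partial_inner(JOIN_LASJ_NOTIN));	/* 0 */
	return 0;
}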
75 changes: 75 additions & 0 deletions src/test/regress/expected/gp_parallel.out
@@ -1590,6 +1590,81 @@ select * from t1 order by c2 asc limit 3 offset 5;

abort;
--
-- Test Parallel Hash Left Anti Semi (Not-In) Join (parallel-oblivious).
--
create table t1(c1 int, c2 int) using ao_row distributed by (c1);
create table t2(c1 int, c2 int) using ao_row distributed by (c1);
create table t3_null(c1 int, c2 int) using ao_row distributed by (c1);
set enable_parallel = on;
set gp_appendonly_insert_files = 2;
set gp_appendonly_insert_files_tuples_range = 100;
set max_parallel_workers_per_gather = 2;
insert into t1 select i, i from generate_series(1, 5000000) i;
insert into t2 select i+1, i from generate_series(1, 1200) i;
insert into t3_null select i+1, i from generate_series(1, 1200) i;
insert into t3_null values(NULL, NULL);
analyze t1;
analyze t2;
analyze t3_null;
explain(costs off) select sum(t1.c1) from t1 where c1 not in (select c1 from t2);
                                 QUERY PLAN
---------------------------------------------------------------------------
 Finalize Aggregate
   ->  Gather Motion 6:1  (slice1; segments: 6)
         ->  Partial Aggregate
               ->  Hash Left Anti Semi (Not-In) Join
                     Hash Cond: (t1.c1 = t2.c1)
                     ->  Parallel Seq Scan on t1
                     ->  Hash
                           ->  Broadcast Motion 3:6  (slice2; segments: 3)
                                 ->  Seq Scan on t2
 Optimizer: Postgres query optimizer
(10 rows)

select sum(t1.c1) from t1 where c1 not in (select c1 from t2);
sum
----------------
12500001778200
(1 row)

explain(costs off) select * from t1 where c1 not in (select c1 from t3_null);
                           QUERY PLAN
---------------------------------------------------------------
 Gather Motion 6:1  (slice1; segments: 6)
   ->  Hash Left Anti Semi (Not-In) Join
         Hash Cond: (t1.c1 = t3_null.c1)
         ->  Parallel Seq Scan on t1
         ->  Hash
               ->  Broadcast Motion 3:6  (slice2; segments: 3)
                     ->  Seq Scan on t3_null
 Optimizer: Postgres query optimizer
(8 rows)

select * from t1 where c1 not in (select c1 from t3_null);
c1 | c2
----+----
(0 rows)

-- non-parallel results.
set enable_parallel = off;
select sum(t1.c1) from t1 where c1 not in (select c1 from t2);
sum
----------------
12500001778200
(1 row)

select * from t1 where c1 not in (select c1 from t3_null);
c1 | c2
----+----
(0 rows)

drop table t1;
drop table t2;
drop table t3_null;
--
-- End of Test Parallel Hash Left Anti Semi (Not-In) Join.
--
--
-- Test alter ao/aocs table parallel_workers options
--
begin;
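A note on the t3_null queries in the expected output above: NOT IN follows SQL's three-valued logic, so for an outer value x, x NOT IN (v1, ..., NULL) expands to x <> v1 AND ... AND x <> NULL, and the comparison against NULL yields NULL -- the predicate can never be true once the inner side contains a NULL. That is why both t3_null queries return zero rows, and re-running them with enable_parallel = off confirms the new parallel path preserves that result.
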
33 changes: 33 additions & 0 deletions src/test/regress/sql/gp_parallel.sql
@@ -465,6 +465,39 @@ set local enable_parallel = off;
explain(costs off, locus) select * from t1 order by c2 asc limit 3 offset 5;
select * from t1 order by c2 asc limit 3 offset 5;
abort;

--
-- Test Parallel Hash Left Anti Semi (Not-In) Join (parallel-oblivious).
--
create table t1(c1 int, c2 int) using ao_row distributed by (c1);
create table t2(c1 int, c2 int) using ao_row distributed by (c1);
create table t3_null(c1 int, c2 int) using ao_row distributed by (c1);
set enable_parallel = on;
set gp_appendonly_insert_files = 2;
set gp_appendonly_insert_files_tuples_range = 100;
set max_parallel_workers_per_gather = 2;
insert into t1 select i, i from generate_series(1, 5000000) i;
insert into t2 select i+1, i from generate_series(1, 1200) i;
insert into t3_null select i+1, i from generate_series(1, 1200) i;
insert into t3_null values(NULL, NULL);
analyze t1;
analyze t2;
analyze t3_null;
explain(costs off) select sum(t1.c1) from t1 where c1 not in (select c1 from t2);
select sum(t1.c1) from t1 where c1 not in (select c1 from t2);
explain(costs off) select * from t1 where c1 not in (select c1 from t3_null);
select * from t1 where c1 not in (select c1 from t3_null);
-- non-parallel results.
set enable_parallel = off;
select sum(t1.c1) from t1 where c1 not in (select c1 from t2);
select * from t1 where c1 not in (select c1 from t3_null);
drop table t1;
drop table t2;
drop table t3_null;
--
-- End of Test Parallel Hash Left Anti Semi (Not-In) Join.
--

--
-- Test alter ao/aocs table parallel_workers options
--
