Skip to content

Commit

Permalink
Add motionhazard to the outer side of parallel aware join.
Browse files Browse the repository at this point in the history
For a parallel aware join, we may partition outer data to
batches if there is not enough memory.And if a worker has
consumed all the data from subnode, it will arrive and wait
for others to begin next phase: PHJ_BUILD_DONE.

PHJ_BUILD_DONE means that we has partitioned every thing
from PHJ_BUILD_HASHING_OUTER phase and are ready to probe.
It's true for Postgres, becuase that if a worker pulls no
data from subnode and it leaves PHJ_BUILD_HASHING_OUTER
phase, all subnode's data must be processed(ex: a Parallel
Seqscan).
As all participants leaves PHJ_BUILD_HASHING_OUTER, the whole
data should be pulled completely by these processes, or there
may be other siblings who have never participated in phase
PHJ_BUILD_HASHING_OUTER at all.

But it's not true for CBDB, a plan node may have Motions behind.
Some parallel workers pull no data from subnode, it doesn't mean
that other workers have no data to process at all.
We need to wait for all parallel workers together instead of
build barrier's participants arrived at the current phase before
we move to the next phase.

Add an outer_motion_barrier to ensure all parallel workers finish
their work of partition outer batches.

Authored-by: Zhang Mingli avamingli@gmail.com
  • Loading branch information
avamingli committed Nov 6, 2023
1 parent f544f1d commit 516c611
Show file tree
Hide file tree
Showing 9 changed files with 36 additions and 13 deletions.
13 changes: 13 additions & 0 deletions src/backend/executor/nodeHashjoin.c
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,7 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
if (parallel)
{
Barrier *build_barrier;
Barrier *outer_motion_barrier = &parallel_state->outer_motion_barrier;

build_barrier = &parallel_state->build_barrier;
Assert(BarrierPhase(build_barrier) == PHJ_BUILD_HASHING_OUTER ||
Expand All @@ -427,6 +428,14 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
*/
if (hashtable->nbatch > 1)
ExecParallelHashJoinPartitionOuter(node);
/*
* CBDB_PARALLEL
* If outer side has motion behind, we need to wait for all siblings
* before next phase.
*/
if (((HashJoin *)node->js.ps.plan)->outer_motionhazard)
BarrierArriveAndWait(outer_motion_barrier, WAIT_EVENT_PARALLEL_FINISH);

BarrierArriveAndWait(build_barrier,
WAIT_EVENT_HASH_BUILD_HASH_OUTER);
}
Expand Down Expand Up @@ -2065,6 +2074,10 @@ ExecHashJoinInitializeDSM(HashJoinState *state, ParallelContext *pcxt)

BarrierInit(&pstate->sync_barrier, pcxt->nworkers);
BarrierInit(&pstate->batch0_barrier, pcxt->nworkers);

if (((HashJoin *)state->js.ps.plan)->outer_motionhazard)
BarrierInit(&pstate->outer_motion_barrier, pcxt->nworkers);

pstate->phs_lasj_has_null = false;

/* Set up the space we'll use for shared temporary files. */
Expand Down
1 change: 1 addition & 0 deletions src/backend/nodes/copyfuncs.c
Original file line number Diff line number Diff line change
Expand Up @@ -1127,6 +1127,7 @@ _copyHashJoin(const HashJoin *from)
COPY_NODE_FIELD(hashkeys);
COPY_NODE_FIELD(hashqualclauses);
COPY_SCALAR_FIELD(batch0_barrier);
COPY_SCALAR_FIELD(outer_motionhazard);

return newnode;
}
Expand Down
1 change: 1 addition & 0 deletions src/backend/nodes/outfuncs.c
Original file line number Diff line number Diff line change
Expand Up @@ -864,6 +864,7 @@ _outHashJoin(StringInfo str, const HashJoin *node)
WRITE_NODE_FIELD(hashkeys);
WRITE_NODE_FIELD(hashqualclauses);
WRITE_BOOL_FIELD(batch0_barrier);
WRITE_BOOL_FIELD(outer_motionhazard);
}

static void
Expand Down
1 change: 1 addition & 0 deletions src/backend/nodes/readfuncs.c
Original file line number Diff line number Diff line change
Expand Up @@ -2270,6 +2270,7 @@ _readHashJoin(void)
READ_NODE_FIELD(hashkeys);
READ_NODE_FIELD(hashqualclauses);
READ_BOOL_FIELD(batch0_barrier);
READ_BOOL_FIELD(outer_motionhazard);

READ_DONE();
}
Expand Down
11 changes: 8 additions & 3 deletions src/backend/optimizer/plan/createplan.c
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ static HashJoin *make_hashjoin(List *tlist,
List *hashoperators, List *hashcollations,
List *hashkeys,
Plan *lefttree, Plan *righttree,
JoinType jointype, bool inner_unique, bool batch0_barrier);
JoinType jointype, bool inner_unique, bool batch0_barrier, bool outer_motionhazard);
static Hash *make_hash(Plan *lefttree,
List *hashkeys,
Oid skewTable,
Expand Down Expand Up @@ -5786,6 +5786,7 @@ create_hashjoin_plan(PlannerInfo *root,
bool skewInherit = false;
bool partition_selectors_created = false;
ListCell *lc;
bool outer_motionhazard = false;

/* CBDB_PARALLEL_FIXME:
* PartitionSelector is not parallel-aware, so disable it temporarily.
Expand Down Expand Up @@ -5971,6 +5972,7 @@ create_hashjoin_plan(PlannerInfo *root,
{
hash_plan->plan.parallel_aware = true;
hash_plan->rows_total = best_path->inner_rows_total;
outer_motionhazard = best_path->jpath.outerjoinpath->motionHazard;
}

join_plan = make_hashjoin(tlist,
Expand All @@ -5984,7 +5986,8 @@ create_hashjoin_plan(PlannerInfo *root,
(Plan *) hash_plan,
best_path->jpath.jointype,
best_path->jpath.inner_unique,
best_path->batch0_barrier);
best_path->batch0_barrier,
outer_motionhazard);

/*
* MPP-4635. best_path->jpath.outerjoinpath may be NULL.
Expand Down Expand Up @@ -7160,7 +7163,8 @@ make_hashjoin(List *tlist,
Plan *righttree,
JoinType jointype,
bool inner_unique,
bool batch0_barrier)
bool batch0_barrier,
bool outer_motionhazard)
{
HashJoin *node = makeNode(HashJoin);
Plan *plan = &node->join.plan;
Expand All @@ -7177,6 +7181,7 @@ make_hashjoin(List *tlist,
node->join.inner_unique = inner_unique;
node->join.joinqual = joinclauses;
node->batch0_barrier = batch0_barrier;
node->outer_motionhazard = outer_motionhazard;

return node;
}
Expand Down
1 change: 1 addition & 0 deletions src/include/executor/hashjoin.h
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,7 @@ typedef struct ParallelHashJoinState
Barrier grow_buckets_barrier;
Barrier sync_barrier;
Barrier batch0_barrier;
Barrier outer_motion_barrier;
volatile bool phs_lasj_has_null; /* LASJ has found null value, identify early quit */
pg_atomic_uint32 distributor; /* counter for load balancing */

Expand Down
1 change: 1 addition & 0 deletions src/include/nodes/plannodes.h
Original file line number Diff line number Diff line change
Expand Up @@ -1061,6 +1061,7 @@ typedef struct HashJoin
List *hashkeys;
List *hashqualclauses;
bool batch0_barrier;
bool outer_motionhazard; /* CBDB_PARALLEL: for parallel aware join */
} HashJoin;

#define SHARE_ID_NOT_SHARED (-1)
Expand Down
16 changes: 8 additions & 8 deletions src/test/regress/expected/workfile/hashjoin_spill.out
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,10 @@ insert into test_hj_spill SELECT i,i,i%1000,i,i,i,i,i from
SET statement_mem=1024;
set gp_resqueue_print_operator_memory_limits=on;
set gp_workfile_compression = on;
select avg(i3::numeric) from (SELECT t1.* FROM test_hj_spill AS t1 RIGHT JOIN test_hj_spill AS t2 ON t1.i1=t2.i2) foo;
avg
----------------------
499.5000000000000000
select count(i3), avg(i3::numeric) from (SELECT t1.* FROM test_hj_spill AS t1 RIGHT JOIN test_hj_spill AS t2 ON t1.i1=t2.i2) foo;
count | avg
-------+----------------------
45000 | 499.5000000000000000
(1 row)

select * from hashjoin_spill.is_workfile_created('explain (analyze, verbose) SELECT t1.* FROM test_hj_spill AS t1 RIGHT JOIN test_hj_spill AS t2 ON t1.i1=t2.i2');
Expand All @@ -59,10 +59,10 @@ select * from hashjoin_spill.is_workfile_created('explain (analyze, verbose) SEL
(1 row)

set gp_workfile_compression = off;
select avg(i3::numeric) from (SELECT t1.* FROM test_hj_spill AS t1 RIGHT JOIN test_hj_spill AS t2 ON t1.i1=t2.i2) foo;
avg
----------------------
499.5000000000000000
select count(i3), avg(i3::numeric) from (SELECT t1.* FROM test_hj_spill AS t1 RIGHT JOIN test_hj_spill AS t2 ON t1.i1=t2.i2) foo;
count | avg
-------+----------------------
45000 | 499.5000000000000000
(1 row)

select * from hashjoin_spill.is_workfile_created('explain (analyze, verbose) SELECT t1.* FROM test_hj_spill AS t1 RIGHT JOIN test_hj_spill AS t2 ON t1.i1=t2.i2');
Expand Down
4 changes: 2 additions & 2 deletions src/test/regress/sql/workfile/hashjoin_spill.sql
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,12 @@ SET statement_mem=1024;
set gp_resqueue_print_operator_memory_limits=on;

set gp_workfile_compression = on;
select avg(i3::numeric) from (SELECT t1.* FROM test_hj_spill AS t1 RIGHT JOIN test_hj_spill AS t2 ON t1.i1=t2.i2) foo;
select count(i3), avg(i3::numeric) from (SELECT t1.* FROM test_hj_spill AS t1 RIGHT JOIN test_hj_spill AS t2 ON t1.i1=t2.i2) foo;
select * from hashjoin_spill.is_workfile_created('explain (analyze, verbose) SELECT t1.* FROM test_hj_spill AS t1 RIGHT JOIN test_hj_spill AS t2 ON t1.i1=t2.i2');
select * from hashjoin_spill.is_workfile_created('explain (analyze, verbose) SELECT t1.* FROM test_hj_spill AS t1 RIGHT JOIN test_hj_spill AS t2 ON t1.i1=t2.i2 LIMIT 15000;');

set gp_workfile_compression = off;
select avg(i3::numeric) from (SELECT t1.* FROM test_hj_spill AS t1 RIGHT JOIN test_hj_spill AS t2 ON t1.i1=t2.i2) foo;
select count(i3), avg(i3::numeric) from (SELECT t1.* FROM test_hj_spill AS t1 RIGHT JOIN test_hj_spill AS t2 ON t1.i1=t2.i2) foo;
select * from hashjoin_spill.is_workfile_created('explain (analyze, verbose) SELECT t1.* FROM test_hj_spill AS t1 RIGHT JOIN test_hj_spill AS t2 ON t1.i1=t2.i2');
select * from hashjoin_spill.is_workfile_created('explain (analyze, verbose) SELECT t1.* FROM test_hj_spill AS t1 RIGHT JOIN test_hj_spill AS t2 ON t1.i1=t2.i2 LIMIT 15000;');

Expand Down

0 comments on commit 516c611

Please sign in to comment.