Skip to content

Commit

Permalink
Make BroadcastMotion if target plan's parallel < 1 when parallel aware.
Browse files Browse the repository at this point in the history
When there is a parallel ware join, we always make ParallelBroadcastMotion
before to guarantee all data are replicated on segments across parallel
processes, even if target side's parallel workers is 0,

For example, inner(Hashed, workers=0) Join outer(HashedWorkers, workers=2)
when parallel aware.
If we ParallelBroadcastMotion the outer side to join with inner, it will
get outer(ReplicatedWorkers) Join inner(Hashed) = Join Locus(HashedWorkers,
workers=0).
That will cause an Assert Failure in cdbpathlocus_parallel_join().

So, we should use BroadcastMotion instead of ParallelBroadcastMotion if
target side's parallel workers is 0  when parallel aware.
There are no differences between BroadcastMotion and ParallelBroadcastMotion
when execution if target slice's parallel workers is 0.
Another benefit is we could get a better Hashed locus instead of HashedWorkers
for upper join with others directly without Motion.

Authored-by: Zhang Mingli avamingli@gmail.com
  • Loading branch information
avamingli committed Oct 8, 2023
1 parent bce6e30 commit 12dc126
Showing 1 changed file with 11 additions and 6 deletions.
17 changes: 11 additions & 6 deletions src/backend/cdb/cdbpath.c
Original file line number Diff line number Diff line change
Expand Up @@ -3655,6 +3655,8 @@ cdbpath_motion_for_parallel_join(PlannerInfo *root,
{ /* partitioned */
CdbpathMfjRel *large_rel = &outer;
CdbpathMfjRel *small_rel = &inner;
int lp; /* largre rel parallel workers */
int sp; /* small rel parallel workers */

/* Consider locus when parallel_ware. */
if(parallel_aware)
Expand All @@ -3670,6 +3672,9 @@ cdbpath_motion_for_parallel_join(PlannerInfo *root,
if (large_rel->bytes < small_rel->bytes)
CdbSwap(CdbpathMfjRel *, large_rel, small_rel);

lp = CdbPathLocus_NumParallelWorkers(large_rel->locus);
sp = CdbPathLocus_NumParallelWorkers(small_rel->locus);

/* Both side are distribued in 1 segment and no parallel, it can join without motion. */
if (CdbPathLocus_NumSegments(large_rel->locus) == 1 &&
CdbPathLocus_NumSegments(small_rel->locus) == 1 &&
Expand Down Expand Up @@ -3700,7 +3705,7 @@ cdbpath_motion_for_parallel_join(PlannerInfo *root,
((!parallel_aware && (small_rel->bytes * CdbPathLocus_NumSegmentsPlusParallelWorkers(large_rel->locus) < large_rel->bytes)) ||
(parallel_aware && (small_rel->bytes * CdbPathLocus_NumSegments(large_rel->locus) < large_rel->bytes))))
{
if (!parallel_aware)
if (!parallel_aware || lp <= 1)
CdbPathLocus_MakeReplicated(&small_rel->move_to,
CdbPathLocus_NumSegments(large_rel->locus),
large_rel->path->parallel_workers);
Expand All @@ -3719,7 +3724,7 @@ cdbpath_motion_for_parallel_join(PlannerInfo *root,
((!parallel_aware && (large_rel->bytes * CdbPathLocus_NumSegmentsPlusParallelWorkers(small_rel->locus) < small_rel->bytes)) ||
(parallel_aware && (large_rel->bytes * CdbPathLocus_NumSegments(small_rel->locus) < small_rel->bytes))))
{
if (!parallel_aware)
if (!parallel_aware || sp <= 1)
CdbPathLocus_MakeReplicated(&large_rel->move_to,
CdbPathLocus_NumSegments(small_rel->locus),
small_rel->path->parallel_workers);
Expand Down Expand Up @@ -3750,7 +3755,7 @@ cdbpath_motion_for_parallel_join(PlannerInfo *root,
((!parallel_aware && (small_rel->bytes * CdbPathLocus_NumSegmentsPlusParallelWorkers(large_rel->locus) < small_rel->bytes + large_rel->bytes)) ||
(parallel_aware && (small_rel->bytes * CdbPathLocus_NumSegments(large_rel->locus) < small_rel->bytes + large_rel->bytes))))
{
if (!parallel_aware)
if (!parallel_aware || lp <= 1)
CdbPathLocus_MakeReplicated(&small_rel->move_to,
CdbPathLocus_NumSegments(large_rel->locus),
large_rel->path->parallel_workers);
Expand All @@ -3766,7 +3771,7 @@ cdbpath_motion_for_parallel_join(PlannerInfo *root,
((!parallel_aware && (large_rel->bytes * CdbPathLocus_NumSegmentsPlusParallelWorkers(small_rel->locus) < small_rel->bytes + large_rel->bytes)) ||
(parallel_aware && (large_rel->bytes * CdbPathLocus_NumSegments(small_rel->locus) < small_rel->bytes + large_rel->bytes))))
{
if (!parallel_aware)
if (!parallel_aware || sp <= 1)
CdbPathLocus_MakeReplicated(&large_rel->move_to,
CdbPathLocus_NumSegments(small_rel->locus),
small_rel->path->parallel_workers);
Expand Down Expand Up @@ -3808,7 +3813,7 @@ cdbpath_motion_for_parallel_join(PlannerInfo *root,
else if (!small_rel->require_existing_order &&
small_rel->ok_to_replicate)
{
if (!parallel_aware)
if (!parallel_aware || lp <= 1)
CdbPathLocus_MakeReplicated(&small_rel->move_to,
CdbPathLocus_NumSegments(large_rel->locus),
large_rel->path->parallel_workers);
Expand All @@ -3821,7 +3826,7 @@ cdbpath_motion_for_parallel_join(PlannerInfo *root,
else if (!large_rel->require_existing_order &&
large_rel->ok_to_replicate)
{
if (!parallel_aware)
if (!parallel_aware || sp <= 1)
CdbPathLocus_MakeReplicated(&large_rel->move_to,
CdbPathLocus_NumSegments(small_rel->locus),
small_rel->path->parallel_workers);
Expand Down

0 comments on commit 12dc126

Please sign in to comment.