Skip to content

Commit

Permalink
Add support for co-distributed semi joins
Browse files Browse the repository at this point in the history
  • Loading branch information
dain committed Apr 27, 2016
1 parent 022aceb commit 8c35026
Showing 1 changed file with 63 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -925,31 +925,74 @@ public PlanWithProperties visitSemiJoin(SemiJoinNode node, Context context)
filteringSource = node.getFilteringSource().accept(this, context.withPreferredProperties(PreferredProperties.any()));

// force partitioning if source isn't already partitioned on sourceSymbols
if (!source.getProperties().isNodePartitionedOn(FIXED_HASH_DISTRIBUTION, sourceSymbols)) {
if (!source.getProperties().isNodePartitionedOn(sourceSymbols) || (distributedJoins && source.getProperties().isSingleNode())) {
source = withDerivedProperties(
partitionedExchange(idAllocator.getNextId(), REMOTE, source.getNode(), sourceSymbols, node.getSourceHashSymbol()),
source.getProperties());
}

// The following statements would normally be written as: if (condition) { filteringSource = ...; }
// However, the if-condition will always evaluate to true in this case because no externally-visible node produces partition with null replicate.
// As a result, it is written as checkState instead.
PartitionFunctionBinding partitionFunction = new PartitionFunctionBinding(
FIXED_HASH_DISTRIBUTION,
filteringSource.getNode().getOutputSymbols(),
filteringSourceSymbols.stream()
.map(PartitionFunctionArgumentBinding::new)
.collect(toImmutableList()),
node.getFilteringSourceHashSymbol(),
true,
Optional.empty());
filteringSource = withDerivedProperties(
partitionedExchange(
idAllocator.getNextId(),
REMOTE,
filteringSource.getNode(),
partitionFunction),
filteringSource.getProperties());
// translate the partition arguments on the left symbols to the right symbols
SetMultimap<Symbol, Symbol> sourceToFilteringSource = HashMultimap.create();
for (int i = 0; i < sourceSymbols.size(); i++) {
sourceToFilteringSource.put(sourceSymbols.get(i), filteringSourceSymbols.get(i));
}

// Currently, a connector provided partitioning can not be co-distributed as
// this would result in plan fragments with multiple table sources, and a
// plan with multiple table sources can not be executed.
boolean customConnectorPartitioning = !(source.getNode() instanceof ExchangeNode) &&
source.getProperties().getNodePartitioningHandle().flatMap(PartitioningHandle::getConnectorId).isPresent();

if (customConnectorPartitioning || !source.getProperties().isNodePartitionedWith(filteringSource.getProperties(), sourceToFilteringSource::get)) {
// The following statements would normally be written as: if (condition) { filteringSource = ...; }
// However, the if-condition will always evaluate to true in this case because no externally-visible node produces partition with null replicate.
// As a result, it is written as checkState instead.
Function<Symbol, Optional<Symbol>> leftToRightTranslator = sourceSymbol -> sourceToFilteringSource.get(sourceSymbol).stream().findAny();
Optional<List<PartitionFunctionArgumentBinding>> filteringSourcePartitionColumns = source.getProperties()
.translate(leftToRightTranslator)
.getNodePartitioningColumns();

verify(filteringSourcePartitionColumns.isPresent(), "Could not translate SEMI JOIN source partitioning to filtering source symbols");
verify(filteringSourcePartitionColumns.get().size() == 1, "size of partitionFunctionArguments is not 1 when nullPartition is REPLICATE.");
PartitionFunctionArgumentBinding functionArgumentBinding = filteringSourcePartitionColumns.get().get(0);

if (functionArgumentBinding.isVariable()) {
filteringSource = withDerivedProperties(
partitionedExchange(
idAllocator.getNextId(),
REMOTE,
filteringSource.getNode(),
new PartitionFunctionBinding(
source.getProperties().getNodePartitioningHandle().get(),
node.getFilteringSource().getOutputSymbols(),
filteringSourcePartitionColumns.get(),
Optional.empty(),
true,
Optional.empty())),
filteringSource.getProperties());
}
else if (!functionArgumentBinding.getConstant().isNull()) {
// non null constant - do not enable replicate nulls
// todo replace semi join with a projection or at least replace filter with a values node
filteringSource = withDerivedProperties(
partitionedExchange(
idAllocator.getNextId(),
REMOTE,
filteringSource.getNode(),
new PartitionFunctionBinding(
source.getProperties().getNodePartitioningHandle().get(),
node.getFilteringSource().getOutputSymbols(),
filteringSourcePartitionColumns.get())),
filteringSource.getProperties());
}
else {
// all values are null - broadcast the nulls
// todo replace semi join with a projection or at least replace filter with a values node
filteringSource = withDerivedProperties(
replicatedExchange(idAllocator.getNextId(), REMOTE, filteringSource.getNode()),
filteringSource.getProperties());
}
}
}
else {
source = node.getSource().accept(this, context.withPreferredProperties(PreferredProperties.any()));
Expand Down

0 comments on commit 8c35026

Please sign in to comment.