Skip to content

Commit

Permalink
Simplify creation of join operator
Browse files Browse the repository at this point in the history
Since the split into spilling and non-spilling, creation of join operators had
some unnecessary casting. This commit cleans it and makes clear
spilling/non-spilling code paths in LocalExecutionPlanner.

Changes in this class are strictly mechanical. No logical changes are made.
  • Loading branch information
skrzypo987 committed Sep 5, 2022
1 parent 5150feb commit 1c8f7c2
Show file tree
Hide file tree
Showing 14 changed files with 268 additions and 458 deletions.
111 changes: 73 additions & 38 deletions core/trino-main/src/main/java/io/trino/operator/OperatorFactories.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,40 +14,45 @@
package io.trino.operator;

import io.trino.operator.join.JoinBridgeManager;
import io.trino.operator.join.LookupJoinOperatorFactory.JoinType;
import io.trino.operator.join.LookupSourceFactory;
import io.trino.operator.join.unspilled.PartitionedLookupSourceFactory;
import io.trino.spi.type.Type;
import io.trino.spiller.PartitioningSpillerFactory;
import io.trino.sql.planner.plan.JoinNode;
import io.trino.sql.planner.plan.PlanNodeId;
import io.trino.type.BlockTypeOperators;

import java.util.List;
import java.util.Optional;
import java.util.OptionalInt;

import static io.trino.operator.join.LookupJoinOperatorFactory.JoinType.FULL_OUTER;
import static io.trino.operator.join.LookupJoinOperatorFactory.JoinType.INNER;
import static io.trino.operator.join.LookupJoinOperatorFactory.JoinType.LOOKUP_OUTER;
import static io.trino.operator.join.LookupJoinOperatorFactory.JoinType.PROBE_OUTER;
import static java.util.Objects.requireNonNull;

public interface OperatorFactories
{
OperatorFactory innerJoin(
public OperatorFactory join(
JoinOperatorType joinType,
int operatorId,
PlanNodeId planNodeId,
JoinBridgeManager<?> lookupSourceFactory,
boolean outputSingleMatch,
boolean waitForBuild,
JoinBridgeManager<? extends PartitionedLookupSourceFactory> lookupSourceFactory,
boolean hasFilter,
boolean spillingEnabled,
List<Type> probeTypes,
List<Integer> probeJoinChannel,
OptionalInt probeHashChannel,
Optional<List<Integer>> probeOutputChannels,
OptionalInt totalOperatorsCount,
PartitioningSpillerFactory partitioningSpillerFactory,
BlockTypeOperators blockTypeOperators);

OperatorFactory probeOuterJoin(
public OperatorFactory spillingJoin(
JoinOperatorType joinType,
int operatorId,
PlanNodeId planNodeId,
JoinBridgeManager<?> lookupSourceFactory,
boolean outputSingleMatch,
JoinBridgeManager<? extends LookupSourceFactory> lookupSourceFactory,
boolean hasFilter,
boolean spillingEnabled,
List<Type> probeTypes,
List<Integer> probeJoinChannel,
OptionalInt probeHashChannel,
Expand All @@ -56,32 +61,62 @@ OperatorFactory probeOuterJoin(
PartitioningSpillerFactory partitioningSpillerFactory,
BlockTypeOperators blockTypeOperators);

OperatorFactory lookupOuterJoin(
int operatorId,
PlanNodeId planNodeId,
JoinBridgeManager<?> lookupSourceFactory,
boolean waitForBuild,
boolean hasFilter,
boolean spillingEnabled,
List<Type> probeTypes,
List<Integer> probeJoinChannel,
OptionalInt probeHashChannel,
Optional<List<Integer>> probeOutputChannels,
OptionalInt totalOperatorsCount,
PartitioningSpillerFactory partitioningSpillerFactory,
BlockTypeOperators blockTypeOperators);
public static class JoinOperatorType
{
private final JoinType type;
private final boolean outputSingleMatch;
private final boolean waitForBuild;

OperatorFactory fullOuterJoin(
int operatorId,
PlanNodeId planNodeId,
JoinBridgeManager<?> lookupSourceFactory,
boolean hasFilter,
boolean spillingEnabled,
List<Type> probeTypes,
List<Integer> probeJoinChannel,
OptionalInt probeHashChannel,
Optional<List<Integer>> probeOutputChannels,
OptionalInt totalOperatorsCount,
PartitioningSpillerFactory partitioningSpillerFactory,
BlockTypeOperators blockTypeOperators);
public static JoinOperatorType ofJoinNodeType(JoinNode.Type joinNodeType, boolean outputSingleMatch, boolean waitForBuild)
{
return switch (joinNodeType) {
case INNER -> innerJoin(outputSingleMatch, waitForBuild);
case LEFT -> probeOuterJoin(outputSingleMatch);
case RIGHT -> lookupOuterJoin(waitForBuild);
case FULL -> fullOuterJoin();
};
}

public static JoinOperatorType innerJoin(boolean outputSingleMatch, boolean waitForBuild)
{
return new JoinOperatorType(INNER, outputSingleMatch, waitForBuild);
}

public static JoinOperatorType probeOuterJoin(boolean outputSingleMatch)
{
return new JoinOperatorType(PROBE_OUTER, outputSingleMatch, false);
}

public static JoinOperatorType lookupOuterJoin(boolean waitForBuild)
{
return new JoinOperatorType(LOOKUP_OUTER, false, waitForBuild);
}

public static JoinOperatorType fullOuterJoin()
{
return new JoinOperatorType(FULL_OUTER, false, false);
}

private JoinOperatorType(JoinType type, boolean outputSingleMatch, boolean waitForBuild)
{
this.type = requireNonNull(type, "type is null");
this.outputSingleMatch = outputSingleMatch;
this.waitForBuild = waitForBuild;
}

public boolean isOutputSingleMatch()
{
return outputSingleMatch;
}

public boolean isWaitForBuild()
{
return waitForBuild;
}

public JoinType getType()
{
return type;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
import io.trino.operator.join.JoinBridgeManager;
import io.trino.operator.join.JoinProbe.JoinProbeFactory;
import io.trino.operator.join.LookupJoinOperatorFactory;
import io.trino.operator.join.LookupJoinOperatorFactory.JoinType;
import io.trino.operator.join.LookupSourceFactory;
import io.trino.operator.join.unspilled.PartitionedLookupSourceFactory;
import io.trino.spi.type.Type;
import io.trino.spiller.PartitioningSpillerFactory;
import io.trino.sql.planner.plan.PlanNodeId;
Expand All @@ -29,144 +29,76 @@
import java.util.stream.IntStream;

import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.trino.operator.join.LookupJoinOperatorFactory.JoinType.FULL_OUTER;
import static io.trino.operator.join.LookupJoinOperatorFactory.JoinType.INNER;
import static io.trino.operator.join.LookupJoinOperatorFactory.JoinType.LOOKUP_OUTER;
import static io.trino.operator.join.LookupJoinOperatorFactory.JoinType.PROBE_OUTER;

public class TrinoOperatorFactories
implements OperatorFactories
{
@Override
public OperatorFactory innerJoin(
public OperatorFactory join(
JoinOperatorType joinType,
int operatorId,
PlanNodeId planNodeId,
JoinBridgeManager<?> lookupSourceFactory,
boolean outputSingleMatch,
boolean waitForBuild,
JoinBridgeManager<? extends PartitionedLookupSourceFactory> lookupSourceFactory,
boolean hasFilter,
boolean spillingEnabled,
List<Type> probeTypes,
List<Integer> probeJoinChannel,
OptionalInt probeHashChannel,
Optional<List<Integer>> probeOutputChannels,
OptionalInt totalOperatorsCount,
PartitioningSpillerFactory partitioningSpillerFactory,
Optional<List<Integer>> probeOutputChannelsOptional,
BlockTypeOperators blockTypeOperators)
{
return createJoinOperatorFactory(
operatorId,
planNodeId,
lookupSourceFactory,
probeTypes,
probeJoinChannel,
probeHashChannel,
probeOutputChannels.orElse(rangeList(probeTypes.size())),
INNER,
outputSingleMatch,
waitForBuild,
spillingEnabled,
totalOperatorsCount,
partitioningSpillerFactory,
blockTypeOperators);
}
List<Integer> probeOutputChannels = probeOutputChannelsOptional.orElse(rangeList(probeTypes.size()));
List<Type> probeOutputChannelTypes = probeOutputChannels.stream()
.map(probeTypes::get)
.collect(toImmutableList());

@Override
public OperatorFactory probeOuterJoin(
int operatorId,
PlanNodeId planNodeId,
JoinBridgeManager<?> lookupSourceFactory,
boolean outputSingleMatch,
boolean hasFilter,
boolean spillingEnabled,
List<Type> probeTypes,
List<Integer> probeJoinChannel,
OptionalInt probeHashChannel,
Optional<List<Integer>> probeOutputChannels,
OptionalInt totalOperatorsCount,
PartitioningSpillerFactory partitioningSpillerFactory,
BlockTypeOperators blockTypeOperators)
{
return createJoinOperatorFactory(
return new io.trino.operator.join.unspilled.LookupJoinOperatorFactory(
operatorId,
planNodeId,
lookupSourceFactory,
probeTypes,
probeOutputChannelTypes,
lookupSourceFactory.getBuildOutputTypes(),
joinType,
new JoinProbeFactory(probeOutputChannels.stream().mapToInt(i -> i).toArray(), probeJoinChannel, probeHashChannel),
blockTypeOperators,
probeJoinChannel,
probeHashChannel,
probeOutputChannels.orElse(rangeList(probeTypes.size())),
PROBE_OUTER,
outputSingleMatch,
false,
spillingEnabled,
totalOperatorsCount,
partitioningSpillerFactory,
blockTypeOperators);
probeHashChannel);
}

@Override
public OperatorFactory lookupOuterJoin(
public OperatorFactory spillingJoin(
JoinOperatorType joinType,
int operatorId,
PlanNodeId planNodeId,
JoinBridgeManager<?> lookupSourceFactory,
boolean waitForBuild,
JoinBridgeManager<? extends LookupSourceFactory> lookupSourceFactory,
boolean hasFilter,
boolean spillingEnabled,
List<Type> probeTypes,
List<Integer> probeJoinChannel,
OptionalInt probeHashChannel,
Optional<List<Integer>> probeOutputChannels,
Optional<List<Integer>> probeOutputChannelsOptional,
OptionalInt totalOperatorsCount,
PartitioningSpillerFactory partitioningSpillerFactory,
BlockTypeOperators blockTypeOperators)
{
return createJoinOperatorFactory(
operatorId,
planNodeId,
lookupSourceFactory,
probeTypes,
probeJoinChannel,
probeHashChannel,
probeOutputChannels.orElse(rangeList(probeTypes.size())),
LOOKUP_OUTER,
false,
waitForBuild,
spillingEnabled,
totalOperatorsCount,
partitioningSpillerFactory,
blockTypeOperators);
}
List<Integer> probeOutputChannels = probeOutputChannelsOptional.orElse(rangeList(probeTypes.size()));
List<Type> probeOutputChannelTypes = probeOutputChannels.stream()
.map(probeTypes::get)
.collect(toImmutableList());

@Override
public OperatorFactory fullOuterJoin(
int operatorId,
PlanNodeId planNodeId,
JoinBridgeManager<?> lookupSourceFactory,
boolean hasFilter,
boolean spillingEnabled,
List<Type> probeTypes,
List<Integer> probeJoinChannel,
OptionalInt probeHashChannel,
Optional<List<Integer>> probeOutputChannels,
OptionalInt totalOperatorsCount,
PartitioningSpillerFactory partitioningSpillerFactory,
BlockTypeOperators blockTypeOperators)
{
return createJoinOperatorFactory(
return new LookupJoinOperatorFactory(
operatorId,
planNodeId,
lookupSourceFactory,
probeTypes,
probeOutputChannelTypes,
lookupSourceFactory.getBuildOutputTypes(),
joinType,
new JoinProbeFactory(probeOutputChannels.stream().mapToInt(i -> i).toArray(), probeJoinChannel, probeHashChannel),
blockTypeOperators,
totalOperatorsCount,
probeJoinChannel,
probeHashChannel,
probeOutputChannels.orElse(rangeList(probeTypes.size())),
FULL_OUTER,
false,
false,
spillingEnabled,
totalOperatorsCount,
partitioningSpillerFactory,
blockTypeOperators);
partitioningSpillerFactory);
}

private static List<Integer> rangeList(int endExclusive)
Expand All @@ -175,59 +107,4 @@ private static List<Integer> rangeList(int endExclusive)
.boxed()
.collect(toImmutableList());
}

private OperatorFactory createJoinOperatorFactory(
int operatorId,
PlanNodeId planNodeId,
JoinBridgeManager<?> lookupSourceFactoryManager,
List<Type> probeTypes,
List<Integer> probeJoinChannel,
OptionalInt probeHashChannel,
List<Integer> probeOutputChannels,
JoinType joinType,
boolean outputSingleMatch,
boolean waitForBuild,
boolean spillingEnabled,
OptionalInt totalOperatorsCount,
PartitioningSpillerFactory partitioningSpillerFactory,
BlockTypeOperators blockTypeOperators)
{
List<Type> probeOutputChannelTypes = probeOutputChannels.stream()
.map(probeTypes::get)
.collect(toImmutableList());

if (spillingEnabled) {
return new LookupJoinOperatorFactory(
operatorId,
planNodeId,
(JoinBridgeManager<? extends LookupSourceFactory>) lookupSourceFactoryManager,
probeTypes,
probeOutputChannelTypes,
lookupSourceFactoryManager.getBuildOutputTypes(),
joinType,
outputSingleMatch,
waitForBuild,
new JoinProbeFactory(probeOutputChannels.stream().mapToInt(i -> i).toArray(), probeJoinChannel, probeHashChannel),
blockTypeOperators,
totalOperatorsCount,
probeJoinChannel,
probeHashChannel,
partitioningSpillerFactory);
}

return new io.trino.operator.join.unspilled.LookupJoinOperatorFactory(
operatorId,
planNodeId,
(JoinBridgeManager<? extends io.trino.operator.join.unspilled.PartitionedLookupSourceFactory>) lookupSourceFactoryManager,
probeTypes,
probeOutputChannelTypes,
lookupSourceFactoryManager.getBuildOutputTypes(),
joinType,
outputSingleMatch,
waitForBuild,
new JoinProbeFactory(probeOutputChannels.stream().mapToInt(i -> i).toArray(), probeJoinChannel, probeHashChannel),
blockTypeOperators,
probeJoinChannel,
probeHashChannel);
}
}
Loading

0 comments on commit 1c8f7c2

Please sign in to comment.