Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify creation of join operator #13783

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 73 additions & 38 deletions core/trino-main/src/main/java/io/trino/operator/OperatorFactories.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,40 +14,45 @@
package io.trino.operator;

import io.trino.operator.join.JoinBridgeManager;
import io.trino.operator.join.LookupJoinOperatorFactory.JoinType;
import io.trino.operator.join.LookupSourceFactory;
import io.trino.operator.join.unspilled.PartitionedLookupSourceFactory;
import io.trino.spi.type.Type;
import io.trino.spiller.PartitioningSpillerFactory;
import io.trino.sql.planner.plan.JoinNode;
import io.trino.sql.planner.plan.PlanNodeId;
import io.trino.type.BlockTypeOperators;

import java.util.List;
import java.util.Optional;
import java.util.OptionalInt;

import static io.trino.operator.join.LookupJoinOperatorFactory.JoinType.FULL_OUTER;
import static io.trino.operator.join.LookupJoinOperatorFactory.JoinType.INNER;
import static io.trino.operator.join.LookupJoinOperatorFactory.JoinType.LOOKUP_OUTER;
import static io.trino.operator.join.LookupJoinOperatorFactory.JoinType.PROBE_OUTER;
import static java.util.Objects.requireNonNull;

public interface OperatorFactories
{
OperatorFactory innerJoin(
public OperatorFactory join(
JoinOperatorType joinType,
int operatorId,
PlanNodeId planNodeId,
JoinBridgeManager<?> lookupSourceFactory,
boolean outputSingleMatch,
boolean waitForBuild,
JoinBridgeManager<? extends PartitionedLookupSourceFactory> lookupSourceFactory,
boolean hasFilter,
boolean spillingEnabled,
List<Type> probeTypes,
List<Integer> probeJoinChannel,
OptionalInt probeHashChannel,
Optional<List<Integer>> probeOutputChannels,
OptionalInt totalOperatorsCount,
PartitioningSpillerFactory partitioningSpillerFactory,
BlockTypeOperators blockTypeOperators);

OperatorFactory probeOuterJoin(
public OperatorFactory spillingJoin(
JoinOperatorType joinType,
int operatorId,
PlanNodeId planNodeId,
JoinBridgeManager<?> lookupSourceFactory,
boolean outputSingleMatch,
JoinBridgeManager<? extends LookupSourceFactory> lookupSourceFactory,
boolean hasFilter,
boolean spillingEnabled,
List<Type> probeTypes,
List<Integer> probeJoinChannel,
OptionalInt probeHashChannel,
Expand All @@ -56,32 +61,62 @@ OperatorFactory probeOuterJoin(
PartitioningSpillerFactory partitioningSpillerFactory,
BlockTypeOperators blockTypeOperators);

OperatorFactory lookupOuterJoin(
int operatorId,
PlanNodeId planNodeId,
JoinBridgeManager<?> lookupSourceFactory,
boolean waitForBuild,
boolean hasFilter,
boolean spillingEnabled,
List<Type> probeTypes,
List<Integer> probeJoinChannel,
OptionalInt probeHashChannel,
Optional<List<Integer>> probeOutputChannels,
OptionalInt totalOperatorsCount,
PartitioningSpillerFactory partitioningSpillerFactory,
BlockTypeOperators blockTypeOperators);
public static class JoinOperatorType
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Modifier 'public' is redundant for interface members
Modifier 'static' is redundant for inner classes of interfaces

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. I wonder why InteliJ did not notice this.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've InteliJ Ultimate - and it's not that I'm an eagle.

{
private final JoinType type;
private final boolean outputSingleMatch;
private final boolean waitForBuild;

OperatorFactory fullOuterJoin(
int operatorId,
PlanNodeId planNodeId,
JoinBridgeManager<?> lookupSourceFactory,
boolean hasFilter,
boolean spillingEnabled,
List<Type> probeTypes,
List<Integer> probeJoinChannel,
OptionalInt probeHashChannel,
Optional<List<Integer>> probeOutputChannels,
OptionalInt totalOperatorsCount,
PartitioningSpillerFactory partitioningSpillerFactory,
BlockTypeOperators blockTypeOperators);
public static JoinOperatorType ofJoinNodeType(JoinNode.Type joinNodeType, boolean outputSingleMatch, boolean waitForBuild)
{
return switch (joinNodeType) {
case INNER -> innerJoin(outputSingleMatch, waitForBuild);
case LEFT -> probeOuterJoin(outputSingleMatch);
case RIGHT -> lookupOuterJoin(waitForBuild);
case FULL -> fullOuterJoin();
};
}

public static JoinOperatorType innerJoin(boolean outputSingleMatch, boolean waitForBuild)
{
return new JoinOperatorType(INNER, outputSingleMatch, waitForBuild);
}

public static JoinOperatorType probeOuterJoin(boolean outputSingleMatch)
{
return new JoinOperatorType(PROBE_OUTER, outputSingleMatch, false);
}

public static JoinOperatorType lookupOuterJoin(boolean waitForBuild)
{
return new JoinOperatorType(LOOKUP_OUTER, false, waitForBuild);
}

public static JoinOperatorType fullOuterJoin()
{
return new JoinOperatorType(FULL_OUTER, false, false);
}

private JoinOperatorType(JoinType type, boolean outputSingleMatch, boolean waitForBuild)
{
this.type = requireNonNull(type, "type is null");
this.outputSingleMatch = outputSingleMatch;
this.waitForBuild = waitForBuild;
}

public boolean isOutputSingleMatch()
{
return outputSingleMatch;
}

public boolean isWaitForBuild()
{
return waitForBuild;
}

public JoinType getType()
{
return type;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
import io.trino.operator.join.JoinBridgeManager;
import io.trino.operator.join.JoinProbe.JoinProbeFactory;
import io.trino.operator.join.LookupJoinOperatorFactory;
import io.trino.operator.join.LookupJoinOperatorFactory.JoinType;
import io.trino.operator.join.LookupSourceFactory;
import io.trino.operator.join.unspilled.PartitionedLookupSourceFactory;
import io.trino.spi.type.Type;
import io.trino.spiller.PartitioningSpillerFactory;
import io.trino.sql.planner.plan.PlanNodeId;
Expand All @@ -29,144 +29,76 @@
import java.util.stream.IntStream;

import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.trino.operator.join.LookupJoinOperatorFactory.JoinType.FULL_OUTER;
import static io.trino.operator.join.LookupJoinOperatorFactory.JoinType.INNER;
import static io.trino.operator.join.LookupJoinOperatorFactory.JoinType.LOOKUP_OUTER;
import static io.trino.operator.join.LookupJoinOperatorFactory.JoinType.PROBE_OUTER;

public class TrinoOperatorFactories
implements OperatorFactories
{
@Override
public OperatorFactory innerJoin(
public OperatorFactory join(
JoinOperatorType joinType,
int operatorId,
PlanNodeId planNodeId,
JoinBridgeManager<?> lookupSourceFactory,
boolean outputSingleMatch,
boolean waitForBuild,
JoinBridgeManager<? extends PartitionedLookupSourceFactory> lookupSourceFactory,
boolean hasFilter,
boolean spillingEnabled,
List<Type> probeTypes,
List<Integer> probeJoinChannel,
OptionalInt probeHashChannel,
Optional<List<Integer>> probeOutputChannels,
OptionalInt totalOperatorsCount,
PartitioningSpillerFactory partitioningSpillerFactory,
Optional<List<Integer>> probeOutputChannelsOptional,
BlockTypeOperators blockTypeOperators)
{
return createJoinOperatorFactory(
operatorId,
planNodeId,
lookupSourceFactory,
probeTypes,
probeJoinChannel,
probeHashChannel,
probeOutputChannels.orElse(rangeList(probeTypes.size())),
INNER,
outputSingleMatch,
waitForBuild,
spillingEnabled,
totalOperatorsCount,
partitioningSpillerFactory,
blockTypeOperators);
}
List<Integer> probeOutputChannels = probeOutputChannelsOptional.orElse(rangeList(probeTypes.size()));
List<Type> probeOutputChannelTypes = probeOutputChannels.stream()
.map(probeTypes::get)
.collect(toImmutableList());

@Override
public OperatorFactory probeOuterJoin(
int operatorId,
PlanNodeId planNodeId,
JoinBridgeManager<?> lookupSourceFactory,
boolean outputSingleMatch,
boolean hasFilter,
boolean spillingEnabled,
List<Type> probeTypes,
List<Integer> probeJoinChannel,
OptionalInt probeHashChannel,
Optional<List<Integer>> probeOutputChannels,
OptionalInt totalOperatorsCount,
PartitioningSpillerFactory partitioningSpillerFactory,
BlockTypeOperators blockTypeOperators)
{
return createJoinOperatorFactory(
return new io.trino.operator.join.unspilled.LookupJoinOperatorFactory(
operatorId,
planNodeId,
lookupSourceFactory,
probeTypes,
probeOutputChannelTypes,
lookupSourceFactory.getBuildOutputTypes(),
joinType,
new JoinProbeFactory(probeOutputChannels.stream().mapToInt(i -> i).toArray(), probeJoinChannel, probeHashChannel),
blockTypeOperators,
probeJoinChannel,
probeHashChannel,
probeOutputChannels.orElse(rangeList(probeTypes.size())),
PROBE_OUTER,
outputSingleMatch,
false,
spillingEnabled,
totalOperatorsCount,
partitioningSpillerFactory,
blockTypeOperators);
probeHashChannel);
}

@Override
public OperatorFactory lookupOuterJoin(
public OperatorFactory spillingJoin(
JoinOperatorType joinType,
int operatorId,
PlanNodeId planNodeId,
JoinBridgeManager<?> lookupSourceFactory,
boolean waitForBuild,
JoinBridgeManager<? extends LookupSourceFactory> lookupSourceFactory,
boolean hasFilter,
boolean spillingEnabled,
List<Type> probeTypes,
List<Integer> probeJoinChannel,
OptionalInt probeHashChannel,
Optional<List<Integer>> probeOutputChannels,
Optional<List<Integer>> probeOutputChannelsOptional,
OptionalInt totalOperatorsCount,
PartitioningSpillerFactory partitioningSpillerFactory,
BlockTypeOperators blockTypeOperators)
{
return createJoinOperatorFactory(
operatorId,
planNodeId,
lookupSourceFactory,
probeTypes,
probeJoinChannel,
probeHashChannel,
probeOutputChannels.orElse(rangeList(probeTypes.size())),
LOOKUP_OUTER,
false,
waitForBuild,
spillingEnabled,
totalOperatorsCount,
partitioningSpillerFactory,
blockTypeOperators);
}
List<Integer> probeOutputChannels = probeOutputChannelsOptional.orElse(rangeList(probeTypes.size()));
List<Type> probeOutputChannelTypes = probeOutputChannels.stream()
.map(probeTypes::get)
.collect(toImmutableList());

@Override
public OperatorFactory fullOuterJoin(
int operatorId,
PlanNodeId planNodeId,
JoinBridgeManager<?> lookupSourceFactory,
boolean hasFilter,
boolean spillingEnabled,
List<Type> probeTypes,
List<Integer> probeJoinChannel,
OptionalInt probeHashChannel,
Optional<List<Integer>> probeOutputChannels,
OptionalInt totalOperatorsCount,
PartitioningSpillerFactory partitioningSpillerFactory,
BlockTypeOperators blockTypeOperators)
{
return createJoinOperatorFactory(
return new LookupJoinOperatorFactory(
operatorId,
planNodeId,
lookupSourceFactory,
probeTypes,
probeOutputChannelTypes,
lookupSourceFactory.getBuildOutputTypes(),
joinType,
new JoinProbeFactory(probeOutputChannels.stream().mapToInt(i -> i).toArray(), probeJoinChannel, probeHashChannel),
blockTypeOperators,
totalOperatorsCount,
probeJoinChannel,
probeHashChannel,
probeOutputChannels.orElse(rangeList(probeTypes.size())),
FULL_OUTER,
false,
false,
spillingEnabled,
totalOperatorsCount,
partitioningSpillerFactory,
blockTypeOperators);
partitioningSpillerFactory);
}

private static List<Integer> rangeList(int endExclusive)
Expand All @@ -175,59 +107,4 @@ private static List<Integer> rangeList(int endExclusive)
.boxed()
.collect(toImmutableList());
}

private OperatorFactory createJoinOperatorFactory(
int operatorId,
PlanNodeId planNodeId,
JoinBridgeManager<?> lookupSourceFactoryManager,
List<Type> probeTypes,
List<Integer> probeJoinChannel,
OptionalInt probeHashChannel,
List<Integer> probeOutputChannels,
JoinType joinType,
boolean outputSingleMatch,
boolean waitForBuild,
boolean spillingEnabled,
OptionalInt totalOperatorsCount,
PartitioningSpillerFactory partitioningSpillerFactory,
BlockTypeOperators blockTypeOperators)
{
List<Type> probeOutputChannelTypes = probeOutputChannels.stream()
.map(probeTypes::get)
.collect(toImmutableList());

if (spillingEnabled) {
return new LookupJoinOperatorFactory(
operatorId,
planNodeId,
(JoinBridgeManager<? extends LookupSourceFactory>) lookupSourceFactoryManager,
probeTypes,
probeOutputChannelTypes,
lookupSourceFactoryManager.getBuildOutputTypes(),
joinType,
outputSingleMatch,
waitForBuild,
new JoinProbeFactory(probeOutputChannels.stream().mapToInt(i -> i).toArray(), probeJoinChannel, probeHashChannel),
blockTypeOperators,
totalOperatorsCount,
probeJoinChannel,
probeHashChannel,
partitioningSpillerFactory);
}

return new io.trino.operator.join.unspilled.LookupJoinOperatorFactory(
operatorId,
planNodeId,
(JoinBridgeManager<? extends io.trino.operator.join.unspilled.PartitionedLookupSourceFactory>) lookupSourceFactoryManager,
probeTypes,
probeOutputChannelTypes,
lookupSourceFactoryManager.getBuildOutputTypes(),
joinType,
outputSingleMatch,
waitForBuild,
new JoinProbeFactory(probeOutputChannels.stream().mapToInt(i -> i).toArray(), probeJoinChannel, probeHashChannel),
blockTypeOperators,
probeJoinChannel,
probeHashChannel);
}
}
Loading