Move local parallel to planner #4699

Merged: 19 commits, Apr 27, 2016
@@ -26,6 +26,7 @@

import java.util.ArrayList;
import java.util.List;
import java.util.OptionalInt;

public abstract class AbstractSimpleOperatorBenchmark
extends AbstractOperatorBenchmark
@@ -47,7 +48,7 @@ protected DriverFactory createDriverFactory()

operatorFactories.add(new NullOutputOperatorFactory(999, new PlanNodeId("test"), Iterables.getLast(operatorFactories).getTypes()));

return new DriverFactory(true, true, operatorFactories);
return new DriverFactory(true, true, operatorFactories, OptionalInt.empty());
}

@Override
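The only change in this benchmark is the new trailing DriverFactory argument. Per the PR title, local parallelism is now decided by the planner, and this OptionalInt carries the planned number of driver instances; benchmarks that want no extra local parallelism pass OptionalInt.empty(). A minimal, self-contained sketch of what the argument conveys (the class name below is illustrative, not Presto's):

import java.util.OptionalInt;

// Illustrative only: OptionalInt.empty() means "no explicit driver-instance count",
// while OptionalInt.of(n) would request n parallel drivers for the pipeline.
public class DriverInstancesArgumentSketch
{
    public static void main(String[] args)
    {
        OptionalInt unspecified = OptionalInt.empty();
        OptionalInt fourDrivers = OptionalInt.of(4);
        System.out.println(unspecified.isPresent()); // false: the engine decides (see orElse(1) later in this PR)
        System.out.println(fourDrivers.getAsInt());  // 4
    }
}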
==== next file ====
@@ -108,7 +108,6 @@ protected List<? extends OperatorFactory> createOperatorFactories()
COUNT.bind(ImmutableList.of(2), Optional.empty(), Optional.empty(), 1.0)
),
Optional.empty(),
Optional.empty(),
10_000,
new DataSize(16, MEGABYTE));

==== next file ====
@@ -50,7 +50,6 @@ protected List<? extends OperatorFactory> createOperatorFactories()
Step.SINGLE,
ImmutableList.of(DOUBLE_SUM.bind(ImmutableList.of(1), Optional.empty(), Optional.empty(), 1.0)),
Optional.empty(),
Optional.empty(),
100_000,
new DataSize(16, MEGABYTE));
return ImmutableList.of(tableScanOperator, aggregationOperator);
==== next file ====
@@ -30,6 +30,7 @@

import java.util.List;
import java.util.Optional;
import java.util.OptionalInt;

import static com.facebook.presto.benchmark.BenchmarkQueryRunner.createLocalQueryRunner;
import static com.facebook.presto.benchmark.BenchmarkQueryRunner.createLocalQueryRunnerHashEnabled;
@@ -73,9 +74,9 @@ protected List<Driver> createDrivers(TaskContext taskContext)
}

// hash build
HashBuilderOperatorFactory hashBuilder = new HashBuilderOperatorFactory(2, new PlanNodeId("test"), source.getTypes(), Ints.asList(0), hashChannel, 1_500_000);
HashBuilderOperatorFactory hashBuilder = new HashBuilderOperatorFactory(2, new PlanNodeId("test"), source.getTypes(), Ints.asList(0), hashChannel, false, 1_500_000);
driversBuilder.add(hashBuilder);
DriverFactory hashBuildDriverFactory = new DriverFactory(true, false, driversBuilder.build());
DriverFactory hashBuildDriverFactory = new DriverFactory(true, false, driversBuilder.build(), OptionalInt.empty());
Driver hashBuildDriver = hashBuildDriverFactory.createDriver(taskContext.addPipelineContext(true, false).addDriverContext());

// join
@@ -92,7 +93,7 @@ protected List<Driver> createDrivers(TaskContext taskContext)
OperatorFactory joinOperator = LookupJoinOperators.innerJoin(2, new PlanNodeId("test"), hashBuilder.getLookupSourceSupplier(), source.getTypes(), Ints.asList(0), hashChannel);
joinDriversBuilder.add(joinOperator);
joinDriversBuilder.add(new NullOutputOperatorFactory(3, new PlanNodeId("test"), joinOperator.getTypes()));
DriverFactory joinDriverFactory = new DriverFactory(true, true, joinDriversBuilder.build());
DriverFactory joinDriverFactory = new DriverFactory(true, true, joinDriversBuilder.build(), OptionalInt.empty());
Driver joinDriver = joinDriverFactory.createDriver(taskContext.addPipelineContext(true, true).addDriverContext());

return ImmutableList.of(hashBuildDriver, joinDriver);
==== next file ====
@@ -25,6 +25,7 @@

import java.util.List;
import java.util.Optional;
import java.util.OptionalInt;

import static com.facebook.presto.benchmark.BenchmarkQueryRunner.createLocalQueryRunner;

@@ -40,9 +41,16 @@ public HashBuildBenchmark(LocalQueryRunner localQueryRunner)
protected List<Driver> createDrivers(TaskContext taskContext)
{
OperatorFactory ordersTableScan = createTableScanOperator(0, new PlanNodeId("test"), "orders", "orderkey", "totalprice");
HashBuilderOperatorFactory hashBuilder = new HashBuilderOperatorFactory(1, new PlanNodeId("test"), ordersTableScan.getTypes(), Ints.asList(0), Optional.empty(), 1_500_000);
HashBuilderOperatorFactory hashBuilder = new HashBuilderOperatorFactory(
1,
new PlanNodeId("test"),
ordersTableScan.getTypes(),
Ints.asList(0),
Optional.empty(),
false,
1_500_000);

DriverFactory driverFactory = new DriverFactory(true, true, ordersTableScan, hashBuilder);
DriverFactory driverFactory = new DriverFactory(true, true, ImmutableList.of(ordersTableScan, hashBuilder), OptionalInt.empty());
Driver driver = driverFactory.createDriver(taskContext.addPipelineContext(true, true).addDriverContext());
return ImmutableList.of(driver);
}
==== next file ====
@@ -29,6 +29,7 @@

import java.util.List;
import java.util.Optional;
import java.util.OptionalInt;

import static com.facebook.presto.benchmark.BenchmarkQueryRunner.createLocalQueryRunner;

@@ -52,10 +53,17 @@ protected List<Driver> createDrivers(TaskContext taskContext)
{
if (lookupSourceSupplier == null) {
OperatorFactory ordersTableScan = createTableScanOperator(0, new PlanNodeId("test"), "orders", "orderkey", "totalprice");
HashBuilderOperatorFactory hashBuilder = new HashBuilderOperatorFactory(1, new PlanNodeId("test"), ordersTableScan.getTypes(), Ints.asList(0), Optional.empty(), 1_500_000);
HashBuilderOperatorFactory hashBuilder = new HashBuilderOperatorFactory(
1,
new PlanNodeId("test"),
ordersTableScan.getTypes(),
Ints.asList(0),
Optional.empty(),
false,
1_500_000);

DriverContext driverContext = taskContext.addPipelineContext(false, false).addDriverContext();
Driver driver = new DriverFactory(false, false, ordersTableScan, hashBuilder).createDriver(driverContext);
Driver driver = new DriverFactory(false, false, ImmutableList.of(ordersTableScan, hashBuilder), OptionalInt.empty()).createDriver(driverContext);
while (!driver.isFinished()) {
driver.process();
}
@@ -68,7 +76,7 @@ protected List<Driver> createDrivers(TaskContext taskContext)

NullOutputOperatorFactory output = new NullOutputOperatorFactory(2, new PlanNodeId("test"), joinOperator.getTypes());

DriverFactory driverFactory = new DriverFactory(true, true, lineItemTableScan, joinOperator, output);
DriverFactory driverFactory = new DriverFactory(true, true, ImmutableList.of(lineItemTableScan, joinOperator, output), OptionalInt.empty());
DriverContext driverContext = taskContext.addPipelineContext(true, true).addDriverContext();
Driver driver = driverFactory.createDriver(driverContext);
return ImmutableList.of(driver);
==== next file ====
@@ -37,7 +37,6 @@ public GroupByHashPageIndexer(List<? extends Type> hashTypes)
hashTypes,
IntStream.range(0, hashTypes.size()).toArray(),
Optional.empty(),
Optional.empty(),
20,
false));
}
==== next file ====
@@ -16,6 +16,8 @@
import com.facebook.presto.execution.QueryManagerConfig;
import com.facebook.presto.execution.TaskManagerConfig;
import com.facebook.presto.memory.MemoryManagerConfig;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.StandardErrorCode;
import com.facebook.presto.spi.session.PropertyMetadata;
import com.facebook.presto.sql.analyzer.FeaturesConfig;
import com.google.common.collect.ImmutableList;
@@ -29,7 +31,9 @@
import static com.facebook.presto.spi.session.PropertyMetadata.booleanSessionProperty;
import static com.facebook.presto.spi.session.PropertyMetadata.integerSessionProperty;
import static com.facebook.presto.spi.session.PropertyMetadata.stringSessionProperty;
import static com.facebook.presto.spi.type.BigintType.BIGINT;
import static com.facebook.presto.spi.type.VarcharType.VARCHAR;
import static java.lang.String.format;

public final class SystemSessionProperties
{
@@ -39,10 +43,7 @@ public final class SystemSessionProperties
public static final String HASH_PARTITION_COUNT = "hash_partition_count";
public static final String PREFER_STREAMING_OPERATORS = "prefer_streaming_operators";
public static final String TASK_WRITER_COUNT = "task_writer_count";
public static final String TASK_JOIN_CONCURRENCY = "task_join_concurrency";
public static final String TASK_HASH_BUILD_CONCURRENCY = "task_hash_build_concurrency";
public static final String TASK_AGGREGATION_CONCURRENCY = "task_aggregation_concurrency";
public static final String TASK_INTERMEDIATE_AGGREGATION = "task_intermediate_aggregation";
public static final String TASK_CONCURRENCY = "task_concurrency";
public static final String TASK_SHARE_INDEX_LOADING = "task_share_index_loading";
public static final String QUERY_MAX_MEMORY = "query_max_memory";
public static final String QUERY_MAX_RUN_TIME = "query_max_run_time";
@@ -119,26 +120,22 @@ public SystemSessionProperties(
"Parallelize writes when using UNION ALL in queries that write data",
featuresConfig.isPushTableWriteThroughUnion(),
false),
integerSessionProperty(
TASK_JOIN_CONCURRENCY,
"Experimental: Default number of local parallel join jobs per worker",
taskManagerConfig.getTaskJoinConcurrency(),
false),
integerSessionProperty(
TASK_HASH_BUILD_CONCURRENCY,
"Experimental: Default number of local parallel hash build jobs per worker",
taskManagerConfig.getTaskDefaultConcurrency(),
false),
integerSessionProperty(
TASK_AGGREGATION_CONCURRENCY,
"Experimental: Default number of local parallel aggregation jobs per worker",
taskManagerConfig.getTaskDefaultConcurrency(),
false),
booleanSessionProperty(
TASK_INTERMEDIATE_AGGREGATION,
"Experimental: add intermediate aggregation jobs per worker",
featuresConfig.isIntermediateAggregationsEnabled(),
false),
new PropertyMetadata<>(
TASK_CONCURRENCY,
"Default number of local parallel jobs per worker",
BIGINT,
Integer.class,
taskManagerConfig.getTaskConcurrency(),
false,
value -> {
int concurrency = ((Number) value).intValue();
if (Integer.bitCount(concurrency) != 1) {
throw new PrestoException(
StandardErrorCode.INVALID_SESSION_PROPERTY,
format("%s must be a power of 2: %s", TASK_CONCURRENCY, concurrency));
}
return concurrency;
}),
booleanSessionProperty(
TASK_SHARE_INDEX_LOADING,
"Share index join lookups and caching within a task",
@@ -263,24 +260,9 @@ public static boolean isPushTableWriteThroughUnion(Session session)
return session.getProperty(PUSH_TABLE_WRITE_THROUGH_UNION, Boolean.class);
}

public static int getTaskJoinConcurrency(Session session)
{
return session.getProperty(TASK_JOIN_CONCURRENCY, Integer.class);
}

public static int getTaskHashBuildConcurrency(Session session)
{
return session.getProperty(TASK_HASH_BUILD_CONCURRENCY, Integer.class);
}

public static int getTaskAggregationConcurrency(Session session)
{
return session.getProperty(TASK_AGGREGATION_CONCURRENCY, Integer.class);
}

public static boolean isIntermediateAggregation(Session session)
public static int getTaskConcurrency(Session session)
{
return session.getProperty(TASK_INTERMEDIATE_AGGREGATION, Boolean.class);
return session.getProperty(TASK_CONCURRENCY, Integer.class);
}

public static boolean isShareIndexLoading(Session session)
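The consolidated task_concurrency property is the only one in this file with a custom decoder: it rejects values that are not a power of two and raises INVALID_SESSION_PROPERTY. A self-contained sketch of just that check (IllegalArgumentException stands in for Presto's PrestoException; the class name is mine):

// Sketch of the power-of-two validation in the task_concurrency decoder above.
// For positive inputs, Integer.bitCount(x) == 1 exactly when x is a power of two.
public class TaskConcurrencyValidationSketch
{
    static int requirePowerOfTwo(int concurrency)
    {
        if (Integer.bitCount(concurrency) != 1) {
            throw new IllegalArgumentException("task_concurrency must be a power of 2: " + concurrency);
        }
        return concurrency;
    }

    public static void main(String[] args)
    {
        System.out.println(requirePowerOfTwo(8)); // prints 8
        System.out.println(requirePowerOfTwo(6)); // throws: 6 is not a power of two
    }
}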
==== next file ====
@@ -281,7 +281,7 @@ private PlanRoot doAnalyzeQuery()
stateMachine.setInputs(inputs);

// fragment the plan
SubPlan subplan = new PlanFragmenter().createSubPlans(plan);
SubPlan subplan = new PlanFragmenter().createSubPlans(stateMachine.getSession(), metadata, plan);

// record analysis time
stateMachine.recordAnalysisTime(analysisStart);
==== next file ====
@@ -46,6 +46,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.OptionalInt;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -149,9 +150,7 @@ private SqlTaskExecution(
fragment.getRoot(),
fragment.getSymbols(),
fragment.getPartitionFunction(),
sharedBuffer,
fragment.getPartitioning().isSingleNode(),
fragment.getPartitionedSource() == null);
sharedBuffer);
driverFactories = localExecutionPlan.getDriverFactories();
}
catch (Throwable e) {
@@ -166,6 +165,7 @@
for (DriverFactory driverFactory : driverFactories) {
if (driverFactory.getSourceIds().contains(fragment.getPartitionedSource())) {
checkState(partitionedDriverFactory == null, "multiple partitioned sources are not supported");
checkArgument(!driverFactory.getDriverInstances().isPresent(), "Driver instances must not be set on a partitioned driver");
partitionedDriverFactory = new DriverSplitRunnerFactory(driverFactory);
}
else {
@@ -208,7 +208,7 @@ private void start()
// start unpartitioned drivers
List<DriverSplitRunner> runners = new ArrayList<>();
for (DriverSplitRunnerFactory driverFactory : unpartitionedDriverFactories) {
for (int i = 0; i < driverFactory.getDriverInstances(); i++) {
for (int i = 0; i < driverFactory.getDriverInstances().orElse(1); i++) {
runners.add(driverFactory.createDriverRunner(null, false));
}
driverFactory.setNoMoreSplits();
@@ -507,7 +507,7 @@ private void closeDriverFactoryIfFullyCreated()
}
}

public int getDriverInstances()
public OptionalInt getDriverInstances()
{
return driverFactory.getDriverInstances();
}
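Two rules are visible in this file: a partitioned driver factory must not carry an instance count (the checkArgument above), and unpartitioned pipelines fall back to a single driver when the planner set no count (the orElse(1) in start()). A standalone sketch of both rules, with plain Java types standing in for Presto's factories (the names below are mine, not Presto's):

import java.util.OptionalInt;

// Standalone illustration of the rules in SqlTaskExecution above.
public class DriverInstanceRulesSketch
{
    static int plannedInstances(boolean partitioned, OptionalInt driverInstances)
    {
        if (partitioned && driverInstances.isPresent()) {
            // mirrors: checkArgument(!driverFactory.getDriverInstances().isPresent(), ...)
            throw new IllegalArgumentException("Driver instances must not be set on a partitioned driver");
        }
        // mirrors the start() loop bound: driverFactory.getDriverInstances().orElse(1)
        return driverInstances.orElse(1);
    }

    public static void main(String[] args)
    {
        System.out.println(plannedInstances(false, OptionalInt.of(4)));   // 4 unpartitioned drivers
        System.out.println(plannedInstances(false, OptionalInt.empty())); // 1 by default
        System.out.println(plannedInstances(true, OptionalInt.empty()));  // 1, and the invariant holds
        // plannedInstances(true, OptionalInt.of(2)) would throw, as the checkArgument above would
    }
}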
==== next file ====
@@ -52,8 +52,7 @@ public class TaskManagerConfig
private Duration infoUpdateInterval = new Duration(200, TimeUnit.MILLISECONDS);

private int writerCount = 1;
private int taskDefaultConcurrency = 1;
private Integer taskJoinConcurrency;
private int taskConcurrency = 1;
private int httpResponseThreads = 100;
private int httpTimeoutThreads = 3;

@@ -293,33 +292,16 @@ public TaskManagerConfig setWriterCount(int writerCount)
}

@Min(1)
public int getTaskJoinConcurrency()
public int getTaskConcurrency()
{
if (taskJoinConcurrency == null) {
return taskDefaultConcurrency;
}
return taskJoinConcurrency;
}

@Config("task.join-concurrency")
@ConfigDescription("Local concurrency for join operators")
public TaskManagerConfig setTaskJoinConcurrency(int taskJoinConcurrency)
{
this.taskJoinConcurrency = taskJoinConcurrency;
return this;
}

@Min(1)
public int getTaskDefaultConcurrency()
{
return taskDefaultConcurrency;
return taskConcurrency;
}

@Config("task.default-concurrency")
@ConfigDescription("Default local concurrency for parallel operators")
public TaskManagerConfig setTaskDefaultConcurrency(int taskDefaultConcurrency)
@Config("task.concurrency")
@ConfigDescription("Default number of local parallel jobs per worker")
public TaskManagerConfig setTaskConcurrency(int taskConcurrency)
{
this.taskDefaultConcurrency = taskDefaultConcurrency;
this.taskConcurrency = taskConcurrency;
return this;
}

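The old task.default-concurrency and task.join-concurrency knobs collapse into a single task.concurrency value, which also seeds the task_concurrency session property shown earlier. A hedged usage sketch of the new accessors (it assumes TaskManagerConfig keeps an implicit no-argument constructor; the package name comes from the import block earlier in this PR):

import com.facebook.presto.execution.TaskManagerConfig;

// Sketch only: exercises the accessor pair added in this hunk. The @Config and @Min
// annotations are handled by Presto's configuration framework and are not shown here.
public class TaskConcurrencyConfigExample
{
    public static void main(String[] args)
    {
        TaskManagerConfig config = new TaskManagerConfig().setTaskConcurrency(4);
        System.out.println(config.getTaskConcurrency()); // 4 (the default is 1)
    }
}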
==== next file ====
@@ -16,6 +16,7 @@
import com.facebook.presto.execution.SqlStageExecution;
import com.facebook.presto.execution.StageState;
import com.facebook.presto.sql.planner.PlanFragment;
import com.facebook.presto.sql.planner.plan.ExchangeNode;
import com.facebook.presto.sql.planner.plan.IndexJoinNode;
import com.facebook.presto.sql.planner.plan.JoinNode;
import com.facebook.presto.sql.planner.plan.PlanFragmentId;
@@ -47,8 +48,10 @@

import static com.facebook.presto.execution.StageState.RUNNING;
import static com.facebook.presto.execution.StageState.SCHEDULED;
import static com.facebook.presto.sql.planner.plan.ExchangeNode.Scope.LOCAL;
import static com.facebook.presto.util.ImmutableCollectors.toImmutableList;
import static com.facebook.presto.util.ImmutableCollectors.toImmutableMap;
import static com.google.common.base.Preconditions.checkArgument;

@NotThreadSafe
public class PhasedExecutionSchedule
@@ -249,6 +252,26 @@ public Set<PlanFragmentId> visitRemoteSource(RemoteSourceNode node, PlanFragmentId currentFragmentId)
return sources.build();
}

@Override
public Set<PlanFragmentId> visitExchange(ExchangeNode node, PlanFragmentId currentFragmentId)
{
checkArgument(node.getScope() == LOCAL, "Only local exchanges are supported in the phased execution scheduler");
ImmutableSet.Builder<PlanFragmentId> allSources = ImmutableSet.builder();

// Link the source fragments together, so we only schedule one at a time.
Set<PlanFragmentId> previousSources = ImmutableSet.of();
for (PlanNode subPlanNode : node.getSources()) {
Set<PlanFragmentId> currentSources = subPlanNode.accept(this, currentFragmentId);
allSources.addAll(currentSources);

addEdges(previousSources, currentSources);

previousSources = currentSources;
}

return allSources.build();
}

@Override
public Set<PlanFragmentId> visitUnion(UnionNode node, PlanFragmentId currentFragmentId)
{
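The new visitExchange handler only accepts local exchanges and chains their source fragments together so the scheduler starts one group at a time rather than all at once. A standalone sketch of that chaining pattern (the names and the string edge encoding are mine, not Presto's):

import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Illustrative only: each group of source fragments gets edges from the previous
// group, mirroring addEdges(previousSources, currentSources) in visitExchange above.
public class ChainedSourcesSketch
{
    static List<String> chain(List<Set<String>> sourceGroups)
    {
        List<String> edges = new ArrayList<>();
        Set<String> previous = Set.of();
        for (Set<String> current : sourceGroups) {
            for (String from : previous) {
                for (String to : current) {
                    edges.add(from + " -> " + to);
                }
            }
            previous = current;
        }
        return edges;
    }

    public static void main(String[] args)
    {
        System.out.println(chain(List.of(Set.of("F1"), Set.of("F2"), Set.of("F3"))));
        // prints [F1 -> F2, F2 -> F3]
    }
}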