
Grouped execution support for JOINs with Hive connector #8951

Merged
merged 16 commits into from
Dec 9, 2017

Conversation

haozhun
Contributor

@haozhun haozhun commented Sep 11, 2017

With the right table organization (e.g. bucketing in Hive), it is possible to process a subset of data at a time for JOINs. This reduces the amount of memory needed to hold the hash table.
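The idea in the description above can be illustrated with a toy sketch (this is not Presto code; all names are made up): when both join inputs are bucketed on the join key, matching rows can only live in the same bucket, so the build-side hash table only ever needs to hold one bucket at a time.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Toy sketch of grouped join execution. With both inputs bucketed on the
// join key, the hash table holds one build-side bucket at a time instead of
// the whole build side.
public class GroupedJoinSketch
{
    static int bucketOf(int key, int bucketCount)
    {
        return Math.floorMod(key, bucketCount);
    }

    // Joins build and probe rows (key -> value), one bucket at a time.
    static List<String> groupedJoin(Map<Integer, String> build, Map<Integer, String> probe, int bucketCount)
    {
        List<String> output = new ArrayList<>();
        for (int bucket = 0; bucket < bucketCount; bucket++) {
            int b = bucket;
            // Hash table for this bucket only; it can be freed before the next bucket starts.
            Map<Integer, String> hashTable = build.entrySet().stream()
                    .filter(e -> bucketOf(e.getKey(), bucketCount) == b)
                    .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
            probe.forEach((key, value) -> {
                if (bucketOf(key, bucketCount) == b && hashTable.containsKey(key)) {
                    output.add(key + ":" + hashTable.get(key) + "," + value);
                }
            });
        }
        return output;
    }

    public static void main(String[] args)
    {
        Map<Integer, String> build = Map.of(1, "a", 2, "b", 3, "c");
        Map<Integer, String> probe = Map.of(1, "x", 3, "y", 4, "z");
        System.out.println(groupedJoin(build, probe, 2));
    }
}
```

Peak memory here is proportional to the largest bucket of the build side, not the whole build side, which is the reduction the PR description refers to.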

@haozhun haozhun self-assigned this Sep 11, 2017
@haozhun haozhun changed the title [WIP] Process a subset of buckets at a time for JOINs Grouped execution support for JOINs with Hive connector Sep 12, 2017
@haozhun haozhun assigned martint and unassigned haozhun Sep 12, 2017
@haozhun
Contributor Author

haozhun commented Sep 22, 2017

rebased, @martint

First 20 commits (all commits up to "Make SqlTaskExecution work with LocalExecutionPlan instead of Fragment") should be pretty easy to review.

The 21st commit ("Make SqlTaskExecution work with LocalExecutionPlan instead of Fragment") is a complex one. But it's well tested. Let me know whenever you have any questions.

Contributor

@martint martint left a comment

Done with these commits. Had some comments, especially on the one about refactoring TestBufferingSplitSource:

  • Remove unused field from HivePartitionKey
  • Add initialCount parameter to ReferenceCount constructor
  • Mark LookupJoinOperatorFactory as UsedByGeneratedCode
  • Improve TestMetadataManager to avoid stuck test
  • Make TestBackgroundSplitLoader not use DirectExecutor
  • Refactor TestBufferingSplitSource
  • Add javadoc for guarantee provided by SourcePartitionedScheduler
  • Add javadoc to clarify ConnectorSplitSource.isFinished requirement

@@ -86,7 +88,12 @@ public void testMetadataIsClearedAfterQueryCanceled()

// wait until query starts running
while (true) {
if (queryManager.getQueryInfo(queryId).getState() == RUNNING) {
Contributor

Add comment to commit message explaining why it was getting stuck.

@@ -69,7 +70,7 @@
private static final Path RETURNED_PATH = new Path(SAMPLE_PATH);
private static final Path FILTERED_PATH = new Path(SAMPLE_PATH_FILTERED);

private static final Executor EXECUTOR = directExecutor();
Contributor

Commit message needs explanation of motivation to not use direct executor.

List<ConnectorSplit> connectorSplits = hiveSplitSource.getNextBatch(1).get();
assertEquals(1, connectorSplits.size());
assertEquals(RETURNED_PATH.toString(), ((HiveSplit) connectorSplits.get(0)).getPath());
private List<String> drainHiveSplitSource(HiveSplitSource hiveSplitSource)
Contributor

Argument could just be named source for simplicity.

Contributor

And the method could be named drain, too

ImmutableList.Builder<String> paths = ImmutableList.builder();
while (true) {
List<ConnectorSplit> splits = hiveSplitSource.getNextBatch(100).get();
for (ConnectorSplit connectorSplit : splits) {
Contributor

Variable could be named split for simplicity.


public class MockSplitSource
implements SplitSource
{
private static final Split SPLIT = new Split(new ConnectorId("test"), new ConnectorTransactionHandle() {}, new MockConnectorSplit());
private static final SettableFuture<List<Split>> COMPLETED_FUTURE = SettableFuture.create();
Contributor

Futures.immediateFuture(null) ?

Contributor

BTW, this can, technically, result in a memory leak. I went looking at Guava's docs and was surprised to see there's no explicit statement of whether a ListenableFuture (and, in particular, SettableFuture) is expected to not hold on to the reference to the listener once completed.

Contributor Author

Futures.immediateFuture cannot be used because it returns a ListenableFuture, not a SettableFuture. We discussed this in person.


private void doGetNextBatch()
{
if (splitsProduced >= totalSplits) {
Contributor

splitsProduced can never be greater than totalSplits, right? I'd change the check to == and add a checkState to help catch any logic errors that could cause it to go over.
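The suggested pattern, on a minimal made-up sketch (this is not the actual MockSplitSource; the field and method names are invented, and a plain IllegalStateException stands in for Guava's checkState):

```java
import java.util.ArrayList;
import java.util.List;

// Minimal sketch of an == check guarded by a state invariant. The counter
// can legitimately reach totalSplits, and the invariant check catches any
// logic error that would push it past.
public class SplitCounter
{
    private final int totalSplits;
    private int splitsProduced;

    public SplitCounter(int totalSplits)
    {
        this.totalSplits = totalSplits;
    }

    public boolean produceOne()
    {
        // Stand-in for Guava's checkState: fail loudly on an impossible state.
        if (splitsProduced > totalSplits) {
            throw new IllegalStateException("splitsProduced exceeds totalSplits");
        }
        if (splitsProduced == totalSplits) {
            return false; // no more splits to produce
        }
        splitsProduced++;
        return true;
    }

    public static List<Boolean> runDemo()
    {
        SplitCounter counter = new SplitCounter(2);
        List<Boolean> results = new ArrayList<>();
        results.add(counter.produceOne());
        results.add(counter.produceOne());
        results.add(counter.produceOne());
        return results;
    }

    public static void main(String[] args)
    {
        System.out.println(runDemo()); // [true, true, false]
    }
}
```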

throws Exception
{
MockSplitSource mockSource = new MockSplitSource(1, 25);
MockSplitSource mockSource = new MockSplitSource().setBatchSize(1).increaseAvailableSplits(25).atSplitCompletion(FINISH);
Contributor

Place each .setXXX on a separate line. It makes this easier to read:

MockSplitSource mockSource = new MockSplitSource()
        .setBatchSize(1)
        .increaseAvailableSplits(25)
        .atSplitCompletion(FINISH);

{
ListenableFuture<?> result;
switch (nextBatchCall) {
case SINGLE_ARGUMENT:
Contributor

What's this? It doesn't seem to serve any purpose in this PR, so either remove it or move it to the PR that's going to need it.

Contributor Author

@haozhun haozhun Sep 26, 2017

We talked about this offline. I'm avoiding structural changes to this file in future commits so that they will be easier to review. As you have realized, reviewing this commit is quite involved even though it makes little material change.

}
}

public void testDriverGroups()
Contributor

Remove

* In the event that no ordinary split is available from the underlying SplitSource,
* a synthesized EmptySplit will be scheduled.
* This at-least-one-split guarantee is provided on a per-SplitSource basis,
* not per-DriverGroup basis.
Contributor

I don't think there's a "DriverGroup" concept at this point, so defer this part of the javadoc until it's introduced.

Contributor Author

@haozhun haozhun Sep 26, 2017

I'll move this commit (Add javadoc for guarantee provided by SourcePartitionedScheduler) after Support addressable split group in SplitSource/Manager

Contributor

@martint martint left a comment

Done with a few more:

  • Rename TableLayout.PartitioningColumn to StreamPartitioningColumn
  • Rename HiveSplitSource.finished to noMoreSplits
  • Rename OperatorFactory.close to noMoreOperators
  • Rename DriverFactory.close to noMoreOperators
  • Document guarantee on invocation of OperatorFactory.noMoreOperators
  • Add SplitManager to PlanFragmenter
  • Rename NodePartitioning to TablePartitioning
  • Add parameter to ConnectorSplitManager.getSplits

@@ -80,7 +80,7 @@ public TableLayoutHandle getHandle()
nodePartitioning.getPartitioningColumns()));
}

public Optional<Set<ColumnHandle>> getPartitioningColumns()
Contributor

I'd mention, in the commit message, that this should've probably been renamed when stream properties were added and this is just correcting that oversight.

Contributor Author

I suppose you meant to say "when node properties were added"

@@ -216,9 +216,9 @@ private void invokeFinishedIfNecessary()
try {
Contributor

Explain motivation in commit message. I.e., what's wrong with "finished"?

@@ -81,7 +81,7 @@ from lineitem join orders using (orderkey)
driversBuilder.add(hashBuilder);
DriverFactory hashBuildDriverFactory = new DriverFactory(0, true, false, driversBuilder.build(), OptionalInt.empty());
Driver hashBuildDriver = hashBuildDriverFactory.createDriver(taskContext.addPipelineContext(0, true, false).addDriverContext());
hashBuildDriverFactory.close();
hashBuildDriverFactory.noMoreDriver();
Contributor

noMoreDrivers?

Also, commit message incorrectly refers to noMoreOperators

@@ -27,7 +26,6 @@
import static java.util.Objects.requireNonNull;

public class DriverFactory
implements Closeable
Contributor

Did you mean to put this in the previous commit?

public synchronized void noMoreDriver()
{
if (!closed) {
Contributor

Unrelated change?

Contributor Author

I moved it to Rename DriverFactory.close to noMoreDrivers. It's still an unrelated change. But this file is touched in that commit, and I don't think something this minor needs a stand-alone commit.

@@ -65,7 +66,7 @@ private PlanFragmenter()
{
}

public static SubPlan createSubPlans(Session session, Metadata metadata, Plan plan)
public static SubPlan createSubPlans(Session session, Metadata metadata, SplitManager splitManager, Plan plan)
Contributor

Why? This change warrants an explanation of the motivation, especially since it's being added without anything using it.

@@ -57,7 +57,7 @@ public AccumuloSplitManager(
}
Contributor

I'd change the title of the commit message to "Pass split scheduling strategy to ConnectorSplitManager.getSplits"

@@ -57,7 +57,7 @@ public AccumuloSplitManager(
}

@Override
public ConnectorSplitSource getSplits(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorTableLayoutHandle layout)
Contributor

I'm wondering whether this should be encoded in the layout, but I can't say without seeing how, exactly, this is derived. My intuition is that this is property of the plan shape, therefore, determined during planning/optimization (vs a runtime scheduling knob)

enum SplitSchedulingStrategy
{
ALL_AT_ONCE,
GROUPED,
Contributor

GROUPED notion doesn't exist at this point, so it should be deferred to the commit that adds it.


import static com.google.common.base.Preconditions.checkState;

@JsonSerialize(using = DriverGroupId.Serializer.class)
Contributor

Why do we need custom serializer/deserializer? (I haven't looked deeply at what it does)

Contributor Author

We talked about this offline. You can't do @JsonValue Integer because the JsonCreator won't be invoked for a null value.

@@ -85,4 +101,28 @@ public boolean isFinished()
{
return source.isFinished();
}

private static class FetchSplitsResult
Contributor

How is this part of the change related to making groups addressable?

Contributor Author

Here, BufferingSplitSource is implementing the new method in SplitSource that takes SplitGroupId.

@JsonDeserialize(using = DriverGroupId.Deserializer.class)
public class DriverGroupId
{
private final boolean grouped;
Contributor

What's an ungrouped group?

public ListenableFuture<SplitBatch> getNextBatch(DriverGroupId driverGroupId, int maxSize)
{
checkState(driverGroupId.isGrouped() == (splitSchedulingStrategy == SplitSchedulingStrategy.GROUPED));
ListenableFuture<ConnectorSplitBatch> nextBatch = toListenableFuture(
Contributor

This is hard to read. Introduce a variable for the conditional expression.
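What that cleanup looks like on a made-up example (none of these names come from the actual SplitSource code):

```java
// Extracting a conditional expression into a named variable so the call
// site reads as a statement of intent rather than a puzzle.
public class ExtractVariableExample
{
    enum Strategy { GROUPED, UNGROUPED }

    // Before: the conditional is buried inside the call expression.
    static String hardToRead(Strategy strategy, int size)
    {
        return describe(strategy == Strategy.GROUPED ? "per-group batch" : "global batch", size);
    }

    // After: a named variable states what the conditional means.
    static String easierToRead(Strategy strategy, int size)
    {
        String batchKind = strategy == Strategy.GROUPED ? "per-group batch" : "global batch";
        return describe(batchKind, size);
    }

    static String describe(String kind, int size)
    {
        return kind + " of " + size;
    }

    public static void main(String[] args)
    {
        System.out.println(easierToRead(Strategy.GROUPED, 10)); // per-group batch of 10
    }
}
```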

class SplitBatch
{
private final List<Split> splits;
private final boolean noMoreSplits;
Contributor

Maybe call it lastBatch. It will make isNoMoreSplits sound better: isLastBatch

if (!sourceOperator.isPresent() || !sourceOperator.get().getSourceId().equals(sourceUpdate.getPlanNodeId())) {
return;
}
checkArgument(sourceOperator.isPresent() && sourceOperator.get().getSourceId().equals(sourceUpdate.getPlanNodeId()));
Contributor

Add a message to the checkArgument call.

@@ -118,16 +118,52 @@ public static SqlTaskExecution createSqlTaskExecution(
Executor notificationExecutor,
QueryMonitor queryMonitor)
{
LocalExecutionPlan localExecutionPlan;
Contributor

I'd get rid of this constructor and have the caller (SqlTaskExecutionFactory?) convert {planner,fragment} -> local execution plan, since planner and fragment are not used by this class anymore.

Contributor

@martint martint left a comment

Some comments I had pending. They may not be relevant after our face-to-face discussion last week, but I didn't want them to get lost.

this.noMoreSplits = noMoreSplits;
}

public TaskSource(
PlanNodeId planNodeId,
Contributor

Arguments can all go on the same line

Contributor

agreed

@@ -28,19 +29,30 @@
{
private final PlanNodeId planNodeId;
private final Set<ScheduledSplit> splits;
private final Set<DriverGroupId> noMoreSplitsForDriverGroup;
Contributor

The name of this field is misleading given its type. Maybe call it groupsWithNoMoreSplits or groupsWithoutMoreSplits?

@GuardedBy("this")
private final ConcurrentMap<PlanNodeId, TaskSource> pendingSplits = new ConcurrentHashMap<>();
private final Map<PlanNodeId, SplitsForPlanNode> pendingSplitsMap;
Contributor

pendingSplitsMap is not a good name. Maybe call this pendingSplitsPerSource

Contributor

agreed

PlanNode plan,
List<Symbol> outputLayout,
Map<Symbol, Type> types,
List<PlanNodeId> partitionedSourceOrder,
Contributor

What's "partitionedSourceOrder"? Specifically, what does "order" mean in this context?

Contributor

It is the order in which you must start partitioned splits to avoid deadlocks... so for a collocated join, you must start all build splits before you schedule any probes.
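The deadlock being avoided can be shown with a toy model (this is not Presto's scheduler; "build"/"probe" and the single driver slot are invented for illustration). A probe driver cannot finish until the build side has run; if only one driver slot exists and the probe grabs it first, the build side can never start.

```java
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;

// Toy model of why partitioned sources must be started in a fixed order:
// starting build before probe runs to completion, while the reverse order
// wedges immediately.
public class SourceOrderSketch
{
    static boolean runsToCompletion(List<String> startOrder)
    {
        Queue<String> pending = new ArrayDeque<>(startOrder);
        boolean buildDone = false;
        // Single slot: only the driver at the head of the queue can run.
        while (!pending.isEmpty()) {
            String next = pending.peek();
            if (next.equals("build")) {
                buildDone = true; // the hash table now exists
                pending.remove();
            }
            else if (buildDone) {
                pending.remove(); // probe can finish against the hash table
            }
            else {
                return false; // probe holds the slot, build can never run: deadlock
            }
        }
        return true;
    }

    public static void main(String[] args)
    {
        System.out.println(runsToCompletion(List.of("build", "probe"))); // true
        System.out.println(runsToCompletion(List.of("probe", "build"))); // false
    }
}
```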

@@ -128,6 +135,18 @@ public static SqlTaskExecution createSqlTaskExecution(
fragment.getPartitioningScheme(),
fragment.getPartitionedSources(),
outputBuffer);

for (DriverFactory driverFactory : localExecutionPlan.getDriverFactories()) {
Contributor

What's the purpose of these validations? They just seem to be ensuring that LocalExecutionPlanner is doing its job. If it's to catch potential bugs in that class, I'd move them into that class.

}

// Splits for a particular plan node and driver group combination
class SplitsForDriverGroupInPlanNode
Contributor

There's nothing related to driver, plan node or group in this class. It seems to be a container for related splits. A name like SplitGroup might be more appropriate

Contributor

static

class SplitsForDriverGroupInPlanNode
{
private Set<ScheduledSplit> splits = new HashSet<>();
private SplitsState state = INITIALIZED;
Contributor

What's the purpose of the INITIALIZED state? The only significant checks either do state == SPLITS_ADDED || state == INITIALIZED or look for NO_MORE_SPLITS. Also, I'm not sure why we need a FINISHED state. It's never used.

Contributor

agree

}
}

public void markAsCleanedUp()
Contributor

What's this for? It doesn't do anything other than set state to FINISHED, which is never checked or used.

Contributor Author

It effectively closes. I added checks.

}

// Splits for a particular plan node (all driver groups)
class SplitsForPlanNode
Contributor

static

Contributor

agreed

Contributor

Tag these classes with @NotThreadSafe

// Splits for a particular plan node (all driver groups)
class SplitsForPlanNode
{
private final Map<DriverGroupId, SplitsForDriverGroupInPlanNode> map = new HashMap<>();
Contributor

map is not a good name. Maybe perGroup?

Contributor

@martint martint left a comment

These commits look good:

  • Remove unused field from HivePartitionKey
  • Fix theoretical busy loop in HiveSplitLoader/Source
  • Add initialCount parameter to ReferenceCount constructor
  • Mark LookupJoinOperatorFactory as UsedByGeneratedCode
  • Improve TestMetadataManager to avoid stuck test
  • Make TestBackgroundSplitLoader not use DirectExecutor
  • Refactor TestBufferingSplitSource
  • Add javadoc to clarify ConnectorSplitSource.isFinished requirement
  • Rename TableLayout.PartitioningColumn to StreamPartitioningColumn
  • Rename HiveSplitSource.finished to noMoreSplits
  • Rename OperatorFactory.close to noMoreOperators
  • Document guarantee on invocation of OperatorFactory.noMoreOperators
  • Rename NodePartitioning to TablePartitioning

@@ -367,7 +367,7 @@ private PlanRoot doAnalyzeQuery()
stateMachine.setOutput(output);

// fragment the plan
SubPlan subplan = PlanFragmenter.createSubPlans(stateMachine.getSession(), metadata, plan);
SubPlan subplan = PlanFragmenter.createSubPlans(stateMachine.getSession(), metadata, splitManager, plan);
Contributor

I forget if we discussed this, but why isn't all the necessary information available from table metadata? If that's the case, the fragmenter shouldn't have to rely on SplitManager.


enum SplitSchedulingStrategy
{
ALL_AT_ONCE,
Contributor

Instead of ALL_AT_ONCE, SINGLE_GROUP? "All at once" seems to imply they will all get scheduled at the same time.

Contributor

maybe "ungrouped"? all at once is confusing to me because I thought this was the execution strategy (e.g. phased).

@haozhun
Contributor Author

haozhun commented Oct 6, 2017

I merged commits that @martint approved.

@haozhun
Contributor Author

haozhun commented Nov 16, 2017

Comments addressed

@dain dain assigned haozhun and unassigned martint and dain Nov 22, 2017
This allows ConnectorSplitManager implementations to return a different
SplitSource depending on whether addressable split groups are needed.
ConnectorNodePartitioningProvider.listPartitionHandles lists all
PartitionHandles that belong to a ConnectorPartitioningHandle.

This commit is a step toward supporting addressable splits.
PlanFragmenter needs access to NodePartitioningManager to know
whether the splits of a table can be discovered in an addressable fashion.
This code was useful when a Driver could have multiple source nodes.
That capability was removed a long time ago.
This commit removes artifacts left over from back then.
Previously, SqlTaskExecution took in a Fragment and invoked LocalExecutionPlanner
to get the LocalExecutionPlan. However, it continued to look at some properties
of the Fragment.

This commit adds the additional meta properties to LocalExecutionPlan
so that SqlTaskExecution doesn't look at the Fragment any more (except to turn
it into a LocalExecutionPlan). This commit also adds a constructor that
takes a LocalExecutionPlan directly for improved testability.
This commit brings the concept of driver groups to tasks, pipelines,
drivers, and operators on workers.

In particular, changes are applied to SqlTask, SqlTaskExecution,
Driver/OperatorFactory, Pipeline/DriverContext.
Operators that share state across drivers need to be aware of grouped
execution in order to manage their lifecycle correctly.

LocalExchange is one such operator. LocalExchangeSinkOperator and
LocalExchangeSourceOperator share a page buffer across drivers.
There were some methods in PartitionedLookupSourceFactory that aren't part of
the LookupSourceFactory interface. In a later commit, I need to add a delegator
for PartitionedLookupSourceFactory. That doesn't work because existing code
tries to downcast LookupSourceFactory to PartitionedLookupSourceFactory,
which no longer works with the delegator.
The test shuts down the executor after every single test case to terminate
any outstanding threads. This can lead to excessive logging from the
RejectedExecutionHandler, which in turn leads to Travis failures.
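The failure mode described in that commit message can be reproduced in isolation (a minimal sketch, not the test in question): submitting to an already-shut-down ExecutorService is rejected, and by default the handler, ThreadPoolExecutor.AbortPolicy, throws a RejectedExecutionException, each of which may end up in the logs.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;

// Demonstrates that tasks submitted after shutdown() are rejected by the
// default rejection policy.
public class ShutdownRejectionDemo
{
    public static String trySubmitAfterShutdown()
    {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.shutdown();
        try {
            executor.submit(() -> {});
            return "accepted";
        }
        catch (RejectedExecutionException e) {
            return "rejected";
        }
    }

    public static void main(String[] args)
    {
        System.out.println(trySubmitAfterShutdown()); // rejected
    }
}
```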
@sopel39
Contributor

sopel39 commented Dec 13, 2017

@haozhun It would be great if you could provide a brief explanation for the community of how grouped execution is implemented. This change touches various components (planner, execution, etc.) and is quite large, so such a description would be very helpful.

Especially some information about:

  • what new concepts were introduced to the Presto codebase?
  • how planning is affected?
  • how execution pipeline is affected?
  • what are the most important considerations when using grouping?

@findepi @kokosing @losipiuk any other question ideas?

FYI: @kbajda @mattsfuller

5 participants