Skip to content

Conversation

@liancheng
Copy link
Contributor

@liancheng liancheng commented Oct 27, 2016

What changes were proposed in this pull request?

Problem

Iterative ML code may easily create query plans that grow exponentially. We found that query planning time also increases exponentially even when all the sub-plan trees are cached.

The following snippet illustrates the problem:

(0 until 6).foldLeft(Seq(1, 2, 3).toDS) { (plan, iteration) =>
  println(s"== Iteration $iteration ==")
  val time0 = System.currentTimeMillis()
  val joined = plan.join(plan, "value").join(plan, "value").join(plan, "value").join(plan, "value")
  joined.cache()
  println(s"Query planning takes ${System.currentTimeMillis() - time0} ms")
  joined.as[Int]
}

// == Iteration 0 ==
// Query planning takes 9 ms
// == Iteration 1 ==
// Query planning takes 26 ms
// == Iteration 2 ==
// Query planning takes 53 ms
// == Iteration 3 ==
// Query planning takes 163 ms
// == Iteration 4 ==
// Query planning takes 700 ms
// == Iteration 5 ==
// Query planning takes 3418 ms

This is because when building a new Dataset, the new plan is always built upon QueryExecution.analyzed, which doesn't leverage existing cached plans.

On the other hand, usually, doing caching every a few iterations may not be the right direction for this problem since caching is too memory consuming (imaging computing connected components over a graph with 50 billion nodes). What we really need here is to truncate both the query plan (to minimize query planning time) and the lineage of the underlying RDD (to avoid stack overflow).

Changes introduced in this PR

This PR tries to fix this issue by introducing a checkpoint() method into Dataset[T], which does exactly the things described above. The following snippet, which is essentially the same as the one above but invokes checkpoint() instead of cache(), shows the micro benchmark result of this PR:

One key point is that the checkpointed Dataset should preserve the origianl partitioning and ordering information of the original Dataset, so that we can avoid unnecessary shuffling (similar to reading from a pre-bucketed table). This is done by adding outputPartitioning and outputOrdering to LogicalRDD and RDDScanExec.

Micro benchmark

spark.sparkContext.setCheckpointDir("/tmp/cp")

(0 until 100).foldLeft(Seq(1, 2, 3).toDS) { (plan, iteration) =>
  println(s"== Iteration $iteration ==")
  val time0 = System.currentTimeMillis()
  val cp = plan.checkpoint()
  cp.count()
  System.out.println(s"Checkpointing takes ${System.currentTimeMillis() - time0} ms")

  val time1 = System.currentTimeMillis()
  val joined = cp.join(cp, "value").join(cp, "value").join(cp, "value").join(cp, "value")
  val result = joined.as[Int]

  println(s"Query planning takes ${System.currentTimeMillis() - time1} ms")
  result
}

// == Iteration 0 ==
// Checkpointing takes 591 ms
// Query planning takes 13 ms
// == Iteration 1 ==
// Checkpointing takes 1605 ms
// Query planning takes 16 ms
// == Iteration 2 ==
// Checkpointing takes 782 ms
// Query planning takes 8 ms
// == Iteration 3 ==
// Checkpointing takes 729 ms
// Query planning takes 10 ms
// == Iteration 4 ==
// Checkpointing takes 734 ms
// Query planning takes 9 ms
// == Iteration 5 ==
// ...
// == Iteration 50 ==
// Checkpointing takes 571 ms
// Query planning takes 7 ms
// == Iteration 51 ==
// Checkpointing takes 548 ms
// Query planning takes 7 ms
// == Iteration 52 ==
// Checkpointing takes 596 ms
// Query planning takes 8 ms
// == Iteration 53 ==
// Checkpointing takes 568 ms
// Query planning takes 7 ms
// ...

You may see that although checkpointing is more heavy weight an operation, it always takes roughly the same amount of time to perform both checkpointing and query planning.

Open question

@mengxr mentioned that it would be more convenient if we can make Dataset.checkpoint() eager, i.e., always performs a RDD.count() after calling RDD.checkpoint(). Not quite sure whether this is a universal requirement. Maybe we can add a eager: Boolean argument for Dataset.checkpoint() to support that.

How was this patch tested?

Unit test added in DatasetSuite.

@liancheng
Copy link
Contributor Author

cc @mengxr @jkbradley @yhuai

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reason why we would like to pick the first leaf Partitioning here is that PartitioningCollection, which is also an Expression and participates query planning, may grow exponentially in the benchmark snippet, which essentially builds a full binary tree of Joins.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are the partitioning other than the first useful? Can we just filter out the partitioning guaranteed by other partitionings instead of picking the first only?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There can be cases where the optimizer fails to eliminate an unnecessary shuffle if we strip all the other partitionings. But that's still better than an exponentially growing PartitioningCollection, which basically runs into the same slow query planning issue this PR tries to solve.

I talked to @yhuai offline about exactly the same issue you brought up before sending out this PR, and we decided to have a working version first and optimize it later since we still need feedback from ML people to see whether the basic mechanism works for their workloads.

@SparkQA
Copy link

SparkQA commented Oct 27, 2016

Test build #67608 has finished for PR 15651 at commit c7503e3.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we set it back to the original checkpoint dir at the end of this test?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. Thanks.

@viirya
Copy link
Member

viirya commented Oct 27, 2016

@liancheng We have RDD.localCheckpoint which provides alternative approach to truncate periodically long lineages while skipping the expensive step of replicating the materialized data in a reliable distributed file system.

What do you think we can also add it to Dataset?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to convert the old attributes in outputPartitioning and outputOrdering to new attributes? Otherwise the partitioning and ordering are still referring to old attributes.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks! Good point. I think we probably don't need to call _.newInstance() here anyway.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually we do need the _.newInstance() call. You're right.

@SparkQA
Copy link

SparkQA commented Oct 27, 2016

Test build #67619 has finished for PR 15651 at commit 1dea5af.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@liancheng
Copy link
Contributor Author

retest this please

@SparkQA
Copy link

SparkQA commented Oct 27, 2016

Test build #67660 has finished for PR 15651 at commit b631d57.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Oct 27, 2016

Test build #67663 has finished for PR 15651 at commit b631d57.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@liancheng
Copy link
Contributor Author

@viirya Dataset.localCheckpoint() also makes sense. Would like to add it as a follow-up though. Thanks for the suggestion!

@SparkQA
Copy link

SparkQA commented Oct 28, 2016

Test build #67679 has finished for PR 15651 at commit 0ee8f41.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Restore checkpoint directory at the end of the test case

Add eager argument to Dataset.checkpoint()

Address PR comments
@SparkQA
Copy link

SparkQA commented Oct 28, 2016

Test build #67691 has finished for PR 15651 at commit 609fba7.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@liancheng
Copy link
Contributor Author

Also cc @JoshRosen

case e: Attribute => rewrite.getOrElse(e, e)
}.asInstanceOf[Partitioning]

case p => p
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you explain this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not all Partitioning classes are Expressions, while we only need to rewrite attributes within those Partitionings that are also Expressions.


/**
* Returns a checkpointed version of this Dataset.
*
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: this API does not have this param.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, nice catch!

@viirya
Copy link
Member

viirya commented Oct 31, 2016

LGTM except for one minor comment.

@SparkQA
Copy link

SparkQA commented Oct 31, 2016

Test build #67802 has finished for PR 15651 at commit cee16ce.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@yhuai
Copy link
Contributor

yhuai commented Oct 31, 2016

lgtm pending jenkins

@SparkQA
Copy link

SparkQA commented Oct 31, 2016

Test build #67813 has finished for PR 15651 at commit 5405a94.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Oct 31, 2016

Test build #67816 has finished for PR 15651 at commit ffe4318.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Oct 31, 2016

Test build #67817 has finished for PR 15651 at commit e0e38bf.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@asfgit asfgit closed this in 8bfc3b7 Oct 31, 2016
@liancheng liancheng deleted the ds-checkpoint branch November 2, 2016 21:09
uzadude pushed a commit to uzadude/spark that referenced this pull request Jan 27, 2017
…lans

## What changes were proposed in this pull request?
### Problem

Iterative ML code may easily create query plans that grow exponentially. We found that query planning time also increases exponentially even when all the sub-plan trees are cached.

The following snippet illustrates the problem:

``` scala
(0 until 6).foldLeft(Seq(1, 2, 3).toDS) { (plan, iteration) =>
  println(s"== Iteration $iteration ==")
  val time0 = System.currentTimeMillis()
  val joined = plan.join(plan, "value").join(plan, "value").join(plan, "value").join(plan, "value")
  joined.cache()
  println(s"Query planning takes ${System.currentTimeMillis() - time0} ms")
  joined.as[Int]
}

// == Iteration 0 ==
// Query planning takes 9 ms
// == Iteration 1 ==
// Query planning takes 26 ms
// == Iteration 2 ==
// Query planning takes 53 ms
// == Iteration 3 ==
// Query planning takes 163 ms
// == Iteration 4 ==
// Query planning takes 700 ms
// == Iteration 5 ==
// Query planning takes 3418 ms
```

This is because when building a new Dataset, the new plan is always built upon `QueryExecution.analyzed`, which doesn't leverage existing cached plans.

On the other hand, usually, doing caching every a few iterations may not be the right direction for this problem since caching is too memory consuming (imaging computing connected components over a graph with 50 billion nodes). What we really need here is to truncate both the query plan (to minimize query planning time) and the lineage of the underlying RDD (to avoid stack overflow).
### Changes introduced in this PR

This PR tries to fix this issue by introducing a `checkpoint()` method into `Dataset[T]`, which does exactly the things described above. The following snippet, which is essentially the same as the one above but invokes `checkpoint()` instead of `cache()`, shows the micro benchmark result of this PR:

One key point is that the checkpointed Dataset should preserve the origianl partitioning and ordering information of the original Dataset, so that we can avoid unnecessary shuffling (similar to reading from a pre-bucketed table). This is done by adding `outputPartitioning` and `outputOrdering` to `LogicalRDD` and `RDDScanExec`.
### Micro benchmark

``` scala
spark.sparkContext.setCheckpointDir("/tmp/cp")

(0 until 100).foldLeft(Seq(1, 2, 3).toDS) { (plan, iteration) =>
  println(s"== Iteration $iteration ==")
  val time0 = System.currentTimeMillis()
  val cp = plan.checkpoint()
  cp.count()
  System.out.println(s"Checkpointing takes ${System.currentTimeMillis() - time0} ms")

  val time1 = System.currentTimeMillis()
  val joined = cp.join(cp, "value").join(cp, "value").join(cp, "value").join(cp, "value")
  val result = joined.as[Int]

  println(s"Query planning takes ${System.currentTimeMillis() - time1} ms")
  result
}

// == Iteration 0 ==
// Checkpointing takes 591 ms
// Query planning takes 13 ms
// == Iteration 1 ==
// Checkpointing takes 1605 ms
// Query planning takes 16 ms
// == Iteration 2 ==
// Checkpointing takes 782 ms
// Query planning takes 8 ms
// == Iteration 3 ==
// Checkpointing takes 729 ms
// Query planning takes 10 ms
// == Iteration 4 ==
// Checkpointing takes 734 ms
// Query planning takes 9 ms
// == Iteration 5 ==
// ...
// == Iteration 50 ==
// Checkpointing takes 571 ms
// Query planning takes 7 ms
// == Iteration 51 ==
// Checkpointing takes 548 ms
// Query planning takes 7 ms
// == Iteration 52 ==
// Checkpointing takes 596 ms
// Query planning takes 8 ms
// == Iteration 53 ==
// Checkpointing takes 568 ms
// Query planning takes 7 ms
// ...
```

You may see that although checkpointing is more heavy weight an operation, it always takes roughly the same amount of time to perform both checkpointing and query planning.
### Open question

mengxr mentioned that it would be more convenient if we can make `Dataset.checkpoint()` eager, i.e., always performs a `RDD.count()` after calling `RDD.checkpoint()`. Not quite sure whether this is a universal requirement. Maybe we can add a `eager: Boolean` argument for `Dataset.checkpoint()` to support that.
## How was this patch tested?

Unit test added in `DatasetSuite`.

Author: Cheng Lian <lian@databricks.com>
Author: Yin Huai <yhuai@databricks.com>

Closes apache#15651 from liancheng/ds-checkpoint.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants