
Conversation

@liancheng (Contributor) commented Oct 17, 2016

What changes were proposed in this pull request?

(This PR is based on a PoC branch authored by @clockfly.)

Iterative ML code may easily create query plans that grow exponentially. We found that query planning time also increases exponentially even when all the sub-plan trees are cached.

The following snippet illustrates the problem:

(0 until 6).foldLeft(Seq(1, 2, 3).toDS) { (plan, iteration) =>
  val start = System.currentTimeMillis()
  val result = plan.join(plan, "value").join(plan, "value").join(plan, "value").join(plan, "value")
  result.cache()
  System.out.println(s"Iteration $iteration takes time ${System.currentTimeMillis() - start} ms")
  result.as[Int]
}

// Iteration 0 takes time 9 ms
// Iteration 1 takes time 19 ms
// Iteration 2 takes time 61 ms
// Iteration 3 takes time 219 ms
// Iteration 4 takes time 830 ms
// Iteration 5 takes time 4080 ms

This is because when building a new Dataset, the new plan is always built upon QueryExecution.analyzed, which doesn't leverage existing cached plans. This PR tries to fix this issue by building new Datasets upon QueryExecution.withCachedData to leverage cached plans and avoid super slow query planning.
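For reference, a minimal sketch (not the patch itself) of the distinction the description draws: QueryExecution.analyzed is the raw analyzed plan, while QueryExecution.withCachedData is the same plan with cached sub-trees substituted by their in-memory counterparts. The object name and session setup below are illustrative only.

import org.apache.spark.sql.SparkSession

object CachedPlanDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("cached-plan-demo").getOrCreate()
    import spark.implicits._

    val base = Seq(1, 2, 3).toDS
    base.cache()   // register the plan with the cache manager
    base.count()   // materialize the cached data

    val joined = base.join(base, "value")

    // Raw analyzed plan: still refers to the original child plans.
    println(joined.queryExecution.analyzed.treeString)

    // Plan with cached sub-trees substituted: building new Datasets on top of
    // this is the idea behind this PR, so cached parts are not re-planned.
    println(joined.queryExecution.withCachedData.treeString)

    spark.stop()
  }
}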

Here is the result of running 1,000 iterations of the same snippet posted above after applying this PR:

Iteration 0 takes time 10 ms
Iteration 1 takes time 48 ms
Iteration 2 takes time 39 ms
Iteration 3 takes time 56 ms
Iteration 4 takes time 43 ms
Iteration 5 takes time 36 ms
Iteration 6 takes time 43 ms
Iteration 7 takes time 44 ms
Iteration 8 takes time 38 ms
Iteration 9 takes time 42 ms
...
Iteration 990 takes time 207 ms
Iteration 991 takes time 187 ms
Iteration 992 takes time 223 ms
Iteration 993 takes time 220 ms
Iteration 994 takes time 231 ms
Iteration 995 takes time 211 ms
Iteration 996 takes time 216 ms
Iteration 997 takes time 199 ms
Iteration 998 takes time 209 ms
Iteration 999 takes time 202 ms

Query planning time now grows much more slowly. The remaining growth is mostly because cache manager lookups slow down as the number of entries stored in the cache manager grows.

Many thanks to @clockfly, who investigated this issue and made an initial PoC.

How was this patch tested?

Existing tests.

@liancheng (Contributor Author) commented:

The original toString method may OOM for super large query plans. This is especially true for plan trees that are built iteratively and grow exponentially.
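The excerpt doesn't show the replacement, but as a Spark-free illustration of why rendering an unbounded tree is risky and how bounding the output helps, here is a toy sketch; all names in it are made up for this example.

// Toy tree whose rendered size doubles at every level, similar in spirit to
// the exponentially growing plan trees described above.
sealed trait Node { def children: Seq[Node] }
case class Join(left: Node, right: Node) extends Node { def children: Seq[Node] = Seq(left, right) }
case object Leaf extends Node { def children: Seq[Node] = Nil }

object TruncatedTreeString {
  // Render at most `maxLines` lines instead of materializing the full string.
  def render(root: Node, maxLines: Int): String = {
    val sb = new StringBuilder
    var lines = 0
    def loop(n: Node, depth: Int): Unit = {
      if (lines < maxLines) {
        val label = n match {
          case _: Join => "Join"
          case Leaf    => "Leaf"
        }
        sb.append("  " * depth).append(label).append('\n')
        lines += 1
        n.children.foreach(loop(_, depth + 1))
      }
    }
    loop(root, 0)
    if (lines >= maxLines) sb.append("... (truncated)\n")
    sb.toString
  }

  def main(args: Array[String]): Unit = {
    // 20 levels of self-joins: rendering this fully would emit roughly 2 million lines.
    val deep = (0 until 20).foldLeft(Leaf: Node)((t, _) => Join(t, t))
    println(render(deep, maxLines = 30))
  }
}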

@SparkQA commented Oct 17, 2016

Test build #67081 has finished for PR 15517 at commit 292ef36.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@naliazheli commented:

LGTM.
Until this issue is resolved, I can only do Dataset.toRdd.checkpoint() to avoid the growing query planning time.
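A hedged sketch of that workaround, using the public Dataset.rdd rather than the internal toRdd; the checkpoint directory and names below are just for illustration.

import org.apache.spark.sql.SparkSession

object RddCheckpointWorkaround {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("rdd-checkpoint-workaround").getOrCreate()
    import spark.implicits._

    spark.sparkContext.setCheckpointDir("/tmp/spark-checkpoints")

    // Grow the plan a little, as the iterative snippet in the description does.
    val ds = Seq(1, 2, 3).toDS
    val grown = ds.join(ds, "value").as[Int]

    // Drop to the RDD level and checkpoint to cut the lineage.
    val rdd = grown.rdd
    rdd.checkpoint()
    rdd.count()   // force the checkpoint to materialize

    // Rebuild a Dataset; its logical plan is now flat, at the cost of losing
    // the relational plan (and any further optimization across the cut).
    val rebuilt = rdd.toDS()
    println(rebuilt.queryExecution.analyzed.treeString)

    spark.stop()
  }
}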

Contributor commented:

Before this PR, we also cached the analyzed plan, right?

I think the major change is that we now cache the plan with cached data instead of the analyzed plan.

@liancheng liancheng changed the title [SPARK-17972][SQL] Cache analyzed plan instead of optimized plan to avoid slow query planning [SPARK-17972][SQL] Build Datasets upon withCachedData instead of analyzed to avoid slow query planning Oct 18, 2016
@liancheng (Contributor Author) commented:

The previous test failure was because we replaced the analyzed plan with withCachedData, while the cache manager uses the original analyzed plan as keys.

Force-pushed a new and much simpler approach that builds new Datasets upon withCachedData. Let's see whether Jenkins passes.

@SparkQA commented Oct 18, 2016

Test build #67120 has finished for PR 15517 at commit e1283a8.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@yhuai (Contributor) commented Oct 19, 2016

cc @mengxr and @jkbradley

@SparkQA commented Oct 19, 2016

Test build #67208 has finished for PR 15517 at commit fffbd69.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@liancheng (Contributor Author) commented:

The most recent version still breaks some test cases related to caching. Investigating it.


lazy val withCachedData: LogicalPlan = {
  assertAnalyzed()
  assertSupported()
Contributor commented:

why?

@liancheng (Contributor Author) replied:

This line is actually moved to optimizedPlan. It's for fixing the streaming test failures.

Although streaming queries don't use QueryExecution for actual execution, this line somehow gets triggered after the changes made in the previous commit and throws an exception. I'm not very familiar with structured streaming, so I may have missed something here.

@liancheng (Contributor Author) commented:

I'm closing this since caching is not the ultimate solution for this problem anyway. Caching is too memory-consuming when you are, say, computing connected components iteratively over a graph with 50 billion nodes.

Going to add a checkpoint API for Dataset so that we can truncate both the plan tree and the RDD lineage without caching.
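Assuming that follow-up checkpoint API (later Spark releases ship it as Dataset.checkpoint()), the iterative loop from the description could be kept bounded roughly like this; the checkpoint directory and iteration counts below are arbitrary, and this is a usage sketch rather than the actual follow-up patch.

import org.apache.spark.sql.SparkSession

object CheckpointLoop {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("checkpoint-loop").getOrCreate()
    import spark.implicits._

    spark.sparkContext.setCheckpointDir("/tmp/spark-checkpoints")

    val result = (0 until 30).foldLeft(Seq(1, 2, 3).toDS) { (plan, i) =>
      val next = plan.join(plan, "value").as[Int]
      // Every few iterations, cut both the logical plan tree and the RDD
      // lineage so planning time stays bounded without caching everything.
      if (i % 5 == 4) next.checkpoint() else next
    }

    println(result.count())
    spark.stop()
  }
}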

@liancheng liancheng closed this Oct 21, 2016
@naliazheli commented:

Dataset.checkpoint is what I need.
