Conversation

@JoshRosen
Contributor

This patch disables output spec. validation for jobs launched through Spark Streaming, since this interferes with checkpoint recovery.

Hadoop OutputFormats have a `checkOutputSpecs` method which performs certain checks prior to writing output, such as checking whether the output directory already exists. SPARK-1100 added checks for FileOutputFormat, SPARK-1677 (#947) added a SparkConf configuration to disable these checks, and SPARK-2309 (#1088) extended these checks to run for all OutputFormats, not just FileOutputFormat.

In Spark Streaming, we might have to re-process a batch during checkpoint recovery, so `save` actions may be called multiple times. In addition to `DStream`'s own save actions, users might use `transform` or `foreachRDD` and call the `RDD` and `PairRDD` save actions. When output spec. validation is enabled, the second calls to these actions will fail due to existing output.

This patch automatically disables output spec. validation for jobs submitted by the Spark Streaming scheduler. This is done by using Scala's `DynamicVariable` to propagate the bypass setting without having to mutate SparkConf or introduce a global variable.
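
As a rough illustration of the `DynamicVariable` pattern described above (all object and method names below are simplified stand-ins, not the exact Spark internals):

```scala
import scala.util.DynamicVariable

// Simplified sketch of the bypass mechanism; names are illustrative.
object SaveActions {
  // Defaults to false, so ordinary jobs still validate output specs.
  val disableOutputSpecValidation = new DynamicVariable[Boolean](false)

  def saveAsHadoopFile(path: String): Unit = {
    if (!disableOutputSpecValidation.value) {
      // checkOutputSpecs would run here and throw if `path` already exists.
      println(s"validating output spec for $path")
    }
    println(s"writing output to $path")
  }

  def main(args: Array[String]): Unit = {
    // A scheduler can wrap job execution in withValue(true), so any save
    // action triggered inside the closure skips validation; the flag
    // reverts to its previous value as soon as the closure returns.
    disableOutputSpecValidation.withValue(true) {
      saveAsHadoopFile("/tmp/batch-output") // validation skipped
    }
    saveAsHadoopFile("/tmp/batch-output")   // validation runs again here
  }
}
```

Because `DynamicVariable` is thread-local, the bypass only affects work started from the wrapped call stack, which is why neither SparkConf nor a global flag needs to be mutated.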

@JoshRosen JoshRosen changed the title [SPARK-4835] Disable validateOutputSpecs for Spark Streaming jobs [SPARK-4835] [WIP] Disable validateOutputSpecs for Spark Streaming jobs Dec 30, 2014
@SparkQA

SparkQA commented Dec 30, 2014

Test build #24874 has started for PR 3832 at commit 762e473.

  • This patch merges cleanly.

@JoshRosen
Contributor Author

The [WIP] tag in the title is because I just realized that we may be able to improve this based on some discussion at the JIRA ticket.

@SparkQA

SparkQA commented Dec 30, 2014

Test build #24874 has finished for PR 3832 at commit 762e473.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class GaussianMixtureModel(

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/24874/
Test PASSed.

Contributor

Better to say "jobs generated through Spark Streaming's StreamingContext" rather than "scheduler" and stuff.

@tdas
Contributor

tdas commented Dec 30, 2014

A few minor comments; almost good to go.

@tdas
Contributor

tdas commented Dec 30, 2014

@pwendell You should take a look as well.
@JoshRosen Do you think you can add a unit test that covers this pretty-subtle logic?

Contributor

This method is not USED by Spark Streaming. It's used by Spark, but it allows higher-level frameworks (like Spark Streaming) to override the behavior. Also, this comment should not go into the details of why Spark Streaming updates it. Just refer to the JIRA; the Spark Streaming scheduler's code change can do the explaining of the issue. This keeps Spark code from referring to too much of Spark Streaming.

@JoshRosen
Contributor Author

I'd be glad to add a test here, although it might be a little tricky since the old behavior resulted in silent failures; I should be able to come up with one, though.

Regarding the streaming-specific `spark.streaming.hadoop.validateOutputSpecs` setting, which of the following behaviors is more intuitive?

  1. Streaming jobs always respect the Streaming version of the setting and non-streaming jobs respect the regular version. If the streaming checks are enabled but the core checks are disabled, then we do output spec validation for streaming.
  2. The Streaming version is just a gate which controls whether the core setting also applies to streaming jobs. If the streaming setting is true but the core setting is false, then the checks are not applied.

Which of these makes more sense? I think that option 2 is a better backwards-compatibility escape hatch / flag.
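
A hypothetical sketch contrasting the two proposals (these helpers and flag names are illustrative, not actual Spark configuration handling):

```scala
object ValidationSemantics {
  // Option 1: streaming jobs consult only the streaming flag.
  def shouldValidate1(core: Boolean, streaming: Boolean, isStreamingJob: Boolean): Boolean =
    if (isStreamingJob) streaming else core

  // Option 2: the streaming flag merely gates whether the core flag applies,
  // so a streaming job validates only when both flags are true.
  def shouldValidate2(core: Boolean, streaming: Boolean, isStreamingJob: Boolean): Boolean =
    if (isStreamingJob) streaming && core else core
}
```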

@tdas
Contributor

tdas commented Dec 31, 2014

In (2), what are the default settings of the two params? Somehow I am finding (2) more confusing than (1).

Why do you think (2) is better for backwards compatibility? The earlier behavior was actually buggy; that needs to be solved.

@JoshRosen
Contributor Author

Since the old behavior was buggy, can we just omit the `spark.streaming.hadoop.validateOutputSpecs` setting for now and only add it once we have a non-buggy version of it?

@tdas
Contributor

tdas commented Jan 1, 2015

So basically you want to split this PR into two: (1) fix the bug, and (2) introduce an override. I like the idea. Let's do that.

@JoshRosen JoshRosen changed the title [SPARK-4835] [WIP] Disable validateOutputSpecs for Spark Streaming jobs [SPARK-4835] Disable validateOutputSpecs for Spark Streaming jobs Jan 3, 2015
@SparkQA

SparkQA commented Jan 3, 2015

Test build #25003 has started for PR 3832 at commit 6485cf8.

  • This patch merges cleanly.

@JoshRosen
Contributor Author

@tdas I've updated this PR and added a test case. My test case performs save actions inside of a transform() call to emulate what Streaming's saveAsHadoopFiles operation does. Is this a valid use of transform(), or am I breaking the rules by having actions in my transform function? My gut says that we shouldn't endorse / recommend this, for the same reason that we advise against using accumulators inside of map() tasks: the transform call might get evaluated multiple times if caching isn't used, which makes it possible to write programs whose behavior changes depending on whether caching is enabled.
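
For reference, a rough sketch of the pattern in question (the stream and output path are illustrative, not the actual test code):

```scala
import org.apache.spark.streaming.dstream.DStream

object TransformSaveSketch {
  // An action inside transform(), emulating what saveAsHadoopFiles does.
  def saveViaTransform(stream: DStream[String], outputDir: String): DStream[String] =
    stream.transform { (rdd, time) =>
      // Side effect inside a transformation: this save re-runs every time
      // the transform is re-evaluated for this batch, e.g. during recovery.
      rdd.saveAsTextFile(outputDir + "/batch-" + time.milliseconds)
      rdd
    }
}
```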

I wasn't able to get the existing "recovery with saveAsNewAPIHadoopFiles operation" test to fail, though, even though I discovered this bug while refactoring that test in my other PR. I think the issue is that the saveAsNewAPIHadoopFiles jobs failed but did not trigger a failure of the other actions / transformations in that batch, so we still got the correct output even though the batch completion event wasn't posted to the listener bus. The current tests rely on wall-clock time to detect when batches have been processed and hence didn't detect that the batch completion event was missing. I noticed that the StreamingListener API doesn't really have any events for job / batch failures, but that's a topic for a separate PR.

I was about to write that this bug might not affect users who don't use transform, but it still impacts users in the partial-failure case where they've used PairDStreamFunctions.saveAsHadoopFiles() and a batch fails with partially-written output: an individual output partition might be atomically committed to the output directory (i.e. if the file exists, then it has the right contents), but we can still wind up in a scenario where only a subset of the partitions are written and the non-empty output directory prevents recovery from recomputing the missing partitions.

@SparkQA

SparkQA commented Jan 3, 2015

Test build #25003 has finished for PR 3832 at commit 6485cf8.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/25003/
Test PASSed.

@tdas
Contributor

tdas commented Jan 4, 2015

Yes, this is hardly a valid use case of transform. I took a look at the unit test, and I think I know why it works. But this unit test is very obfuscated, and it is hard to understand why it works. Does it even fail reliably without the fix? If it does not fail reliably without the fix, then there isn't much point in having it. If it fails reliably without the fix and reliably passes with the fix, then I am okay with the unit test, as long as there are sufficient comments explaining how it works. Maybe we can open another JIRA mentioning the exceptions that do not get propagated (which is what makes this unit test so complicated), and refer to that JIRA in the comments around the unit test. Other than that, I think I am okay. For an issue this subtle (validity checks), it's better to have some unit test than none at all.

@JoshRosen
Contributor Author

The new test added here always failed before the fix in this patch. I agree that it's a bit convoluted; it would be better to catch this bug in the original saveAs* tests, but those tests won't be able to catch it until they're rewritten to use StreamingTestWaiter. I can add a comment / TODO to remove this test once we've strengthened the other tests in a later PR.
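
To make the failure mode concrete, here is a minimal standalone sketch of the double-save scenario that the test exercises (app name and paths are illustrative, and this is not the actual test code):

```scala
import java.nio.file.Files
import org.apache.spark.{SparkConf, SparkContext}

// Minimal sketch of the failure mode: writing the same output path twice,
// as happens when a recovered batch re-runs its save action.
object DoubleSaveSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("double-save-sketch"))
    val out = Files.createTempDirectory("spec-test").toString + "/result"
    val rdd = sc.parallelize(Seq("a", "b"))
    rdd.saveAsTextFile(out)
    // With spark.hadoop.validateOutputSpecs left at its default (true),
    // this second save fails because the output directory already exists;
    // the fix bypasses that check for jobs run by the streaming scheduler.
    rdd.saveAsTextFile(out)
    sc.stop()
  }
}
```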

@JoshRosen
Contributor Author

Alright, I've pushed one more commit that explains the confusing transform() use and refers to https://issues.apache.org/jira/browse/SPARK-5079. Feel free to fix up / adjust the wording yourself on commit.

@SparkQA

SparkQA commented Jan 5, 2015

Test build #25040 has started for PR 3832 at commit 36eaf35.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Jan 5, 2015

Test build #25040 has finished for PR 3832 at commit 36eaf35.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/25040/
Test PASSed.

Contributor

nit: extra space before However

@tdas
Contributor

tdas commented Jan 5, 2015

There are a few nits, but this LGTM even without them. I am merging this. Thanks for finding and solving this!

asfgit pushed a commit that referenced this pull request Jan 5, 2015
This patch disables output spec. validation for jobs launched through Spark Streaming, since this interferes with checkpoint recovery.

Hadoop OutputFormats have a `checkOutputSpecs` method which performs certain checks prior to writing output, such as checking whether the output directory already exists.  SPARK-1100 added checks for FileOutputFormat, SPARK-1677 (#947) added a SparkConf configuration to disable these checks, and SPARK-2309 (#1088) extended these checks to run for all OutputFormats, not just FileOutputFormat.

In Spark Streaming, we might have to re-process a batch during checkpoint recovery, so `save` actions may be called multiple times.  In addition to `DStream`'s own save actions, users might use `transform` or `foreachRDD` and call the `RDD` and `PairRDD` save actions.  When output spec. validation is enabled, the second calls to these actions will fail due to existing output.

This patch automatically disables output spec. validation for jobs submitted by the Spark Streaming scheduler.  This is done by using Scala's `DynamicVariable` to propagate the bypass setting without having to mutate SparkConf or introduce a global variable.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #3832 from JoshRosen/SPARK-4835 and squashes the following commits:

36eaf35 [Josh Rosen] Add comment explaining use of transform() in test.
6485cf8 [Josh Rosen] Add test case in Streaming; fix bug for transform()
7b3e06a [Josh Rosen] Remove Streaming-specific setting to undo this change; update conf. guide
bf9094d [Josh Rosen] Revise disableOutputSpecValidation() comment to not refer to Spark Streaming.
e581d17 [Josh Rosen] Deduplicate isOutputSpecValidationEnabled logic.
762e473 [Josh Rosen] [SPARK-4835] Disable validateOutputSpecs for Spark Streaming jobs.

(cherry picked from commit 939ba1f)
Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
@asfgit asfgit closed this in 939ba1f Jan 5, 2015
@JoshRosen JoshRosen deleted the SPARK-4835 branch January 5, 2015 04:41