Skip to content

Conversation

@tdas
Copy link
Contributor

@tdas tdas commented Sep 7, 2016

What changes were proposed in this pull request?

When we create a filestream on a directory that has partitioned subdirs (i.e. dir/x=y/), then ListingFileCatalog.allFiles returns the files in the dir as Seq[String] which internally is a Stream[String]. This is because of this line, where a LinkedHashSet.values.toSeq returns Stream. Then when the FileStreamSource filters this Stream[String] to remove the seen files, it creates a new Stream[String], which has a filter function that has a $outer reference to the FileStreamSource (in Scala 2.10). Trying to serialize this Stream[String] causes NotSerializableException. This will happened even if there is just one file in the dir.

Its important to note that this behavior is different in Scala 2.11. There is no $outer reference to FileStreamSource, so it does not throw NotSerializableException. However, with a large sequence of files (tested with 10000 files), it throws StackOverflowError. This is because how Stream class is implemented. Its basically like a linked list, and attempting to serialize a long Stream requires recursively going through linked list, thus resulting in StackOverflowError.

In short, across both Scala 2.10 and 2.11, serialization fails when both the following conditions are true.

  • file stream defined on a partitioned directory
  • directory has 10k+ files

The right solution is to convert the seq to an array before writing to the log. This PR implements this fix in two ways.

  • Changing all uses for HDFSMetadataLog to ensure Array is used instead of Seq
  • Added a require in HDFSMetadataLog such that it is never used with type Seq

How was this patch tested?

Added unit test that test that ensures the file stream source can handle with 10000 files. This tests fails in both Scala 2.10 and 2.11 with different failures as indicated above.

@tdas
Copy link
Contributor Author

tdas commented Sep 7, 2016

@yhuai @zsxwing Can you take a look.

extends MetadataLog[T] with Logging {

// Avoid serializing generic sequences, see SPARK-17372
require(implicitly[ClassTag[T]].runtimeClass != classOf[Seq[_]],
Copy link
Contributor Author

@tdas tdas Sep 7, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is just a best effort attempt for future Spark developers from accidentally using new HDFSMetadatalog[Seq[X]].

@zsxwing
Copy link
Member

zsxwing commented Sep 7, 2016

LGTM pending tests

@SparkQA
Copy link

SparkQA commented Sep 7, 2016

Test build #65015 has finished for PR 14987 at commit 9bcbb08.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

asfgit pushed a commit that referenced this pull request Sep 7, 2016
…ays to save file names in FileStreamSource

## What changes were proposed in this pull request?

When we create a filestream on a directory that has partitioned subdirs (i.e. dir/x=y/), then ListingFileCatalog.allFiles returns the files in the dir as Seq[String] which internally is a Stream[String]. This is because of this [line](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala#L93), where a LinkedHashSet.values.toSeq returns Stream. Then when the [FileStreamSource](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala#L79) filters this Stream[String] to remove the seen files, it creates a new Stream[String], which has a filter function that has a $outer reference to the FileStreamSource (in Scala 2.10). Trying to serialize this Stream[String] causes NotSerializableException. This will happened even if there is just one file in the dir.

Its important to note that this behavior is different in Scala 2.11. There is no $outer reference to FileStreamSource, so it does not throw NotSerializableException. However, with a large sequence of files (tested with 10000 files), it throws StackOverflowError. This is because how Stream class is implemented. Its basically like a linked list, and attempting to serialize a long Stream requires *recursively* going through linked list, thus resulting in StackOverflowError.

In short, across both Scala 2.10 and 2.11, serialization fails when both the following conditions are true.
- file stream defined on a partitioned directory
- directory has 10k+ files

The right solution is to convert the seq to an array before writing to the log. This PR implements this fix in two ways.
- Changing all uses for HDFSMetadataLog to ensure Array is used instead of Seq
- Added a `require` in HDFSMetadataLog such that it is never used with type Seq

## How was this patch tested?

Added unit test that test that ensures the file stream source can handle with 10000 files. This tests fails in both Scala 2.10 and 2.11 with different failures as indicated above.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #14987 from tdas/SPARK-17372.

(cherry picked from commit eb1ab88)
Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
@asfgit asfgit closed this in eb1ab88 Sep 7, 2016
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants