Conversation

@marmbrus
Contributor

@marmbrus marmbrus commented Mar 4, 2016

HadoopFsRelation is used for reading most files into Spark SQL. However, today this class mixes the concerns of file management, schema reconciliation, scan building, bucketing, partitioning, and writing data. As a result, many data sources are forced to reimplement the same functionality and the various layers have accumulated a fair bit of inefficiency. This PR is a first cut at separating this into several components / interfaces that are each described below. Additionally, all implementations inside of Spark (parquet, csv, json, text, orc, libsvm) have been ported to the new API FileFormat. External libraries, such as spark-avro, will also need to be ported to work with Spark 2.0.

HadoopFsRelation

A simple case class that acts as a container for all of the metadata required to read from a data source. All discovery, resolution, and merging logic for schemas and partitions has been removed. This is an internal representation that no longer needs to be exposed to developers.

case class HadoopFsRelation(
    sqlContext: SQLContext,
    location: FileCatalog,
    partitionSchema: StructType,
    dataSchema: StructType,
    bucketSpec: Option[BucketSpec],
    fileFormat: FileFormat,
    options: Map[String, String]) extends BaseRelation
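
As a side note (not code from this PR), the full output schema of such a relation can be thought of as the data schema followed by any partition columns not already present in it. A minimal sketch of that combination, with purely illustrative schemas and a hypothetical helper name:

```scala
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Hypothetical helper: the relation's output schema is assumed to be the data
// schema plus any partition columns the data schema does not already contain.
def fullSchema(dataSchema: StructType, partitionSchema: StructType): StructType = {
  val dataFieldNames = dataSchema.fieldNames.toSet
  StructType(dataSchema.fields ++
    partitionSchema.fields.filterNot(f => dataFieldNames.contains(f.name)))
}

// Example: data columns (id, value) stored under date=... directories.
val schema = fullSchema(
  StructType(Seq(StructField("id", IntegerType), StructField("value", StringType))),
  StructType(Seq(StructField("date", StringType))))
// schema.fieldNames == Array("id", "value", "date")
```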

FileFormat

The primary interface that will be implemented by each different format, including those provided by external libraries. Implementors are responsible for reading a given format and converting it into InternalRow, as well as writing out an InternalRow. A format can optionally return a schema that is inferred from a set of files.

trait FileFormat {
  def inferSchema(
      sqlContext: SQLContext,
      options: Map[String, String],
      files: Seq[FileStatus]): Option[StructType]

  def prepareWrite(
      sqlContext: SQLContext,
      job: Job,
      options: Map[String, String],
      dataSchema: StructType): OutputWriterFactory

  def buildInternalScan(
      sqlContext: SQLContext,
      dataSchema: StructType,
      requiredColumns: Array[String],
      filters: Array[Filter],
      bucketSet: Option[BitSet],
      inputFiles: Array[FileStatus],
      broadcastedConf: Broadcast[SerializableConfiguration],
      options: Map[String, String]): RDD[InternalRow]
}

The current interface is based on what was required to get all the tests passing again, but it still mixes a couple of concerns (e.g., bucketSet is passed down to the scan instead of being resolved by the planner). Additionally, scans still return RDDs instead of iterators for single files. In a future PR, bucketing should be removed from this interface and the scan should be isolated to a single file.
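
For illustration, a toy implementation of the trait above might look like the sketch below. The format itself (one string column per line), the class name, and the decision to leave the write path unimplemented are all hypothetical, not part of this PR; it also assumes the FileFormat and OutputWriterFactory definitions shown here are in scope.

```scala
import org.apache.hadoop.fs.FileStatus
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.SerializableConfiguration
import org.apache.spark.util.collection.BitSet

// Toy format: every line of every input file becomes a row with a single string column.
class SimpleLineFormat extends FileFormat {

  // The schema never depends on the data for this format, so no files are inspected.
  override def inferSchema(
      sqlContext: SQLContext,
      options: Map[String, String],
      files: Seq[FileStatus]): Option[StructType] =
    Some(StructType(StructField("value", StringType) :: Nil))

  // The write path is out of scope for this sketch.
  override def prepareWrite(
      sqlContext: SQLContext,
      job: Job,
      options: Map[String, String],
      dataSchema: StructType): OutputWriterFactory =
    throw new UnsupportedOperationException("write path not sketched here")

  // Read the input files with textFile and wrap each line in an InternalRow.
  // Column pruning, filters, and bucketing are ignored here (assumes >= 1 input file).
  override def buildInternalScan(
      sqlContext: SQLContext,
      dataSchema: StructType,
      requiredColumns: Array[String],
      filters: Array[Filter],
      bucketSet: Option[BitSet],
      inputFiles: Array[FileStatus],
      broadcastedConf: Broadcast[SerializableConfiguration],
      options: Map[String, String]): RDD[InternalRow] = {
    val paths = inputFiles.map(_.getPath.toString).mkString(",")
    sqlContext.sparkContext
      .textFile(paths)
      .map(line => InternalRow(UTF8String.fromString(line)))
  }
}
```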

FileCatalog

This interface is used to list the files that make up a given relation, as well as to handle directory-based partitioning.

trait FileCatalog {
  def paths: Seq[Path]
  def partitionSpec(schema: Option[StructType]): PartitionSpec
  def allFiles(): Seq[FileStatus]
  def getStatus(path: Path): Array[FileStatus]
  def refresh(): Unit
}

Currently there are two implementations:

  • HDFSFileCatalog - based on code from the old HadoopFsRelation. Infers partitioning by recursively listing directories and caches this data for performance.
  • HiveFileCatalog - based on the above, but uses the partition spec from the Hive Metastore.
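
A third, purely illustrative implementation (not part of this PR) could wrap an explicit, unpartitioned list of paths. The class name and the way an empty PartitionSpec is constructed below are assumptions:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.spark.sql.types.StructType

// Hypothetical catalog over a fixed list of paths: no partitioning, no caching,
// every call re-lists the file system directly.
class StaticFileCatalog(hadoopConf: Configuration, val paths: Seq[Path]) extends FileCatalog {

  // This toy catalog never discovers partitions.
  override def partitionSpec(schema: Option[StructType]): PartitionSpec =
    PartitionSpec(StructType(Nil), Seq.empty)

  override def getStatus(path: Path): Array[FileStatus] =
    path.getFileSystem(hadoopConf).listStatus(path)

  override def allFiles(): Seq[FileStatus] = paths.flatMap(getStatus)

  // Nothing is cached, so there is nothing to refresh.
  override def refresh(): Unit = ()
}
```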

ResolvedDataSource

Produces a logical plan given the following description of a Data Source (which can come from DataFrameReader or a metastore):

  • paths: Seq[String] = Nil
  • userSpecifiedSchema: Option[StructType] = None
  • partitionColumns: Array[String] = Array.empty
  • bucketSpec: Option[BucketSpec] = None
  • provider: String
  • options: Map[String, String]

This class is responsible for deciding which of the Data Source APIs a given provider is using (including the non-file-based ones). All reconciliation of partitions, buckets, and schemas (whether they come from a metastore, a user specification, or inference) is done here.
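
For instance, a read like the one below would arrive at this resolution step as provider = "json", a single path, no user-specified schema, and the given options. The path and option are illustrative, and sqlContext is assumed to be an existing SQLContext; this sketches the mapping, not the resolution code itself:

```scala
// Hypothetical read; the comments show the Data Source description it maps to.
val df = sqlContext.read
  .format("json")                        // provider            = "json"
  .option("primitivesAsString", "true")  // options             = Map("primitivesAsString" -> "true")
  .load("/data/events")                  // paths               = Seq("/data/events")
                                         // userSpecifiedSchema = None
                                         // partitionColumns    = Array.empty
                                         // bucketSpec          = None
```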

DataSourceAnalysis / DataSourceStrategy

Responsible for analyzing and planning reading/writing of data using any of the Data Source APIs, including:

  • pruning, based on filters, the files from partitions that will be read (a sketch follows below).
  • appending partition columns*
  • applying additional filters when a data source cannot evaluate them internally.
  • constructing an RDD that is bucketed correctly when required*
  • sanity checking that schemas match up, and other analysis, when writing.

*In the future we should do the following:

  • Break out file handling into its own Strategy, as it is sufficiently complex / isolated.
  • Push the appending of partition columns down into FileFormat to avoid an extra copy / unvectorization.
  • Use a custom RDD for scans instead of SQLNewNewHadoopRDD2
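
To make the partition-pruning bullet above concrete, here is a hedged sketch with simplified stand-ins for the planner's data structures (PartitionValues, PartitionDir, and the predicate representation are all hypothetical, not the actual planner code):

```scala
import org.apache.hadoop.fs.Path

// Simplified stand-ins for discovered partitions and partition-column predicates.
case class PartitionValues(values: Map[String, String])   // e.g. Map("date" -> "2016-03-04")
case class PartitionDir(values: PartitionValues, path: Path)

// Keep only the directories whose partition values satisfy every predicate;
// files under the dropped directories are never listed or scanned.
def prunePartitions(
    partitions: Seq[PartitionDir],
    partitionFilters: Seq[PartitionValues => Boolean]): Seq[PartitionDir] =
  partitions.filter(p => partitionFilters.forall(pred => pred(p.values)))

// Example: only date=2016-03-04 survives out of three discovered partitions.
val discovered = Seq(
  PartitionDir(PartitionValues(Map("date" -> "2016-03-03")), new Path("/t/date=2016-03-03")),
  PartitionDir(PartitionValues(Map("date" -> "2016-03-04")), new Path("/t/date=2016-03-04")),
  PartitionDir(PartitionValues(Map("date" -> "2016-03-05")), new Path("/t/date=2016-03-05")))

val kept = prunePartitions(discovered, Seq(p => p.values("date") == "2016-03-04"))
// kept.map(_.path.toString) == Seq("/t/date=2016-03-04")
```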

marmbrus and others added 23 commits February 26, 2016 14:39
Conflicts:
	sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
	sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
Conflicts:
	sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
	sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
	sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
	sql/hive/src/test/scala/org/apache/spark/sql/sources/CommitFailureTestRelationSuite.scala
	sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
Conflicts:
	sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
	sql/hive/src/test/scala/org/apache/spark/sql/sources/CommitFailureTestRelationSuite.scala
@marmbrus
Contributor Author

marmbrus commented Mar 4, 2016

@SparkQA

SparkQA commented Mar 4, 2016

Test build #52439 has finished for PR 11509 at commit ac54278.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

}
}

test("call failure callbacks before close writer - default") {
Contributor

This is deleted because it's flaky? Or because it does not work with new APIs?

Contributor Author

This needs to be rewritten to work against the new API. Filed SPARK-13681

class DefaultSource extends HadoopFsRelationProvider with DataSourceRegister {
class DefaultSource extends FileFormat with DataSourceRegister {

override def shortName(): String = "csv"
Contributor

Add toString for this?

Contributor Author

done

@davies
Contributor

davies commented Mar 5, 2016

Did one pass on this, looks great! All the comments are minor, it's fine to be addressed later.

options: Map[String, String]): RDD[InternalRow] = {
// TODO: This does not handle cases where column pruning has been performed.

verifySchema(dataSchema)
Contributor

Should we also verify the schema when writing, i.e. in prepareWrite?

Contributor Author

I think that we do already, on line 69

marmbrus added 3 commits March 7, 2016 10:43
Conflicts:
	sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala
	sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
@SparkQA

SparkQA commented Mar 7, 2016

Test build #52590 has finished for PR 11509 at commit 3e5c7b7.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@rxin
Contributor

rxin commented Mar 7, 2016

Going to merge this in master.

We should rename HiveFileCatalog to MetastoreFileCatalog. cc @andrewor14

@asfgit asfgit closed this in e720dda Mar 7, 2016
@SparkQA

SparkQA commented Mar 7, 2016

Test build #52582 has finished for PR 11509 at commit fd65bcb.

  • This patch fails from timeout after a configured wait of 250m.
  • This patch merges cleanly.
  • This patch adds no public classes.


private var _partitionSpec: PartitionSpec = _
/**
* Used to read a write data in files to [[InternalRow]] format.
Contributor

nit: a write -> and write

@koertkuipers
Contributor

I believe the need to pass all files along (e.g. inputFiles: Array[FileStatus]) instead of just the input paths came from the need to cache them so that things looked snappy on S3, which has slow metadata operations. However, it is not very realistic to pass along all files for real datasets, since the list can easily reach 100k+ entries (and some people on the mailing list reported using millions of files).

Because of this inputFiles param we now need driver programs with 16G of heap or larger (before, 1G was enough), and even then it doesn't always work on very large datasets. I would hate to see inputFiles make it into the Spark 2.0 API instead of just inputPaths.

@marmbrus
Contributor Author

marmbrus commented Mar 8, 2016

@koertkuipers improving the efficiency of working with large files was certainly a goal in this refactoring and this API is definitely not done yet. That said, I'm not really sure that the correct thing to do is to avoid listing all of the files at the driver. Every version of Spark SQL has done this listing AFAIK during split planning even before we added a caching layer.

@koertkuipers
Contributor

If it did, then it was not always in the APIs, I think? I remember the APIs having paths: Seq[String] instead of files: Seq[FileStatus]. By explicitly requiring the user to list all files in the API you make it impossible not to, even if it turns out it is not always necessary. For 1MM files that's no joke.

I found it was relatively straightforward to revert back to paths: Seq[String] once I ripped out the cache, modified partition discovery, and disabled some kind of data size estimation. So I more or less assumed it wasn't used anywhere else, but I might have missed split planning.


ghost pushed a commit to dbtsai/spark that referenced this pull request Mar 8, 2016
Follow-up to apache#11509, that simply refactors the interface that we use when resolving a pluggable `DataSource`.
 - Multiple functions share the same set of arguments so we make this a case class, called `DataSource`.  Actual resolution is now done by calling a function on this class.
 - Instead of having multiple methods named `apply` (some of which do writing some of which do reading) we now explicitly have `resolveRelation()` and `write(mode, df)`.
 - Get rid of `Array[String]` since this is an internal API and was forcing us to awkwardly call `toArray` in a bunch of places.

Author: Michael Armbrust <michael@databricks.com>

Closes apache#11572 from marmbrus/dataSourceResolution.
ghost pushed a commit to dbtsai/spark that referenced this pull request Mar 9, 2016
…cked.

## What changes were proposed in this pull request?
https://issues.apache.org/jira/browse/SPARK-13728

apache#11509 makes the output a single ORC file.
It used to be 10 files, but after that change only a single file is written, so the pushed-down filters could not skip stripes in ORC.
So, this PR simply repartitions the data into 10 partitions so that the test can pass.
## How was this patch tested?

unittest and `./dev/run_tests` for code style test.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes apache#11593 from HyukjinKwon/SPARK-13728.
roygao94 pushed a commit to roygao94/spark that referenced this pull request Mar 22, 2016
Author: Michael Armbrust <michael@databricks.com>
Author: Wenchen Fan <wenchen@databricks.com>

Closes apache#11509 from marmbrus/fileDataSource.
roygao94 pushed a commit to roygao94/spark that referenced this pull request Mar 22, 2016
Author: Michael Armbrust <michael@databricks.com>

Closes apache#11572 from marmbrus/dataSourceResolution.
roygao94 pushed a commit to roygao94/spark that referenced this pull request Mar 22, 2016
Author: hyukjinkwon <gurwls223@gmail.com>

Closes apache#11593 from HyukjinKwon/SPARK-13728.