Commit 7dd8dd5

Adds new interfaces and stub methods for data sources API partitioning support
1 parent 4b5e1fe commit 7dd8dd5

2 files changed: 135 additions, 3 deletions

sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala

Lines changed: 30 additions & 0 deletions
@@ -1417,6 +1417,21 @@ class DataFrame private[sql](
     save(source, mode, options.toMap)
   }
 
+  /**
+   * :: Experimental ::
+   * Saves the contents of this DataFrame to the given path based on the given data source,
+   * [[SaveMode]] specified by mode, and partition columns specified by `partitionColumns`.
+   * @group output
+   */
+  @Experimental
+  def save(
+      source: String,
+      mode: SaveMode,
+      options: java.util.Map[String, String],
+      partitionColumns: java.util.List[String]): Unit = {
+    ???
+  }
+
   /**
    * :: Experimental ::
    * (Scala-specific)
@@ -1432,6 +1447,21 @@ class DataFrame private[sql](
     ResolvedDataSource(sqlContext, source, mode, options, this)
   }
 
+  /**
+   * :: Experimental ::
+   * Saves the contents of this DataFrame to the given path based on the given data source,
+   * [[SaveMode]] specified by mode, and partition columns specified by `partitionColumns`.
+   * @group output
+   */
+  @Experimental
+  def save(
+      source: String,
+      mode: SaveMode,
+      options: Map[String, String],
+      partitionColumns: Seq[String]): Unit = {
+    ???
+  }
+
   /**
    * :: Experimental ::
    * Adds the rows from this RDD to the specified table, optionally overwriting the existing data.
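
To make the intent of these stubs concrete, the snippet below sketches how the Scala-specific overload might be called once implemented (e.g. from spark-shell). Both new overloads currently have `???` bodies and therefore throw scala.NotImplementedError; the data source name, paths, and partition columns here are purely illustrative.

import org.apache.spark.sql.{DataFrame, SaveMode}

// Assumes a SQLContext named `sqlContext` is in scope and the input path exists.
val df: DataFrame = sqlContext.jsonFile("/tmp/events.json")

// Scala-specific overload: options as a Scala Map, partition columns as a Seq.
df.save(
  "org.apache.spark.sql.parquet",          // data source, fully qualified or short name
  SaveMode.Overwrite,                      // how to treat existing data at the target path
  Map("path" -> "/tmp/warehouse/events"),  // data source specific options
  Seq("year", "month"))                    // columns to partition the output by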

sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala

Lines changed: 105 additions & 3 deletions
@@ -17,11 +17,13 @@
 
 package org.apache.spark.sql.sources
 
-import org.apache.spark.annotation.{Experimental, DeveloperApi}
+import org.apache.hadoop.conf.Configuration
+
+import org.apache.spark.annotation.{DeveloperApi, Experimental}
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{SaveMode, DataFrame, Row, SQLContext}
-import org.apache.spark.sql.catalyst.expressions.{Expression, Attribute}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
 import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.{DataFrame, Row, SQLContext, SaveMode}
 
 /**
  * ::DeveloperApi::
@@ -78,6 +80,40 @@ trait SchemaRelationProvider {
       schema: StructType): BaseRelation
 }
 
+/**
+ * ::DeveloperApi::
+ * Implemented by objects that produce relations for a specific kind of data source
+ * with a given schema and partitioned columns. When Spark SQL is given a DDL operation with a
+ * USING clause specified (to specify the implemented SchemaRelationProvider), a user defined
+ * schema, and an optional list of partition columns, this interface is used to pass in the
+ * parameters specified by a user.
+ *
+ * Users may specify the fully qualified class name of a given data source. When that class is
+ * not found Spark SQL will append the class name `DefaultSource` to the path, allowing for
+ * less verbose invocation. For example, 'org.apache.spark.sql.json' would resolve to the
+ * data source 'org.apache.spark.sql.json.DefaultSource'.
+ *
+ * A new instance of this class will be instantiated each time a DDL call is made.
+ *
+ * The difference between a [[RelationProvider]] and a [[PartitionedSchemaRelationProvider]] is
+ * that users need to provide a schema and a (possibly empty) list of partition columns when
+ * using a [[PartitionedSchemaRelationProvider]]. A relation provider can mix in
+ * [[RelationProvider]], [[SchemaRelationProvider]], and [[PartitionedSchemaRelationProvider]]
+ * if it can support schema inference, user-specified schemas, and accessing partitioned
+ * relations.
+ */
+trait PartitionedSchemaRelationProvider {
+  /**
+   * Returns a new base relation with the given parameters, a user defined schema, and a list of
+   * partition columns. Note: the parameters' keywords are case insensitive and this insensitivity
+   * is enforced by the Map that is passed to the function.
+   */
+  def createRelation(
+      sqlContext: SQLContext,
+      parameters: Map[String, String],
+      schema: StructType,
+      partitionColumns: StructType): BaseRelation
+}
+
 @DeveloperApi
 trait CreatableRelationProvider {
   /**
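
As a rough illustration of how a data source could implement the trait above, here is a minimal sketch. None of it comes from this commit: the package name, DefaultSource, ExampleRelation, and the `path` option are all assumptions made for the example.

package org.apache.spark.example.datasource // hypothetical package named in the USING clause

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.sources.{BaseRelation, PartitionedSchemaRelationProvider}
import org.apache.spark.sql.types.StructType

// Hypothetical provider, resolved as <package>.DefaultSource.
class DefaultSource extends PartitionedSchemaRelationProvider {
  override def createRelation(
      sqlContext: SQLContext,
      parameters: Map[String, String],
      schema: StructType,
      partitionColumns: StructType): BaseRelation = {
    // Parameter keys are case insensitive; requiring a `path` option is a common
    // convention for file based sources, not something the trait itself mandates.
    val path = parameters.getOrElse("path", sys.error("'path' must be specified"))
    new ExampleRelation(path, schema, partitionColumns)(sqlContext)
  }
}

// Minimal BaseRelation that just carries the user-specified schema and partition columns.
class ExampleRelation(
    val path: String,
    val schema: StructType,
    val partitionColumns: StructType)(
    @transient val sqlContext: SQLContext)
  extends BaseRelation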
@@ -207,3 +243,69 @@ trait InsertableRelation {
 trait CatalystScan {
   def buildScan(requiredColumns: Seq[Attribute], filters: Seq[Expression]): RDD[Row]
 }
+
+/**
+ * ::Experimental::
+ * [[OutputWriter]] is used together with [[FSBasedPrunedFilteredScan]] for persisting rows to the
+ * underlying file system. An [[OutputWriter]] instance is created when a new output file is
+ * opened. This instance is used to persist rows to this single output file.
+ */
+@Experimental
+trait OutputWriter {
+  /**
+   * Persists a single row. Invoked on the executor side.
+   */
+  def write(row: Row): Unit
+
+  /**
+   * Closes the [[OutputWriter]]. Invoked on the executor side after all rows are persisted, before
+   * the task output is committed.
+   */
+  def close(): Unit
+}
+
+/**
+ * ::Experimental::
+ * A [[BaseRelation]] that abstracts file system based data sources.
+ *
+ * For the read path, similar to [[PrunedFilteredScan]], it can eliminate unneeded columns and
+ * filter using selected predicates before producing an RDD containing all matching tuples as
+ * [[Row]] objects.
+ *
+ * In addition, when reading from Hive style partitioned tables stored in file systems, it's able to
+ * discover partitioning information from the paths of input directories, and perform partition
+ * pruning before starting to read the data.
+ *
+ * For the write path, it provides the ability to write to both non-partitioned and partitioned
+ * tables. Directory layout of the partitioned tables is compatible with Hive.
+ */
+@Experimental
+trait FSBasedPrunedFilteredScan extends BaseRelation {
+  /**
+   * Builds an `RDD[Row]` containing all rows within this relation.
+   *
+   * @param requiredColumns Required columns.
+   * @param filters Candidate filters to be pushed down. The actual filter should be the conjunction
+   *        of all `filters`. The pushed down filters are currently purely an optimization as they
+   *        will all be evaluated again. This means it is safe to use them with methods that produce
+   *        false positives such as filtering partitions based on a bloom filter.
+   * @param inputPaths Data files to be read. If the underlying relation is partitioned, only data
+   *        files within required partition directories are included.
+   */
+  def buildScan(
+      requiredColumns: Array[String],
+      filters: Array[Filter],
+      inputPaths: Array[String]): RDD[Row]
+
+  /**
+   * When writing rows to this relation, this method is invoked on the driver side before the actual
+   * write job is issued. It provides an opportunity to configure the write job to be performed.
+   */
+  def prepareForWrite(conf: Configuration): Unit
+
+  /**
+   * This method is responsible for producing a new [[OutputWriter]] for each newly opened output
+   * file on the executor side.
+   */
+  def newOutputWriter(path: String): OutputWriter
+}
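
To show how the read and write hooks of FSBasedPrunedFilteredScan and OutputWriter are meant to fit together, here is a hypothetical relation over comma-separated text files. It is a sketch under stated assumptions (naive CSV parsing, a java.io based writer that presumes local file system paths, a made-up Hadoop configuration key), not code from this commit.

import org.apache.hadoop.conf.Configuration

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{Filter, FSBasedPrunedFilteredScan, OutputWriter}
import org.apache.spark.sql.types.StructType

// Hypothetical relation over CSV-like text files where every field is a string.
class ExampleTextRelation(
    val schema: StructType)(
    @transient val sqlContext: SQLContext)
  extends FSBasedPrunedFilteredScan {

  // Read path: only `inputPaths` (already pruned to the required partition
  // directories) are scanned; pushed-down filters are ignored here, which is
  // legal because Spark SQL re-evaluates them anyway.
  override def buildScan(
      requiredColumns: Array[String],
      filters: Array[Filter],
      inputPaths: Array[String]): RDD[Row] = {
    val indices = requiredColumns.map(col => schema.fieldNames.indexOf(col))
    sqlContext.sparkContext
      .textFile(inputPaths.mkString(","))
      .map(_.split(","))
      .map(values => Row.fromSeq(indices.map(i => values(i))))
  }

  // Write path, driver side: a chance to configure the write job before it runs.
  override def prepareForWrite(conf: Configuration): Unit = {
    conf.set("example.output.delimiter", ",") // hypothetical configuration key
  }

  // Write path, executor side: one OutputWriter per newly opened output file.
  override def newOutputWriter(path: String): OutputWriter = new OutputWriter {
    private val writer = new java.io.PrintWriter(new java.io.File(path))
    override def write(row: Row): Unit = writer.println(row.toSeq.mkString(","))
    override def close(): Unit = writer.close()
  }
}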
