29 changes: 17 additions & 12 deletions project/SparkBuild.scala
@@ -357,6 +357,21 @@ object Unidoc {
names.map(s => "org.apache.spark." + s).mkString(":")
}

private def ignoreUndocumentedPackages(packages: Seq[Seq[File]]): Seq[Seq[File]] = {
packages
.map(_.filterNot(_.getName.contains("$")))
.map(_.filterNot(_.getCanonicalPath.contains("akka")))
.map(_.filterNot(_.getCanonicalPath.contains("deploy")))
.map(_.filterNot(_.getCanonicalPath.contains("network")))
.map(_.filterNot(_.getCanonicalPath.contains("shuffle")))
.map(_.filterNot(_.getCanonicalPath.contains("executor")))
.map(_.filterNot(_.getCanonicalPath.contains("python")))
.map(_.filterNot(_.getCanonicalPath.contains("collection")))
.map(_.filterNot(_.getCanonicalPath.contains("sql/catalyst")))
.map(_.filterNot(_.getCanonicalPath.contains("sql/execution")))
.map(_.filterNot(_.getCanonicalPath.contains("sql/hive/test")))
}

lazy val settings = scalaJavaUnidocSettings ++ Seq (
publish := {},

@@ -368,22 +383,12 @@ object Unidoc {
// Skip actual catalyst, but include the subproject.
// Catalyst is not public API and contains quasiquotes which break scaladoc.
unidocAllSources in (ScalaUnidoc, unidoc) := {
(unidocAllSources in (ScalaUnidoc, unidoc)).value
.map(_.filterNot(_.getCanonicalPath.contains("sql/catalyst")))
ignoreUndocumentedPackages((unidocAllSources in (ScalaUnidoc, unidoc)).value)
},

// Skip class names containing $ and some internal packages in Javadocs
unidocAllSources in (JavaUnidoc, unidoc) := {
(unidocAllSources in (JavaUnidoc, unidoc)).value
.map(_.filterNot(_.getName.contains("$")))
.map(_.filterNot(_.getCanonicalPath.contains("akka")))
.map(_.filterNot(_.getCanonicalPath.contains("deploy")))
.map(_.filterNot(_.getCanonicalPath.contains("network")))
.map(_.filterNot(_.getCanonicalPath.contains("shuffle")))
.map(_.filterNot(_.getCanonicalPath.contains("executor")))
.map(_.filterNot(_.getCanonicalPath.contains("python")))
.map(_.filterNot(_.getCanonicalPath.contains("collection")))
.map(_.filterNot(_.getCanonicalPath.contains("sql/catalyst")))
ignoreUndocumentedPackages((unidocAllSources in (JavaUnidoc, unidoc)).value)
},

// Javadoc options: create a window title, and group key packages on index page
36 changes: 32 additions & 4 deletions sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -64,7 +64,7 @@ private[sql] object DataFrame {
* val people = sqlContext.parquetFile("...")
*
* // Create a DataFrame from data sources
* val df =
* val df = sqlContext.load("...", "json")
* }}}
*
* Once created, it can be manipulated using the various domain-specific-language (DSL) functions
@@ -80,9 +80,10 @@ private[sql] object DataFrame {
* {{{
* // The following creates a new column that increases everybody's age by 10.
* people("age") + 10 // in Scala
* people.col("age").plus(10); // in Java
* }}}
*
* A more concrete example:
* A more concrete example in Scala:
* {{{
* // To create DataFrame using SQLContext
* val people = sqlContext.parquetFile("...")
@@ -94,6 +95,18 @@ private[sql] object DataFrame {
* .agg(avg(people("salary")), max(people("age")))
* }}}
*
* and in Java:
* {{{
* // To create DataFrame using SQLContext
* DataFrame people = sqlContext.parquetFile("...");
* DataFrame department = sqlContext.parquetFile("...");
*
* people.filter("age".gt(30))
* .join(department, people.col("deptId").equalTo(department("id")))
* .groupBy(department.col("name"), "gender")
* .agg(avg(people.col("salary")), max(people.col("age")));
* }}}
*
* @groupname basic Basic DataFrame functions
* @groupname dfops Language Integrated Queries
* @groupname rdd RDD Operations
@@ -102,7 +115,7 @@
*/
// TODO: Improve documentation.
@Experimental
class DataFrame protected[sql](
class DataFrame private[sql](
@transient val sqlContext: SQLContext,
@DeveloperApi @transient val queryExecution: SQLContext#QueryExecution)
extends RDDApi[Row] with Serializable {
@@ -295,12 +308,14 @@ class DataFrame protected[sql](
* 1984 04 0.450090 0.483521
* }}}
* @param numRows Number of rows to show
* @group basic
*
* @group action
*/
def show(numRows: Int): Unit = println(showString(numRows))

/**
* Displays the top 20 rows of [[DataFrame]] in a tabular form.
* @group action
*/
def show(): Unit = show(20)
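For reference, a minimal usage sketch of the show actions documented above (assuming a DataFrame named df is already in scope):

df.show(5)   // print the first 5 rows in tabular form
df.show()    // same, with the default of 20 rows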

@@ -738,16 +753,19 @@ class DataFrame protected[sql](

/**
* Returns the first `n` rows.
* @group action
*/
def head(n: Int): Array[Row] = limit(n).collect()

/**
* Returns the first row.
* @group action
*/
def head(): Row = head(1).head

/**
* Returns the first row. Alias for head().
* @group action
*/
override def first(): Row = head()

@@ -831,6 +849,11 @@ class DataFrame protected[sql](
this
}

/**
* @group basic
*/
override def cache(): this.type = persist()

/**
* @group basic
*/
@@ -847,6 +870,11 @@ class DataFrame protected[sql](
this
}

/**
* @group basic
*/
override def unpersist(): this.type = unpersist(blocking = false)
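With the default bodies removed from RDDApi (see the RDDApi.scala change below), DataFrame now supplies cache() and unpersist() itself. A minimal usage sketch, assuming a DataFrame named df:

df.cache()       // same as df.persist()
df.count()       // an action materializes the cached data
df.unpersist()   // same as df.unpersist(blocking = false)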

/////////////////////////////////////////////////////////////////////////////
// I/O
/////////////////////////////////////////////////////////////////////////////
4 changes: 2 additions & 2 deletions sql/core/src/main/scala/org/apache/spark/sql/RDDApi.scala
@@ -29,13 +29,13 @@ import org.apache.spark.storage.StorageLevel
*/
private[sql] trait RDDApi[T] {

def cache(): this.type = persist()
def cache(): this.type

def persist(): this.type

def persist(newLevel: StorageLevel): this.type

def unpersist(): this.type = unpersist(blocking = false)
def unpersist(): this.type

def unpersist(blocking: Boolean): this.type

@@ -119,7 +119,8 @@ private[sql] case class JDBCRelation(
url: String,
table: String,
parts: Array[Partition])(@transient val sqlContext: SQLContext)
extends PrunedFilteredScan {
extends BaseRelation
with PrunedFilteredScan {

override val schema = JDBCRDD.resolveTable(url, table)

@@ -90,7 +90,10 @@ private[sql] case class JSONRelation(
samplingRatio: Double,
userSpecifiedSchema: Option[StructType])(
@transient val sqlContext: SQLContext)
extends TableScan with InsertableRelation {
extends BaseRelation
with TableScan
with InsertableRelation {

// TODO: Support partitioned JSON relation.
private def baseRDD = sqlContext.sparkContext.textFile(path)

@@ -153,7 +153,8 @@ private[sql] case class ParquetRelation2(
maybeSchema: Option[StructType] = None,
maybePartitionSpec: Option[PartitionSpec] = None)(
@transient val sqlContext: SQLContext)
extends CatalystScan
extends BaseRelation
with CatalystScan
with InsertableRelation
with SparkHadoopMapReduceUtil
with Logging {
@@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.sources

import org.apache.spark.annotation.{Experimental, DeveloperApi}
@@ -90,12 +91,6 @@ trait CreatableRelationProvider {
* existing data is expected to be overwritten by the contents of the DataFrame.
* ErrorIfExists mode means that when saving a DataFrame to a data source,
* if data already exists, an exception is expected to be thrown.
*
* @param sqlContext
* @param mode
* @param parameters
* @param data
* @return
*/
def createRelation(
sqlContext: SQLContext,
@@ -138,7 +133,7 @@ abstract class BaseRelation {
* A BaseRelation that can produce all of its tuples as an RDD of Row objects.
*/
@DeveloperApi
trait TableScan extends BaseRelation {
trait TableScan {
def buildScan(): RDD[Row]
}

@@ -148,7 +143,7 @@ trait TableScan extends BaseRelation {
* containing all of its tuples as Row objects.
*/
@DeveloperApi
trait PrunedScan extends BaseRelation {
trait PrunedScan {
def buildScan(requiredColumns: Array[String]): RDD[Row]
}

@@ -162,24 +157,10 @@ trait PrunedScan extends BaseRelation {
* as filtering partitions based on a bloom filter.
*/
@DeveloperApi
trait PrunedFilteredScan extends BaseRelation {
trait PrunedFilteredScan {
def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row]
}

/**
* ::Experimental::
* An interface for experimenting with a more direct connection to the query planner. Compared to
* [[PrunedFilteredScan]], this operator receives the raw expressions from the
* [[org.apache.spark.sql.catalyst.plans.logical.LogicalPlan]]. Unlike the other APIs this
* interface is not designed to be binary compatible across releases and thus should only be used
* for experimentation.
*/
@Experimental
trait CatalystScan extends BaseRelation {
def buildScan(requiredColumns: Seq[Attribute], filters: Seq[Expression]): RDD[Row]
}

@DeveloperApi
/**
* ::DeveloperApi::
* A BaseRelation that can be used to insert data into it through the insert method.
@@ -196,6 +177,20 @@ trait CatalystScan extends BaseRelation {
* If a data source needs to check the actual nullability of a field, it needs to do it in the
* insert method.
*/
trait InsertableRelation extends BaseRelation {
@DeveloperApi
trait InsertableRelation {
def insert(data: DataFrame, overwrite: Boolean): Unit
}

/**
* ::Experimental::
* An interface for experimenting with a more direct connection to the query planner. Compared to
* [[PrunedFilteredScan]], this operator receives the raw expressions from the
* [[org.apache.spark.sql.catalyst.plans.logical.LogicalPlan]]. Unlike the other APIs this
* interface is NOT designed to be binary compatible across releases and thus should only be used
* for experimentation.
*/
@Experimental
trait CatalystScan {
def buildScan(requiredColumns: Seq[Attribute], filters: Seq[Expression]): RDD[Row]
}
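Because the scan traits above no longer extend BaseRelation, an external data source now has to mix BaseRelation in explicitly, as the updated test relations below do. A minimal sketch under that assumption (IntRangeScan is a hypothetical name):

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, TableScan}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

// Produces the integers in [from, to] as single-column rows.
case class IntRangeScan(from: Int, to: Int)(@transient val sqlContext: SQLContext)
  extends BaseRelation
  with TableScan {

  override def schema: StructType =
    StructType(StructField("i", IntegerType, nullable = false) :: Nil)

  override def buildScan(): RDD[Row] =
    sqlContext.sparkContext.parallelize(from to to).map(Row(_))
}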
@@ -29,7 +29,7 @@ class DDLScanSource extends RelationProvider {
}

case class SimpleDDLScan(from: Int, to: Int)(@transient val sqlContext: SQLContext)
extends TableScan {
extends BaseRelation with TableScan {

override def schema =
StructType(Seq(
@@ -32,7 +32,8 @@ class FilteredScanSource extends RelationProvider {
}

case class SimpleFilteredScan(from: Int, to: Int)(@transient val sqlContext: SQLContext)
extends PrunedFilteredScan {
extends BaseRelation
with PrunedFilteredScan {

override def schema =
StructType(
@@ -31,7 +31,8 @@ class PrunedScanSource extends RelationProvider {
}

case class SimplePrunedScan(from: Int, to: Int)(@transient val sqlContext: SQLContext)
extends PrunedScan {
extends BaseRelation
with PrunedScan {

override def schema =
StructType(
@@ -33,7 +33,7 @@ class SimpleScanSource extends RelationProvider {
}

case class SimpleScan(from: Int, to: Int)(@transient val sqlContext: SQLContext)
extends TableScan {
extends BaseRelation with TableScan {

override def schema =
StructType(StructField("i", IntegerType, nullable = false) :: Nil)
@@ -51,10 +51,11 @@ class AllDataTypesScanSource extends SchemaRelationProvider {
}

case class AllDataTypesScan(
from: Int,
to: Int,
userSpecifiedSchema: StructType)(@transient val sqlContext: SQLContext)
extends TableScan {
from: Int,
to: Int,
userSpecifiedSchema: StructType)(@transient val sqlContext: SQLContext)
extends BaseRelation
with TableScan {

override def schema = userSpecifiedSchema

@@ -774,7 +774,8 @@ private[hive] case class MetastoreRelation
val columnOrdinals = AttributeMap(attributes.zipWithIndex)
}

object HiveMetastoreTypes {

private[hive] object HiveMetastoreTypes {
protected val ddlParser = new DDLParser(HiveQl.parseSql(_))

def toDataType(metastoreType: String): DataType = synchronized {
@@ -28,7 +28,6 @@ import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.hive.MetastoreRelation

/**
* :: Experimental ::
* Create table and insert the query result into it.
* @param database the database name of the new relation
* @param tableName the table name of the new relation
@@ -38,7 +37,7 @@ import org.apache.spark.sql.hive.MetastoreRelation
* @param desc the CreateTableDesc, which may contain serde, storage handler etc.

*/
@Experimental
private[hive]
case class CreateTableAsSelect(
database: String,
tableName: String,
@@ -29,11 +29,9 @@ import org.apache.spark.sql.hive.HiveShim
import org.apache.spark.sql.SQLContext

/**
* :: DeveloperApi ::
*
* Implementation for "describe [extended] table".
*/
@DeveloperApi
private[hive]
case class DescribeHiveTableCommand(
table: MetastoreRelation,
override val output: Seq[Attribute],