@@ -18,31 +18,48 @@
package org.apache.spark.sql.json

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.types.StructType
import org.apache.spark.sql.sources._

private[sql] class DefaultSource extends RelationProvider {
/** Returns a new base relation with the given parameters. */
private[sql] class DefaultSource extends RelationProvider with SchemaRelationProvider {

/** Returns a new base relation with the parameters. */
override def createRelation(
sqlContext: SQLContext,
parameters: Map[String, String]): BaseRelation = {
val fileName = parameters.getOrElse("path", sys.error("Option 'path' not specified"))
val samplingRatio = parameters.get("samplingRatio").map(_.toDouble).getOrElse(1.0)

JSONRelation(fileName, samplingRatio)(sqlContext)
JSONRelation(fileName, samplingRatio, None)(sqlContext)
}

/** Returns a new base relation with the given schema and parameters. */
override def createRelation(
sqlContext: SQLContext,
parameters: Map[String, String],
schema: StructType): BaseRelation = {
val fileName = parameters.getOrElse("path", sys.error("Option 'path' not specified"))
val samplingRatio = parameters.get("samplingRatio").map(_.toDouble).getOrElse(1.0)

JSONRelation(fileName, samplingRatio, Some(schema))(sqlContext)
}
}

private[sql] case class JSONRelation(fileName: String, samplingRatio: Double)(
private[sql] case class JSONRelation(
fileName: String,
samplingRatio: Double,
userSpecifiedSchema: Option[StructType])(
@transient val sqlContext: SQLContext)
extends TableScan {

private def baseRDD = sqlContext.sparkContext.textFile(fileName)

override val schema =
JsonRDD.inferSchema(
baseRDD,
samplingRatio,
sqlContext.columnNameOfCorruptRecord)
override val schema = userSpecifiedSchema.getOrElse(
JsonRDD.nullTypeToStringType(
JsonRDD.inferSchema(
baseRDD,
samplingRatio,
sqlContext.columnNameOfCorruptRecord)))

override def buildScan() =
JsonRDD.jsonStringToRow(baseRDD, schema, sqlContext.columnNameOfCorruptRecord)
138 changes: 125 additions & 13 deletions sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
@@ -17,16 +17,15 @@

package org.apache.spark.sql.sources

import org.apache.spark.Logging
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.execution.RunnableCommand
import org.apache.spark.util.Utils

import scala.language.implicitConversions
import scala.util.parsing.combinator.lexical.StdLexical
import scala.util.parsing.combinator.syntactical.StandardTokenParsers
import scala.util.parsing.combinator.PackratParsers

import org.apache.spark.Logging
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.types._
import org.apache.spark.sql.execution.RunnableCommand
import org.apache.spark.util.Utils
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.SqlLexical

@@ -44,6 +43,14 @@ private[sql] class DDLParser extends StandardTokenParsers with PackratParsers wi
}
}

def parseType(input: String): DataType = {
phrase(dataType)(new lexical.Scanner(input)) match {
case Success(r, x) => r
case x =>
sys.error(s"Unsupported dataType: $x")
}
}
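// Illustrative note, not part of this patch: parseType turns a DDL type string into a
// Catalyst DataType using the `dataType` parser defined below. For example,
// parseType("array<int>") yields ArrayType(IntegerType), and
// parseType("struct<a:int,b:string>") yields a StructType with fields `a` and `b`.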

protected case class Keyword(str: String)

protected implicit def asParser(k: Keyword): Parser[String] =
@@ -55,6 +62,24 @@ private[sql] class DDLParser extends StandardTokenParsers with PackratParsers wi
protected val USING = Keyword("USING")
protected val OPTIONS = Keyword("OPTIONS")

// Data types.
protected val STRING = Keyword("STRING")
protected val BINARY = Keyword("BINARY")
protected val BOOLEAN = Keyword("BOOLEAN")
protected val TINYINT = Keyword("TINYINT")
protected val SMALLINT = Keyword("SMALLINT")
protected val INT = Keyword("INT")
protected val BIGINT = Keyword("BIGINT")
protected val FLOAT = Keyword("FLOAT")
protected val DOUBLE = Keyword("DOUBLE")
protected val DECIMAL = Keyword("DECIMAL")
protected val DATE = Keyword("DATE")
protected val TIMESTAMP = Keyword("TIMESTAMP")
protected val VARCHAR = Keyword("VARCHAR")
protected val ARRAY = Keyword("ARRAY")
protected val MAP = Keyword("MAP")
protected val STRUCT = Keyword("STRUCT")

// Use reflection to find the reserved words defined in this class.
protected val reservedWords =
this.getClass
@@ -67,26 +92,92 @@ private[sql] class DDLParser extends StandardTokenParsers with PackratParsers wi
protected lazy val ddl: Parser[LogicalPlan] = createTable

/**
* CREATE TEMPORARY TABLE avroTable
* `CREATE TEMPORARY TABLE avroTable
* USING org.apache.spark.sql.avro
* OPTIONS (path "../hive/src/test/resources/data/files/episodes.avro")
* OPTIONS (path "../hive/src/test/resources/data/files/episodes.avro")`
* or
* `CREATE TEMPORARY TABLE avroTable(intField int, stringField string...)
* USING org.apache.spark.sql.avro
* OPTIONS (path "../hive/src/test/resources/data/files/episodes.avro")`
*/
protected lazy val createTable: Parser[LogicalPlan] =
CREATE ~ TEMPORARY ~ TABLE ~> ident ~ (USING ~> className) ~ (OPTIONS ~> options) ^^ {
case tableName ~ provider ~ opts =>
CreateTableUsing(tableName, provider, opts)
(
CREATE ~ TEMPORARY ~ TABLE ~> ident
~ (tableCols).? ~ (USING ~> className) ~ (OPTIONS ~> options) ^^ {
case tableName ~ columns ~ provider ~ opts =>
val userSpecifiedSchema = columns.flatMap(fields => Some(StructType(fields)))
CreateTableUsing(tableName, userSpecifiedSchema, provider, opts)
}
)

protected lazy val tableCols: Parser[Seq[StructField]] = "(" ~> repsep(column, ",") <~ ")"

protected lazy val options: Parser[Map[String, String]] =
"(" ~> repsep(pair, ",") <~ ")" ^^ { case s: Seq[(String, String)] => s.toMap }

protected lazy val className: Parser[String] = repsep(ident, ".") ^^ { case s => s.mkString(".")}

protected lazy val pair: Parser[(String, String)] = ident ~ stringLit ^^ { case k ~ v => (k,v) }

protected lazy val column: Parser[StructField] =
ident ~ dataType ^^ { case columnName ~ typ =>
StructField(columnName, typ)
}

protected lazy val primitiveType: Parser[DataType] =
STRING ^^^ StringType |
BINARY ^^^ BinaryType |
BOOLEAN ^^^ BooleanType |
TINYINT ^^^ ByteType |
SMALLINT ^^^ ShortType |
INT ^^^ IntegerType |
BIGINT ^^^ LongType |
FLOAT ^^^ FloatType |
DOUBLE ^^^ DoubleType |
fixedDecimalType | // decimal with precision/scale
DECIMAL ^^^ DecimalType.Unlimited | // decimal with no precision/scale
DATE ^^^ DateType |
TIMESTAMP ^^^ TimestampType |
VARCHAR ~ "(" ~ numericLit ~ ")" ^^^ StringType

protected lazy val fixedDecimalType: Parser[DataType] =
(DECIMAL ~ "(" ~> numericLit) ~ ("," ~> numericLit <~ ")") ^^ {
case precision ~ scale => DecimalType(precision.toInt, scale.toInt)
}

protected lazy val arrayType: Parser[DataType] =
ARRAY ~> "<" ~> dataType <~ ">" ^^ {
case tpe => ArrayType(tpe)
}

protected lazy val mapType: Parser[DataType] =
MAP ~> "<" ~> dataType ~ "," ~ dataType <~ ">" ^^ {
case t1 ~ _ ~ t2 => MapType(t1, t2)
}

protected lazy val structField: Parser[StructField] =
ident ~ ":" ~ dataType ^^ {
case fieldName ~ _ ~ tpe => StructField(fieldName, tpe, nullable = true)
}

protected lazy val structType: Parser[DataType] =
(STRUCT ~> "<" ~> repsep(structField, ",") <~ ">" ^^ {
case fields => new StructType(fields)
}) |
(STRUCT ~> "<>" ^^ {
case fields => new StructType(Nil)
})

private[sql] lazy val dataType: Parser[DataType] =
arrayType |
mapType |
structType |
primitiveType
}
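With the grammar above, CREATE TEMPORARY TABLE can now carry an explicit, possibly nested, schema. The sketch below is illustrative only; it assumes a SQLContext named sqlContext and a JSON file at a hypothetical path, neither of which comes from this patch.

// Hedged usage sketch: register a temporary table with a user-defined schema.
sqlContext.sql(
  """
    |CREATE TEMPORARY TABLE people(
    |  name string,
    |  age int,
    |  scores array<double>,
    |  address struct<city:string, zip:string>)
    |USING org.apache.spark.sql.json
    |OPTIONS (path "/tmp/people.json")
  """.stripMargin)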

private[sql] case class CreateTableUsing(
Review comment (Contributor): It would be nice to move this class CreateTableUsing to org.apache.spark.sql.sources.commands.scala
Reply (Author): ok, i will move it

tableName: String,
userSpecifiedSchema: Option[StructType],
provider: String,
options: Map[String, String]) extends RunnableCommand {

@@ -99,8 +190,29 @@ private[sql] case class CreateTableUsing(
sys.error(s"Failed to load class for data source: $provider")
}
}
val dataSource = clazz.newInstance().asInstanceOf[org.apache.spark.sql.sources.RelationProvider]
val relation = dataSource.createRelation(sqlContext, new CaseInsensitiveMap(options))

val relation = userSpecifiedSchema match {
case Some(schema: StructType) => {
clazz.newInstance match {
case dataSource: org.apache.spark.sql.sources.SchemaRelationProvider =>
dataSource
.asInstanceOf[org.apache.spark.sql.sources.SchemaRelationProvider]
.createRelation(sqlContext, new CaseInsensitiveMap(options), schema)
case _ =>
sys.error(s"${clazz.getCanonicalName} should extend SchemaRelationProvider.")
}
}
case None => {
clazz.newInstance match {
case dataSource: org.apache.spark.sql.sources.RelationProvider =>
dataSource
.asInstanceOf[org.apache.spark.sql.sources.RelationProvider]
.createRelation(sqlContext, new CaseInsensitiveMap(options))
case _ =>
sys.error(s"${clazz.getCanonicalName} should extend RelationProvider.")
}
}
}

sqlContext.baseRelationToSchemaRDD(relation).registerTempTable(tableName)
Seq.empty
@@ -18,7 +18,7 @@ package org.apache.spark.sql.sources

import org.apache.spark.annotation.{Experimental, DeveloperApi}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{SQLConf, Row, SQLContext, StructType}
import org.apache.spark.sql.{Row, SQLContext, StructType}
import org.apache.spark.sql.catalyst.expressions.{Expression, Attribute}

/**
@@ -44,6 +44,33 @@ trait RelationProvider {
def createRelation(sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation
}

/**
* ::DeveloperApi::
* Implemented by objects that produce relations for a specific kind of data source
* with a given schema. Spark SQL uses this interface when it is given a DDL operation with
* 1. a USING clause, which specifies the implemented SchemaRelationProvider, and
* 2. a user-defined schema, which users can optionally supply when creating a table.
*
* Users may specify the fully qualified class name of a given data source. When that class is
* not found, Spark SQL will append the class name `DefaultSource` to the path, allowing for
* less verbose invocation. For example, 'org.apache.spark.sql.json' would resolve to the
* data source 'org.apache.spark.sql.json.DefaultSource'.
*
* A new instance of this class will be instantiated each time a DDL call is made.
*/
@DeveloperApi
trait SchemaRelationProvider {
/**
* Returns a new base relation with the given parameters and user defined schema.
* Note: the parameters' keywords are case insensitive and this insensitivity is enforced
* by the Map that is passed to the function.
*/
def createRelation(
sqlContext: SQLContext,
parameters: Map[String, String],
schema: StructType): BaseRelation
}
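As an illustration of the contract above, here is a minimal sketch of a data source that mixes in both RelationProvider and SchemaRelationProvider, so it works with and without a user-defined schema. DefaultSource below mirrors the JSON source in this patch, but MyTextRelation and its default single string column are hypothetical, not part of this patch.

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.catalyst.types.{StringType, StructField, StructType}
import org.apache.spark.sql.sources.{BaseRelation, RelationProvider, SchemaRelationProvider, TableScan}

// Hypothetical data source: one relation class, two ways to obtain its schema.
class DefaultSource extends RelationProvider with SchemaRelationProvider {

  /** No user-defined schema: let the relation fall back to a default. */
  override def createRelation(
      sqlContext: SQLContext,
      parameters: Map[String, String]): BaseRelation = {
    val path = parameters.getOrElse("path", sys.error("Option 'path' not specified"))
    MyTextRelation(path, None)(sqlContext)
  }

  /** User-defined schema: pass it through unchanged. */
  override def createRelation(
      sqlContext: SQLContext,
      parameters: Map[String, String],
      schema: StructType): BaseRelation = {
    val path = parameters.getOrElse("path", sys.error("Option 'path' not specified"))
    MyTextRelation(path, Some(schema))(sqlContext)
  }
}

// Hypothetical relation: one string column per input line unless a schema was supplied.
case class MyTextRelation(
    path: String,
    userSpecifiedSchema: Option[StructType])(
    @transient val sqlContext: SQLContext)
  extends TableScan {

  override val schema: StructType = userSpecifiedSchema.getOrElse(
    StructType(StructField("value", StringType, nullable = true) :: Nil))

  override def buildScan(): RDD[Row] =
    sqlContext.sparkContext.textFile(path).map(line => Row(line))
}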

/**
* ::DeveloperApi::
* Represents a collection of tuples with a known schema. Classes that extend BaseRelation must