24 changes: 21 additions & 3 deletions project/MimaExcludes.scala
@@ -62,8 +62,6 @@ object MimaExcludes {
"org.apache.spark.ml.classification.LogisticCostFun.this"),
// SQL execution is considered private.
excludePackage("org.apache.spark.sql.execution"),
// Parquet support is considered private.
excludePackage("org.apache.spark.sql.parquet"),
// The old JSON RDD is removed in favor of streaming Jackson
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.json.JsonRDD$"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.json.JsonRDD"),
@@ -155,7 +153,27 @@ object MimaExcludes {
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.SqlNewHadoopRDD$NewHadoopMapPartitionsWithSplitRDD$"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.PartitionSpec$"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.DescribeCommand"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.DDLException")
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.DDLException"),
// SPARK-9763 Minimize exposure of internal SQL classes
excludePackage("org.apache.spark.sql.parquet"),
excludePackage("org.apache.spark.sql.json"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.jdbc.JDBCRDD$DecimalConversion$"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.jdbc.JDBCPartition"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.jdbc.JdbcUtils$"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.jdbc.JDBCRDD$DecimalConversion"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.jdbc.JDBCPartitioningInfo$"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.jdbc.JDBCPartition$"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.jdbc.package"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.jdbc.JDBCRDD$JDBCConversion"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.jdbc.JDBCRDD$"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.jdbc.package$DriverWrapper"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.jdbc.JDBCRDD"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.jdbc.JDBCPartitioningInfo"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.jdbc.JdbcUtils"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.jdbc.DefaultSource"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.jdbc.JDBCRelation$"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.jdbc.package$"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.jdbc.JDBCRelation")
) ++ Seq(
// SPARK-4751 Dynamic allocation for standalone mode
ProblemFilters.exclude[MissingMethodProblem](
@@ -1,3 +1,3 @@
org.apache.spark.sql.jdbc.DefaultSource
org.apache.spark.sql.json.DefaultSource
org.apache.spark.sql.parquet.DefaultSource
org.apache.spark.sql.execution.datasources.jdbc.DefaultSource
org.apache.spark.sql.execution.datasources.json.DefaultSource
org.apache.spark.sql.execution.datasources.parquet.DefaultSource
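A minimal sketch of how this registration file is consumed, assuming the standard java.util.ServiceLoader mechanism behind META-INF/services (not part of this change):

import java.util.ServiceLoader
import scala.collection.JavaConverters._

import org.apache.spark.sql.sources.DataSourceRegister

// Every class listed in the services file above is instantiated here and can be
// looked up by the short name it reports, e.g. "jdbc", "json", "parquet".
val sources = ServiceLoader.load(classOf[DataSourceRegister]).asScala
sources.foreach(s => println(s.shortName()))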
@@ -39,7 +39,7 @@ import org.apache.spark.sql.catalyst.plans.{Inner, JoinType}
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, ScalaReflection, SqlParser}
import org.apache.spark.sql.execution.{EvaluatePython, ExplainCommand, LogicalRDD, SQLExecution}
import org.apache.spark.sql.execution.datasources.{CreateTableUsingAsSelect, LogicalRelation}
import org.apache.spark.sql.json.JacksonGenerator
import org.apache.spark.sql.execution.datasources.json.JacksonGenerator
import org.apache.spark.sql.sources.HadoopFsRelation
import org.apache.spark.sql.types._
import org.apache.spark.storage.StorageLevel
@@ -25,10 +25,10 @@ import org.apache.spark.annotation.Experimental
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.execution.datasources.jdbc.{JDBCPartition, JDBCPartitioningInfo, JDBCRelation}
import org.apache.spark.sql.execution.datasources.json.JSONRelation
import org.apache.spark.sql.execution.datasources.parquet.ParquetRelation
import org.apache.spark.sql.execution.datasources.{LogicalRelation, ResolvedDataSource}
import org.apache.spark.sql.jdbc.{JDBCPartition, JDBCPartitioningInfo, JDBCRelation}
import org.apache.spark.sql.json.JSONRelation
import org.apache.spark.sql.parquet.ParquetRelation
import org.apache.spark.sql.types.StructType
import org.apache.spark.{Logging, Partition}

@@ -23,8 +23,8 @@ import org.apache.spark.annotation.Experimental
import org.apache.spark.sql.catalyst.{SqlParser, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.plans.logical.InsertIntoTable
import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
import org.apache.spark.sql.execution.datasources.{CreateTableUsingAsSelect, ResolvedDataSource}
import org.apache.spark.sql.jdbc.{JDBCWriteDetails, JdbcUtils}
import org.apache.spark.sql.sources.HadoopFsRelation


@@ -264,15 +264,15 @@ final class DataFrameWriter private[sql](df: DataFrame) {

// Create the table if the table didn't exist.
if (!tableExists) {
val schema = JDBCWriteDetails.schemaString(df, url)
val schema = JdbcUtils.schemaString(df, url)
val sql = s"CREATE TABLE $table ($schema)"
conn.prepareStatement(sql).executeUpdate()
}
} finally {
conn.close()
}

JDBCWriteDetails.saveTable(df, url, table, connectionProperties)
JdbcUtils.saveTable(df, url, table, connectionProperties)
}

/**
@@ -43,7 +43,7 @@ import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.sources.BaseRelation
import org.apache.spark.sql.types._
import org.apache.spark.sql.ui.{SQLListener, SQLTab}
import org.apache.spark.sql.execution.ui.{SQLListener, SQLTab}
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.Utils

@@ -21,7 +21,7 @@ import java.util.concurrent.atomic.AtomicLong

import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.ui.SparkPlanGraph
import org.apache.spark.sql.execution.ui.SparkPlanGraph
import org.apache.spark.util.Utils

private[sql] object SQLExecution {
@@ -32,7 +32,7 @@ import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.codegen._
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.metric.{IntSQLMetric, LongSQLMetric, SQLMetric, SQLMetrics}
import org.apache.spark.sql.execution.metric.{LongSQLMetric, SQLMetric, SQLMetrics}
import org.apache.spark.sql.types.DataType

object SparkPlan {
@@ -98,12 +98,6 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
*/
private[sql] def metrics: Map[String, SQLMetric[_, _]] = defaultMetrics

/**
* Return a IntSQLMetric according to the name.
*/
private[sql] def intMetric(name: String): IntSQLMetric =
metrics(name).asInstanceOf[IntSQLMetric]

/**
* Return a LongSQLMetric according to the name.
*/
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.CatalystTypeConverters
import org.apache.spark.sql.catalyst.errors._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.metric.SQLMetrics
import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.collection.ExternalSorter
import org.apache.spark.util.collection.unsafe.sort.PrefixComparator
@@ -0,0 +1,185 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.datasources

import scala.language.implicitConversions
import scala.util.matching.Regex

import org.apache.spark.Logging
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.catalyst.{TableIdentifier, AbstractSparkSQLParser}
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.types._


/**
* A parser for foreign DDL commands.
*/
class DDLParser(parseQuery: String => LogicalPlan)
extends AbstractSparkSQLParser with DataTypeParser with Logging {

def parse(input: String, exceptionOnError: Boolean): LogicalPlan = {
try {
parse(input)
} catch {
case ddlException: DDLException => throw ddlException
case _ if !exceptionOnError => parseQuery(input)
case x: Throwable => throw x
}
}

// Keyword is a convention of AbstractSparkSQLParser, which scans all of the `Keyword`
// properties of this class via reflection at runtime to construct the SqlLexical object.
protected val CREATE = Keyword("CREATE")
protected val TEMPORARY = Keyword("TEMPORARY")
protected val TABLE = Keyword("TABLE")
protected val IF = Keyword("IF")
protected val NOT = Keyword("NOT")
protected val EXISTS = Keyword("EXISTS")
protected val USING = Keyword("USING")
protected val OPTIONS = Keyword("OPTIONS")
protected val DESCRIBE = Keyword("DESCRIBE")
protected val EXTENDED = Keyword("EXTENDED")
protected val AS = Keyword("AS")
protected val COMMENT = Keyword("COMMENT")
protected val REFRESH = Keyword("REFRESH")

protected lazy val ddl: Parser[LogicalPlan] = createTable | describeTable | refreshTable

protected def start: Parser[LogicalPlan] = ddl

/**
* `CREATE [TEMPORARY] TABLE avroTable [IF NOT EXISTS]
* USING org.apache.spark.sql.avro
* OPTIONS (path "../hive/src/test/resources/data/files/episodes.avro")`
* or
* `CREATE [TEMPORARY] TABLE avroTable(intField int, stringField string...) [IF NOT EXISTS]
* USING org.apache.spark.sql.avro
* OPTIONS (path "../hive/src/test/resources/data/files/episodes.avro")`
* or
* `CREATE [TEMPORARY] TABLE avroTable [IF NOT EXISTS]
* USING org.apache.spark.sql.avro
* OPTIONS (path "../hive/src/test/resources/data/files/episodes.avro")
* AS SELECT ...`
*/
protected lazy val createTable: Parser[LogicalPlan] = {
// TODO: Support database.table.
(CREATE ~> TEMPORARY.? <~ TABLE) ~ (IF ~> NOT <~ EXISTS).? ~ ident ~
tableCols.? ~ (USING ~> className) ~ (OPTIONS ~> options).? ~ (AS ~> restInput).? ^^ {
case temp ~ allowExisting ~ tableName ~ columns ~ provider ~ opts ~ query =>
if (temp.isDefined && allowExisting.isDefined) {
throw new DDLException(
"a CREATE TEMPORARY TABLE statement does not allow IF NOT EXISTS clause.")
}

val options = opts.getOrElse(Map.empty[String, String])
if (query.isDefined) {
if (columns.isDefined) {
throw new DDLException(
"a CREATE TABLE AS SELECT statement does not allow column definitions.")
}
// When the IF NOT EXISTS clause appears in the query, the save mode is SaveMode.Ignore.
val mode = if (allowExisting.isDefined) {
SaveMode.Ignore
} else if (temp.isDefined) {
SaveMode.Overwrite
} else {
SaveMode.ErrorIfExists
}

val queryPlan = parseQuery(query.get)
CreateTableUsingAsSelect(tableName,
provider,
temp.isDefined,
Array.empty[String],
mode,
options,
queryPlan)
} else {
val userSpecifiedSchema = columns.flatMap(fields => Some(StructType(fields)))
CreateTableUsing(
tableName,
userSpecifiedSchema,
provider,
temp.isDefined,
options,
allowExisting.isDefined,
managedIfNoPath = false)
}
}
}

protected lazy val tableCols: Parser[Seq[StructField]] = "(" ~> repsep(column, ",") <~ ")"

/*
* describe [extended] table avroTable
* This displays all columns of table `avroTable`, including column_name, column_type and comment.
*/
protected lazy val describeTable: Parser[LogicalPlan] =
(DESCRIBE ~> opt(EXTENDED)) ~ (ident <~ ".").? ~ ident ^^ {
case e ~ db ~ tbl =>
val tblIdentifier = db match {
case Some(dbName) =>
Seq(dbName, tbl)
case None =>
Seq(tbl)
}
DescribeCommand(UnresolvedRelation(tblIdentifier, None), e.isDefined)
}

protected lazy val refreshTable: Parser[LogicalPlan] =
REFRESH ~> TABLE ~> (ident <~ ".").? ~ ident ^^ {
case maybeDatabaseName ~ tableName =>
RefreshTable(TableIdentifier(tableName, maybeDatabaseName))
}

protected lazy val options: Parser[Map[String, String]] =
"(" ~> repsep(pair, ",") <~ ")" ^^ { case s: Seq[(String, String)] => s.toMap }

protected lazy val className: Parser[String] = repsep(ident, ".") ^^ { case s => s.mkString(".")}

override implicit def regexToParser(regex: Regex): Parser[String] = acceptMatch(
s"identifier matching regex $regex", {
case lexical.Identifier(str) if regex.unapplySeq(str).isDefined => str
case lexical.Keyword(str) if regex.unapplySeq(str).isDefined => str
}
)

protected lazy val optionPart: Parser[String] = "[_a-zA-Z][_a-zA-Z0-9]*".r ^^ {
case name => name
}

protected lazy val optionName: Parser[String] = repsep(optionPart, ".") ^^ {
case parts => parts.mkString(".")
}

protected lazy val pair: Parser[(String, String)] =
optionName ~ stringLit ^^ { case k ~ v => (k, v) }

protected lazy val column: Parser[StructField] =
ident ~ dataType ~ (COMMENT ~> stringLit).? ^^ { case columnName ~ typ ~ cm =>
val meta = cm match {
case Some(comment) =>
new MetadataBuilder().putString(COMMENT.str.toLowerCase, comment).build()
case None => Metadata.empty
}

StructField(columnName, typ, nullable = true, meta)
}
}
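For context, a minimal sketch of the kind of statement the createTable rule above accepts, issued through SQLContext.sql (not part of this change; the avro class and path are the placeholders from the Scaladoc, and sqlContext is an assumed existing SQLContext):

// Registers a temporary table backed by an external data source; parsing of this
// statement goes through the createTable rule defined above.
sqlContext.sql(
"""CREATE TEMPORARY TABLE avroTable
|USING org.apache.spark.sql.avro
|OPTIONS (path "../hive/src/test/resources/data/files/episodes.avro")""".stripMargin)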
@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.datasources

import java.util.Properties

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.execution.datasources.jdbc.{JDBCRelation, JDBCPartitioningInfo, DriverRegistry}
import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, RelationProvider}


class DefaultSource extends RelationProvider with DataSourceRegister {

override def shortName(): String = "jdbc"

/** Returns a new base relation with the given parameters. */
override def createRelation(
sqlContext: SQLContext,
parameters: Map[String, String]): BaseRelation = {
val url = parameters.getOrElse("url", sys.error("Option 'url' not specified"))
val driver = parameters.getOrElse("driver", null)
val table = parameters.getOrElse("dbtable", sys.error("Option 'dbtable' not specified"))
val partitionColumn = parameters.getOrElse("partitionColumn", null)
val lowerBound = parameters.getOrElse("lowerBound", null)
val upperBound = parameters.getOrElse("upperBound", null)
val numPartitions = parameters.getOrElse("numPartitions", null)

if (driver != null) DriverRegistry.register(driver)

if (partitionColumn != null
&& (lowerBound == null || upperBound == null || numPartitions == null)) {
sys.error("Partitioning incompletely specified")
}

val partitionInfo = if (partitionColumn == null) {
null
} else {
JDBCPartitioningInfo(
partitionColumn,
lowerBound.toLong,
upperBound.toLong,
numPartitions.toInt)
}
val parts = JDBCRelation.columnPartition(partitionInfo)
val properties = new Properties() // Additional properties that we will pass to getConnection
parameters.foreach(kv => properties.setProperty(kv._1, kv._2))
JDBCRelation(url, table, parts, properties)(sqlContext)
}
}
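A minimal sketch of the reader options that createRelation above consumes (not part of this change; the JDBC URL, table name and bounds are placeholders, and sqlContext is an assumed existing SQLContext):

// "jdbc" resolves to this DefaultSource through its shortName registration.
val df = sqlContext.read
.format("jdbc")
.option("url", "jdbc:postgresql://localhost:5432/testdb")
.option("dbtable", "public.orders")
.option("partitionColumn", "order_id") // all four partitioning options must be set together
.option("lowerBound", "1")
.option("upperBound", "1000000")
.option("numPartitions", "8")
.load()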