23 changes: 0 additions & 23 deletions dev/run-tests
Original file line number Diff line number Diff line change
@@ -142,29 +142,6 @@ CURRENT_BLOCK=$BLOCK_BUILD

{
HIVE_BUILD_ARGS="$SBT_MAVEN_PROFILES_ARGS -Phive -Phive-thriftserver"
HIVE_12_BUILD_ARGS="$HIVE_BUILD_ARGS -Phive-0.12.0"

# First build with Hive 0.12.0 to ensure patches do not break the Hive 0.12.0 build
echo "[info] Compile with Hive 0.12.0"
[ -d "lib_managed" ] && rm -rf lib_managed
echo "[info] Building Spark with these arguments: $HIVE_12_BUILD_ARGS"

if [ "${AMPLAB_JENKINS_BUILD_TOOL}" == "maven" ]; then
build/mvn $HIVE_12_BUILD_ARGS clean package -DskipTests
else
# NOTE: echo "q" is needed because sbt on encountering a build file with failure
# (either resolution or compilation) prompts the user for input either q, r, etc
# to quit or retry. This echo is there to make it not block.
# NOTE: Do not quote $BUILD_MVN_PROFILE_ARGS or else it will be interpreted as a
# single argument!
# QUESTION: Why doesn't 'yes "q"' work?
# QUESTION: Why doesn't 'grep -v -e "^\[info\] Resolving"' work?
echo -e "q\n" \
| build/sbt $HIVE_12_BUILD_ARGS clean hive/compile hive-thriftserver/compile \
| grep -v -e "info.*Resolving" -e "warn.*Merging" -e "info.*Including"
fi

# Then build with default Hive version (0.13.1) because tests are based on this version
echo "[info] Compile with Hive 0.13.1"
[ -d "lib_managed" ] && rm -rf lib_managed
echo "[info] Building Spark with these arguments: $HIVE_BUILD_ARGS"
2 changes: 2 additions & 0 deletions project/MimaExcludes.scala
Original file line number Diff line number Diff line change
@@ -89,6 +89,8 @@ object MimaExcludes {
ProblemFilters.exclude[MissingMethodProblem](
"org.apache.spark.mllib.linalg.Vector.numActives")
) ++ Seq(
// Execution should never be included as it's always internal.
MimaBuild.excludeSparkPackage("sql.execution"),
// This `protected[sql]` method was removed in 1.3.1
ProblemFilters.exclude[MissingMethodProblem](
"org.apache.spark.sql.SQLContext.checkAnalysis"),
9 changes: 8 additions & 1 deletion project/SparkBuild.scala
Original file line number Diff line number Diff line change
@@ -193,6 +193,7 @@ object SparkBuild extends PomBuild {
* Usage: `build/sbt sparkShell`
*/
val sparkShell = taskKey[Unit]("start a spark-shell.")
val sparkSql = taskKey[Unit]("starts the spark sql CLI.")

enable(Seq(
connectInput in run := true,
@@ -203,6 +204,12 @@

sparkShell := {
(runMain in Compile).toTask(" org.apache.spark.repl.Main -usejavacp").value
},

javaOptions in Compile += "-Dspark.master=local",

sparkSql := {
(runMain in Compile).toTask(" org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver").value
}
))(assembly)

@@ -497,7 +504,7 @@ object TestSettings {
// Setting SPARK_DIST_CLASSPATH is a simple way to make sure any child processes
// launched by the tests have access to the correct test-time classpath.
envVars in Test ++= Map(
"SPARK_DIST_CLASSPATH" ->
"SPARK_DIST_CLASSPATH" ->
(fullClasspath in Test).value.files.map(_.getAbsolutePath).mkString(":").stripSuffix(":"),
"JAVA_HOME" -> sys.env.get("JAVA_HOME").getOrElse(sys.props("java.home"))),
javaOptions in Test += "-Dspark.test.home=" + sparkHome,
Original file line number Diff line number Diff line change
@@ -149,16 +149,6 @@ case class InsertIntoTable(
}
}

case class CreateTableAsSelect[T](
databaseName: Option[String],
tableName: String,
child: LogicalPlan,
allowExisting: Boolean,
desc: Option[T] = None) extends UnaryNode {
override def output: Seq[Attribute] = Seq.empty[Attribute]
override lazy val resolved: Boolean = databaseName != None && childrenResolved
}

/**
* A container for holding named common table expressions (CTEs) and a query plan.
* This operator will be removed during analysis and the relations will be substituted into child.
@@ -184,10 +174,10 @@ case class WriteToFile(
}

/**
* @param order The ordering expressions
* @param global True means global sorting apply for entire data set,
* @param order The ordering expressions
* @param global True means global sorting apply for entire data set,
* False means sorting only apply within the partition.
* @param child Child logical plan
* @param child Child logical plan
*/
case class Sort(
order: Seq[SortOrder],
Original file line number Diff line number Diff line change
@@ -21,9 +21,7 @@ import org.apache.spark.sql.catalyst.expressions.Attribute

/**
* A logical node that represents a non-query command to be executed by the system. For example,
* commands can be used by parsers to represent DDL operations.
* commands can be used by parsers to represent DDL operations. Commands, unlike queries, are
* eagerly executed.
*/
abstract class Command extends LeafNode {
self: Product =>
def output: Seq[Attribute] = Seq.empty
}
trait Command
Original file line number Diff line number Diff line change
@@ -17,11 +17,15 @@

package org.apache.spark.sql.catalyst

import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.logical.Command
import org.scalatest.FunSuite

private[sql] case class TestCommand(cmd: String) extends Command
private[sql] case class TestCommand(cmd: String) extends LogicalPlan with Command {
override def output: Seq[Attribute] = Seq.empty
override def children: Seq[LogicalPlan] = Seq.empty
}

private[sql] class SuperLongKeywordTestParser extends AbstractSparkSQLParser {
protected val EXECUTE = Keyword("THISISASUPERLONGKEYWORDTEST")
Original file line number Diff line number Diff line change
@@ -143,7 +143,6 @@ class DataFrame private[sql](
// happen right away to let these side effects take place eagerly.
case _: Command |
_: InsertIntoTable |
_: CreateTableAsSelect[_] |
_: CreateTableUsingAsSelect |
_: WriteToFile =>
LogicalRDD(queryExecution.analyzed.output, queryExecution.toRdd)(sqlContext)
11 changes: 8 additions & 3 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
Original file line number Diff line number Diff line change
@@ -70,7 +70,7 @@ import org.apache.spark.{Partition, SparkContext}
* spark-sql> SELECT * FROM src LIMIT 1;
*
*-- Exception will be thrown and switch to dialect
*-- "sql" (for SQLContext) or
*-- "sql" (for SQLContext) or
*-- "hiveql" (for HiveContext)
* }}}
*/
@@ -107,7 +107,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
/**
* @return Spark SQL configuration
*/
protected[sql] def conf = tlSession.get().conf
protected[sql] def conf = currentSession().conf

/**
* Set Spark SQL configuration properties.
@@ -1197,13 +1197,17 @@ class SQLContext(@transient val sparkContext: SparkContext)
|${stringOrError(executedPlan)}
""".stripMargin.trim

override def toString: String =
override def toString: String = {
def output =
analyzed.output.map(o => s"${o.name}: ${o.dataType.simpleString}").mkString(", ")

// TODO previously will output RDD details by run (${stringOrError(toRdd.toDebugString)})
// however, the `toRdd` will cause the real execution, which is not what we want.
// We need to think about how to avoid the side effect.
s"""== Parsed Logical Plan ==
|${stringOrError(logical)}
|== Analyzed Logical Plan ==
|${stringOrError(output)}
|${stringOrError(analyzed)}
|== Optimized Logical Plan ==
|${stringOrError(optimizedPlan)}
@@ -1212,6 +1216,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
|Code Generation: ${stringOrError(executedPlan.codegenEnabled)}
|== RDD ==
""".stripMargin.trim
}
}

/**
Original file line number Diff line number Diff line change
@@ -32,9 +32,11 @@ import org.apache.spark.sql.{DataFrame, SQLConf, SQLContext}
* A logical command that is executed for its side-effects. `RunnableCommand`s are
* wrapped in `ExecutedCommand` during execution.
*/
trait RunnableCommand extends logical.Command {
private[sql] trait RunnableCommand extends LogicalPlan with logical.Command {
self: Product =>

override def output: Seq[Attribute] = Seq.empty
override def children: Seq[LogicalPlan] = Seq.empty
def run(sqlContext: SQLContext): Seq[Row]
}

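For context, a minimal sketch (not part of this patch) of what a concrete command looks like under the new contract: RunnableCommand now supplies empty `output` and `children`, so an implementor only provides `run`. The command name and body below are hypothetical; only the trait contract above and the eager-execution path in the DataFrame change earlier in this diff are taken from the patch.

package org.apache.spark.sql.execution

import org.apache.spark.sql.{Row, SQLContext}

// Hypothetical command whose only job is a side effect (clearing the cache).
// `output` and `children` come from RunnableCommand's defaults.
private[sql] case class ClearCacheExampleCommand() extends RunnableCommand {
  override def run(sqlContext: SQLContext): Seq[Row] = {
    sqlContext.clearCache() // runs eagerly once the plan is wrapped in ExecutedCommand
    Seq.empty[Row]
  }
}

Because `Command` plans fall into the eager-execution case of DataFrame shown above, wrapping such a plan in a DataFrame executes `run` immediately.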
16 changes: 11 additions & 5 deletions sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
Original file line number Diff line number Diff line change
@@ -269,8 +269,10 @@ private[sql] case class ResolvedDataSource(provider: Class[_], relation: BaseRel
*/
private[sql] case class DescribeCommand(
table: LogicalPlan,
isExtended: Boolean) extends Command {
override val output = Seq(
isExtended: Boolean) extends LogicalPlan with Command {

override def children: Seq[LogicalPlan] = Seq.empty
override val output: Seq[Attribute] = Seq(
// Column names are based on Hive.
AttributeReference("col_name", StringType, nullable = false,
new MetadataBuilder().putString("comment", "name of the column").build())(),
@@ -292,7 +294,11 @@ private[sql] case class CreateTableUsing(
temporary: Boolean,
options: Map[String, String],
allowExisting: Boolean,
managedIfNoPath: Boolean) extends Command
managedIfNoPath: Boolean) extends LogicalPlan with Command {

override def output: Seq[Attribute] = Seq.empty
override def children: Seq[LogicalPlan] = Seq.empty
}

/**
* A node used to support CTAS statements and saveAsTable for the data source API.
@@ -318,7 +324,7 @@ private[sql] case class CreateTempTableUsing(
provider: String,
options: Map[String, String]) extends RunnableCommand {

def run(sqlContext: SQLContext): Seq[Row] = {
override def run(sqlContext: SQLContext): Seq[Row] = {
val resolved = ResolvedDataSource(sqlContext, userSpecifiedSchema, provider, options)
sqlContext.registerDataFrameAsTable(
DataFrame(sqlContext, LogicalRelation(resolved.relation)), tableName)
@@ -333,7 +339,7 @@ private[sql] case class CreateTempTableUsingAsSelect(
options: Map[String, String],
query: LogicalPlan) extends RunnableCommand {

def run(sqlContext: SQLContext): Seq[Row] = {
override def run(sqlContext: SQLContext): Seq[Row] = {
val df = DataFrame(sqlContext, query)
val resolved = ResolvedDataSource(sqlContext, provider, mode, options, df)
sqlContext.registerDataFrameAsTable(
Original file line number Diff line number Diff line change
@@ -37,7 +37,7 @@ import org.apache.hadoop.hive.ql.session.SessionState
import org.apache.thrift.transport.TSocket

import org.apache.spark.Logging
import org.apache.spark.sql.hive.HiveShim
import org.apache.spark.sql.hive.{HiveContext, HiveShim}
import org.apache.spark.util.Utils

private[hive] object SparkSQLCLIDriver {
@@ -74,7 +74,12 @@ private[hive] object SparkSQLCLIDriver {
System.exit(1)
}

val sessionState = new CliSessionState(new HiveConf(classOf[SessionState]))
val cliConf = new HiveConf(classOf[SessionState])
// Override the location of the metastore since this is only used for local execution.
Review comment (Contributor):
So, if I understand this correctly, you're creating just a "dummy" local metastore that won't actually be used; this is just to keep the Hive libraries happy, right?

HiveContext.newTemporaryConfiguration().foreach {
case (key, value) => cliConf.set(key, value)
}
val sessionState = new CliSessionState(cliConf)

sessionState.in = System.in
try {
Expand All @@ -91,10 +96,14 @@ private[hive] object SparkSQLCLIDriver {

// Set all properties specified via command line.
val conf: HiveConf = sessionState.getConf
sessionState.cmdProperties.entrySet().foreach { item: java.util.Map.Entry[Object, Object] =>
conf.set(item.getKey.asInstanceOf[String], item.getValue.asInstanceOf[String])
sessionState.getOverriddenConfigurations.put(
item.getKey.asInstanceOf[String], item.getValue.asInstanceOf[String])
sessionState.cmdProperties.entrySet().foreach { item =>
val key = item.getKey.asInstanceOf[String]
val value = item.getValue.asInstanceOf[String]
// We do not propagate metastore options to the execution copy of hive.
if (key != "javax.jdo.option.ConnectionURL") {
conf.set(key, value)
sessionState.getOverriddenConfigurations.put(key, value)
}
}

SessionState.start(sessionState)
@@ -138,8 +147,9 @@ private[hive] object SparkSQLCLIDriver {
case e: UnsupportedEncodingException => System.exit(3)
}

// use the specified database if specified
cli.processSelectDatabase(sessionState);
if (sessionState.database != null) {
SparkSQLEnv.hiveContext.runSqlHive(s"USE ${sessionState.database}")
}

// Execute -i init files (always in silent mode)
cli.processInitFiles(sessionState)
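Pulled together as one self-contained sketch, the CLI's execution-side Hive setup after this change reads roughly as follows. Every name and call is taken from the hunks above; only the glue (import locations and ordering) is assumed.

import scala.collection.JavaConversions._

import org.apache.hadoop.hive.cli.CliSessionState
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.ql.session.SessionState

import org.apache.spark.sql.hive.HiveContext

// Execution-side HiveConf: a throwaway local metastore keeps the Hive libraries
// happy, while the isolated metadata client talks to the real metastore.
val cliConf = new HiveConf(classOf[SessionState])
HiveContext.newTemporaryConfiguration().foreach { case (key, value) =>
  cliConf.set(key, value)
}
val sessionState = new CliSessionState(cliConf)

// Copy -hiveconf properties from the command line, but never the metastore URL,
// so the execution copy of Hive stays pointed at the temporary location.
val conf: HiveConf = sessionState.getConf
sessionState.cmdProperties.entrySet().foreach { item =>
  val key = item.getKey.asInstanceOf[String]
  val value = item.getValue.asInstanceOf[String]
  if (key != "javax.jdo.option.ConnectionURL") {
    conf.set(key, value)
    sessionState.getOverriddenConfigurations.put(key, value)
  }
}

// If -database was passed, switch databases through the HiveContext instead of
// Hive's own processSelectDatabase helper.
if (sessionState.database != null) {
  SparkSQLEnv.hiveContext.runSqlHive(s"USE ${sessionState.database}")
}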
Original file line number Diff line number Diff line change
@@ -17,6 +17,8 @@

package org.apache.spark.sql.hive.thriftserver

import java.io.PrintStream

import scala.collection.JavaConversions._

import org.apache.spark.scheduler.StatsReportListener
@@ -39,7 +41,6 @@ private[hive] object SparkSQLEnv extends Logging {

sparkConf
.setAppName(s"SparkSQL::${Utils.localHostName()}")
.set("spark.sql.hive.version", HiveShim.version)
.set(
"spark.serializer",
maybeSerializer.getOrElse("org.apache.spark.serializer.KryoSerializer"))
@@ -51,6 +52,12 @@
sparkContext.addSparkListener(new StatsReportListener())
hiveContext = new HiveContext(sparkContext)

hiveContext.metadataHive.setOut(new PrintStream(System.out, true, "UTF-8"))
Review comment (Contributor):

nit: is UTF-8 the right thing here? It's not the default encoding on all platforms. Maybe omit that argument (which probably means using the platform default, even though the javadocs don't explicitly mention that)?

hiveContext.metadataHive.setInfo(new PrintStream(System.err, true, "UTF-8"))
hiveContext.metadataHive.setError(new PrintStream(System.err, true, "UTF-8"))

hiveContext.setConf("spark.sql.hive.version", HiveShim.version)

if (log.isDebugEnabled) {
hiveContext.hiveconf.getAllProperties.toSeq.sorted.foreach { case (k, v) =>
logDebug(s"HiveConf var: $k=$v")
Original file line number Diff line number Diff line change
@@ -240,7 +240,17 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {

// It has a bug and it has been fixed by
// https://issues.apache.org/jira/browse/HIVE-7673 (in Hive 0.14 and trunk).
"input46"
"input46",

// These tests were broken by the hive client isolation PR.
"part_inherit_tbl_props",
"part_inherit_tbl_props_with_star",

"nullformatCTAS", // SPARK-7411: need to finish CTAS parser

// The isolated classloader seemed to make some of our test reset mechanisms less robust.
"combine1", // This test changes compression settings in a way that breaks all subsequent tests.
"load_dyn_part14.*" // These work alone but fail when run with other tests...
) ++ HiveShim.compatibilityBlackList

/**