48 changes: 13 additions & 35 deletions sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -17,7 +17,7 @@

package org.apache.spark.sql.hive

import java.io.{BufferedReader, File, InputStreamReader, PrintStream}
import java.io.{BufferedReader, InputStreamReader, PrintStream}
import java.sql.{Date, Timestamp}

import scala.collection.JavaConversions._
@@ -33,6 +33,7 @@ import org.apache.hadoop.hive.ql.session.SessionState
import org.apache.hadoop.hive.serde2.io.{DateWritable, TimestampWritable}

import org.apache.spark.SparkContext
import org.apache.spark.annotation.Experimental
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.catalyst.analysis.{Analyzer, EliminateAnalysisOperators, OverrideCatalog, OverrideFunctionRegistry}
@@ -42,28 +43,6 @@ import org.apache.spark.sql.hive.execution.{HiveNativeCommand, DescribeHiveTable
import org.apache.spark.sql.sources.DataSourceStrategy
import org.apache.spark.sql.types._

/**
* DEPRECATED: Use HiveContext instead.
*/
@deprecated("""
Use HiveContext instead. It will still create a local metastore if one is not specified.
However, note that the default directory is ./metastore_db, not ./metastore
""", "1.1")
class LocalHiveContext(sc: SparkContext) extends HiveContext(sc) {

lazy val metastorePath = new File("metastore").getCanonicalPath
lazy val warehousePath: String = new File("warehouse").getCanonicalPath

/** Sets up the system initially or after a RESET command */
protected def configure() {
setConf("javax.jdo.option.ConnectionURL",
s"jdbc:derby:;databaseName=$metastorePath;create=true")
setConf("hive.metastore.warehouse.dir", warehousePath)
}

configure() // Must be called before initializing the catalog below.
}

/**
* An instance of the Spark SQL execution engine that integrates with data stored in Hive.
* Configuration for Hive is read from hive-site.xml on the classpath.
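The removed `LocalHiveContext` only pre-set two Hive properties before initializing the catalog. A minimal spark-shell style sketch of the equivalent setup on a plain `HiveContext`, assuming an existing `SparkContext` named `sc` (the local paths are illustrative):

```scala
import java.io.File

import org.apache.spark.sql.hive.HiveContext

// Point the embedded Derby metastore and the warehouse at explicit local
// directories, mirroring what LocalHiveContext.configure() used to do.
val metastorePath = new File("metastore").getCanonicalPath
val warehousePath = new File("warehouse").getCanonicalPath

val hiveContext = new HiveContext(sc)
hiveContext.setConf("javax.jdo.option.ConnectionURL",
  s"jdbc:derby:;databaseName=$metastorePath;create=true")
hiveContext.setConf("hive.metastore.warehouse.dir", warehousePath)
```

As the deprecation note says, a plain `HiveContext` already creates a local metastore when none is configured; its default directory is `./metastore_db` rather than `./metastore`.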
@@ -80,7 +59,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
* are automatically converted to use the Spark SQL parquet table scan, instead of the Hive
* SerDe.
*/
private[spark] def convertMetastoreParquet: Boolean =
protected[sql] def convertMetastoreParquet: Boolean =
getConf("spark.sql.hive.convertMetastoreParquet", "true") == "true"

override protected[sql] def executePlan(plan: LogicalPlan): this.QueryExecution =
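`convertMetastoreParquet` is widened here from `private[spark]` to `protected[sql]`, but it is still driven by a plain configuration key. A hedged spark-shell style sketch of toggling it, assuming the `hiveContext` constructed above:

```scala
// "true" (the default) reads metastore Parquet tables with Spark SQL's native
// Parquet scan; "false" falls back to the Hive SerDe.
hiveContext.setConf("spark.sql.hive.convertMetastoreParquet", "false")
```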
@@ -97,14 +76,6 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
}
}

@deprecated("hiveql() is deprecated as the sql function now parses using HiveQL by default. " +
s"The SQL dialect for parsing can be set using ${SQLConf.DIALECT}", "1.1")
def hiveql(hqlQuery: String): SchemaRDD = new SchemaRDD(this, HiveQl.parseSql(hqlQuery))

@deprecated("hql() is deprecated as the sql function now parses using HiveQL by default. " +
s"The SQL dialect for parsing can be set using ${SQLConf.DIALECT}", "1.1")
def hql(hqlQuery: String): SchemaRDD = hiveql(hqlQuery)

/**
* Creates a table using the schema of the given class.
*
@@ -116,6 +87,12 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
catalog.createTable("default", tableName, ScalaReflection.attributesFor[A], allowExisting)
}

/**
* Invalidate and refresh all the cached metadata of the given table. For performance reasons,
* Spark SQL or the external data source library it uses might cache certain metadata about a
* table, such as the location of blocks. When those change outside of Spark SQL, users should
* call this function to invalidate the cache.
*/
def refreshTable(tableName: String): Unit = {
// TODO: Database support...
catalog.refreshTable("default", tableName)
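The new doc comment spells out when a manual refresh is needed: metadata cached by Spark SQL or by an external data source can go stale if the table changes outside of Spark SQL. A short usage sketch under the same assumptions, with a hypothetical table name "logs":

```scala
// Files backing the table were rewritten by an external job, so drop the
// cached metadata (e.g. block locations) before querying it again.
hiveContext.refreshTable("logs")
```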
@@ -133,6 +110,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
* Right now, it only supports Hive tables and it only updates the size of a Hive table
* in the Hive metastore.
*/
@Experimental
def analyze(tableName: String) {
val relation = EliminateAnalysisOperators(catalog.lookupRelation(Seq(tableName)))
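`analyze` is now marked `@Experimental`; per its comment it currently only updates the size of a Hive table in the Hive metastore. A sketch under the same assumptions, with a hypothetical table name "web_logs":

```scala
// Recompute the table's total size and record it in the Hive metastore, where
// statistics-based optimizations can pick it up.
hiveContext.analyze("web_logs")
```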

@@ -289,7 +267,6 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
results
}


/**
* Execute the command using Hive and return the results as a sequence. Each element
* in the sequence is one row.
@@ -345,7 +322,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
}

@transient
val hivePlanner = new SparkPlanner with HiveStrategies {
private val hivePlanner = new SparkPlanner with HiveStrategies {
val hiveContext = self

override def strategies: Seq[Strategy] = experimental.extraStrategies ++ Seq(
@@ -410,7 +387,8 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
}
}

object HiveContext {

private object HiveContext {
protected val primitiveTypes =
Seq(StringType, IntegerType, LongType, DoubleType, FloatType, BooleanType, ByteType,
ShortType, DateType, TimestampType, BinaryType)