2 changes: 2 additions & 0 deletions R/pkg/tests/fulltests/test_sparkSQL.R
@@ -67,6 +67,8 @@ sparkSession <- if (windows_with_hadoop()) {
sparkR.session(master = sparkRTestMaster, enableHiveSupport = FALSE)
}
sc <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "getJavaSparkContext", sparkSession)
# materialize the catalog implementation
listTables()
Member
we are calling sparkR.session(master = sparkRTestMaster, enableHiveSupport = FALSE) in almost every other test file - does the same apply in those other places?

Author
test_sparkSQL.R is the only one that uses newJObject("org.apache.spark.sql.hive.test.TestHiveContext", ssc, FALSE) on the ssc, so the catalog implementation Spark conf is changed. So `test_sparkSQL.R` is the only one broken.

Member
ok


mockLines <- c("{\"name\":\"Michael\"}",
"{\"name\":\"Andy\", \"age\":30}",
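The listTables() call above works because, with this change, the session catalog's implementation is only fixed when the catalog is first materialized; touching it right after sparkR.session() pins the in-memory catalog before the test later flips spark.sql.catalogImplementation by constructing a TestHiveContext on the same ssc. A minimal Scala sketch of that "first access wins" behaviour - the class and conf map below are illustrative placeholders, not Spark's real API:

```scala
import scala.collection.mutable

// Hypothetical stand-in for a session whose catalog implementation is resolved lazily.
class LazySession(conf: mutable.Map[String, String]) {
  // Built on first use and cached; later conf changes no longer matter.
  lazy val catalogImpl: String =
    conf.getOrElse("spark.sql.catalogImplementation", "in-memory")
  def listTables(): Seq[String] = { catalogImpl; Seq.empty } // touching the catalog pins the impl
}

object PinCatalogExample extends App {
  val conf = mutable.Map.empty[String, String]
  val session = new LazySession(conf)
  session.listTables()                                // materialize with the current conf
  conf("spark.sql.catalogImplementation") = "hive"    // later conf change (e.g. TestHiveContext)
  assert(session.catalogImpl == "in-memory")          // the already-built catalog is unaffected
}
```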
SessionCatalog.scala
@@ -54,8 +54,8 @@ object SessionCatalog {
* This class must be thread-safe.
*/
class SessionCatalog(
val externalCatalog: ExternalCatalog,
globalTempViewManager: GlobalTempViewManager,
externalCatalogBuilder: () => ExternalCatalog,
globalTempViewManagerBuilder: () => GlobalTempViewManager,
functionRegistry: FunctionRegistry,
conf: SQLConf,
hadoopConf: Configuration,
@@ -70,8 +70,8 @@ class SessionCatalog(
functionRegistry: FunctionRegistry,
conf: SQLConf) {
this(
externalCatalog,
new GlobalTempViewManager("global_temp"),
() => externalCatalog,
() => new GlobalTempViewManager("global_temp"),
functionRegistry,
conf,
new Configuration(),
@@ -87,6 +87,9 @@
new SQLConf().copy(SQLConf.CASE_SENSITIVE -> true))
}

lazy val externalCatalog = externalCatalogBuilder()
lazy val globalTempViewManager = globalTempViewManagerBuilder()

/** List of temporary views, mapping from table name to their logical plan. */
@GuardedBy("this")
protected val tempViews = new mutable.HashMap[String, LogicalPlan]
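The constructor change above replaces eagerly supplied dependencies with zero-argument builders that are only invoked from the lazy vals added at the end of the hunk, so constructing a SessionCatalog no longer touches the external catalog at all. A stripped-down sketch of the same builder-plus-lazy-val pattern, with a placeholder type standing in for ExternalCatalog:

```scala
// Placeholder trait, not Spark's ExternalCatalog.
trait Catalog { def tables: Seq[String] }

class LazyCatalogHolder(catalogBuilder: () => Catalog) {
  // Nothing is built until `catalog` is first read; the result is then cached.
  lazy val catalog: Catalog = catalogBuilder()
  def listTables(): Seq[String] = catalog.tables
}

object LazyCatalogExample extends App {
  def expensiveCatalog(): Catalog = {
    println("connecting to the metastore...")           // the side effect we want to defer
    new Catalog { def tables: Seq[String] = Seq("t1") }
  }
  val holder = new LazyCatalogHolder(() => expensiveCatalog())
  println("holder constructed, metastore not touched yet")
  println(holder.listTables())                           // only now does the builder run
}
```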
BaseSessionStateBuilder.scala
@@ -130,8 +130,8 @@ abstract class BaseSessionStateBuilder(
*/
protected lazy val catalog: SessionCatalog = {
val catalog = new SessionCatalog(
session.sharedState.externalCatalog,
session.sharedState.globalTempViewManager,
() => session.sharedState.externalCatalog,
() => session.sharedState.globalTempViewManager,
functionRegistry,
conf,
SessionState.newHadoopConf(session.sparkContext.hadoopConfiguration, conf),
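Passing () => session.sharedState.externalCatalog instead of the catalog itself matters because sharedState is itself lazily initialized: capturing the thunk evaluates neither sharedState nor the external catalog, so building the session state stays cheap. A hypothetical sketch of that deferral - SessionLike and SharedStateLike are illustrative stand-ins, not Spark classes:

```scala
class SharedStateLike {
  println("SharedState initialized (this is where the Hive client would be started)")
  val externalCatalog: String = "externalCatalog"
}

class SessionLike {
  lazy val sharedState: SharedStateLike = new SharedStateLike
}

object DeferSharedStateExample extends App {
  val session = new SessionLike
  // Capturing the thunk does not evaluate session.sharedState yet...
  val catalogBuilder: () => String = () => session.sharedState.externalCatalog
  println("session state built, SharedState still untouched")
  // ...it is evaluated only when the catalog is actually needed.
  println(catalogBuilder())
}
```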
HiveMetastoreLazyInitializationSuite.scala
@@ -38,6 +38,20 @@ class HiveMetastoreLazyInitializationSuite extends SparkFunSuite {
// We should be able to run Spark jobs without Hive client.
assert(spark.sparkContext.range(0, 1).count() === 1)

// We should be able to use Spark SQL if there are no table references.
assert(spark.sql("select 1 + 1").count() === 1)
assert(spark.range(0, 1).count() === 1)

// We should be able to use fs
val path = Utils.createTempDir()
path.delete()
try {
spark.range(0, 1).write.parquet(path.getAbsolutePath)
assert(spark.read.parquet(path.getAbsolutePath).count() === 1)
} finally {
Utils.deleteRecursively(path)
}

// Make sure that we are not using the local derby metastore.
val exceptionString = Utils.exceptionString(intercept[AnalysisException] {
spark.sql("show tables")
HiveSessionCatalog.scala
@@ -39,17 +39,17 @@ import org.apache.spark.sql.types.{DecimalType, DoubleType}


private[sql] class HiveSessionCatalog(
externalCatalog: HiveExternalCatalog,
globalTempViewManager: GlobalTempViewManager,
externalCatalogBuilder: () => HiveExternalCatalog,
globalTempViewManagerBuilder: () => GlobalTempViewManager,
val metastoreCatalog: HiveMetastoreCatalog,
functionRegistry: FunctionRegistry,
conf: SQLConf,
hadoopConf: Configuration,
parser: ParserInterface,
functionResourceLoader: FunctionResourceLoader)
extends SessionCatalog(
externalCatalog,
globalTempViewManager,
externalCatalogBuilder,
globalTempViewManagerBuilder,
functionRegistry,
conf,
hadoopConf,
HiveSessionStateBuilder.scala
@@ -42,17 +42,16 @@ class HiveSessionStateBuilder(session: SparkSession, parentState: Option[Session
* Create a Hive aware resource loader.
*/
override protected lazy val resourceLoader: HiveSessionResourceLoader = {
val client: HiveClient = externalCatalog.client
new HiveSessionResourceLoader(session, client)
new HiveSessionResourceLoader(session, () => externalCatalog.client)
}

/**
* Create a [[HiveSessionCatalog]].
*/
override protected lazy val catalog: HiveSessionCatalog = {
val catalog = new HiveSessionCatalog(
externalCatalog,
session.sharedState.globalTempViewManager,
() => externalCatalog,
() => session.sharedState.globalTempViewManager,
new HiveMetastoreCatalog(session),
functionRegistry,
conf,
@@ -105,8 +104,9 @@ class HiveSessionStateBuilder(session: SparkSession, parentState: Option[Session

class HiveSessionResourceLoader(
session: SparkSession,
client: HiveClient)
clientBuilder: () => HiveClient)
extends SessionResourceLoader(session) {
private lazy val client = clientBuilder()
override def addJar(path: String): Unit = {
client.addJar(path)
super.addJar(path)
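The resource-loader change follows the same idea from the caller's side: previously externalCatalog.client was pulled out while the resourceLoader was being built, which forced a metastore connection; now only a thunk is captured and the Hive client is created on the first addJar call. A before/after sketch under placeholder types (Client and Catalog here are not Spark's HiveClient or HiveExternalCatalog):

```scala
class Client { println("metastore connection opened") }
class Catalog { lazy val client: Client = new Client }

// Before: the client was extracted while the loader was being built,
// so merely constructing it opened a metastore connection.
class EagerLoader(val client: Client)

// After: only a thunk is captured; the connection is opened on first use.
class LazyLoader(clientBuilder: () => Client) {
  private lazy val client = clientBuilder()
  def addJar(path: String): Unit = { client; println(s"jar added: $path") }
}

object BeforeAfterExample extends App {
  val catalog = new Catalog
  val loader = new LazyLoader(() => catalog.client)
  println("loader built without touching the metastore")
  loader.addJar("/tmp/some.jar")   // first use triggers client construction
}
```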