Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1906,6 +1906,100 @@ class Dataset[T] private[sql] (val session: SparkSession, private[sql] val plan:
}
}

/**
* Registers this Dataset as a temporary table using the given name. The lifetime of this
* temporary table is tied to the [[SparkSession]] that was used to create this Dataset.
*
* @group basic
* @since 3.4.0
*/
@deprecated("Use createOrReplaceTempView(viewName) instead.", "3.4.0")
def registerTempTable(tableName: String): Unit = {
createOrReplaceTempView(tableName)
}

/**
* Creates a local temporary view using the given name. The lifetime of this temporary view is
* tied to the [[SparkSession]] that was used to create this Dataset.
*
* Local temporary view is session-scoped. Its lifetime is the lifetime of the session that
* created it, i.e. it will be automatically dropped when the session terminates. It's not tied
* to any databases, i.e. we can't use `db1.view1` to reference a local temporary view.
*
* @throws AnalysisException
* if the view name is invalid or already exists
*
* @group basic
* @since 3.4.0
*/
@throws[AnalysisException]
def createTempView(viewName: String): Unit = {
buildAndExecuteTempView(viewName, replace = false, global = false)
}

/**
* Creates a local temporary view using the given name. The lifetime of this temporary view is
* tied to the [[SparkSession]] that was used to create this Dataset.
*
* @group basic
* @since 3.4.0
*/
def createOrReplaceTempView(viewName: String): Unit = {
buildAndExecuteTempView(viewName, replace = true, global = false)
}

/**
* Creates a global temporary view using the given name. The lifetime of this temporary view is
* tied to this Spark application.
*
* Global temporary view is cross-session. Its lifetime is the lifetime of the Spark
* application,
* i.e. it will be automatically dropped when the application terminates. It's tied to a system
* preserved database `global_temp`, and we must use the qualified name to refer a global temp
* view, e.g. `SELECT * FROM global_temp.view1`.
*
* @throws AnalysisException
* if the view name is invalid or already exists
*
* @group basic
* @since 3.4.0
*/
@throws[AnalysisException]
def createGlobalTempView(viewName: String): Unit = {
buildAndExecuteTempView(viewName, replace = false, global = true)
}

/**
* Creates or replaces a global temporary view using the given name. The lifetime of this
* temporary view is tied to this Spark application.
*
* Global temporary view is cross-session. Its lifetime is the lifetime of the Spark
* application,
* i.e. it will be automatically dropped when the application terminates. It's tied to a system
* preserved database `global_temp`, and we must use the qualified name to refer a global temp
* view, e.g. `SELECT * FROM global_temp.view1`.
*
* @group basic
* @since 3.4.0
*/
def createOrReplaceGlobalTempView(viewName: String): Unit = {
buildAndExecuteTempView(viewName, replace = true, global = true)
}

private def buildAndExecuteTempView(
viewName: String,
replace: Boolean,
global: Boolean): Unit = {
val command = session.newCommand { builder =>
builder.getCreateDataframeViewBuilder
.setInput(plan.getRoot)
.setName(viewName)
.setIsGlobal(global)
.setReplace(replace)
}
session.execute(command)
}

/**
* Returns a new Dataset with a column dropped. This is a no-op if schema doesn't contain column
* name.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,12 @@ class SparkSession(
new Dataset[T](this, plan)
}

private[sql] def newCommand[T](f: proto.Command.Builder => Unit): proto.Command = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we going to use this method else where? Otherwise it seems a bit much to add.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think for any other command related API, we can use it.

val builder = proto.Command.newBuilder()
f(builder)
builder.build()
}

private[sql] def analyze(
plan: proto.Plan,
mode: proto.Explain.ExplainMode): proto.AnalyzePlanResponse =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -406,4 +406,15 @@ class ClientE2ETestSuite extends RemoteSparkSession {
val joined = left.join(right, left("id") === right("id")).select(left("id"), right("a"))
assert(joined.schema.catalogString === "struct<id:bigint,a:double>")
}

test("test temp view") {
spark.range(100).createTempView("test1")
assert(spark.sql("SELECT * FROM test1").count() == 100)
spark.range(1000).createOrReplaceTempView("test1")
assert(spark.sql("SELECT * FROM test1").count() == 1000)
spark.range(100).createGlobalTempView("view1")
assert(spark.sql("SELECT * FROM global_temp.view1").count() == 100)
spark.range(1000).createOrReplaceGlobalTempView("view1")
assert(spark.sql("SELECT * FROM global_temp.view1").count() == 1000)
}
}