@@ -2394,10 +2394,13 @@ class Analyzer(
   object ResolveProcedures extends Rule[LogicalPlan] {
     def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsWithPruning(
       _.containsPattern(UNRESOLVED_PROCEDURE), ruleId) {
-      case Call(UnresolvedProcedure(CatalogAndIdentifier(catalog, ident)), args, execute) =>
-        val procedureCatalog = catalog.asProcedureCatalog
+      case UnresolvedProcedure(CatalogAndIdentifier(catalog, ident)) =>
+        if (!catalog.isInstanceOf[ProcedureCatalog]) {
+          throw QueryCompilationErrors.missingCatalogProceduresAbilityError(catalog)

[Review comment, Contributor]: does catalog.asProcedureCatalog do this check?

+        }
+        val procedureCatalog = catalog.asInstanceOf[ProcedureCatalog]
         val procedure = load(procedureCatalog, ident)
-        Call(ResolvedProcedure(procedureCatalog, ident, procedure), args, execute)
+        ResolvedProcedure(procedureCatalog, ident, procedure)
     }

   private def load(catalog: ProcedureCatalog, ident: Identifier): UnboundProcedure = {
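[Note]: On the question above: if asProcedureCatalog follows the pattern of the other CatalogV2Implicits helpers (asTableCatalog, asFunctionCatalog), it performs the instance check and raises the capability error itself. A hedged sketch of that helper pattern; the body is assumed, not quoted from the Spark source:

    import org.apache.spark.sql.connector.catalog.{CatalogPlugin, ProcedureCatalog}
    import org.apache.spark.sql.errors.QueryCompilationErrors

    object ProcedureCatalogImplicits {
      // Assumed shape of the implicit helper the reviewer refers to.
      implicit class ProcedureCatalogHelper(plugin: CatalogPlugin) {
        def asProcedureCatalog: ProcedureCatalog = plugin match {
          case procedureCatalog: ProcedureCatalog => procedureCatalog
          case _ =>
            // Same error the new explicit isInstanceOf check raises above.
            throw QueryCompilationErrors.missingCatalogProceduresAbilityError(plugin)
        }
      }
    }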
@@ -29,7 +29,8 @@ import org.apache.spark.SparkException
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.{CurrentNamespace, GlobalTempView, LocalTempView,
   PersistedView, PlanWithUnresolvedIdentifier, SchemaEvolution, SchemaTypeEvolution,
-  UnresolvedAttribute, UnresolvedFunctionName, UnresolvedIdentifier, UnresolvedNamespace}
+  UnresolvedAttribute, UnresolvedFunctionName, UnresolvedIdentifier, UnresolvedNamespace,
+  UnresolvedProcedure}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions.{Expression, Literal}
 import org.apache.spark.sql.catalyst.parser._
@@ -1349,7 +1350,7 @@ class SparkSqlAstBuilder extends AstBuilder {
   override def visitDescribeProcedure(
       ctx: DescribeProcedureContext): LogicalPlan = withOrigin(ctx) {
     withIdentClause(ctx.identifierReference(), procIdentifier =>
-      DescribeProcedureCommand(UnresolvedIdentifier(procIdentifier)))
+      DescribeProcedureCommand(UnresolvedProcedure(procIdentifier)))
   }

   override def visitCreatePipelineInsertIntoFlow(
@@ -19,16 +19,13 @@ package org.apache.spark.sql.execution.command

 import scala.collection.mutable.ArrayBuffer

-import org.apache.spark.{SparkException, SparkThrowable}
+import org.apache.spark.SparkException
 import org.apache.spark.sql.{Row, SparkSession}
-import org.apache.spark.sql.catalyst.analysis.ResolvedIdentifier
+import org.apache.spark.sql.catalyst.analysis.ResolvedProcedure
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.connector.catalog.{Identifier, ProcedureCatalog}
-import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
-import org.apache.spark.sql.connector.catalog.procedures.UnboundProcedure
-import org.apache.spark.sql.errors.QueryCompilationErrors
-import org.apache.spark.sql.types.StringType
+import org.apache.spark.sql.connector.catalog.procedures.{ProcedureParameter, UnboundProcedure}
+import org.apache.spark.sql.types.{StringType, StructType}

 /**
  * A command for users to describe a procedure.
@@ -45,34 +42,63 @@ case class DescribeProcedureCommand(

   override def run(sparkSession: SparkSession): Seq[Row] = {
     child match {
-      case ResolvedIdentifier(catalog, ident) =>
-        val procedure = load(catalog.asProcedureCatalog, ident)
-        describeV2Procedure(procedure)
+      case ResolvedProcedure(catalog, ident, procedure) =>
+        describeV2Procedure(procedure.asInstanceOf[UnboundProcedure])
       case _ =>
         throw SparkException.internalError(s"Invalid procedure identifier: ${child.getClass}")
     }
   }

-  private def load(catalog: ProcedureCatalog, ident: Identifier): UnboundProcedure = {
-    try {
-      catalog.loadProcedure(ident)
-    } catch {
-      case e: Exception if !e.isInstanceOf[SparkThrowable] =>
-        val nameParts = catalog.name +: ident.asMultipartIdentifier
-        throw QueryCompilationErrors.failedToLoadRoutineError(nameParts, e)
-    }
-  }
-
   private def describeV2Procedure(procedure: UnboundProcedure): Seq[Row] = {
     val buffer = new ArrayBuffer[(String, String)]
     append(buffer, "Procedure:", procedure.name())
     append(buffer, "Description:", procedure.description())

+    // UnboundProcedure requires binding to retrieve parameters. We try to bind with an empty
+    // argument list to get the parameters. If the procedure requires arguments, binding might
+    // fail. In that case, we suppress the exception and just show the procedure metadata
+    // without parameters.

[Review comment, @pan3793 (Member), Dec 11, 2025]: I'm afraid this is a common case, not rare.

+    try {
+      val bound = procedure.bind(new StructType())
+      val params = bound.parameters()
+      if (params != null && params.nonEmpty) {
+        val formattedParams = formatProcedureParameters(params)
+        append(buffer, "Parameters:", formattedParams.head)
+        formattedParams.tail.foreach(s => append(buffer, "", s))
+      } else {
+        append(buffer, "Parameters:", "()")
+      }
+    } catch {
+      case _: Exception =>
+        // Ignore if binding fails
+    }
+
     val keys = tabulate(buffer.map(_._1).toSeq)
     val values = buffer.map(_._2)
     keys.zip(values).map { case (key, value) => Row(s"$key $value") }
   }
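[Note]: Following up on the comment above: because most procedures declare required parameters, bind(new StructType()) throwing is the common path, not the rare one. A hedged sketch of a caller-side helper (the name tryParameters is hypothetical, not part of this PR) that makes the fallback explicit and swallows only non-fatal errors:

    import scala.util.control.NonFatal

    import org.apache.spark.sql.connector.catalog.procedures.{ProcedureParameter, UnboundProcedure}
    import org.apache.spark.sql.types.StructType

    // Hypothetical helper, not part of this PR: best-effort parameter lookup.
    // For a procedure with required parameters, bind(new StructType()) is
    // expected to throw, so None is the common result here.
    def tryParameters(procedure: UnboundProcedure): Option[Array[ProcedureParameter]] = {
      try {
        Option(procedure.bind(new StructType()).parameters()).filter(_.nonEmpty)
      } catch {
        case NonFatal(_) => None // fall back to metadata-only DESCRIBE output
      }
    }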

+  // This helper is needed because the V2 Procedure API returns an array of ProcedureParameter,
+  // which differs from the StructType used by internal stored procedures (handled by
+  // formatParameters).
+  private def formatProcedureParameters(params: Array[ProcedureParameter]): Seq[String] = {
+    val modes = tabulate(params.map(_.mode().toString).toSeq)

[Review comment, Contributor]: The impl here is really awkward, can we iterate param just once?

    params.map { param =>
      s"${param.mode} ${param.name} ..."
    }

+    val names = tabulate(params.map(_.name()).toSeq)
+    val dataTypes = tabulate(params.map(_.dataType().sql).toSeq)
+    val comments = params.map { p =>
+      if (p.comment() != null) s" '${p.comment()}'" else ""
+    }
+    val defaults = params.map { p =>
+      val defaultVal = if (p.defaultValue() != null) p.defaultValue().getSql else null
+      if (defaultVal != null) s" DEFAULT $defaultVal" else ""
+    }
+    modes zip names zip dataTypes zip defaults zip comments map {
+      case ((((mode, name), dataType), default), comment) =>
+        s"$mode $name $dataType$default$comment"
+    }
+  }

   private def append(buffer: ArrayBuffer[(String, String)], key: String, value: String): Unit = {
     buffer += (key -> value)
   }
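[Note]: On the single-pass suggestion above: the zip chain exists because tabulate pads each column (mode, name, data type) to a common width. A single pass, as sketched below, is simpler but gives up that per-column alignment unless widths are precomputed. The sketch relies only on the ProcedureParameter accessors already used in this diff:

    import org.apache.spark.sql.connector.catalog.procedures.ProcedureParameter

    // Hedged single-pass variant of formatProcedureParameters. Unlike the
    // tabulate-based version it emits unpadded columns.
    def formatProcedureParameters(params: Array[ProcedureParameter]): Seq[String] = {
      params.toSeq.map { p =>
        val default = Option(p.defaultValue()).map(d => s" DEFAULT ${d.getSql}").getOrElse("")
        val comment = Option(p.comment()).map(c => s" '$c'").getOrElse("")
        s"${p.mode()} ${p.name()} ${p.dataType().sql}$default$comment"
      }
    }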
@@ -512,47 +512,64 @@ class ProcedureSuite extends QueryTest with SharedSparkSession with BeforeAndAft
     checkAnswer(
       sql("DESC PROCEDURE cat.ns.foo"),
       Row("Procedure: sum") ::
-        Row("Description: sum integers") :: Nil)
+        Row("Description: sum integers") ::
+        Row("Parameters: IN in1 INT") ::
+        Row(" IN in2 INT") :: Nil)

     checkAnswer(
       // use DESCRIBE instead of DESC
       sql("DESCRIBE PROCEDURE cat.ns.foo"),
       Row("Procedure: sum") ::
-        Row("Description: sum integers") :: Nil)
+        Row("Description: sum integers") ::
+        Row("Parameters: IN in1 INT") ::
+        Row(" IN in2 INT") :: Nil)

     checkAnswer(
       // use default catalog
       sql("DESC PROCEDURE ns.foo"),
       Row("Procedure: sum") ::
-        Row("Description: sum integers") :: Nil)
+        Row("Description: sum integers") ::
+        Row("Parameters: IN in1 INT") ::
+        Row(" IN in2 INT") :: Nil)

     checkAnswer(
       // use multi-part namespace
       sql("DESCRIBE PROCEDURE cat.ns.db.abc"),
       Row("Procedure: long_sum") ::
-        Row("Description: sum longs") :: Nil)
+        Row("Description: sum longs") ::
+        Row("Parameters: IN in1 BIGINT") ::
+        Row(" IN in2 BIGINT") :: Nil)

     checkAnswer(
       // use multi-part namespace with default catalog
       sql("DESCRIBE PROCEDURE ns.db.abc"),
       Row("Procedure: long_sum") ::
-        Row("Description: sum longs") :: Nil)
+        Row("Description: sum longs") ::
+        Row("Parameters: IN in1 BIGINT") ::
+        Row(" IN in2 BIGINT") :: Nil)

     checkAnswer(
       sql("DESC PROCEDURE cat.``.xyz"),
       Row("Procedure: complex") ::
-        Row("Description: complex procedure") :: Nil)
+        Row("Description: complex procedure") ::
+        Row("Parameters: IN in1 STRING DEFAULT 'A'") ::
+        Row(" IN in2 STRING DEFAULT 'B'") ::
+        Row(" IN in3 INT DEFAULT 1 + 1 - 1") :: Nil)

     checkAnswer(
       sql("DESC PROCEDURE cat.xxx"),
       Row("Procedure: struct_input") ::
-        Row("Description: struct procedure") :: Nil)
+        Row("Description: struct procedure") ::
+        Row("Parameters: IN in1 STRUCT<nested1: INT, nested2: STRING>") ::
+        Row(" IN in2 STRING ") :: Nil)

     checkAnswer(
       // check across catalogs
       sql("DESC PROCEDURE cat2.ns_1.db_1.foo"),
       Row("Procedure: void") ::
-        Row("Description: void procedure") :: Nil)
+        Row("Description: void procedure") ::
+        Row("Parameters: IN in1 STRING") ::
+        Row(" IN in2 STRING") :: Nil)
   }
 }
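[Note]: For reference, the user-facing effect of this change, assuming a catalog named cat that implements ProcedureCatalog and exposes ns.foo as in this suite (output alignment approximate):

    spark.sql("DESCRIBE PROCEDURE cat.ns.foo").show(truncate = false)
    // Procedure:   sum
    // Description: sum integers
    // Parameters:  IN in1 INT
    //              IN in2 INT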