Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-48782][SQL] Add support for executing procedures in catalogs #47943

Closed
wants to merge 4 commits into from

Conversation

aokolnychyi
Copy link
Contributor

What changes were proposed in this pull request?

This PR adds support for executing procedures in catalogs.

Why are the changes needed?

These changes are needed per discussed and voted SPIP tracked in SPARK-44167.

Does this PR introduce any user-facing change?

Yes. This PR adds CALL commands.

How was this patch tested?

This PR comes with tests.

Was this patch authored or co-authored using generative AI tooling?

No.

@@ -92,7 +93,38 @@ class QueryExecution(
sparkSession.sessionState.analyzer.executeAndCheck(logical, tracker)
}
tracker.setAnalyzed(plan)
plan

mode match {
Copy link
Contributor Author

@aokolnychyi aokolnychyi Aug 30, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This part will have to evolve. There are 3 options for us to consider.

  1. Add a special type of commands that must be executed during the analysis (i.e. generalize this PR).
  2. Add a special mix-in interface for procedures that know the type of the last result set before the execution. All other procedures will not output anything if invoked via spark.sql.
  3. Migrate to qe.commandExecuted instead of qe.analyzed everywhere as we will know the output only after executing the procedure.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan, any thoughts?

Copy link
Contributor

@cloud-fan cloud-fan Sep 2, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd prefer a special mix-in interface that indicates the command returns multiple result sets. It should have a method to return multiple LogicalPlan/DataFrame. SparkSession#sql will recognize this special interface and get the last DataFrame.

Copy link
Contributor Author

@aokolnychyi aokolnychyi Sep 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan, do you mean a special mix-in connector interface or logical plan interface?
When will the execution happen?

/**
* The logical plan for the CALL command.
*/
case class Call(procedure: LogicalPlan, args: Seq[Expression]) extends UnaryCommand {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shall we extend UnaryRunnableCommand to avoid creating a corresponding physical node?

@dongjoon-hyun
Copy link
Member

Gentle ping, @aokolnychyi .

Although we still have time for feature freeze, I'm wondering if you want to deliver this via Apache Spark 4.0.0-preview2 RC1 (next Monday).

@aokolnychyi
Copy link
Contributor Author

Will update tomorrow. Thanks for pinging, @dongjoon-hyun!

@dongjoon-hyun
Copy link
Member

Thank you, @aokolnychyi .

@@ -298,6 +298,10 @@ statement
LEFT_PAREN columns=multipartIdentifierPropertyList RIGHT_PAREN
(OPTIONS options=propertyList)? #createIndex
| DROP INDEX (IF EXISTS)? identifier ON TABLE? identifierReference #dropIndex
| CALL identifierReference
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can split this into a separate PR, if needed.


private def argMetadata(byName: Boolean): Metadata = {
new MetadataBuilder()
.putBoolean(ProcedureParameter.BY_NAME_METADATA_KEY, byName)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shall we omit this metadata if the arg is not by name?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can, skipped.

/**
* The physical plan of the CALL statement used in EXPLAIN.
*/
case class CallExec(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about this

case class ExplainOnlySparkPlan(toExplain: LogicalPlan)... {
  def simpleString = toExplain.simpleString
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like that, updated.

@cloud-fan
Copy link
Contributor

thanks, merging to master!

@cloud-fan cloud-fan closed this in 492d1b1 Sep 19, 2024
@aokolnychyi
Copy link
Contributor Author

Thanks, @cloud-fan @dongjoon-hyun!

@dongjoon-hyun
Copy link
Member

Nice. Thank you, @aokolnychyi and @cloud-fan .

Copy link
Contributor

@allisonwang-db allisonwang-db left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is awesome! Late LGTM

Comment on lines +22 to +24
case class MultiResult(children: Seq[LogicalPlan]) extends LogicalPlan {

override def output: Seq[Attribute] = children.lastOption.map(_.output).getOrElse(Nil)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add some comments for this class and the output here (which uses the last result set's schema)

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute

case class MultiResultExec(children: Seq[SparkPlan]) extends SparkPlan {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto for docstring

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will follow up with some docs.

}

private def validateParameterModes(procedure: BoundProcedure): Unit = {
procedure.parameters.find(_.mode != ProcedureParameter.Mode.IN).foreach { param =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we planning to support more parameter modes?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the future, yes. There is no active work at the moment, as far as I know.

MaxGekk pushed a commit that referenced this pull request Sep 21, 2024
…test in ProcedureSuite

### What changes were proposed in this pull request?

This PR is a followup of #47943 that enables ANSI for malformed input test in ProcedureSuite.

### Why are the changes needed?

The specific test fails with ANSI mode disabled
https://github.com/apache/spark/actions/runs/10951615244/job/30408963913

```
- malformed input to implicit cast *** FAILED *** (4 milliseconds)
  Expected exception org.apache.spark.SparkNumberFormatException to be thrown, but no exception was thrown (ProcedureSuite.scala:264)
  org.scalatest.exceptions.TestFailedException:
  at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472)
  at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471)
  at org.scalatest.funsuite.AnyFunSuite.newAssertionFailedException(AnyFunSuite.scala:1564)
...
```

The test depends on `sum`'s failure so this PR simply enables ANSI mode for that specific test.

### Does this PR introduce _any_ user-facing change?

No, test-only.

### How was this patch tested?

Manually ran with ANSI mode off.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #48193 from HyukjinKwon/SPARK-48782-followup.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
attilapiros pushed a commit to attilapiros/spark that referenced this pull request Oct 4, 2024
### What changes were proposed in this pull request?

This PR adds support for executing procedures in catalogs.

### Why are the changes needed?

These changes are needed per [discussed and voted](https://lists.apache.org/thread/w586jr53fxwk4pt9m94b413xyjr1v25m) SPIP tracked in [SPARK-44167](https://issues.apache.org/jira/browse/SPARK-44167).

### Does this PR introduce _any_ user-facing change?

Yes. This PR adds CALL commands.

### How was this patch tested?

This PR comes with tests.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#47943 from aokolnychyi/spark-48782.

Authored-by: Anton Okolnychyi <aokolnychyi@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
attilapiros pushed a commit to attilapiros/spark that referenced this pull request Oct 4, 2024
…test in ProcedureSuite

### What changes were proposed in this pull request?

This PR is a followup of apache#47943 that enables ANSI for malformed input test in ProcedureSuite.

### Why are the changes needed?

The specific test fails with ANSI mode disabled
https://github.com/apache/spark/actions/runs/10951615244/job/30408963913

```
- malformed input to implicit cast *** FAILED *** (4 milliseconds)
  Expected exception org.apache.spark.SparkNumberFormatException to be thrown, but no exception was thrown (ProcedureSuite.scala:264)
  org.scalatest.exceptions.TestFailedException:
  at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472)
  at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471)
  at org.scalatest.funsuite.AnyFunSuite.newAssertionFailedException(AnyFunSuite.scala:1564)
...
```

The test depends on `sum`'s failure so this PR simply enables ANSI mode for that specific test.

### Does this PR introduce _any_ user-facing change?

No, test-only.

### How was this patch tested?

Manually ran with ANSI mode off.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#48193 from HyukjinKwon/SPARK-48782-followup.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
himadripal pushed a commit to himadripal/spark that referenced this pull request Oct 19, 2024
### What changes were proposed in this pull request?

This PR adds support for executing procedures in catalogs.

### Why are the changes needed?

These changes are needed per [discussed and voted](https://lists.apache.org/thread/w586jr53fxwk4pt9m94b413xyjr1v25m) SPIP tracked in [SPARK-44167](https://issues.apache.org/jira/browse/SPARK-44167).

### Does this PR introduce _any_ user-facing change?

Yes. This PR adds CALL commands.

### How was this patch tested?

This PR comes with tests.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#47943 from aokolnychyi/spark-48782.

Authored-by: Anton Okolnychyi <aokolnychyi@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
himadripal pushed a commit to himadripal/spark that referenced this pull request Oct 19, 2024
…test in ProcedureSuite

### What changes were proposed in this pull request?

This PR is a followup of apache#47943 that enables ANSI for malformed input test in ProcedureSuite.

### Why are the changes needed?

The specific test fails with ANSI mode disabled
https://github.com/apache/spark/actions/runs/10951615244/job/30408963913

```
- malformed input to implicit cast *** FAILED *** (4 milliseconds)
  Expected exception org.apache.spark.SparkNumberFormatException to be thrown, but no exception was thrown (ProcedureSuite.scala:264)
  org.scalatest.exceptions.TestFailedException:
  at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472)
  at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471)
  at org.scalatest.funsuite.AnyFunSuite.newAssertionFailedException(AnyFunSuite.scala:1564)
...
```

The test depends on `sum`'s failure so this PR simply enables ANSI mode for that specific test.

### Does this PR introduce _any_ user-facing change?

No, test-only.

### How was this patch tested?

Manually ran with ANSI mode off.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#48193 from HyukjinKwon/SPARK-48782-followup.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants