[SPARK-50031][SQL] Add the TryParseUrl expression
### What changes were proposed in this pull request?
This PR adds the `try_parse_url` expression, a variant of `parse_url` with `failOnError` set to `false`.

### Why are the changes needed?
The `INVALID_URL` error message contains a suggested fix of turning ANSI mode off. Now that Spark 4.0.0 has moved to ANSI mode on by default, we want to keep suggestions of this kind to a minimum. The `try_*` functions provide a safe way to get the ANSI-mode-off behavior for a single expression, and suggesting them instead should be sufficient.

In this case, the `try_` variant of `parse_url` was missing, so a new expression is added to patch up the missing implementation.
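For example, assuming a Spark 4.0.0 spark-shell session with ANSI mode on and `import spark.implicits._` in scope, the intended behavior looks like this (a sketch, not code from this patch):

    import org.apache.spark.sql.functions.{lit, parse_url, try_parse_url}

    val df = Seq("inva lid://spark.apache.org/path?query=1").toDF("url")

    // Under ANSI mode, parse_url fails with an INVALID_URL error on this input:
    // df.select(parse_url($"url", lit("QUERY"))).show()

    // try_parse_url returns NULL instead of failing:
    df.select(try_parse_url($"url", lit("QUERY"))).show()  // the row shows NULL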

### Does this PR introduce _any_ user-facing change?
Yes, a new expression is added.

### How was this patch tested?
Tests added.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes apache#48500 from jovanm-db/invalidUrl.

Authored-by: Jovan Markovic <jovan.markovic@databricks.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
jovanm-db authored and MaxGekk committed Oct 22, 2024
1 parent 3ec28ff commit abc4986
Showing 10 changed files with 238 additions and 0 deletions.
1 change: 1 addition & 0 deletions docs/sql-ref-ansi-compliance.md
@@ -379,6 +379,7 @@ When ANSI mode is on, it throws exceptions for invalid operations. You can use t
- `try_avg`: identical to the function `avg`, except that it returns `NULL` result instead of throwing an exception on decimal/interval value overflow.
- `try_element_at`: identical to the function `element_at`, except that it returns `NULL` result instead of throwing an exception on array's index out of bound.
- `try_to_timestamp`: identical to the function `to_timestamp`, except that it returns `NULL` result instead of throwing an exception on string parsing error.
- `try_parse_url`: identical to the function `parse_url`, except that it returns `NULL` result instead of throwing an exception on URL parsing error.

### SQL Keywords (optional, disabled by default)

1 change: 1 addition & 0 deletions python/docs/source/reference/pyspark.sql/functions.rst
@@ -587,6 +587,7 @@ URL Functions
:toctree: api/

parse_url
try_parse_url
url_decode
url_encode
try_url_decode
12 changes: 12 additions & 0 deletions python/pyspark/sql/connect/functions/builtin.py
@@ -2819,6 +2819,18 @@ def parse_url(
parse_url.__doc__ = pysparkfuncs.parse_url.__doc__


def try_parse_url(
url: "ColumnOrName", partToExtract: "ColumnOrName", key: Optional["ColumnOrName"] = None
) -> Column:
if key is not None:
return _invoke_function_over_columns("try_parse_url", url, partToExtract, key)
else:
return _invoke_function_over_columns("try_parse_url", url, partToExtract)


try_parse_url.__doc__ = pysparkfuncs.try_parse_url.__doc__


def printf(format: "ColumnOrName", *cols: "ColumnOrName") -> Column:
return _invoke_function("printf", _to_col(format), *[_to_col(c) for c in cols])

116 changes: 116 additions & 0 deletions python/pyspark/sql/functions/builtin.py
@@ -13090,6 +13090,122 @@ def substr(
return _invoke_function_over_columns("substr", str, pos)


@_try_remote_functions
def try_parse_url(
url: "ColumnOrName", partToExtract: "ColumnOrName", key: Optional["ColumnOrName"] = None
) -> Column:
"""
This is a special version of `parse_url` that performs the same operation, but returns a
NULL value instead of raising an error if the parsing cannot be performed.

.. versionadded:: 4.0.0

Parameters
----------
url : :class:`~pyspark.sql.Column` or str
A column of strings, each representing a URL.
partToExtract : :class:`~pyspark.sql.Column` or str
A column of strings, each representing the part to extract from the URL.
key : :class:`~pyspark.sql.Column` or str, optional
A column of strings, each representing the key of a query parameter in the URL.

Returns
-------
:class:`~pyspark.sql.Column`
A new column of strings, each representing the value of the extracted part from the URL.

Examples
--------
Example 1: Extracting the query part from a URL

>>> from pyspark.sql import functions as sf
>>> df = spark.createDataFrame(
... [("https://spark.apache.org/path?query=1", "QUERY")],
... ["url", "part"]
... )
>>> df.select(sf.try_parse_url(df.url, df.part)).show()
+------------------------+
|try_parse_url(url, part)|
+------------------------+
| query=1|
+------------------------+

Example 2: Extracting the value of a specific query parameter from a URL

>>> from pyspark.sql import functions as sf
>>> df = spark.createDataFrame(
... [("https://spark.apache.org/path?query=1", "QUERY", "query")],
... ["url", "part", "key"]
... )
>>> df.select(sf.try_parse_url(df.url, df.part, df.key)).show()
+-----------------------------+
|try_parse_url(url, part, key)|
+-----------------------------+
| 1|
+-----------------------------+

Example 3: Extracting the protocol part from a URL

>>> from pyspark.sql import functions as sf
>>> df = spark.createDataFrame(
... [("https://spark.apache.org/path?query=1", "PROTOCOL")],
... ["url", "part"]
... )
>>> df.select(sf.try_parse_url(df.url, df.part)).show()
+------------------------+
|try_parse_url(url, part)|
+------------------------+
| https|
+------------------------+

Example 4: Extracting the host part from a URL

>>> from pyspark.sql import functions as sf
>>> df = spark.createDataFrame(
... [("https://spark.apache.org/path?query=1", "HOST")],
... ["url", "part"]
... )
>>> df.select(sf.try_parse_url(df.url, df.part)).show()
+------------------------+
|try_parse_url(url, part)|
+------------------------+
| spark.apache.org|
+------------------------+

Example 5: Extracting the path part from a URL

>>> from pyspark.sql import functions as sf
>>> df = spark.createDataFrame(
... [("https://spark.apache.org/path?query=1", "PATH")],
... ["url", "part"]
... )
>>> df.select(sf.try_parse_url(df.url, df.part)).show()
+------------------------+
|try_parse_url(url, part)|
+------------------------+
| /path|
+------------------------+

Example 6: Invalid URL

>>> from pyspark.sql import functions as sf
>>> df = spark.createDataFrame(
... [("inva lid://spark.apache.org/path?query=1", "QUERY", "query")],
... ["url", "part", "key"]
... )
>>> df.select(sf.try_parse_url(df.url, df.part, df.key)).show()
+-----------------------------+
|try_parse_url(url, part, key)|
+-----------------------------+
| NULL|
+-----------------------------+
"""
if key is not None:
return _invoke_function_over_columns("try_parse_url", url, partToExtract, key)
else:
return _invoke_function_over_columns("try_parse_url", url, partToExtract)


@_try_remote_functions
def parse_url(
url: "ColumnOrName", partToExtract: "ColumnOrName", key: Optional["ColumnOrName"] = None
14 changes: 14 additions & 0 deletions python/pyspark/sql/tests/test_functions.py
@@ -333,6 +333,20 @@ def test_rand_functions(self):
rndn2 = df.select("key", F.randn(0)).collect()
self.assertEqual(sorted(rndn1), sorted(rndn2))

def test_try_parse_url(self):
df = self.spark.createDataFrame(
[("https://spark.apache.org/path?query=1", "QUERY", "query")],
["url", "part", "key"],
)
actual = df.select(F.try_parse_url(df.url, df.part, df.key)).collect()
self.assertEqual(actual, [Row("1")])
df = self.spark.createDataFrame(
[("inva lid://spark.apache.org/path?query=1", "QUERY", "query")],
["url", "part", "key"],
)
actual = df.select(F.try_parse_url(df.url, df.part, df.key)).collect()
self.assertEqual(actual, [Row(None)])

def test_string_functions(self):
string_functions = [
"upper",
18 changes: 18 additions & 0 deletions sql/api/src/main/scala/org/apache/spark/sql/functions.scala
@@ -4678,6 +4678,24 @@
*/
def substr(str: Column, pos: Column): Column = Column.fn("substr", str, pos)

/**
* Extracts a part from a URL, returning NULL instead of raising an error if the URL cannot be parsed.
*
* @group url_funcs
* @since 4.0.0
*/
def try_parse_url(url: Column, partToExtract: Column, key: Column): Column =
Column.fn("try_parse_url", url, partToExtract, key)

/**
* Extracts a part from a URL, returning NULL instead of raising an error if the URL cannot be parsed.
*
* @group url_funcs
* @since 4.0.0
*/
def try_parse_url(url: Column, partToExtract: Column): Column =
Column.fn("try_parse_url", url, partToExtract)

/**
* Extracts a part from a URL.
*
1 change: 1 addition & 0 deletions sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -615,6 +615,7 @@ object FunctionRegistry {
expression[UrlEncode]("url_encode"),
expression[UrlDecode]("url_decode"),
expression[ParseUrl]("parse_url"),
expression[TryParseUrl]("try_parse_url"),

// datetime functions
expression[AddMonths]("add_months"),
30 changes: 30 additions & 0 deletions sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/urlExpressions.scala
@@ -169,6 +169,36 @@ object ParseUrl {
private val REGEXSUBFIX = "=([^&]*)"
}

// scalastyle:off line.size.limit
@ExpressionDescription(
usage = "_FUNC_(url, partToExtract[, key]) - This is a special version of `parse_url` that performs the same operation, but returns a NULL value instead of raising an error if the parsing cannot be performed.",
examples = """
Examples:
> SELECT _FUNC_('http://spark.apache.org/path?query=1', 'HOST');
spark.apache.org
> SELECT _FUNC_('http://spark.apache.org/path?query=1', 'QUERY');
query=1
> SELECT _FUNC_('inva lid://spark.apache.org/path?query=1', 'QUERY');
NULL
> SELECT _FUNC_('http://spark.apache.org/path?query=1', 'QUERY', 'query');
1
""",
since = "4.0.0",
group = "url_funcs")
// scalastyle:on line.size.limit
case class TryParseUrl(params: Seq[Expression], replacement: Expression)
extends RuntimeReplaceable with InheritAnalysisRules {
def this(children: Seq[Expression]) = this(children, ParseUrl(children, failOnError = false))

override def prettyName: String = "try_parse_url"

override def parameters: Seq[Expression] = params

override protected def withNewChildInternal(newChild: Expression): Expression = {
copy(replacement = newChild)
}
}
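Since `TryParseUrl` is `RuntimeReplaceable`, the analyzer rewrites it to the `ParseUrl(children, failOnError = false)` replacement built by the auxiliary constructor, so the expression needs no eval or codegen of its own. A minimal behavior sketch, assuming a `SparkSession` named `spark`:

    // Mirrors the examples in the ExpressionDescription above.
    spark.sql("SELECT try_parse_url('inva lid://spark.apache.org/path?query=1', 'QUERY')").show()  // NULL
    spark.sql("SELECT try_parse_url('http://spark.apache.org/path?query=1', 'QUERY', 'query')").show()  // 1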

/**
* Extracts a part from a URL
*/
1 change: 1 addition & 0 deletions sql/core/src/test/resources/sql-functions/sql-expression-schema.md
@@ -356,6 +356,7 @@
| org.apache.spark.sql.catalyst.expressions.TryElementAt | try_element_at | SELECT try_element_at(array(1, 2, 3), 2) | struct<try_element_at(array(1, 2, 3), 2):int> |
| org.apache.spark.sql.catalyst.expressions.TryMod | try_mod | SELECT try_mod(3, 2) | struct<try_mod(3, 2):int> |
| org.apache.spark.sql.catalyst.expressions.TryMultiply | try_multiply | SELECT try_multiply(2, 3) | struct<try_multiply(2, 3):int> |
| org.apache.spark.sql.catalyst.expressions.TryParseUrl | try_parse_url | SELECT try_parse_url('http://spark.apache.org/path?query=1', 'HOST') | struct<try_parse_url(http://spark.apache.org/path?query=1, HOST):string> |
| org.apache.spark.sql.catalyst.expressions.TryReflect | try_reflect | SELECT try_reflect('java.util.UUID', 'randomUUID') | struct<try_reflect(java.util.UUID, randomUUID):string> |
| org.apache.spark.sql.catalyst.expressions.TrySubtract | try_subtract | SELECT try_subtract(2, 1) | struct<try_subtract(2, 1):int> |
| org.apache.spark.sql.catalyst.expressions.TryToBinary | try_to_binary | SELECT try_to_binary('abc', 'utf-8') | struct<try_to_binary(abc, utf-8):binary> |
44 changes: 44 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/UrlFunctionsSuite.scala
@@ -84,6 +84,50 @@ class UrlFunctionsSuite extends QueryTest with SharedSparkSession {
}
}

test("url try_parse_url function") {

def testUrl(url: String, expected: Row): Unit = {
checkAnswer(Seq[String]((url)).toDF("url").selectExpr(
"try_parse_url(url, 'HOST')", "try_parse_url(url, 'PATH')",
"try_parse_url(url, 'QUERY')", "try_parse_url(url, 'REF')",
"try_parse_url(url, 'PROTOCOL')", "try_parse_url(url, 'FILE')",
"try_parse_url(url, 'AUTHORITY')", "try_parse_url(url, 'USERINFO')",
"try_parse_url(url, 'QUERY', 'query')"), expected)
}

testUrl(
"http://userinfo@spark.apache.org/path?query=1#Ref",
Row("spark.apache.org", "/path", "query=1", "Ref",
"http", "/path?query=1", "userinfo@spark.apache.org", "userinfo", "1"))

testUrl(
"https://use%20r:pas%20s@example.com/dir%20/pa%20th.HTML?query=x%20y&q2=2#Ref%20two",
Row("example.com", "/dir%20/pa%20th.HTML", "query=x%20y&q2=2", "Ref%20two",
"https", "/dir%20/pa%20th.HTML?query=x%20y&q2=2", "use%20r:pas%20s@example.com",
"use%20r:pas%20s", "x%20y"))

testUrl(
"http://user:pass@host",
Row("host", "", null, null, "http", "", "user:pass@host", "user:pass", null))

testUrl(
"http://user:pass@host/",
Row("host", "/", null, null, "http", "/", "user:pass@host", "user:pass", null))

testUrl(
"http://user:pass@host/?#",
Row("host", "/", "", "", "http", "/?", "user:pass@host", "user:pass", null))

testUrl(
"http://user:pass@host/file;param?query;p2",
Row("host", "/file;param", "query;p2", null, "http", "/file;param?query;p2",
"user:pass@host", "user:pass", null))

testUrl(
"inva lid://user:pass@host/file;param?query;p2",
Row(null, null, null, null, null, null, null, null, null))
}

test("url encode/decode function") {
def testUrl(url: String, fn: String, expected: Row): Unit = {
checkAnswer(Seq[String]((url)).toDF("url")
