
[SPARK-49691][PYTHON][CONNECT] Function substring should accept column names #48135

Closed

Conversation

@zhengruifeng (Contributor) commented Sep 18, 2024:

What changes were proposed in this pull request?

Function `substring` should accept column names.

Why are the changes needed?

Bug fix:

```
In [1]:     >>> import pyspark.sql.functions as sf
   ...:     >>> df = spark.createDataFrame([('Spark', 2, 3)], ['s', 'p', 'l'])
   ...:     >>> df.select('*', sf.substring('s', 'p', 'l')).show()
```

This works in PySpark Classic, but fails in Connect with:

```
NumberFormatException                     Traceback (most recent call last)
Cell In[2], line 1
----> 1 df.select('*', sf.substring('s', 'p', 'l')).show()

File ~/Dev/spark/python/pyspark/sql/connect/dataframe.py:1170, in DataFrame.show(self, n, truncate, vertical)
   1169 def show(self, n: int = 20, truncate: Union[bool, int] = True, vertical: bool = False) -> None:
-> 1170     print(self._show_string(n, truncate, vertical))

File ~/Dev/spark/python/pyspark/sql/connect/dataframe.py:927, in DataFrame._show_string(self, n, truncate, vertical)
    910     except ValueError:
    911         raise PySparkTypeError(
    912             errorClass="NOT_BOOL",
    913             messageParameters={
   (...)
    916             },
    917         )
    919 table, _ = DataFrame(
    920     plan.ShowString(
    921         child=self._plan,
    922         num_rows=n,
    923         truncate=_truncate,
    924         vertical=vertical,
    925     ),
    926     session=self._session,
--> 927 )._to_table()
    928 return table[0][0].as_py()

File ~/Dev/spark/python/pyspark/sql/connect/dataframe.py:1844, in DataFrame._to_table(self)
   1842 def _to_table(self) -> Tuple["pa.Table", Optional[StructType]]:
   1843     query = self._plan.to_proto(self._session.client)
-> 1844     table, schema, self._execution_info = self._session.client.to_table(
   1845         query, self._plan.observations
   1846     )
   1847     assert table is not None
   1848     return (table, schema)

File ~/Dev/spark/python/pyspark/sql/connect/client/core.py:892, in SparkConnectClient.to_table(self, plan, observations)
    890 req = self._execute_plan_request_with_metadata()
    891 req.plan.CopyFrom(plan)
--> 892 table, schema, metrics, observed_metrics, _ = self._execute_and_fetch(req, observations)
    894 # Create a query execution object.
    895 ei = ExecutionInfo(metrics, observed_metrics)

File ~/Dev/spark/python/pyspark/sql/connect/client/core.py:1517, in SparkConnectClient._execute_and_fetch(self, req, observations, self_destruct)
   1514 properties: Dict[str, Any] = {}
   1516 with Progress(handlers=self._progress_handlers, operation_id=req.operation_id) as progress:
-> 1517     for response in self._execute_and_fetch_as_iterator(
   1518         req, observations, progress=progress
   1519     ):
   1520         if isinstance(response, StructType):
   1521             schema = response

File ~/Dev/spark/python/pyspark/sql/connect/client/core.py:1494, in SparkConnectClient._execute_and_fetch_as_iterator(self, req, observations, progress)
   1492     raise kb
   1493 except Exception as error:
-> 1494     self._handle_error(error)

File ~/Dev/spark/python/pyspark/sql/connect/client/core.py:1764, in SparkConnectClient._handle_error(self, error)
   1762 self.thread_local.inside_error_handling = True
   1763 if isinstance(error, grpc.RpcError):
-> 1764     self._handle_rpc_error(error)
   1765 elif isinstance(error, ValueError):
   1766     if "Cannot invoke RPC" in str(error) and "closed" in str(error):

File ~/Dev/spark/python/pyspark/sql/connect/client/core.py:1840, in SparkConnectClient._handle_rpc_error(self, rpc_error)
   1837             if info.metadata["errorClass"] == "INVALID_HANDLE.SESSION_CHANGED":
   1838                 self._closed = True
-> 1840             raise convert_exception(
   1841                 info,
   1842                 status.message,
   1843                 self._fetch_enriched_error(info),
   1844                 self._display_server_stack_trace(),
   1845             ) from None
   1847     raise SparkConnectGrpcException(status.message) from None
   1848 else:

NumberFormatException: [CAST_INVALID_INPUT] The value 'p' of the type "STRING" cannot be cast to "INT" because it is malformed. Correct the value as per the syntax, or change its target type. Use `try_cast` to tolerate malformed input and return NULL instead. SQLSTATE: 22018
...
```
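
The root cause: Connect's `substring` wrapped the `pos` and `len` arguments in literals, so the column name `'p'` was sent as the string literal `'p'` and then cast to `INT`, producing the `CAST_INVALID_INPUT` above. A minimal sketch of the corrected argument dispatch (hypothetical helper name, not the actual patch code):

```
from pyspark.sql import Column, functions as sf

def _pos_or_len_to_col(value):
    # Hypothetical helper sketching the fix: a str names a column,
    # an int becomes a literal, a Column passes through unchanged.
    if isinstance(value, Column):
        return value
    if isinstance(value, str):
        # Pre-fix, Connect effectively did sf.lit(value) here, turning
        # the column name 'p' into a STRING literal that fails the INT cast.
        return sf.col(value)
    return sf.lit(value)
```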

Does this PR introduce any user-facing change?

Yes. Function `substring` in Connect now accepts column names, matching PySpark Classic.
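
With the fix, the reproduction above behaves the same in Connect as in Classic. The expected output should look roughly like the following (`substring` is 1-indexed, so `substring('Spark', 2, 3)` is `par`; the exact result column name may differ):

```
>>> df.select('*', sf.substring('s', 'p', 'l')).show()
+-----+---+---+------------------+
|    s|  p|  l|substring(s, p, l)|
+-----+---+---+------------------+
|Spark|  2|  3|               par|
+-----+---+---+------------------+
```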

How was this patch tested?

New doctests.
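
A sketch of the kind of doctest this covers (not the literal test from the patch), assuming mixed int/column-name arguments remain supported as in Classic; the exact result column name may differ:

```
>>> import pyspark.sql.functions as sf
>>> df = spark.createDataFrame([('Spark', 2, 3)], ['s', 'p', 'l'])
>>> df.select(sf.substring('s', 2, 'l')).show()
+------------------+
|substring(s, 2, l)|
+------------------+
|               par|
+------------------+
```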

Was this patch authored or co-authored using generative AI tooling?

No

@xinrong-meng (Member) commented:
Nice fix, thank you!

@dongjoon-hyun (Member) left a comment:

+1, LGTM.

@zhengruifeng deleted the py_substring_fix branch September 18, 2024 23:44
@zhengruifeng (Contributor, Author) commented:
Thanks all for the reviews!

attilapiros pushed a commit to attilapiros/spark that referenced this pull request Oct 4, 2024
[SPARK-49691][PYTHON][CONNECT] Function substring should accept column names

Closes apache#48135 from zhengruifeng/py_substring_fix.

Authored-by: Ruifeng Zheng <ruifengz@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
himadripal pushed a commit to himadripal/spark that referenced this pull request Oct 19, 2024
[SPARK-49691][PYTHON][CONNECT] Function substring should accept column names

Closes apache#48135 from zhengruifeng/py_substring_fix.

Authored-by: Ruifeng Zheng <ruifengz@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>