From b20f6a23e466192a85ab0465d0eead4b875ab276 Mon Sep 17 00:00:00 2001
From: Allison Wang
Date: Thu, 16 Jan 2025 10:20:14 -0800
Subject: [PATCH 1/4] update err

---
 .../python_streaming_source_runner.py         |  6 --
 .../worker/python_streaming_sink_runner.py    | 68 ++++++++-----------
 2 files changed, 30 insertions(+), 44 deletions(-)

diff --git a/python/pyspark/sql/streaming/python_streaming_source_runner.py b/python/pyspark/sql/streaming/python_streaming_source_runner.py
index a7349779dc626..d5e90c4ccb60b 100644
--- a/python/pyspark/sql/streaming/python_streaming_source_runner.py
+++ b/python/pyspark/sql/streaming/python_streaming_source_runner.py
@@ -183,12 +183,6 @@ def main(infile: IO, outfile: IO) -> None:
                     },
                 )
                 outfile.flush()
-            except Exception as e:
-                error_msg = "data source {} throw exception: {}".format(data_source.name, e)
-                raise PySparkRuntimeError(
-                    errorClass="PYTHON_STREAMING_DATA_SOURCE_RUNTIME_ERROR",
-                    messageParameters={"msg": error_msg},
-                )
             finally:
                 reader.stop()
     except BaseException as e:
diff --git a/python/pyspark/sql/worker/python_streaming_sink_runner.py b/python/pyspark/sql/worker/python_streaming_sink_runner.py
index c1bf5289cbf89..d2dbed9ec7196 100644
--- a/python/pyspark/sql/worker/python_streaming_sink_runner.py
+++ b/python/pyspark/sql/worker/python_streaming_sink_runner.py
@@ -96,44 +96,36 @@ def main(infile: IO, outfile: IO) -> None:
         )
         # Receive the `overwrite` flag.
         overwrite = read_bool(infile)
-        # Instantiate data source reader.
-        try:
-            # Create the data source writer instance.
-            writer = data_source.streamWriter(schema=schema, overwrite=overwrite)
-
-            # Receive the commit messages.
-            num_messages = read_int(infile)
-            commit_messages = []
-            for _ in range(num_messages):
-                message = pickleSer._read_with_length(infile)
-                if message is not None and not isinstance(message, WriterCommitMessage):
-                    raise PySparkAssertionError(
-                        errorClass="DATA_SOURCE_TYPE_MISMATCH",
-                        messageParameters={
-                            "expected": "an instance of WriterCommitMessage",
-                            "actual": f"'{type(message).__name__}'",
-                        },
-                    )
-                commit_messages.append(message)
-
-            batch_id = read_long(infile)
-            abort = read_bool(infile)
-
-            # Commit or abort the Python data source write.
-            # Note the commit messages can be None if there are failed tasks.
-            if abort:
-                writer.abort(commit_messages, batch_id)
-            else:
-                writer.commit(commit_messages, batch_id)
-            # Send a status code back to JVM.
-            write_int(0, outfile)
-            outfile.flush()
-        except Exception as e:
-            error_msg = "data source {} throw exception: {}".format(data_source.name, e)
-            raise PySparkRuntimeError(
-                errorClass="PYTHON_STREAMING_DATA_SOURCE_RUNTIME_ERROR",
-                messageParameters={"action": "commitOrAbort", "error": error_msg},
-            )
+        # Create the data source writer instance.
+        writer = data_source.streamWriter(schema=schema, overwrite=overwrite)
+        # Receive the commit messages.
+        num_messages = read_int(infile)
+
+        commit_messages = []
+        for _ in range(num_messages):
+            message = pickleSer._read_with_length(infile)
+            if message is not None and not isinstance(message, WriterCommitMessage):
+                raise PySparkAssertionError(
+                    errorClass="DATA_SOURCE_TYPE_MISMATCH",
+                    messageParameters={
+                        "expected": "an instance of WriterCommitMessage",
+                        "actual": f"'{type(message).__name__}'",
+                    },
+                )
+            commit_messages.append(message)
+
+        batch_id = read_long(infile)
+        abort = read_bool(infile)
+
+        # Commit or abort the Python data source write.
+        # Note the commit messages can be None if there are failed tasks.
+        if abort:
+            writer.abort(commit_messages, batch_id)
+        else:
+            writer.commit(commit_messages, batch_id)
+        # Send a status code back to JVM.
+        write_int(0, outfile)
+        outfile.flush()
     except BaseException as e:
         handle_worker_exception(e, outfile)
         sys.exit(-1)

From ce20b44d069103004af75c15d28c8bd83ca854c8 Mon Sep 17 00:00:00 2001
From: Allison Wang
Date: Thu, 16 Jan 2025 16:30:16 -0800
Subject: [PATCH 2/4] address comments

---
 python/pyspark/sql/streaming/python_streaming_source_runner.py | 3 ++-
 python/pyspark/sql/worker/python_streaming_sink_runner.py      | 2 +-
 2 files changed, 3 insertions(+), 2 deletions(-)

diff --git a/python/pyspark/sql/streaming/python_streaming_source_runner.py b/python/pyspark/sql/streaming/python_streaming_source_runner.py
index d5e90c4ccb60b..11aa4e15ab1ee 100644
--- a/python/pyspark/sql/streaming/python_streaming_source_runner.py
+++ b/python/pyspark/sql/streaming/python_streaming_source_runner.py
@@ -21,7 +21,7 @@
 from typing import IO, Iterator, Tuple
 
 from pyspark.accumulators import _accumulatorRegistry
-from pyspark.errors import IllegalArgumentException, PySparkAssertionError, PySparkRuntimeError
+from pyspark.errors import IllegalArgumentException, PySparkAssertionError
 from pyspark.serializers import (
     read_int,
     write_int,
@@ -78,6 +78,7 @@ def partitions_func(
     start_offset = json.loads(utf8_deserializer.loads(infile))
     end_offset = json.loads(utf8_deserializer.loads(infile))
     partitions = reader.partitions(start_offset, end_offset)
+
     # Return the serialized partition values.
     write_int(len(partitions), outfile)
     for partition in partitions:
diff --git a/python/pyspark/sql/worker/python_streaming_sink_runner.py b/python/pyspark/sql/worker/python_streaming_sink_runner.py
index d2dbed9ec7196..13b8f4d30786c 100644
--- a/python/pyspark/sql/worker/python_streaming_sink_runner.py
+++ b/python/pyspark/sql/worker/python_streaming_sink_runner.py
@@ -21,7 +21,7 @@
 from typing import IO
 
 from pyspark.accumulators import _accumulatorRegistry
-from pyspark.errors import PySparkAssertionError, PySparkRuntimeError
+from pyspark.errors import PySparkAssertionError
 from pyspark.serializers import (
     read_bool,
     read_int,

From 0d1c5a30d9fe427f014d1ac3dff694c90ad41fda Mon Sep 17 00:00:00 2001
From: Allison Wang
Date: Wed, 22 Jan 2025 16:18:15 -0800
Subject: [PATCH 3/4] update

---
 .../python/streaming/PythonStreamingDataSourceSuite.scala | 2 --
 1 file changed, 2 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/streaming/PythonStreamingDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/streaming/PythonStreamingDataSourceSuite.scala
index e6e5ee62efeb9..5c86817cba68c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/streaming/PythonStreamingDataSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/streaming/PythonStreamingDataSourceSuite.scala
@@ -266,7 +266,6 @@ class PythonStreamingDataSourceSimpleSuite extends PythonDataSourceSuiteBase {
       )
     )
     assert(err.getMessage.contains(msg))
-    assert(err.getMessage.contains("ErrorDataSource"))
     stream.stop()
   }
 
@@ -332,7 +331,6 @@ class PythonStreamingDataSourceSimpleSuite extends PythonDataSourceSuiteBase {
       )
     )
     assert(err.getMessage.contains(msg))
-    assert(err.getMessage.contains("ErrorDataSource"))
     stream.stop()
   }

From 2d0a638969feda2023d84130e4adfa1d872fde42 Mon Sep 17 00:00:00 2001
From: Allison Wang
Date: Wed, 22 Jan 2025 20:18:20 -0800
Subject: [PATCH 4/4] more fix

---
 .../python/streaming/PythonStreamingDataSourceSuite.scala | 2 --
 1 file changed, 2 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/streaming/PythonStreamingDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/streaming/PythonStreamingDataSourceSuite.scala
index 5c86817cba68c..5914abd11c01c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/streaming/PythonStreamingDataSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/streaming/PythonStreamingDataSourceSuite.scala
@@ -667,7 +667,6 @@ class PythonStreamingDataSourceSuite extends PythonDataSourceSuiteBase {
      )
    )
    assert(err.getMessage.contains(msg))
-    assert(err.getMessage.contains("ErrorDataSource"))
    stream.stop()
  }
 
@@ -729,7 +728,6 @@ class PythonStreamingDataSourceSuite extends PythonDataSourceSuiteBase {
      )
    )
    assert(err.getMessage.contains(msg))
-    assert(err.getMessage.contains("ErrorDataSource"))
    stream.stop()
  }
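
Note (illustrative sketch, not part of the patch series): the common thread above is that the inner try/except blocks that re-raised user errors as PYTHON_STREAMING_DATA_SOURCE_RUNTIME_ERROR are removed, so an exception raised inside a user's streamWriter commit()/abort() (or stream reader) now reaches the single outer except BaseException handler and handle_worker_exception(e, outfile) unchanged. That is also why the Scala suites drop the assert(err.getMessage.contains("ErrorDataSource")) checks: the data source name only appeared in the message because the removed wrapper embedded it. A minimal, self-contained Python sketch of the before/after propagation follows; UserCommitError, user_commit, run_before, and run_after are illustrative stand-ins, not PySpark APIs.

# Illustrative stand-ins only; none of these names are PySpark APIs.
class UserCommitError(Exception):
    """Plays the role of an arbitrary exception raised by a user's commit()."""


def user_commit() -> None:
    raise UserCommitError("commit failed in user code")


def run_before() -> tuple:
    # Old behavior: an inner handler re-wraps the error, so the outer handler
    # only ever sees a RuntimeError whose message embeds the data source name
    # (this is what the removed "ErrorDataSource" asserts were matching on).
    try:
        try:
            user_commit()
        except Exception as e:
            raise RuntimeError(
                "data source {} throw exception: {}".format("ErrorDataSource", e)
            )
    except BaseException as e:
        return type(e).__name__, str(e)


def run_after() -> tuple:
    # New behavior: no inner wrapper; the outer handler receives the user's
    # exception verbatim, mirroring the patched runners where
    # handle_worker_exception(e, outfile) is the single reporting point.
    try:
        user_commit()
    except BaseException as e:
        return type(e).__name__, str(e)


if __name__ == "__main__":
    print(run_before())  # ('RuntimeError', 'data source ErrorDataSource throw exception: ...')
    print(run_after())   # ('UserCommitError', 'commit failed in user code')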