Conversation

@bogao007 (Contributor) commented Jun 27, 2024:

What changes were proposed in this pull request?

  • Base implementation for Python State V2
  • Implemented ValueState

Below we specifically highlight some key files/components for this change:

  • Python
    • group_ops.py: defines the transformWithStateInPandas function and its UDF.
    • serializer.py: defines how we load and dump Arrow streams of data rows between the JVM and the Python process.
    • stateful_processor.py: defines the StatefulProcessorHandle, ValueState functionality, and the StatefulProcessor interface.
    • state_api_client.py and value_state_client.py: contain the logic to send API requests in protobuf format to the server (JVM).
  • Scala
    • TransformWithStateInPandasExec: physical operator for TransformWithStateInPandas.
    • TransformWithStateInPandasPythonRunner: Python runner that launches the Python worker which executes the UDF.
    • TransformWithStateInPandasStateServer: class that handles state requests in protobuf format from the Python side.

Why are the changes needed?

Support the Python State V2 API.

Does this PR introduce any user-facing change?

Yes

How was this patch tested?

Added unit tests.
Ran a local integration test with the command below:

import pandas as pd
from pyspark.sql import Row
from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle
from pyspark.sql.types import StructType, StructField, LongType, StringType
from typing import Iterator
spark.conf.set("spark.sql.streaming.stateStore.providerClass","org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
spark.conf.set("spark.sql.shuffle.partitions","1")
output_schema = StructType([
    StructField("value", LongType(), True)
])
state_schema = StructType([
    StructField("id", LongType(), True),
    StructField("value", StringType(), True),
    StructField("comment", StringType(), True)
])

class SimpleStatefulProcessor(StatefulProcessor):
  def init(self, handle: StatefulProcessorHandle) -> None:
    self.value_state = handle.getValueState("testValueState", state_schema)
  def handleInputRows(self, key, rows) -> Iterator[pd.DataFrame]:
    self.value_state.update((1,"test_value","comment"))
    exists = self.value_state.exists()
    print(f"value state exists: {exists}")
    value = self.value_state.get()
    print(f"get value: {value}")
    print("clearing value state")
    self.value_state.clear()
    print("value state cleared")
    return rows
  def close(self) -> None:
    pass

q = (
    spark.readStream.format("rate")
    .option("rowsPerSecond", "1")
    .option("numPartitions", "1")
    .load()
    .groupBy("value")
    .transformWithStateInPandas(
        stateful_processor=SimpleStatefulProcessor(),
        outputStructType=output_schema,
        outputMode="Update",
        timeMode="None",
    )
    .writeStream.format("console")
    .option("checkpointLocation", "/tmp/streaming/temp_ckp")
    .outputMode("update")
    .start()
)

Verified from the logs that the value state methods work as expected for key 11:

value state exists: True
get value:    id       value  comment
0   1  test_value  comment
clearing value state
value state cleared

Was this patch authored or co-authored using generative AI tooling?

No

@HyukjinKwon (Member):

Mind filing a JIRA?

@bogao007 (Contributor, author):

> Mind filing a JIRA?

Yeah, will do, thanks!

@bogao007 bogao007 changed the title State V2 base implementation and ValueState support [SPARK-48755] State V2 base implementation and ValueState support Jun 28, 2024
@sahnib (Contributor) left a comment:

Thanks for making these changes. Reviewed the Python bits, still reviewing Scala bits.

@sahnib (Contributor) left a comment:

Thanks for making the changes. Left some comments after the second pass.

Comment on lines 435 to 445
>>> class SimpleStatefulProcessor(StatefulProcessor):
...     def init(self, handle: StatefulProcessorHandle) -> None:
...         self.value_state = handle.getValueState("testValueState", state_schema)
...     def handleInputRows(self, key, rows) -> Iterator[pd.DataFrame]:
...         self.value_state.update("test_value")
...         exists = self.value_state.exists()
...         value = self.value_state.get()
...         self.value_state.clear()
...         return rows
...     def close(self) -> None:
...         pass

[nit] It might be more useful to provide a running count example, where we store values above a specified threshold in the state (to keep track of violations). [something like processing temperature sensor values in a stream]
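For reference, a minimal sketch of the kind of example suggested here, assuming the getValueState/exists/get/update API from this PR and that the state value reads back as a Row; the class name, threshold, and column names are illustrative only:

```python
import pandas as pd
from typing import Iterator
from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle
from pyspark.sql.types import StructType, StructField, IntegerType

class TemperatureMonitorProcessor(StatefulProcessor):
    THRESHOLD = 100  # hypothetical violation threshold

    def init(self, handle: StatefulProcessorHandle) -> None:
        state_schema = StructType([StructField("violations", IntegerType(), True)])
        self.num_violations_state = handle.getValueState("numViolations", state_schema)

    def handleInputRows(self, key, rows) -> Iterator[pd.DataFrame]:
        # Read back the running count; the state value is a Row, so index it.
        existing = self.num_violations_state.get()[0] if self.num_violations_state.exists() else 0
        new_violations = 0
        for pdf in rows:
            # Count readings in this batch that exceed the threshold.
            new_violations += int((pdf["temperature"] > self.THRESHOLD).sum())
        total = existing + new_violations
        self.num_violations_state.update((total,))
        yield pd.DataFrame({"id": key, "count": [total]})

    def close(self) -> None:
        pass
```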

Comment on lines 1169 to 1171
In addition, this function further groups the return of `gen_data_and_state` by the state
instance (same semantic as grouping by grouping key) and produces an iterator of data
chunks for each group, so that the caller can lazily materialize the data chunk.

It seems like this documentation is referring to the ApplyInPandasWithState serializer, which transfers both state and data.


def generate_data_batches(batches):
    for batch in batches:
        data_pandas = [self.arrow_to_pandas(c) for c in pa.Table.from_batches([batch]).itercolumns()]

Not sure if this is a common pattern in Python, but this line is a little hard to read.
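One possible way to unpack the flagged one-liner without changing behavior (a sketch assuming pyarrow is imported as pa and self.arrow_to_pandas is in scope, as in the surrounding serializer):

```python
def generate_data_batches(batches):
    for batch in batches:
        # Wrap the single record batch in a table so its columns can be walked.
        table = pa.Table.from_batches([batch])
        data_pandas = [self.arrow_to_pandas(column) for column in table.itercolumns()]
```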


self.assertEqual(q.name, "this_query")
self.assertTrue(q.isActive)
q.processAllAvailable()
Should we include q.awaitTermination()?

+1 Shall we ensure the query to be stopped instead of relying on other test to stop leaking query?
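One way to guarantee cleanup regardless of assertion failures; a sketch assuming q is the query started earlier in the test:

```python
try:
    self.assertEqual(q.name, "this_query")
    self.assertTrue(q.isActive)
    q.processAllAvailable()
    q.awaitTermination(10)  # wait up to 10 seconds, as suggested above
finally:
    q.stop()  # always stop, so the query does not leak into other tests
```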


package pyspark.sql.streaming;

message StateRequest {

Is it possible to add some high-level comments here or in some other Python file?

@HyukjinKwon (Member) left a comment:

Looks fine at high level

@bogao007 (Contributor, author):

> Looks fine at high level

Thanks @HyukjinKwon! I addressed your comments, could you help take another look?

@HyukjinKwon (Member):

I defer to @HeartSaVioR. I don't have any high-level concerns.

@HeartSaVioR HeartSaVioR changed the title [SPARK-48755][SS] State V2 base implementation and ValueState support [SPARK-48755][SS][PYTHON] transformWithState pyspark base implementation and ValueState support Aug 12, 2024
@HeartSaVioR (Contributor):

Let's call this out as transformWithState explicitly, now that we have finalized the name of the API.

@HeartSaVioR (Contributor) left a comment:

9/31 files reviewed (probably several of the remaining files are auto-generated) - will continue tomorrow. Please leave a file-level comment for auto-generated files.

newChild: LogicalPlan): FlatMapGroupsInPandasWithState = copy(child = newChild)
}

object TransformWithStateInPandas {

Any reason we can't just use the generated constructor of the case class? The params here are exactly the same as the constructor params in the case class.

@bogao007 (Contributor, author):

Yeah, removed it since it's redundant, thanks for catching this!

}
}

case class TransformWithStateInPandas(

nit: shall we add a short description as class doc while we are here?

@bogao007 (Contributor, author):

done


val outputIterator = executePython(data, output, runner)

CompletionIterator[InternalRow, Iterator[InternalRow]](outputIterator, {

Where do we count numOutputRows in this node?

@HeartSaVioR (Contributor) left a comment:

27/31 files reviewed - I'll continue with the remaining 4 files, hopefully by today (or early tomorrow).

private val sqlConf = SQLConf.get
private val arrowMaxRecordsPerBatch = sqlConf.arrowMaxRecordsPerBatch

private var stateSocketSocketPort: Int = 0

nit: Probably one of Socket should be Server?

@bogao007 (Contributor, author):

good catch!

* This class is used to handle the state requests from the Python side. It runs on a separate
* thread spawned by TransformWithStateInPandasStateRunner per task. It opens a dedicated socket
* to process/transfer state related info which is shut down when task finishes or there's an error
* on opening the socket. It run It processes following state requests and return responses to the

nit: It run It processes?

* - Requests for managing state variables (e.g. valueState).
*/
class TransformWithStateInPandasStateServer(
private val stateServerSocket: ServerSocket,

nit: in many cases, having a private val as a constructor param in a class is redundant.

private val valueStateMapForTest: mutable.HashMap[String, ValueState[Row]] = null)
extends Runnable with Logging {
private var inputStream: DataInputStream = _
private var outputStream: DataOutputStream = outputStreamForTest

outputStreamForTest <= is this really used? We always assign the output stream from run()


OK so we do not call run() when testing...

elif eval_type == PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF_WITH_STATE:
return args_offsets, wrap_grouped_map_pandas_udf_with_state(func, return_type)
elif eval_type == PythonEvalType.SQL_TRANSFORM_WITH_STATE_PANDAS_UDF:
argspec = inspect.getfullargspec(chained_func) # signature was lost when wrapping it

doesn't seem to be used anywhere, blindly copied?

...         count = 0
...         exists = self.num_violations_state.exists()
...         if exists:
...             existing_violations_pdf = self.num_violations_state.get()

What's the expectation of the type of this state "value"? From the variable name pdf and also the way we get the number, I suspect this to be a pandas DataFrame, while the right type should be Row.
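A small illustration of the distinction being drawn, assuming the state value comes back as a Row (the field name is hypothetical):

```python
row = self.num_violations_state.get()  # e.g. Row(violations=3), not a pandas DataFrame
existing_violations = row[0] if row is not None else 0
# A Row is indexed by position or field name (row.violations); pandas-style
# accessors like .count().get(...) would only apply to a DataFrame.
```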

...             new_violations += violations_pdf.count().get('temperature')
...         updated_violations = new_violations + existing_violations
...         self.num_violations_state.update((updated_violations,))
...         yield pd.DataFrame({'id': key, 'count': count})

I guess the intent is to produce the number of violations instead of the number of inputs, but this code doesn't follow that explanation.

+---+-----+
| id|count|
+---+-----+
| 0| 2|

Isn't the desired output (0, 1), (1, 1)?


def dump_stream(self, iterator, stream):
    """
    Read through an iterator of (iterator of pandas DataFram), serialize them to Arrow

nit: DataFrame

@HeartSaVioR (Contributor) left a comment:

First pass.


class ValueState:
    """
    Class used for arbitrary stateful operations with the v2 API to capture single value state.

We should not refer to transformWithState as the v2 API, as only a few people would know what v2 is. Please call it by its name.

"""
return self._value_state_client.exists(self._state_name)

def get(self) -> Any:

Again, we expect a Row as the state value, not a pandas DataFrame. Please let me know if you are proposing pandas DataFrame as a better fit for more state types.


class StatefulProcessorHandle:
    """
    Represents the operation handle provided to the stateful processor used in the arbitrary state

nit: transformWithState

def remove_implicit_key(self) -> None:
    import pyspark.sql.streaming.StateMessage_pb2 as stateMessage

    print("calling remove_implicit_key on python side")

debugging purpose, or intentionally left for future debug context?

if status == 0:
    self.handle_state = state
else:
    raise PySparkRuntimeError(f"Error setting handle state: " f"{response_message[1]}")

I see we just map all errors here to PySparkRuntimeError with an error message (no classification) - shall we revisit the Scala codebase and ensure we give the same error class for the same error?


Also, there are internal requests vs. user-side requests. For example, I don't expect users to call set_implicit_key by themselves (so errors from it are internal errors), but I do expect users to call get_value_state (so errors could be either user-facing or internal). The classification of error classes has to be different for these cases.
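A hedged sketch of what such classification could look like, mirroring the errorClass keyword that appears in a later hunk of this diff; the error-class name here is hypothetical:

```python
from pyspark.errors import PySparkRuntimeError

# Internal error: users are not expected to call set_implicit_key themselves.
raise PySparkRuntimeError(
    errorClass="INTERNAL_STATE_SERVER_ERROR",  # hypothetical internal error class
    messageParameters={"msg": response_message[1]},
)
```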

df = self.spark.readStream.format("text").option("maxFilesPerTrigger", 1).load(input_path)
df_split = df.withColumn("split_values", split(df["value"], ","))
df_split = df_split.select(
    df_split.split_values.getItem(0).alias("id"),

Would adding cast here instead of having withColumn in L84 work?
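A sketch of the suggestion, assuming the later withColumn only exists to cast the id column; the target type and second column are assumptions:

```python
df_split = df.withColumn("split_values", split(df["value"], ","))
df_final = df_split.select(
    # Cast while selecting instead of a separate withColumn(...cast...) later.
    df_split.split_values.getItem(0).cast("int").alias("id"),
    df_split.split_values.getItem(1).alias("temperature"),  # hypothetical second field
)
```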




self._test_transform_with_state_in_pandas_basic(SimpleStatefulProcessor(), check_results)

def test_transform_with_state_in_pandas_sad_cases(self):

nit: shall we be a bit more explicit about what the bad case is? The method name is the test name.

)

def test_transform_with_state_in_pandas_query_restarts(self):
input_path = tempfile.mkdtemp()

Since we are using three different sub-directories, shall we call this out as root_path and create a subdirectory input explicitly?

existing_violations = 0
for pdf in rows:
    pdf_count = pdf.count()
    count += pdf_count.get("temperature")

Same as in the API doc example - any reason we count the inputs and count the number of violations separately?

@HeartSaVioR (Contributor) left a comment:

Only minor comments, which could also be deferred to TODO JIRA ticket(s).

if (valueStates(stateName)._1.exists()) {
  sendResponse(0)
} else {
  sendResponse(1, s"state $stateName doesn't exist")

How do we distinguish the case of "no value state is defined for the state variable name" vs "the value state is defined but does not have a value yet" if we use the same status code?

  sendResponse(1, s"state $stateName doesn't exist")
}
val valueRow = PythonSQLUtils.toJVMRow(byteArray, valueStateTuple._2, valueStateTuple._3)
valueStates(stateName)._1.update(valueRow)

nit: valueStateTuple


private def sendResponse(status: Int, errorMessage: String = null): Unit = {
private def sendResponse(
status: Int,

nit: 2 more spaces (while we are here)

def get(self) -> Any:
import pandas as pd

def get(self) -> Row:

nit: Optional[Row]?

status = response_message[0]
if status != 0:
raise PySparkRuntimeError(f"Error initializing value state: " f"{response_message[1]}")
raise PySparkRuntimeError(

Shall we give a better error class, as it's a user-facing error? You can revert this and file a JIRA ticket for it as well to defer the change.


I'd expect a dedicated error class here; if the Scala version of the implementation uses an error class, then use the same one, otherwise define a new one.

return True
elif status == 1:
# server returns 1 if the state does not exist
elif status == 1 and "doesn't exist" in response_message[1]:

I'd recommend using a different status code instead of parsing. Please consider changes relying on string matching/hardcoding to be unacceptable except for specific needs.
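A sketch of the recommendation: branch on a dedicated status code rather than substring-matching the message (the code value is an assumption, and PySparkRuntimeError is imported as elsewhere in this file):

```python
STATUS_OK = 0
STATUS_STATE_DOES_NOT_EXIST = 2  # hypothetical dedicated code sent by the server

if status == STATUS_OK:
    return True
elif status == STATUS_STATE_DOES_NOT_EXIST:
    return False
else:
    raise PySparkRuntimeError(f"Error checking value state exists: {response_message[1]}")
```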

else:
raise PySparkRuntimeError(
f"Error checking value state exists: " f"{response_message[1]}"
errorClass="CALL_BEFORE_INITIALIZE",

ditto, explicitly define a dedicated error class

return row
else:
raise PySparkRuntimeError(f"Error getting value state: {response_message[1]}")
raise PySparkRuntimeError(

ditto, probably the same error class with above

status = response_message[0]
if status != 0:
raise PySparkRuntimeError(f"Error updating value state: " f"{response_message[1]}")
raise PySparkRuntimeError(

ditto, same error class as above

status = response_message[0]
if status != 0:
raise PySparkRuntimeError(f"Error clearing value state: " f"{response_message[1]}")
raise PySparkRuntimeError(

ditto, same error class as above

@HeartSaVioR (Contributor) left a comment:

+1

Please leave a comment listing all JIRA tickets for TODOs, for record/reference.

@HeartSaVioR (Contributor):

I'm going to merge, as we have TODO tickets and everything else looks OK.

Thanks! Merging to master.

@bogao007 (Contributor, author):

> +1
>
> Please leave a comment listing all JIRA tickets for TODOs, for record/reference.

Thanks a lot @HeartSaVioR! Here are the TODOs related to this PR:
https://issues.apache.org/jira/browse/SPARK-49233
https://issues.apache.org/jira/browse/SPARK-49100
https://issues.apache.org/jira/browse/SPARK-49212

LuciferYang added a commit that referenced this pull request Nov 1, 2024
… module

### What changes were proposed in this pull request?
This PR makes the following changes to the `maven-shade-plugin` rules for the `sql/core` module:

1. To avoid being influenced by the parent `pom.xml`, use `combine.self = "override"` in the `<configuration>` of the `maven-shade-plugin` for the `sql/core` module. Before this configuration was added, the relocation result was incorrect, and `protobuf-java` was not relocated. We can unzip the packaging result to confirm this issue.

We can use IntelliJ's "Show Effective POM" feature to view the result of this parameter; the result is equivalent to the effective POM printed in the log when the --debug flag is added during Maven compilation:

**Before**

<img width="828" alt="image" src="https://github.com/user-attachments/assets/0bce810f-57e9-4a50-9fa2-b6063e040a29">

We can see that an unexpected
```
<includes>
  <include>org.eclipse.jetty.**</include>
</includes>
```
 has been added to the relocation rule.

**After**

<img width="787" alt="image" src="https://github.com/user-attachments/assets/0fab3422-2da7-4b8f-bd7f-9357fcdc39c2">

We can see that the extra `<includes>` in the relocation rule is no longer present.

2. Before SPARK-48755 | #47133 overwrote the `maven-shade-plugin` rules for `sql/core`, it inherited the rules from the parent `pom.xml` and shaded `org.spark-project.spark:unused`. This behavior changed after SPARK-48755, so this PR restores it.

3. The relocation rules for Guava should be retained and follow the configuration in the parent `pom.xml`, which relocates `com.google.common` to `${spark.shade.packageName}.guava`. This PR restores this configuration.

4. For `protobuf-java`, which is under the `com.google.protobuf` package, the already shaded `protobuf-java` in the `core` module can be reused instead of shading it again in the `sql/core` module. Therefore, this PR only configures the corresponding relocation rule for it: `com.google.protobuf` -> `${spark.shade.packageName}.spark_core.protobuf`.

5. Regarding the `ServicesResourceTransformer` configuration: it is used to merge `META-INF/services` resources. This is not needed for Guava and `protobuf-java`, so this PR removes it.

### Why are the changes needed?
Fix the shade and relocation rules of the `sql/core` module.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
- Pass GitHub Actions
- Manually inspect the packaging result: Extract `spark-sql_2.13-4.0.0-SNAPSHOT.jar` to a separate directory, then execute `grep "org.sparkproject.guava" -R *` and `grep "org.sparkproject.spark_core.protobuf" -R *` to confirm the successful relocation.
- Maven test passed: https://github.com/LuciferYang/spark/runs/32278520082

<img width="960" alt="image" src="https://github.com/user-attachments/assets/5435b2ff-3785-4413-83d9-190c16c6ba75">

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #48675 from LuciferYang/sql-core-shade.

Lead-authored-by: yangjie01 <yangjie01@baidu.com>
Co-authored-by: YangJie <yangjie01@baidu.com>
Signed-off-by: yangjie01 <yangjie01@baidu.com>
HyukjinKwon pushed a commit that referenced this pull request Nov 14, 2024
…ithState`

### What changes were proposed in this pull request?

This is a follow-up for #47133 to add the missing API reference docs.

### Why are the changes needed?

Provide proper API ref doc for `transformWithState`

### Does this PR introduce _any_ user-facing change?

No API changes; only the user-facing API reference docs will now include the new API.

### How was this patch tested?

The existing doc build in CI should pass

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #48840 from itholic/SPARK-48755-followup.

Authored-by: Haejoon Lee <haejoon.lee@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>