
Conversation

@HeartSaVioR
Contributor

@HeartSaVioR HeartSaVioR commented Sep 15, 2022

What changes were proposed in this pull request?

This PR proposes to introduce the new API applyInPandasWithState in PySpark, which provides the functionality to perform arbitrary stateful processing in Structured Streaming.

This is a companion API to applyInPandas - applyInPandas in PySpark covers the use case of flatMapGroups in the Scala/Java API, while applyInPandasWithState covers the use case of flatMapGroupsWithState.

The signature of the API is as follows:

# call this function after groupBy
def applyInPandasWithState(
    self,
    func: "PandasGroupedMapFunctionWithState",
    outputStructType: Union[StructType, str],
    stateStructType: Union[StructType, str],
    outputMode: str,
    timeoutConf: str,
) -> DataFrame

and the signature of the user function is as follows:

def func(
    key: Tuple,
    pdf_iter: Iterator[pandas.DataFrame],
    state: GroupStateImpl
) -> Iterator[pandas.DataFrame]

(Please refer to the code diff for the docstring of the new function. A minimal usage sketch follows below.)
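To make the shape of the API concrete, here is a minimal illustrative sketch (not taken from this PR's diff) of counting rows per grouping key; `df` is assumed to be a streaming DataFrame with an `id` column, and the schemas and column names are made up:

import pandas as pd
from pyspark.sql.streaming.state import GroupStateTimeout

def count_fn(key, pdf_iter, state):
    # the state value is a plain tuple matching stateStructType
    count = state.get[0] if state.exists else 0
    for pdf in pdf_iter:
        count += len(pdf)
    state.update((count,))
    yield pd.DataFrame({"id": [key[0]], "countAsString": [str(count)]})

result = df.groupBy("id").applyInPandasWithState(
    count_fn,
    outputStructType="id long, countAsString string",
    stateStructType="count long",
    outputMode="Update",
    timeoutConf=GroupStateTimeout.NoTimeout,
)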

Major design choices which differ from existing APIs:

  1. The new API is untyped, while flatMapGroupsWithState is a typed API.

This is based on the nature of the Python language - it is duck-typed, and type definitions are just hints. We don't have an implementation of a typed API for the PySpark DataFrame.

This leads us to design the API to be untyped, meaning all types for (input, state, output) should be Row-compatible. While we don't require end users to deal with Row directly, the model they use for state and output must be convertible to Row with the default encoder. If they want a Python type for state which is not compatible with Row (e.g. a custom class), they need to pickle it and use BinaryType to store it (see the short sketch at the end of this item).

This requires end users to specify the types of the state and output via Spark SQL schemas in the method.

Note that this helps to ensure compatibility of state data across Spark versions, as long as the encoders for 1) Python type -> Python Row and 2) Python Row -> UnsafeRow are not changed. We won't change the underlying data layout of UnsafeRow, as it would break all existing stateful queries.
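For the custom-class case above, a hedged sketch of what pickling into BinaryType state could look like (the class, columns, and schema are invented for illustration; stateStructType would be "payload binary"):

import pickle
import pandas as pd

class SessionInfo:
    def __init__(self, count=0):
        self.count = count

def session_fn(key, pdf_iter, state):
    # unpickle the previous state value if it exists, otherwise start fresh
    info = pickle.loads(state.get[0]) if state.exists else SessionInfo()
    for pdf in pdf_iter:
        info.count += len(pdf)
    # store the object back as bytes in the single BinaryType state column
    state.update((pickle.dumps(info),))
    yield pd.DataFrame({"id": [key[0]], "count": [info.count]})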

  2. The new API provides pandas DataFrames to the user function, while flatMapGroupsWithState provides an iterator of rows.

We decided to follow the user experience applyInPandas provides, for both consistency and performance (Arrow batching, vectorization, etc.). This leads us to design the user function to leverage pandas DataFrames rather than an iterator of rows. While this introduces some inconsistency in UX with the Scala/Java API, we don't think this will be a problem since pandas is considered the de-facto standard for Python data scientists.

  3. The new API provides an iterator of pandas DataFrames to the user function and also requires it to return an iterator of pandas DataFrames, to address scalability.

There is a known limitation of applyInPandas: scalability. It basically requires the data for a specific group to fit into memory. During the design phase of the new API, we decided to address the scalability rather than inherit the limitation.

To address the scalability, we tweak the user function to receive an iterator (generator) of pandas DataFrames instead of a single pandas DataFrame, and also to return an iterator (generator) of pandas DataFrames. We think it does not hurt the UX too much, as a for-each loop and yield are enough to handle the iterators (see the short sketch at the end of this item).

From the implementation perspective, we split the data for a specific group into multiple chunks, where each chunk is stored and sent as "an" Arrow RecordBatch and then finally materialized as "a" pandas DataFrame. This way, as long as end users don't materialize lots of pandas DataFrames from the iterator at the same time, only one chunk is materialized in memory, which is scalable. Similar logic applies to the output of the user function, hence it is scalable as well.
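As a rough illustration of the chunk-at-a-time pattern this enables (the column name `value` and the filter are made up), only one chunk needs to live in memory at a time on both the input and output side:

def transform_fn(key, pdf_iter, state):
    for pdf in pdf_iter:              # each pdf is one chunk backed by an Arrow RecordBatch
        yield pdf[pdf["value"] > 0]   # emit the processed chunk before pulling the next one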

  4. The new API also bin-packs the data of multiple groups into "an" Arrow RecordBatch.

Given the API is mainly used for streaming workloads, it is quite likely that the volume of data in a specific group is not large enough to benefit from Arrow columnar batching, which would hurt performance. To address this, we also do the opposite of what we do for scalability: bin-packing. That is, an Arrow RecordBatch can contain data for multiple groups, as well as part of the data for a specific group. This addresses both concerns, scalability and performance, together.

Note that we are not implementing all of the features the Scala/Java API provides in this initial phase; e.g. support for batch queries and support for initial state are left as TODOs.

Why are the changes needed?

PySpark users don't have a way to perform arbitrary stateful processing in Structured Streaming and are forced to use either Java or Scala, which is unacceptable in many cases. This PR enables PySpark users to do this without moving to the Java/Scala world.

Does this PR introduce any user-facing change?

Yes. We are exposing a new public API in PySpark which performs arbitrary stateful processing.

How was this patch tested?

N/A. We will make sure test suites are constructed in an E2E manner under SPARK-40431 - #37894

@HeartSaVioR HeartSaVioR marked this pull request as ready for review September 15, 2022 22:16
@HeartSaVioR HeartSaVioR force-pushed the SPARK-40434-on-top-of-SPARK-40433-SPARK-40432 branch from f4435a2 to 0000994 on September 15, 2022 22:16
@HeartSaVioR HeartSaVioR changed the title [DRAFT][DO-NOT-MERGE][SPARK-40434][SS][PYTHON] Implement applyInPandasWithState in PySpark [SPARK-40434][SS][PYTHON] Implement applyInPandasWithState in PySpark Sep 15, 2022
@HeartSaVioR
Contributor Author

HeartSaVioR commented Sep 16, 2022

cc. @viirya @HyukjinKwon
Please take a look at this. Thanks. I understand this is huge and a bit complicated in some parts, e.g. the logic around bin-packing/chunking. Please feel free to leave comments if the code comments aren't sufficient to understand it; I'll try my best to cover it.

And please help find more reviewers. I'm not sure who would be available to review a PR touching both PySpark and Structured Streaming, but knowing one area and reviewing that one area is still helpful.

@HyukjinKwon
Member

Will take a close look next Monday in KST.

PandasGroupedMapFunctionWithState = Callable[
    [Any, Iterable[DataFrameLike], GroupStateImpl], Iterable[DataFrameLike]
]
Contributor

Can the type be GroupState without the 'Impl'? It looks bad in a public API.

Contributor Author

Either we can split out the interface and implementation, or just change the name. I'm fine with either direction.
cc. @HyukjinKwon What would be the best practice in such a case?

Member

I am fine either way too. Users aren't able to create this instance directly anyway.

Member

One concern is that we might want a different implementation of GroupState in the far future. But the type is dynamic anyway, so I don't worry too much.

Contributor Author

@HeartSaVioR HeartSaVioR Sep 19, 2022

Thanks, I just renamed GroupStateImpl to GroupState. Once we find it necessary, we can turn the same name into an interface and move the implementation out (I guess this is what @HyukjinKwon meant by the type being dynamic, but please let me know if I missed something).

will be input data -> state timeout. When the function is invoked for state timeout, there
will be no data being presented.
The function should takes parameters (key, Iterator[`pandas.DataFrame`], state) and
Contributor

The function takes parameters ... and returns Iterator[...]

Contributor Author

This follows the existing method doc in applyInPandas.

The "function" here refers to the user function end users will provide, not a function Spark provides as a public API, so using "should" here does not seem wrong. The mood is something like "you should construct a user function that ...". The "s" in "takes" should be removed though.

For each group, all columns are passed together as `pandas.DataFrame` to the user-function,
and the returned `pandas.DataFrame` across all invocations are combined as a
:class:`DataFrame`. Note that the user function should loop through and process all
Contributor

'Note that the user function should loop through and process all elements in the iterator. The user function should not make a guess of the number of elements in the iterator.'

  • Why? This sounds like the user must process all iterator entries or otherwise something bad would happen. I would reword this to indicate that the grouped data could be split into multiple entries -

'Note that the group data may be split into multiple Iterator records and the user function should not assume that it receives a single record.'

I would still suggest we have a design discussion about splitting groups unnecessarily, as I believe we should not do this.

Contributor Author

Note that the user function should loop through and process all elements in the iterator.

Why? This sounds like the user must process all iterator entries or otherwise something bad would happen. I would reword this to indicate that the grouped data could be split into multiple entries -

I agree this is too conservative and we can remove it once we know there is technically no issue. I don't think we even have such a test for the existing flatMapGroupsWithState, so we don't clearly know what happens if we pull only part of the data from a group.

The user function should not make a guess of the number of elements in the iterator.

I would still suggest we have a design discussion about splitting groups unnecessarily as I believe we should not do this.

I think there is room for discussion on how to split a group, keeping in mind that we also bin-pack for performance, but I really doubt this has to be an interface contract. For the former, it's not a first-class concern and we shouldn't block this PR on it. For the latter, I really want to see the real use case that would leverage such an interface contract, and how much harder it would be to implement the same thing if we do not guarantee the contract.

A stricter interface contract can be loosened without breaking anything, but a looser interface contract can never be made stricter without breaking compatibility. Why not go with the conservative option until we are very clear there is a real use case?

The `stateStructType` should be :class:`StructType` describing the schema of user-defined
state. The value of state will be presented as a tuple, as well as the update should be
performed with the tuple. User defined types e.g. native Python class types are not
supported. Alternatively, you can pickle the data and produce the data as BinaryType, but
Contributor

'Alternatively, you can pickle the data ...' - instead say

'For such cases, the user should pickle the data into BinaryType. Note that this approach may be sensitive to backward and forward compatibility issues of Python pickle, and Spark cannot guarantee compatibility.'

though I think you could drop the note as that is orthogonal to Spark.

Contributor Author

Let's simply remove the suggestion.

it is tied to the backward and forward compatibility of pickle in Python, and Spark itself
does not guarantee the compatibility.
The length of each element in both input and returned value, `pandas.DataFrame`, can be
Contributor

'The size of each DataFrame in both the input and output ...'

'The number of DataFrames in both the input and output can also be arbitrary.'

schema if specified as strings, or match the field data types by position if not strings,
e.g. integer indices.
The `stateStructType` should be :class:`StructType` describing the schema of user-defined
Contributor

... describing the schema of the user-defined state. The value of the state will be presented as a tuple and the update should be performed with a tuple.

The `stateStructType` should be :class:`StructType` describing the schema of user-defined
state. The value of state will be presented as a tuple, as well as the update should be
performed with the tuple. User defined types e.g. native Python class types are not
Contributor

Not StructType types, e.g. user-defined or native Python types are not supported.

Contributor Author

@HeartSaVioR HeartSaVioR Sep 19, 2022

It's a bit tricky - native Python types include int, float, str, ..., and of course those are supported. Probably the clearer definition is "Python types are supported as long as the default encoder can convert them to the Spark SQL type". I'm not sure we have clear documentation describing the compatibility matrix.

cc. @HyukjinKwon could you please help us make this clear?

Member

I think we can just say that "the corresponding Python types for :class:DataType are supported". It is documented here: https://spark.apache.org/docs/latest/sql-ref-datatypes.html (click the Python tab).
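To illustrate what that wording means in practice (the field names below are invented), here is a state schema that only uses types whose Python counterparts are documented on that page - LongType <-> int, StringType <-> str, DoubleType <-> float:

from pyspark.sql.types import StructType, StructField, StringType, LongType, DoubleType

state_schema = StructType([
    StructField("session", StringType()),  # str
    StructField("count", LongType()),      # int
    StructField("avg", DoubleType()),      # float
])
# inside the user function, the state value is then a plain tuple of those types:
# state.update(("abc", 3, 1.5))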

func : function
a Python native function to be called on every group. It should takes parameters
(key, Iterator[`pandas.DataFrame`], state) and returns Iterator[`pandas.DataFrame`].
Note that the type of key is tuple, and the type of state is
Contributor

Note that the type of the key is tuple and the type of the state is ...

:class:`pyspark.sql.streaming.state.GroupStateImpl`.
outputStructType : :class:`pyspark.sql.types.DataType` or str
the type of the output records. The value can be either a
:class:`pyspark.sql.types.DataType` object or a DDL-formatted type string.
Contributor

can you provide an example here of the string?

Contributor Author

In the doc or here? None of the other PySpark method docs has an example of this string.

Maybe we could have examples like other APIs do and use a DDL-formatted type string there to compensate.
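For reference, a hedged illustration of the two equivalent forms such an example could show (the schema itself is made up):

from pyspark.sql.types import StructType, StructField, LongType, StringType

# DDL-formatted type string
output_schema_ddl = "id long, countAsString string"

# equivalent StructType object
output_schema_struct = StructType([
    StructField("id", LongType()),
    StructField("countAsString", StringType()),
])
# either form can be passed as outputStructType / stateStructType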

:class:`pyspark.sql.types.DataType` object or a DDL-formatted type string.
stateStructType : :class:`pyspark.sql.types.DataType` or str
the type of the user-defined state. The value can be either a
:class:`pyspark.sql.types.DataType` object or a DDL-formatted type string.
Contributor

same - can you provide an example of the string

Contributor Author

same.

Member

@HyukjinKwon HyukjinKwon left a comment

Implementation-wise, looks pretty good

"configured value.")
.version("3.4.0")
.bytesConf(ByteUnit.BYTE)
.createWithDefaultString("64MB")
Member

I think we should have a general configuration for this later that applies to all Arrow batches (SPARK-23258). I think we should reuse spark.sql.execution.arrow.maxRecordsPerBatch for the time being.

Contributor Author

Batching has multiple purposes - here we do it for scalability, meaning it would be closer to the purpose if we batch by size rather than by the number of rows. I'm OK with changing the condition for cutting an Arrow batch to the number of rows, as it's configurable, so users can adjust it to a smaller value if they encounter memory issues in any way.

cc. @alex-balikov Does this make sense to you?

Contributor Author

@HeartSaVioR HeartSaVioR Sep 19, 2022

Ah, SPARK-23258 is about restricting the Arrow record batch by size, which seems similar to what we propose in this PR. It's still questionable whether we should calculate the size on every addition of a row (accurate, but very bad for performance) or do sampling as we do here (cannot be accurate, and the error might be non-trivial with variable-length columns).

Contributor

I agree that expressing the limit in terms of bytes is more meaningful than records, provided we can estimate the byte size efficiently. Specifically, here I would rename 'softLimitSizePerBatch' by removing 'soft' (we can clarify that in the comment) and also include 'Bytes' - 'batchSizeLimitBytes'. I also wonder whether we should have the property specific to applyInPandasWithState or make it general - remove the applyInPandasWithState scoping even if we do not support this limit elsewhere initially; it seems generally meaningful and we can follow up fixing the other places as a bug.

Contributor Author

(closing the loop) We decided to simply use the number of rows as the condition for constructing an Arrow RecordBatch. This removes all the new configs introduced here, as well as reducing a lot of complexity.

"complete the ArrowRecordBatch, which may hurt both throughput and latency.")
.version("3.4.0")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("100ms")
Member

For this, can we just leverage spark.sql.execution.pandas.udf.buffer.size (the feature this PR adds already respects it) if the flush time matters? That configuration exists for this purpose.

Contributor Author

I'm not 100% clear on how spark.sql.execution.pandas.udf.buffer.size works. The current logic won't work if this config can split an Arrow record batch further into multiple batches, as we rely on the offset and the number of rows to split out the range of data from the overall Arrow record batch. It relies on the fact that the logic has full control over constructing the Arrow record batch.

Contributor Author

This config is there to have two different criteria for closing the Arrow record batch: 1) size, and 2) time spent batching.
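A rough, illustrative-only sketch of those two criteria (names and thresholds are invented, and the PR later dropped these configs in favor of a simple row-count condition):

import time

def should_flush(current_batch_bytes, limit_bytes, batch_start_time, timeout_ms):
    # flush when the pending batch is big enough OR has been open for too long
    over_size = current_batch_bytes >= limit_bytes
    over_time = (time.monotonic() - batch_start_time) * 1000 >= timeout_ms
    return over_size or over_time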

)
# the number of columns of result have to match the return type
# but it is fine for result to have no columns at all if it is empty
if not (
Contributor

if not ... ?

Contributor Author

This is borrowed from the function above - I think we used `if not` here because it's more intuitive and easier to think of the "valid" case and apply "not" to negate it, rather than manually converting the conditions into their contrapositive.

Contributor

ah, nevermind, I just misread the code.

elif eval_type == PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF_WITH_STATE:
soft_limit_bytes_per_batch = runner_conf.get(
"spark.sql.execution.applyInPandasWithState.softLimitSizePerBatch",
(64 * 1024 * 1024),
Contributor

Can the default value be defined in some more prominent place? Also the property names.

ser = CogroupUDFSerializer(timezone, safecheck, assign_cols_by_name)
elif eval_type == PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF_WITH_STATE:
soft_limit_bytes_per_batch = runner_conf.get(
"spark.sql.execution.applyInPandasWithState.softLimitSizePerBatch",
Contributor

I do not think 'soft' is necessary in the parameter name. Leave that for the comment describing that this is a soft limit.

soft_limit_bytes_per_batch = int(soft_limit_bytes_per_batch)

min_data_count_for_sample = runner_conf.get(
"spark.sql.execution.applyInPandasWithState.minDataCountForSample", 100
Contributor

Similar comment about the property names and default values here and everywhere else - can they be defined in a more prominent place?

min_data_count_for_sample = int(min_data_count_for_sample)

soft_timeout_millis_purge_batch = runner_conf.get(
"spark.sql.execution.applyInPandasWithState.softTimeoutPurgeBatch", 100
Contributor

same

val aggsInQuery = collectStreamingAggregates(plan)

if (aggsInQuery.isEmpty) {
// applyInPandasWithState without aggregation: operation's output mode must
Contributor

Why do we even have an operator-level output mode? We are defining a new API; can we just drop this parameter from the API if we are going to enforce that it matches the query's output mode?

Contributor Author

@HeartSaVioR HeartSaVioR Sep 20, 2022

Now I can imagine a case where the current requirement of providing a separate output mode prevents unintentional behavior:

  • They implemented the user function for flatMapGroupsWithState with append mode.
  • They ran the query with append mode.
  • After that, they changed the output mode for the query to update mode for some reason.
  • The user function is left unchanged and does not account for the change to update mode.

As of now, we don't allow such a query to run, but we would allow it if we drop the parameter.

PS. I'm not a believer that end users can implement their user function correctly according to the output mode, but that is a fundamental API design issue of the original flatMapGroupsWithState, which is a separate matter.

@HeartSaVioR
Contributor Author

HeartSaVioR commented Sep 20, 2022

I tried to add method-level docs in as many places as possible, except where I think it's unnecessary (I might still have missed some pieces).

I didn't go with the approach of explaining all of the parameters with their types though, for these reasons:

  • For Python code, it'd be really hard to keep the docs in sync with the code.
  • For Scala/Java we have been omitting the docs of parameters, or even of an entire class/method, if it's obvious from the name.

In both languages, we strongly encourage having method docs and parameter explanations for public APIs. Here we technically add only one public method, applyInPandasWithState in group_ops, and a couple of public classes, GroupState and GroupStateTimeout, in PySpark. The others are all internal and private.

timeout configuration for groups that do not receive data for a while. valid values
are defined in :class:`pyspark.sql.streaming.state.GroupStateTimeout`.
# TODO: Examples
Contributor Author

This is something I still need to do - let me come up with some examples. I guess we probably can't run automated tests from the example section though.

Contributor Author

I just added a simple example - let me come up with full example code in the examples directory. I'll file a new JIRA ticket for this.


* @param eventTimeWatermark event time watermark for the current batch
* @param child logical plan of the underlying data
*/
case class FlatMapGroupsInPandasWithStateExec(
Contributor

I wonder if this can be merged with the regular FlatMapGroupsWithStateExec. Maybe as a followup cleanup.

Contributor Author

@HeartSaVioR HeartSaVioR Sep 20, 2022

We always have separate exec implementations for Scala/Java vs Python since the constructor parameters are different. (We are leveraging case classes for logical/physical plans, so a difference in constructor parameters warrants a new class.) So this is intentional. As a compromise, we did the refactoring to have FlatMapGroupsWithStateExecBase as a base class.

val shouldWriteState = newGroupState.isUpdated || newGroupState.isRemoved ||
hasTimeoutChanged

if (shouldWriteState) {
Contributor

What happens if

newGroupState.isRemoved && newGroupState.getTimeoutTimestampMs.isPresent()

  • basically, if the state was removed but there is still a timeout set? Will you keep the user state object around till the timeout fires?

Contributor Author

basically, if the state was removed but there is still a timeout set? Will you keep the user state object around till the timeout fires?

I'm not 100% sure of the intention of the original codebase, but it seems so.

Here the removal of state means removing the "value object" of the state. We don't allow users to set "null" as the value object, hence removing the state is the only way to clear the value object. Meanwhile, we seem to still allow setting the timeout while the state has an undefined value object.

The status of the state would then be the same as when you start with a new state and only set the timeout without setting the value object. Given we allow that, the above case probably has to be allowed as well.
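To make the discussed pattern concrete, a hedged, illustrative-only sketch of a user function that clears the value object but still relies on a timeout (assumes the query is configured with processing-time timeout; names and schemas are made up):

import pandas as pd

def expire_fn(key, pdf_iter, state):
    if state.hasTimedOut:
        # timeout fired for a key with no new data
        yield pd.DataFrame({"id": [key[0]], "expired": [True]})
    else:
        for pdf in pdf_iter:
            pass                              # consume the incoming chunks
        state.remove()                        # clear the value object...
        state.setTimeoutDuration(60 * 1000)   # ...while still arming a timeout
        yield pd.DataFrame({"id": [key[0]], "expired": [False]})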

//
// ArrowStreamWriter supports only single VectorSchemaRoot, which means all Arrow RecordBatches
// being sent out from ArrowStreamWriter should have same schema. That said, we have to construct
// "an" Arrow schema to contain both types of data, and also construct Arrow RecordBatches to
Contributor

to contain both data and state, and also construct ArrowBatches to contain both data and state.

}

object ApplyInPandasWithStateWriter {
val STATE_METADATA_SCHEMA: StructType = StructType(
Contributor

please comment on the semantics of each column. Specifically isLastChunk is not obvious but important for the operation of the protocol.

Contributor Author

Done. I additionally explained why the state metadata includes the chunk metadata as well.

@HeartSaVioR
Contributor Author

@HyukjinKwon @alex-balikov
Please go with another round of review, thanks in advance!

state will be saved across invocations.
The function should take parameters (key, Iterator[`pandas.DataFrame`], state) and
returns another Iterator[`pandas.DataFrame`]. The grouping key(s) will be passed as a tuple
Contributor

return another ...

3.A. Extract the data out from entire data via the information of data range.
3.B. Construct a new state instance if the state information is the first occurrence
for the current grouping key.
3.C. Leverage existing new state instance if the state instance is already available
Contributor

Leverage the existing state instance if it is already available for the current grouping key...

# the number of columns of result have to match the return type
# but it is fine for result to have no columns at all if it is empty
if not (
len(result.columns) == len(return_type) or len(result.columns) == 0 and result.empty
Contributor

Maybe it is just me, but I would suggest adding parentheses so we do not rely on and/or precedence.

Contributor Author

No, it's not just you. I planned to do it but forgot. Thanks for the pointer.
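For clarity, a sketch of what the suggested parenthesization could look like, pulled out into a standalone helper rather than quoting the actual diff:

import pandas as pd

def result_matches_return_type(result: pd.DataFrame, return_type) -> bool:
    # explicit parentheses make the and/or grouping obvious
    return len(result.columns) == len(return_type) or (
        len(result.columns) == 0 and result.empty
    )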

@HeartSaVioR
Contributor Author

HeartSaVioR commented Sep 21, 2022

https://github.com/HeartSaVioR/spark/actions/runs/3098349789/jobs/5019498380

BasicSchedulerIntegrationSuite.super simple job
org.scalatest.exceptions.TestFailedException: Map() did not equal Map(0 -> 42, 5 -> 42, 1 -> 42, 6 -> 42, 9 -> 42, 2 -> 42, 7 -> 42, 3 -> 42, 8 -> 42, 4 -> 42)
 [Check failure on line 210 in YarnClusterSuite](https://github.com/HeartSaVioR/spark/commit/f1000487960fa19aff9979211db68e63ec4384e0#annotation_4680752580) 

YarnClusterSuite.run Spark in yarn-client mode with different configurations, ensuring redaction
org.scalatest.exceptions.TestFailedDueToTimeoutException: The code passed to eventually never returned normally. Attempted 190 times over 3.001220665 minutes. Last failure message: handle.getState().isFinal() was false.
 [Check failure on line 210 in YarnClusterSuite](https://github.com/HeartSaVioR/spark/commit/f1000487960fa19aff9979211db68e63ec4384e0#annotation_4680752582) 

YarnClusterSuite.run Spark in yarn-cluster mode with different configurations, ensuring redaction
org.scalatest.exceptions.TestFailedDueToTimeoutException: The code passed to eventually never returned normally. Attempted 190 times over 3.001046460266666 minutes. Last failure message: handle.getState().isFinal() was false.
 [Check failure on line 210 in YarnClusterSuite](https://github.com/HeartSaVioR/spark/commit/f1000487960fa19aff9979211db68e63ec4384e0#annotation_4680752584) 

YarnClusterSuite.yarn-cluster should respect conf overrides in SparkHadoopUtil (SPARK-16414, SPARK-23630)
org.scalatest.exceptions.TestFailedDueToTimeoutException: The code passed to eventually never returned normally. Attempted 190 times over 3.0008878969166664 minutes. Last failure message: handle.getState().isFinal() was false.
 [Check failure on line 210 in YarnClusterSuite](https://github.com/HeartSaVioR/spark/commit/f1000487960fa19aff9979211db68e63ec4384e0#annotation_4680752586) 


YarnClusterSuite.SPARK-35672: run Spark in yarn-client mode with additional jar using URI scheme 'local'
org.scalatest.exceptions.TestFailedDueToTimeoutException: The code passed to eventually never returned normally. Attempted 190 times over 3.0009406496 minutes. Last failure message: handle.getState().isFinal() was false.

None of the test failures is related to the changes in this PR. Since we updated the PR again via dd7a655, let's see the build.

Member

@HyukjinKwon HyukjinKwon left a comment

LGTM otherwise

Comment on lines +234 to +236
For a streaming Dataset, the function will be invoked first for all input groups and then
for all timed out states where the input data is set to be empty. Updates to each group's
state will be saved across invocations.
Member

Suggested change
For a streaming Dataset, the function will be invoked first for all input groups and then
for all timed out states where the input data is set to be empty. Updates to each group's
state will be saved across invocations.
For a streaming :class:`DataFrame`, the function will be invoked first for all input groups
and then for all timed out states where the input data is set to be empty. Updates to
each group's state will be saved across invocations.

user-defined state. The value of the state will be presented as a tuple, as well as the
update should be performed with the tuple. The corresponding Python types for
:class:DataType are supported. Please refer to the page
https://spark.apache.org/docs/latest/sql-ref-datatypes.html (python tab).
Member

Suggested change
https://spark.apache.org/docs/latest/sql-ref-datatypes.html (python tab).
https://spark.apache.org/docs/latest/sql-ref-datatypes.html (Python tab).

Comment on lines +262 to +263
The size of each DataFrame in both the input and output can be arbitrary. The number of
DataFrames in both the input and output can also be arbitrary.
Member

Suggested change
The size of each DataFrame in both the input and output can be arbitrary. The number of
DataFrames in both the input and output can also be arbitrary.
The size of each `pandas.DataFrame` in both the input and output can be arbitrary.
The number of DataFrames in both the input and output can also be arbitrary.

Member

I think we can extract some notes from the description into the Notes section. But no biggie.

... for pdf in pdf_iter:
... total_len += len(pdf)
... state.update((total_len,))
... yield pd.DataFrame({"id": [key[0]], "countAsString": [str(total_len)]})
Member

Suggested change
... yield pd.DataFrame({"id": [key[0]], "countAsString": [str(total_len)]})
... yield pd.DataFrame({"id": [key[0]], "countAsString": [str(total_len)]})
...

count += 1
}

def sizeInBytes(): Int = {
Member

I think we don't need sizeInBytes and getSizeInBytes anymore

@HyukjinKwon
Member

My comments are just nits. I will merge this in first to move forward.

Merged to master.

@HeartSaVioR
Contributor Author

Thanks @HyukjinKwon and @alex-balikov for the thoughtful reviews and the merge!
I'll handle the latest comments as a follow-up PR.

HeartSaVioR added a commit that referenced this pull request Sep 22, 2022
…in PySpark

### What changes were proposed in this pull request?

This PR adds the test suites for #37893, applyInPandasWithState. The new test suite mostly ports E2E test cases from existing [flatMapGroupsWithState](https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala).

### Why are the changes needed?

Tests were intentionally omitted from #37893 to reduce the size of the change, and this PR fills the gap.

### Does this PR introduce _any_ user-facing change?

No, test only.

### How was this patch tested?

New test suites.

Closes #37894 from HeartSaVioR/SPARK-40435-on-top-of-SPARK-40434-SPARK-40433-SPARK-40432.

Lead-authored-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
Co-authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
HeartSaVioR added a commit that referenced this pull request Sep 22, 2022
### What changes were proposed in this pull request?

This PR addresses the comments from HyukjinKwon's last round of review in #37893.

### Why are the changes needed?

Better documentation and removing unnecessary code.

### Does this PR introduce _any_ user-facing change?

Slight documentation change.

### How was this patch tested?

N/A

Closes #37964 from HeartSaVioR/SPARK-40434-followup.

Authored-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>