@HyukjinKwon
Member

What changes were proposed in this pull request?

This PR adds the Python counterpart of Dataset.groupByKey(...).flatMapGroupsWithState(...), exposed in PySpark as DataFrame.groupby(...).applyInPandasWithState(...).

TBD

Note that documentation will be added in a separate PR, given the size of this one.

Why are the changes needed?

TBD

Does this PR introduce any user-facing change?

Yes, this PR adds a new API DataFrame.groupby(...).applyInPandasWithState(...) in PySpark.

import typing

import pandas as pd

from pyspark.sql.types import StructType, StructField, LongType, StringType
from pyspark.sql.streaming.state import GroupStateTimeout, GroupStateImpl

output_type = StructType([
    StructField("key", LongType()),
    StructField("countAsString", StringType())])
state_type = StructType([StructField("count", LongType())])

# Type hints are optional in `func`.
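def func(key: typing.Tuple, pdf: pd.DataFrame, state: GroupStateImpl) -> pd.DataFrame:
    # Read the previous count for this key; it is None when no state exists yet.
    count = state.getOption
    if count is None:
        count = 0
    else:
        count = count[0]
    # Add the rows received for this key and store the new count as a one-field tuple.
    count += len(pdf)
    state.update((count,))
    return pd.DataFrame({'key': [key[0]], 'countAsString': [str(count)]})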


df = spark.readStream.format("rate").option("rowsPerSecond", 10).load().selectExpr("value % 3 as v")
df.groupBy(df["v"]).applyInPandasWithState(
    func, output_type, state_type, "Update", GroupStateTimeout.NoTimeout
).writeStream.format("console").queryName("test").start()
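
To make the per-key state handling in `func` easier to follow without a running Spark cluster, here is a minimal, standard-library-only sketch of the same counting logic. `FakeGroupState`, `count_rows`, and `run_batches` are hypothetical names introduced only for illustration; they are not part of PySpark. Plain lists stand in for pandas DataFrames, and the loop only approximates how micro-batches are grouped by `value % 3`.

```python
from collections import defaultdict
from typing import Dict, List, Optional, Tuple


class FakeGroupState:
    """Hypothetical stand-in for the state handle; not a PySpark class."""

    def __init__(self) -> None:
        self._value: Optional[Tuple] = None

    @property
    def getOption(self) -> Optional[Tuple]:
        # Attribute-style access, as used in the PR example; None until updated.
        return self._value

    def update(self, new_value: Tuple) -> None:
        self._value = new_value


def count_rows(key: Tuple, rows: List[int], state: FakeGroupState) -> Dict[str, object]:
    """Same counting logic as `func`, with a list in place of a pandas DataFrame."""
    previous = state.getOption
    count = 0 if previous is None else previous[0]
    count += len(rows)
    state.update((count,))
    return {"key": key[0], "countAsString": str(count)}


def run_batches(batches: List[List[int]]) -> List[List[Dict[str, object]]]:
    """Group each batch by `value % 3` and call `count_rows` once per key seen."""
    # One state object per key, kept alive across batches (assumption of this sketch).
    states: Dict[int, FakeGroupState] = defaultdict(FakeGroupState)
    outputs = []
    for batch in batches:
        groups: Dict[int, List[int]] = defaultdict(list)
        for value in batch:
            groups[value % 3].append(value)
        outputs.append(
            [count_rows((k,), groups[k], states[k]) for k in sorted(groups)]
        )
    return outputs


# Two simulated micro-batches; counts accumulate per key across them.
results = run_batches([[0, 1, 2, 3], [4, 5, 6]])
for batch_output in results:
    print(batch_output)
```

In this sketch key 0 sees two rows in the first batch and one in the second, so its `countAsString` goes from "2" to "3"; the point is only that the state written by `update` in one batch is what `getOption` returns in the next.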

How was this patch tested?

Manually tested; unit tests and end-to-end tests were added on both the Python and Scala sides.

@HyukjinKwon HyukjinKwon marked this pull request as draft July 26, 2022 04:41
@HyukjinKwon HyukjinKwon force-pushed the flatmapgroupswithstate-pyspark branch 3 times, most recently from 876df13 to fe31805 Compare July 26, 2022 08:45
@HyukjinKwon
Member Author

HyukjinKwon commented Jul 27, 2022

cc @viirya and @ueshin FYI, as I think you are the best people to review this. I will attach design docs and related material soon, likely next week.

@HyukjinKwon HyukjinKwon force-pushed the flatmapgroupswithstate-pyspark branch from 867f834 to e0a6c63 Compare July 31, 2022 07:13
Member Author


Explain the (de)serialization format.

@HyukjinKwon HyukjinKwon force-pushed the flatmapgroupswithstate-pyspark branch from e0a6c63 to 8f7afbb Compare August 11, 2022 01:37
@HyukjinKwon HyukjinKwon force-pushed the flatmapgroupswithstate-pyspark branch from 8f7afbb to 05fb4e4 Compare August 11, 2022 03:57
HyukjinKwon added a commit that referenced this pull request Sep 13, 2022
…t trait

### What changes were proposed in this pull request?

This PR proposes to factor the common attributes out from `FlatMapGroupsWithStateExec` to `FlatMapGroupsWithStateExecBase`.

### Why are the changes needed?

There is a lot of logic to share if you implement another version of `FlatMapGroupsWithStateExec`,
so it is better to factor it out. This is also part of #37285, which demonstrates how the refactored trait is used.

### Does this PR introduce _any_ user-facing change?

No, this is refactoring-only.

### How was this patch tested?

Existing test cases should cover it.

Closes #37859 from HyukjinKwon/SPARK-40411.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
@HeartSaVioR
Contributor

We can close this now.

@HyukjinKwon HyukjinKwon deleted the flatmapgroupswithstate-pyspark branch January 15, 2024 00:50