13 changes: 7 additions & 6 deletions python/pyspark/sql/pandas/group_ops.py
@@ -231,9 +231,9 @@ def applyInPandasWithState(
 per-group state. The result Dataset will represent the flattened record returned by the
 function.

-For a streaming Dataset, the function will be invoked first for all input groups and then
-for all timed out states where the input data is set to be empty. Updates to each group's
-state will be saved across invocations.
+For a streaming :class:`DataFrame`, the function will be invoked first for all input groups
+and then for all timed out states where the input data is set to be empty. Updates to each
+group's state will be saved across invocations.

 The function should take parameters (key, Iterator[`pandas.DataFrame`], state) and
 return another Iterator[`pandas.DataFrame`]. The grouping key(s) will be passed as a tuple
@@ -257,10 +257,10 @@ def applyInPandasWithState(
 user-defined state. The value of the state will be presented as a tuple, as well as the
 update should be performed with the tuple. The corresponding Python types for
 :class:`DataType` are supported. Please refer to the page
-https://spark.apache.org/docs/latest/sql-ref-datatypes.html (python tab).
+https://spark.apache.org/docs/latest/sql-ref-datatypes.html (Python tab).

-The size of each DataFrame in both the input and output can be arbitrary. The number of
-DataFrames in both the input and output can also be arbitrary.
+The size of each `pandas.DataFrame` in both the input and output can be arbitrary. The
+number of `pandas.DataFrame` in both the input and output can also be arbitrary.

 .. versionadded:: 3.4.0
@@ -294,6 +294,7 @@ def applyInPandasWithState(
 ...         total_len += len(pdf)
 ...     state.update((total_len,))
 ...     yield pd.DataFrame({"id": [key[0]], "countAsString": [str(total_len)]})
+...
 >>> df.groupby("id").applyInPandasWithState(
 ...     count_fn, outputStructType="id long, countAsString string",
 ...     stateStructType="len long", outputMode="Update",
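
Reviewer note, not part of the diff: the docstring above describes two invocation paths, normal input groups and timed-out state with empty input. The sketch below shows how the tuple-valued state and the timeout path fit together. It assumes an active SparkSession and a streaming DataFrame `events` with an `id` column; the names `count_with_timeout`, `events`, the 30-second timeout, and the output columns are illustrative, not from this PR.

```python
from typing import Any, Iterator, Tuple

import pandas as pd
from pyspark.sql.streaming.state import GroupState, GroupStateTimeout

def count_with_timeout(
    key: Tuple[Any, ...],
    pdfs: Iterator[pd.DataFrame],
    state: GroupState,
) -> Iterator[pd.DataFrame]:
    if state.hasTimedOut:
        # Second invocation path: input is empty, the group's state timed out.
        (count,) = state.get          # state is read as a tuple
        state.remove()
        yield pd.DataFrame({"id": [key[0]], "count": [count], "expired": [True]})
    else:
        count = state.get[0] if state.exists else 0
        for pdf in pdfs:              # any number of DataFrames, of any size
            count += len(pdf)
        state.update((count,))        # ...and written back as a tuple
        state.setTimeoutDuration(30000)  # expire after 30s without input
        yield pd.DataFrame({"id": [key[0]], "count": [count], "expired": [False]})

result = events.groupBy("id").applyInPandasWithState(
    count_with_timeout,
    outputStructType="id long, count long, expired boolean",
    stateStructType="count long",
    outputMode="Update",
    timeoutConf=GroupStateTimeout.ProcessingTimeTimeout,
)
```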
@@ -98,16 +98,6 @@ class ArrowWriter(val root: VectorSchemaRoot, fields: Array[ArrowFieldWriter]) {
     count += 1
   }

-  def sizeInBytes(): Int = {
-    var i = 0
-    var bytes = 0
-    while (i < fields.size) {
-      bytes += fields(i).getSizeInBytes()
-      i += 1
-    }
-    bytes
-  }
-
   def finish(): Unit = {
     root.setRowCount(count)
     fields.foreach(_.finish())
@@ -142,10 +132,6 @@ private[arrow] abstract class ArrowFieldWriter {
     count += 1
   }

-  def getSizeInBytes(): Int = {
-    valueVector.getBufferSizeFor(count)
-  }
-
   def finish(): Unit = {
     valueVector.setValueCount(count)
   }
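
Reviewer note: the removed `sizeInBytes()` summed each field writer's `valueVector.getBufferSizeFor(count)`. A caller that still needs a comparable estimate can get one from Arrow's public API; a hedged sketch, assuming a populated `VectorSchemaRoot`, and not code from this PR:

```scala
import scala.collection.JavaConverters._
import org.apache.arrow.vector.VectorSchemaRoot

// Sums the buffer sizes of all field vectors in a populated root --
// roughly what the removed ArrowWriter.sizeInBytes() computed from
// its per-field writers.
def estimatedSizeInBytes(root: VectorSchemaRoot): Long =
  root.getFieldVectors.asScala.map(_.getBufferSize.toLong).sum
```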