diff --git a/python/pyspark/sql/pandas/group_ops.py b/python/pyspark/sql/pandas/group_ops.py
index 0945c0078a2a..c34a285144f0 100644
--- a/python/pyspark/sql/pandas/group_ops.py
+++ b/python/pyspark/sql/pandas/group_ops.py
@@ -231,9 +231,9 @@ def applyInPandasWithState(
         per-group state. The result Dataset will represent the flattened record returned by the
         function.
 
-        For a streaming Dataset, the function will be invoked first for all input groups and then
-        for all timed out states where the input data is set to be empty. Updates to each group's
-        state will be saved across invocations.
+        For a streaming :class:`DataFrame`, the function will be invoked first for all input groups
+        and then for all timed out states where the input data is set to be empty. Updates to each
+        group's state will be saved across invocations.
 
         The function should take parameters (key, Iterator[`pandas.DataFrame`], state) and
         return another Iterator[`pandas.DataFrame`]. The grouping key(s) will be passed as a tuple
@@ -257,10 +257,10 @@ def applyInPandasWithState(
         user-defined state. The value of the state will be presented as a tuple, as well as the
         update should be performed with the tuple. The corresponding Python types for
         :class:DataType are supported. Please refer to the page
-        https://spark.apache.org/docs/latest/sql-ref-datatypes.html (python tab).
+        https://spark.apache.org/docs/latest/sql-ref-datatypes.html (Python tab).
 
-        The size of each DataFrame in both the input and output can be arbitrary. The number of
-        DataFrames in both the input and output can also be arbitrary.
+        The size of each `pandas.DataFrame` in both the input and output can be arbitrary. The
+        number of `pandas.DataFrame` in both the input and output can also be arbitrary.
 
         .. versionadded:: 3.4.0
 
@@ -294,6 +294,7 @@ def applyInPandasWithState(
         ...         total_len += len(pdf)
         ...     state.update((total_len,))
         ...     yield pd.DataFrame({"id": [key[0]], "countAsString": [str(total_len)]})
+        ...
         >>> df.groupby("id").applyInPandasWithState(
         ...     count_fn, outputStructType="id long, countAsString string",
         ...     stateStructType="len long", outputMode="Update",
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala
index 2988c0fb5187..34e128a4925f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala
@@ -98,16 +98,6 @@ class ArrowWriter(val root: VectorSchemaRoot, fields: Array[ArrowFieldWriter]) {
     count += 1
   }
 
-  def sizeInBytes(): Int = {
-    var i = 0
-    var bytes = 0
-    while (i < fields.size) {
-      bytes += fields(i).getSizeInBytes()
-      i += 1
-    }
-    bytes
-  }
-
   def finish(): Unit = {
     root.setRowCount(count)
     fields.foreach(_.finish())
@@ -142,10 +132,6 @@ private[arrow] abstract class ArrowFieldWriter {
     count += 1
   }
 
-  def getSizeInBytes(): Int = {
-    valueVector.getBufferSizeFor(count)
-  }
-
   def finish(): Unit = {
     valueVector.setValueCount(count)
   }
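As a rough end-to-end sketch of how the API documented above is typically used (not part of this patch; the rate source, console sink, key derivation, and schemas below are illustrative assumptions), a stateful per-key row count with applyInPandasWithState might look like this:

from typing import Any, Iterator, Tuple

import pandas as pd

from pyspark.sql import SparkSession
from pyspark.sql.streaming.state import GroupState, GroupStateTimeout

spark = SparkSession.builder.getOrCreate()

# A streaming DataFrame; the rate source and the "value % 3" key are stand-ins.
df = (
    spark.readStream.format("rate").option("rowsPerSecond", 10).load()
    .selectExpr("CAST(value % 3 AS STRING) AS id", "value")
)

def count_fn(
    key: Tuple[Any, ...], pdf_iter: Iterator[pd.DataFrame], state: GroupState
) -> Iterator[pd.DataFrame]:
    # Restore the running count for this key from the per-group state, if any.
    total_len = state.get[0] if state.exists else 0
    for pdf in pdf_iter:
        total_len += len(pdf)
    # Persist the updated count; it is carried across micro-batches.
    state.update((total_len,))
    yield pd.DataFrame({"id": [key[0]], "countAsString": [str(total_len)]})

query = (
    df.groupBy("id")
    .applyInPandasWithState(
        count_fn,
        outputStructType="id string, countAsString string",
        stateStructType="len long",
        outputMode="Update",
        timeoutConf=GroupStateTimeout.NoTimeout,
    )
    .writeStream.format("console")
    .outputMode("update")
    .start()
)
# query.awaitTermination()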