From ce5d10e82cb59a99f6020ce5b54cae93599f93cd Mon Sep 17 00:00:00 2001
From: "Yuichiro Tachibana (Tsuchiya)"
Date: Wed, 23 Aug 2023 19:34:36 +0900
Subject: [PATCH] Parquet to bypass Arrow (#3)

* Introduce fastparquet on the Python side and parquet-wasm on the JS side to bypass the Arrow serialization for DataFrame

* Patch DataEditor not to use PyArrow

* Add setTimeout() so that the import of parquet-wasm works in @stlite/mountable

* Fix comments

* Fix incompatibilities with some column types

* Change logic to handle lists from fastparquet

* Move the decoding above the string parsing

---------

Co-authored-by: lukasmasuch
---
 frontend/lib/package.json                     |  1 +
 .../widgets/DataFrame/columns/utils.ts        |  8 +++
 frontend/lib/src/dataframes/Quiver.ts         | 29 ++++++++-
 lib/setup.py                                  |  3 +
 .../elements/lib/column_config_utils.py       | 59 ++++++++++++++++-
 lib/streamlit/elements/widgets/data_editor.py | 30 +++++----
 lib/streamlit/type_util.py                    | 65 ++++++++++++++++---
 7 files changed, 172 insertions(+), 23 deletions(-)

diff --git a/frontend/lib/package.json b/frontend/lib/package.json
index 1ee79674633c..5cffc4be6e5c 100644
--- a/frontend/lib/package.json
+++ b/frontend/lib/package.json
@@ -70,6 +70,7 @@
     "native-file-system-adapter": "^3.0.0",
     "node-emoji": "^1.11.0",
     "numbro": "^2.3.6",
+    "parquet-wasm": "^0.4.0",
     "plotly.js": "^2.26.1",
     "protobufjs": "^7.2.4",
     "query-string": "^8.1.0",
diff --git a/frontend/lib/src/components/widgets/DataFrame/columns/utils.ts b/frontend/lib/src/components/widgets/DataFrame/columns/utils.ts
index a37738de26f7..4bbcf4ab1fd6 100644
--- a/frontend/lib/src/components/widgets/DataFrame/columns/utils.ts
+++ b/frontend/lib/src/components/widgets/DataFrame/columns/utils.ts
@@ -279,6 +279,14 @@ export function toSafeArray(data: any): any[] {
     return [data]
   }
 
+  if (data instanceof Uint8Array) {
+    // Stlite: Uint8Array is used for any list data in fastparquet.
+    // It stores a JSON string representation of the list,
+    // so we need to decode it to a string first
+    // in order to parse it as JSON later.
+    data = new TextDecoder("utf-8").decode(data)
+  }
+
   if (typeof data === "string") {
     if (data === "") {
       // Empty string
diff --git a/frontend/lib/src/dataframes/Quiver.ts b/frontend/lib/src/dataframes/Quiver.ts
index 479bb7cb8dee..69b7d078885e 100644
--- a/frontend/lib/src/dataframes/Quiver.ts
+++ b/frontend/lib/src/dataframes/Quiver.ts
@@ -40,6 +40,27 @@ import numbro from "numbro"
 import { IArrow, Styler as StylerProto } from "@streamlit/lib/src/proto"
 import { notNullOrUndefined } from "@streamlit/lib/src/util/utils"
 
+import type { readParquet as readParquetType } from "parquet-wasm"
+
+// Stlite: Use Parquet to bypass the Arrow implementation, which is unavailable in the Wasm Python environment.
+// See https://github.com/whitphx/stlite/issues/509#issuecomment-1657957887
+// NOTE: Async import is necessary for the `parquet-wasm` package to work.
+// If it's imported statically, the following error will be thrown when `readParquet` is called:
+// `TypeError: Cannot read properties of undefined (reading '__wbindgen_add_to_stack_pointer')`
+// Ref: https://github.com/kylebarron/parquet-wasm/issues/27
+// HACK: Strictly speaking, there is no guarantee that the `readParquet` function
+// async-imported in the following code will be ready when it's called in the `Quiver` class's constructor,
+// but it seems to work fine in practice.
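+// NOTE: With parquet-wasm 0.4, `readParquet()` takes Parquet file bytes and returns
+// Arrow IPC bytes (an assumption based on the v0.4 API), which is why its result can
+// be passed straight to `tableFromIPC()` in the `Quiver` constructor below.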
+let readParquet: typeof readParquetType | undefined = undefined
+setTimeout(() =>
+  // `setTimeout()` is required for this lazy loading to work in the mountable package,
+  // where `__webpack_public_path__` is set at runtime: it ensures that this `import()`
+  // runs only after `__webpack_public_path__` has been patched.
+  import("parquet-wasm").then(parquet => {
+    readParquet = parquet.readParquet
+  })
+)
+
 /** Data types used by ArrowJS. */
 export type DataType =
   | null
@@ -434,7 +455,9 @@ export class Quiver {
   private readonly _styler?: Styler
 
   constructor(element: IArrow) {
-    const table = tableFromIPC(element.data)
+    const table = tableFromIPC(
+      element.data ? readParquet!(element.data) : element.data
+    )
     const schema = Quiver.parseSchema(table)
     const rawColumns = Quiver.getRawColumns(schema)
     const fields = Quiver.parseFields(table.schema)
@@ -1048,7 +1071,9 @@ but was expecting \`${JSON.stringify(expectedIndexTypes)}\`.
   public static getTypeName(type: Type): IndexTypeName | string {
     // For `PeriodType` and `IntervalType` types are kept in `numpy_type`,
     // for the rest of the indexes in `pandas_type`.
-    return type.pandas_type === "object" ? type.numpy_type : type.pandas_type
+    const typeName =
+      type.pandas_type === "object" ? type.numpy_type : type.pandas_type
+    return typeName.toLowerCase().trim()
   }
 
   /** Takes the data and it's type and nicely formats it. */
diff --git a/lib/setup.py b/lib/setup.py
index 157a79c02a50..c9ee5feefdbe 100644
--- a/lib/setup.py
+++ b/lib/setup.py
@@ -72,6 +72,9 @@
 if not os.getenv("SNOWPARK_CONDA_BUILD"):
     INSTALL_REQUIRES.extend(SNOWPARK_CONDA_EXCLUDED_DEPENDENCIES)
 
+# stlite: See https://github.com/whitphx/stlite/issues/509#issuecomment-1657957887
+INSTALL_REQUIRES.extend(["fastparquet"])
+
 EXTRA_REQUIRES = {
     "snowflake": [
         "snowflake-snowpark-python>=0.9.0; python_version=='3.8'",
diff --git a/lib/streamlit/elements/lib/column_config_utils.py b/lib/streamlit/elements/lib/column_config_utils.py
index df4bf72bc317..167b72307bd0 100644
--- a/lib/streamlit/elements/lib/column_config_utils.py
+++ b/lib/streamlit/elements/lib/column_config_utils.py
@@ -359,7 +359,7 @@ def _determine_data_kind(
 
 
 def determine_dataframe_schema(
-    data_df: DataFrame, arrow_schema: pa.Schema
+    data_df: DataFrame, arrow_schema: pa.Schema | None
 ) -> DataframeSchema:
     """Determine the schema of a dataframe.
 
@@ -388,7 +388,7 @@ def determine_dataframe_schema(
     for i, column in enumerate(data_df.items()):
         column_name, column_data = column
         dataframe_schema[column_name] = _determine_data_kind(
-            column_data, arrow_schema.field(i)
+            column_data, arrow_schema.field(i) if arrow_schema else None
         )
 
     return dataframe_schema
@@ -486,10 +486,65 @@ def apply_data_specific_configs(
         Whether to check if the data is compatible with arrow.
     """
     import pandas as pd
+    from pandas.api.types import infer_dtype, is_categorical_dtype
 
     # Deactivate editing for columns that are not compatible with arrow
     if check_arrow_compatibility:
+        # Stlite: Fix non-string column names (not supported by fastparquet):
+        if infer_dtype(data_df.columns) != "string":
+            data_df.columns = data_df.columns.astype("string")
+
         for column_name, column_data in data_df.items():
+            # Stlite: Configure column types for some aspects
+            # that are not working out of the box with the Parquet serialization.
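+            # The mapping below is: categorical dtypes become an explicit
+            # selectbox column whose options are the categories, and object
+            # columns get their concrete kind (string, boolean, date, or time)
+            # inferred, so the frontend receives an explicit column type even
+            # though no Arrow schema is available here.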
+ if column_name not in columns_config: + if is_categorical_dtype(column_data.dtype): + update_column_config( + columns_config, + column_name, + { + "type_config": { + "type": "selectbox", + "options": column_data.cat.categories.tolist(), + }, + }, + ) + if column_data.dtype == "object": + inferred_type = infer_dtype(column_data, skipna=True) + if inferred_type in ["string", "empty"]: + update_column_config( + columns_config, + column_name, + {"type_config": {"type": "text"}}, + ) + elif inferred_type == "boolean": + update_column_config( + columns_config, + column_name, + {"type_config": {"type": "checkbox"}}, + ) + elif inferred_type == "date": + data_df[column_name] = pd.to_datetime( + column_data.astype("string"), errors="coerce" + ) + column_data = data_df[column_name] + update_column_config( + columns_config, + column_name, + {"type_config": {"type": "date"}}, + ) + continue + elif inferred_type == "time": + data_df[column_name] = pd.to_datetime( + column_data.astype("string"), errors="coerce" + ) + column_data = data_df[column_name] + update_column_config( + columns_config, + column_name, + {"type_config": {"type": "time"}}, + ) + if is_colum_type_arrow_incompatible(column_data): update_column_config(columns_config, column_name, {"disabled": True}) # Convert incompatible type to string diff --git a/lib/streamlit/elements/widgets/data_editor.py b/lib/streamlit/elements/widgets/data_editor.py index 843774db78ed..37d7dee96212 100644 --- a/lib/streamlit/elements/widgets/data_editor.py +++ b/lib/streamlit/elements/widgets/data_editor.py @@ -833,20 +833,25 @@ def data_editor( for column in disabled: update_column_config(column_config_mapping, column, {"disabled": True}) - # Convert the dataframe to an arrow table which is used as the main - # serialization format for sending the data to the frontend. - # We also utilize the arrow schema to determine the data kinds of every column. - arrow_table = pa.Table.from_pandas(data_df) - - # Determine the dataframe schema which is required for parsing edited values - # and for checking type compatibilities. - dataframe_schema = determine_dataframe_schema(data_df, arrow_table.schema) + # stlite: Don't use Arrow + # # Convert the dataframe to an arrow table which is used as the main + # # serialization format for sending the data to the frontend. + # # We also utilize the arrow schema to determine the data kinds of every column. + # arrow_table = pa.Table.from_pandas(data_df) + + # stlite: arrow_table.schema can't be used as Arrow is not available. + # # Determine the dataframe schema which is required for parsing edited values + # # and for checking type compatibilities. + # dataframe_schema = determine_dataframe_schema(data_df, arrow_table.schema) + dataframe_schema = determine_dataframe_schema(data_df, None) # Check if all configured column types are compatible with the underlying data. # Throws an exception if any of the configured types are incompatible. 
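+    # stlite: Note that `dataframe_schema` was computed without an Arrow schema
+    # (`arrow_schema=None` above), so the data kinds are inferred from the pandas
+    # dtypes alone and can be less precise than upstream's Arrow-based result.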
_check_type_compatibilities(data_df, column_config_mapping, dataframe_schema) - arrow_bytes = type_util.pyarrow_table_to_bytes(arrow_table) + # stlite: Don't use Arrow + # arrow_bytes = type_util.pyarrow_table_to_bytes(arrow_table) + df_bytes = type_util.data_frame_to_bytes(data_df) # We want to do this as early as possible to avoid introducing nondeterminism, # but it isn't clear how much processing is needed to have the data in a @@ -856,7 +861,8 @@ def data_editor( id = compute_widget_id( "data_editor", user_key=key, - data=arrow_bytes, + # data=arrow_bytes, + data=df_bytes, width=width, height=height, use_container_width=use_container_width, @@ -907,7 +913,9 @@ def data_editor( data.set_uuid(styler_uuid) marshall_styler(proto, data, styler_uuid) - proto.data = arrow_bytes + # stlite: Don't use Arrow. `type_util.data_frame_to_bytes` is polyfilled to use Parquet instead for stlite. + # proto.data = arrow_bytes + proto.data = df_bytes marshall_column_config(proto, column_config_mapping) diff --git a/lib/streamlit/type_util.py b/lib/streamlit/type_util.py index 8364134e9622..fa193b224534 100644 --- a/lib/streamlit/type_util.py +++ b/lib/streamlit/type_util.py @@ -18,6 +18,7 @@ import contextlib import copy +import io import math import re import types @@ -848,13 +849,27 @@ def pyarrow_table_to_bytes(table: pa.Table) -> bytes: def is_colum_type_arrow_incompatible(column: Series[Any] | Index) -> bool: """Return True if the column type is known to cause issues during Arrow conversion.""" - from pandas.api.types import infer_dtype, is_dict_like, is_list_like + from pandas.api.types import ( + infer_dtype, + is_dict_like, + is_interval_dtype, + is_list_like, + is_period_dtype, + ) if column.dtype.kind in [ "c", # complex64, complex128, complex256 ]: return True + # Stlite: not supported by fastparquet: + if is_interval_dtype(column.dtype): + return True + + # Stlite: not supported by fastparquet: + if is_period_dtype(column.dtype): + return True + if column.dtype == "object": # The dtype of mixed type columns is always object, the actual type of the column # values can be determined via the infer_dtype function: @@ -864,6 +879,10 @@ def is_colum_type_arrow_incompatible(column: Series[Any] | Index) -> bool: if inferred_type in [ "mixed-integer", "complex", + # Stlite: not supported by fastparquet (as object types): + "date", + "time", + "datetime", ]: return True elif inferred_type == "mixed": @@ -883,6 +902,10 @@ def is_colum_type_arrow_incompatible(column: Series[Any] | Index) -> bool: or is_dict_like(first_value) # Frozensets are list-like, but are not compatible with pyarrow. or isinstance(first_value, frozenset) + # Stlite: not supported by fastparquet: + or isinstance(first_value, set) + or isinstance(first_value, tuple) + or infer_dtype(first_value, skipna=True) in ["datetime"] ): # This seems to be an incompatible list-like type return True @@ -915,6 +938,7 @@ def fix_arrow_incompatible_column_types( The fixed dataframe. """ import pandas as pd + from pandas.api.types import infer_dtype # Make a copy, but only initialize if necessary to preserve memory. 
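+    # stlite: The copy-on-write pattern below is shared by the fastparquet fix
+    # for non-string column names added further down, so `df` is copied only
+    # when at least one incompatibility is actually found.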
    df_copy: DataFrame | None = None
@@ -938,11 +962,32 @@ def fix_arrow_incompatible_column_types(
         if df_copy is None:
             df_copy = df.copy()
         df_copy.index = df.index.astype("string")
+
+    # Stlite: fastparquet does not support non-string column names:
+    if infer_dtype(df.columns) != "string":
+        if df_copy is None:
+            df_copy = df.copy()
+        df_copy.columns = df.columns.astype("string")
+
     return df_copy if df_copy is not None else df
 
 
+# `pd.DataFrame.to_parquet()` always closes the file handle,
+# but we need to keep it open to get the written data.
+# So we use this custom class to prevent it from being closed.
+# https://github.com/dask/fastparquet/issues/868
+class UnclosableBytesIO(io.BytesIO):
+    def close(self):
+        pass
+
+    def really_close(self):
+        super().close()
+
+
 def data_frame_to_bytes(df: DataFrame) -> bytes:
-    """Serialize pandas.DataFrame to bytes using Apache Arrow.
+    """Serialize pandas.DataFrame to bytes using Parquet instead of Apache Arrow.
+    This function is customized from the original one to use Parquet, as Arrow is
+    unavailable in stlite. See https://github.com/whitphx/stlite/issues/509
 
     Parameters
     ----------
@@ -950,19 +995,23 @@ def data_frame_to_bytes(df: DataFrame) -> bytes:
         A dataframe to convert.
 
     """
-    import pyarrow as pa
+    buf = UnclosableBytesIO()
 
     try:
-        table = pa.Table.from_pandas(df)
-    except (pa.ArrowTypeError, pa.ArrowInvalid, pa.ArrowNotImplementedError) as ex:
+        df.to_parquet(buf, engine="fastparquet")
+    except ValueError as ex:
         _LOGGER.info(
-            "Serialization of dataframe to Arrow table was unsuccessful due to: %s. "
+            "Serialization of dataframe to Parquet was unsuccessful due to: %s. "
             "Applying automatic fixes for column types to make the dataframe Arrow-compatible.",
             ex,
         )
         df = fix_arrow_incompatible_column_types(df)
-        table = pa.Table.from_pandas(df)
-    return pyarrow_table_to_bytes(table)
+        df.to_parquet(buf, engine="fastparquet")
+
+    data = buf.getvalue()
+    buf.really_close()
+
+    return data
 
 
 def bytes_to_data_frame(source: bytes) -> DataFrame:
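For reference, the serialization path that the patched data_frame_to_bytes() performs can be reproduced in isolation with the minimal sketch below. It assumes only pandas with the fastparquet engine installed; the read-back call is an assumption about the bytes_to_data_frame counterpart (its patched body is truncated above), and all names are illustrative rather than part of the patch.

import io

import pandas as pd


# Mirrors the patch's UnclosableBytesIO workaround: DataFrame.to_parquet()
# closes the buffer it writes to, so neutralize close() to keep the bytes.
class UnclosableBytesIO(io.BytesIO):
    def close(self):
        pass


df = pd.DataFrame({"a": [1, 2, 3], "b": ["x", "y", "z"]})

buf = UnclosableBytesIO()
df.to_parquet(buf, engine="fastparquet")  # same call as in data_frame_to_bytes()
parquet_bytes = buf.getvalue()  # these bytes are what ends up in proto.data

# Assumed read-back (recent fastparquet versions accept open file-like objects):
restored = pd.read_parquet(io.BytesIO(parquet_bytes), engine="fastparquet")
assert restored.equals(df)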