From 94a3aae2e3718317eb9a56910c672fb334424ace Mon Sep 17 00:00:00 2001
From: "Yuichiro Tachibana (Tsuchiya)"
Date: Wed, 23 Aug 2023 19:34:36 +0900
Subject: [PATCH] Parquet to bypass Arrow (#3), Replace deprecated APIs of
 Pandas (#13), Fix data_frame_to_bytes to catch TypeError in addition to
 ValueError (#14), Modification for Streamlit 1.27 (#7)

* Introduce fastparquet on the Python side and parquet-wasm on the JS side to bypass the Arrow serialization for DataFrame

* Patch DataEditor not to use PyArrow

* Add setTimeout() so that the import of parquet-wasm works in @stlite/mountable

* Fix comments

* Fix incompatibilities with some column types

* Change logic to handle lists from fastparquet

* Move the decoding above the string parsing

---------

Co-authored-by: lukasmasuch
---
 .../widgets/DataFrame/columns/utils.ts        |  8 +++
 frontend/lib/src/dataframes/Quiver.ts         | 29 +++++++-
 lib/setup.py                                  |  3 +
 lib/streamlit/dataframe_util.py               | 56 ++++++++++++++--
 .../elements/lib/column_config_utils.py       |  4 +-
 lib/streamlit/elements/widgets/data_editor.py | 67 ++++++++++++++++---
 6 files changed, 149 insertions(+), 18 deletions(-)

diff --git a/frontend/lib/src/components/widgets/DataFrame/columns/utils.ts b/frontend/lib/src/components/widgets/DataFrame/columns/utils.ts
index 66117b4d09bf4..1e1f4c45900c2 100644
--- a/frontend/lib/src/components/widgets/DataFrame/columns/utils.ts
+++ b/frontend/lib/src/components/widgets/DataFrame/columns/utils.ts
@@ -282,6 +282,14 @@ export function toSafeArray(data: any): any[] {
     return [data]
   }
 
+  if (data instanceof Uint8Array) {
+    // Stlite: Uint8Array is used for any list data in fastparquet.
+    // It stores a JSON string representation in the Uint8Array.
+    // We need to convert this to a string first
+    // so that it can later be parsed as JSON.
+    data = new TextDecoder("utf-8").decode(data)
+  }
+
   if (typeof data === "string") {
     if (data === "") {
       // Empty string
diff --git a/frontend/lib/src/dataframes/Quiver.ts b/frontend/lib/src/dataframes/Quiver.ts
index edc1f14f3f912..87e947c4fcd03 100644
--- a/frontend/lib/src/dataframes/Quiver.ts
+++ b/frontend/lib/src/dataframes/Quiver.ts
@@ -44,6 +44,27 @@ import {
 import { IArrow, Styler as StylerProto } from "@streamlit/lib/src/proto"
 import { logWarning } from "@streamlit/lib/src/util/log"
 
+import type { readParquet as readParquetType } from "parquet-wasm"
+
+// Stlite: Use Parquet to bypass the Arrow implementation, which is unavailable in the Wasm Python environment.
+// See https://github.com/whitphx/stlite/issues/509#issuecomment-1657957887
+// NOTE: Async import is necessary for the `parquet-wasm` package to work.
+// If it's imported statically, the following error will be thrown when `readParquet` is called:
+// `TypeError: Cannot read properties of undefined (reading '__wbindgen_add_to_stack_pointer')`
+// Ref: https://github.com/kylebarron/parquet-wasm/issues/27
+// HACK: Strictly speaking, there is no guarantee that the `readParquet` function
+// async-imported in the following code will be ready when it's called in the `Quiver` class's constructor,
+// but it seems to work fine in practice.
+let readParquet: typeof readParquetType | undefined = undefined
+setTimeout(() =>
+  // `setTimeout()` is required for this lazy loading to work in the mountable package
+  // where `__webpack_public_path__` is set at runtime, as this `setTimeout()` ensures that
+  // this `import()` is run after `__webpack_public_path__` is patched.
+ import("parquet-wasm").then(parquet => { + readParquet = parquet.readParquet + }) +) + /** Data types used by ArrowJS. */ export type DataType = | null @@ -438,7 +459,9 @@ export class Quiver { private readonly _styler?: Styler constructor(element: IArrow) { - const table = tableFromIPC(element.data) + const table = tableFromIPC( + element.data ? readParquet!(element.data) : element.data + ) const schema = Quiver.parseSchema(table) const rawColumns = Quiver.getRawColumns(schema) const fields = Quiver.parseFields(table.schema) @@ -1052,7 +1075,9 @@ but was expecting \`${JSON.stringify(expectedIndexTypes)}\`. public static getTypeName(type: Type): IndexTypeName | string { // For `PeriodType` and `IntervalType` types are kept in `numpy_type`, // for the rest of the indexes in `pandas_type`. - return type.pandas_type === "object" ? type.numpy_type : type.pandas_type + const typeName = + type.pandas_type === "object" ? type.numpy_type : type.pandas_type + return typeName.toLowerCase().trim() } /** Takes the data and it's type and nicely formats it. */ diff --git a/lib/setup.py b/lib/setup.py index d20ab292e8da8..e0f57a26f8374 100644 --- a/lib/setup.py +++ b/lib/setup.py @@ -69,6 +69,9 @@ if not os.getenv("SNOWPARK_CONDA_BUILD"): INSTALL_REQUIRES.extend(SNOWPARK_CONDA_EXCLUDED_DEPENDENCIES) +# stlite: See https://github.com/whitphx/stlite/issues/509#issuecomment-1657957887 +INSTALL_REQUIRES.extend(["fastparquet"]) + EXTRA_REQUIRES = { "snowflake": [ "snowflake-snowpark-python[modin]>=1.17.0; python_version<'3.12'", diff --git a/lib/streamlit/dataframe_util.py b/lib/streamlit/dataframe_util.py index fa4de5bbb629f..d7900f69e5c9b 100644 --- a/lib/streamlit/dataframe_util.py +++ b/lib/streamlit/dataframe_util.py @@ -19,6 +19,7 @@ import contextlib import dataclasses import inspect +import io import math import re from collections import ChainMap, UserDict, UserList, deque @@ -796,8 +797,22 @@ def convert_arrow_table_to_arrow_bytes(table: pa.Table) -> bytes: return cast(bytes, sink.getvalue().to_pybytes()) +# `pd.DataFrame.to_parquet()` always closes the file handle, +# but we need to keep it open to get the written data. +# So we use this custom class to prevent the closing. +# https://github.com/dask/fastparquet/issues/868 +class UnclosableBytesIO(io.BytesIO): + def close(self): + pass + + def really_close(self): + super().close() + + def convert_pandas_df_to_arrow_bytes(df: DataFrame) -> bytes: """Serialize pandas.DataFrame to Arrow IPC bytes. + This function is customized from the original one to use Parquet instead of Arrow + for stlite. See https://github.com/whitphx/stlite/issues/509 Parameters ---------- @@ -809,20 +824,24 @@ def convert_pandas_df_to_arrow_bytes(df: DataFrame) -> bytes: bytes The serialized Arrow IPC bytes. """ - import pyarrow as pa + buf = UnclosableBytesIO() try: - table = pa.Table.from_pandas(df) - except (pa.ArrowTypeError, pa.ArrowInvalid, pa.ArrowNotImplementedError) as ex: + df.to_parquet(buf, engine="fastparquet") + except (ValueError, TypeError) as ex: _LOGGER.info( - "Serialization of dataframe to Arrow table was unsuccessful due to: %s. " + "Serialization of dataframe to Parquet table was unsuccessful due to: %s. 
" "Applying automatic fixes for column types to make the dataframe " "Arrow-compatible.", ex, ) df = fix_arrow_incompatible_column_types(df) - table = pa.Table.from_pandas(df) - return convert_arrow_table_to_arrow_bytes(table) + df.to_parquet(buf, engine="fastparquet") + + data = buf.getvalue() + buf.really_close() + + return data def convert_arrow_bytes_to_pandas_df(source: bytes) -> DataFrame: @@ -1029,6 +1048,7 @@ def _maybe_truncate_table( def is_colum_type_arrow_incompatible(column: Series[Any] | Index) -> bool: """Return True if the column type is known to cause issues during Arrow conversion.""" + import pandas as pd from pandas.api.types import infer_dtype, is_dict_like, is_list_like if column.dtype.kind in [ @@ -1048,6 +1068,14 @@ def is_colum_type_arrow_incompatible(column: Series[Any] | Index) -> bool: }: return True + # Stlite: not supported by fastparquet: + if isinstance(column.dtype, pd.IntervalDtype): + return True + + # Stlite: not supported by fastparquet: + if isinstance(column.dtype, pd.PeriodDtype): + return True + if column.dtype == "object": # The dtype of mixed type columns is always object, the actual type of the column # values can be determined via the infer_dtype function: @@ -1057,6 +1085,10 @@ def is_colum_type_arrow_incompatible(column: Series[Any] | Index) -> bool: if inferred_type in [ "mixed-integer", "complex", + # Stlite: not supported by fastparquet (as object types): + "date", + "time", + "datetime", ]: return True elif inferred_type == "mixed": @@ -1078,6 +1110,10 @@ def is_colum_type_arrow_incompatible(column: Series[Any] | Index) -> bool: or is_dict_like(first_value) # Frozensets are list-like, but are not compatible with pyarrow. or isinstance(first_value, frozenset) + # Stlite: not supported by fastparquet: + or isinstance(first_value, set) + or isinstance(first_value, tuple) + or infer_dtype(first_value, skipna=True) in ["datetime"] ): # This seems to be an incompatible list-like type return True @@ -1110,6 +1146,7 @@ def fix_arrow_incompatible_column_types( The fixed dataframe. """ import pandas as pd + from pandas.api.types import infer_dtype # Make a copy, but only initialize if necessary to preserve memory. df_copy: DataFrame | None = None @@ -1133,6 +1170,13 @@ def fix_arrow_incompatible_column_types( if df_copy is None: df_copy = df.copy() df_copy.index = df.index.astype("string") + + # Stlite: fastparquet does not support non-string column names: + if infer_dtype(df.columns) != "string": + if df_copy is None: + df_copy = df.copy() + df_copy.columns = df.columns.astype("string") + return df_copy if df_copy is not None else df diff --git a/lib/streamlit/elements/lib/column_config_utils.py b/lib/streamlit/elements/lib/column_config_utils.py index 9ba9508743931..472b306a6dd94 100644 --- a/lib/streamlit/elements/lib/column_config_utils.py +++ b/lib/streamlit/elements/lib/column_config_utils.py @@ -361,7 +361,7 @@ def _determine_data_kind( def determine_dataframe_schema( - data_df: DataFrame, arrow_schema: pa.Schema + data_df: DataFrame, arrow_schema: pa.Schema | None ) -> DataframeSchema: """Determine the schema of a dataframe. 
@@ -390,7 +390,7 @@ def determine_dataframe_schema(
     for i, column in enumerate(data_df.items()):
         column_name, column_data = column
         dataframe_schema[column_name] = _determine_data_kind(
-            column_data, arrow_schema.field(i)
+            column_data, arrow_schema.field(i) if arrow_schema else None
         )
 
     return dataframe_schema
diff --git a/lib/streamlit/elements/widgets/data_editor.py b/lib/streamlit/elements/widgets/data_editor.py
index 3965bdb89327a..9833f6e7c62c9 100644
--- a/lib/streamlit/elements/widgets/data_editor.py
+++ b/lib/streamlit/elements/widgets/data_editor.py
@@ -1,4 +1,5 @@
 # Copyright (c) Streamlit Inc. (2018-2022) Snowflake Inc. (2022-2024)
+# Copyright (c) Yuichiro Tachibana (Tsuchiya) (2022-2024)
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -780,7 +781,7 @@ def data_editor(
     """
     # Lazy-loaded import
     import pandas as pd
-    import pyarrow as pa
+    from pandas.api.types import infer_dtype, is_categorical_dtype
 
     key = to_key(key)
 
@@ -821,8 +822,61 @@ def data_editor(
     # Convert the user provided column config into the frontend compatible format:
     column_config_mapping = process_config_mapping(column_config)
 
+    # Stlite: Fix non-string column names (not supported by fastparquet):
+    if infer_dtype(data_df.columns) != "string":
+        data_df.columns = data_df.columns.astype("string")
+
     # Deactivate editing for columns that are not compatible with arrow
     for column_name, column_data in data_df.items():
+        # Stlite: Configure column types for some aspects
+        # that do not work out of the box with the Parquet serialization.
+        if column_name not in column_config_mapping:
+            if is_categorical_dtype(column_data.dtype):
+                update_column_config(
+                    column_config_mapping,
+                    column_name,
+                    {
+                        "type_config": {
+                            "type": "selectbox",
+                            "options": column_data.cat.categories.tolist(),
+                        },
+                    },
+                )
+            if column_data.dtype == "object":
+                inferred_type = infer_dtype(column_data, skipna=True)
+                if inferred_type in ["string", "empty"]:
+                    update_column_config(
+                        column_config_mapping,
+                        column_name,
+                        {"type_config": {"type": "text"}},
+                    )
+                elif inferred_type == "boolean":
+                    update_column_config(
+                        column_config_mapping,
+                        column_name,
+                        {"type_config": {"type": "checkbox"}},
+                    )
+                elif inferred_type == "date":
+                    data_df[column_name] = pd.to_datetime(
+                        column_data.astype("string"), errors="coerce"
+                    )
+                    column_data = data_df[column_name]
+                    update_column_config(
+                        column_config_mapping,
+                        column_name,
+                        {"type_config": {"type": "date"}},
+                    )
+                    continue
+                elif inferred_type == "time":
+                    data_df[column_name] = pd.to_datetime(
+                        column_data.astype("string"), errors="coerce"
+                    )
+                    column_data = data_df[column_name]
+                    update_column_config(
+                        column_config_mapping,
+                        column_name,
+                        {"type_config": {"type": "time"}},
+                    )
         if dataframe_util.is_colum_type_arrow_incompatible(column_data):
             update_column_config(
                 column_config_mapping, column_name, {"disabled": True}
             )
@@ -863,20 +917,17 @@ def data_editor(
     for column in disabled:
         update_column_config(column_config_mapping, column, {"disabled": True})
 
-    # Convert the dataframe to an arrow table which is used as the main
-    # serialization format for sending the data to the frontend.
-    # We also utilize the arrow schema to determine the data kinds of every column.
-    arrow_table = pa.Table.from_pandas(data_df)
-
     # Determine the dataframe schema which is required for parsing edited values
     # and for checking type compatibilities.
-    dataframe_schema = determine_dataframe_schema(data_df, arrow_table.schema)
+    # Stlite: arrow_table.schema can't be used as Arrow is not available.
+    dataframe_schema = determine_dataframe_schema(data_df, None)
 
     # Check if all configured column types are compatible with the underlying data.
     # Throws an exception if any of the configured types are incompatible.
     _check_type_compatibilities(data_df, column_config_mapping, dataframe_schema)
 
-    arrow_bytes = dataframe_util.convert_arrow_table_to_arrow_bytes(arrow_table)
+    # Stlite: Don't use Arrow for conversion:
+    arrow_bytes = dataframe_util.convert_pandas_df_to_arrow_bytes(data_df)
 
     # We want to do this as early as possible to avoid introducing nondeterminism,
     # but it isn't clear how much processing is needed to have the data in a
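
To make the Python-side flow above concrete, here is a self-contained sketch; it is not part of the patch and assumes a pandas/fastparquet combination that accepts file-like buffers, which the patch itself relies on. The UnclosableBytesIO workaround is the one introduced in dataframe_util.py; reading the bytes back with pandas stands in for what parquet-wasm's readParquet() does on the JS side in Quiver.ts, and names like df_roundtrip are illustrative only:

    import io

    import pandas as pd

    # fastparquet closes the file object it writes to (see the issue linked in
    # the patch), so make close() a no-op and free the buffer explicitly later.
    class UnclosableBytesIO(io.BytesIO):
        def close(self):
            pass

        def really_close(self):
            super().close()

    df = pd.DataFrame({"a": [1, 2, 3], "b": ["x", "y", "z"]})

    buf = UnclosableBytesIO()
    df.to_parquet(buf, engine="fastparquet")
    data = buf.getvalue()  # still readable because close() was a no-op
    buf.really_close()

    # In stlite, `data` crosses the Python/JS boundary and is decoded by
    # parquet-wasm. Here we read it back with pandas instead, which should
    # round-trip simple frames like this one.
    df_roundtrip = pd.read_parquet(io.BytesIO(data), engine="fastparquet")
    assert df_roundtrip.equals(df)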