Parquet to bypass Arrow (#3), Replace deprecated APIs of Pandas (#13), Fix data_frame_to_bytes to catch TypeError in addition to ValueError (#14), Modification for Streamlit 1.27 (#7)

* Introduce fastparquet on the Python side and parquet-wasm on the JS side to bypass the Arrow serialization for DataFrame (see the sketch after this message)

* Patch DataEditor not to use PyArrow

* Add setTimeout() so that the import of parquet-wasm works in @stlite/mountable

* Fix comments

* Fix incompatibilities with some column types

* Change logic to handle lists from fastparquet

* Move the decoding above the string parsing

---------
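
A minimal sketch of the Python half of the bypass (illustrative only; the `payload.parquet` path is hypothetical): fastparquet serializes the DataFrame to Parquet bytes, which the frontend then decodes with parquet-wasm's `readParquet` instead of Arrow IPC.

```python
import pandas as pd

df = pd.DataFrame({"a": [1, 2, 3], "b": ["x", "y", "z"]})

# Serialize with the fastparquet engine this commit introduces.
df.to_parquet("payload.parquet", engine="fastparquet")  # hypothetical path

with open("payload.parquet", "rb") as f:
    parquet_bytes = f.read()  # sent to the frontend in place of Arrow IPC bytes
```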

Co-authored-by: lukasmasuch <lukas.masuch@gmail.com>
whitphx and lukasmasuch committed Aug 30, 2024
1 parent b35dad8 commit 94a3aae
Showing 6 changed files with 149 additions and 18 deletions.
@@ -282,6 +282,14 @@ export function toSafeArray(data: any): any[] {
    return [data]
  }

  if (data instanceof Uint8Array) {
    // Stlite: Uint8Array is used for any list data in fastparquet.
    // It stores a JSON string representation of the list in the Uint8Array,
    // so we need to decode it to a string first and then parse it as JSON.
    data = new TextDecoder("utf-8").decode(data)
  }

if (typeof data === "string") {
if (data === "") {
// Empty string
Expand Down
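For illustration, the same decode-then-parse flow in Python terms (a hypothetical mirror of the TypeScript above, not code from this commit):

```python
import json

def to_safe_array(data):
    # fastparquet delivers list cells as UTF-8 bytes holding a JSON string,
    # so bytes must be decoded before they can be parsed as JSON.
    if isinstance(data, (bytes, bytearray)):
        data = data.decode("utf-8")
    if isinstance(data, str):
        if data == "":
            return []  # empty string -> empty list
        return json.loads(data)
    return [data]

print(to_safe_array(b"[1, 2, 3]"))  # [1, 2, 3]
```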
29 changes: 27 additions & 2 deletions frontend/lib/src/dataframes/Quiver.ts
@@ -44,6 +44,27 @@ import {
import { IArrow, Styler as StylerProto } from "@streamlit/lib/src/proto"
import { logWarning } from "@streamlit/lib/src/util/log"

import type { readParquet as readParquetType } from "parquet-wasm"

// Stlite: Use parquet to bypass the Arrow implementation which is unavailable in the Wasm Python environment.
// See https://github.com/whitphx/stlite/issues/509#issuecomment-1657957887
// NOTE: Async import is necessary for the `parquet-wasm` package to work.
// If it's imported statically, the following error will be thrown when `readParquet` is called:
// `TypeError: Cannot read properties of undefined (reading '__wbindgen_add_to_stack_pointer')`
// Ref: https://github.com/kylebarron/parquet-wasm/issues/27
// HACK: Strictly speaking, there is no guarantee that the `readParquet` function
// async-imported in the following code will be ready when it's called in the `Quiver` class's constructor,
// but it seems to work fine in practice.
let readParquet: typeof readParquetType | undefined = undefined
setTimeout(() =>
  // `setTimeout()` is required for this lazy loading to work in the mountable package
  // where `__webpack_public_path__` is set at runtime, as this `setTimeout()` ensures that
  // this `import()` is run after `__webpack_public_path__` is patched.
  import("parquet-wasm").then(parquet => {
    readParquet = parquet.readParquet
  })
)

/** Data types used by ArrowJS. */
export type DataType =
  | null
@@ -438,7 +459,9 @@ export class Quiver {
   private readonly _styler?: Styler
 
   constructor(element: IArrow) {
-    const table = tableFromIPC(element.data)
+    const table = tableFromIPC(
+      element.data ? readParquet!(element.data) : element.data
+    )
     const schema = Quiver.parseSchema(table)
     const rawColumns = Quiver.getRawColumns(schema)
     const fields = Quiver.parseFields(table.schema)
@@ -1052,7 +1075,9 @@ but was expecting \`${JSON.stringify(expectedIndexTypes)}\`.
   public static getTypeName(type: Type): IndexTypeName | string {
     // For `PeriodType` and `IntervalType`, the type name is kept in `numpy_type`;
     // for the rest of the indexes, it is in `pandas_type`.
-    return type.pandas_type === "object" ? type.numpy_type : type.pandas_type
+    const typeName =
+      type.pandas_type === "object" ? type.numpy_type : type.pandas_type
+    return typeName.toLowerCase().trim()
   }
 
   /** Takes the data and its type and nicely formats it. */
3 changes: 3 additions & 0 deletions lib/setup.py
@@ -69,6 +69,9 @@
if not os.getenv("SNOWPARK_CONDA_BUILD"):
    INSTALL_REQUIRES.extend(SNOWPARK_CONDA_EXCLUDED_DEPENDENCIES)

# stlite: See https://github.com/whitphx/stlite/issues/509#issuecomment-1657957887
INSTALL_REQUIRES.extend(["fastparquet"])

EXTRA_REQUIRES = {
    "snowflake": [
        "snowflake-snowpark-python[modin]>=1.17.0; python_version<'3.12'",
56 changes: 50 additions & 6 deletions lib/streamlit/dataframe_util.py
@@ -19,6 +19,7 @@
import contextlib
import dataclasses
import inspect
import io
import math
import re
from collections import ChainMap, UserDict, UserList, deque
@@ -796,8 +797,22 @@ def convert_arrow_table_to_arrow_bytes(table: pa.Table) -> bytes:
    return cast(bytes, sink.getvalue().to_pybytes())


# `pd.DataFrame.to_parquet()` always closes the file handle,
# but we need to keep it open to get the written data.
# So we use this custom class to prevent the closing.
# https://github.com/dask/fastparquet/issues/868
class UnclosableBytesIO(io.BytesIO):
    def close(self):
        pass

    def really_close(self):
        super().close()
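
A short sketch of the failure mode this class works around (assuming, per the linked issue, that the fastparquet engine closes the buffer it is given):

```python
import io

import pandas as pd

df = pd.DataFrame({"a": [1, 2, 3]})

plain = io.BytesIO()
df.to_parquet(plain, engine="fastparquet")
try:
    plain.getvalue()  # the buffer was closed by to_parquet()
except ValueError as exc:
    print(exc)  # "I/O operation on closed file."

buf = UnclosableBytesIO()  # close() is a no-op, so the data stays readable
df.to_parquet(buf, engine="fastparquet")
data = buf.getvalue()
buf.really_close()  # actually release the buffer once the bytes are copied
```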


def convert_pandas_df_to_arrow_bytes(df: DataFrame) -> bytes:
    """Serialize pandas.DataFrame to Arrow IPC bytes.

    This function is customized from the original one to use Parquet instead of Arrow
    for stlite. See https://github.com/whitphx/stlite/issues/509

    Parameters
    ----------
@@ -809,20 +824,24 @@ def convert_pandas_df_to_arrow_bytes(df: DataFrame) -> bytes:
    bytes
        The serialized Arrow IPC bytes.
    """
-    import pyarrow as pa
+    buf = UnclosableBytesIO()

     try:
-        table = pa.Table.from_pandas(df)
-    except (pa.ArrowTypeError, pa.ArrowInvalid, pa.ArrowNotImplementedError) as ex:
+        df.to_parquet(buf, engine="fastparquet")
+    except (ValueError, TypeError) as ex:
         _LOGGER.info(
-            "Serialization of dataframe to Arrow table was unsuccessful due to: %s. "
+            "Serialization of dataframe to Parquet table was unsuccessful due to: %s. "
             "Applying automatic fixes for column types to make the dataframe "
             "Arrow-compatible.",
             ex,
         )
         df = fix_arrow_incompatible_column_types(df)
-        table = pa.Table.from_pandas(df)
-    return convert_arrow_table_to_arrow_bytes(table)
+        df.to_parquet(buf, engine="fastparquet")
+
+    data = buf.getvalue()
+    buf.really_close()
+
+    return data
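
A quick round-trip sanity check (sketch; despite the Arrow-era name, kept for API compatibility, the payload is now Parquet and reads back with fastparquet):

```python
import io

import pandas as pd

from streamlit.dataframe_util import convert_pandas_df_to_arrow_bytes

df = pd.DataFrame({"x": [1.0, 2.5], "y": ["a", "b"]})
blob = convert_pandas_df_to_arrow_bytes(df)

restored = pd.read_parquet(io.BytesIO(blob), engine="fastparquet")
print(restored.equals(df))  # True for a simple frame like this
```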


def convert_arrow_bytes_to_pandas_df(source: bytes) -> DataFrame:
@@ -1029,6 +1048,7 @@ def _maybe_truncate_table(
def is_colum_type_arrow_incompatible(column: Series[Any] | Index) -> bool:
    """Return True if the column type is known to cause issues during
    Arrow conversion."""
    import pandas as pd
    from pandas.api.types import infer_dtype, is_dict_like, is_list_like

    if column.dtype.kind in [
@@ -1048,6 +1068,14 @@ def is_colum_type_arrow_incompatible(column: Series[Any] | Index) -> bool:
    }:
        return True

    # Stlite: not supported by fastparquet:
    if isinstance(column.dtype, pd.IntervalDtype):
        return True

    # Stlite: not supported by fastparquet:
    if isinstance(column.dtype, pd.PeriodDtype):
        return True

    if column.dtype == "object":
        # The dtype of mixed type columns is always object, the actual type of the column
        # values can be determined via the infer_dtype function:
@@ -1057,6 +1085,10 @@ def is_colum_type_arrow_incompatible(column: Series[Any] | Index) -> bool:
        if inferred_type in [
            "mixed-integer",
            "complex",
            # Stlite: not supported by fastparquet (as object types):
            "date",
            "time",
            "datetime",
        ]:
            return True
        elif inferred_type == "mixed":
@@ -1078,6 +1110,10 @@ def is_colum_type_arrow_incompatible(column: Series[Any] | Index) -> bool:
                or is_dict_like(first_value)
                # Frozensets are list-like, but are not compatible with pyarrow.
                or isinstance(first_value, frozenset)
                # Stlite: not supported by fastparquet:
                or isinstance(first_value, set)
                or isinstance(first_value, tuple)
                or infer_dtype(first_value, skipna=True) in ["datetime"]
            ):
                # This seems to be an incompatible list-like type
                return True
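
To illustrate the added checks, a sketch of columns the patched function now flags (assuming the patched module is importable as `streamlit.dataframe_util`):

```python
from datetime import date

import pandas as pd

from streamlit.dataframe_util import is_colum_type_arrow_incompatible

df = pd.DataFrame(
    {
        "interval": pd.interval_range(start=0, periods=3),          # IntervalDtype
        "period": pd.period_range("2024-01", periods=3, freq="M"),  # PeriodDtype
        "dates": pd.Series([date(2024, 1, 1)] * 3, dtype="object"), # inferred as "date"
        "tuples": pd.Series([(1, 2)] * 3),                          # tuple values
        "ok": [1, 2, 3],
    }
)
for name, column in df.items():
    print(name, is_colum_type_arrow_incompatible(column))
# interval, period, dates and tuples print True; ok prints False
```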
@@ -1110,6 +1146,7 @@ def fix_arrow_incompatible_column_types(
        The fixed dataframe.
    """
    import pandas as pd
    from pandas.api.types import infer_dtype

    # Make a copy, but only initialize if necessary to preserve memory.
    df_copy: DataFrame | None = None
@@ -1133,6 +1170,13 @@ def is_colum_type_arrow_incompatible(column: Series[Any] | Index) -> bool:
        if df_copy is None:
            df_copy = df.copy()
        df_copy.index = df.index.astype("string")

    # Stlite: fastparquet does not support non-string column names:
    if infer_dtype(df.columns) != "string":
        if df_copy is None:
            df_copy = df.copy()
        df_copy.columns = df.columns.astype("string")

    return df_copy if df_copy is not None else df
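
For example (sketch), default integer column labels are cast to strings so fastparquet can serialize the frame:

```python
import pandas as pd

from streamlit.dataframe_util import fix_arrow_incompatible_column_types

df = pd.DataFrame([[1, 2], [3, 4]])  # columns are RangeIndex([0, 1])
fixed = fix_arrow_incompatible_column_types(df)
print(list(fixed.columns))           # ['0', '1']
print(fixed.columns.dtype)           # string
```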


4 changes: 2 additions & 2 deletions lib/streamlit/elements/lib/column_config_utils.py
@@ -361,7 +361,7 @@ def _determine_data_kind(


 def determine_dataframe_schema(
-    data_df: DataFrame, arrow_schema: pa.Schema
+    data_df: DataFrame, arrow_schema: pa.Schema | None
 ) -> DataframeSchema:
     """Determine the schema of a dataframe.
@@ -390,7 +390,7 @@ def determine_dataframe_schema(
     for i, column in enumerate(data_df.items()):
         column_name, column_data = column
         dataframe_schema[column_name] = _determine_data_kind(
-            column_data, arrow_schema.field(i)
+            column_data, arrow_schema.field(i) if arrow_schema else None
         )
     return dataframe_schema

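With the relaxed signature, the schema can be derived from the pandas data alone — a sketch (assuming the upstream import path):

```python
import pandas as pd

from streamlit.elements.lib.column_config_utils import determine_dataframe_schema

df = pd.DataFrame({"n": [1, 2], "s": ["a", "b"]})

# No pyarrow schema exists in the Wasm environment, so pass None;
# each column's data kind is then inferred from the pandas dtypes.
schema = determine_dataframe_schema(df, None)
print(schema)  # maps the index and each column name to a ColumnDataKind
```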
67 changes: 59 additions & 8 deletions lib/streamlit/elements/widgets/data_editor.py
@@ -1,4 +1,5 @@
# Copyright (c) Streamlit Inc. (2018-2022) Snowflake Inc. (2022-2024)
# Copyright (c) Yuichiro Tachibana (Tsuchiya) (2022-2024)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -780,7 +781,7 @@ def data_editor(
"""
# Lazy-loaded import
import pandas as pd
import pyarrow as pa
from pandas.api.types import infer_dtype, is_categorical_dtype

key = to_key(key)

@@ -821,8 +822,61 @@
    # Convert the user provided column config into the frontend compatible format:
    column_config_mapping = process_config_mapping(column_config)

    # Stlite: Fix non-string column names (not supported by fastparquet):
    if infer_dtype(data_df.columns) != "string":
        data_df.columns = data_df.columns.astype("string")

    # Deactivate editing for columns that are not compatible with arrow
    for column_name, column_data in data_df.items():
        # Stlite: Configure column types for some aspects
        # that are not working out of the box with the parquet serialization.
        if column_name not in column_config_mapping:
            if is_categorical_dtype(column_data.dtype):
                update_column_config(
                    column_config_mapping,
                    column_name,
                    {
                        "type_config": {
                            "type": "selectbox",
                            "options": column_data.cat.categories.tolist(),
                        },
                    },
                )
            if column_data.dtype == "object":
                inferred_type = infer_dtype(column_data, skipna=True)
                if inferred_type in ["string", "empty"]:
                    update_column_config(
                        column_config_mapping,
                        column_name,
                        {"type_config": {"type": "text"}},
                    )
                elif inferred_type == "boolean":
                    update_column_config(
                        column_config_mapping,
                        column_name,
                        {"type_config": {"type": "checkbox"}},
                    )
                elif inferred_type == "date":
                    data_df[column_name] = pd.to_datetime(
                        column_data.astype("string"), errors="coerce"
                    )
                    column_data = data_df[column_name]
                    update_column_config(
                        column_config_mapping,
                        column_name,
                        {"type_config": {"type": "date"}},
                    )
                    continue
                elif inferred_type == "time":
                    data_df[column_name] = pd.to_datetime(
                        column_data.astype("string"), errors="coerce"
                    )
                    column_data = data_df[column_name]
                    update_column_config(
                        column_config_mapping,
                        column_name,
                        {"type_config": {"type": "time"}},
                    )
        if dataframe_util.is_colum_type_arrow_incompatible(column_data):
            update_column_config(
                column_config_mapping, column_name, {"disabled": True}
@@ -863,20 +917,17 @@ def data_editor(
    for column in disabled:
        update_column_config(column_config_mapping, column, {"disabled": True})

-    # Convert the dataframe to an arrow table which is used as the main
-    # serialization format for sending the data to the frontend.
-    # We also utilize the arrow schema to determine the data kinds of every column.
-    arrow_table = pa.Table.from_pandas(data_df)
-
     # Determine the dataframe schema which is required for parsing edited values
     # and for checking type compatibilities.
-    dataframe_schema = determine_dataframe_schema(data_df, arrow_table.schema)
+    # Stlite: arrow_table.schema can't be used as Arrow is not available.
+    dataframe_schema = determine_dataframe_schema(data_df, None)

     # Check if all configured column types are compatible with the underlying data.
     # Throws an exception if any of the configured types are incompatible.
     _check_type_compatibilities(data_df, column_config_mapping, dataframe_schema)

-    arrow_bytes = dataframe_util.convert_arrow_table_to_arrow_bytes(arrow_table)
+    # Stlite: Don't use Arrow for conversion:
+    arrow_bytes = dataframe_util.convert_pandas_df_to_arrow_bytes(data_df)

    # We want to do this as early as possible to avoid introducing nondeterminism,
    # but it isn't clear how much processing is needed to have the data in a
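The date/time branches above coerce object-dtyped cells through strings into datetime64 values that the Parquet path can carry; a minimal sketch of that coercion:

```python
from datetime import date

import pandas as pd

col = pd.Series([date(2024, 1, 1), date(2024, 2, 29)], dtype="object")
coerced = pd.to_datetime(col.astype("string"), errors="coerce")
print(coerced.dtype)    # datetime64[ns]
print(coerced.iloc[0])  # 2024-01-01 00:00:00
```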
