Skip to content

Commit

Permalink
Parquet to bypass Arrow (#3)
Browse files Browse the repository at this point in the history
* Introduce fastparquet on the Python side and parquet-wasm on the JS side to bypass the Arrow serialization for DataFrame

* Patch DataEditor not to use PyArrow

* Add setTimeout() so the import of parquet-wasm to work in @stlite/mountable

* Fix comments

* Fix incompatibilities with some column types

* Change logic to handle lists from fastparquet

* Move the decoding above the string parsing

---------

Co-authored-by: lukasmasuch <lukas.masuch@gmail.com>
  • Loading branch information
whitphx and lukasmasuch committed Mar 15, 2024
1 parent f7c9950 commit ce5d10e
Show file tree
Hide file tree
Showing 7 changed files with 172 additions and 23 deletions.
1 change: 1 addition & 0 deletions frontend/lib/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
"native-file-system-adapter": "^3.0.0",
"node-emoji": "^1.11.0",
"numbro": "^2.3.6",
"parquet-wasm": "^0.4.0",
"plotly.js": "^2.26.1",
"protobufjs": "^7.2.4",
"query-string": "^8.1.0",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,14 @@ export function toSafeArray(data: any): any[] {
return [data]
}

if (data instanceof Uint8Array) {
// Stlite: Uint8Array is used for any list data in fastparquet.
// It stores a json string representation in the Uint8Array.
// We need to convert this to a string first
// to later have it load as json.
data = new TextDecoder("utf-8").decode(data)
}

if (typeof data === "string") {
if (data === "") {
// Empty string
Expand Down
29 changes: 27 additions & 2 deletions frontend/lib/src/dataframes/Quiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,27 @@ import numbro from "numbro"
import { IArrow, Styler as StylerProto } from "@streamlit/lib/src/proto"
import { notNullOrUndefined } from "@streamlit/lib/src/util/utils"

import type { readParquet as readParquetType } from "parquet-wasm"

// Stlite: Use parquet to bypass the Arrow implementation which is unavailable in the Wasm Python environment.
// See https://github.com/whitphx/stlite/issues/509#issuecomment-1657957887
// NOTE: Async import is necessary for the `parquet-wasm` package to work.
// If it's imported statically, the following error will be thrown when `readParquet` is called:
// `TypeError: Cannot read properties of undefined (reading '__wbindgen_add_to_stack_pointer')`
// Ref: https://github.com/kylebarron/parquet-wasm/issues/27
// HACK: Strictly speaking, there is no guarantee that the `readParquet` function
// async-imported in the following code will be ready when it's called in the `Quiver` class's constructor,
// but it seems to work fine in practice.
let readParquet: typeof readParquetType | undefined = undefined
setTimeout(() =>
// `setTimeout()` is required for this lazy loading to work in the mountable package
// where `__webpack_public_path__` is set at runtime, as this `setTimeout()` ensures that
// this `import()` is run after `__webpack_public_path__` is patched.
import("parquet-wasm").then(parquet => {
readParquet = parquet.readParquet
})
)

/** Data types used by ArrowJS. */
export type DataType =
| null
Expand Down Expand Up @@ -434,7 +455,9 @@ export class Quiver {
private readonly _styler?: Styler

constructor(element: IArrow) {
const table = tableFromIPC(element.data)
const table = tableFromIPC(
element.data ? readParquet!(element.data) : element.data
)
const schema = Quiver.parseSchema(table)
const rawColumns = Quiver.getRawColumns(schema)
const fields = Quiver.parseFields(table.schema)
Expand Down Expand Up @@ -1048,7 +1071,9 @@ but was expecting \`${JSON.stringify(expectedIndexTypes)}\`.
public static getTypeName(type: Type): IndexTypeName | string {
// For `PeriodType` and `IntervalType` types are kept in `numpy_type`,
// for the rest of the indexes in `pandas_type`.
return type.pandas_type === "object" ? type.numpy_type : type.pandas_type
const typeName =
type.pandas_type === "object" ? type.numpy_type : type.pandas_type
return typeName.toLowerCase().trim()
}

/** Takes the data and it's type and nicely formats it. */
Expand Down
3 changes: 3 additions & 0 deletions lib/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@
if not os.getenv("SNOWPARK_CONDA_BUILD"):
INSTALL_REQUIRES.extend(SNOWPARK_CONDA_EXCLUDED_DEPENDENCIES)

# stlite: See https://github.com/whitphx/stlite/issues/509#issuecomment-1657957887
INSTALL_REQUIRES.extend(["fastparquet"])

EXTRA_REQUIRES = {
"snowflake": [
"snowflake-snowpark-python>=0.9.0; python_version=='3.8'",
Expand Down
59 changes: 57 additions & 2 deletions lib/streamlit/elements/lib/column_config_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ def _determine_data_kind(


def determine_dataframe_schema(
data_df: DataFrame, arrow_schema: pa.Schema
data_df: DataFrame, arrow_schema: pa.Schema | None
) -> DataframeSchema:
"""Determine the schema of a dataframe.
Expand Down Expand Up @@ -388,7 +388,7 @@ def determine_dataframe_schema(
for i, column in enumerate(data_df.items()):
column_name, column_data = column
dataframe_schema[column_name] = _determine_data_kind(
column_data, arrow_schema.field(i)
column_data, arrow_schema.field(i) if arrow_schema else None
)
return dataframe_schema

Expand Down Expand Up @@ -486,10 +486,65 @@ def apply_data_specific_configs(
Whether to check if the data is compatible with arrow.
"""
import pandas as pd
from pandas.api.types import infer_dtype, is_categorical_dtype

# Deactivate editing for columns that are not compatible with arrow
if check_arrow_compatibility:
# Stlite: Fix non-string column names (not supported by fastparquet):
if infer_dtype(data_df.columns) != "string":
data_df.columns = data_df.columns.astype("string")

for column_name, column_data in data_df.items():
# Stlite: Configure column types for some aspects
# that are not working out of the box with the parquet serialization.
if column_name not in columns_config:
if is_categorical_dtype(column_data.dtype):
update_column_config(
columns_config,
column_name,
{
"type_config": {
"type": "selectbox",
"options": column_data.cat.categories.tolist(),
},
},
)
if column_data.dtype == "object":
inferred_type = infer_dtype(column_data, skipna=True)
if inferred_type in ["string", "empty"]:
update_column_config(
columns_config,
column_name,
{"type_config": {"type": "text"}},
)
elif inferred_type == "boolean":
update_column_config(
columns_config,
column_name,
{"type_config": {"type": "checkbox"}},
)
elif inferred_type == "date":
data_df[column_name] = pd.to_datetime(
column_data.astype("string"), errors="coerce"
)
column_data = data_df[column_name]
update_column_config(
columns_config,
column_name,
{"type_config": {"type": "date"}},
)
continue
elif inferred_type == "time":
data_df[column_name] = pd.to_datetime(
column_data.astype("string"), errors="coerce"
)
column_data = data_df[column_name]
update_column_config(
columns_config,
column_name,
{"type_config": {"type": "time"}},
)

if is_colum_type_arrow_incompatible(column_data):
update_column_config(columns_config, column_name, {"disabled": True})
# Convert incompatible type to string
Expand Down
30 changes: 19 additions & 11 deletions lib/streamlit/elements/widgets/data_editor.py
Original file line number Diff line number Diff line change
Expand Up @@ -833,20 +833,25 @@ def data_editor(
for column in disabled:
update_column_config(column_config_mapping, column, {"disabled": True})

# Convert the dataframe to an arrow table which is used as the main
# serialization format for sending the data to the frontend.
# We also utilize the arrow schema to determine the data kinds of every column.
arrow_table = pa.Table.from_pandas(data_df)

# Determine the dataframe schema which is required for parsing edited values
# and for checking type compatibilities.
dataframe_schema = determine_dataframe_schema(data_df, arrow_table.schema)
# stlite: Don't use Arrow
# # Convert the dataframe to an arrow table which is used as the main
# # serialization format for sending the data to the frontend.
# # We also utilize the arrow schema to determine the data kinds of every column.
# arrow_table = pa.Table.from_pandas(data_df)

# stlite: arrow_table.schema can't be used as Arrow is not available.
# # Determine the dataframe schema which is required for parsing edited values
# # and for checking type compatibilities.
# dataframe_schema = determine_dataframe_schema(data_df, arrow_table.schema)
dataframe_schema = determine_dataframe_schema(data_df, None)

# Check if all configured column types are compatible with the underlying data.
# Throws an exception if any of the configured types are incompatible.
_check_type_compatibilities(data_df, column_config_mapping, dataframe_schema)

arrow_bytes = type_util.pyarrow_table_to_bytes(arrow_table)
# stlite: Don't use Arrow
# arrow_bytes = type_util.pyarrow_table_to_bytes(arrow_table)
df_bytes = type_util.data_frame_to_bytes(data_df)

# We want to do this as early as possible to avoid introducing nondeterminism,
# but it isn't clear how much processing is needed to have the data in a
Expand All @@ -856,7 +861,8 @@ def data_editor(
id = compute_widget_id(
"data_editor",
user_key=key,
data=arrow_bytes,
# data=arrow_bytes,
data=df_bytes,
width=width,
height=height,
use_container_width=use_container_width,
Expand Down Expand Up @@ -907,7 +913,9 @@ def data_editor(
data.set_uuid(styler_uuid)
marshall_styler(proto, data, styler_uuid)

proto.data = arrow_bytes
# stlite: Don't use Arrow. `type_util.data_frame_to_bytes` is polyfilled to use Parquet instead for stlite.
# proto.data = arrow_bytes
proto.data = df_bytes

marshall_column_config(proto, column_config_mapping)

Expand Down
65 changes: 57 additions & 8 deletions lib/streamlit/type_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import contextlib
import copy
import io
import math
import re
import types
Expand Down Expand Up @@ -848,13 +849,27 @@ def pyarrow_table_to_bytes(table: pa.Table) -> bytes:

def is_colum_type_arrow_incompatible(column: Series[Any] | Index) -> bool:
"""Return True if the column type is known to cause issues during Arrow conversion."""
from pandas.api.types import infer_dtype, is_dict_like, is_list_like
from pandas.api.types import (
infer_dtype,
is_dict_like,
is_interval_dtype,
is_list_like,
is_period_dtype,
)

if column.dtype.kind in [
"c", # complex64, complex128, complex256
]:
return True

# Stlite: not supported by fastparquet:
if is_interval_dtype(column.dtype):
return True

# Stlite: not supported by fastparquet:
if is_period_dtype(column.dtype):
return True

if column.dtype == "object":
# The dtype of mixed type columns is always object, the actual type of the column
# values can be determined via the infer_dtype function:
Expand All @@ -864,6 +879,10 @@ def is_colum_type_arrow_incompatible(column: Series[Any] | Index) -> bool:
if inferred_type in [
"mixed-integer",
"complex",
# Stlite: not supported by fastparquet (as object types):
"date",
"time",
"datetime",
]:
return True
elif inferred_type == "mixed":
Expand All @@ -883,6 +902,10 @@ def is_colum_type_arrow_incompatible(column: Series[Any] | Index) -> bool:
or is_dict_like(first_value)
# Frozensets are list-like, but are not compatible with pyarrow.
or isinstance(first_value, frozenset)
# Stlite: not supported by fastparquet:
or isinstance(first_value, set)
or isinstance(first_value, tuple)
or infer_dtype(first_value, skipna=True) in ["datetime"]
):
# This seems to be an incompatible list-like type
return True
Expand Down Expand Up @@ -915,6 +938,7 @@ def fix_arrow_incompatible_column_types(
The fixed dataframe.
"""
import pandas as pd
from pandas.api.types import infer_dtype

# Make a copy, but only initialize if necessary to preserve memory.
df_copy: DataFrame | None = None
Expand All @@ -938,31 +962,56 @@ def fix_arrow_incompatible_column_types(
if df_copy is None:
df_copy = df.copy()
df_copy.index = df.index.astype("string")

# Stlite: fastparquet does not support non-string column names:
if infer_dtype(df.columns) != "string":
if df_copy is None:
df_copy = df.copy()
df_copy.columns = df.columns.astype("string")

return df_copy if df_copy is not None else df


# `pd.DataFrame.to_parquet()` always closes the file handle,
# but we need to keep it open to get the written data.
# So we use this custom class to prevent the closing.
# https://github.com/dask/fastparquet/issues/868
class UnclosableBytesIO(io.BytesIO):
def close(self):
pass

def really_close(self):
super().close()


def data_frame_to_bytes(df: DataFrame) -> bytes:
"""Serialize pandas.DataFrame to bytes using Apache Arrow.
"""Serialize pandas.DataFrame to bytes using Apache ~~Arrow~~ Parquet.
This function is customized from the original one to use Parquet instead of Arrow
for stlite. See https://github.com/whitphx/stlite/issues/509
Parameters
----------
df : pandas.DataFrame
A dataframe to convert.
"""
import pyarrow as pa
buf = UnclosableBytesIO()

try:
table = pa.Table.from_pandas(df)
except (pa.ArrowTypeError, pa.ArrowInvalid, pa.ArrowNotImplementedError) as ex:
df.to_parquet(buf, engine="fastparquet")
except ValueError as ex:
_LOGGER.info(
"Serialization of dataframe to Arrow table was unsuccessful due to: %s. "
"Serialization of dataframe to Parquet table was unsuccessful due to: %s. "
"Applying automatic fixes for column types to make the dataframe Arrow-compatible.",
ex,
)
df = fix_arrow_incompatible_column_types(df)
table = pa.Table.from_pandas(df)
return pyarrow_table_to_bytes(table)
df.to_parquet(buf, engine="fastparquet")

data = buf.getvalue()
buf.really_close()

return data


def bytes_to_data_frame(source: bytes) -> DataFrame:
Expand Down

0 comments on commit ce5d10e

Please sign in to comment.