Parquet to bypass Arrow #3

Merged: 10 commits, Aug 23, 2023
1 change: 1 addition & 0 deletions frontend/package.json
@@ -79,6 +79,7 @@
"moment-timezone": "^0.5.40",
"node-emoji": "^1.11.0",
"numbro": "^2.3.6",
"parquet-wasm": "^0.4.0",
"plotly.js": "^2.18.1",
"prismjs": "^1.29.0",
"protobufjs": "^7.2.0",
@@ -274,6 +274,14 @@ export function toSafeArray(data: any): any[] {
return [data]
}

if (data instanceof Uint8Array) {
// Stlite: Uint8Array is used for any list data in fastparquet.
// It stores a JSON string representation in the Uint8Array.
// We need to decode it to a string first
// so that it can later be parsed as JSON.
data = new TextDecoder("utf-8").decode(data)
}

if (typeof data === "string") {
if (data === "") {
// Empty string
29 changes: 27 additions & 2 deletions frontend/src/lib/dataframes/Quiver.ts
@@ -36,6 +36,27 @@ import numbro from "numbro"
import { IArrow, Styler as StylerProto } from "src/lib/proto"
import { notNullOrUndefined } from "src/lib/util/utils"

import type { readParquet as readParquetType } from "parquet-wasm"

// Stlite: Use parquet to bypass the Arrow implementation which is unavailable in the Wasm Python environment.
// See https://github.com/whitphx/stlite/issues/509#issuecomment-1657957887
// NOTE: Async import is necessary for the `parquet-wasm` package to work.
// If it's imported statically, the following error will be thrown when `readParquet` is called:
// `TypeError: Cannot read properties of undefined (reading '__wbindgen_add_to_stack_pointer')`
// Ref: https://github.com/kylebarron/parquet-wasm/issues/27
// HACK: Strictly speaking, there is no guarantee that the `readParquet` function
// async-imported in the following code will be ready when it's called in the `Quiver` class's constructor,
// but it seems to work fine in practice.
let readParquet: typeof readParquetType | undefined = undefined
setTimeout(() =>
// `setTimeout()` is required for this lazy loading to work in the mountable package
// where `__webpack_public_path__` is set at runtime, as this `setTimeout()` ensures that
// this `import()` is run after `__webpack_public_path__` is patched.
import("parquet-wasm").then(parquet => {
readParquet = parquet.readParquet
})
)

/** Data types used by ArrowJS. */
export type DataType =
| null
@@ -384,7 +405,9 @@ export class Quiver {
private readonly _styler?: Styler

constructor(element: IArrow) {
const table = tableFromIPC(element.data)
const table = tableFromIPC(
element.data ? readParquet!(element.data) : element.data
)
const schema = Quiver.parseSchema(table)
const rawColumns = Quiver.getRawColumns(schema)
const fields = Quiver.parseFields(table.schema)
@@ -937,7 +960,9 @@ but was expecting \`${JSON.stringify(expectedIndexTypes)}\`.
public static getTypeName(type: Type): IndexTypeName | string {
// For `PeriodType` and `IntervalType` types are kept in `numpy_type`,
// for the rest of the indexes in `pandas_type`.
return type.pandas_type === "object" ? type.numpy_type : type.pandas_type
const typeName =
type.pandas_type === "object" ? type.numpy_type : type.pandas_type
return typeName.toLowerCase().trim()
}

/** Takes the data and its type and nicely formats it. */
3 changes: 3 additions & 0 deletions lib/setup.py
@@ -78,6 +78,9 @@
if not os.getenv("SNOWPARK_CONDA_BUILD"):
INSTALL_REQUIRES.extend(SNOWPARK_CONDA_EXCLUDED_DEPENDENCIES)

# stlite: See https://github.com/whitphx/stlite/issues/509#issuecomment-1657957887
INSTALL_REQUIRES.extend(["fastparquet"])

EXTRA_REQUIRES = {"snowflake": ["snowflake-snowpark-python; python_version=='3.8'"]}


23 changes: 14 additions & 9 deletions lib/streamlit/elements/data_editor.py
@@ -772,14 +772,17 @@ def data_editor(
for column in disabled:
update_column_config(column_config_mapping, column, {"disabled": True})

# Convert the dataframe to an arrow table which is used as the main
# serialization format for sending the data to the frontend.
# We also utilize the arrow schema to determine the data kinds of every column.
arrow_table = pa.Table.from_pandas(data_df)

# Determine the dataframe schema which is required for parsing edited values
# and for checking type compatibilities.
dataframe_schema = determine_dataframe_schema(data_df, arrow_table.schema)
# stlite: Don't use Arrow
# # Convert the dataframe to an arrow table which is used as the main
# # serialization format for sending the data to the frontend.
# # We also utilize the arrow schema to determine the data kinds of every column.
# arrow_table = pa.Table.from_pandas(data_df)

# stlite: arrow_table.schema can't be used as Arrow is not available.
# # Determine the dataframe schema which is required for parsing edited values
# # and for checking type compatibilities.
# dataframe_schema = determine_dataframe_schema(data_df, arrow_table.schema)
dataframe_schema = determine_dataframe_schema(data_df, None)

# Check if all configured column types are compatible with the underlying data.
# Throws an exception if any of the configured types are incompatible.
@@ -815,7 +818,9 @@
default_uuid = str(hash(delta_path))
marshall_styler(proto, data, default_uuid)

proto.data = type_util.pyarrow_table_to_bytes(arrow_table)
# stlite: Don't use Arrow. `type_util.data_frame_to_bytes` is polyfilled to use Parquet instead for stlite.
# proto.data = type_util.pyarrow_table_to_bytes(arrow_table)
proto.data = type_util.data_frame_to_bytes(data_df)

marshall_column_config(proto, column_config_mapping)

61 changes: 58 additions & 3 deletions lib/streamlit/elements/lib/column_config_utils.py
@@ -20,6 +20,7 @@

import pandas as pd
import pyarrow as pa
from pandas.api.types import infer_dtype, is_categorical_dtype
from typing_extensions import Final, Literal, TypeAlias

from streamlit.elements.lib.column_types import ColumnConfig, ColumnType
@@ -347,7 +348,7 @@ def _determine_data_kind(


def determine_dataframe_schema(
data_df: pd.DataFrame, arrow_schema: pa.Schema
data_df: pd.DataFrame, arrow_schema: Optional[pa.Schema]
) -> DataframeSchema:
"""Determine the schema of a dataframe.

@@ -376,7 +377,7 @@ def determine_dataframe_schema(
for i, column in enumerate(data_df.items()):
column_name, column_data = column
dataframe_schema[column_name] = _determine_data_kind(
column_data, arrow_schema.field(i)
column_data, arrow_schema.field(i) if arrow_schema else None
)
return dataframe_schema

@@ -475,11 +476,65 @@ def apply_data_specific_configs(
"""
# Deactivate editing for columns that are not compatible with arrow
if check_arrow_compatibility:
# Stlite: Fix non-string column names (not supported by fastparquet):
if infer_dtype(data_df.columns) != "string":
data_df.columns = data_df.columns.astype("string")

for column_name, column_data in data_df.items():
# Stlite: Configure column types for some aspects
# that are not working out of the box with the parquet serialization.
if column_name not in columns_config:
if is_categorical_dtype(column_data.dtype):
update_column_config(
columns_config,
column_name,
{
"type_config": {
"type": "selectbox",
"options": column_data.cat.categories.tolist(),
},
},
)
if column_data.dtype == "object":
inferred_type = infer_dtype(column_data, skipna=True)
if inferred_type in ["string", "empty"]:
update_column_config(
columns_config,
column_name,
{"type_config": {"type": "text"}},
)
elif inferred_type == "boolean":
update_column_config(
columns_config,
column_name,
{"type_config": {"type": "checkbox"}},
)
elif inferred_type == "date":
data_df[column_name] = pd.to_datetime(
column_data.astype("string"), errors="coerce"
)
column_data = data_df[column_name]
update_column_config(
columns_config,
column_name,
{"type_config": {"type": "date"}},
)
continue
elif inferred_type == "time":
data_df[column_name] = pd.to_datetime(
column_data.astype("string"), errors="coerce"
)
column_data = data_df[column_name]
update_column_config(
columns_config,
column_name,
{"type_config": {"type": "time"}},
)

if is_colum_type_arrow_incompatible(column_data):
update_column_config(columns_config, column_name, {"disabled": True})
# Convert incompatible type to string
data_df[column_name] = column_data.astype(str)
data_df[column_name] = column_data.astype("string")

# Pandas adds a range index as default to all datastructures
# but for most of the non-pandas data objects it is unnecessary
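The hunk above works around two fastparquet constraints that the PR also handles in type_util.py: column names must be strings, and object columns holding Python sets or tuples cannot be written directly. A minimal standalone sketch of those coercions (assuming fastparquet is installed; the example data and output path are made up for illustration):

import pandas as pd
from pandas.api.types import infer_dtype

# Hypothetical input: integer column labels and a column of Python sets.
df = pd.DataFrame({0: [1, 2], 1: [{"a"}, {"b", "c"}]})

# fastparquet requires string column names, so coerce them like the hunk does.
if infer_dtype(df.columns) != "string":
    df.columns = df.columns.astype("string")

# Set/tuple values are not serializable by fastparquet; the PR disables editing
# for such columns and stores their string representation instead.
df["1"] = df["1"].astype("string")

df.to_parquet("example.parquet", engine="fastparquet")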
68 changes: 59 additions & 9 deletions lib/streamlit/type_util.py
@@ -17,6 +17,7 @@
from __future__ import annotations

import contextlib
import io
import re
import types
from enum import Enum, auto
@@ -40,7 +41,13 @@
import numpy as np
import pyarrow as pa
from pandas import DataFrame, Index, MultiIndex, Series
from pandas.api.types import infer_dtype, is_dict_like, is_list_like
from pandas.api.types import (
infer_dtype,
is_dict_like,
is_interval_dtype,
is_list_like,
is_period_dtype,
)
from typing_extensions import Final, Literal, Protocol, TypeAlias, TypeGuard, get_args

import streamlit as st
@@ -658,6 +665,14 @@ def is_colum_type_arrow_incompatible(column: Union[Series, Index]) -> bool:
]:
return True

# Stlite: not supported by fastparquet:
if is_interval_dtype(column.dtype):
return True

# Stlite: not supported by fastparquet:
if is_period_dtype(column.dtype):
return True

if column.dtype == "object":
# The dtype of mixed type columns is always object, the actual type of the column
# values can be determined via the infer_dtype function:
@@ -669,6 +684,10 @@ def is_colum_type_arrow_incompatible(column: Union[Series, Index]) -> bool:
"complex",
"timedelta",
"timedelta64",
# Stlite: not supported by fastparquet (as object types):
"date",
"time",
"datetime",
]:
return True
elif inferred_type == "mixed":
@@ -688,6 +707,10 @@ def is_colum_type_arrow_incompatible(column: Union[Series, Index]) -> bool:
or is_dict_like(first_value)
# Frozensets are list-like, but are not compatible with pyarrow.
or isinstance(first_value, frozenset)
# Stlite: not supported by fastparquet:
or isinstance(first_value, set)
or isinstance(first_value, tuple)
or infer_dtype(first_value, skipna=True) in ["datetime"]
):
# This seems to be an incompatible list-like type
return True
@@ -725,7 +748,7 @@ def fix_arrow_incompatible_column_types(
if is_colum_type_arrow_incompatible(df[col]):
if df_copy is None:
df_copy = df.copy()
df_copy[col] = df[col].astype(str)
df_copy[col] = df[col].astype("string")

# The index can also contain mixed types
# causing Arrow issues during conversion.
@@ -740,30 +763,57 @@
):
if df_copy is None:
df_copy = df.copy()
df_copy.index = df.index.astype(str)
df_copy.index = df.index.astype("string")

# Stlite: fastparquet does not support non-string column names:
if infer_dtype(df.columns) != "string":
if df_copy is None:
df_copy = df.copy()
df_copy.columns = df.columns.astype("string")

return df_copy if df_copy is not None else df


# `pd.DataFrame.to_parquet()` always closes the file handle,
# but we need to keep it open to get the written data.
# So we use this custom class to prevent the closing.
# https://github.com/dask/fastparquet/issues/868
class UnclosableBytesIO(io.BytesIO):
def close(self):
pass

def really_close(self):
super().close()


def data_frame_to_bytes(df: DataFrame) -> bytes:
"""Serialize pandas.DataFrame to bytes using Apache Arrow.
"""Serialize pandas.DataFrame to bytes using Apache ~~Arrow~~ Parquet.
This function is customized from the original one to use Parquet instead of Arrow
for stlite. See https://github.com/whitphx/stlite/issues/509

Parameters
----------
df : pandas.DataFrame
A dataframe to convert.

"""
buf = UnclosableBytesIO()

try:
table = pa.Table.from_pandas(df)
except (pa.ArrowTypeError, pa.ArrowInvalid, pa.ArrowNotImplementedError) as ex:
df.to_parquet(buf, engine="fastparquet")
except ValueError as ex:
_LOGGER.info(
"Serialization of dataframe to Arrow table was unsuccessful due to: %s. "
"Serialization of dataframe to Parquet table was unsuccessful due to: %s. "
"Applying automatic fixes for column types to make the dataframe Arrow-compatible.",
ex,
)
df = fix_arrow_incompatible_column_types(df)

[Review comment] just a note: parquet might have other incompatibilities compared to the arrow serialization, which might be good to add here once we have identified some column types that don't work.

table = pa.Table.from_pandas(df)
return pyarrow_table_to_bytes(table)
df.to_parquet(buf, engine="fastparquet")

data = buf.getvalue()
buf.really_close()

return data


def bytes_to_data_frame(source: bytes) -> DataFrame:
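To make the new serialization path above concrete: pandas' to_parquet() closes the handle it writes to when using fastparquet (dask/fastparquet#868, cited in the comment above), so the buffer suppresses close() until the payload has been read out and sent to the frontend as proto.data. A minimal sketch, not the PR's exact code; the read-back at the end assumes a fastparquet build that accepts in-memory file-like objects:

import io

import pandas as pd

class UnclosableBytesIO(io.BytesIO):
    # to_parquet() closes the destination handle, so ignore close()
    # until the Parquet payload has been extracted.
    def close(self):
        pass

    def really_close(self):
        super().close()

df = pd.DataFrame({"a": [1, 2, 3], "b": ["x", "y", "z"]})

buf = UnclosableBytesIO()
df.to_parquet(buf, engine="fastparquet")  # would otherwise close the buffer
data = buf.getvalue()  # bytes that end up in proto.data for the frontend
buf.really_close()

# Round trip, assuming this fastparquet version can read from a buffer.
restored = pd.read_parquet(io.BytesIO(data), engine="fastparquet")
print(restored)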