From 6f80cf8d3e2615b32721a56d6dc9afd09480f9c5 Mon Sep 17 00:00:00 2001 From: Stijn de Gooijer Date: Mon, 19 Jul 2021 21:05:09 +0200 Subject: [PATCH] Some import/module adjustments (#992) * Added __all__ dunders to the __init__.py files to solve the problem mentioned above. This caused some issues with mypy, so I had to explicitly import Series, DataFrame, etc. in the top __init__.py (I filed a mypy issue for this... I believe there's a bug). * Removed the dtype_to_int function in the datatypes module. It was unused internally, and I see no use for this function. * I tried hiding the wrapping functions (wrap_s, wrap_df, etc.) from the main scope, as I believe users should never have to explicitly use these. But the Rust backend expects those functions to be there. I left them for now; something for the future maybe (possibly just rename them with a leading underscore or something). * Renamed lazy/expr_functions to lazy/functions (now possible thanks to the fix to the first mentioned issue). This conforms to the syntax people know from pyspark: use from polars.lazy import functions as F and then use F.col, F.sum, etc. Split up functions.py into io.py (for all the read functions like read_csv, etc.) and eager/functions.py (for concat, get_dummies, etc.). * Moved StringCache and toggle_string_cache to their own file. --- py-polars/polars/__init__.py | 19 + py-polars/polars/datatypes.py | 10 - py-polars/polars/eager/__init__.py | 3 + py-polars/polars/eager/frame.py | 72 +- py-polars/polars/eager/series.py | 7 +- py-polars/polars/functions.py | 633 ++---------------- py-polars/polars/io.py | 513 ++++++++++++++ py-polars/polars/lazy/__init__.py | 5 +- py-polars/polars/lazy/expr.py | 3 +- py-polars/polars/lazy/frame.py | 5 +- .../lazy/{expr_functions.py => functions.py} | 22 +- py-polars/polars/lazy/whenthen.py | 4 +- py-polars/polars/string_cache.py | 45 ++ py-polars/tests/test_df.py | 6 +- 14 files changed, 690 insertions(+), 657 deletions(-) create mode 100644 py-polars/polars/io.py rename py-polars/polars/lazy/{expr_functions.py => functions.py} (95%) create mode 100644 py-polars/polars/string_cache.py diff --git a/py-polars/polars/__init__.py b/py-polars/polars/__init__.py index 35c65523e135..102104cc2f87 100644 --- a/py-polars/polars/__init__.py +++ b/py-polars/polars/__init__.py @@ -1,8 +1,18 @@ # flake8: noqa + +# mypy needs these imported explicitly +from polars.eager.frame import DataFrame, wrap_df +from polars.eager.series import Series, wrap_s +from polars.lazy.expr import Expr, wrap_expr +from polars.lazy.frame import LazyFrame, wrap_ldf + +from . 
import datatypes, eager, functions, io, lazy, string_cache from .datatypes import * from .eager import * from .functions import * +from .io import * from .lazy import * +from .string_cache import * # during docs building the binary code is not yet available try: @@ -11,3 +21,12 @@ __version__ = version() except ImportError: pass + +__all__ = ( + datatypes.__all__ + + eager.__all__ + + functions.__all__ + + io.__all__ + + lazy.__all__ + + string_cache.__all__ +) diff --git a/py-polars/polars/datatypes.py b/py-polars/polars/datatypes.py index fb5e6ea9cf55..67f4dc3b8413 100644 --- a/py-polars/polars/datatypes.py +++ b/py-polars/polars/datatypes.py @@ -260,16 +260,6 @@ def dtype_to_ctype(dtype: Type[DataType]) -> Type[_SimpleCData]: # noqa: F821 return ptr_type -def dtype_to_int(dtype: Type[DataType]) -> int: - i = 0 - for dt in DTYPES: - if dt == dtype: - return i - i += 1 - else: - raise NotImplementedError - - def pytype_to_polars_type(data_type: Type[Any]) -> Type[DataType]: polars_type: Type[DataType] if data_type == int: diff --git a/py-polars/polars/eager/__init__.py b/py-polars/polars/eager/__init__.py index 27e8f85a5163..c879ec920349 100644 --- a/py-polars/polars/eager/__init__.py +++ b/py-polars/polars/eager/__init__.py @@ -1,3 +1,6 @@ # flake8: noqa +from . import frame, series from .frame import * from .series import * + +__all__ = frame.__all__ + series.__all__ diff --git a/py-polars/polars/eager/frame.py b/py-polars/polars/eager/frame.py index 673422e1d9df..c27f85cb2b96 100644 --- a/py-polars/polars/eager/frame.py +++ b/py-polars/polars/eager/frame.py @@ -5,7 +5,6 @@ import typing as tp from io import BytesIO, StringIO from pathlib import Path -from types import TracebackType from typing import ( Any, BinaryIO, @@ -34,13 +33,11 @@ try: from ..polars import PyDataFrame, PySeries - from ..polars import toggle_string_cache as pytoggle_string_cache except ImportError: import warnings warnings.warn("binary files missing") - try: import pandas as pd except ImportError: @@ -48,9 +45,6 @@ __all__ = [ "DataFrame", - "wrap_df", - "StringCache", - "toggle_string_cache", ] @@ -131,7 +125,7 @@ def _from_pydf(df: "PyDataFrame") -> "DataFrame": @staticmethod def from_rows( rows: Sequence[Sequence[Any]], - column_names: Optional[tp.List[str]] = None, + column_names: Optional[Sequence[str]] = None, column_name_mapping: Optional[Dict[int, str]] = None, ) -> "DataFrame": """ @@ -154,7 +148,7 @@ def from_rows( self = DataFrame.__new__(DataFrame) self._df = PyDataFrame.read_rows(rows) if column_names is not None: - self.columns = column_names + self.columns = list(column_names) if column_name_mapping is not None: for i, name in column_name_mapping.items(): s = self[:, i] @@ -608,7 +602,7 @@ def __getattr__(self, item: Any) -> "PySeries": Access columns as attribute. 
""" try: - return pl.wrap_s(self._df.column(item)) + return pl.eager.series.wrap_s(self._df.column(item)) except RuntimeError: raise AttributeError(f"{item} not found") @@ -673,7 +667,7 @@ def __getitem__(self, item: Any) -> Any: # df[:, unknown] series = self.__getitem__(col_selection) # s[:] - pl.wrap_s(series[row_selection]) + pl.eager.series.wrap_s(series[row_selection]) # df[2, :] (select row as df) if isinstance(row_selection, int): @@ -706,7 +700,7 @@ def __getitem__(self, item: Any) -> Any: # select single column # df["foo"] if isinstance(item, str): - return pl.wrap_s(self._df.column(item)) + return pl.eager.series.wrap_s(self._df.column(item)) # df[idx] if isinstance(item, int): @@ -1012,7 +1006,7 @@ def describe_cast(self: "DataFrame") -> "DataFrame": columns.append(s) return pl.DataFrame(columns) - summary = pl.concat( + summary = pl.functions.concat( [ describe_cast(self.mean()), describe_cast(self.std()), @@ -1503,7 +1497,7 @@ def apply( return_dtype Output type of the operation. If none given, Polars tries to infer the type. """ - return pl.wrap_s(self._df.apply(f, return_dtype)) + return pl.eager.series.wrap_s(self._df.apply(f, return_dtype)) def with_column(self, column: Union["pl.Series", "pl.Expr"]) -> "DataFrame": """ @@ -1608,7 +1602,7 @@ def drop_in_place(self, name: str) -> "pl.Series": name Column to drop. """ - return pl.wrap_s(self._df.drop_in_place(name)) + return pl.eager.series.wrap_s(self._df.drop_in_place(name)) def select_at_idx(self, idx: int) -> "pl.Series": """ @@ -1619,7 +1613,7 @@ def select_at_idx(self, idx: int) -> "pl.Series": idx Location of selection. """ - return pl.wrap_s(self._df.select_at_idx(idx)) + return pl.eager.series.wrap_s(self._df.select_at_idx(idx)) def clone(self) -> "DataFrame": """ @@ -1631,7 +1625,7 @@ def get_columns(self) -> tp.List["pl.Series"]: """ Get the DataFrame as a List of Series. """ - return list(map(lambda s: pl.wrap_s(s), self._df.get_columns())) + return list(map(lambda s: pl.eager.series.wrap_s(s), self._df.get_columns())) def fill_none(self, strategy: Union[str, "pl.Expr"]) -> "DataFrame": """ @@ -1737,13 +1731,13 @@ def is_duplicated(self) -> "pl.Series": """ Get a mask of all duplicated rows in this DataFrame. """ - return pl.wrap_s(self._df.is_duplicated()) + return pl.eager.series.wrap_s(self._df.is_duplicated()) def is_unique(self) -> "pl.Series": """ Get a mask of all unique rows in this DataFrame. """ - return pl.wrap_s(self._df.is_unique()) + return pl.eager.series.wrap_s(self._df.is_unique()) def lazy(self) -> "pl.LazyFrame": """ @@ -1759,7 +1753,7 @@ def lazy(self) -> "pl.LazyFrame": Lazy operations are advised because they allow for query optimization and more parallelization. 
""" - return pl.wrap_ldf(self._df.lazy()) + return pl.lazy.frame.wrap_ldf(self._df.lazy()) def select( self, exprs: Union[str, "pl.Expr", Sequence[str], Sequence["pl.Expr"]] @@ -1806,7 +1800,7 @@ def max(self, axis: int = 0) -> "DataFrame": if axis == 0: return wrap_df(self._df.max()) if axis == 1: - return pl.wrap_s(self._df.hmax()).to_frame() + return pl.eager.series.wrap_s(self._df.hmax()).to_frame() raise ValueError("Axis should be 0 or 1.") def min(self, axis: int = 0) -> "DataFrame": @@ -1816,7 +1810,7 @@ def min(self, axis: int = 0) -> "DataFrame": if axis == 0: return wrap_df(self._df.min()) if axis == 1: - return pl.wrap_s(self._df.hmin()).to_frame() + return pl.eager.series.wrap_s(self._df.hmin()).to_frame() raise ValueError("Axis should be 0 or 1.") def sum(self, axis: int = 0) -> "DataFrame": @@ -1826,7 +1820,7 @@ def sum(self, axis: int = 0) -> "DataFrame": if axis == 0: return wrap_df(self._df.sum()) if axis == 1: - return pl.wrap_s(self._df.hsum()).to_frame() + return pl.eager.series.wrap_s(self._df.hsum()).to_frame() raise ValueError("Axis should be 0 or 1.") def mean(self, axis: int = 0) -> "DataFrame": @@ -1836,7 +1830,7 @@ def mean(self, axis: int = 0) -> "DataFrame": if axis == 0: return wrap_df(self._df.mean()) if axis == 1: - return pl.wrap_s(self._df.hmean()).to_frame() + return pl.eager.series.wrap_s(self._df.hmean()).to_frame() raise ValueError("Axis should be 0 or 1.") def std(self) -> "DataFrame": @@ -2054,7 +2048,7 @@ def hash_rows( k3 seed parameter """ - return pl.wrap_s(self._df.hash_rows(k0, k1, k2, k3)) + return pl.eager.series.wrap_s(self._df.hash_rows(k0, k1, k2, k3)) class GroupBy: @@ -2551,33 +2545,3 @@ def apply( df[name] = s return df - - -class StringCache: - """ - Context manager that allows data sources to share the same categorical features. - This will temporarily cache the string categories until the context manager is finished. - """ - - def __init__(self) -> None: - pass - - def __enter__(self) -> "StringCache": - pytoggle_string_cache(True) - return self - - def __exit__( - self, - exc_type: Optional[Type[BaseException]], - exc_val: Optional[BaseException], - exc_tb: Optional[TracebackType], - ) -> None: - pytoggle_string_cache(False) - - -def toggle_string_cache(toggle: bool) -> None: - """ - Turn on/off the global string cache. This ensures that casts to Categorical types have the categories when string - values are equal. - """ - pytoggle_string_cache(toggle) diff --git a/py-polars/polars/eager/series.py b/py-polars/polars/eager/series.py index b04e5a1543f8..2bf63a07145b 100644 --- a/py-polars/polars/eager/series.py +++ b/py-polars/polars/eager/series.py @@ -49,7 +49,6 @@ __all__ = [ "Series", - "wrap_s", ] @@ -527,7 +526,7 @@ def to_frame(self) -> "pl.DataFrame": """ Cast this Series to a DataFrame. """ - return pl.wrap_df(PyDataFrame([self._s])) + return pl.eager.frame.wrap_df(PyDataFrame([self._s])) @property def dtype(self) -> Type[DataType]: @@ -640,13 +639,13 @@ def to_dummies(self) -> "pl.DataFrame": """ Get dummy variables. """ - return pl.wrap_df(self._s.to_dummies()) + return pl.eager.frame.wrap_df(self._s.to_dummies()) def value_counts(self) -> "pl.DataFrame": """ Count the unique values in a Series. 
""" - return pl.wrap_df(self._s.value_counts()) + return pl.eager.frame.wrap_df(self._s.value_counts()) @property def name(self) -> str: diff --git a/py-polars/polars/functions.py b/py-polars/polars/functions.py index 6f2a64f4016c..63951eb5fa42 100644 --- a/py-polars/polars/functions.py +++ b/py-polars/polars/functions.py @@ -1,140 +1,27 @@ -from contextlib import contextmanager -from io import BytesIO, StringIO -from pathlib import Path -from typing import ( - Any, - BinaryIO, - ContextManager, - Dict, - Iterator, - List, - Optional, - Sequence, - TextIO, - Type, - Union, - overload, -) -from urllib.request import urlopen +from typing import Any, Dict, Optional, Sequence, Union import numpy as np import pyarrow as pa -import pyarrow.compute -import pyarrow.csv -import pyarrow.parquet + +import polars as pl try: import pandas as pd except ImportError: pass -try: - import fsspec - from fsspec.implementations.local import make_path_posix - from fsspec.utils import infer_compression, infer_storage_options - - WITH_FSSPEC = True -except ImportError: - WITH_FSSPEC = False - -import polars as pl - __all__ = [ "get_dummies", "concat", "repeat", "arg_where", - "read_csv", - "read_parquet", - "read_json", - "read_sql", - "read_ipc", - "scan_csv", - "scan_parquet", + "from_rows", "from_arrow", "from_pandas", - "from_rows", "from_arrow_table", # deprecated ] -def _process_http_file(path: str) -> BytesIO: - with urlopen(path) as f: - return BytesIO(f.read()) - - -@overload -def _prepare_file_arg( - file: Union[str, List[str], Path, BinaryIO], **kwargs: Any -) -> ContextManager[Union[str, BinaryIO]]: - ... - - -@overload -def _prepare_file_arg( - file: Union[str, TextIO, Path, BinaryIO], **kwargs: Any -) -> ContextManager[Union[str, BinaryIO]]: - ... - - -@overload -def _prepare_file_arg( - file: Union[str, List[str], TextIO, Path, BinaryIO], **kwargs: Any -) -> ContextManager[Union[str, List[str], BinaryIO, List[BinaryIO]]]: - ... - - -def _prepare_file_arg( - file: Union[str, List[str], TextIO, Path, BinaryIO], **kwargs: Any -) -> ContextManager[Union[str, BinaryIO, List[str], List[BinaryIO]]]: - """ - Utility for read_[csv, parquet]. (not to be used by scan_[csv, parquet]). - Returned value is always usable as a context. - - A `StringIO`, `BytesIO` file is returned as a `BytesIO` - A local path is returned as a string - An http url is read into a buffer and returned as a `BytesIO` - - When fsspec is installed, except for `StringIO`, `BytesIO` and local - uncompressed files, the file is opened with `fsspec.open(file, **kwargs)`, - in which case, the compression is inferred. 
- """ - - compression = kwargs.pop("compression", "infer") - - # Small helper to use a variable as context - @contextmanager - def managed_file(file: Any) -> Iterator[Any]: - try: - yield file - finally: - pass - - if isinstance(file, StringIO): - return BytesIO(file.read().encode("utf8")) - if isinstance(file, BytesIO): - return file - if isinstance(file, Path): - return managed_file(str(file)) - if isinstance(file, str): - if WITH_FSSPEC: - compressed = infer_compression(file) is not None - local = infer_storage_options(file)["protocol"] == "file" - if local and not compressed: - return managed_file(make_path_posix(file)) - return fsspec.open(file, compression=compression, **kwargs) - if file.startswith("http"): - return _process_http_file(file) - if isinstance(file, list) and bool(file) and all(isinstance(f, str) for f in file): - if WITH_FSSPEC: - compressed = any(infer_compression(f) is not None for f in file) - local = all(infer_storage_options(f)["protocol"] == "file" for f in file) - if local and not compressed: - return managed_file(list(map(make_path_posix, file))) - return fsspec.open_files(file, compression=compression, **kwargs) - return managed_file(file) - - def get_dummies(df: "pl.DataFrame") -> "pl.DataFrame": """ Convert categorical variables into dummy/indicator variables. @@ -147,342 +34,54 @@ def get_dummies(df: "pl.DataFrame") -> "pl.DataFrame": return df.to_dummies() -def read_csv( - file: Union[str, TextIO, Path, BinaryIO], - infer_schema_length: int = 100, - batch_size: int = 8192, - has_headers: bool = True, - ignore_errors: bool = False, - stop_after_n_rows: Optional[int] = None, - skip_rows: int = 0, - projection: Optional[List[int]] = None, - sep: str = ",", - columns: Optional[List[str]] = None, - rechunk: bool = True, - encoding: str = "utf8", - n_threads: Optional[int] = None, - dtype: Optional[Dict[str, Type["pl.DataType"]]] = None, - new_columns: Optional[List[str]] = None, - use_pyarrow: bool = False, - low_memory: bool = False, - comment_char: Optional[str] = None, - storage_options: Optional[Dict] = None, - null_values: Optional[Union[str, List[str], Dict[str, str]]] = None, -) -> "pl.DataFrame": +def concat(dfs: Sequence["pl.DataFrame"], rechunk: bool = True) -> "pl.DataFrame": """ - Read into a DataFrame from a csv file. + Aggregate all the Dataframes in a List of DataFrames to a single DataFrame. Parameters ---------- - file - Path to a file or a file like object. - By file-like object, we refer to objects with a ``read()`` method, - such as a file handler (e.g. via builtin ``open`` function) - or ``StringIO`` or ``BytesIO``. - If ``fsspec`` is installed, it will be used to open non-local or compressed files - infer_schema_length - Maximum number of lines to read to infer schema. - batch_size - Number of lines to read into the buffer at once. Modify this to change performance. - has_headers - Indicate if first row of dataset is header or not. If set to False first row will be set to `column_x`, - `x` being an enumeration over every column in the dataset. - ignore_errors - Try to keep reading lines if some lines yield errors. - stop_after_n_rows - After n rows are read from the CSV, it stops reading. - During multi-threaded parsing, an upper bound of `n` rows - cannot be guaranteed. - skip_rows - Start reading after `skip_rows`. - projection - Indexes of columns to select. Note that column indexes count from zero. - sep - Delimiter/ value separator. - columns - Columns to project/ select. + dfs + DataFrames to concatenate. 
rechunk - Make sure that all columns are contiguous in memory by aggregating the chunks into a single array. - encoding - - "utf8" - - "utf8-lossy" - n_threads - Number of threads to use in csv parsing. Defaults to the number of physical cpu's of your system. - dtype - Overwrite the dtypes during inference. - new_columns - Rename columns to these right after parsing. Note that the length of this list must equal the width of the DataFrame - that's parsed. - use_pyarrow - Try to use pyarrow's native CSV parser. This is not always possible. The set of arguments given to this function - determine if it is possible to use pyarrows native parser. Note that pyarrow and polars may have a different - strategy regarding type inference. - low_memory - Reduce memory usage in expense of performance. - comment_char - character that indicates the start of a comment line, for instance '#'. - storage_options - Extra options that make sense for ``fsspec.open()`` or a particular storage connection, e.g. host, port, username, password, etc. - null_values - Values to interpret as null values. You can provide a: - - - str -> all values encountered equal to this string will be null - - List[str] -> A null value per column. - - Dict[str, str] -> A dictionary that maps column name to a null value string. - - Returns - ------- - DataFrame + rechunk the final DataFrame. """ + assert len(dfs) > 0 + df = dfs[0].clone() + for i in range(1, len(dfs)): + try: + df = df.vstack(dfs[i], in_place=False) # type: ignore[assignment] + # could have a double borrow (one mutable one ref) + except RuntimeError: + df.vstack(dfs[i].clone(), in_place=True) - storage_options = storage_options or {} - - if columns and not has_headers: - for column in columns: - if not column.startswith("column_"): - raise ValueError( - 'Specified column names do not start with "column_", ' - "but autogenerated header names were requested." - ) - - if ( - use_pyarrow - and dtype is None - and stop_after_n_rows is None - and n_threads is None - and encoding == "utf8" - and not low_memory - and null_values is None - ): - include_columns = None - - if columns: - if not has_headers: - # Convert 'column_1', 'column_2', ... column names to 'f0', 'f1', ... column names for pyarrow, - # if CSV file does not contain a header. - include_columns = [f"f{int(column[7:]) - 1}" for column in columns] - else: - include_columns = columns - - if not columns and projection: - # Convert column indices from projection to 'f0', 'f1', ... column names for pyarrow. - include_columns = [f"f{column_idx}" for column_idx in projection] - - with _prepare_file_arg(file, **storage_options) as data: - tbl = pa.csv.read_csv( - data, - pa.csv.ReadOptions( - skip_rows=skip_rows, autogenerate_column_names=not has_headers - ), - pa.csv.ParseOptions(delimiter=sep), - pa.csv.ConvertOptions( - column_types=None, - include_columns=include_columns, - include_missing_columns=ignore_errors, - ), - ) - - if new_columns: - tbl = tbl.rename_columns(new_columns) - elif not has_headers: - # Rename 'f0', 'f1', ... columns names autogenated by pyarrow to 'column_1', 'column_2', ... 
- tbl = tbl.rename_columns( - [f"column_{int(column[1:]) + 1}" for column in tbl.column_names] - ) - - return from_arrow(tbl, rechunk) # type: ignore[return-value] - - with _prepare_file_arg(file, **storage_options) as data: - df = pl.DataFrame.read_csv( - file=data, - infer_schema_length=infer_schema_length, - batch_size=batch_size, - has_headers=has_headers, - ignore_errors=ignore_errors, - stop_after_n_rows=stop_after_n_rows, - skip_rows=skip_rows, - projection=projection, - sep=sep, - columns=columns, - rechunk=rechunk, - encoding=encoding, - n_threads=n_threads, - dtype=dtype, - low_memory=low_memory, - comment_char=comment_char, - null_values=null_values, - ) - - if new_columns: - df.columns = new_columns + if rechunk: + return df.rechunk() return df -def scan_csv( - file: Union[str, Path], - has_headers: bool = True, - ignore_errors: bool = False, - sep: str = ",", - skip_rows: int = 0, - stop_after_n_rows: Optional[int] = None, - cache: bool = True, - dtype: Optional[Dict[str, Type["pl.DataType"]]] = None, - low_memory: bool = False, - comment_char: Optional[str] = None, - null_values: Optional[Union[str, List[str], Dict[str, str]]] = None, -) -> "pl.LazyFrame": - """ - Lazily read from a csv file. - - This allows the query optimizer to push down predicates and projections to the scan level, - thereby potentially reducing memory overhead. - - Parameters - ---------- - file - Path to a file. - has_headers - If the CSV file has headers or not. - ignore_errors - Try to keep reading lines if some lines yield errors. - sep - Delimiter/ value separator. - skip_rows - Start reading after `skip_rows`. - stop_after_n_rows - After n rows are read from the CSV, it stops reading. - During multi-threaded parsing, an upper bound of `n` rows - cannot be guaranteed. - cache - Cache the result after reading. - dtype - Overwrite the dtypes during inference. - low_memory - Reduce memory usage in expense of performance. - comment_char - character that indicates the start of a comment line, for instance '#'. - null_values - Values to interpret as null values. You can provide a: - - - str -> all values encountered equal to this string will be null - - List[str] -> A null value per column. - - Dict[str, str] -> A dictionary that maps column name to a null value string. - """ - if isinstance(file, Path): - file = str(file) - return pl.LazyFrame.scan_csv( - file=file, - has_headers=has_headers, - sep=sep, - ignore_errors=ignore_errors, - skip_rows=skip_rows, - stop_after_n_rows=stop_after_n_rows, - cache=cache, - dtype=dtype, - low_memory=low_memory, - comment_char=comment_char, - null_values=null_values, - ) - - -def scan_parquet( - file: Union[str, Path], - stop_after_n_rows: Optional[int] = None, - cache: bool = True, -) -> "pl.LazyFrame": - """ - Lazily read from a parquet file. - - This allows the query optimizer to push down predicates and projections to the scan level, - thereby potentially reducing memory overhead. - - Parameters - ---------- - file - Path to a file. - stop_after_n_rows - After n rows are read from the parquet, it stops reading. - cache - Cache the result after reading. - """ - if isinstance(file, Path): - file = str(file) - return pl.LazyFrame.scan_parquet( - file=file, stop_after_n_rows=stop_after_n_rows, cache=cache - ) - - -def read_ipc( - file: Union[str, BinaryIO, Path], - use_pyarrow: bool = True, - storage_options: Optional[Dict] = None, -) -> "pl.DataFrame": - """ - Read into a DataFrame from Arrow IPC stream format. This is also called the feather format. 
- - Parameters - ---------- - file - Path to a file or a file like object. - If ``fsspec`` is installed, it will be used to open non-local or compressed files - use_pyarrow - Use pyarrow or rust arrow backend. - storage_options - Extra options that make sense for ``fsspec.open()`` or a particular storage connection, e.g. host, port, username, password, etc. - - Returns - ------- - DataFrame - """ - storage_options = storage_options or {} - with _prepare_file_arg(file, **storage_options) as data: - return pl.DataFrame.read_ipc(data, use_pyarrow) - - -def read_parquet( - source: Union[str, List[str], Path, BinaryIO], - stop_after_n_rows: Optional[int] = None, - memory_map: bool = True, - columns: Optional[List[str]] = None, - storage_options: Optional[Dict] = None, - **kwargs: Any, -) -> "pl.DataFrame": +def repeat( + val: Union[int, float, str], n: int, name: Optional[str] = None +) -> "pl.Series": """ - Read into a DataFrame from a parquet file. + Repeat a single value n times and collect into a Series. Parameters ---------- - source - Path to a file | list of files, or a file like object. If the path is a directory, that directory will be used - as partition aware scan. - If ``fsspec`` is installed, it will be used to open non-local or compressed files - stop_after_n_rows - After n rows are read from the parquet, it stops reading. Note: this cannot be used in partition aware parquet - reads. - memory_map - Memory map underlying file. This will likely increase performance. - columns - Columns to project/ select. - storage_options - Extra options that make sense for ``fsspec.open()`` or a particular storage connection, e.g. host, port, username, password, etc. - **kwargs - kwargs for [pyarrow.parquet.read_table](https://arrow.apache.org/docs/python/generated/pyarrow.parquet.read_table.html) - - Returns - ------- - DataFrame + val + Value to repeat. + n + Number of repeats. + name + Optional name of the Series. """ - storage_options = storage_options or {} - with _prepare_file_arg(source, **storage_options) as source_prep: - if stop_after_n_rows is not None: - return pl.DataFrame.read_parquet( - source_prep, stop_after_n_rows=stop_after_n_rows - ) - return from_arrow( # type: ignore[return-value] - pa.parquet.read_table( - source_prep, memory_map=memory_map, columns=columns, **kwargs - ) - ) + if name is None: + name = "" + if isinstance(val, str): + s = pl.Series._repeat(name, val, n) + s.rename(name) + return s + else: + return pl.Series.from_arrow(name, pa.repeat(val, n)) def arg_where(mask: "pl.Series") -> "pl.Series": @@ -501,21 +100,28 @@ def arg_where(mask: "pl.Series") -> "pl.Series": return mask.arg_true() -def from_arrow_table(table: pa.Table, rechunk: bool = True) -> "pl.DataFrame": +def from_rows( + rows: Sequence[Sequence[Any]], + column_names: Optional[Sequence[str]] = None, + column_name_mapping: Optional[Dict[int, str]] = None, +) -> "pl.DataFrame": """ - .. deprecated:: 7.3 - use `from_arrow` - - Create a DataFrame from an arrow Table. - + Create a DataFrame from rows. This should only be used as a last resort, as this is more expensive than + creating from columnar data. Parameters ---------- - a - Arrow Table. - rechunk - Make sure that all data is contiguous. + rows + rows. + column_names + column names to use for the DataFrame. 
+ column_name_mapping + map column index to a new name: + Example: + ```python + column_mapping: {0: "first_column, 3: "fourth column"} + ``` """ - return pl.DataFrame.from_arrow(table, rechunk) + return pl.DataFrame.from_rows(rows, column_names, column_name_mapping) def from_arrow( @@ -589,129 +195,18 @@ def from_pandas( return from_arrow(table, rechunk) -def concat(dfs: Sequence["pl.DataFrame"], rechunk: bool = True) -> "pl.DataFrame": - """ - Aggregate all the Dataframes in a List of DataFrames to a single DataFrame. - - Parameters - ---------- - dfs - DataFrames to concatenate. - rechunk - rechunk the final DataFrame. - """ - assert len(dfs) > 0 - df = dfs[0].clone() - for i in range(1, len(dfs)): - try: - df = df.vstack(dfs[i], in_place=False) # type: ignore[assignment] - # could have a double borrow (one mutable one ref) - except RuntimeError: - df.vstack(dfs[i].clone(), in_place=True) - - if rechunk: - return df.rechunk() - return df - - -def repeat( - val: Union[int, float, str], n: int, name: Optional[str] = None -) -> "pl.Series": - """ - Repeat a single value n times and collect into a Series. - - Parameters - ---------- - val - Value to repeat. - n - Number of repeats. - name - Optional name of the Series. - """ - if name is None: - name = "" - if isinstance(val, str): - s = pl.Series._repeat(name, val, n) - s.rename(name) - return s - else: - return pl.Series.from_arrow(name, pa.repeat(val, n)) - - -def read_json(source: Union[str, BytesIO]) -> "pl.DataFrame": - """ - Read into a DataFrame from JSON format. - - Parameters - ---------- - source - Path to a file or a file like object. - """ - return pl.DataFrame.read_json(source) - - -def from_rows( - rows: Sequence[Sequence[Any]], - column_names: Optional[List[str]] = None, - column_name_mapping: Optional[Dict[int, str]] = None, -) -> "pl.DataFrame": - """ - Create a DataFrame from rows. This should only be used as a last resort, as this is more expensive than - creating from columnar data. - - Parameters - ---------- - rows - rows. - column_names - column names to use for the DataFrame. - column_name_mapping - map column index to a new name: - Example: - ```python - column_mapping: {0: "first_column, 3: "fourth column"} - ``` - """ - return pl.DataFrame.from_rows(rows, column_names, column_name_mapping) - - -def read_sql(sql: str, engine: Any) -> "pl.DataFrame": +def from_arrow_table(table: pa.Table, rechunk: bool = True) -> "pl.DataFrame": """ - # Preface - Deprecated by design. Will not have a long future support and no guarantees given whatsoever. - Want backwards compatibility? - - Use: - - ```python - df = pl.from_pandas(pd.read_sql(sql, engine)) - ``` - - The support is limited because I want something better. + .. deprecated:: 7.3 + use `from_arrow` - # Docstring - Load a DataFrame from a database by sending a raw sql query. - Make sure to install sqlalchemy ^1.4 + Create a DataFrame from an arrow Table. Parameters ---------- - sql - raw sql query - engine : sqlalchemy engine - make sure to install sqlalchemy ^1.4 + a + Arrow Table. + rechunk + Make sure that all data is contiguous. """ - try: - # pandas sql loading is faster. 
- # conversion from pandas to arrow is very cheap compared to db driver - import pandas as pd - - return from_pandas(pd.read_sql(sql, engine)) # type: ignore[return-value] - except ImportError: - from sqlalchemy import text - - with engine.connect() as con: - result = con.execute(text(sql)) - - rows = result.fetchall() - return from_rows(rows, list(result.keys())) + return pl.DataFrame.from_arrow(table, rechunk) diff --git a/py-polars/polars/io.py b/py-polars/polars/io.py new file mode 100644 index 000000000000..666e9644678d --- /dev/null +++ b/py-polars/polars/io.py @@ -0,0 +1,513 @@ +from contextlib import contextmanager +from io import BytesIO, StringIO +from pathlib import Path +from typing import ( + Any, + BinaryIO, + ContextManager, + Dict, + Iterator, + List, + Optional, + TextIO, + Type, + Union, + overload, +) +from urllib.request import urlopen + +import pyarrow as pa +import pyarrow.compute +import pyarrow.csv +import pyarrow.parquet + +import polars as pl + +from .functions import from_arrow, from_pandas + +try: + import fsspec + from fsspec.implementations.local import make_path_posix + from fsspec.utils import infer_compression, infer_storage_options + + WITH_FSSPEC = True +except ImportError: + WITH_FSSPEC = False + +__all__ = [ + "read_csv", + "read_parquet", + "read_json", + "read_sql", + "read_ipc", + "scan_csv", + "scan_parquet", +] + + +def _process_http_file(path: str) -> BytesIO: + with urlopen(path) as f: + return BytesIO(f.read()) + + +@overload +def _prepare_file_arg( + file: Union[str, List[str], Path, BinaryIO], **kwargs: Any +) -> ContextManager[Union[str, BinaryIO]]: + ... + + +@overload +def _prepare_file_arg( + file: Union[str, TextIO, Path, BinaryIO], **kwargs: Any +) -> ContextManager[Union[str, BinaryIO]]: + ... + + +@overload +def _prepare_file_arg( + file: Union[str, List[str], TextIO, Path, BinaryIO], **kwargs: Any +) -> ContextManager[Union[str, List[str], BinaryIO, List[BinaryIO]]]: + ... + + +def _prepare_file_arg( + file: Union[str, List[str], TextIO, Path, BinaryIO], **kwargs: Any +) -> ContextManager[Union[str, BinaryIO, List[str], List[BinaryIO]]]: + """ + Utility for read_[csv, parquet]. (not to be used by scan_[csv, parquet]). + Returned value is always usable as a context. + + A `StringIO`, `BytesIO` file is returned as a `BytesIO` + A local path is returned as a string + An http url is read into a buffer and returned as a `BytesIO` + + When fsspec is installed, except for `StringIO`, `BytesIO` and local + uncompressed files, the file is opened with `fsspec.open(file, **kwargs)`, + in which case, the compression is inferred. 
+ """ + + compression = kwargs.pop("compression", "infer") + + # Small helper to use a variable as context + @contextmanager + def managed_file(file: Any) -> Iterator[Any]: + try: + yield file + finally: + pass + + if isinstance(file, StringIO): + return BytesIO(file.read().encode("utf8")) + if isinstance(file, BytesIO): + return file + if isinstance(file, Path): + return managed_file(str(file)) + if isinstance(file, str): + if WITH_FSSPEC: + compressed = infer_compression(file) is not None + local = infer_storage_options(file)["protocol"] == "file" + if local and not compressed: + return managed_file(make_path_posix(file)) + return fsspec.open(file, compression=compression, **kwargs) + if file.startswith("http"): + return _process_http_file(file) + if isinstance(file, list) and bool(file) and all(isinstance(f, str) for f in file): + if WITH_FSSPEC: + compressed = any(infer_compression(f) is not None for f in file) + local = all(infer_storage_options(f)["protocol"] == "file" for f in file) + if local and not compressed: + return managed_file(list(map(make_path_posix, file))) + return fsspec.open_files(file, compression=compression, **kwargs) + return managed_file(file) + + +def read_csv( + file: Union[str, TextIO, Path, BinaryIO], + infer_schema_length: int = 100, + batch_size: int = 8192, + has_headers: bool = True, + ignore_errors: bool = False, + stop_after_n_rows: Optional[int] = None, + skip_rows: int = 0, + projection: Optional[List[int]] = None, + sep: str = ",", + columns: Optional[List[str]] = None, + rechunk: bool = True, + encoding: str = "utf8", + n_threads: Optional[int] = None, + dtype: Optional[Dict[str, Type["pl.DataType"]]] = None, + new_columns: Optional[List[str]] = None, + use_pyarrow: bool = False, + low_memory: bool = False, + comment_char: Optional[str] = None, + storage_options: Optional[Dict] = None, + null_values: Optional[Union[str, List[str], Dict[str, str]]] = None, +) -> "pl.DataFrame": + """ + Read into a DataFrame from a csv file. + + Parameters + ---------- + file + Path to a file or a file like object. + By file-like object, we refer to objects with a ``read()`` method, + such as a file handler (e.g. via builtin ``open`` function) + or ``StringIO`` or ``BytesIO``. + If ``fsspec`` is installed, it will be used to open non-local or compressed files + infer_schema_length + Maximum number of lines to read to infer schema. + batch_size + Number of lines to read into the buffer at once. Modify this to change performance. + has_headers + Indicate if first row of dataset is header or not. If set to False first row will be set to `column_x`, + `x` being an enumeration over every column in the dataset. + ignore_errors + Try to keep reading lines if some lines yield errors. + stop_after_n_rows + After n rows are read from the CSV, it stops reading. + During multi-threaded parsing, an upper bound of `n` rows + cannot be guaranteed. + skip_rows + Start reading after `skip_rows`. + projection + Indexes of columns to select. Note that column indexes count from zero. + sep + Delimiter/ value separator. + columns + Columns to project/ select. + rechunk + Make sure that all columns are contiguous in memory by aggregating the chunks into a single array. + encoding + - "utf8" + - "utf8-lossy" + n_threads + Number of threads to use in csv parsing. Defaults to the number of physical cpu's of your system. + dtype + Overwrite the dtypes during inference. + new_columns + Rename columns to these right after parsing. 
Note that the length of this list must equal the width of the DataFrame + that's parsed. + use_pyarrow + Try to use pyarrow's native CSV parser. This is not always possible. The set of arguments given to this function + determine if it is possible to use pyarrows native parser. Note that pyarrow and polars may have a different + strategy regarding type inference. + low_memory + Reduce memory usage in expense of performance. + comment_char + character that indicates the start of a comment line, for instance '#'. + storage_options + Extra options that make sense for ``fsspec.open()`` or a particular storage connection, e.g. host, port, username, password, etc. + null_values + Values to interpret as null values. You can provide a: + + - str -> all values encountered equal to this string will be null + - List[str] -> A null value per column. + - Dict[str, str] -> A dictionary that maps column name to a null value string. + + Returns + ------- + DataFrame + """ + + storage_options = storage_options or {} + + if columns and not has_headers: + for column in columns: + if not column.startswith("column_"): + raise ValueError( + 'Specified column names do not start with "column_", ' + "but autogenerated header names were requested." + ) + + if ( + use_pyarrow + and dtype is None + and stop_after_n_rows is None + and n_threads is None + and encoding == "utf8" + and not low_memory + and null_values is None + ): + include_columns = None + + if columns: + if not has_headers: + # Convert 'column_1', 'column_2', ... column names to 'f0', 'f1', ... column names for pyarrow, + # if CSV file does not contain a header. + include_columns = [f"f{int(column[7:]) - 1}" for column in columns] + else: + include_columns = columns + + if not columns and projection: + # Convert column indices from projection to 'f0', 'f1', ... column names for pyarrow. + include_columns = [f"f{column_idx}" for column_idx in projection] + + with _prepare_file_arg(file, **storage_options) as data: + tbl = pa.csv.read_csv( + data, + pa.csv.ReadOptions( + skip_rows=skip_rows, autogenerate_column_names=not has_headers + ), + pa.csv.ParseOptions(delimiter=sep), + pa.csv.ConvertOptions( + column_types=None, + include_columns=include_columns, + include_missing_columns=ignore_errors, + ), + ) + + if new_columns: + tbl = tbl.rename_columns(new_columns) + elif not has_headers: + # Rename 'f0', 'f1', ... columns names autogenated by pyarrow to 'column_1', 'column_2', ... 
+ tbl = tbl.rename_columns( + [f"column_{int(column[1:]) + 1}" for column in tbl.column_names] + ) + + return from_arrow(tbl, rechunk) # type: ignore[return-value] + + with _prepare_file_arg(file, **storage_options) as data: + df = pl.DataFrame.read_csv( + file=data, + infer_schema_length=infer_schema_length, + batch_size=batch_size, + has_headers=has_headers, + ignore_errors=ignore_errors, + stop_after_n_rows=stop_after_n_rows, + skip_rows=skip_rows, + projection=projection, + sep=sep, + columns=columns, + rechunk=rechunk, + encoding=encoding, + n_threads=n_threads, + dtype=dtype, + low_memory=low_memory, + comment_char=comment_char, + null_values=null_values, + ) + + if new_columns: + df.columns = new_columns + return df + + +def scan_csv( + file: Union[str, Path], + has_headers: bool = True, + ignore_errors: bool = False, + sep: str = ",", + skip_rows: int = 0, + stop_after_n_rows: Optional[int] = None, + cache: bool = True, + dtype: Optional[Dict[str, Type["pl.DataType"]]] = None, + low_memory: bool = False, + comment_char: Optional[str] = None, + null_values: Optional[Union[str, List[str], Dict[str, str]]] = None, +) -> "pl.LazyFrame": + """ + Lazily read from a csv file. + + This allows the query optimizer to push down predicates and projections to the scan level, + thereby potentially reducing memory overhead. + + Parameters + ---------- + file + Path to a file. + has_headers + If the CSV file has headers or not. + ignore_errors + Try to keep reading lines if some lines yield errors. + sep + Delimiter/ value separator. + skip_rows + Start reading after `skip_rows`. + stop_after_n_rows + After n rows are read from the CSV, it stops reading. + During multi-threaded parsing, an upper bound of `n` rows + cannot be guaranteed. + cache + Cache the result after reading. + dtype + Overwrite the dtypes during inference. + low_memory + Reduce memory usage in expense of performance. + comment_char + character that indicates the start of a comment line, for instance '#'. + null_values + Values to interpret as null values. You can provide a: + + - str -> all values encountered equal to this string will be null + - List[str] -> A null value per column. + - Dict[str, str] -> A dictionary that maps column name to a null value string. + """ + if isinstance(file, Path): + file = str(file) + return pl.LazyFrame.scan_csv( + file=file, + has_headers=has_headers, + sep=sep, + ignore_errors=ignore_errors, + skip_rows=skip_rows, + stop_after_n_rows=stop_after_n_rows, + cache=cache, + dtype=dtype, + low_memory=low_memory, + comment_char=comment_char, + null_values=null_values, + ) + + +def scan_parquet( + file: Union[str, Path], + stop_after_n_rows: Optional[int] = None, + cache: bool = True, +) -> "pl.LazyFrame": + """ + Lazily read from a parquet file. + + This allows the query optimizer to push down predicates and projections to the scan level, + thereby potentially reducing memory overhead. + + Parameters + ---------- + file + Path to a file. + stop_after_n_rows + After n rows are read from the parquet, it stops reading. + cache + Cache the result after reading. + """ + if isinstance(file, Path): + file = str(file) + return pl.LazyFrame.scan_parquet( + file=file, stop_after_n_rows=stop_after_n_rows, cache=cache + ) + + +def read_ipc( + file: Union[str, BinaryIO, Path], + use_pyarrow: bool = True, + storage_options: Optional[Dict] = None, +) -> "pl.DataFrame": + """ + Read into a DataFrame from Arrow IPC stream format. This is also called the feather format. 
+ + Parameters + ---------- + file + Path to a file or a file like object. + If ``fsspec`` is installed, it will be used to open non-local or compressed files + use_pyarrow + Use pyarrow or rust arrow backend. + storage_options + Extra options that make sense for ``fsspec.open()`` or a particular storage connection, e.g. host, port, username, password, etc. + + Returns + ------- + DataFrame + """ + storage_options = storage_options or {} + with _prepare_file_arg(file, **storage_options) as data: + return pl.DataFrame.read_ipc(data, use_pyarrow) + + +def read_parquet( + source: Union[str, List[str], Path, BinaryIO], + stop_after_n_rows: Optional[int] = None, + memory_map: bool = True, + columns: Optional[List[str]] = None, + storage_options: Optional[Dict] = None, + **kwargs: Any, +) -> "pl.DataFrame": + """ + Read into a DataFrame from a parquet file. + + Parameters + ---------- + source + Path to a file | list of files, or a file like object. If the path is a directory, that directory will be used + as partition aware scan. + If ``fsspec`` is installed, it will be used to open non-local or compressed files + stop_after_n_rows + After n rows are read from the parquet, it stops reading. Note: this cannot be used in partition aware parquet + reads. + memory_map + Memory map underlying file. This will likely increase performance. + columns + Columns to project/ select. + storage_options + Extra options that make sense for ``fsspec.open()`` or a particular storage connection, e.g. host, port, username, password, etc. + **kwargs + kwargs for [pyarrow.parquet.read_table](https://arrow.apache.org/docs/python/generated/pyarrow.parquet.read_table.html) + + Returns + ------- + DataFrame + """ + storage_options = storage_options or {} + with _prepare_file_arg(source, **storage_options) as source_prep: + if stop_after_n_rows is not None: + return pl.DataFrame.read_parquet( + source_prep, stop_after_n_rows=stop_after_n_rows + ) + return from_arrow( # type: ignore[return-value] + pa.parquet.read_table( + source_prep, memory_map=memory_map, columns=columns, **kwargs + ) + ) + + +def read_json(source: Union[str, BytesIO]) -> "pl.DataFrame": + """ + Read into a DataFrame from JSON format. + + Parameters + ---------- + source + Path to a file or a file like object. + """ + return pl.DataFrame.read_json(source) + + +def read_sql(sql: str, engine: Any) -> "pl.DataFrame": + """ + # Preface + Deprecated by design. Will not have a long future support and no guarantees given whatsoever. + Want backwards compatibility? + + Use: + + ```python + df = pl.from_pandas(pd.read_sql(sql, engine)) + ``` + + The support is limited because I want something better. + + # Docstring + Load a DataFrame from a database by sending a raw sql query. + Make sure to install sqlalchemy ^1.4 + + Parameters + ---------- + sql + raw sql query + engine : sqlalchemy engine + make sure to install sqlalchemy ^1.4 + """ + try: + # pandas sql loading is faster. 
+ # conversion from pandas to arrow is very cheap compared to db driver + import pandas as pd + + return from_pandas(pd.read_sql(sql, engine)) # type: ignore[return-value] + except ImportError: + from sqlalchemy import text + + with engine.connect() as con: + result = con.execute(text(sql)) + + rows = result.fetchall() + return pl.DataFrame.from_rows(rows, list(result.keys())) diff --git a/py-polars/polars/lazy/__init__.py b/py-polars/polars/lazy/__init__.py index bb8049c8830a..90cf84fb38e3 100644 --- a/py-polars/polars/lazy/__init__.py +++ b/py-polars/polars/lazy/__init__.py @@ -1,5 +1,8 @@ # flake8: noqa +from . import expr, frame, functions, whenthen from .expr import * -from .expr_functions import * from .frame import * +from .functions import * from .whenthen import * + +__all__ = expr.__all__ + functions.__all__ + frame.__all__ + whenthen.__all__ diff --git a/py-polars/polars/lazy/expr.py b/py-polars/polars/lazy/expr.py index 7d3dd0a506ce..ccfeaf95534b 100644 --- a/py-polars/polars/lazy/expr.py +++ b/py-polars/polars/lazy/expr.py @@ -5,7 +5,7 @@ import polars as pl from ..datatypes import Boolean, DataType, Date32, Date64, Float64, Int64, Utf8 -from .expr_functions import UDF, col, lit +from .functions import UDF, col, lit try: from ..polars import PyExpr @@ -16,7 +16,6 @@ __pdoc__ = {"wrap_expr": False} __all__ = [ - "wrap_expr", "Expr", "ExprStringNameSpace", "ExprDateTimeNameSpace", diff --git a/py-polars/polars/lazy/frame.py b/py-polars/polars/lazy/frame.py index e76f61a085fb..5f1baa358dd3 100644 --- a/py-polars/polars/lazy/frame.py +++ b/py-polars/polars/lazy/frame.py @@ -23,7 +23,6 @@ __pdoc__ = {"wrap_ldf": False} __all__ = [ - "wrap_ldf", "LazyFrame", ] @@ -267,7 +266,7 @@ def collect( simplify_expression, string_cache, ) - return pl.wrap_df(ldf.collect()) + return pl.eager.frame.wrap_df(ldf.collect()) def fetch( self, @@ -320,7 +319,7 @@ def fetch( simplify_expression, string_cache, ) - return pl.wrap_df(ldf.fetch(n_rows)) + return pl.eager.frame.wrap_df(ldf.fetch(n_rows)) def cache( self, diff --git a/py-polars/polars/lazy/expr_functions.py b/py-polars/polars/lazy/functions.py similarity index 95% rename from py-polars/polars/lazy/expr_functions.py rename to py-polars/polars/lazy/functions.py index b7e477784c72..671022a7f5c6 100644 --- a/py-polars/polars/lazy/expr_functions.py +++ b/py-polars/polars/lazy/functions.py @@ -63,7 +63,7 @@ def col(name: str) -> "pl.Expr": """ A column in a DataFrame. 
""" - return pl.wrap_expr(pycol(name)) + return pl.lazy.expr.wrap_expr(pycol(name)) def except_(name: str) -> "pl.Expr": @@ -102,7 +102,7 @@ def except_(name: str) -> "pl.Expr": ╰─────┴─────╯ ``` """ - return pl.wrap_expr(pyexcept(name)) + return pl.lazy.expr.wrap_expr(pyexcept(name)) def count(column: Union[str, "pl.Series"] = "") -> Union["pl.Expr", int]: @@ -348,14 +348,14 @@ def lit( if isinstance(value, pl.Series): name = value.name value = value._s - return pl.wrap_expr(pylit(value)).alias(name) + return pl.lazy.expr.wrap_expr(pylit(value)).alias(name) if isinstance(value, np.ndarray): return lit(pl.Series("", value)) if dtype: - return pl.wrap_expr(pylit(value)).cast(dtype) - return pl.wrap_expr(pylit(value)) + return pl.lazy.expr.wrap_expr(pylit(value)).cast(dtype) + return pl.lazy.expr.wrap_expr(pylit(value)) def pearson_corr( @@ -376,7 +376,7 @@ def pearson_corr( a = col(a) if isinstance(b, str): b = col(b) - return pl.wrap_expr(pypearson_corr(a._pyexpr, b._pyexpr)) + return pl.lazy.expr.wrap_expr(pypearson_corr(a._pyexpr, b._pyexpr)) def cov( @@ -397,7 +397,7 @@ def cov( a = col(a) if isinstance(b, str): b = col(b) - return pl.wrap_expr(pycov(a._pyexpr, b._pyexpr)) + return pl.lazy.expr.wrap_expr(pycov(a._pyexpr, b._pyexpr)) def map_binary( @@ -424,7 +424,9 @@ def map_binary( a = col(a) if isinstance(b, str): b = col(b) - return pl.wrap_expr(pybinary_function(a._pyexpr, b._pyexpr, f, return_dtype)) + return pl.lazy.expr.wrap_expr( + pybinary_function(a._pyexpr, b._pyexpr, f, return_dtype) + ) def fold( @@ -582,7 +584,7 @@ def argsort_by( if not isinstance(reverse, list): reverse = [reverse] exprs = pl.lazy.expr._selection_to_pyexpr_list(exprs) - return pl.wrap_expr(pyargsort_by(exprs, reverse)) + return pl.lazy.expr.wrap_expr(pyargsort_by(exprs, reverse)) def concat_str(exprs: tp.List["pl.Expr"], delimiter: str = "") -> "pl.Expr": @@ -597,4 +599,4 @@ def concat_str(exprs: tp.List["pl.Expr"], delimiter: str = "") -> "pl.Expr": String value that will be used to separate the values. """ exprs = pl.lazy.expr._selection_to_pyexpr_list(exprs) - return pl.wrap_expr(_concat_str(exprs, delimiter)) + return pl.lazy.expr.wrap_expr(_concat_str(exprs, delimiter)) diff --git a/py-polars/polars/lazy/whenthen.py b/py-polars/polars/lazy/whenthen.py index c89fdfd08b5f..f6e19c3c9304 100644 --- a/py-polars/polars/lazy/whenthen.py +++ b/py-polars/polars/lazy/whenthen.py @@ -44,7 +44,7 @@ def otherwise(self, expr: Union["pl.Expr", int, float, str]) -> "pl.Expr": See Also: the `when` function. """ expr = expr_to_lit_or_expr(expr) - return pl.wrap_expr(self.pywenthenthen.otherwise(expr._pyexpr)) + return pl.lazy.expr.wrap_expr(self.pywenthenthen.otherwise(expr._pyexpr)) class WhenThen: @@ -68,7 +68,7 @@ def otherwise(self, expr: Union["pl.Expr", int, float, str]) -> "pl.Expr": See Also: the `when` function. 
""" expr = expr_to_lit_or_expr(expr) - return pl.wrap_expr(self._pywhenthen.otherwise(expr._pyexpr)) + return pl.lazy.expr.wrap_expr(self._pywhenthen.otherwise(expr._pyexpr)) class When: diff --git a/py-polars/polars/string_cache.py b/py-polars/polars/string_cache.py new file mode 100644 index 000000000000..bf9b7130f197 --- /dev/null +++ b/py-polars/polars/string_cache.py @@ -0,0 +1,45 @@ +from types import TracebackType +from typing import Optional, Type + +try: + from .polars import toggle_string_cache as pytoggle_string_cache +except ImportError: + import warnings + + warnings.warn("binary files missing") + + +__all__ = [ + "StringCache", + "toggle_string_cache", +] + + +class StringCache: + """ + Context manager that allows data sources to share the same categorical features. + This will temporarily cache the string categories until the context manager is finished. + """ + + def __init__(self) -> None: + pass + + def __enter__(self) -> "StringCache": + pytoggle_string_cache(True) + return self + + def __exit__( + self, + exc_type: Optional[Type[BaseException]], + exc_val: Optional[BaseException], + exc_tb: Optional[TracebackType], + ) -> None: + pytoggle_string_cache(False) + + +def toggle_string_cache(toggle: bool) -> None: + """ + Turn on/off the global string cache. This ensures that casts to Categorical types have the categories when string + values are equal. + """ + pytoggle_string_cache(toggle) diff --git a/py-polars/tests/test_df.py b/py-polars/tests/test_df.py index fed965ecbc7a..ff330087c482 100644 --- a/py-polars/tests/test_df.py +++ b/py-polars/tests/test_df.py @@ -700,12 +700,14 @@ def test_to_json(df): def test_from_rows(): - df = pl.from_rows([[1, 2, "foo"], [2, 3, "bar"]], column_name_mapping={1: "foo"}) + df = pl.DataFrame.from_rows( + [[1, 2, "foo"], [2, 3, "bar"]], column_name_mapping={1: "foo"} + ) assert df.frame_equal( pl.DataFrame({"column_0": [1, 2], "foo": [2, 3], "column_2": ["foo", "bar"]}) ) - df = pl.from_rows( + df = pl.DataFrame.from_rows( [[1, datetime.fromtimestamp(100)], [2, datetime.fromtimestamp(2398754908)]], column_name_mapping={1: "foo"}, )