This repository has been archived by the owner on Feb 20, 2025. It is now read-only.

Merge pull request #24 from JasperHG90/feat/pandas
Add Pandas IO manager
JasperHG90 authored Nov 18, 2024
2 parents 3d98df2 + 60a98e6 commit b284435
Showing 8 changed files with 503 additions and 16 deletions.
3 changes: 3 additions & 0 deletions pyproject.toml
```diff
@@ -46,6 +46,9 @@ daft = [
 polars = [
     "polars>=1.13.1",
 ]
+pandas = [
+    "pandas>=2.2.3",
+]
 
 [tool.black]
 line-length = 88
```
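With the extra declared, the Pandas dependency would presumably be pulled in via `pip install 'dagster-pyiceberg[pandas]'` (install command inferred from the package layout, not part of this diff). Without it, the import guard in the new `pandas.py` module below raises an informative `ImportError`.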
1 change: 0 additions & 1 deletion src/dagster_pyiceberg/_utils/__init__.py
```diff
@@ -1,4 +1,3 @@
-from dagster_pyiceberg._utils.io import CatalogTypes as CatalogTypes
 from dagster_pyiceberg._utils.io import table_writer as table_writer
 from dagster_pyiceberg._utils.partitions import (
     DagsterPartitionToDaftSqlPredicateMapper as DagsterPartitionToDaftSqlPredicateMapper,
```
15 changes: 6 additions & 9 deletions src/dagster_pyiceberg/_utils/io.py
```diff
@@ -5,8 +5,7 @@
 from pyiceberg import __version__ as iceberg_version
 from pyiceberg import expressions as E
 from pyiceberg import table as iceberg_table
-from pyiceberg.catalog.rest import RestCatalog
-from pyiceberg.catalog.sql import SqlCatalog
+from pyiceberg.catalog import Catalog
 from pyiceberg.exceptions import CommitFailedException, TableAlreadyExistsError
 from pyiceberg.partitioning import PartitionSpec
 from pyiceberg.schema import Schema
@@ -19,13 +18,11 @@
 from dagster_pyiceberg._utils.schema import update_table_schema
 from dagster_pyiceberg.version import __version__ as dagster_pyiceberg_version
 
-CatalogTypes = Union[SqlCatalog, RestCatalog]
-
 
 def table_writer(
     table_slice: TableSlice,
     data: pa.Table,
-    catalog: CatalogTypes,
+    catalog: Catalog,
     schema_update_mode: str,
     partition_spec_update_mode: str,
     dagster_run_id: str,
@@ -38,7 +35,7 @@ def table_writer(
         table_slice (TableSlice): dagster database IO manager table slice. This
             contains information about dagster partitions.
         data (pa.Table): PyArrow table
-        catalog (CatalogTypes): PyIceberg catalogs supported by this library
+        catalog (Catalog): PyIceberg catalog
         schema_update_mode (str): Whether to process schema updates on existing
             tables or error, value is either 'error' or 'update'
@@ -149,15 +146,15 @@ def get_expression_row_filter(
 
 
 def create_table_if_not_exists(
-    catalog: CatalogTypes,
+    catalog: Catalog,
     table_path: str,
     schema: pa.Schema,
     properties: Dict[str, str],
 ) -> iceberg_table.Table:
     """Creates an iceberg table and retries on failure
 
     Args:
-        catalog (CatalogTypes): PyIceberg catalogs supported by this library
+        catalog (Catalog): PyIceberg catalog
         table_path (str): Table path
         schema (pa.Schema): PyArrow schema
         properties (Dict[str, str]): Table properties
@@ -177,7 +174,7 @@
 
 class PyIcebergCreateTableIfNotExistsWithRetry(PyIcebergOperationWithRetry):
 
-    def __init__(self, catalog: CatalogTypes):
+    def __init__(self, catalog: Catalog):
         self.catalog = catalog
 
     def refresh(self):
```
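Replacing the `CatalogTypes` union with PyIceberg's `Catalog` base class means any catalog implementation now satisfies these signatures, not just the SQL and REST backends. A minimal sketch of what that enables (the catalog name and properties below are illustrative, not from this diff):

```python
# Any pyiceberg catalog (SqlCatalog, RestCatalog, ...) subclasses Catalog,
# so functions typed against the base class accept all of them.
from pyiceberg.catalog import Catalog, load_catalog

catalog: Catalog = load_catalog(
    "default",  # hypothetical catalog name
    **{
        "uri": "sqlite:///catalog.db",        # assumed SqlCatalog-style properties
        "warehouse": "file:///tmp/warehouse",
    },
)
# table_writer(..., catalog=catalog, ...) now type-checks for any backend.
```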
7 changes: 4 additions & 3 deletions src/dagster_pyiceberg/handler.py
```diff
@@ -5,9 +5,10 @@
 from dagster import InputContext, MetadataValue, OutputContext, TableColumn, TableSchema
 from dagster._core.storage.db_io_manager import DbTypeHandler, TableSlice
 from pyiceberg import table as ibt
+from pyiceberg.catalog import Catalog
 from pyiceberg.table.snapshots import Snapshot
 
-from dagster_pyiceberg._utils import CatalogTypes, table_writer
+from dagster_pyiceberg._utils import table_writer
 
 U = TypeVar("U")
 
@@ -31,7 +32,7 @@ def handle_output(
         context: OutputContext,
         table_slice: TableSlice,
         obj: U,
-        connection: CatalogTypes,
+        connection: Catalog,
     ):
         """Stores pyarrow types in Iceberg table"""
         metadata = context.definition_metadata or {}  # noqa
@@ -90,7 +91,7 @@ def load_input(
         self,
         context: InputContext,
         table_slice: TableSlice,
-        connection: CatalogTypes,
+        connection: Catalog,
     ) -> U:
         """Loads the input using a dataframe implementation"""
         return self.to_data_frame(
```
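For orientation, a rough sketch of what the load path does with the `Catalog` it receives as `connection` (the helper name `_load` is hypothetical; the real handler also narrows the scan using the `TableSlice`):

```python
from pyiceberg.catalog import Catalog

def _load(connection: Catalog, schema: str, table: str):
    # The handler resolves the Iceberg table through the catalog...
    iceberg_table = connection.load_table(f"{schema}.{table}")
    # ...then scans it into Arrow; the real code applies partition filters
    # and column selection from the TableSlice before converting.
    return iceberg_table.scan().to_arrow()
```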
5 changes: 2 additions & 3 deletions src/dagster_pyiceberg/io_manager/base.py
```diff
@@ -18,7 +18,6 @@
 
 from dagster_pyiceberg._db_io_manager import CustomDbIOManager
 from dagster_pyiceberg.config import IcebergCatalogConfig  # noqa
-from dagster_pyiceberg.handler import CatalogTypes
 
 
 class PartitionSpecUpdateMode(enum.Enum):
@@ -53,12 +52,12 @@ class IcebergDbClient(DbClient):
 
     @staticmethod
     def delete_table_slice(
-        context: OutputContext, table_slice: TableSlice, connection: CatalogTypes
+        context: OutputContext, table_slice: TableSlice, connection: Catalog
     ) -> None: ...
 
     @staticmethod
     def ensure_schema_exists(
-        context: OutputContext, table_slice: TableSlice, connection: CatalogTypes
+        context: OutputContext, table_slice: TableSlice, connection: Catalog
     ) -> None: ...
 
     @staticmethod
```
115 changes: 115 additions & 0 deletions src/dagster_pyiceberg/io_manager/pandas.py
New file:

```python
from typing import Sequence, Type

try:
    import pandas as pd
except ImportError as e:
    raise ImportError(
        "Please install dagster-pyiceberg with the 'pandas' extra."
    ) from e
import pyarrow as pa
from dagster import InputContext
from dagster._core.storage.db_io_manager import DbTypeHandler, TableSlice
from pyiceberg.catalog import Catalog

from dagster_pyiceberg import io_manager as _io_manager
from dagster_pyiceberg.io_manager.arrow import _IcebergPyArrowTypeHandler


class _IcebergPandasTypeHandler(_IcebergPyArrowTypeHandler):
    """Type handler that converts data between Iceberg tables and pandas DataFrames"""

    def to_arrow(self, obj: pd.DataFrame) -> pa.Table:
        return pa.Table.from_pandas(obj)

    def load_input(
        self,
        context: InputContext,
        table_slice: TableSlice,
        connection: Catalog,
    ) -> pd.DataFrame:
        """Loads the input using a dataframe implementation"""
        tbl: pa.RecordBatchReader = self.to_data_frame(
            table=connection.load_table(f"{table_slice.schema}.{table_slice.table}"),
            table_slice=table_slice,
            target_type=pa.RecordBatchReader,
        )
        return tbl.read_pandas()

    @property
    def supported_types(self) -> Sequence[Type[object]]:
        return [pd.DataFrame]


class IcebergPandasIOManager(_io_manager.IcebergIOManager):
    """An IO manager definition that reads inputs from and writes outputs to Iceberg tables using Pandas.

    Examples:

    .. code-block:: python

        import pandas as pd
        from dagster import Definitions, asset

        from dagster_pyiceberg import IcebergPandasIOManager, IcebergSqlCatalogConfig

        CATALOG_URI = "sqlite:////home/vscode/workspace/.tmp/examples/select_columns/catalog.db"
        CATALOG_WAREHOUSE = (
            "file:///home/vscode/workspace/.tmp/examples/select_columns/warehouse"
        )

        resources = {
            "io_manager": IcebergPandasIOManager(
                name="test",
                config=IcebergSqlCatalogConfig(
                    properties={"uri": CATALOG_URI, "warehouse": CATALOG_WAREHOUSE}
                ),
                schema="dagster",
            )
        }

        @asset
        def iris_dataset() -> pd.DataFrame:
            return pd.read_csv(
                "https://docs.dagster.io/assets/iris.csv",
                names=[
                    "sepal_length_cm",
                    "sepal_width_cm",
                    "petal_length_cm",
                    "petal_width_cm",
                    "species",
                ],
            )

        defs = Definitions(assets=[iris_dataset], resources=resources)

    If you do not provide a schema, Dagster will determine a schema based on the assets and ops using
    the I/O Manager. For assets, the schema will be determined from the asset key, as in the above example.
    For ops, the schema can be specified by including a "schema" entry in output metadata. If none
    of these is provided, the schema will default to "public".

    .. code-block:: python

        @op(
            out={"my_table": Out(metadata={"schema": "my_schema"})}
        )
        def make_my_table() -> pd.DataFrame:
            ...

    To only use specific columns of a table as input to a downstream op or asset, add the metadata "columns" to the
    In or AssetIn.

    .. code-block:: python

        @asset(
            ins={"my_table": AssetIn("my_table", metadata={"columns": ["a"]})}
        )
        def my_table_a(my_table: pd.DataFrame):
            # my_table will just contain the data from column "a"
            ...
    """

    @staticmethod
    def type_handlers() -> Sequence[DbTypeHandler]:
        return [_IcebergPandasTypeHandler()]
```
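The handler's write path converts with `pa.Table.from_pandas` and its read path materializes through a `RecordBatchReader`, as the code above shows. A standalone sketch of that round trip (the sample data is illustrative):

```python
import pandas as pd
import pyarrow as pa

df = pd.DataFrame({"a": [1, 2], "b": ["x", "y"]})

# Write path: what to_arrow() does before the table reaches table_writer.
tbl = pa.Table.from_pandas(df)

# Read path: the handler requests a RecordBatchReader and calls read_pandas().
reader = pa.RecordBatchReader.from_batches(tbl.schema, tbl.to_batches())
round_tripped = reader.read_pandas()

assert round_tripped.equals(df)  # pandas schema metadata restores the index
```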
