fix read_parquet/csv/delta and cache (letsql#83)
closes letsql#82
mesejo authored Jun 11, 2024
1 parent 5d705a0 commit 27fb007
Showing 2 changed files with 85 additions and 0 deletions.
python/letsql/backends/let/__init__.py (30 additions, 0 deletions)
@@ -61,6 +61,36 @@ def register(

return registered_table

def read_parquet(
self, path: str | Path, table_name: str | None = None, **kwargs: Any
) -> ir.Table:
registered_table = super().read_parquet(path, table_name=table_name, **kwargs)
self._sources[registered_table.op()] = registered_table.op()
return registered_table

def read_csv(
self, path: str | Path, table_name: str | None = None, **kwargs: Any
) -> ir.Table:
registered_table = super().read_csv(path, table_name=table_name, **kwargs)
self._sources[registered_table.op()] = registered_table.op()
return registered_table

def read_json(
self, path: str | Path, table_name: str | None = None, **kwargs: Any
) -> ir.Table:
registered_table = super().read_json(path, table_name=table_name, **kwargs)
self._sources[registered_table.op()] = registered_table.op()
return registered_table

def read_delta(
self, source_table: str | Path, table_name: str | None = None, **kwargs: Any
) -> ir.Table:
registered_table = super().read_delta(
source_table, table_name=table_name, **kwargs
)
self._sources[registered_table.op()] = registered_table.op()
return registered_table

def execute(self, expr: ir.Expr, **kwargs: Any):
not_multi_engine = self._get_source(expr) != self
if (
(remaining unchanged lines of execute() not shown)
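
For context, the fix is mechanical: each read_* override records the registered table's op in self._sources, which is what the cache layer later consults to resolve which backend owns an expression. Below is a minimal sketch of the flow the new tests exercise; the ls.connect() call and the ParquetCacheStorage import path are assumptions not confirmed by this commit, while the read_parquet, cache, and execute calls mirror the added tests.

import pathlib

import letsql as ls  # assumed top-level import alias
from letsql.common.caching import ParquetCacheStorage  # assumed import path

con = ls.connect()  # assumed constructor for the `let` backend

# read_parquet now records the table in con._sources, so the cache layer
# can resolve the owning backend instead of failing for file-based tables.
t = con.read_parquet(pathlib.Path("batting.parquet"), table_name="batting")

expr = t[t.yearID == 2015].cache(
    storage=ParquetCacheStorage(source=con, path=pathlib.Path("parquet-cache"))
)
df = expr.execute()  # first run populates the cache; later runs reuse it
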
python/letsql/backends/let/tests/test_cache.py (55 additions, 0 deletions)
@@ -2,6 +2,7 @@

import pathlib
import time
import uuid

import ibis
import pytest
@@ -33,6 +34,20 @@ def pg_alltypes(pg):
return pg.table("functional_alltypes")


@pytest.fixture(scope="session")
def csv_dir():
root = pathlib.Path(__file__).absolute().parents[5]
data_dir = root / "ci" / "ibis-testing-data" / "csv"
return data_dir


@pytest.fixture(scope="session")
def parquet_dir():
root = pathlib.Path(__file__).absolute().parents[5]
data_dir = root / "ci" / "ibis-testing-data" / "parquet"
return data_dir


def test_cache_simple(con, alltypes, alltypes_df):
initial_tables = con.list_tables()

@@ -415,3 +430,43 @@ def test_caching_of_registered_arbitrary_expression(con, pg, tmp_path):

assert result is not None
assert_frame_equal(result, expected, check_like=True)


def test_read_parquet_and_cache(con, parquet_dir, tmp_path):
batting_path = parquet_dir / "batting.parquet"
t = con.read_parquet(batting_path, table_name=f"parquet_batting-{uuid.uuid4()}")
expr = t.cache(storage=ParquetCacheStorage(source=con, path=tmp_path))
assert expr.execute() is not None


def test_read_parquet_compute_and_cache(con, parquet_dir, tmp_path):
batting_path = parquet_dir / "batting.parquet"
t = con.read_parquet(batting_path, table_name=f"parquet_batting-{uuid.uuid4()}")
expr = (
t[t.yearID == 2015]
.cache(storage=ParquetCacheStorage(source=con, path=tmp_path))
.cache()
)
assert expr.execute() is not None


def test_read_csv_and_cache(con, csv_dir, tmp_path):
batting_path = csv_dir / "batting.csv"
t = con.read_csv(batting_path, table_name=f"csv_batting-{uuid.uuid4()}")
expr = t.cache(storage=ParquetCacheStorage(source=con, path=tmp_path))
assert expr.execute() is not None


def test_read_csv_compute_and_cache(con, csv_dir, tmp_path):
batting_path = csv_dir / "batting.csv"
t = con.read_csv(
batting_path,
table_name=f"csv_batting-{uuid.uuid4()}",
schema_infer_max_records=50_000,
)
expr = (
t[t.yearID == 2015]
.cache(storage=ParquetCacheStorage(source=con, path=tmp_path))
.cache()
)
assert expr.execute() is not None
