From df427da8a1c6578094afce03e1a8aad7b858f5d8 Mon Sep 17 00:00:00 2001 From: yuzelin Date: Tue, 26 Nov 2024 12:13:11 +0800 Subject: [PATCH 1/2] Add read API to convert result to DuckDB and Ray --- dev/dev-requirements.txt | 2 + paimon_python_api/table_read.py | 17 ++++++- paimon_python_java/pypaimon.py | 15 ++++++ .../tests/test_write_and_read.py | 49 ++++++++++++++----- 4 files changed, 71 insertions(+), 12 deletions(-) diff --git a/dev/dev-requirements.txt b/dev/dev-requirements.txt index 7fd1aeb..4ed964e 100755 --- a/dev/dev-requirements.txt +++ b/dev/dev-requirements.txt @@ -26,3 +26,5 @@ numpy>=1.22.4 python-dateutil>=2.8.0,<3 pytz>=2018.3 pytest~=7.0 +duckdb>=0.5.0,<2.0.0 +ray~=2.10.0 diff --git a/paimon_python_api/table_read.py b/paimon_python_api/table_read.py index 24095b4..a82eb10 100644 --- a/paimon_python_api/table_read.py +++ b/paimon_python_api/table_read.py @@ -18,10 +18,12 @@ import pandas as pd import pyarrow as pa +import ray from abc import ABC, abstractmethod +from duckdb.duckdb import DuckDBPyConnection from paimon_python_api import Split -from typing import List +from typing import List, Optional class TableRead(ABC): @@ -38,3 +40,16 @@ def to_arrow_batch_reader(self, splits: List[Split]) -> pa.RecordBatchReader: @abstractmethod def to_pandas(self, splits: List[Split]) -> pd.DataFrame: """Read data from splits and converted to pandas.DataFrame format.""" + + @abstractmethod + def to_duckdb( + self, + splits: List[Split], + table_name: str, + connection: Optional[DuckDBPyConnection] = None) -> DuckDBPyConnection: + """Convert splits into an in-memory DuckDB table which can be queried.""" + + @abstractmethod + def to_ray(self, splits: List[Split]) -> ray.data.dataset.Dataset: + """Convert splits into a Ray dataset format.""" + diff --git a/paimon_python_java/pypaimon.py b/paimon_python_java/pypaimon.py index b884fa4..803540c 100644 --- a/paimon_python_java/pypaimon.py +++ b/paimon_python_java/pypaimon.py @@ -16,9 +16,12 @@ # limitations under 
the License. ################################################################################ +import duckdb import pandas as pd import pyarrow as pa +import ray +from duckdb.duckdb import DuckDBPyConnection from paimon_python_java.java_gateway import get_gateway from paimon_python_java.util import java_utils, constants from paimon_python_api import (catalog, table, read_builder, table_scan, split, table_read, @@ -161,6 +164,18 @@ def to_arrow_batch_reader(self, splits): def to_pandas(self, splits: List[Split]) -> pd.DataFrame: return self.to_arrow(splits).to_pandas() + def to_duckdb( + self, + splits: List[Split], + table_name: str, + connection: Optional[DuckDBPyConnection] = None) -> DuckDBPyConnection: + con = connection or duckdb.connect(database=":memory:") + con.register(table_name, self.to_arrow(splits)) + return con + + def to_ray(self, splits: List[Split]) -> ray.data.dataset.Dataset: + return ray.data.from_arrow(self.to_arrow(splits)) + def _init(self): if self._j_bytes_reader is None: # get thread num diff --git a/paimon_python_java/tests/test_write_and_read.py b/paimon_python_java/tests/test_write_and_read.py index 337b9f5..e2c631d 100644 --- a/paimon_python_java/tests/test_write_and_read.py +++ b/paimon_python_java/tests/test_write_and_read.py @@ -267,6 +267,12 @@ def testAllWriteAndReadApi(self): table_write.close() table_commit.close() + all_data = pd.DataFrame({ + 'f0': [1, 2, 3, 4, 5, 6, 7, 8, 9], + 'f1': ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i'], + }) + all_data['f0'] = all_data['f0'].astype('int32') + read_builder = table.new_read_builder() table_scan = read_builder.new_scan() table_read = read_builder.new_read() @@ -274,10 +280,7 @@ def testAllWriteAndReadApi(self): # to_arrow actual = table_read.to_arrow(splits) - expected = pa.Table.from_pydict({ - 'f0': [1, 2, 3, 4, 5, 6, 7, 8, 9], - 'f1': ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i'], - }, schema=self.simple_pa_schema) + expected = pa.Table.from_pandas(all_data, 
schema=self.simple_pa_schema) self.assertEqual(actual, expected) # to_arrow_batch_reader @@ -286,18 +289,42 @@ def testAllWriteAndReadApi(self): for batch in table_read.to_arrow_batch_reader(splits) ] actual = pd.concat(data_frames) - expected = pd.DataFrame({ - 'f0': [1, 2, 3, 4, 5, 6, 7, 8, 9], - 'f1': ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i'], - }) - expected['f0'] = expected['f0'].astype('int32') pd.testing.assert_frame_equal( - actual.reset_index(drop=True), expected.reset_index(drop=True)) + actual.reset_index(drop=True), all_data.reset_index(drop=True)) # to_pandas actual = table_read.to_pandas(splits) pd.testing.assert_frame_equal( - actual.reset_index(drop=True), expected.reset_index(drop=True)) + actual.reset_index(drop=True), all_data.reset_index(drop=True)) + + # to_duckdb + duckdb_con = table_read.to_duckdb(splits, 'duckdb_table') + # select * + result1 = duckdb_con.query("SELECT * FROM duckdb_table").fetchdf() + pd.testing.assert_frame_equal( + result1.reset_index(drop=True), all_data.reset_index(drop=True)) + # select * where + result2 = duckdb_con.query("SELECT * FROM duckdb_table WHERE f0 < 4").fetchdf() + expected2 = pd.DataFrame({ + 'f0': [1, 2, 3], + 'f1': ['a', 'b', 'c'] + }) + expected2['f0'] = expected2['f0'].astype('int32') + pd.testing.assert_frame_equal( + result2.reset_index(drop=True), expected2.reset_index(drop=True)) + # select f0 where + result3 = duckdb_con.query("SELECT f0 FROM duckdb_table WHERE f0 < 4").fetchdf() + expected3 = pd.DataFrame({ + 'f0': [1, 2, 3] + }) + expected3['f0'] = expected3['f0'].astype('int32') + pd.testing.assert_frame_equal( + result3.reset_index(drop=True), expected3.reset_index(drop=True)) + + # to_ray + ray_dataset = table_read.to_ray(splits) + pd.testing.assert_frame_equal( + ray_dataset.to_pandas().reset_index(drop=True), all_data.reset_index(drop=True)) def test_overwrite(self): schema = Schema(self.simple_pa_schema, partition_keys=['f0'], From 6e5351e6627ecca6e73e5c1472dfa46747a46439 Mon Sep 17 
00:00:00 2001 From: yuzelin Date: Tue, 26 Nov 2024 12:58:47 +0800 Subject: [PATCH 2/2] Remove trailing blank line from table_read.py --- paimon_python_api/table_read.py | 1 - 1 file changed, 1 deletion(-) diff --git a/paimon_python_api/table_read.py b/paimon_python_api/table_read.py index a82eb10..f0a7b59 100644 --- a/paimon_python_api/table_read.py +++ b/paimon_python_api/table_read.py @@ -52,4 +52,3 @@ def to_duckdb( @abstractmethod def to_ray(self, splits: List[Split]) -> ray.data.dataset.Dataset: """Convert splits into a Ray dataset format.""" -