Skip to content

Commit 50a8aa8

Browse files
committed
feat(python/adbc_driver_manager): support more sans PyArrow
Fixes #2827.
1 parent 19ff899 commit 50a8aa8

File tree

3 files changed

+343
-30
lines changed

3 files changed

+343
-30
lines changed
Lines changed: 249 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,249 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
"""
19+
Backend-specific operations for the DB-API layer.
20+
21+
These are mostly functions that convert Python types to/from Arrow types.
22+
They are abstracted so that we can support multiple backends like PyArrow,
23+
polars, and nanoarrow.
24+
"""
25+
26+
import abc
27+
import typing
28+
29+
from . import _lib
30+
31+
if typing.TYPE_CHECKING:
32+
import pandas
33+
from typing_extensions import CapsuleType
34+
35+
36+
class RowIterator(abc.ABC):
    """An iterator over a result set in DB-API style.

    The iterator owns an Arrow array stream handle until it is either
    released by :meth:`close` or handed to the caller by
    :meth:`fetch_arrow`.  ``close`` and ``fetch_arrow`` are abstract but
    carry default implementations that subclasses can reuse through
    ``super()``.
    """

    def __init__(self, handle: "_lib.ArrowArrayStreamHandle") -> None:
        # Becomes None once the stream has been released or given away.
        self._handle = handle

    @abc.abstractmethod
    def close(self) -> None:
        """Release the underlying stream if this iterator still owns it.

        Calling this more than once is safe: the handle is only released
        the first time.
        """
        if self._handle is not None:
            # Drop our reference before releasing so that a failing
            # release() can never lead to a second release attempt.
            handle = self._handle
            self._handle = None
            handle.release()

    @property
    @abc.abstractmethod
    def description(self) -> list[tuple]:
        """Column descriptions in the style of DB-API ``Cursor.description``."""
        ...

    # typing.Optional is used instead of ``tuple | None``: the latter is
    # evaluated when the method is defined and raises TypeError before
    # Python 3.10.
    @abc.abstractmethod
    def fetchone(self) -> typing.Optional[tuple]:
        """Fetch a single row in the style of DB-API ``fetchone``."""
        ...

    @abc.abstractmethod
    def fetchmany(self, size: int) -> list[tuple]:
        """Fetch rows in the style of DB-API ``fetchmany``."""
        ...

    @abc.abstractmethod
    def fetchall(self) -> list[tuple]:
        """Fetch rows in the style of DB-API ``fetchall``."""
        ...

    @abc.abstractmethod
    def fetch_arrow_table(self) -> "pyarrow.Table":
        """Return the result set as a ``pyarrow.Table``."""
        ...

    @abc.abstractmethod
    def fetch_df(self) -> "pandas.DataFrame":
        """Return the result set as a ``pandas.DataFrame``."""
        ...

    @abc.abstractmethod
    def fetch_polars(self) -> "polars.DataFrame":
        """Return the result set as a ``polars.DataFrame``."""
        ...

    @abc.abstractmethod
    def fetch_arrow(self) -> "_lib.ArrowArrayStreamHandle":
        """Hand the raw Arrow stream handle over to the caller.

        After this call the iterator no longer owns the handle, so a later
        :meth:`close` will not release it.

        Raises
        ------
        ProgrammingError
            If the handle was already released or handed over.
        """
        if self._handle is None:
            raise _lib.ProgrammingError(
                "Result set has been closed or consumed",
                status_code=_lib.AdbcStatusCode.INVALID_STATE,
            )
        handle, self._handle = self._handle, None
        return handle
80+
81+
82+
class DbapiBackend(abc.ABC):
    """
    Interface for the Python/Arrow conversions used by the DB-API layer.

    Each backend is free to (and is expected to) return its own native
    types from these methods.
    """

    @abc.abstractmethod
    def convert_bind_parameters(self, parameters: typing.Any) -> "CapsuleType":
        """Turn one set of bind parameters into Arrow data.

        Parameters
        ----------
        parameters
            A sequence of bind parameters, e.g. a tuple whose items are
            the parameters in positional order.

        Returns
        -------
        parameters : CapsuleType
            An Arrow stream capsule, or an object that implements the
            Arrow PyCapsule interface.

        See Also
        --------
        https://arrow.apache.org/docs/format/CDataInterface/PyCapsuleInterface.html

        """

    @abc.abstractmethod
    def convert_executemany_parameters(self, parameters: typing.Any) -> "CapsuleType":
        """Turn many rows of bind parameters into Arrow data.

        Parameters
        ----------
        parameters
            A sequence of parameter rows, e.g. an iterable of tuples in
            which every tuple holds the bind parameters for one row.

        Returns
        -------
        parameters : CapsuleType
            An Arrow stream capsule, or an object that implements the
            Arrow PyCapsule interface.

        See Also
        --------
        https://arrow.apache.org/docs/format/CDataInterface/PyCapsuleInterface.html

        """

    # TODO: a hook that wraps a result set in a DB-API-like RowIterator is
    # sketched but not enabled yet:
    #     import_rowiterator(
    #         self, stmt: _lib.AdbcStatement, handle: _lib.ArrowArrayStreamHandle
    #     ) -> RowIterator

    @abc.abstractmethod
    def import_array_stream(self, handle: _lib.ArrowArrayStreamHandle) -> typing.Any:
        """Import an Arrow stream from ``handle`` into a backend-native object."""

    @abc.abstractmethod
    def import_schema(self, handle: _lib.ArrowSchemaHandle) -> typing.Any:
        """Import an Arrow schema from ``handle`` into a backend-native object."""
151+
152+
153+
# Registry of usable backends; later entries take precedence over earlier ones.
_ALL_BACKENDS: list[DbapiBackend] = []


def default_backend() -> DbapiBackend:
    """Return the backend that should be used when none is requested.

    This is always the most recently registered entry of ``_ALL_BACKENDS``.
    """
    preferred = _ALL_BACKENDS[-1]
    return preferred
158+
159+
160+
class _NoOpBackend(DbapiBackend):
    """Fallback backend that performs no conversions.

    Bind parameters cannot be converted, so those methods raise; imported
    streams and schemas are returned as the raw handles they arrived in.
    """

    @staticmethod
    def _no_backend_error() -> Exception:
        # Single place to build the error shared by both conversion methods.
        return _lib.ProgrammingError(
            "This API requires PyArrow or another suitable backend to be installed",
            status_code=_lib.AdbcStatusCode.INVALID_STATE,
        )

    def convert_bind_parameters(self, parameters: typing.Any) -> "CapsuleType":
        """Always raise: there is nothing to convert parameters with."""
        raise self._no_backend_error()

    def convert_executemany_parameters(self, parameters: typing.Any) -> "CapsuleType":
        """Always raise: there is nothing to convert parameters with."""
        raise self._no_backend_error()

    def import_array_stream(
        self, handle: _lib.ArrowArrayStreamHandle
    ) -> _lib.ArrowArrayStreamHandle:
        """Return the stream handle unchanged."""
        return handle

    def import_schema(self, handle: _lib.ArrowSchemaHandle) -> _lib.ArrowSchemaHandle:
        """Return the schema handle unchanged."""
        return handle


# Registered first so that any real backend added later takes precedence.
_ALL_BACKENDS.append(_NoOpBackend())
183+
184+
# Optional backends other than PyArrow (Polars below; nanoarrow is not implemented yet) are registered here
185+
186+
try:
    import polars
except ImportError:
    # Polars is optional; without it this backend is simply not registered.
    pass
else:

    class _PolarsBackend(DbapiBackend):
        """Backend that builds and imports data with Polars.

        The polars return annotations are quoted so that they are not
        evaluated while the class is being defined.
        """

        def convert_bind_parameters(self, parameters: typing.Any) -> "CapsuleType":
            """Build a DataFrame with one column per bind parameter.

            Columns are named after the parameter position ("0", "1", ...).
            """
            return polars.DataFrame(
                {str(col_idx): x for col_idx, x in enumerate(parameters)},
            )

        def convert_executemany_parameters(
            self, parameters: typing.Any
        ) -> "CapsuleType":
            """Build a DataFrame from an iterable of parameter rows.

            The rows are transposed so that each parameter position becomes
            one column, named after its position ("0", "1", ...).
            """
            return polars.DataFrame(
                {
                    str(col_idx): x
                    for col_idx, x in enumerate(map(list, zip(*parameters)))
                },
            )

        def import_array_stream(
            self, handle: _lib.ArrowArrayStreamHandle
        ) -> "polars.DataFrame":
            """Import an Arrow stream through ``polars.from_arrow``."""
            return polars.from_arrow(handle)

        def import_schema(self, handle: _lib.ArrowSchemaHandle) -> "polars.Schema":
            """Always raise: importing a bare schema is not supported here.

            Raises
            ------
            NotSupportedError
                Unconditionally.
            """
            raise _lib.NotSupportedError("Polars does not support __arrow_c_schema__")

    _ALL_BACKENDS.append(_PolarsBackend())
216+
217+
# Keep PyArrow at the end so it stays default
218+
try:
    import pyarrow
except ImportError:
    # PyArrow is optional; without it this backend is simply not registered.
    pass
else:

    class _PyArrowBackend(DbapiBackend):
        """Backend that builds and imports data with PyArrow."""

        def convert_bind_parameters(self, parameters: typing.Any) -> "CapsuleType":
            """Build a record batch with one single-value column per parameter.

            Columns are named after the parameter position ("0", "1", ...).
            """
            # Materialize the columns first so that the names can be derived
            # from them; this also accepts iterables that have no len().
            columns = [[param_value] for param_value in parameters]
            return pyarrow.record_batch(
                columns,
                names=[str(i) for i in range(len(columns))],
            )

        def convert_executemany_parameters(
            self, parameters: typing.Any
        ) -> "CapsuleType":
            """Build a record batch from an iterable of parameter rows.

            The rows are transposed so that each parameter position becomes
            one column, named after its position ("0", "1", ...).
            """
            return pyarrow.RecordBatch.from_pydict(
                {
                    str(col_idx): pyarrow.array(x)
                    for col_idx, x in enumerate(map(list, zip(*parameters)))
                },
            )

        def import_array_stream(
            self, handle: _lib.ArrowArrayStreamHandle
        ) -> pyarrow.RecordBatchReader:
            """Import an Arrow stream as a ``pyarrow.RecordBatchReader``."""
            # _import_from_c expects the raw pointer of the C stream, not the
            # wrapper object.
            # NOTE(review): relies on the handle exposing ``address`` — confirm
            # against _lib.ArrowArrayStreamHandle.
            return pyarrow.RecordBatchReader._import_from_c(handle.address)

        def import_schema(self, handle: _lib.ArrowSchemaHandle) -> pyarrow.Schema:
            """Import an Arrow schema through ``pyarrow.schema``."""
            return pyarrow.schema(handle)

    _ALL_BACKENDS.append(_PyArrowBackend())

0 commit comments

Comments
 (0)