Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/build_macos_arm64_wheels.yml
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ jobs:
brew install ca-certificates lz4 mpdecimal openssl@3 readline sqlite xz z3 zstd
brew install --ignore-dependencies llvm@19
brew install git ninja libtool gettext gcc binutils grep findutils nasm
brew install --build-from-source ccache
brew install --build-from-source ccache || echo "ccache installation failed, continuing without it"
brew install go
cd /usr/local/opt/ && sudo rm -f llvm && sudo ln -sf llvm@19 llvm
export PATH=$(brew --prefix llvm@19)/bin:$PATH
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/build_macos_x86_wheels.yml
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ jobs:
brew install ca-certificates lz4 mpdecimal openssl@3 readline sqlite xz z3 zstd
brew install --ignore-dependencies llvm@19
brew install git ninja libtool gettext gcc binutils grep findutils nasm
brew install --build-from-source ccache
brew install --build-from-source ccache || echo "ccache installation failed, continuing without it"
brew install go
cd /usr/local/opt/ && sudo rm -f llvm && sudo ln -sf llvm@19 llvm
export PATH=$(brew --prefix llvm@19)/bin:$PATH
Expand Down
63 changes: 41 additions & 22 deletions chdb/state/sqlitelike.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,32 @@
from typing import Optional, Any
from typing import Optional, Any, TYPE_CHECKING, List, Tuple
from chdb import _chdb

# try import pyarrow if failed, raise ImportError with suggestion
try:
import pyarrow as pa # noqa
except ImportError as e:
print(f"ImportError: {e}")
print('Please install pyarrow via "pip install pyarrow"')
raise ImportError("Failed to import pyarrow") from None
if TYPE_CHECKING:
import pyarrow as pa


def _import_pyarrow():
    """Import and return the ``pyarrow`` module on demand.

    The import happens at call time rather than at module load, so pyarrow
    is only required by code paths that actually call this helper.

    Returns:
        module: The imported ``pyarrow`` module.

    Raises:
        ImportError: If ``pyarrow`` cannot be imported. The underlying
            import error is attached as ``__cause__``.
    """
    try:
        import pyarrow as pa
    except ImportError as exc:
        # Chain the real failure instead of suppressing it: an import can
        # fail for reasons other than the package being absent, and hiding
        # the cause would leave only the "install" hint to go on.
        raise ImportError(
            "PyArrow is required for this feature. "
            "Install with: pip install pyarrow"
        ) from exc
    return pa


def _import_pandas():
    """Import and return the ``pandas`` module on demand.

    The import happens at call time rather than at module load, so pandas
    is only required by code paths that actually call this helper.

    Returns:
        module: The imported ``pandas`` module.

    Raises:
        ImportError: If ``pandas`` cannot be imported. The underlying
            import error is attached as ``__cause__``.
    """
    try:
        import pandas as pd
    except ImportError as exc:
        # Chain the real failure instead of suppressing it: an import can
        # fail for reasons other than the package being absent, and hiding
        # the cause would leave only the "install" hint to go on.
        raise ImportError(
            "Pandas is required for DataFrame conversion. "
            "Install with: pip install pandas"
        ) from exc
    return pd


# Format names grouped as Arrow-based (going by the variable name; how
# callers match against this set is not visible here).
_arrow_format = {"dataframe", "arrowtable"}
Expand All @@ -32,11 +51,11 @@ def to_arrowTable(res):
pyarrow.Table: PyArrow Table containing the query results

Raises:
ImportError: If pyarrow or pandas packages are not installed
ImportError: If pyarrow package is not installed

.. note::
This function requires both pyarrow and pandas to be installed.
Install them with: ``pip install pyarrow pandas``
This function requires pyarrow to be installed.
Install with: ``pip install pyarrow``

.. warning::
Empty results return an empty PyArrow Table with no schema.
Expand All @@ -52,14 +71,7 @@ def to_arrowTable(res):
num text
0 1 hello
"""
# try import pyarrow and pandas, if failed, raise ImportError with suggestion
try:
import pyarrow as pa # noqa
import pandas as pd # noqa
except ImportError as e:
print(f"ImportError: {e}")
print('Please install pyarrow and pandas via "pip install pyarrow pandas"')
raise ImportError("Failed to import pyarrow or pandas") from None
pa = _import_pyarrow()
if len(res) == 0:
return pa.Table.from_batches([], schema=pa.schema([]))
return pa.RecordBatchFileReader(res.bytes()).read_all()
Expand Down Expand Up @@ -102,6 +114,7 @@ def to_df(r):
text object
dtype: object
"""
_import_pandas()
t = to_arrowTable(r)
return t.to_pandas(use_threads=True)

Expand Down Expand Up @@ -230,7 +243,7 @@ def cancel(self):
except Exception as e:
raise RuntimeError(f"Failed to cancel streaming query: {str(e)}") from e

def record_batch(self, rows_per_batch: int = 1000000) -> pa.RecordBatchReader:
def record_batch(self, rows_per_batch: int = 1000000) -> "pa.RecordBatchReader":
"""
Create a PyArrow RecordBatchReader from this StreamingResult.

Expand All @@ -242,17 +255,19 @@ def record_batch(self, rows_per_batch: int = 1000000) -> pa.RecordBatchReader:
rows_per_batch (int): Number of rows per batch. Defaults to 1000000.

Returns:
pa.RecordBatchReader: PyArrow RecordBatchReader for efficient streaming
pyarrow.RecordBatchReader: PyArrow RecordBatchReader for efficient streaming

Raises:
ValueError: If the StreamingResult was not created with arrow format
ImportError: If PyArrow is not installed
"""
if not self._supports_record_batch:
raise ValueError(
"record_batch() can only be used with arrow format. "
"Please use format='Arrow' when calling send_query."
)

pa = _import_pyarrow()
chdb_reader = ChdbRecordBatchReader(self, rows_per_batch)
return pa.RecordBatchReader.from_batches(chdb_reader.schema(), chdb_reader)

Expand All @@ -275,10 +290,12 @@ def __init__(self, chdb_stream_result, batch_size_rows):
self._current_rows = 0
self._first_batch = None
self._first_batch_consumed = True
self._pa = _import_pyarrow()
self._schema = self.schema()

def schema(self):
if self._schema is None:
pa = self._pa
# Get the first chunk to determine schema
chunk = self._stream_result.fetch()
if chunk is not None:
Expand All @@ -304,6 +321,8 @@ def schema(self):
return self._schema

def read_next_batch(self):
pa = self._pa

if self._accumulator:
result = self._accumulator.pop(0)
return result
Expand Down Expand Up @@ -600,7 +619,7 @@ class Cursor:
def __init__(self, connection):
self._conn = connection
self._cursor = self._conn.cursor()
self._current_table: Optional[pa.Table] = None
self._current_table: Optional[List[Tuple]] = None
self._current_row: int = 0

def execute(self, query: str) -> None:
Expand Down
Loading