diff --git a/.github/workflows/on_push.yml b/.github/workflows/on_push.yml
index 3d3a590e..90e21fd7 100644
--- a/.github/workflows/on_push.yml
+++ b/.github/workflows/on_push.yml
@@ -51,8 +51,8 @@ jobs:
         clickhouse-version:
           - '22.8'
           - '23.3'
-          - '23.4'
-          - '23.5'
+          - '23.6'
+          - '23.7'
           - latest

     name: Local Tests Py=${{ matrix.python-version }} CH=${{ matrix.clickhouse-version }}
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 3e771b2f..4764d49e 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -14,6 +14,15 @@ In any case, this should not affect the basic usage of Superset with ClickHouse.
 your Superset installation, the ClickHouse datasource will be available with either the enhanced connection dialog or a
 standard SqlAlchemy DSN in the form of `clickhousedb://{username}:{password}@{host}:{port}`.
 
+## 0.6.11, 2023-08-30
+### Bug fixes
+- Inserts using Pandas 2.1 would fail due to a removed method in the Pandas library. There is now a workaround/fix for
+this. Closes https://github.com/ClickHouse/clickhouse-connect/issues/234
+- Inserts into a FixedString column that were not the expected size could cause corrupt insert blocks and mysterious errors
+from the ClickHouse server. Validation has been added so that more meaningful error messages are generated if a FixedString
+value is an invalid size. A reminder that strings that are "too short" for a FixedString column will be padded with 0 bytes, while
+strings that are "too long" will generate an exception during the insert.
+
 ## 0.6.10, 2023-08-27
 ### Improvement
 - Add support and tests for the `Object(Nullable('json'))` type, which is sometimes detected by schema inference.
diff --git a/clickhouse_connect/__version__.py b/clickhouse_connect/__version__.py
index ff5091b5..1e82f0f5 100644
--- a/clickhouse_connect/__version__.py
+++ b/clickhouse_connect/__version__.py
@@ -1 +1 @@
-version = '0.6.10'
+version = '0.6.11'
diff --git a/clickhouse_connect/datatypes/string.py b/clickhouse_connect/datatypes/string.py
index 36ce575e..82bf5a70 100644
--- a/clickhouse_connect/datatypes/string.py
+++ b/clickhouse_connect/datatypes/string.py
@@ -3,6 +3,7 @@
 from clickhouse_connect.driver.ctypes import data_conv
 from clickhouse_connect.datatypes.base import ClickHouseType, TypeDef
+from clickhouse_connect.driver.exceptions import DataError
 from clickhouse_connect.driver.insert import InsertContext
 from clickhouse_connect.driver.query import QueryContext
 from clickhouse_connect.driver.types import ByteSource
@@ -79,7 +80,7 @@
             return source.read_fixed_str_col(self.byte_size, num_rows, ctx.encoding or self.encoding)
         return source.read_bytes_col(self.byte_size, num_rows)
 
-    # pylint: disable=too-many-branches
+    # pylint: disable=too-many-branches,duplicate-code
     def _write_column_binary(self, column: Union[Sequence, MutableSequence], dest: bytearray, ctx: InsertContext):
         ext = dest.extend
         sz = self.byte_size
@@ -97,6 +98,8 @@
                     b = str_enc(x, enc)
                 except UnicodeEncodeError:
                     b = empty
+                if len(b) > sz:
+                    raise DataError(f'UTF-8 encoded FixedString value {b.hex(" ")} exceeds column size {sz}')
                 ext(b)
                 if len(b) < sz:
                     ext(empty[:-len(b)])
@@ -106,15 +109,21 @@
                     b = str_enc(x, enc)
                 except UnicodeEncodeError:
                     b = empty
+                if len(b) > sz:
+                    raise DataError(f'UTF-8 encoded FixedString value {b.hex(" ")} exceeds column size {sz}')
                 ext(b)
                 if len(b) < sz:
                     ext(empty[:-len(b)])
         elif self.nullable:
-            for x in column:
-                if not x:
+            for b in column:
+                if not b:
                     ext(empty)
+                elif len(b) != sz:
+                    raise DataError(f'Fixed String binary value {b.hex(" ")} does not match column size {sz}')
                 else:
-                    ext(x)
+                    ext(b)
         else:
-            for x in column:
-                ext(x)
+            for b in column:
+                if len(b) != sz:
+                    raise DataError(f'Fixed String binary value {b.hex(" ")} does not match column size {sz}')
+                ext(b)
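The string.py change turns an oversized FixedString value into an immediate `DataError` on the client instead of a corrupt insert block. A minimal sketch of the resulting behavior, assuming a reachable ClickHouse server (`fs_demo` is a made-up table): string values shorter than the column size are zero-padded, while values that encode to more bytes than the column holds now fail fast.

```python
# Sketch only: assumes a local ClickHouse server; 'fs_demo' is hypothetical.
import clickhouse_connect
from clickhouse_connect.driver.exceptions import DataError

client = clickhouse_connect.get_client(host='localhost')
client.command('DROP TABLE IF EXISTS fs_demo')
client.command('CREATE TABLE fs_demo (key Int32, fs FixedString(6)) ENGINE MergeTree ORDER BY key')

# A string shorter than the column size is padded with 0 bytes
client.insert('fs_demo', [[1, 'abc']])

# A string that encodes to more than 6 bytes now raises DataError client-side
try:
    client.insert('fs_demo', [[2, 'abcdefgh']])
except DataError as ex:
    print(ex)  # UTF-8 encoded FixedString value ... exceeds column size 6
```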
diff --git a/clickhouse_connect/driver/insert.py b/clickhouse_connect/driver/insert.py
index da606402..961cbdf5 100644
--- a/clickhouse_connect/driver/insert.py
+++ b/clickhouse_connect/driver/insert.py
@@ -6,7 +6,7 @@
 from clickhouse_connect.driver.ctypes import data_conv
 from clickhouse_connect.driver.context import BaseQueryContext
-from clickhouse_connect.driver.options import np, pd
+from clickhouse_connect.driver.options import np, pd, pd_time_test
 from clickhouse_connect.driver.exceptions import ProgrammingError
 
 if TYPE_CHECKING:
@@ -50,7 +50,7 @@ def __init__(self,
         self.column_oriented = False if column_oriented is None else column_oriented
         self.compression = compression
         self.req_block_size = block_size
-        self.block_size = DEFAULT_BLOCK_BYTES
+        self.block_row_count = DEFAULT_BLOCK_BYTES
         self.data = data
         self.insert_exception = None
 
@@ -93,7 +93,7 @@ def data(self, data: Any):
         if self.column_count != len(self.column_names):
             raise ProgrammingError('Insert data column count does not match column names')
         self._data = data
-        self.block_size = self._calc_block_size()
+        self.block_row_count = self._calc_block_size()
 
     def _calc_block_size(self) -> int:
         if self.req_block_size:
@@ -121,7 +121,7 @@
     def next_block(self) -> Generator[InsertBlock, None, None]:
         while True:
-            block_end = min(self.current_row + self.block_size, self.row_count)
+            block_end = min(self.current_row + self.block_row_count, self.row_count)
             row_count = block_end - self.current_row
             if row_count <= 0:
                 return
@@ -153,8 +153,7 @@ def _convert_pandas(self, df):
                     df_col = df_col.round().astype(ch_type.base_type, copy=False)
                 else:
                     df_col = df_col.astype(ch_type.base_type, copy=False)
-            elif 'datetime' in ch_type.np_type and (pd.core.dtypes.common.is_datetime_or_timedelta_dtype(df_col)
-                                                    or 'datetime64[ns' in d_type):
+            elif 'datetime' in ch_type.np_type and (pd_time_test(df_col) or 'datetime64[ns' in d_type):
                 div = ch_type.nano_divisor
                 data.append([None if pd.isnull(x) else x.value // div for x in df_col])
                 self.column_formats[col_name] = 'int'
diff --git a/clickhouse_connect/driver/options.py b/clickhouse_connect/driver/options.py
index 3260fb74..4cec665c 100644
--- a/clickhouse_connect/driver/options.py
+++ b/clickhouse_connect/driver/options.py
@@ -1,5 +1,8 @@
 from clickhouse_connect.driver.exceptions import NotSupportedError
 
+pd_time_test = None
+pd_extended_dtypes = False
+
 try:
     import numpy as np
 except ImportError:
@@ -8,9 +11,22 @@
 try:
     import pandas as pd
     pd_extended_dtypes = not pd.__version__.startswith('0')
+    try:
+        from pandas.core.dtypes.common import is_datetime64_dtype
+        from pandas.core.dtypes.common import is_timedelta64_dtype
+
+        def combined_test(arr_or_dtype):
+            return is_datetime64_dtype(arr_or_dtype) or is_timedelta64_dtype(arr_or_dtype)
+
+        pd_time_test = combined_test
+    except ImportError:
+        try:
+            from pandas.core.dtypes.common import is_datetime_or_timedelta_dtype
+            pd_time_test = is_datetime_or_timedelta_dtype
+        except ImportError as ex:
+            raise NotSupportedError('pandas version does not contain expected test for temporal types') from ex
 except ImportError:
     pd = None
-    pd_extended_dtypes = False
 
 try:
     import pyarrow as arrow
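options.py now probes for the per-dtype checks that current pandas releases provide and falls back to `is_datetime_or_timedelta_dtype`, which pandas 2.1 removed, only on older versions. With the `pd_time_test` shim in place, DataFrame inserts with temporal columns work on either pandas line. A hedged sketch (the server and table are assumptions, not part of the diff):

```python
# Sketch only: assumes a local ClickHouse server and pandas installed;
# 'ts_demo' is a hypothetical table.
import pandas as pd
import clickhouse_connect

client = clickhouse_connect.get_client(host='localhost')
client.command('DROP TABLE IF EXISTS ts_demo')
client.command('CREATE TABLE ts_demo (key Int32, ts DateTime64(3)) ENGINE MergeTree ORDER BY key')

df = pd.DataFrame({'key': [1, 2],
                   'ts': pd.to_datetime(['2023-08-30 10:00:00', '2023-08-30 11:30:00'])})
# _convert_pandas now routes the dtype check through pd_time_test instead of
# calling the removed pd.core.dtypes.common.is_datetime_or_timedelta_dtype
client.insert_df('ts_demo', df)
```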
diff --git a/clickhouse_connect/tools/testing.py b/clickhouse_connect/tools/testing.py
index f12acaf8..f30b3f75 100644
--- a/clickhouse_connect/tools/testing.py
+++ b/clickhouse_connect/tools/testing.py
@@ -1,6 +1,7 @@
-from typing import Sequence, Optional, Union
+from typing import Sequence, Optional, Union, Dict, Any
 
 from clickhouse_connect.driver import Client
+from clickhouse_connect.driver.query import format_query_value
 
 
 class TableContext:
@@ -9,9 +10,11 @@ def __init__(self, client: Client,
                  columns: Union[str, Sequence[str]],
                  column_types: Optional[Sequence[str]] = None,
                  engine: str = 'MergeTree',
-                 order_by: str = None):
+                 order_by: str = None,
+                 settings: Optional[Dict[str, Any]] = None):
         self.client = client
         self.table = table
+        self.settings = settings
         if isinstance(columns, str):
             columns = columns.split(',')
         if column_types is None:
@@ -34,7 +37,14 @@ def __enter__(self):
         else:
             self.client.command(f'DROP TABLE IF EXISTS {self.table} SYNC')
         col_defs = ','.join(f'{name} {col_type}' for name, col_type in zip(self.column_names, self.column_types))
-        self.client.command(f'CREATE TABLE {self.table} ({col_defs}) ENGINE {self.engine} ORDER BY {self.order_by}')
+        create_cmd = f'CREATE TABLE {self.table} ({col_defs}) ENGINE {self.engine} ORDER BY {self.order_by}'
+        if self.settings:
+            create_cmd += ' SETTINGS '
+            for key, value in self.settings.items():
+                create_cmd += f'{key} = {format_query_value(value)}, '
+        if create_cmd.endswith(', '):
+            create_cmd = create_cmd[:-2]
+        self.client.command(create_cmd)
         return self
 
     def __exit__(self, exc_type, exc_val, exc_tb):
diff --git a/tests/integration_tests/test_inserts.py b/tests/integration_tests/test_inserts.py
index d3535d2a..6a59bd32 100644
--- a/tests/integration_tests/test_inserts.py
+++ b/tests/integration_tests/test_inserts.py
@@ -32,3 +32,23 @@ def test_bad_data_insert(test_client: Client, table_context: Callable):
         test_client.insert('test_bad_insert', data)
     except DataError as ex:
         assert 'array' in str(ex)
+
+
+def test_bad_strings(test_client: Client, table_context: Callable):
+    with table_context('test_bad_strings', 'key Int32, fs FixedString(6), nsf Nullable(FixedString(4))'):
+        try:
+            test_client.insert('test_bad_strings', [[1, b'\x0535', None]])
+        except DataError as ex:
+            assert 'match' in str(ex)
+        try:
+            test_client.insert('test_bad_strings', [[1, b'\x0535abc', '😀🙃']])
+        except DataError as ex:
+            assert 'encoded' in str(ex)
+
+
+def test_low_card_dictionary_size(test_client: Client, table_context: Callable):
+    with table_context('test_low_card_dict', 'key Int32, lc LowCardinality(String)',
+                       settings={'index_granularity': 65536}):
+        data = [[x, str(x)] for x in range(30000)]
+        test_client.insert('test_low_card_dict', data)
+        assert 30000 == test_client.command('SELECT count() FROM test_low_card_dict')
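The new `settings` parameter renders each key/value pair into a `SETTINGS` clause appended to the generated `CREATE TABLE`, as `test_low_card_dictionary_size` above exercises. A usage sketch outside the test suite (the client connection and table name are assumptions):

```python
# Sketch only: assumes a local ClickHouse server; 'settings_demo' is hypothetical.
import clickhouse_connect
from clickhouse_connect.tools.testing import TableContext

client = clickhouse_connect.get_client(host='localhost')
with TableContext(client, 'settings_demo', 'key Int32, val String',
                  settings={'index_granularity': 8192}) as ctx:
    # The generated DDL ends with something like:
    #   ... ENGINE MergeTree ORDER BY key SETTINGS index_granularity = 8192
    ctx.client.insert('settings_demo', [[1, 'one'], [2, 'two']])
# The context drops the table again on exit
```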
diff --git a/tests/unit_tests/test_driver/test_insert.py b/tests/unit_tests/test_driver/test_insert.py
index 83018560..2b0fc86c 100644
--- a/tests/unit_tests/test_driver/test_insert.py
+++ b/tests/unit_tests/test_driver/test_insert.py
@@ -12,11 +12,11 @@ def test_block_size():
                         ['key', 'date_tuple'],
                         [get_from_name('UInt64'), get_from_name('Tuple(Date, DateTime)')],
                         data)
-    assert ctx.block_size == 2097152
+    assert ctx.block_row_count == 2097152
 
     data = [(x, fixed_len_ascii_str(400)) for x in range(5000)]
     ctx = InsertContext('fake_table',
                         ['key', 'big_str'],
                         [get_from_name('Int32'), get_from_name('String')],
                         data)
-    assert ctx.block_size == 65536
+    assert ctx.block_row_count == 65536
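The rename from `block_size` to `block_row_count` clarifies that the value computed by `_calc_block_size` is a row count per insert block (derived from `DEFAULT_BLOCK_BYTES` and the estimated bytes per row), not a byte count. A quick check mirroring the unit test, with made-up data; no server needed:

```python
from clickhouse_connect.datatypes.registry import get_from_name
from clickhouse_connect.driver.insert import InsertContext

# Made-up row-oriented data: small rows yield a large rows-per-block value
data = [(x, 'short_string') for x in range(1000)]
ctx = InsertContext('demo_table',
                    ['key', 'val'],
                    [get_from_name('UInt64'), get_from_name('String')],
                    data)
print(ctx.block_row_count)  # rows per insert block, not bytes
```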