Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 34 additions & 30 deletions ydb/library/yql/providers/generic/connector/tests/clickhouse.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from utils.log import make_logger
from utils.schema import Schema
from utils.settings import Settings
from utils.sql import format_values_for_bulk_sql_insert

import test_cases.select_missing_database
import test_cases.select_missing_table
Expand All @@ -25,7 +26,7 @@ def prepare_table(
schema: Schema,
data_in: Sequence,
):
dbTable = f'{database.name}.{table_name}'
dbTable = f"{database.name}.{table_name}"

# create database
create_database_stmt = database.create(data_source_pb2.CLICKHOUSE)
Expand All @@ -43,26 +44,17 @@ def prepare_table(
return

# create table
create_table_stmt = f'CREATE TABLE {dbTable} ({schema.yql_column_list(data_source_pb2.CLICKHOUSE)}) ENGINE = Memory'
create_table_stmt = f"CREATE TABLE {dbTable} ({schema.yql_column_list(data_source_pb2.CLICKHOUSE)}) ENGINE = Memory"
LOGGER.debug(create_table_stmt)
client.command(create_table_stmt)

# write data
for row in data_in:
# prepare string with serialized data
values_dump = []
for val in row:
if isinstance(val, str):
values_dump.append(f"'{val}'")
elif val is None:
values_dump.append('NULL')
else:
values_dump.append(str(val))
values = ", ".join(values_dump)

insert_stmt = f"INSERT INTO {dbTable} (*) VALUES ({values})"
LOGGER.debug(insert_stmt)
client.command(insert_stmt)
values = format_values_for_bulk_sql_insert(data_in)
insert_stmt = f"INSERT INTO {dbTable} (*) VALUES {values}"
# TODO: these logs may be too big when working with big tables,
# dump insert statement via yatest into file.
LOGGER.debug(insert_stmt)
client.command(insert_stmt)


def select_positive(
Expand All @@ -82,21 +74,25 @@ def select_positive(

# NOTE: to assert equivalence we have to add explicit ORDER BY,
# because Clickhouse's output will be randomly ordered otherwise.
where_statement = ''
where_statement = ""
if test_case.select_where is not None:
where_statement = f'WHERE {test_case.select_where.filter_expression}'
order_by_expression = ''
where_statement = f"WHERE {test_case.select_where.filter_expression}"
order_by_expression = ""
order_by_column_name = test_case.select_what.order_by_column_name
if order_by_column_name:
order_by_expression = f'ORDER BY {order_by_column_name}'
yql_script = f'''
order_by_expression = f"ORDER BY {order_by_column_name}"
yql_script = f"""
{test_case.pragmas_sql_string}
SELECT {test_case.select_what.yql_select_names}
FROM {settings.clickhouse.cluster_name}.{test_case.qualified_table_name}
{where_statement}
{order_by_expression}
'''
result = dqrun_runner.run(test_dir=tmp_path, script=yql_script, generic_settings=test_case.generic_settings)
"""
result = dqrun_runner.run(
test_dir=tmp_path,
script=yql_script,
generic_settings=test_case.generic_settings,
)

assert result.returncode == 0, result.stderr

Expand All @@ -115,11 +111,15 @@ def select_missing_database(
test_case: test_cases.select_missing_database.TestCase,
):
# select table from the database that does not exist
yql_script = f'''
yql_script = f"""
SELECT *
FROM {settings.clickhouse.cluster_name}.{test_case.qualified_table_name}
'''
result = dqrun_runner.run(test_dir=tmp_path, script=yql_script, generic_settings=test_case.generic_settings)
"""
result = dqrun_runner.run(
test_dir=tmp_path,
script=yql_script,
generic_settings=test_case.generic_settings,
)

assert test_case.database.missing_database_msg(data_source_pb2.CLICKHOUSE) in result.stderr, result.stderr

Expand All @@ -136,10 +136,14 @@ def select_missing_table(
LOGGER.debug(create_database_stmt)
client.command(create_database_stmt)

yql_script = f'''
yql_script = f"""
SELECT *
FROM {settings.clickhouse.cluster_name}.{test_case.qualified_table_name}
'''
result = dqrun_runner.run(test_dir=tmp_path, script=yql_script, generic_settings=test_case.generic_settings)
"""
result = dqrun_runner.run(
test_dir=tmp_path,
script=yql_script,
generic_settings=test_case.generic_settings,
)

assert test_case.database.missing_table_msg(data_source_pb2.CLICKHOUSE) in result.stderr, result.stderr
Original file line number Diff line number Diff line change
@@ -1,20 +1,25 @@
version: '3.4'
services:
postgres:
image: "postgres:15-bullseye@sha256:3411b9f2e5239cd7867f34fcf22fe964230f7d447a71d63c283e3593d3f84085"
postgresql:
image: postgres:15-bullseye@sha256:3411b9f2e5239cd7867f34fcf22fe964230f7d447a71d63c283e3593d3f84085
environment:
POSTGRES_DB: db
POSTGRES_USER: user
POSTGRES_PASSWORD: password
ports:
- "15432:5432"
- 15432:5432
clickhouse:
image: "clickhouse/clickhouse-server:23-alpine@sha256:b078c1cd294632afa2aeba3530e7ba2e568513da23304354f455a25fab575c06"
image: clickhouse/clickhouse-server:23-alpine@sha256:b078c1cd294632afa2aeba3530e7ba2e568513da23304354f455a25fab575c06
environment:
CLICKHOUSE_DB: db
CLICKHOUSE_USER: user
CLICKHOUSE_DEFAULT_ACCESS_MANAGEMENT: 1
CLICKHOUSE_PASSWORD: password
ports:
- "19000:9000"
- "18123:8123"
- 19000:9000
- 18123:8123
fq-connector-go:
image: ghcr.io/ydb-platform/fq-connector-go:v0.0.6-rc.8@sha256:74ebae0530d916c1842a7fddfbddc6c018763a0401f2f627a44e8829692fe41f
ports:
- 50051:50051
network_mode: host
61 changes: 41 additions & 20 deletions ydb/library/yql/providers/generic/connector/tests/postgresql.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from utils.postgresql import Client
from utils.schema import Schema
from utils.settings import Settings
from utils.sql import format_values_for_bulk_sql_insert

import test_cases.select_missing_database
import test_cases.select_missing_table
Expand All @@ -28,7 +29,7 @@ def prepare_table(
pg_schema: str = None,
):
# create database
with client.get_cursor('postgres') as (conn, cur):
with client.get_cursor("postgres") as (conn, cur):
database_exists_stmt = database.exists(data_source_pb2.POSTGRESQL)
LOGGER.debug(database_exists_stmt)
cur.execute(database_exists_stmt)
Expand Down Expand Up @@ -62,15 +63,19 @@ def prepare_table(
create_schema_stmt = f"CREATE SCHEMA IF NOT EXISTS {pg_schema}"
LOGGER.debug(create_schema_stmt)
cur.execute(create_schema_stmt)
table_name = f'{pg_schema}.{table_name}'
table_name = f"{pg_schema}.{table_name}"

create_table_stmt = f'CREATE TABLE {table_name} ({schema.yql_column_list(data_source_pb2.POSTGRESQL)})'
create_table_stmt = f"CREATE TABLE {table_name} ({schema.yql_column_list(data_source_pb2.POSTGRESQL)})"
LOGGER.debug(create_table_stmt)
cur.execute(create_table_stmt)

insert_stmt = f'INSERT INTO {table_name} ({schema.columns.names_with_commas}) VALUES ({", ".join(["%s"] * len(data_in[0]))})'
values = format_values_for_bulk_sql_insert(data_in)

insert_stmt = f"INSERT INTO {table_name} ({schema.columns.names_with_commas}) VALUES {values}"
# TODO: these logs may be too big when working with big tables,
# dump insert statement via yatest into file.
LOGGER.debug(insert_stmt)
cur.executemany(insert_stmt, data_in)
cur.execute(insert_stmt)

conn.commit()
cur.close()
Expand All @@ -92,17 +97,21 @@ def select_positive(
)

# read data
where_statement = ''
where_statement = ""
if test_case.select_where is not None:
where_statement = f'WHERE {test_case.select_where.filter_expression}'
yql_script = f'''
where_statement = f"WHERE {test_case.select_where.filter_expression}"
yql_script = f"""
{test_case.pragmas_sql_string}
SELECT {test_case.select_what.yql_select_names}
FROM {settings.postgresql.cluster_name}.{test_case.qualified_table_name}
{where_statement}
'''
"""
LOGGER.debug(yql_script)
result = dqrun_runner.run(test_dir=tmp_path, script=yql_script, generic_settings=test_case.generic_settings)
result = dqrun_runner.run(
test_dir=tmp_path,
script=yql_script,
generic_settings=test_case.generic_settings,
)

assert result.returncode == 0, result.stderr

Expand All @@ -122,12 +131,16 @@ def select_missing_database(
):
# select table from database that does not exist

yql_script = f'''
yql_script = f"""
SELECT *
FROM {settings.postgresql.cluster_name}.{test_case.qualified_table_name}
'''
"""
LOGGER.debug(yql_script)
result = dqrun_runner.run(test_dir=tmp_path, script=yql_script, generic_settings=test_case.generic_settings)
result = dqrun_runner.run(
test_dir=tmp_path,
script=yql_script,
generic_settings=test_case.generic_settings,
)

assert test_case.database.missing_database_msg(data_source_pb2.POSTGRESQL) in result.stderr, result.stderr

Expand All @@ -140,7 +153,7 @@ def select_missing_table(
test_case: test_cases.select_missing_table.TestCase,
):
# create database but don't create table
with client.get_cursor('postgres') as (conn, cur):
with client.get_cursor("postgres") as (conn, cur):
database_exists_stmt = test_case.database.exists(data_source_pb2.POSTGRESQL)
LOGGER.debug(database_exists_stmt)
cur.execute(database_exists_stmt)
Expand All @@ -155,12 +168,16 @@ def select_missing_table(
cur.close()

# read data
yql_script = f'''
yql_script = f"""
SELECT *
FROM {settings.postgresql.cluster_name}.{test_case.qualified_table_name}
'''
"""
LOGGER.debug(yql_script)
result = dqrun_runner.run(test_dir=tmp_path, script=yql_script, generic_settings=test_case.generic_settings)
result = dqrun_runner.run(
test_dir=tmp_path,
script=yql_script,
generic_settings=test_case.generic_settings,
)

assert test_case.database.missing_table_msg(data_source_pb2.POSTGRESQL) in result.stderr, result.stderr

Expand All @@ -182,12 +199,16 @@ def select_pg_schema(
)

# read data
yql_script = f'''
yql_script = f"""
SELECT {test_case.select_what.yql_select_names}
FROM {settings.postgresql.cluster_name}.{test_case.qualified_table_name}
'''
"""
LOGGER.debug(yql_script)
result = dqrun_runner.run(test_dir=tmp_path, script=yql_script, generic_settings=test_case.generic_settings)
result = dqrun_runner.run(
test_dir=tmp_path,
script=yql_script,
generic_settings=test_case.generic_settings,
)

assert result.returncode == 0, result.stderr

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,7 @@ def _pushdown(self) -> TestCase:

return [
TestCase(
name=f'pushdown_{data_source_kind}',
name=f'pushdown_{EDataSourceKind.Name(data_source_kind)}',
data_in=data_in,
data_out_=data_out,
pragmas=dict({'generic.UsePredicatePushdown': 'true'}),
Expand Down Expand Up @@ -412,7 +412,7 @@ def make_test_cases(self) -> Sequence[TestCase]:
for base_tc in base_test_cases:
for protocol in protocols:
tc = replace(base_tc)
tc.name += f'_{protocol}'
tc.name += f'_{EProtocol.Name(protocol)}'
tc.protocol = protocol
test_cases.append(tc)
return test_cases
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,9 @@ def _large_table(self) -> Sequence[TestCase]:
# TODO: assert connector stats when it will be accessible
'''

table_size = 2.5 * self.ss.connector.paging_bytes_per_page * self.ss.connector.paging_prefetch_queue_capacity
# FIXME: uncomment to debug YQ-2729
# table_size = 2.5 * self.ss.connector.paging_bytes_per_page * self.ss.connector.paging_prefetch_queue_capacity
table_size = self.ss.connector.paging_bytes_per_page * self.ss.connector.paging_prefetch_queue_capacity / 1000

schema = Schema(
columns=ColumnList(
Expand All @@ -239,7 +241,7 @@ def _large_table(self) -> Sequence[TestCase]:
test_cases = []
for data_source_kind in data_source_kinds:
tc = TestCase(
name=f'large_table_{data_source_kind}',
name=f'large_table',
data_source_kind=data_source_kind,
data_in=data_in,
data_out_=data_in,
Expand Down Expand Up @@ -273,7 +275,7 @@ def make_test_cases(self, data_source_kind: EDataSourceKind) -> Sequence[TestCas
continue
for protocol in protocols[base_tc.data_source_kind]:
tc = replace(base_tc)
tc.name += f'_{protocol}'
tc.name += f'_{EProtocol.Name(protocol)}'
tc.protocol = protocol
test_cases.append(tc)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,7 @@ def _pushdown(self) -> TestCase:

return [
TestCase(
name=f'pushdown_{data_source_kind}',
name=f'pushdown_{EDataSourceKind.Name(data_source_kind)}',
data_in=data_in,
data_out_=data_out_1,
pragmas=dict({'generic.UsePredicatePushdown': 'true'}),
Expand All @@ -440,7 +440,7 @@ def _pushdown(self) -> TestCase:
database=Database.make_for_data_source_kind(data_source_kind),
),
TestCase(
name=f'pushdown_{data_source_kind}',
name=f'pushdown_{EDataSourceKind.Name(data_source_kind)}',
data_in=data_in,
data_out_=data_out_2,
pragmas=dict({'generic.UsePredicatePushdown': 'true'}),
Expand Down Expand Up @@ -469,7 +469,7 @@ def make_test_cases(self) -> Sequence[TestCase]:
for base_tc in base_test_cases:
for protocol in protocols:
tc = replace(base_tc)
tc.name += f'_{protocol}'
tc.name += f'_{EProtocol.Name(protocol)}'
tc.protocol = protocol
test_cases.append(tc)
return test_cases
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from pathlib import Path
import subprocess
from typing import Final
import json

import jinja2

Expand Down Expand Up @@ -213,6 +214,7 @@ def run(self, test_dir: Path, script: str, generic_settings: GenericSettings) ->
data_out = None
data_out_with_types = None
schema = None
unique_suffix = test_dir.name

if out.returncode == 0:
# Parse output
Expand All @@ -236,7 +238,6 @@ def run(self, test_dir: Path, script: str, generic_settings: GenericSettings) ->
for line in out.stderr.decode('utf-8').splitlines():
LOGGER.error(line)

unique_suffix = test_dir.name
err_file = yatest.common.output_path(f'dqrun-{unique_suffix}.err')
with open(err_file, "w") as f:
f.write(out.stderr.decode('utf-8'))
Expand Down
Loading