Skip to content

Commit

Permalink
add negative tests
Browse files — browse the repository at this point in the history
  • Loading branch information
goldmedal committed Jan 17, 2025
1 parent 6d9c31b commit ea34d15
Show file tree
Hide file tree
Showing 4 changed files with 104 additions and 53 deletions.
11 changes: 9 additions & 2 deletions ibis-server/app/model/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import ibis.formats
import pandas as pd
import sqlglot.expressions as sge
from duckdb import HTTPException
from google.cloud import bigquery
from google.oauth2 import service_account
from ibis import BaseBackend
Expand Down Expand Up @@ -163,10 +164,16 @@ def __init__(self, connection_info: ConnectionInfo):
init_duckdb_s3(self.connection, connection_info)

def query(self, sql: str, limit: int) -> pd.DataFrame:
return self.connection.execute(sql).fetch_df().head(limit)
try:
return self.connection.execute(sql).fetch_df().head(limit)
except HTTPException as e:
raise UnprocessableEntityError(f"Failed to execute query: {e!s}")

def dry_run(self, sql: str) -> None:
self.connection.execute(sql)
try:
self.connection.execute(sql)
except HTTPException as e:
raise QueryDryRunError(f"Failed to execute query: {e!s}")


@cache
Expand Down
101 changes: 54 additions & 47 deletions ibis-server/app/model/metadata/object_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@
import opendal
from loguru import logger

from app.model import LocalFileConnectionInfo, S3FileConnectionInfo
from app.model import (
LocalFileConnectionInfo,
S3FileConnectionInfo,
UnprocessableEntityError,
)
from app.model.metadata.dto import (
Column,
RustWrenEngineColumnType,
Expand All @@ -23,54 +27,57 @@ def get_table_list(self) -> list[Table]:
op = self._get_dal_operator()
conn = self._get_connection()
unique_tables = {}
for file in op.list("/"):
if file.path != "/":
stat = op.stat(file.path)
if stat.mode.is_dir():
# if the file is a directory, use the directory name as the table name
table_name = os.path.basename(os.path.normpath(file.path))
full_path = f"{self.connection_info.url.get_secret_value()}/{table_name}/*.{self.connection_info.format}"
else:
# if the file is a file, use the file name as the table name
table_name = os.path.splitext(os.path.basename(file.path))[0]
full_path = (
f"{self.connection_info.url.get_secret_value()}/{file.path}"
)
try:
for file in op.list("/"):
if file.path != "/":
stat = op.stat(file.path)
if stat.mode.is_dir():
# if the file is a directory, use the directory name as the table name
table_name = os.path.basename(os.path.normpath(file.path))
full_path = f"{self.connection_info.url.get_secret_value()}/{table_name}/*.{self.connection_info.format}"
else:
# if the file is a file, use the file name as the table name
table_name = os.path.splitext(os.path.basename(file.path))[0]
full_path = (
f"{self.connection_info.url.get_secret_value()}/{file.path}"
)

# add required prefix for object storage
full_path = self._get_full_path(full_path)
# read the file with the target format if unreadable, skip the file
df = self._read_df(conn, full_path)
if df is None:
continue
columns = []
try:
for col in df.columns:
duckdb_type = df[col].dtypes[0]
columns.append(
Column(
name=col,
type=self._to_column_type(duckdb_type.__str__()),
notNull=False,
# add required prefix for object storage
full_path = self._get_full_path(full_path)
# read the file with the target format if unreadable, skip the file
df = self._read_df(conn, full_path)
if df is None:
continue
columns = []
try:
for col in df.columns:
duckdb_type = df[col].dtypes[0]
columns.append(
Column(
name=col,
type=self._to_column_type(duckdb_type.__str__()),
notNull=False,
)
)
)
except Exception as e:
logger.debug(f"Failed to read column types: {e}")
continue

unique_tables[table_name] = Table(
name=table_name,
description=None,
columns=[],
properties=TableProperties(
table=table_name,
schema=None,
catalog=None,
path=full_path,
),
primaryKey=None,
)
unique_tables[table_name].columns = columns
except Exception as e:
logger.debug(f"Failed to read column types: {e}")
continue

unique_tables[table_name] = Table(
name=table_name,
description=None,
columns=[],
properties=TableProperties(
table=table_name,
schema=None,
catalog=None,
path=full_path,
),
primaryKey=None,
)
unique_tables[table_name].columns = columns
except Exception as e:
raise UnprocessableEntityError(f"Failed to list files: {e!s}")

return list(unique_tables.values())

Expand Down
11 changes: 7 additions & 4 deletions ibis-server/app/model/utils.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from duckdb import DuckDBPyConnection
from duckdb import DuckDBPyConnection, HTTPException

from app.model import S3FileConnectionInfo

Expand All @@ -14,6 +14,9 @@ def init_duckdb_s3(
REGION '{connection_info.region.get_secret_value()}'
)
"""
result = connection.execute(create_secret).fetchone()
if result is None or not result[0]:
raise Exception("Failed to create secret")
try:
result = connection.execute(create_secret).fetchone()
if result is None or not result[0]:
raise Exception("Failed to create secret")
except HTTPException as e:
raise Exception("Failed to create secret", e)
34 changes: 34 additions & 0 deletions ibis-server/tests/routers/v2/connector/test_s3_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,40 @@ async def test_dry_run(client, manifest_str, connection_info):
assert response.text is not None


async def test_query_with_invalid_connection_info(client, manifest_str):
    """Invalid S3 credentials should yield HTTP 422 on both endpoints.

    Sends the same bogus connection info first to the /query endpoint and
    then to /metadata/tables, asserting each request is rejected with
    422 Unprocessable Entity.
    """
    # Connection info pointing at the real test bucket/region but with
    # credentials that cannot authenticate.
    invalid_connection_info = {
        "url": "/tpch/data",
        "format": "parquet",
        "bucket": bucket,
        "region": region,
        "access_key": "invalid",
        "secret_key": "invalid",
    }

    query_response = await client.post(
        f"{base_url}/query",
        json={
            "manifestStr": manifest_str,
            "sql": 'SELECT * FROM "Orders" LIMIT 1',
            "connectionInfo": invalid_connection_info,
        },
    )
    assert query_response.status_code == 422

    metadata_response = await client.post(
        url=f"{base_url}/metadata/tables",
        json={"connectionInfo": invalid_connection_info},
    )
    assert metadata_response.status_code == 422


async def test_metadata_list_tables(client, connection_info):
response = await client.post(
url=f"{base_url}/metadata/tables",
Expand Down

0 comments on commit ea34d15

Please sign in to comment.