From b7de4b2c0fb6fd6e2519b3bb8f9ed976bf499c71 Mon Sep 17 00:00:00 2001 From: Job Almekinders Date: Fri, 21 Jun 2024 13:47:45 +0200 Subject: [PATCH] Lint: Raise exceptions if cursor returned no columns or rows Add log statement Lint: Fix _to_arrow_internal Lint: Fix _get_entity_df_event_timestamp_range Update exception Use ZeroColumnQueryResult Signed-off-by: Job Almekinders --- sdk/python/feast/errors.py | 10 ++++++++ .../postgres_offline_store/postgres.py | 24 ++++++++++++------- .../postgres_offline_store/postgres_source.py | 9 +++++-- 3 files changed, 32 insertions(+), 11 deletions(-) diff --git a/sdk/python/feast/errors.py b/sdk/python/feast/errors.py index 22de402f20..3970382dbb 100644 --- a/sdk/python/feast/errors.py +++ b/sdk/python/feast/errors.py @@ -401,3 +401,13 @@ def __init__(self, input_dict: dict): super().__init__( f"Failed to serialize the provided dictionary into a pandas DataFrame: {input_dict.keys()}" ) + + +class ZeroRowsQueryResult(Exception): + def __init__(self, query: str): + super().__init__(f"This query returned zero rows:\n{query}") + + +class ZeroColumnQueryResult(Exception): + def __init__(self, query: str): + super().__init__(f"This query returned zero columns:\n{query}") diff --git a/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py b/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py index 6a8686d282..caaafb6d29 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py +++ b/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py @@ -1,4 +1,5 @@ import contextlib +import logging from dataclasses import asdict from datetime import datetime from typing import ( @@ -23,7 +24,7 @@ from pytz import utc from feast.data_source import DataSource -from feast.errors import InvalidEntityType +from feast.errors import InvalidEntityType, ZeroColumnQueryResult, ZeroRowsQueryResult from feast.feature_view import DUMMY_ENTITY_ID, DUMMY_ENTITY_VAL, FeatureView from feast.infra.offline_stores import offline_utils from feast.infra.offline_stores.contrib.postgres_offline_store.postgres_source import ( @@ -276,6 +277,8 @@ def _to_arrow_internal(self, timeout: Optional[int] = None) -> pa.Table: with _get_conn(self.config.offline_store) as conn, conn.cursor() as cur: conn.read_only = True cur.execute(query) + if not cur.description: + raise ZeroColumnQueryResult(query) fields = [ (c.name, pg_type_code_to_arrow(c.type_code)) for c in cur.description @@ -331,16 +334,19 @@ def _get_entity_df_event_timestamp_range( entity_df_event_timestamp.max().to_pydatetime(), ) elif isinstance(entity_df, str): - # If the entity_df is a string (SQL query), determine range - # from table + # If the entity_df is a string (SQL query), determine range from table with _get_conn(config.offline_store) as conn, conn.cursor() as cur: - ( - cur.execute( - f"SELECT MIN({entity_df_event_timestamp_col}) AS min, MAX({entity_df_event_timestamp_col}) AS max FROM ({entity_df}) as tmp_alias" - ), - ) + query = f""" + SELECT + MIN({entity_df_event_timestamp_col}) AS min, + MAX({entity_df_event_timestamp_col}) AS max + FROM ({entity_df}) AS tmp_alias + """ + cur.execute(query) res = cur.fetchone() - entity_df_event_timestamp_range = (res[0], res[1]) + if not res: + raise ZeroRowsQueryResult(query) + entity_df_event_timestamp_range = (res[0], res[1]) else: raise InvalidEntityType(type(entity_df)) diff --git a/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres_source.py b/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres_source.py index bbb3f768fd..9805c65d50 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres_source.py +++ b/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres_source.py @@ -1,10 +1,11 @@ import json +import logging from typing import Callable, Dict, Iterable, Optional, Tuple from typeguard import typechecked from feast.data_source import DataSource -from feast.errors import DataSourceNoNameException +from feast.errors import DataSourceNoNameException, ZeroColumnQueryResult from feast.infra.utils.postgres.connection_utils import _get_conn from feast.protos.feast.core.DataSource_pb2 import DataSource as DataSourceProto from feast.protos.feast.core.SavedDataset_pb2 import ( @@ -111,7 +112,11 @@ def get_table_column_names_and_types( self, config: RepoConfig ) -> Iterable[Tuple[str, str]]: with _get_conn(config.offline_store) as conn, conn.cursor() as cur: - cur.execute(f"SELECT * FROM {self.get_table_query_string()} AS sub LIMIT 0") + query = f"SELECT * FROM {self.get_table_query_string()} AS sub LIMIT 0" + cur.execute(query) + if not cur.description: + raise ZeroColumnQueryResult(query) + return ( (c.name, pg_type_code_to_pg_type(c.type_code)) for c in cur.description )