diff --git a/sdk/python/feast/infra/offline_stores/bigquery.py b/sdk/python/feast/infra/offline_stores/bigquery.py index 949795f9a4..259a3af7d9 100644 --- a/sdk/python/feast/infra/offline_stores/bigquery.py +++ b/sdk/python/feast/infra/offline_stores/bigquery.py @@ -324,7 +324,9 @@ def offline_write_batch( f"feature view batch source is {type(feature_view.batch_source)} not bigquery source" ) - pa_schema, column_names = offline_utils.get_pyarrow_schema(config, feature_view) + pa_schema, column_names = offline_utils.get_pyarrow_schema_from_batch_source( + config, feature_view.batch_source + ) if column_names != table.column_names: raise ValueError( f"The input pyarrow table has schema {pa_schema} with the incorrect columns {column_names}. " diff --git a/sdk/python/feast/infra/offline_stores/file.py b/sdk/python/feast/infra/offline_stores/file.py index 1fc37b0a8a..75968146de 100644 --- a/sdk/python/feast/infra/offline_stores/file.py +++ b/sdk/python/feast/infra/offline_stores/file.py @@ -27,7 +27,7 @@ ) from feast.infra.offline_stores.offline_utils import ( DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL, - get_pyarrow_schema, + get_pyarrow_schema_from_batch_source, ) from feast.infra.provider import ( _get_requested_feature_views_to_features_dict, @@ -425,7 +425,9 @@ def offline_write_batch( f"feature view batch source is {type(feature_view.batch_source)} not file source" ) - pa_schema, column_names = get_pyarrow_schema(config, feature_view) + pa_schema, column_names = get_pyarrow_schema_from_batch_source( + config, feature_view.batch_source + ) if column_names != table.column_names: raise ValueError( f"The input pyarrow table has schema {pa_schema} with the incorrect columns {column_names}. " diff --git a/sdk/python/feast/infra/offline_stores/offline_utils.py b/sdk/python/feast/infra/offline_stores/offline_utils.py index 917729f748..abe8d4e4e5 100644 --- a/sdk/python/feast/infra/offline_stores/offline_utils.py +++ b/sdk/python/feast/infra/offline_stores/offline_utils.py @@ -9,6 +9,7 @@ from jinja2 import BaseLoader, Environment from pandas import Timestamp +from feast.data_source import DataSource from feast.errors import ( EntityTimestampInferenceException, FeastEntityDFMissingColumnsError, @@ -222,13 +223,11 @@ def get_offline_store_from_config(offline_store_config: Any) -> OfflineStore: return offline_store_class() -def get_pyarrow_schema( - config: RepoConfig, feature_view: FeatureView +def get_pyarrow_schema_from_batch_source( + config: RepoConfig, batch_source: DataSource ) -> Tuple[pa.Schema, List[str]]: - """Returns the pyarrow schema and column names for the specified feature view's batch source.""" - column_names_and_types = feature_view.batch_source.get_table_column_names_and_types( - config - ) + """Returns the pyarrow schema and column names for the given batch source.""" + column_names_and_types = batch_source.get_table_column_names_and_types(config) pa_schema = [] column_names = [] @@ -237,9 +236,7 @@ def get_pyarrow_schema( ( column_name, feast_value_type_to_pa( - feature_view.batch_source.source_datatype_to_feast_value_type()( - column_type - ) + batch_source.source_datatype_to_feast_value_type()(column_type) ), ) ) diff --git a/sdk/python/feast/infra/offline_stores/redshift.py b/sdk/python/feast/infra/offline_stores/redshift.py index 6151705e14..8667989268 100644 --- a/sdk/python/feast/infra/offline_stores/redshift.py +++ b/sdk/python/feast/infra/offline_stores/redshift.py @@ -318,7 +318,9 @@ def offline_write_batch( f"feature view batch source is {type(feature_view.batch_source)} not redshift source" ) - pa_schema, column_names = offline_utils.get_pyarrow_schema(config, feature_view) + pa_schema, column_names = offline_utils.get_pyarrow_schema_from_batch_source( + config, feature_view.batch_source + ) if column_names != table.column_names: raise ValueError( f"The input pyarrow table has schema {pa_schema} with the incorrect columns {column_names}. " diff --git a/sdk/python/feast/infra/offline_stores/snowflake.py b/sdk/python/feast/infra/offline_stores/snowflake.py index f86fc8290f..ec06d8dce1 100644 --- a/sdk/python/feast/infra/offline_stores/snowflake.py +++ b/sdk/python/feast/infra/offline_stores/snowflake.py @@ -327,7 +327,9 @@ def offline_write_batch( f"feature view batch source is {type(feature_view.batch_source)} not snowflake source" ) - pa_schema, column_names = offline_utils.get_pyarrow_schema(config, feature_view) + pa_schema, column_names = offline_utils.get_pyarrow_schema_from_batch_source( + config, feature_view.batch_source + ) if column_names != table.column_names: raise ValueError( f"The input pyarrow table has schema {pa_schema} with the incorrect columns {column_names}. "