Skip to content

Commit

Permalink
feature: Add Mariadb offline store
Browse files Browse the repository at this point in the history
Added MariaDB template

Signed-off-by: Theodor Mihalache <tmihalac@redhat.com>
  • Loading branch information
tmihalac committed May 3, 2024
1 parent 07bf02d commit 226e965
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ def pull_latest_from_table_or_query(
SELECT {field_string},
ROW_NUMBER() OVER({partition_by_join_key_string} ORDER BY {timestamp_desc_string}) AS _feast_row
FROM {from_expression} inner_t
WHERE {timestamp_field} BETWEEN CONVERT(DATETIMEOFFSET, '{start_date}', 120) AND CONVERT(DATETIMEOFFSET, '{end_date}', 120)
WHERE {timestamp_field} BETWEEN CONVERT('{start_date}', DATETIME) AND CONVERT('{end_date}', DATETIME)
) outer_t
WHERE outer_t._feast_row = 1
"""
Expand Down Expand Up @@ -217,7 +217,7 @@ def get_historical_features(
engine=engine,
config=config.offline_store,
full_feature_names=full_feature_names,
on_demand_feature_views=registry.list_on_demand_feature_views(project),
on_demand_feature_views=OnDemandFeatureView.get_requested_odfvs(feature_refs, project, registry),
)
return job

Expand All @@ -232,13 +232,34 @@ def write_logged_features(
raise NotImplementedError()

@staticmethod
@log_exceptions_and_usage(offline_store="mariadb")
def offline_write_batch(
config: RepoConfig,
feature_view: FeatureView,
table: pyarrow.Table,
progress: Optional[Callable[[int], Any]],
config: RepoConfig,
feature_view: FeatureView,
table: pyarrow.Table,
progress: Optional[Callable[[int], Any]],
):
raise NotImplementedError()
assert isinstance(config.offline_store, MariaDBOfflineStoreConfig)
assert type(feature_view.batch_source).__name__ == "MariaDBSource"

(
pa_schema,
column_names,
) = offline_utils.get_pyarrow_schema_from_batch_source(
config, feature_view.batch_source
)
if column_names != table.column_names:
raise ValueError(
f"The input pyarrow table has schema {table.schema} with the incorrect columns {table.column_names}. "
f"The schema is expected to be {pa_schema} with the columns (in this exact order) to be {column_names}."
)

if table.schema != pa_schema:
table = table.cast(pa_schema)

engine = make_engine(config.offline_store)

df_to_mariadb_table(table.to_pandas(), feature_view.batch_source.name, engine)


def _assert_expected_columns_in_dataframe(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ def get_historical_features(
engine=engine,
config=config.offline_store,
full_feature_names=full_feature_names,
on_demand_feature_views=registry.list_on_demand_feature_views(project),
on_demand_feature_views=OnDemandFeatureView.get_requested_odfvs(feature_refs, project, registry),
)
return job

Expand Down

0 comments on commit 226e965

Please sign in to comment.