diff --git a/python/hsfs/constructor/query.py b/python/hsfs/constructor/query.py index 400058a762..8f64e6c493 100644 --- a/python/hsfs/constructor/query.py +++ b/python/hsfs/constructor/query.py @@ -221,24 +221,18 @@ def as_of(self, wallclock_time=None, exclude_until=None): when calling the `insert()` method. # Arguments - wallclock_time: Datetime string. The String should be formatted in one of the + wallclock_time: datatime.datetime, datetime.date, unix timestamp in seconds (int), or string. The String should be formatted in one of the following formats `%Y%m%d`, `%Y%m%d%H`, `%Y%m%d%H%M`, or `%Y%m%d%H%M%S`. - exclude_until: Datetime string. The String should be formatted in one of the + exclude_until: datatime.datetime, datetime.date, unix timestamp in seconds (int), or string. The String should be formatted in one of the following formats `%Y%m%d`, `%Y%m%d%H`, `%Y%m%d%H%M`, or `%Y%m%d%H%M%S`. # Returns `Query`. The query object with the applied time travel condition. """ - wallclock_timestamp = ( - util.get_timestamp_from_date_string(wallclock_time) - if wallclock_time - else None - ) - exclude_until_timestamp = ( - util.get_timestamp_from_date_string(exclude_until) - if exclude_until - else None - ) + wallclock_timestamp = util.convert_event_time_to_timestamp(wallclock_time) + + exclude_until_timestamp = util.convert_event_time_to_timestamp(exclude_until) + for join in self._joins: join.query.left_feature_group_end_time = wallclock_timestamp join.query.left_feature_group_start_time = exclude_until_timestamp @@ -252,10 +246,10 @@ def pull_changes(self, wallclock_start_time, wallclock_end_time): `pull_changes` method is deprecated. Use `as_of(end_wallclock_time, exclude_until=start_wallclock_time) instead. 
""" - self.left_feature_group_start_time = util.get_timestamp_from_date_string( + self.left_feature_group_start_time = util.convert_event_time_to_timestamp( wallclock_start_time ) - self.left_feature_group_end_time = util.get_timestamp_from_date_string( + self.left_feature_group_end_time = util.convert_event_time_to_timestamp( wallclock_end_time ) return self diff --git a/python/hsfs/core/feature_group_engine.py b/python/hsfs/core/feature_group_engine.py index b96d774f8f..3f0199d4e2 100644 --- a/python/hsfs/core/feature_group_engine.py +++ b/python/hsfs/core/feature_group_engine.py @@ -134,11 +134,8 @@ def commit_details(self, feature_group, wallclock_time, limit): "commit_details can only be used on time travel enabled feature groups" ) - wallclock_timestamp = ( - util.get_timestamp_from_date_string(wallclock_time) - if wallclock_time is not None - else None - ) + wallclock_timestamp = util.convert_event_time_to_timestamp(wallclock_time) + feature_group_commits = self._feature_group_api.get_commit_details( feature_group, wallclock_timestamp, limit ) diff --git a/python/hsfs/core/feature_view_engine.py b/python/hsfs/core/feature_view_engine.py index d34f090b74..a3852076e5 100644 --- a/python/hsfs/core/feature_view_engine.py +++ b/python/hsfs/core/feature_view_engine.py @@ -106,8 +106,8 @@ def get_batch_query( return self._feature_view_api.get_batch_query( feature_view_obj.name, feature_view_obj.version, - start_time, - end_time, + util.convert_event_time_to_timestamp(start_time), + util.convert_event_time_to_timestamp(end_time), training_dataset_version=training_dataset_version, is_python_engine=engine.get_type() == "python", with_label=with_label, @@ -119,8 +119,8 @@ def get_batch_query_string( query_obj = self._feature_view_api.get_batch_query( feature_view_obj.name, feature_view_obj.version, - start_time, - end_time, + util.convert_event_time_to_timestamp(start_time), + util.convert_event_time_to_timestamp(end_time), 
training_dataset_version=training_dataset_version, is_python_engine=engine.get_type() == "python", ) diff --git a/python/hsfs/core/statistics_engine.py b/python/hsfs/core/statistics_engine.py index 743ce90ea3..661ef76100 100644 --- a/python/hsfs/core/statistics_engine.py +++ b/python/hsfs/core/statistics_engine.py @@ -176,7 +176,7 @@ def get( training_dataset_version=None, ): """Get Statistics with the specified commit time of an entity.""" - commit_timestamp = util.get_timestamp_from_date_string(commit_time) + commit_timestamp = util.convert_event_time_to_timestamp(commit_time) return self._statistics_api.get( metadata_instance, commit_timestamp, diff --git a/python/hsfs/engine/python.py b/python/hsfs/engine/python.py index 8117c5f4ee..42e58267e8 100644 --- a/python/hsfs/engine/python.py +++ b/python/hsfs/engine/python.py @@ -611,7 +611,7 @@ def _time_series_split( result_df = df[ [ split.start_time - <= self._convert_to_unix_timestamp(t) + <= util.convert_event_time_to_timestamp(t) < split.end_time for t in df[event_time] ] @@ -624,16 +624,6 @@ def _time_series_split( result_dfs[split.name] = result_df return result_dfs - def _convert_to_unix_timestamp(self, t): - if isinstance(t, pd._libs.tslibs.timestamps.Timestamp): - # pandas.timestamp represents millisecond in decimal - return t.timestamp() * 1000 - elif isinstance(t, str): - return util.get_timestamp_from_date_string(t) - else: - # jdbc supports timestamp precision up to second only. 
- return t * 1000 - def write_training_dataset( self, training_dataset, diff --git a/python/hsfs/feature_group.py b/python/hsfs/feature_group.py index e7d1678ef7..4e01e9914a 100644 --- a/python/hsfs/feature_group.py +++ b/python/hsfs/feature_group.py @@ -24,6 +24,8 @@ import avro.schema from typing import Optional, Union, Any, Dict, List, TypeVar +from datetime import datetime, date + from hsfs import util, engine, feature, user, storage_connector as sc from hsfs.core import ( feature_group_engine, @@ -578,13 +580,15 @@ def primary_key(self): def primary_key(self, new_primary_key): self._primary_key = [pk.lower() for pk in new_primary_key] - def get_statistics(self, commit_time: str = None): + def get_statistics(self, commit_time: Union[str, int, datetime, date] = None): """Returns the statistics for this feature group at a specific time. If `commit_time` is `None`, the most recent statistics are returned. # Arguments - commit_time: Commit time in the format `YYYYMMDDhhmmss`, defaults to `None`. + commit_time: datetime.datetime, datetime.date, unix timestamp in seconds (int), or string. The String should + be formatted in one of the following formats `%Y%m%d`, `%Y%m%d%H`, `%Y%m%d%H%M`, `%Y%m%d%H%M%S`, + or `%Y%m%d%H%M%S%f`. Defaults to `None`. # Returns `Statistics`. Statistics object. @@ -772,7 +776,7 @@ def __init__( def read( self, - wallclock_time: Optional[str] = None, + wallclock_time: Optional[Union[str, int, datetime, date]] = None, online: Optional[bool] = False, dataframe_type: Optional[str] = "default", read_options: Optional[dict] = {}, @@ -800,7 +804,9 @@ def read( fg.read("2020-10-20 07:34:11") ``` # Arguments - wallclock_time: Date string in the format of "YYYYMMDD" or "YYYYMMDDhhmmss". + wallclock_time: datetime.datetime, datetime.date, unix timestamp in seconds (int), or string. The String should + be formatted in one of the following formats `%Y%m%d`, `%Y%m%d%H`, `%Y%m%d%H%M`, `%Y%m%d%H%M%S`, + or `%Y%m%d%H%M%S%f`. 
If Specified will retrieve feature group as of specific point in time. If not specified will return as of most recent time. Defaults to `None`. online: bool, optional. If `True` read from online feature store, defaults @@ -844,8 +850,8 @@ def read( def read_changes( self, - start_wallclock_time: str, - end_wallclock_time: str, + start_wallclock_time: Union[str, int, datetime, date], + end_wallclock_time: Union[str, int, datetime, date], read_options: Optional[dict] = {}, ): """Reads updates of this feature that occurred between specified points in time. @@ -865,10 +871,12 @@ def read_changes( ``` # Arguments - start_wallclock_time: Date string in the format of "YYYYMMDD" or - "YYYYMMDDhhmmss". - end_wallclock_time: Date string in the format of "YYYYMMDD" or - "YYYYMMDDhhmmss". + start_wallclock_time: datetime.datetime, datetime.date, unix timestamp in seconds (int), or string. + The String should be formatted in one of the following formats `%Y%m%d`, `%Y%m%d%H`, `%Y%m%d%H%M`, + `%Y%m%d%H%M%S`, or `%Y%m%d%H%M%S%f`. + end_wallclock_time: datetime.datetime, datetime.date, unix timestamp in seconds (int), or string. + The String should be formatted in one of the following formats `%Y%m%d`, `%Y%m%d%H`, `%Y%m%d%H%M`, + `%Y%m%d%H%M%S`, or `%Y%m%d%H%M%S%f`. read_options: User provided read options. Defaults to `{}`. # Returns @@ -1185,7 +1193,9 @@ def commit_details( # Arguments - wallclock_time: Commit details as of specific point in time. Defaults to `None`. - limit: Number of commits to retrieve. Defaults to `None`. + wallclock_time: Commit details as of specific point in time. datetime.datetime, datetime.date, unix timestamp in seconds (int), or string. + The String should be formatted in one of the following formats `%Y%m%d`, `%Y%m%d%H`, `%Y%m%d%H%M`, + `%Y%m%d%H%M%S`, or `%Y%m%d%H%M%S%f`. Defaults to `None`. + limit: Number of commits to retrieve. Defaults to `None`. # Returns `Dict[str, Dict[str, str]]`. 
Dictionary object of commit metadata timeline, where Key is commit id and value @@ -1214,7 +1224,7 @@ def commit_delete_record( """ self._feature_group_engine.commit_delete(self, delete_df, write_options) - def as_of(self, wallclock_time, exclude_until=None): + def as_of(self, wallclock_time=None, exclude_until=None): """Get Query object to retrieve all features of the group at a point in the past. This method selects all features in the feature group and returns a Query object @@ -1283,9 +1293,9 @@ def as_of(self, wallclock_time, exclude_until=None): when calling the `insert()` method. # Arguments - wallclock_time: Datetime string. The String should be formatted in one of the + wallclock_time: datatime.datetime, datetime.date, unix timestamp in seconds (int), or string. The String should be formatted in one of the following formats `%Y%m%d`, `%Y%m%d%H`, `%Y%m%d%H%M`, or `%Y%m%d%H%M%S`. - exclude_until: Datetime string. The String should be formatted in one of the + exclude_until: datatime.datetime, datetime.date, unix timestamp in seconds (int), or string. The String should be formatted in one of the following formats `%Y%m%d`, `%Y%m%d%H`, `%Y%m%d%H%M`, or `%Y%m%d%H%M%S`. # Returns @@ -1331,7 +1341,9 @@ def validate( self, dataframe, save_report, validation_options ) - def compute_statistics(self, wallclock_time: Optional[str] = None): + def compute_statistics( + self, wallclock_time: Optional[Union[str, int, datetime, date]] = None + ): """Recompute the statistics for the feature group and save them to the feature store. @@ -1339,8 +1351,9 @@ def compute_statistics(self, wallclock_time: Optional[str] = None): group. # Arguments - wallclock_time: Date string in the format of "YYYYMMDD" or "YYYYMMDDhhmmss". - Only valid if feature group is time travel enabled. If specified will recompute statistics on + wallclock_time: datatime.datetime, datetime.date, unix timestamp in seconds (int), or string. 
The String should + be formatted in one of the following formats `%Y%m%d`, `%Y%m%d%H`, `%Y%m%d%H%M`, `%Y%m%d%H%M%S`, + or `%Y%m%d%H%M%S%f`. If specified will recompute statistics on feature group as of specific point in time. If not specified then will compute statistics as of most recent time of this feature group. Defaults to `None`. diff --git a/python/hsfs/feature_view.py b/python/hsfs/feature_view.py index 8db41130bd..260cf28b8c 100644 --- a/python/hsfs/feature_view.py +++ b/python/hsfs/feature_view.py @@ -16,7 +16,7 @@ import json import warnings -from datetime import datetime +from datetime import datetime, date from typing import Optional, Union, List, Dict, Any from hsfs.training_dataset_split import TrainingDatasetSplit @@ -164,13 +164,19 @@ def init_batch_scoring( self._batch_scoring_server.init_batch_scoring(self) def get_batch_query( - self, start_time: Optional[datetime] = None, end_time: Optional[datetime] = None + self, + start_time: Optional[Union[str, int, datetime, date]] = None, + end_time: Optional[Union[str, int, datetime, date]] = None, ): """Get a query string of batch query. # Arguments - start_time: Optional. Start time of the batch query. - end_time: Optional. End time of the batch query. + start_time: Optional. Start time of the batch query. datatime.datetime, datetime.date, unix timestamp in seconds (int), or string. + The String should be formatted in one of the following formats `%Y%m%d`, `%Y%m%d%H`, `%Y%m%d%H%M`, + `%Y%m%d%H%M%S`, or `%Y%m%d%H%M%S%f`. + end_time: Optional. End time of the batch query. datatime.datetime, datetime.date, unix timestamp in seconds (int), or string. + The String should be formatted in one of the following formats `%Y%m%d`, `%Y%m%d%H`, `%Y%m%d%H%M`, + `%Y%m%d%H%M%S`, or `%Y%m%d%H%M%S%f`. 
# Returns `str`: batch query @@ -241,10 +247,12 @@ def get_feature_vectors( def get_batch_data(self, start_time=None, end_time=None, read_options=None): """ - start_time: timestamp in second or wallclock_time: Datetime string. The String should be formatted in one of the - following formats `%Y%m%d`, `%Y%m%d%H`, `%Y%m%d%H%M`, `%Y%m%d%H%M%S`, or `%Y%m%d%H%M%S%f`. - end_time: timestamp in second or wallclock_time: Datetime string. The String should be formatted in one of the - following formats `%Y%m%d`, `%Y%m%d%H`, `%Y%m%d%H%M`, `%Y%m%d%H%M%S`, or `%Y%m%d%H%M%S%f`. + start_time: datatime.datetime, datetime.date, unix timestamp in seconds (int), or string. The String should be + formatted in one of the following formats `%Y%m%d`, `%Y%m%d%H`, `%Y%m%d%H%M`, `%Y%m%d%H%M%S`, + or `%Y%m%d%H%M%S%f`. + end_time: datatime.datetime, datetime.date, unix timestamp in seconds (int), or string. The String should be + formatted in one of the following formats `%Y%m%d`, `%Y%m%d%H`, `%Y%m%d%H%M`, `%Y%m%d%H%M%S`, + or `%Y%m%d%H%M%S%f`. read_options: User provided read options. Defaults to `{}`. """ @@ -274,8 +282,8 @@ def delete_tag(self, name: str): def create_training_data( self, - start_time: Optional[str] = "", - end_time: Optional[str] = "", + start_time: Optional[Union[str, int, datetime, date]] = "", + end_time: Optional[Union[str, int, datetime, date]] = "", storage_connector: Optional[storage_connector.StorageConnector] = None, location: Optional[str] = "", description: Optional[str] = "", @@ -303,10 +311,12 @@ def create_training_data( # Arguments - start_time: timestamp in second or wallclock_time: Datetime string. The String should be formatted in one of the - following formats `%Y%m%d`, `%Y%m%d%H`, `%Y%m%d%H%M`, `%Y%m%d%H%M%S`, or `%Y%m%d%H%M%S%f`. - end_time: timestamp in second or wallclock_time: Datetime string. The String should be formatted in one of the - following formats `%Y%m%d`, `%Y%m%d%H`, `%Y%m%d%H%M`, `%Y%m%d%H%M%S`, or `%Y%m%d%H%M%S%f`. 
+ start_time: datatime.datetime, datetime.date, unix timestamp in seconds (int), or string. The String should + be formatted in one of the following formats `%Y%m%d`, `%Y%m%d%H`, `%Y%m%d%H%M`, `%Y%m%d%H%M%S`, + or `%Y%m%d%H%M%S%f`. + end_time: datatime.datetime, datetime.date, unix timestamp in seconds (int), or string. The String should + be formatted in one of the following formats `%Y%m%d`, `%Y%m%d%H`, `%Y%m%d%H%M`, `%Y%m%d%H%M%S`, + or `%Y%m%d%H%M%S%f`. storage_connector: Storage connector defining the sink location for the training dataset, defaults to `None`, and materializes training dataset on HopsFS. @@ -377,10 +387,10 @@ def create_training_data( def create_train_test_split( self, test_size: Optional[float] = None, - train_start: Optional[str] = "", - train_end: Optional[str] = "", - test_start: Optional[str] = "", - test_end: Optional[str] = "", + train_start: Optional[Union[str, int, datetime, date]] = "", + train_end: Optional[Union[str, int, datetime, date]] = "", + test_start: Optional[Union[str, int, datetime, date]] = "", + test_end: Optional[Union[str, int, datetime, date]] = "", storage_connector: Optional[storage_connector.StorageConnector] = None, location: Optional[str] = "", description: Optional[str] = "", @@ -409,14 +419,18 @@ def create_train_test_split( # Arguments test_size: size of test set. - train_start: timestamp in second or wallclock_time: Datetime string. The String should be formatted in one of the - following formats `%Y%m%d`, `%Y%m%d%H`, `%Y%m%d%H%M`, `%Y%m%d%H%M%S`, or `%Y%m%d%H%M%S%f`. - train_end: timestamp in second or wallclock_time: Datetime string. The String should be formatted in one of the - following formats `%Y%m%d`, `%Y%m%d%H`, `%Y%m%d%H%M`, `%Y%m%d%H%M%S`, or `%Y%m%d%H%M%S%f`. - test_start: timestamp in second or wallclock_time: Datetime string. The String should be formatted in one of the - following formats `%Y%m%d`, `%Y%m%d%H`, `%Y%m%d%H%M`, `%Y%m%d%H%M%S`, or `%Y%m%d%H%M%S%f`. 
- test_end: timestamp in second or wallclock_time: Datetime string. The String should be formatted in one of the - following formats `%Y%m%d`, `%Y%m%d%H`, `%Y%m%d%H%M`, `%Y%m%d%H%M%S`, or `%Y%m%d%H%M%S%f`. + train_start: datetime.datetime, datetime.date, unix timestamp in seconds (int), or string. The String should + be formatted in one of the following formats `%Y%m%d`, `%Y%m%d%H`, `%Y%m%d%H%M`, `%Y%m%d%H%M%S`, + or `%Y%m%d%H%M%S%f`. + train_end: datetime.datetime, datetime.date, unix timestamp in seconds (int), or string. The String should + be formatted in one of the following formats `%Y%m%d`, `%Y%m%d%H`, `%Y%m%d%H%M`, `%Y%m%d%H%M%S`, + or `%Y%m%d%H%M%S%f`. + test_start: datetime.datetime, datetime.date, unix timestamp in seconds (int), or string. The String should + be formatted in one of the following formats `%Y%m%d`, `%Y%m%d%H`, `%Y%m%d%H%M`, `%Y%m%d%H%M%S`, + or `%Y%m%d%H%M%S%f`. + test_end: datetime.datetime, datetime.date, unix timestamp in seconds (int), or string. The String should + be formatted in one of the following formats `%Y%m%d`, `%Y%m%d%H`, `%Y%m%d%H%M`, `%Y%m%d%H%M%S`, + or `%Y%m%d%H%M%S%f`. storage_connector: Storage connector defining the sink location for the training dataset, defaults to `None`, and materializes training dataset on HopsFS. 
@@ -496,12 +510,12 @@ def create_train_validation_test_split( self, validation_size: Optional[float] = None, test_size: Optional[float] = None, - train_start: Optional[str] = "", - train_end: Optional[str] = "", - validation_start: Optional[str] = "", - validation_end: Optional[str] = "", - test_start: Optional[str] = "", - test_end: Optional[str] = "", + train_start: Optional[Union[str, int, datetime, date]] = "", + train_end: Optional[Union[str, int, datetime, date]] = "", + validation_start: Optional[Union[str, int, datetime, date]] = "", + validation_end: Optional[Union[str, int, datetime, date]] = "", + test_start: Optional[Union[str, int, datetime, date]] = "", + test_end: Optional[Union[str, int, datetime, date]] = "", storage_connector: Optional[storage_connector.StorageConnector] = None, location: Optional[str] = "", description: Optional[str] = "", @@ -531,18 +545,24 @@ def create_train_validation_test_split( # Arguments validation_size: size of validation set. test_size: size of test set. - train_start: timestamp in second or wallclock_time: Datetime string. The String should be formatted in one of the - following formats `%Y%m%d`, `%Y%m%d%H`, `%Y%m%d%H%M`, `%Y%m%d%H%M%S`, or `%Y%m%d%H%M%S%f`. - train_end: timestamp in second or wallclock_time: Datetime string. The String should be formatted in one of the - following formats `%Y%m%d`, `%Y%m%d%H`, `%Y%m%d%H%M`, `%Y%m%d%H%M%S`, or `%Y%m%d%H%M%S%f`. - validation_start: timestamp in second or wallclock_time: Datetime string. The String should be formatted in one of the - following formats `%Y%m%d`, `%Y%m%d%H`, `%Y%m%d%H%M`, `%Y%m%d%H%M%S`, or `%Y%m%d%H%M%S%f`. - validation_end: timestamp in second or wallclock_time: Datetime string. The String should be formatted in one of the - following formats `%Y%m%d`, `%Y%m%d%H`, `%Y%m%d%H%M`, `%Y%m%d%H%M%S`, or `%Y%m%d%H%M%S%f`. - test_start: timestamp in second or wallclock_time: Datetime string. 
The String should be formatted in one of the - following formats `%Y%m%d`, `%Y%m%d%H`, `%Y%m%d%H%M`, `%Y%m%d%H%M%S`, or `%Y%m%d%H%M%S%f`. - test_end: timestamp in second or wallclock_time: Datetime string. The String should be formatted in one of the - following formats `%Y%m%d`, `%Y%m%d%H`, `%Y%m%d%H%M`, `%Y%m%d%H%M%S`, or `%Y%m%d%H%M%S%f`. + train_start: datetime.datetime, datetime.date, unix timestamp in seconds (int), or string. The String should + be formatted in one of the following formats `%Y%m%d`, `%Y%m%d%H`, `%Y%m%d%H%M`, `%Y%m%d%H%M%S`, + or `%Y%m%d%H%M%S%f`. + train_end: datetime.datetime, datetime.date, unix timestamp in seconds (int), or string. The String should + be formatted in one of the following formats `%Y%m%d`, `%Y%m%d%H`, `%Y%m%d%H%M`, `%Y%m%d%H%M%S`, + or `%Y%m%d%H%M%S%f`. + validation_start: datetime.datetime, datetime.date, unix timestamp in seconds (int), or string. The String + should be formatted in one of the following formats `%Y%m%d`, `%Y%m%d%H`, `%Y%m%d%H%M`, `%Y%m%d%H%M%S`, + or `%Y%m%d%H%M%S%f`. + validation_end: datetime.datetime, datetime.date, unix timestamp in seconds (int), or string. The String + should be formatted in one of the following formats `%Y%m%d`, `%Y%m%d%H`, `%Y%m%d%H%M`, `%Y%m%d%H%M%S`, + or `%Y%m%d%H%M%S%f`. + test_start: datetime.datetime, datetime.date, unix timestamp in seconds (int), or string. The String should + be formatted in one of the following formats `%Y%m%d`, `%Y%m%d%H`, `%Y%m%d%H%M`, `%Y%m%d%H%M%S`, + or `%Y%m%d%H%M%S%f`. + test_end: datetime.datetime, datetime.date, unix timestamp in seconds (int), or string. The String should + be formatted in one of the following formats `%Y%m%d`, `%Y%m%d%H`, `%Y%m%d%H%M`, `%Y%m%d%H%M%S`, + or `%Y%m%d%H%M%S%f`. storage_connector: Storage connector defining the sink location for the training dataset, defaults to `None`, and materializes training dataset on HopsFS. 
@@ -657,8 +677,8 @@ def recreate_training_dataset( def training_data( self, - start_time: Optional[str] = None, - end_time: Optional[str] = None, + start_time: Optional[Union[str, int, datetime, date]] = None, + end_time: Optional[Union[str, int, datetime, date]] = None, description: Optional[str] = "", extra_filter: Optional[Union[filter.Filter, filter.Logic]] = None, statistics_config: Optional[Union[StatisticsConfig, bool, dict]] = None, @@ -672,10 +692,14 @@ def training_data( recreate the training data. # Arguments - start_time: timestamp in second or wallclock_time: Datetime string. The String should be formatted in one of the - following formats `%Y%m%d`, `%Y%m%d%H`, `%Y%m%d%H%M`, `%Y%m%d%H%M%S`, or `%Y%m%d%H%M%S%f`. - end_time: timestamp in second or wallclock_time: Datetime string. The String should be formatted in one of the - following formats `%Y%m%d`, `%Y%m%d%H`, `%Y%m%d%H%M`, `%Y%m%d%H%M%S`, or `%Y%m%d%H%M%S%f`. + start_time: datatime.datetime, datetime.date, unix timestamp in seconds (int), or string. The String should + be formatted in one of the following + formats `%Y%m%d`, `%Y%m%d%H`, `%Y%m%d%H%M`, `%Y%m%d%H%M%S`, + or `%Y%m%d%H%M%S%f`. + end_time: datatime.datetime, datetime.date, unix timestamp in seconds (int), or string. The String should be + formatted in one of the following + formats `%Y%m%d`, `%Y%m%d%H`, `%Y%m%d%H%M`, `%Y%m%d%H%M%S`, + or `%Y%m%d%H%M%S%f`. description: A string describing the contents of the training dataset to improve discoverability for Data Scientists, defaults to empty string `""`. 
@@ -724,10 +748,10 @@ def training_data( def train_test_split( self, test_size: Optional[float] = None, - train_start: Optional[str] = "", - train_end: Optional[str] = "", - test_start: Optional[str] = "", - test_end: Optional[str] = "", + train_start: Optional[Union[str, int, datetime, date]] = "", + train_end: Optional[Union[str, int, datetime, date]] = "", + test_start: Optional[Union[str, int, datetime, date]] = "", + test_end: Optional[Union[str, int, datetime, date]] = "", description: Optional[str] = "", extra_filter: Optional[Union[filter.Filter, filter.Logic]] = None, statistics_config: Optional[Union[StatisticsConfig, bool, dict]] = None, @@ -742,14 +766,18 @@ def train_test_split( # Arguments test_size: size of test set. Should be between 0 and 1. - train_start: timestamp in second or wallclock_time: Datetime string. The String should be formatted in one of the - following formats `%Y%m%d`, `%Y%m%d%H`, `%Y%m%d%H%M`, `%Y%m%d%H%M%S`, or `%Y%m%d%H%M%S%f`. - train_end: timestamp in second or wallclock_time: Datetime string. The String should be formatted in one of the - following formats `%Y%m%d`, `%Y%m%d%H`, `%Y%m%d%H%M`, `%Y%m%d%H%M%S`, or `%Y%m%d%H%M%S%f`. - test_start: timestamp in second or wallclock_time: Datetime string. The String should be formatted in one of the - following formats `%Y%m%d`, `%Y%m%d%H`, `%Y%m%d%H%M`, `%Y%m%d%H%M%S`, or `%Y%m%d%H%M%S%f`. - test_end: timestamp in second or wallclock_time: Datetime string. The String should be formatted in one of the - following formats `%Y%m%d`, `%Y%m%d%H`, `%Y%m%d%H%M`, `%Y%m%d%H%M%S`, or `%Y%m%d%H%M%S%f`. + train_start: datatime.datetime, datetime.date, unix timestamp in seconds (int), or string. The String should + be formatted in one of the following formats `%Y%m%d`, `%Y%m%d%H`, `%Y%m%d%H%M`, `%Y%m%d%H%M%S`, + or `%Y%m%d%H%M%S%f`. + train_end: datatime.datetime, datetime.date, unix timestamp in seconds (int), or string. 
The String should + be formatted in one of the following formats `%Y%m%d`, `%Y%m%d%H`, `%Y%m%d%H%M`, `%Y%m%d%H%M%S`, + or `%Y%m%d%H%M%S%f`. + test_start: datatime.datetime, datetime.date, unix timestamp in seconds (int), or string. The String should + be formatted in one of the following formats `%Y%m%d`, `%Y%m%d%H`, `%Y%m%d%H%M`, `%Y%m%d%H%M%S`, + or `%Y%m%d%H%M%S%f`. + test_end: datatime.datetime, datetime.date, unix timestamp in seconds (int), or string. The String should + be formatted in one of the following formats `%Y%m%d`, `%Y%m%d%H`, `%Y%m%d%H%M`, `%Y%m%d%H%M%S`, + or `%Y%m%d%H%M%S%f`. description: A string describing the contents of the training dataset to improve discoverability for Data Scientists, defaults to empty string `""`. @@ -819,12 +847,12 @@ def train_validation_test_split( self, validation_size: Optional[float] = None, test_size: Optional[float] = None, - train_start: Optional[str] = "", - train_end: Optional[str] = "", - validation_start: Optional[str] = "", - validation_end: Optional[str] = "", - test_start: Optional[str] = "", - test_end: Optional[str] = "", + train_start: Optional[Union[str, int, datetime, date]] = "", + train_end: Optional[Union[str, int, datetime, date]] = "", + validation_start: Optional[Union[str, int, datetime, date]] = "", + validation_end: Optional[Union[str, int, datetime, date]] = "", + test_start: Optional[Union[str, int, datetime, date]] = "", + test_end: Optional[Union[str, int, datetime, date]] = "", description: Optional[str] = "", extra_filter: Optional[Union[filter.Filter, filter.Logic]] = None, statistics_config: Optional[Union[StatisticsConfig, bool, dict]] = None, @@ -840,18 +868,24 @@ def train_validation_test_split( # Arguments validation_size: size of validation set. Should be between 0 and 1. test_size: size of test set. Should be between 0 and 1. - train_start: timestamp in second or wallclock_time: Datetime string. 
The String should be formatted in one of the - following formats `%Y%m%d`, `%Y%m%d%H`, `%Y%m%d%H%M`, `%Y%m%d%H%M%S`, or `%Y%m%d%H%M%S%f`. - train_end: timestamp in second or wallclock_time: Datetime string. The String should be formatted in one of the - following formats `%Y%m%d`, `%Y%m%d%H`, `%Y%m%d%H%M`, `%Y%m%d%H%M%S`, or `%Y%m%d%H%M%S%f`. - validation_start: timestamp in second or wallclock_time: Datetime string. The String should be formatted in one of the - following formats `%Y%m%d`, `%Y%m%d%H`, `%Y%m%d%H%M`, `%Y%m%d%H%M%S`, or `%Y%m%d%H%M%S%f`. - validation_end: timestamp in second or wallclock_time: Datetime string. The String should be formatted in one of the - following formats `%Y%m%d`, `%Y%m%d%H`, `%Y%m%d%H%M`, `%Y%m%d%H%M%S`, or `%Y%m%d%H%M%S%f`. - test_start: timestamp in second or wallclock_time: Datetime string. The String should be formatted in one of the - following formats `%Y%m%d`, `%Y%m%d%H`, `%Y%m%d%H%M`, `%Y%m%d%H%M%S`, or `%Y%m%d%H%M%S%f`. - test_end: timestamp in second or wallclock_time: Datetime string. The String should be formatted in one of the - following formats `%Y%m%d`, `%Y%m%d%H`, `%Y%m%d%H%M`, `%Y%m%d%H%M%S`, or `%Y%m%d%H%M%S%f`. + train_start: datatime.datetime, datetime.date, unix timestamp in seconds (int), or string. The String should + be formatted in one of the following formats `%Y%m%d`, `%Y%m%d%H`, `%Y%m%d%H%M`, `%Y%m%d%H%M%S`, + or `%Y%m%d%H%M%S%f`. + train_end: datatime.datetime, datetime.date, unix timestamp in seconds (int), or string. The String should + be formatted in one of the following formats `%Y%m%d`, `%Y%m%d%H`, `%Y%m%d%H%M`, `%Y%m%d%H%M%S`, + or `%Y%m%d%H%M%S%f`. + validation_start: datatime.datetime, datetime.date, unix timestamp in seconds (int), or string. The String + should be formatted in one of the following formats `%Y%m%d`, `%Y%m%d%H`, `%Y%m%d%H%M`, `%Y%m%d%H%M%S`, + or `%Y%m%d%H%M%S%f`. + validation_end: datatime.datetime, datetime.date, unix timestamp in seconds (int), or string. 
The String + should be formatted in one of the following formats `%Y%m%d`, `%Y%m%d%H`, `%Y%m%d%H%M`, `%Y%m%d%H%M%S`, + or `%Y%m%d%H%M%S%f`. + test_start: datatime.datetime, datetime.date, unix timestamp in seconds (int), or string. The String should + be formatted in one of the following formats `%Y%m%d`, `%Y%m%d%H`, `%Y%m%d%H%M`, `%Y%m%d%H%M%S`, + or `%Y%m%d%H%M%S%f`. + test_end: datatime.datetime, datetime.date, unix timestamp in seconds (int), or string. The String should + be formatted in one of the following formats `%Y%m%d`, `%Y%m%d%H`, `%Y%m%d%H%M`, `%Y%m%d%H%M%S`, + or `%Y%m%d%H%M%S%f`. description: A string describing the contents of the training dataset to improve discoverability for Data Scientists, defaults to empty string `""`. diff --git a/python/hsfs/training_dataset.py b/python/hsfs/training_dataset.py index 9e551811d9..4042f4fc54 100644 --- a/python/hsfs/training_dataset.py +++ b/python/hsfs/training_dataset.py @@ -16,12 +16,13 @@ import json import warnings from typing import Optional, Union, Any, Dict, List, TypeVar -from datetime import timezone import humps import pandas as pd import numpy as np +from datetime import datetime, date + from hsfs import util, engine, training_dataset_feature from hsfs.training_dataset_split import TrainingDatasetSplit from hsfs.statistics_config import StatisticsConfig @@ -125,8 +126,8 @@ def __init__( self._training_dataset_type = None # set up depending on user initialized or coming from backend response if created is None: - self._start_time = self._convert_event_time_to_timestamp(event_start_time) - self._end_time = self._convert_event_time_to_timestamp(event_end_time) + self._start_time = util.convert_event_time_to_timestamp(event_start_time) + self._end_time = util.convert_event_time_to_timestamp(event_end_time) # no type -> user init self._features = features self.storage_connector = storage_connector @@ -193,6 +194,13 @@ def _set_time_splits( test_start=None, test_end=None, ): + train_start = 
util.convert_event_time_to_timestamp(train_start) + train_end = util.convert_event_time_to_timestamp(train_end) + validation_start = util.convert_event_time_to_timestamp(validation_start) + validation_end = util.convert_event_time_to_timestamp(validation_end) + test_start = util.convert_event_time_to_timestamp(test_start) + test_end = util.convert_event_time_to_timestamp(test_end) + time_splits = list() self._append_time_split( time_splits, @@ -230,25 +238,11 @@ def _append_time_split( TrainingDatasetSplit( name=split_name, split_type=TrainingDatasetSplit.TIME_SERIES_SPLIT, - start_time=self._convert_event_time_to_timestamp(start_time), - end_time=self._convert_event_time_to_timestamp(end_time), + start_time=util.convert_event_time_to_timestamp(start_time), + end_time=util.convert_event_time_to_timestamp(end_time), ) ) - def _convert_event_time_to_timestamp(self, event_time): - if event_time is None: - return None - if isinstance(event_time, str): - if event_time: - return util.get_timestamp_from_date_string(event_time, timezone.utc) - elif isinstance(event_time, int): - if event_time == 0: - raise ValueError("Event time should be greater than 0.") - # jdbc supports timestamp precision up to second only. - return event_time * 1000 - else: - raise ValueError("Given event time should be in `str` or `int` type") - def save( self, features: Union[ @@ -711,13 +705,15 @@ def statistics(self): """Get the latest computed statistics for the training dataset.""" return self._statistics_engine.get_last(self) - def get_statistics(self, commit_time: str = None): + def get_statistics(self, commit_time: Union[str, int, datetime, date] = None): """Returns the statistics for this training dataset at a specific time. If `commit_time` is `None`, the most recent statistics are returned. # Arguments - commit_time: Commit time in the format `YYYYMMDDhhmmss`, defaults to `None`. + commit_time: datatime.datetime, datetime.date, unix timestamp in seconds (int), or string. 
The String should + be formatted in one of the following formats `%Y%m%d`, `%Y%m%d%H`, `%Y%m%d%H%M`, `%Y%m%d%H%M%S`, + or `%Y%m%d%H%M%S%f`. Defaults to `None`. # Returns `Statistics`. Object with statistics information. diff --git a/python/hsfs/util.py b/python/hsfs/util.py index 3aacd1f8cb..20e04e3dbe 100644 --- a/python/hsfs/util.py +++ b/python/hsfs/util.py @@ -16,9 +16,9 @@ import re import json +import pandas as pd -from datetime import datetime -from datetime import timezone +from datetime import datetime, date, timezone from urllib.parse import urljoin, urlparse from sqlalchemy import create_engine @@ -128,10 +128,11 @@ def check_timestamp_format_from_date_string(input_date): return normalized_date, date_format -def get_timestamp_from_date_string(input_date, time_zone=timezone.utc): +def get_timestamp_from_date_string(input_date): norm_input_date, date_format = check_timestamp_format_from_date_string(input_date) date_time = datetime.strptime(norm_input_date, date_format) - date_time = date_time.replace(tzinfo=time_zone) + if date_time.tzinfo is None: + date_time = date_time.replace(tzinfo=timezone.utc) return int(float(date_time.timestamp()) * 1000) @@ -139,6 +140,42 @@ def get_hudi_datestr_from_timestamp(timestamp): return datetime.fromtimestamp(timestamp / 1000).strftime("%Y%m%d%H%M%S%f")[:-3] +def convert_event_time_to_timestamp(event_time): + if event_time is None: + return None + if isinstance(event_time, str): + return get_timestamp_from_date_string(event_time) + elif isinstance(event_time, pd._libs.tslibs.timestamps.Timestamp): + # convert to unix epoch time in milliseconds. + event_time = event_time.to_pydatetime() + # convert to unix epoch time in milliseconds. + if event_time.tzinfo is None: + event_time = event_time.replace(tzinfo=timezone.utc) + return int(event_time.timestamp() * 1000) + elif isinstance(event_time, datetime): + # convert to unix epoch time in milliseconds. 
+ if event_time.tzinfo is None: + event_time = event_time.replace(tzinfo=timezone.utc) + return int(event_time.timestamp() * 1000) + elif isinstance(event_time, date): + # convert to unix epoch time in milliseconds. + event_time = datetime(*event_time.timetuple()[:7]) + if event_time.tzinfo is None: + event_time = event_time.replace(tzinfo=timezone.utc) + return int(event_time.timestamp() * 1000) + elif isinstance(event_time, int): + if event_time == 0: + raise ValueError("Event time should be greater than 0.") + # jdbc supports timestamp precision up to second only. + if len(str(event_time)) < 13: + event_time = event_time * 1000 + return event_time + else: + raise ValueError( + "Given event time should be in `datetime`, `date`, `str` or `int` type" + ) + + def setup_pydoop(): # Import Pydoop only here, so it doesn't trigger if the execution environment # does not support Pydoop. E.g. Sagemaker diff --git a/python/tests/engine/test_python.py b/python/tests/engine/test_python.py index 165e0eb637..61e2ec9351 100644 --- a/python/tests/engine/test_python.py +++ b/python/tests/engine/test_python.py @@ -19,6 +19,7 @@ import numpy as np import pyarrow as pa +from datetime import datetime, date from hsfs import ( storage_connector, feature_group, @@ -26,6 +27,7 @@ feature_view, transformation_function, feature, + util, ) from hsfs.engine import python from hsfs.core import inode, execution @@ -1609,7 +1611,6 @@ def test_random_split_bad_percentage(self, mocker): def test_time_series_split(self, mocker): # Arrange mocker.patch("hsfs.client.get_instance") - mocker.patch("hsfs.engine.python.Engine._convert_to_unix_timestamp") python_engine = python.Engine() @@ -1647,7 +1648,6 @@ def test_time_series_split(self, mocker): def test_time_series_split_drop_event_time(self, mocker): # Arrange mocker.patch("hsfs.client.get_instance") - mocker.patch("hsfs.engine.python.Engine._convert_to_unix_timestamp") python_engine = python.Engine() @@ -1687,14 +1687,10 @@ def 
test_time_series_split_drop_event_time(self, mocker): def test_time_series_split_event_time(self, mocker): # Arrange mocker.patch("hsfs.client.get_instance") - mocker.patch( - "hsfs.engine.python.Engine._convert_to_unix_timestamp", - side_effect=[1000, 2000, 1000, 2000], - ) python_engine = python.Engine() - d = {"col1": [1, 2], "col2": [3, 4], "event_time": [1, 2]} + d = {"col1": [1, 2], "col2": [3, 4], "event_time": [1000, 2000]} df = pd.DataFrame(data=d) td = training_dataset.TrainingDataset( @@ -1726,11 +1722,10 @@ def test_time_series_split_event_time(self, mocker): assert result[column].equals(expected[column]) def test_convert_to_unix_timestamp_pandas(self): - # Arrange - python_engine = python.Engine() - # Act - result = python_engine._convert_to_unix_timestamp(t=pd.Timestamp("2017-01-01")) + result = util.convert_event_time_to_timestamp( + event_time=pd.Timestamp("2017-01-01") + ) # Assert assert result == 1483228800000.0 @@ -1741,26 +1736,46 @@ def test_convert_to_unix_timestamp_str(self, mocker): "hsfs.util.get_timestamp_from_date_string" ) - python_engine = python.Engine() - mock_util_get_timestamp_from_date_string.return_value = 1483225200000 # Act - result = python_engine._convert_to_unix_timestamp(t="2017-01-01 00-00-00-000") + result = util.convert_event_time_to_timestamp( + event_time="2017-01-01 00-00-00-000" + ) # Assert assert result == 1483225200000 def test_convert_to_unix_timestamp_int(self): - # Arrange - python_engine = python.Engine() - # Act - result = python_engine._convert_to_unix_timestamp(t=1483225200) + result = util.convert_event_time_to_timestamp(event_time=1483225200) # Assert assert result == 1483225200000 + def test_convert_to_unix_timestamp_datetime(self): + # Act + result = util.convert_event_time_to_timestamp(event_time=datetime(2022, 9, 18)) + + # Assert + assert result == 1663459200000 + + def test_convert_to_unix_timestamp_date(self): + # Act + result = util.convert_event_time_to_timestamp(event_time=date(2022, 9, 18)) + + 
# Assert + assert result == 1663459200000 + + def test_convert_to_unix_timestamp_pandas_datetime(self): + # Act + result = util.convert_event_time_to_timestamp( + event_time=pd.Timestamp("2022-09-18") + ) + + # Assert + assert result == 1663459200000 + def test_write_training_dataset(self, mocker): # Arrange mocker.patch("hsfs.engine.get_type") diff --git a/python/tests/engine/test_spark.py b/python/tests/engine/test_spark.py index 736e6064b6..06d0290a2c 100644 --- a/python/tests/engine/test_spark.py +++ b/python/tests/engine/test_spark.py @@ -2109,7 +2109,7 @@ def test_time_series_split(self, mocker): test_end=3, ) - d = {"col_0": [1, 2], "col_1": ["test_1", "test_2"], "event_time": [1, 2]} + d = {"col_0": [1, 2], "col_1": ["test_1", "test_2"], "event_time": [1000, 2000]} df = pd.DataFrame(data=d) spark_df = spark_engine._spark_session.createDataFrame(df) @@ -2276,7 +2276,7 @@ def test_time_series_split_drop_event_time(self, mocker): test_end=3, ) - d = {"col_0": [1, 2], "col_1": ["test_1", "test_2"], "event_time": [1, 2]} + d = {"col_0": [1, 2], "col_1": ["test_1", "test_2"], "event_time": [1000, 2000]} df = pd.DataFrame(data=d) spark_df = spark_engine._spark_session.createDataFrame(df) @@ -2344,7 +2344,6 @@ def plus_one(a) -> int: data_format="CSV", featurestore_id=99, splits={}, - id=10, transformation_functions=transformation_fn_dict, features=features, ) @@ -2399,7 +2398,6 @@ def plus_one(a) -> int: data_format="CSV", featurestore_id=99, splits={}, - id=10, transformation_functions=transformation_fn_dict, features=features, )