Skip to content

Commit

Permalink
[HOPSWORKS-3326] Support datetime in hsfs python API (logicalclocks#783)
Browse files Browse the repository at this point in the history
* remove unused import

* init

* black has been run

* add datetime object

* add datetime object

* check timestamp length

* address comments

* allow timestamp=None for feature group as of (logicalclocks#7)

* fix tests

* remove redundant timestamp conversions

* address comments

* updated docstrings (logicalclocks#8)

Co-authored-by: Ralf <bubriks@gmail.com>
Co-authored-by: Till Döhmen <till.doehmen@web.de>
  • Loading branch information
3 people authored Sep 22, 2022
1 parent 86c2ce5 commit ac21e4d
Show file tree
Hide file tree
Showing 11 changed files with 253 additions and 179 deletions.
22 changes: 8 additions & 14 deletions python/hsfs/constructor/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -221,24 +221,18 @@ def as_of(self, wallclock_time=None, exclude_until=None):
when calling the `insert()` method.
# Arguments
wallclock_time: Datetime string. The String should be formatted in one of the
wallclock_time: datatime.datetime, datetime.date, unix timestamp in seconds (int), or string. The String should be formatted in one of the
following formats `%Y%m%d`, `%Y%m%d%H`, `%Y%m%d%H%M`, or `%Y%m%d%H%M%S`.
exclude_until: Datetime string. The String should be formatted in one of the
exclude_until: datatime.datetime, datetime.date, unix timestamp in seconds (int), or string. The String should be formatted in one of the
following formats `%Y%m%d`, `%Y%m%d%H`, `%Y%m%d%H%M`, or `%Y%m%d%H%M%S`.
# Returns
`Query`. The query object with the applied time travel condition.
"""
wallclock_timestamp = (
util.get_timestamp_from_date_string(wallclock_time)
if wallclock_time
else None
)
exclude_until_timestamp = (
util.get_timestamp_from_date_string(exclude_until)
if exclude_until
else None
)
wallclock_timestamp = util.convert_event_time_to_timestamp(wallclock_time)

exclude_until_timestamp = util.convert_event_time_to_timestamp(exclude_until)

for join in self._joins:
join.query.left_feature_group_end_time = wallclock_timestamp
join.query.left_feature_group_start_time = exclude_until_timestamp
Expand All @@ -252,10 +246,10 @@ def pull_changes(self, wallclock_start_time, wallclock_end_time):
`pull_changes` method is deprecated. Use
`as_of(end_wallclock_time, exclude_until=start_wallclock_time) instead.
"""
self.left_feature_group_start_time = util.get_timestamp_from_date_string(
self.left_feature_group_start_time = util.convert_event_time_to_timestamp(
wallclock_start_time
)
self.left_feature_group_end_time = util.get_timestamp_from_date_string(
self.left_feature_group_end_time = util.convert_event_time_to_timestamp(
wallclock_end_time
)
return self
Expand Down
7 changes: 2 additions & 5 deletions python/hsfs/core/feature_group_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,11 +134,8 @@ def commit_details(self, feature_group, wallclock_time, limit):
"commit_details can only be used on time travel enabled feature groups"
)

wallclock_timestamp = (
util.get_timestamp_from_date_string(wallclock_time)
if wallclock_time is not None
else None
)
wallclock_timestamp = util.convert_event_time_to_timestamp(wallclock_time)

feature_group_commits = self._feature_group_api.get_commit_details(
feature_group, wallclock_timestamp, limit
)
Expand Down
8 changes: 4 additions & 4 deletions python/hsfs/core/feature_view_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,8 @@ def get_batch_query(
return self._feature_view_api.get_batch_query(
feature_view_obj.name,
feature_view_obj.version,
start_time,
end_time,
util.convert_event_time_to_timestamp(start_time),
util.convert_event_time_to_timestamp(end_time),
training_dataset_version=training_dataset_version,
is_python_engine=engine.get_type() == "python",
with_label=with_label,
Expand All @@ -119,8 +119,8 @@ def get_batch_query_string(
query_obj = self._feature_view_api.get_batch_query(
feature_view_obj.name,
feature_view_obj.version,
start_time,
end_time,
util.convert_event_time_to_timestamp(start_time),
util.convert_event_time_to_timestamp(end_time),
training_dataset_version=training_dataset_version,
is_python_engine=engine.get_type() == "python",
)
Expand Down
2 changes: 1 addition & 1 deletion python/hsfs/core/statistics_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ def get(
training_dataset_version=None,
):
"""Get Statistics with the specified commit time of an entity."""
commit_timestamp = util.get_timestamp_from_date_string(commit_time)
commit_timestamp = util.convert_event_time_to_timestamp(commit_time)
return self._statistics_api.get(
metadata_instance,
commit_timestamp,
Expand Down
12 changes: 1 addition & 11 deletions python/hsfs/engine/python.py
Original file line number Diff line number Diff line change
Expand Up @@ -611,7 +611,7 @@ def _time_series_split(
result_df = df[
[
split.start_time
<= self._convert_to_unix_timestamp(t)
<= util.convert_event_time_to_timestamp(t)
< split.end_time
for t in df[event_time]
]
Expand All @@ -624,16 +624,6 @@ def _time_series_split(
result_dfs[split.name] = result_df
return result_dfs

def _convert_to_unix_timestamp(self, t):
if isinstance(t, pd._libs.tslibs.timestamps.Timestamp):
# pandas.timestamp represents millisecond in decimal
return t.timestamp() * 1000
elif isinstance(t, str):
return util.get_timestamp_from_date_string(t)
else:
# jdbc supports timestamp precision up to second only.
return t * 1000

def write_training_dataset(
self,
training_dataset,
Expand Down
47 changes: 30 additions & 17 deletions python/hsfs/feature_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import avro.schema
from typing import Optional, Union, Any, Dict, List, TypeVar

from datetime import datetime, date

from hsfs import util, engine, feature, user, storage_connector as sc
from hsfs.core import (
feature_group_engine,
Expand Down Expand Up @@ -578,13 +580,15 @@ def primary_key(self):
def primary_key(self, new_primary_key):
self._primary_key = [pk.lower() for pk in new_primary_key]

def get_statistics(self, commit_time: str = None):
def get_statistics(self, commit_time: Union[str, int, datetime, date] = None):
"""Returns the statistics for this feature group at a specific time.
If `commit_time` is `None`, the most recent statistics are returned.
# Arguments
commit_time: Commit time in the format `YYYYMMDDhhmmss`, defaults to `None`.
commit_time: datatime.datetime, datetime.date, unix timestamp in seconds (int), or string. The String should
be formatted in one of the following formats `%Y%m%d`, `%Y%m%d%H`, `%Y%m%d%H%M`, `%Y%m%d%H%M%S`,
or `%Y%m%d%H%M%S%f`. Defaults to `None`. Defaults to `None`.
# Returns
`Statistics`. Statistics object.
Expand Down Expand Up @@ -772,7 +776,7 @@ def __init__(

def read(
self,
wallclock_time: Optional[str] = None,
wallclock_time: Optional[Union[str, int, datetime, date]] = None,
online: Optional[bool] = False,
dataframe_type: Optional[str] = "default",
read_options: Optional[dict] = {},
Expand Down Expand Up @@ -800,7 +804,9 @@ def read(
fg.read("2020-10-20 07:34:11")
```
# Arguments
wallclock_time: Date string in the format of "YYYYMMDD" or "YYYYMMDDhhmmss".
wallclock_time: datatime.datetime, datetime.date, unix timestamp in seconds (int), or string. The String should
be formatted in one of the following formats `%Y%m%d`, `%Y%m%d%H`, `%Y%m%d%H%M`, `%Y%m%d%H%M%S`,
or `%Y%m%d%H%M%S%f`.
If Specified will retrieve feature group as of specific point in time.
If not specified will return as of most recent time. Defaults to `None`.
online: bool, optional. If `True` read from online feature store, defaults
Expand Down Expand Up @@ -844,8 +850,8 @@ def read(

def read_changes(
self,
start_wallclock_time: str,
end_wallclock_time: str,
start_wallclock_time: Union[str, int, datetime, date],
end_wallclock_time: Union[str, int, datetime, date],
read_options: Optional[dict] = {},
):
"""Reads updates of this feature that occurred between specified points in time.
Expand All @@ -865,10 +871,12 @@ def read_changes(
```
# Arguments
start_wallclock_time: Date string in the format of "YYYYMMDD" or
"YYYYMMDDhhmmss".
end_wallclock_time: Date string in the format of "YYYYMMDD" or
"YYYYMMDDhhmmss".
start_wallclock_time: datatime.datetime, datetime.date, unix timestamp in seconds (int), or string.
The String should be formatted in one of the following formats `%Y%m%d`, `%Y%m%d%H`, `%Y%m%d%H%M`,
`%Y%m%d%H%M%S`, or `%Y%m%d%H%M%S%f`.
end_wallclock_time: datatime.datetime, datetime.date, unix timestamp in seconds (int), or string.
The String should be formatted in one of the following formats `%Y%m%d`, `%Y%m%d%H`, `%Y%m%d%H%M`,
`%Y%m%d%H%M%S`, or `%Y%m%d%H%M%S%f`.
read_options: User provided read options. Defaults to `{}`.
# Returns
Expand Down Expand Up @@ -1185,7 +1193,9 @@ def commit_details(
# Arguments
wallclock_time: Commit details as of specific point in time. Defaults to `None`.
limit: Number of commits to retrieve. Defaults to `None`.
limit: Number of commits to retrieve. Defaults to `None`. datatime.datetime, datetime.date, unix timestamp in seconds (int), or string.
The String should be formatted in one of the following formats `%Y%m%d`, `%Y%m%d%H`, `%Y%m%d%H%M`,
`%Y%m%d%H%M%S`, or `%Y%m%d%H%M%S%f`.
# Returns
`Dict[str, Dict[str, str]]`. Dictionary object of commit metadata timeline, where Key is commit id and value
Expand Down Expand Up @@ -1214,7 +1224,7 @@ def commit_delete_record(
"""
self._feature_group_engine.commit_delete(self, delete_df, write_options)

def as_of(self, wallclock_time, exclude_until=None):
def as_of(self, wallclock_time=None, exclude_until=None):
"""Get Query object to retrieve all features of the group at a point in the past.
This method selects all features in the feature group and returns a Query object
Expand Down Expand Up @@ -1283,9 +1293,9 @@ def as_of(self, wallclock_time, exclude_until=None):
when calling the `insert()` method.
# Arguments
wallclock_time: Datetime string. The String should be formatted in one of the
wallclock_time: datatime.datetime, datetime.date, unix timestamp in seconds (int), or string. The String should be formatted in one of the
following formats `%Y%m%d`, `%Y%m%d%H`, `%Y%m%d%H%M`, or `%Y%m%d%H%M%S`.
exclude_until: Datetime string. The String should be formatted in one of the
exclude_until: datatime.datetime, datetime.date, unix timestamp in seconds (int), or string. The String should be formatted in one of the
following formats `%Y%m%d`, `%Y%m%d%H`, `%Y%m%d%H%M`, or `%Y%m%d%H%M%S`.
# Returns
Expand Down Expand Up @@ -1331,16 +1341,19 @@ def validate(
self, dataframe, save_report, validation_options
)

def compute_statistics(self, wallclock_time: Optional[str] = None):
def compute_statistics(
self, wallclock_time: Optional[Union[str, int, datetime, date]] = None
):
"""Recompute the statistics for the feature group and save them to the
feature store.
Statistics are only computed for data in the offline storage of the feature
group.
# Arguments
wallclock_time: Date string in the format of "YYYYMMDD" or "YYYYMMDDhhmmss".
Only valid if feature group is time travel enabled. If specified will recompute statistics on
wallclock_time: datatime.datetime, datetime.date, unix timestamp in seconds (int), or string. The String should
be formatted in one of the following formats `%Y%m%d`, `%Y%m%d%H`, `%Y%m%d%H%M`, `%Y%m%d%H%M%S`,
or `%Y%m%d%H%M%S%f`. If specified will recompute statistics on
feature group as of specific point in time. If not specified then will compute statistics
as of most recent time of this feature group. Defaults to `None`.
Expand Down
Loading

0 comments on commit ac21e4d

Please sign in to comment.