Skip to content

Commit

Permalink
refactor is_time_travel
Browse files Browse the repository at this point in the history
  • Loading branch information
kennethmhc committed Oct 6, 2022
1 parent 6f70eb3 commit 3f2abd5
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 9 deletions.
16 changes: 13 additions & 3 deletions java/src/main/java/com/logicalclocks/hsfs/constructor/Query.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,6 @@ public class Query {
@Getter
@Setter
private Boolean hiveEngine = false;
@Getter
private boolean isTimeTravel = false;

private QueryConstructorApi queryConstructorApi = new QueryConstructorApi();
private StorageConnectorApi storageConnectorApi = new StorageConnectorApi();
Expand Down Expand Up @@ -196,7 +194,6 @@ public Query asOf(String wallclockTime) throws FeatureStoreException, ParseExcep
* @throws ParseException
*/
public Query asOf(String wallclockTime, String excludeUntil) throws FeatureStoreException, ParseException {
isTimeTravel = true;
Long wallclockTimestamp = utils.getTimeStampFromDateString(wallclockTime);
Long excludeUntilTimestamp = null;
if (excludeUntil != null) {
Expand Down Expand Up @@ -306,4 +303,17 @@ public Query appendFeature(Feature feature) {
this.leftFeatures.add(feature);
return this;
}

public boolean isTimeTravel() {
if (leftFeatureGroupStartTime != null || leftFeatureGroupEndTime != null) {
return true;
}
for (Join join: joins) {
if (join.getQuery().leftFeatureGroupStartTime != null
|| join.getQuery().leftFeatureGroupEndTime != null) {
return true;
}
}
return false;
}
}
20 changes: 14 additions & 6 deletions python/hsfs/constructor/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ def __init__(
self._storage_connector_api = storage_connector_api.StorageConnectorApi(
feature_store_id
)
self._is_time_travel = False

def _prep_read(self, online, read_options):
query = self._query_constructor_api.construct_query(self)
Expand Down Expand Up @@ -238,14 +237,13 @@ def as_of(self, wallclock_time=None, exclude_until=None):
# Returns
`Query`. The query object with the applied time travel condition.
"""
self._is_time_travel = True
wallclock_timestamp = util.convert_event_time_to_timestamp(wallclock_time)

exclude_until_timestamp = util.convert_event_time_to_timestamp(exclude_until)

for join in self._joins:
join.query.left_feature_group_end_time = wallclock_timestamp
join.query.left_feature_group_start_time = exclude_until_timestamp
for _join in self._joins:
_join.query.left_feature_group_end_time = wallclock_timestamp
_join.query.left_feature_group_start_time = exclude_until_timestamp
self.left_feature_group_end_time = wallclock_timestamp
self.left_feature_group_start_time = exclude_until_timestamp
return self
Expand Down Expand Up @@ -410,4 +408,14 @@ def append_feature(self, feature):
self._left_features.append(feature)

def is_time_travel(self):
return self._is_time_travel
return any(
self.left_feature_group_start_time
or self.left_feature_group_end_time
or any(
[
_join.query.left_feature_group_start_time
or _join.query.left_feature_group_end_time
for _join in self._joins
]
)
)

0 comments on commit 3f2abd5

Please sign in to comment.