Skip to content

Commit

Permalink
Use utc timestamp for start time
Browse files Browse the repository at this point in the history
Signed-off-by: Khor Shu Heng <khor.heng@gojek.com>
  • Loading branch information
khorshuheng committed Jan 8, 2021
1 parent b4f1e21 commit fcdb476
Show file tree
Hide file tree
Showing 5 changed files with 10 additions and 4 deletions.
3 changes: 2 additions & 1 deletion sdk/python/feast/pyspark/launchers/aws/emr_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from typing import Any, Dict, List, NamedTuple, Optional
from urllib.parse import urlparse, urlunparse

import pytz
import yaml

from feast.pyspark.abc import BQ_SPARK_PACKAGE
Expand Down Expand Up @@ -254,7 +255,7 @@ def _get_job_creation_time(emr_client, job: EmrJobRef) -> datetime:
def _get_step_creation_time(emr_client, cluster_id: str, step_id: str) -> datetime:
response = emr_client.describe_step(ClusterId=cluster_id, StepId=step_id)
step_creation_time = response["Step"]["Status"]["Timeline"]["CreationDateTime"]
return step_creation_time
return step_creation_time.astimezone(pytz.utc).replace(tzinfo=None)


def _wait_for_step_state(
Expand Down
4 changes: 3 additions & 1 deletion sdk/python/feast/pyspark/launchers/k8s/k8s_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,9 @@ def _k8s_state_to_feast(k8s_state: str) -> SparkJobStatus:

def _resource_to_job_info(resource: Dict[str, Any]) -> JobInfo:
labels = resource["metadata"]["labels"]
start_time = resource["metadata"].get("creationTimestamp")
start_time = datetime.strptime(
resource["metadata"].get("creationTimestamp"), "%Y-%m-%dT%H:%M:%SZ"
)
sparkConf = resource["spec"].get("sparkConf", {})

if "status" in resource:
Expand Down
2 changes: 1 addition & 1 deletion sdk/python/feast/pyspark/launchers/standalone/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ def __init__(
self._job_name = job_name
self._process = process
self._ui_port = ui_port
self._start_time = datetime.now()
self._start_time = datetime.utcnow()

def get_id(self) -> str:
return self._job_id
Expand Down
3 changes: 2 additions & 1 deletion tests/e2e/test_historical_features.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,10 @@ def test_historical_features(

feature_refs = ["transactions:daily_transactions"]

job_submission_time = datetime.now()
job_submission_time = datetime.utcnow()
job = feast_client.get_historical_features(feature_refs, customers_df)
assert job.get_start_time() >= job_submission_time
assert job.get_start_time() <= job_submission_time + timedelta(hours=1)

output_dir = job.get_output_file_uri()
joined_df = read_parquet(output_dir)
Expand Down
2 changes: 2 additions & 0 deletions tests/e2e/test_online_features.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ def test_streaming_ingestion(
feast_client.apply(feature_table)

job = feast_client.start_stream_to_online_ingestion(feature_table)
assert job.get_feature_table() == feature_table.name

wait_retry_backoff(
lambda: (None, job.get_status() == SparkJobStatus.IN_PROGRESS), 120
Expand Down Expand Up @@ -165,6 +166,7 @@ def ingest_and_verify(
original.event_timestamp.min().to_pydatetime(),
original.event_timestamp.max().to_pydatetime() + timedelta(seconds=1),
)
assert job.get_feature_table() == feature_table.name

wait_retry_backoff(
lambda: (None, job.get_status() == SparkJobStatus.COMPLETED), 180
Expand Down

0 comments on commit fcdb476

Please sign in to comment.