diff --git a/sdk/python/feast/pyspark/launchers/aws/emr_utils.py b/sdk/python/feast/pyspark/launchers/aws/emr_utils.py
index 5b72b7ac45..0b8c2b2c6d 100644
--- a/sdk/python/feast/pyspark/launchers/aws/emr_utils.py
+++ b/sdk/python/feast/pyspark/launchers/aws/emr_utils.py
@@ -7,6 +7,7 @@
 from typing import Any, Dict, List, NamedTuple, Optional
 from urllib.parse import urlparse, urlunparse
 
+import pytz
 import yaml
 
 from feast.pyspark.abc import BQ_SPARK_PACKAGE
@@ -254,7 +255,7 @@ def _get_job_creation_time(emr_client, job: EmrJobRef) -> datetime:
 def _get_step_creation_time(emr_client, cluster_id: str, step_id: str) -> datetime:
     response = emr_client.describe_step(ClusterId=cluster_id, StepId=step_id)
     step_creation_time = response["Step"]["Status"]["Timeline"]["CreationDateTime"]
-    return step_creation_time
+    return step_creation_time.astimezone(pytz.utc).replace(tzinfo=None)
 
 
 def _wait_for_step_state(
diff --git a/sdk/python/feast/pyspark/launchers/k8s/k8s_utils.py b/sdk/python/feast/pyspark/launchers/k8s/k8s_utils.py
index 241c8d0f43..9630d08266 100644
--- a/sdk/python/feast/pyspark/launchers/k8s/k8s_utils.py
+++ b/sdk/python/feast/pyspark/launchers/k8s/k8s_utils.py
@@ -193,7 +193,9 @@ def _k8s_state_to_feast(k8s_state: str) -> SparkJobStatus:
 
 def _resource_to_job_info(resource: Dict[str, Any]) -> JobInfo:
     labels = resource["metadata"]["labels"]
-    start_time = resource["metadata"].get("creationTimestamp")
+    start_time = datetime.strptime(
+        resource["metadata"].get("creationTimestamp"), "%Y-%m-%dT%H:%M:%SZ"
+    )
     sparkConf = resource["spec"].get("sparkConf", {})
 
     if "status" in resource:
diff --git a/sdk/python/feast/pyspark/launchers/standalone/local.py b/sdk/python/feast/pyspark/launchers/standalone/local.py
index d9e625ecf5..e67dd75630 100644
--- a/sdk/python/feast/pyspark/launchers/standalone/local.py
+++ b/sdk/python/feast/pyspark/launchers/standalone/local.py
@@ -102,7 +102,7 @@ def __init__(
         self._job_name = job_name
         self._process = process
         self._ui_port = ui_port
-        self._start_time = datetime.now()
+        self._start_time = datetime.utcnow()
 
     def get_id(self) -> str:
         return self._job_id
diff --git a/tests/e2e/test_historical_features.py b/tests/e2e/test_historical_features.py
index 19f2d3dd33..ed862b9c91 100644
--- a/tests/e2e/test_historical_features.py
+++ b/tests/e2e/test_historical_features.py
@@ -106,9 +106,10 @@ def test_historical_features(
 
     feature_refs = ["transactions:daily_transactions"]
 
-    job_submission_time = datetime.now()
+    job_submission_time = datetime.utcnow()
     job = feast_client.get_historical_features(feature_refs, customers_df)
     assert job.get_start_time() >= job_submission_time
+    assert job.get_start_time() <= job_submission_time + timedelta(hours=1)
 
     output_dir = job.get_output_file_uri()
     joined_df = read_parquet(output_dir)
diff --git a/tests/e2e/test_online_features.py b/tests/e2e/test_online_features.py
index beee4e62ab..8cda80ca89 100644
--- a/tests/e2e/test_online_features.py
+++ b/tests/e2e/test_online_features.py
@@ -125,6 +125,7 @@ def test_streaming_ingestion(
     feast_client.apply(feature_table)
 
     job = feast_client.start_stream_to_online_ingestion(feature_table)
+    assert job.get_feature_table() == feature_table.name
 
     wait_retry_backoff(
         lambda: (None, job.get_status() == SparkJobStatus.IN_PROGRESS), 120
@@ -165,6 +166,7 @@ def ingest_and_verify(
         original.event_timestamp.min().to_pydatetime(),
         original.event_timestamp.max().to_pydatetime() + timedelta(seconds=1),
     )
+    assert job.get_feature_table() == feature_table.name
 
     wait_retry_backoff(
         lambda: (None, job.get_status() == SparkJobStatus.COMPLETED), 180