diff --git a/protos/feast/core/JobService.proto b/protos/feast/core/JobService.proto
index d0ae6ac05f..baeae3ee88 100644
--- a/protos/feast/core/JobService.proto
+++ b/protos/feast/core/JobService.proto
@@ -73,24 +73,31 @@ message Job {
   // Current job status
   JobStatus status = 3;
   // Deterministic hash of the Job
-  string hash = 8;
+  string hash = 4;
+  // Start time of the Job
+  google.protobuf.Timestamp start_time = 5;
 
   message RetrievalJobMeta {
-    string output_location = 4;
+    string output_location = 1;
   }
 
   message OfflineToOnlineMeta {
+    string table_name = 1;
   }
 
   message StreamToOnlineMeta {
+    string table_name = 1;
   }
 
   // JobType specific metadata on the job
   oneof meta {
-    RetrievalJobMeta retrieval = 5;
-    OfflineToOnlineMeta batch_ingestion = 6;
-    StreamToOnlineMeta stream_ingestion = 7;
+    RetrievalJobMeta retrieval = 6;
+    OfflineToOnlineMeta batch_ingestion = 7;
+    StreamToOnlineMeta stream_ingestion = 8;
   }
+
+  // Path to Spark job logs, if available
+  string log_uri = 9;
 }
 
 // Ingest data from offline store into online store
@@ -107,8 +114,17 @@ message StartOfflineToOnlineIngestionJobRequest {
 }
 
 message StartOfflineToOnlineIngestionJobResponse {
-  // Job ID assigned by Feast
-  string id = 1;
+  // Job ID assigned by Feast
+  string id = 1;
+
+  // Job start time
+  google.protobuf.Timestamp job_start_time = 2;
+
+  // Feature table associated with the job
+  string table_name = 3;
+
+  // Path to Spark job logs, if available
+  string log_uri = 4;
 }
 
 message GetHistoricalFeaturesRequest {
@@ -136,9 +152,18 @@ message GetHistoricalFeaturesRequest {
 }
 
 message GetHistoricalFeaturesResponse {
-  // Export Job with ID assigned by Feast
-  string id = 1;
-  string output_file_uri = 2;
+  // Export Job with ID assigned by Feast
+  string id = 1;
+
+  // Uri to the join result output file
+  string output_file_uri = 2;
+
+  // Job start time
+  google.protobuf.Timestamp job_start_time = 3;
+
+  // Path to Spark job logs, if available
+  string log_uri = 4;
+
 }
 
 message StartStreamToOnlineIngestionJobRequest {
@@ -148,8 +173,17 @@ message StartStreamToOnlineIngestionJobRequest {
 }
 
 message StartStreamToOnlineIngestionJobResponse {
-  // Job ID assigned by Feast
-  string id = 1;
+  // Job ID assigned by Feast
+  string id = 1;
+
+  // Job start time
+  google.protobuf.Timestamp job_start_time = 2;
+
+  // Feature table associated with the job
+  string table_name = 3;
+
+  // Path to Spark job logs, if available
+  string log_uri = 4;
 }
 
 message ListJobsRequest {
diff --git a/sdk/python/feast/client.py b/sdk/python/feast/client.py
index fbe960afd7..9b7dedf9c2 100644
--- a/sdk/python/feast/client.py
+++ b/sdk/python/feast/client.py
@@ -1075,6 +1075,8 @@ def get_historical_features(
                 self._extra_grpc_params,
                 response.id,
                 output_file_uri=response.output_file_uri,
+                start_time=response.job_start_time.ToDatetime(),
+                log_uri=response.log_uri,
             )
         else:
             return start_historical_feature_retrieval_job(
@@ -1174,7 +1176,12 @@ def start_offline_to_online_ingestion(
         request.end_date.FromDatetime(end)
         response = self._job_service.StartOfflineToOnlineIngestionJob(request)
         return RemoteBatchIngestionJob(
-            self._job_service, self._extra_grpc_params, response.id,
+            self._job_service,
+            self._extra_grpc_params,
+            response.id,
+            feature_table.name,
+            response.job_start_time.ToDatetime(),
+            response.log_uri,
         )
 
     def start_stream_to_online_ingestion(
@@ -1196,7 +1203,12 @@ def start_stream_to_online_ingestion(
         )
         response = self._job_service.StartStreamToOnlineIngestionJob(request)
         return RemoteStreamIngestionJob(
-            self._job_service, self._extra_grpc_params, response.id
+            self._job_service,
+            self._extra_grpc_params,
+            response.id,
+            feature_table.name,
+            response.job_start_time.ToDatetime(),
+            response.log_uri,
         )
 
     def list_jobs(self, include_terminated: bool) -> List[SparkJob]:
diff --git a/sdk/python/feast/job_service.py b/sdk/python/feast/job_service.py
index 6742ff1493..e613b16d73 100644
--- a/sdk/python/feast/job_service.py
+++ b/sdk/python/feast/job_service.py
@@ -5,9 +5,10 @@
 import time
 import traceback
 from concurrent.futures import ThreadPoolExecutor
-from typing import Dict, List, Tuple
+from typing import Dict, List, Tuple, cast
 
 import grpc
+from google.protobuf.timestamp_pb2 import Timestamp
 
 import feast
 from feast.constants import ConfigOptions as opt
@@ -54,6 +55,7 @@ def _job_to_proto(spark_job: SparkJob) -> JobProto:
     job = JobProto()
     job.id = spark_job.get_id()
+    job.log_uri = cast(str, spark_job.get_log_uri() or "")
     status = spark_job.get_status()
     if status == SparkJobStatus.COMPLETED:
         job.status = JobStatus.JOB_STATUS_DONE
@@ -71,11 +73,15 @@ def _job_to_proto(spark_job: SparkJob) -> JobProto:
         job.retrieval.output_location = spark_job.get_output_file_uri(block=False)
     elif isinstance(spark_job, BatchIngestionJob):
         job.type = JobType.BATCH_INGESTION_JOB
+        job.batch_ingestion.table_name = spark_job.get_feature_table()
     elif isinstance(spark_job, StreamIngestionJob):
         job.type = JobType.STREAM_INGESTION_JOB
+        job.stream_ingestion.table_name = spark_job.get_feature_table()
     else:
         raise ValueError(f"Invalid job type {job}")
 
+    job.start_time.FromDatetime(spark_job.get_start_time())
+
     return job
 
 
@@ -97,7 +103,16 @@ def StartOfflineToOnlineIngestionJob(
             start=request.start_date.ToDatetime(),
             end=request.end_date.ToDatetime(),
         )
-        return StartOfflineToOnlineIngestionJobResponse(id=job.get_id())
+
+        job_start_timestamp = Timestamp()
+        job_start_timestamp.FromDatetime(job.get_start_time())
+
+        return StartOfflineToOnlineIngestionJobResponse(
+            id=job.get_id(),
+            job_start_time=job_start_timestamp,
+            table_name=request.table_name,
+            log_uri=job.get_log_uri(),
+        )
 
     def GetHistoricalFeatures(self, request: GetHistoricalFeaturesRequest, context):
         """Produce a training dataset, return a job id that will provide a file reference"""
@@ -114,8 +129,14 @@ def GetHistoricalFeatures(self, request: GetHistoricalFeaturesRequest, context):
 
         output_file_uri = job.get_output_file_uri(block=False)
 
+        job_start_timestamp = Timestamp()
+        job_start_timestamp.FromDatetime(job.get_start_time())
+
         return GetHistoricalFeaturesResponse(
-            id=job.get_id(), output_file_uri=output_file_uri
+            id=job.get_id(),
+            output_file_uri=output_file_uri,
+            job_start_time=job_start_timestamp,
+            log_uri=job.get_log_uri(),
         )
 
     def StartStreamToOnlineIngestionJob(
@@ -135,7 +155,14 @@ def StartStreamToOnlineIngestionJob(
             job_hash = params.get_job_hash()
             for job in list_jobs(include_terminated=True, client=self.client):
                 if isinstance(job, StreamIngestionJob) and job.get_hash() == job_hash:
-                    return StartStreamToOnlineIngestionJobResponse(id=job.get_id())
+                    job_start_timestamp = Timestamp()
+                    job_start_timestamp.FromDatetime(job.get_start_time())
+                    return StartStreamToOnlineIngestionJobResponse(
+                        id=job.get_id(),
+                        job_start_time=job_start_timestamp,
+                        table_name=job.get_feature_table(),
+                        log_uri=job.get_log_uri(),
+                    )
             raise RuntimeError(
                 "Feast Job Service has control loop enabled, but couldn't find the existing stream ingestion job for the given FeatureTable"
             )
@@ -147,7 +174,15 @@ def StartStreamToOnlineIngestionJob(
             feature_table=feature_table,
             extra_jars=[],
         )
-        return StartStreamToOnlineIngestionJobResponse(id=job.get_id())
+
+        job_start_timestamp = Timestamp()
+        job_start_timestamp.FromDatetime(job.get_start_time())
+        return StartStreamToOnlineIngestionJobResponse(
+            id=job.get_id(),
+            job_start_time=job_start_timestamp,
+            table_name=request.table_name,
+            log_uri=job.get_log_uri(),
+        )
 
     def ListJobs(self, request, context):
         """List all types of jobs"""
diff --git a/sdk/python/feast/pyspark/abc.py b/sdk/python/feast/pyspark/abc.py
index 64c27d9b1d..04aa697e3e 100644
--- a/sdk/python/feast/pyspark/abc.py
+++ b/sdk/python/feast/pyspark/abc.py
@@ -67,6 +67,18 @@ def cancel(self):
         """
         raise NotImplementedError
 
+    @abc.abstractmethod
+    def get_start_time(self) -> datetime:
+        """
+        Get job start time.
+        """
+
+    def get_log_uri(self) -> Optional[str]:
+        """
+        Get path to Spark job log, if applicable.
+        """
+        return None
+
 
 class SparkJobParameters(abc.ABC):
     @abc.abstractmethod
@@ -496,6 +508,18 @@ class BatchIngestionJob(SparkJob):
     Container for the ingestion job result
     """
 
+    @abc.abstractmethod
+    def get_feature_table(self) -> str:
+        """
+        Get the feature table name associated with this job. Returns an empty string
+        if the feature table cannot be determined, for example when the job was
+        created by an earlier version of Feast.
+
+        Returns:
+            str: Feature table name
+        """
+        raise NotImplementedError
+
 
 class StreamIngestionJob(SparkJob):
     """
@@ -513,6 +537,18 @@ def get_hash(self) -> str:
         """
         raise NotImplementedError
 
+    @abc.abstractmethod
+    def get_feature_table(self) -> str:
+        """
+        Get the feature table name associated with this job. Returns an empty string
+        if the feature table cannot be determined, for example when the job was
+        created by an earlier version of Feast.
+
+        Returns:
+            str: Feature table name
+        """
+        raise NotImplementedError
+
 
 class JobLauncher(abc.ABC):
     """
diff --git a/sdk/python/feast/pyspark/launchers/aws/emr.py b/sdk/python/feast/pyspark/launchers/aws/emr.py
index 49eab4c7a4..486135d95d 100644
--- a/sdk/python/feast/pyspark/launchers/aws/emr.py
+++ b/sdk/python/feast/pyspark/launchers/aws/emr.py
@@ -1,4 +1,5 @@
 import os
+from datetime import datetime
 from io import BytesIO
 from typing import Any, Dict, List, Optional
 from urllib.parse import urlunparse
@@ -31,6 +32,7 @@
     EmrJobRef,
     JobInfo,
     _cancel_job,
+    _get_job_creation_time,
     _get_job_state,
     _historical_retrieval_step,
     _job_ref_to_str,
@@ -72,6 +74,9 @@ def get_status(self) -> SparkJobStatus:
     def cancel(self):
         _cancel_job(self._emr_client, self._job_ref)
 
+    def get_start_time(self) -> datetime:
+        return _get_job_creation_time(self._emr_client, self._job_ref)
+
 
 class EmrRetrievalJob(EmrJobMixin, RetrievalJob):
     """
@@ -106,8 +111,12 @@ class EmrBatchIngestionJob(EmrJobMixin, BatchIngestionJob):
     Ingestion job result for a EMR cluster
     """
 
-    def __init__(self, emr_client, job_ref: EmrJobRef):
+    def __init__(self, emr_client, job_ref: EmrJobRef, table_name: str):
         super().__init__(emr_client, job_ref)
+        self._table_name = table_name
+
+    def get_feature_table(self) -> str:
+        return self._table_name
 
 
 class EmrStreamIngestionJob(EmrJobMixin, StreamIngestionJob):
@@ -115,13 +124,17 @@ class EmrStreamIngestionJob(EmrJobMixin, StreamIngestionJob):
     Ingestion streaming job for a EMR cluster
     """
 
-    def __init__(self, emr_client, job_ref: EmrJobRef, job_hash: str):
+    def __init__(self, emr_client, job_ref: EmrJobRef, job_hash: str, table_name: str):
         super().__init__(emr_client, job_ref)
         self._job_hash = job_hash
+        self._table_name = table_name
 
     def get_hash(self) -> str:
         return self._job_hash
 
+    def get_feature_table(self) -> str:
+        return self._table_name
+
 
 class EmrClusterLauncher(JobLauncher):
     """
@@ -265,7 +278,9 @@ def offline_to_online_ingestion(
 
         job_ref = self._submit_emr_job(step)
 
-        return EmrBatchIngestionJob(self._emr_client(), job_ref)
+        return EmrBatchIngestionJob(
+            self._emr_client(), job_ref, ingestion_job_params.get_feature_table_name()
+        )
 
     def start_stream_to_online_ingestion(
         self, ingestion_job_params: StreamIngestionJobParameters
@@ -299,7 +314,12 @@ def start_stream_to_online_ingestion(
 
         job_ref = self._submit_emr_job(step)
 
-        return EmrStreamIngestionJob(self._emr_client(), job_ref, job_hash)
+        return EmrStreamIngestionJob(
+            self._emr_client(),
+            job_ref,
+            job_hash,
+            ingestion_job_params.get_feature_table_name(),
+        )
 
     def _job_from_job_info(self, job_info: JobInfo) -> SparkJob:
         if job_info.job_type == HISTORICAL_RETRIEVAL_JOB_TYPE:
@@ -310,16 +330,23 @@ def _job_from_job_info(self, job_info: JobInfo) -> SparkJob:
                 output_file_uri=job_info.output_file_uri,
             )
         elif job_info.job_type == OFFLINE_TO_ONLINE_JOB_TYPE:
+            table_name = job_info.table_name if job_info.table_name else ""
+            assert table_name is not None
             return EmrBatchIngestionJob(
-                emr_client=self._emr_client(), job_ref=job_info.job_ref,
+                emr_client=self._emr_client(),
+                job_ref=job_info.job_ref,
+                table_name=table_name,
             )
         elif job_info.job_type == STREAM_TO_ONLINE_JOB_TYPE:
+            table_name = job_info.table_name if job_info.table_name else ""
+            assert table_name is not None
             # job_hash must not be None for stream ingestion jobs
             assert job_info.job_hash is not None
             return EmrStreamIngestionJob(
                 emr_client=self._emr_client(),
                 job_ref=job_info.job_ref,
                 job_hash=job_info.job_hash,
+                table_name=table_name,
             )
         else:
             # We should never get here
diff --git a/sdk/python/feast/pyspark/launchers/aws/emr_utils.py b/sdk/python/feast/pyspark/launchers/aws/emr_utils.py
index 175d0709ef..0b8c2b2c6d 100644
--- a/sdk/python/feast/pyspark/launchers/aws/emr_utils.py
+++ b/sdk/python/feast/pyspark/launchers/aws/emr_utils.py
@@ -3,9 +3,11 @@
 import random
 import string
 import time
+from datetime import datetime
 from typing import Any, Dict, List, NamedTuple, Optional
 from urllib.parse import urlparse, urlunparse
 
+import pytz
 import yaml
 
 from feast.pyspark.abc import BQ_SPARK_PACKAGE
@@ -21,6 +23,7 @@
     "EmrJobRef",
     "JobInfo",
     "_cancel_job",
+    "_get_job_creation_time",
     "_get_job_state",
     "_historical_retrieval_step",
     "_job_ref_to_str",
@@ -240,6 +243,21 @@ def _get_step_state(emr_client, cluster_id: str, step_id: str) -> str:
     return state
 
 
+def _get_job_creation_time(emr_client, job: EmrJobRef) -> datetime:
+    if job.step_id is None:
+        step_id = _get_first_step_id(emr_client, job.cluster_id)
+    else:
+        step_id = job.step_id
+
+    return _get_step_creation_time(emr_client, job.cluster_id, step_id)
+
+
+def _get_step_creation_time(emr_client, cluster_id: str, step_id: str) -> datetime:
+    response = emr_client.describe_step(ClusterId=cluster_id, StepId=step_id)
+    step_creation_time = response["Step"]["Status"]["Timeline"]["CreationDateTime"]
+    return step_creation_time.astimezone(pytz.utc).replace(tzinfo=None)
+
+
 def _wait_for_step_state(
     emr_client,
     cluster_id: str,
diff --git a/sdk/python/feast/pyspark/launchers/gcloud/dataproc.py b/sdk/python/feast/pyspark/launchers/gcloud/dataproc.py
index 6ca136a182..bd9cf11647 100644
--- a/sdk/python/feast/pyspark/launchers/gcloud/dataproc.py
+++ b/sdk/python/feast/pyspark/launchers/gcloud/dataproc.py
@@ -130,6 +130,12 @@ def block_polling(self, interval_sec=30, timeout_sec=3600) -> SparkJobStatus:
             time.sleep(interval_sec)
         return status
 
+    def get_start_time(self):
+        return self._job.status.state_start_time
+
+    def get_log_uri(self) -> Optional[str]:
+        return self._job.driver_output_resource_uri
+
 
 class DataprocRetrievalJob(DataprocJobMixin, RetrievalJob):
     """
@@ -167,6 +173,9 @@ class DataprocBatchIngestionJob(DataprocJobMixin, BatchIngestionJob):
     Batch Ingestion job result for a Dataproc cluster
     """
 
+    def get_feature_table(self) -> str:
+        return self._job.labels.get(DataprocClusterLauncher.FEATURE_TABLE_LABEL_KEY, "")
+
 
 class DataprocStreamingIngestionJob(DataprocJobMixin, StreamIngestionJob):
     """
@@ -186,6 +195,9 @@ def __init__(
     def get_hash(self) -> str:
         return self._job_hash
 
+    def get_feature_table(self) -> str:
+        return self._job.labels.get(DataprocClusterLauncher.FEATURE_TABLE_LABEL_KEY, "")
+
 
 class DataprocClusterLauncher(JobLauncher):
     """
@@ -197,6 +209,7 @@ class DataprocClusterLauncher(JobLauncher):
     EXTERNAL_JARS = ["gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar"]
     JOB_TYPE_LABEL_KEY = "feast_job_type"
     JOB_HASH_LABEL_KEY = "feast_job_hash"
+    FEATURE_TABLE_LABEL_KEY = "feast_feature_tables"
 
     def __init__(
         self,
@@ -282,10 +295,19 @@ def dataproc_submit(
             "spark.executor.cores": self.executor_cores,
             "spark.executor.memory": self.executor_memory,
         }
-        # Add job hash to labels only for the stream ingestion job
+
         if isinstance(job_params, StreamIngestionJobParameters):
+            job_config["labels"][
+                self.FEATURE_TABLE_LABEL_KEY
+            ] = job_params.get_feature_table_name()
+            # Add job hash to labels only for the stream ingestion job
            job_config["labels"][self.JOB_HASH_LABEL_KEY] = job_params.get_job_hash()
 
+        if isinstance(job_params, BatchIngestionJobParameters):
+            job_config["labels"][
+                self.FEATURE_TABLE_LABEL_KEY
+            ] = job_params.get_feature_table_name()
+
         if job_params.get_class_name():
             scala_job_properties = {
                 "spark.yarn.user.classpath.first": "true",
@@ -391,7 +413,7 @@ def _dataproc_job_to_spark_job(self, job: Job) -> SparkJob:
         cancel_fn = partial(self.dataproc_cancel, job_id)
 
         if job_type == SparkJobType.HISTORICAL_RETRIEVAL.name.lower():
-            output_path = job.pyspark_job.properties.get("dev.feast.outputuri")
+            output_path = job.pyspark_job.properties.get("dev.feast.outputuri", "")
             return DataprocRetrievalJob(job, refresh_fn, cancel_fn, output_path)
 
         if job_type == SparkJobType.BATCH_INGESTION.name.lower():
diff --git a/sdk/python/feast/pyspark/launchers/k8s/k8s.py b/sdk/python/feast/pyspark/launchers/k8s/k8s.py
index dc555f7b78..1361860159 100644
--- a/sdk/python/feast/pyspark/launchers/k8s/k8s.py
+++ b/sdk/python/feast/pyspark/launchers/k8s/k8s.py
@@ -1,6 +1,7 @@
 import random
 import string
 import time
+from datetime import datetime
 from io import BytesIO
 from pathlib import Path
 from typing import Any, Dict, List, Optional, cast
@@ -28,6 +29,7 @@
 from .k8s_utils import (
     DEFAULT_JOB_TEMPLATE,
     HISTORICAL_RETRIEVAL_JOB_TYPE,
+    LABEL_FEATURE_TABLE,
     METADATA_JOBHASH,
     METADATA_OUTPUT_URI,
     OFFLINE_TO_ONLINE_JOB_TYPE,
@@ -67,6 +69,11 @@ def get_status(self) -> SparkJobStatus:
         assert job is not None
         return job.state
 
+    def get_start_time(self) -> datetime:
+        job = _get_job_by_id(self._api, self._namespace, self._job_id)
+        assert job is not None
+        return job.start_time
+
     def cancel(self):
         _cancel_job_by_id(self._api, self._namespace, self._job_id)
 
@@ -117,8 +124,14 @@ class KubernetesBatchIngestionJob(KubernetesJobMixin, BatchIngestionJob):
     Ingestion job result for a k8s cluster
     """
 
-    def __init__(self, api: CustomObjectsApi, namespace: str, job_id: str):
+    def __init__(
+        self, api: CustomObjectsApi, namespace: str, job_id: str, feature_table: str
+    ):
         super().__init__(api, namespace, job_id)
+        self._feature_table = feature_table
+
+    def get_feature_table(self) -> str:
+        return self._feature_table
 
 
 class KubernetesStreamIngestionJob(KubernetesJobMixin, StreamIngestionJob):
     """
@@ -127,14 +140,23 @@ class KubernetesStreamIngestionJob(KubernetesJobMixin, StreamIngestionJob):
     """
 
     def __init__(
-        self, api: CustomObjectsApi, namespace: str, job_id: str, job_hash: str
+        self,
+        api: CustomObjectsApi,
+        namespace: str,
+        job_id: str,
+        job_hash: str,
+        feature_table: str,
     ):
         super().__init__(api, namespace, job_id)
         self._job_hash = job_hash
+        self._feature_table = feature_table
 
     def get_hash(self) -> str:
         return self._job_hash
 
+    def get_feature_table(self) -> str:
+        return self._feature_table
+
 
 class KubernetesJobLauncher(JobLauncher):
     """
@@ -169,7 +191,10 @@ def _job_from_job_info(self, job_info: JobInfo) -> SparkJob:
             )
         elif job_info.job_type == OFFLINE_TO_ONLINE_JOB_TYPE:
             return KubernetesBatchIngestionJob(
-                api=self._api, namespace=job_info.namespace, job_id=job_info.job_id,
+                api=self._api,
+                namespace=job_info.namespace,
+                job_id=job_info.job_id,
+                feature_table=job_info.labels.get(LABEL_FEATURE_TABLE, ""),
             )
         elif job_info.job_type == STREAM_TO_ONLINE_JOB_TYPE:
             # job_hash must not be None for stream ingestion jobs
@@ -179,6 +204,7 @@ def _job_from_job_info(self, job_info: JobInfo) -> SparkJob:
                 namespace=job_info.namespace,
                 job_id=job_info.job_id,
                 job_hash=job_info.extra_metadata[METADATA_JOBHASH],
+                feature_table=job_info.labels.get(LABEL_FEATURE_TABLE, ""),
             )
         else:
             # We should never get here
@@ -293,6 +319,9 @@ def offline_to_online_ingestion(
             azure_credentials=self._get_azure_credentials(),
             arguments=ingestion_job_params.get_arguments(),
             namespace=self._namespace,
+            extra_labels={
+                LABEL_FEATURE_TABLE: ingestion_job_params.get_feature_table_name()
+            },
         )
 
         job_info = _submit_job(
@@ -336,6 +365,9 @@ def start_stream_to_online_ingestion(
             azure_credentials=self._get_azure_credentials(),
             arguments=ingestion_job_params.get_arguments(),
             namespace=self._namespace,
+            extra_labels={
+                LABEL_FEATURE_TABLE: ingestion_job_params.get_feature_table_name()
+            },
         )
 
         job_info = _submit_job(
diff --git a/sdk/python/feast/pyspark/launchers/k8s/k8s_utils.py b/sdk/python/feast/pyspark/launchers/k8s/k8s_utils.py
index ab8d3aa93c..9630d08266 100644
--- a/sdk/python/feast/pyspark/launchers/k8s/k8s_utils.py
+++ b/sdk/python/feast/pyspark/launchers/k8s/k8s_utils.py
@@ -1,4 +1,5 @@
 from copy import deepcopy
+from datetime import datetime
 from typing import Any, Dict, List, NamedTuple, Optional, Tuple
 
 from kubernetes import client, config
@@ -26,6 +27,7 @@
 
 LABEL_JOBID = "feast.dev/jobid"
 LABEL_JOBTYPE = "feast.dev/type"
+LABEL_FEATURE_TABLE = "feast.dev/table"
 
 # Can't store these bits of info in k8s labels due to 64-character limit, so we store them as
 # sparkConf
@@ -114,11 +116,14 @@ def _prepare_job_resource(
     azure_credentials: Dict[str, str],
     arguments: List[str],
     namespace: str,
+    extra_labels: Optional[Dict[str, str]] = None,
 ) -> Dict[str, Any]:
     """ Prepare SparkApplication custom resource configs """
     job = deepcopy(job_template)
 
     labels = {LABEL_JOBID: job_id, LABEL_JOBTYPE: job_type}
+    if extra_labels:
+        labels = {**labels, **extra_labels}
 
     _add_keys(job, ("metadata", "labels"), labels)
 
     _add_keys(
@@ -164,6 +169,8 @@ class JobInfo(NamedTuple):
     namespace: str
     extra_metadata: Dict[str, str]
     state: SparkJobStatus
+    labels: Dict[str, str]
+    start_time: datetime
 
 
 STATE_MAP = {
@@ -186,6 +193,9 @@ def _k8s_state_to_feast(k8s_state: str) -> SparkJobStatus:
 
 def _resource_to_job_info(resource: Dict[str, Any]) -> JobInfo:
     labels = resource["metadata"]["labels"]
+    start_time = datetime.strptime(
+        resource["metadata"]["creationTimestamp"], "%Y-%m-%dT%H:%M:%SZ"
+    )
     sparkConf = resource["spec"].get("sparkConf", {})
 
     if "status" in resource:
@@ -199,6 +209,8 @@ def _resource_to_job_info(resource: Dict[str, Any]) -> JobInfo:
         namespace=resource["metadata"].get("namespace", "default"),
         extra_metadata={k: v for k, v in sparkConf.items() if k in METADATA_KEYS},
         state=state,
+        labels=labels,
+        start_time=start_time,
     )
diff --git a/sdk/python/feast/pyspark/launchers/standalone/local.py b/sdk/python/feast/pyspark/launchers/standalone/local.py
index 440e1d73ef..e67dd75630 100644
--- a/sdk/python/feast/pyspark/launchers/standalone/local.py
+++ b/sdk/python/feast/pyspark/launchers/standalone/local.py
@@ -4,6 +4,7 @@
 import threading
 import uuid
 from contextlib import closing
+from datetime import datetime
 from typing import Dict, List, Optional
 
 import requests
@@ -101,6 +102,7 @@ def __init__(
         self._job_name = job_name
         self._process = process
         self._ui_port = ui_port
+        self._start_time = datetime.utcnow()
 
     def get_id(self) -> str:
         return self._job_id
@@ -127,6 +129,9 @@ def check_if_started(self):
         ).json()
         return bool(stages)
 
+    def get_start_time(self) -> datetime:
+        return self._start_time
+
     def get_status(self) -> SparkJobStatus:
         code = self._process.poll()
         if code is None:
@@ -149,7 +154,19 @@ class StandaloneClusterBatchIngestionJob(StandaloneClusterJobMixin, BatchIngesti
     Batch Ingestion job result for a standalone spark cluster
     """
 
-    pass
+    def __init__(
+        self,
+        job_id: str,
+        job_name: str,
+        process: subprocess.Popen,
+        ui_port: int,
+        feature_table: str,
+    ) -> None:
+        super().__init__(job_id, job_name, process, ui_port)
+        self._feature_table = feature_table
+
+    def get_feature_table(self) -> str:
+        return self._feature_table
 
 
 class StandaloneClusterStreamingIngestionJob(
@@ -166,13 +183,18 @@ def __init__(
         process: subprocess.Popen,
         ui_port: int,
         job_hash: str,
+        feature_table: str,
     ) -> None:
         super().__init__(job_id, job_name, process, ui_port)
         self._job_hash = job_hash
+        self._feature_table = feature_table
 
     def get_hash(self) -> str:
         return self._job_hash
 
+    def get_feature_table(self) -> str:
+        return self._feature_table
+
 
 class StandaloneClusterRetrievalJob(StandaloneClusterJobMixin, RetrievalJob):
     """
@@ -312,6 +334,7 @@ def offline_to_online_ingestion(
             ingestion_job_params.get_name(),
             self.spark_submit(ingestion_job_params, ui_port),
             ui_port,
+            ingestion_job_params.get_feature_table_name(),
         )
         global_job_cache.add_job(job)
         return job
@@ -327,6 +350,7 @@ def start_stream_to_online_ingestion(
             self.spark_submit(ingestion_job_params, ui_port),
             ui_port,
             ingestion_job_params.get_job_hash(),
+            ingestion_job_params.get_feature_table_name(),
         )
         global_job_cache.add_job(job)
         return job
diff --git a/sdk/python/feast/remote_job.py b/sdk/python/feast/remote_job.py
index 6af6081749..8e1e12a70f 100644
--- a/sdk/python/feast/remote_job.py
+++ b/sdk/python/feast/remote_job.py
@@ -1,5 +1,6 @@
 import time
-from typing import Any, Callable, Dict, List
+from datetime import datetime
+from typing import Any, Callable, Dict, List, Optional
 
 from feast.core.JobService_pb2 import CancelJobRequest, GetJobRequest
 from feast.core.JobService_pb2 import Job as JobProto
@@ -23,6 +24,8 @@ def __init__(
         service: JobServiceStub,
         grpc_extra_param_provider: GrpcExtraParamProvider,
         job_id: str,
+        start_time: datetime,
+        log_uri: Optional[str],
     ):
         """
         Args:
@@ -32,6 +35,8 @@ def __init__(
         self._job_id = job_id
         self._service = service
         self._grpc_extra_param_provider = grpc_extra_param_provider
+        self._start_time = start_time
+        self._log_uri = log_uri
 
     def get_id(self) -> str:
         return self._job_id
@@ -53,6 +58,9 @@ def get_status(self) -> SparkJobStatus:
         # we should never get here
         raise Exception(f"Invalid remote job state {response.job.status}")
 
+    def get_start_time(self) -> datetime:
+        return self._start_time
+
     def cancel(self):
         self._service.CancelJob(
             CancelJobRequest(job_id=self._job_id), **self._grpc_extra_param_provider()
@@ -72,6 +80,9 @@ def _wait_for_job_status(
         else:
             raise TimeoutError("Timed out waiting for job status")
 
+    def get_log_uri(self) -> Optional[str]:
+        return self._log_uri
+
 
 class RemoteRetrievalJob(RemoteJobMixin, RetrievalJob):
     """
@@ -84,6 +95,8 @@ def __init__(
         grpc_extra_param_provider: GrpcExtraParamProvider,
         job_id: str,
         output_file_uri: str,
+        start_time: datetime,
+        log_uri: Optional[str],
     ):
         """
         This is the job object representing the historical retrieval job.
@@ -91,7 +104,9 @@ def __init__(
         Args:
             output_file_uri (str): Uri to the historical feature retrieval job output file.
         """
-        super().__init__(service, grpc_extra_param_provider, job_id)
+        super().__init__(
+            service, grpc_extra_param_provider, job_id, start_time, log_uri
+        )
         self._output_file_uri = output_file_uri
 
     def get_output_file_uri(self, timeout_sec=None):
@@ -115,8 +130,17 @@ def __init__(
         service: JobServiceStub,
         grpc_extra_param_provider: GrpcExtraParamProvider,
         job_id: str,
+        feature_table: str,
+        start_time: datetime,
+        log_uri: Optional[str],
     ):
-        super().__init__(service, grpc_extra_param_provider, job_id)
+        super().__init__(
+            service, grpc_extra_param_provider, job_id, start_time, log_uri
+        )
+        self._feature_table = feature_table
+
+    def get_feature_table(self) -> str:
+        return self._feature_table
 
 
 class RemoteStreamIngestionJob(RemoteJobMixin, StreamIngestionJob):
@@ -124,6 +148,20 @@ class RemoteStreamIngestionJob(RemoteJobMixin, StreamIngestionJob):
     """
     Stream ingestion job result.
""" + def __init__( + self, + service: JobServiceStub, + grpc_extra_param_provider: GrpcExtraParamProvider, + job_id: str, + feature_table: str, + start_time: datetime, + log_uri: Optional[str], + ): + super().__init__( + service, grpc_extra_param_provider, job_id, start_time, log_uri + ) + self._feature_table = feature_table + def get_hash(self) -> str: response = self._service.GetJob( GetJobRequest(job_id=self._job_id), **self._grpc_extra_param_provider() @@ -131,6 +169,9 @@ def get_hash(self) -> str: return response.job.hash + def get_feature_table(self) -> str: + return self._feature_table + def get_remote_job_from_proto( service: JobServiceStub, @@ -147,14 +188,34 @@ def get_remote_job_from_proto( Returns: (SparkJob): A remote job object for the given job """ + if job.type == JobType.RETRIEVAL_JOB: return RemoteRetrievalJob( - service, grpc_extra_param_provider, job.id, job.retrieval.output_location + service, + grpc_extra_param_provider, + job.id, + job.retrieval.output_location, + job.start_time.ToDatetime(), + job.log_uri, ) elif job.type == JobType.BATCH_INGESTION_JOB: - return RemoteBatchIngestionJob(service, grpc_extra_param_provider, job.id) + return RemoteBatchIngestionJob( + service, + grpc_extra_param_provider, + job.id, + job.batch_ingestion.table_name, + job.start_time.ToDatetime(), + job.log_uri, + ) elif job.type == JobType.STREAM_INGESTION_JOB: - return RemoteStreamIngestionJob(service, grpc_extra_param_provider, job.id) + return RemoteStreamIngestionJob( + service, + grpc_extra_param_provider, + job.id, + job.stream_ingestion.table_name, + job.start_time.ToDatetime(), + job.log_uri, + ) else: raise ValueError( f"Invalid Job Type {job.type}, has to be one of " diff --git a/sdk/python/tests/test_remote_job.py b/sdk/python/tests/test_remote_job.py index 98c18642c5..37bbb50414 100644 --- a/sdk/python/tests/test_remote_job.py +++ b/sdk/python/tests/test_remote_job.py @@ -1,6 +1,7 @@ from collections import defaultdict from concurrent import futures from contextlib import contextmanager +from datetime import datetime import grpc @@ -57,7 +58,9 @@ def GetJob(self, request, context): mock_servicer = MockServicer() with mock_server(mock_servicer) as service: - remote_job = RemoteRetrievalJob(service, lambda: {}, "test", "foo") + remote_job = RemoteRetrievalJob( + service, lambda: {}, "test", "foo", datetime.now(), None + ) assert remote_job.get_output_file_uri(timeout_sec=2) == "foo" assert mock_servicer._call_count["GetJob"] == 2 diff --git a/tests/e2e/test_historical_features.py b/tests/e2e/test_historical_features.py index fb6b969972..ed862b9c91 100644 --- a/tests/e2e/test_historical_features.py +++ b/tests/e2e/test_historical_features.py @@ -106,7 +106,11 @@ def test_historical_features( feature_refs = ["transactions:daily_transactions"] + job_submission_time = datetime.utcnow() job = feast_client.get_historical_features(feature_refs, customers_df) + assert job.get_start_time() >= job_submission_time + assert job.get_start_time() <= job_submission_time + timedelta(hours=1) + output_dir = job.get_output_file_uri() joined_df = read_parquet(output_dir) diff --git a/tests/e2e/test_online_features.py b/tests/e2e/test_online_features.py index beee4e62ab..8cda80ca89 100644 --- a/tests/e2e/test_online_features.py +++ b/tests/e2e/test_online_features.py @@ -125,6 +125,7 @@ def test_streaming_ingestion( feast_client.apply(feature_table) job = feast_client.start_stream_to_online_ingestion(feature_table) + assert job.get_feature_table() == feature_table.name wait_retry_backoff( lambda: 
@@ -165,6 +166,7 @@ def ingest_and_verify(
         original.event_timestamp.min().to_pydatetime(),
         original.event_timestamp.max().to_pydatetime() + timedelta(seconds=1),
     )
+    assert job.get_feature_table() == feature_table.name
 
     wait_retry_backoff(
         lambda: (None, job.get_status() == SparkJobStatus.COMPLETED), 180
diff --git a/tests/integration/test_launchers.py b/tests/integration/test_launchers.py
index fba5804718..44a6fd22ed 100644
--- a/tests/integration/test_launchers.py
+++ b/tests/integration/test_launchers.py
@@ -32,6 +32,7 @@ def test_dataproc_job_api(
     job = dataproc_launcher.historical_feature_retrieval(dataproc_retrieval_job_params)
     job_id = job.get_id()
     retrieved_job = dataproc_launcher.get_job_by_id(job_id)
+    assert retrieved_job.get_log_uri() is not None
     assert retrieved_job.get_id() == job_id
     status = retrieved_job.get_status()
    assert status in [
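---

Usage sketch (editor's note, not part of the patch): how a caller could read the job
metadata this change exposes. This is a minimal example assuming a Feast client
configured against a deployed job service; the endpoint values and the
"driver_statistics" feature table are placeholders, and log availability depends on
the launcher (the standalone launcher returns None from get_log_uri()).

    import feast

    # Placeholder endpoints; adjust to your deployment.
    client = feast.Client(
        core_url="feast-core:6565",
        serving_url="feast-serving:6566",
        job_service_url="feast-jobservice:6568",
    )

    # "driver_statistics" is a hypothetical, already-registered feature table.
    feature_table = client.get_feature_table("driver_statistics")
    job = client.start_stream_to_online_ingestion(feature_table)

    # Metadata surfaced by this patch:
    print(job.get_id())             # job ID assigned by Feast
    print(job.get_feature_table())  # name of the table the job ingests
    print(job.get_start_time())     # datetime the job was started
    print(job.get_log_uri())        # Spark log location, or None if unavailable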