Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhance job api to return associated feature table and start time #1259

Merged
merged 8 commits into from
Jan 12, 2021
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 23 additions & 1 deletion protos/feast/core/JobService.proto
Original file line number Diff line number Diff line change
Expand Up @@ -74,15 +74,19 @@ message Job {
JobStatus status = 3;
// Deterministic hash of the Job
string hash = 8;
// Start time of the Job
google.protobuf.Timestamp start_time = 9;

message RetrievalJobMeta {
  // Location where the historical retrieval job writes its output file.
  string output_location = 4;
}

// Metadata specific to batch (offline-to-online) ingestion jobs.
message OfflineToOnlineMeta {
  // Name of the feature table being ingested by this batch job.
  string feature_table = 10;
}

message StreamToOnlineMeta {
  // Name of the feature table being ingested by this stream job.
  string feature_table = 11;
}

// JobType specific metadata on the job
Expand All @@ -109,6 +113,12 @@ message StartOfflineToOnlineIngestionJobRequest {
message StartOfflineToOnlineIngestionJobResponse {
  // Job ID assigned by Feast
  string id = 1;

  // Job start time
  google.protobuf.Timestamp job_start_time = 2;

  // Feature table associated with the job
  string feature_table = 3;
}

message GetHistoricalFeaturesRequest {
Expand Down Expand Up @@ -138,7 +148,13 @@ message GetHistoricalFeaturesRequest {
message GetHistoricalFeaturesResponse {
  // Export Job with ID assigned by Feast
  string id = 1;

  // Uri to the join result output file
  string output_file_uri = 2;

  // Start time of the export job, as reported by the launcher backend
  google.protobuf.Timestamp job_start_time = 3;
}

message StartStreamToOnlineIngestionJobRequest {
Expand All @@ -150,10 +166,16 @@ message StartStreamToOnlineIngestionJobRequest {
message StartStreamToOnlineIngestionJobResponse {
  // Job ID assigned by Feast
  string id = 1;

  // Job start time
  google.protobuf.Timestamp job_start_time = 2;

  // Feature table associated with the job
  string feature_table = 3;
}

message ListJobsRequest {
  // If true, jobs in terminal states are included in the response.
  bool include_terminated = 1;
}

message ListJobsResponse {
Expand Down
13 changes: 11 additions & 2 deletions sdk/python/feast/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1075,6 +1075,7 @@ def get_historical_features(
self._extra_grpc_params,
response.id,
output_file_uri=response.output_file_uri,
start_time=response.job_start_time.ToDatetime(),
)
else:
return start_historical_feature_retrieval_job(
Expand Down Expand Up @@ -1174,7 +1175,11 @@ def start_offline_to_online_ingestion(
request.end_date.FromDatetime(end)
response = self._job_service.StartOfflineToOnlineIngestionJob(request)
return RemoteBatchIngestionJob(
self._job_service, self._extra_grpc_params, response.id,
self._job_service,
self._extra_grpc_params,
response.id,
feature_table.name,
response.job_start_time.ToDatetime(),
)

def start_stream_to_online_ingestion(
Expand All @@ -1196,7 +1201,11 @@ def start_stream_to_online_ingestion(
)
response = self._job_service.StartStreamToOnlineIngestionJob(request)
return RemoteStreamIngestionJob(
self._job_service, self._extra_grpc_params, response.id
self._job_service,
self._extra_grpc_params,
response.id,
feature_table.name,
response.job_start_time,
)

def list_jobs(self, include_terminated: bool) -> List[SparkJob]:
Expand Down
39 changes: 35 additions & 4 deletions sdk/python/feast/job_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from typing import Dict, List, Tuple

import grpc
from google.protobuf.timestamp_pb2 import Timestamp

import feast
from feast.constants import ConfigOptions as opt
Expand Down Expand Up @@ -71,11 +72,15 @@ def _job_to_proto(spark_job: SparkJob) -> JobProto:
job.retrieval.output_location = spark_job.get_output_file_uri(block=False)
elif isinstance(spark_job, BatchIngestionJob):
job.type = JobType.BATCH_INGESTION_JOB
job.batch_ingestion.feature_table = spark_job.get_feature_table()
elif isinstance(spark_job, StreamIngestionJob):
job.type = JobType.STREAM_INGESTION_JOB
job.stream_ingestion.feature_table = spark_job.get_feature_table()
else:
raise ValueError(f"Invalid job type {job}")

job.start_time.FromDatetime(spark_job.get_start_time())

return job


Expand All @@ -97,7 +102,15 @@ def StartOfflineToOnlineIngestionJob(
start=request.start_date.ToDatetime(),
end=request.end_date.ToDatetime(),
)
return StartOfflineToOnlineIngestionJobResponse(id=job.get_id())

job_start_timestamp = Timestamp()
job_start_timestamp.FromDatetime(job.get_start_time())

return StartOfflineToOnlineIngestionJobResponse(
id=job.get_id(),
job_start_time=job_start_timestamp,
feature_table=request.table_name,
)

def GetHistoricalFeatures(self, request: GetHistoricalFeaturesRequest, context):
"""Produce a training dataset, return a job id that will provide a file reference"""
Expand All @@ -114,8 +127,13 @@ def GetHistoricalFeatures(self, request: GetHistoricalFeaturesRequest, context):

output_file_uri = job.get_output_file_uri(block=False)

job_start_timestamp = Timestamp()
job_start_timestamp.FromDatetime(job.get_start_time())

return GetHistoricalFeaturesResponse(
id=job.get_id(), output_file_uri=output_file_uri
id=job.get_id(),
output_file_uri=output_file_uri,
job_start_time=job_start_timestamp,
)

def StartStreamToOnlineIngestionJob(
Expand All @@ -135,7 +153,13 @@ def StartStreamToOnlineIngestionJob(
job_hash = params.get_job_hash()
for job in list_jobs(include_terminated=True, client=self.client):
if isinstance(job, StreamIngestionJob) and job.get_hash() == job_hash:
return StartStreamToOnlineIngestionJobResponse(id=job.get_id())
job_start_timestamp = Timestamp()
job_start_timestamp.FromDatetime(job.get_start_time())
return StartStreamToOnlineIngestionJobResponse(
id=job.get_id(),
job_start_time=job_start_timestamp,
feature_table=job.get_feature_table(),
)
raise RuntimeError(
"Feast Job Service has control loop enabled, but couldn't find the existing stream ingestion job for the given FeatureTable"
)
Expand All @@ -147,7 +171,14 @@ def StartStreamToOnlineIngestionJob(
feature_table=feature_table,
extra_jars=[],
)
return StartStreamToOnlineIngestionJobResponse(id=job.get_id())

job_start_timestamp = Timestamp()
job_start_timestamp.FromDatetime(job.get_start_time())
return StartStreamToOnlineIngestionJobResponse(
id=job.get_id(),
job_start_time=job_start_timestamp,
feature_table=request.table_name,
)

def ListJobs(self, request, context):
"""List all types of jobs"""
Expand Down
30 changes: 30 additions & 0 deletions sdk/python/feast/pyspark/abc.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,12 @@ def cancel(self):
"""
raise NotImplementedError

@abc.abstractmethod
def get_start_time(self) -> datetime:
    """
    Get job start time.

    Returns:
        datetime: Time at which the job was started, as reported by the
        launcher backend.
    """


class SparkJobParameters(abc.ABC):
@abc.abstractmethod
Expand Down Expand Up @@ -496,6 +502,18 @@ class BatchIngestionJob(SparkJob):
Container for the ingestion job result
"""

@abc.abstractmethod
def get_feature_table(self) -> str:
    """
    Get the feature table name associated with this batch ingestion job.

    Returns:
        str: Feature table name, or an empty string if it cannot be
        determined (e.g. the job was created by an earlier version of Feast).
    """
    raise NotImplementedError


class StreamIngestionJob(SparkJob):
"""
Expand All @@ -513,6 +531,18 @@ def get_hash(self) -> str:
"""
raise NotImplementedError

@abc.abstractmethod
def get_feature_table(self) -> str:
    """
    Get the feature table name associated with this stream ingestion job.

    Returns:
        str: Feature table name, or an empty string if it cannot be
        determined (e.g. the job was created by an earlier version of Feast).
        NOTE(review): earlier text said ``None``, but the declared return
        type is ``str`` and implementations fall back to ``""``.
    """
    raise NotImplementedError


class JobLauncher(abc.ABC):
"""
Expand Down
37 changes: 32 additions & 5 deletions sdk/python/feast/pyspark/launchers/aws/emr.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import os
from datetime import datetime
from io import BytesIO
from typing import Any, Dict, List, Optional
from urllib.parse import urlunparse
Expand Down Expand Up @@ -31,6 +32,7 @@
EmrJobRef,
JobInfo,
_cancel_job,
_get_job_creation_time,
_get_job_state,
_historical_retrieval_step,
_job_ref_to_str,
Expand Down Expand Up @@ -72,6 +74,9 @@ def get_status(self) -> SparkJobStatus:
def cancel(self):
_cancel_job(self._emr_client, self._job_ref)

def get_start_time(self) -> datetime:
    """Job start time, taken from the creation time of the backing EMR step."""
    client, job_ref = self._emr_client, self._job_ref
    return _get_job_creation_time(client, job_ref)


class EmrRetrievalJob(EmrJobMixin, RetrievalJob):
"""
Expand Down Expand Up @@ -106,22 +111,30 @@ class EmrBatchIngestionJob(EmrJobMixin, BatchIngestionJob):
Ingestion job result for a EMR cluster
"""

def __init__(self, emr_client, job_ref: EmrJobRef, table_name: str):
    """
    Args:
        emr_client: boto3 EMR client used to query the job's status.
        job_ref: Reference to the EMR cluster/step backing this job.
        table_name: Name of the feature table this job ingests into.
    """
    super().__init__(emr_client, job_ref)
    self._table_name = table_name

def get_feature_table(self) -> str:
    """Return the feature table name associated with this ingestion job."""
    return self._table_name


class EmrStreamIngestionJob(EmrJobMixin, StreamIngestionJob):
"""
Ingestion streaming job for a EMR cluster
"""

def __init__(self, emr_client, job_ref: EmrJobRef, job_hash: str, table_name: str):
    """
    Args:
        emr_client: boto3 EMR client used to query the job's status.
        job_ref: Reference to the EMR cluster/step backing this job.
        job_hash: Deterministic hash identifying this stream ingestion job.
        table_name: Name of the feature table this job ingests into.
    """
    super().__init__(emr_client, job_ref)
    self._job_hash = job_hash
    self._table_name = table_name

def get_hash(self) -> str:
    """Return the deterministic hash assigned to this stream ingestion job."""
    return self._job_hash

def get_feature_table(self) -> str:
    """Return the feature table name associated with this ingestion job."""
    return self._table_name


class EmrClusterLauncher(JobLauncher):
"""
Expand Down Expand Up @@ -265,7 +278,9 @@ def offline_to_online_ingestion(

job_ref = self._submit_emr_job(step)

return EmrBatchIngestionJob(self._emr_client(), job_ref)
return EmrBatchIngestionJob(
self._emr_client(), job_ref, ingestion_job_params.get_feature_table_name()
)

def start_stream_to_online_ingestion(
self, ingestion_job_params: StreamIngestionJobParameters
Expand Down Expand Up @@ -299,7 +314,12 @@ def start_stream_to_online_ingestion(

job_ref = self._submit_emr_job(step)

return EmrStreamIngestionJob(self._emr_client(), job_ref, job_hash)
return EmrStreamIngestionJob(
self._emr_client(),
job_ref,
job_hash,
ingestion_job_params.get_feature_table_name(),
)

def _job_from_job_info(self, job_info: JobInfo) -> SparkJob:
if job_info.job_type == HISTORICAL_RETRIEVAL_JOB_TYPE:
Expand All @@ -310,16 +330,23 @@ def _job_from_job_info(self, job_info: JobInfo) -> SparkJob:
output_file_uri=job_info.output_file_uri,
)
elif job_info.job_type == OFFLINE_TO_ONLINE_JOB_TYPE:
table_name = job_info.table_name if job_info.table_name else ""
assert table_name is not None
return EmrBatchIngestionJob(
emr_client=self._emr_client(), job_ref=job_info.job_ref,
emr_client=self._emr_client(),
job_ref=job_info.job_ref,
table_name=table_name,
)
elif job_info.job_type == STREAM_TO_ONLINE_JOB_TYPE:
table_name = job_info.table_name if job_info.table_name else ""
assert table_name is not None
# job_hash must not be None for stream ingestion jobs
assert job_info.job_hash is not None
return EmrStreamIngestionJob(
emr_client=self._emr_client(),
job_ref=job_info.job_ref,
job_hash=job_info.job_hash,
table_name=table_name,
)
else:
# We should never get here
Expand Down
18 changes: 18 additions & 0 deletions sdk/python/feast/pyspark/launchers/aws/emr_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@
import random
import string
import time
from datetime import datetime
from typing import Any, Dict, List, NamedTuple, Optional
from urllib.parse import urlparse, urlunparse

import pytz
import yaml

from feast.pyspark.abc import BQ_SPARK_PACKAGE
Expand All @@ -21,6 +23,7 @@
"EmrJobRef",
"JobInfo",
"_cancel_job",
"_get_job_creation_time",
"_get_job_state",
"_historical_retrieval_step",
"_job_ref_to_str",
Expand Down Expand Up @@ -240,6 +243,21 @@ def _get_step_state(emr_client, cluster_id: str, step_id: str) -> str:
return state


def _get_job_creation_time(emr_client, job: EmrJobRef) -> datetime:
    """Creation time of the EMR step backing ``job``.

    When the job reference does not pin a specific step id, the cluster's
    first step is used instead.
    """
    step_id = (
        _get_first_step_id(emr_client, job.cluster_id)
        if job.step_id is None
        else job.step_id
    )
    return _get_step_creation_time(emr_client, job.cluster_id, step_id)


def _get_step_creation_time(emr_client, cluster_id: str, step_id: str) -> datetime:
    # Fetch step metadata from EMR; the creation timestamp lives under
    # Step.Status.Timeline.CreationDateTime (presumably a tz-aware datetime
    # from boto3 — TODO confirm).
    response = emr_client.describe_step(ClusterId=cluster_id, StepId=step_id)
    step_creation_time = response["Step"]["Status"]["Timeline"]["CreationDateTime"]
    # Normalize to a naive UTC datetime so callers can compare it with the
    # naive timestamps used elsewhere.
    return step_creation_time.astimezone(pytz.utc).replace(tzinfo=None)


def _wait_for_step_state(
emr_client,
cluster_id: str,
Expand Down
Loading