move common job properties to job classes

Signed-off-by: Oleksii Moskalenko <moskalenko.alexey@gmail.com>

universal test
pyalex committed Oct 15, 2020
1 parent fe202c9 commit 42c4480
Showing 9 changed files with 532 additions and 98 deletions.
11 changes: 11 additions & 0 deletions sdk/python/feast/client.py
@@ -17,6 +17,7 @@
import uuid
from itertools import groupby
from typing import Any, Dict, List, Optional, Union
from datetime import datetime

import grpc
import pandas as pd
@@ -78,11 +79,13 @@
from feast.pyspark.launcher import (
start_historical_feature_retrieval_job,
start_historical_feature_retrieval_spark_session,
start_offline_to_online_ingestion,
)
from feast.serving.ServingService_pb2 import (
GetFeastServingInfoRequest,
GetOnlineFeaturesRequestV2,
)
from feast.pyspark.abc import SparkJob
from feast.serving.ServingService_pb2_grpc import ServingServiceStub

_logger = logging.getLogger(__name__)
@@ -883,3 +886,11 @@ def _get_feature_tables_from_feature_refs(
]
feature_tables.append(feature_table)
return feature_tables

def start_offline_to_online_ingestion(
self,
feature_table: Union[FeatureTable, str],
start: Union[datetime, str],
end: Union[datetime, str],
) -> SparkJob:
return start_offline_to_online_ingestion(feature_table, start, end, self)
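With this change, batch (offline-to-online) ingestion can be started straight from the SDK. A minimal usage sketch, assuming a running Feast Core, a previously registered feature table named "driver_statistics", and a spark_launcher configured for the client (the table name, time window, and the get_feature_table lookup are illustrative assumptions, not part of this commit):

    from datetime import datetime
    from feast import Client

    client = Client(core_url="localhost:6565")
    # Assumes the table was registered earlier; name is hypothetical.
    table = client.get_feature_table("driver_statistics")

    # Submits the Spark ingestion job through the configured launcher and
    # returns a SparkJob handle instead of blocking until completion.
    job = client.start_offline_to_online_ingestion(
        feature_table=table,
        start=datetime(2020, 10, 1),
        end=datetime(2020, 10, 15),
    )
    print(job.get_id(), job.get_status())

Passing the FeatureTable object rather than its name matches what the underlying launcher helper expects, since it reads feature_table.batch_source when serializing the source configuration.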
5 changes: 5 additions & 0 deletions sdk/python/feast/constants.py
@@ -67,7 +67,10 @@ class AuthProvider(Enum):
# Spark Job Config
CONFIG_SPARK_LAUNCHER = "spark_launcher" # standalone, dataproc, emr

CONFIG_SPARK_INGESTION_JOB_JAR = "spark_ingestion_jar"

CONFIG_SPARK_STANDALONE_MASTER = "spark_standalone_master"
CONFIG_SPARK_HOME = "spark_home"

CONFIG_SPARK_DATAPROC_CLUSTER_NAME = "dataproc_cluster_name"
CONFIG_SPARK_DATAPROC_PROJECT = "dataproc_project"
@@ -109,4 +112,6 @@ class AuthProvider(Enum):
CONFIG_MAX_WAIT_INTERVAL_KEY: "60",
# Authentication Provider - Google OpenID/OAuth
CONFIG_AUTH_PROVIDER: "google",
CONFIG_SPARK_LAUNCHER: "dataproc",
CONFIG_SPARK_INGESTION_JOB_JAR: "gs://feast-jobs/feast-ingestion-spark-0.8-SNAPSHOT.jar",
}
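The new defaults point at Dataproc and a snapshot ingestion jar staged on GCS. A hedged sketch of overriding them for a local standalone Spark setup by passing options through the Client (the option-passing mechanism and all paths are assumptions, not shown in this diff):

    from feast import Client

    client = Client(
        core_url="localhost:6565",
        spark_launcher="standalone",           # overrides the "dataproc" default
        spark_standalone_master="local[*]",
        spark_home="/usr/lib/spark",           # illustrative path to a local Spark install
        spark_ingestion_jar="file:///opt/feast/feast-ingestion-spark.jar",  # local jar instead of the GCS snapshot
    )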
141 changes: 135 additions & 6 deletions sdk/python/feast/pyspark/abc.py
@@ -1,5 +1,9 @@
import abc
from typing import Dict, List
import os
import json
from datetime import datetime
from typing import Dict, List, Optional
from enum import Enum


class SparkJobFailure(Exception):
@@ -10,6 +14,12 @@ class SparkJobFailure(Exception):
pass


class SparkJobStatus(Enum):
IN_PROGRESS = 1
FAILED = 2
COMPLETED = 3


class SparkJob(abc.ABC):
"""
Base class for all spark jobs
@@ -25,12 +35,87 @@ def get_id(self) -> str:
"""
raise NotImplementedError

@abc.abstractmethod
def get_status(self) -> SparkJobStatus:
"""
Job Status retrieval
:return: SparkJobStatus
"""
raise NotImplementedError

@abc.abstractmethod
def get_name(self) -> str:
"""
Getter for job name
:return: Job name
"""
raise NotImplementedError

@abc.abstractmethod
def get_main_file_path(self) -> str:
"""
Getter for jar | python path
:return: Full path to file
"""
raise NotImplementedError

def get_class_name(self) -> Optional[str]:
"""
Getter for main class name if it's applicable
:return: java class path, e.g. feast.ingestion.IngestionJob
"""
return None

@abc.abstractmethod
def get_arguments(self) -> List[str]:
"""
Getter for job arguments
E.g., ["--source", '{"kafka":...}', ...]
:return: List of arguments
"""
raise NotImplementedError


class RetrievalJob(SparkJob):
"""
Container for the historical feature retrieval job result
"""

def __init__(
self,
feature_tables: List[Dict],
feature_tables_sources: List[Dict],
entity_source: Dict,
destination: Dict,
**kwargs,
):
super().__init__(**kwargs)
self._feature_tables = feature_tables
self._feature_tables_sources = feature_tables_sources
self._entity_source = entity_source
self._destination = destination

def get_name(self) -> str:
return f"HistoryRetrieval-{self.get_id()}"

def get_main_file_path(self) -> str:
return os.path.join(
os.path.dirname(__file__), "historical_feature_retrieval_job.py"
)

def get_arguments(self) -> List[str]:
return [
"--feature-tables",
json.dumps(self._feature_tables),
"--feature-tables-sources",
json.dumps(self._feature_tables_sources),
"--entity-source",
json.dumps(self._entity_source),
"--destination",
json.dumps(self._destination),
]

@abc.abstractmethod
def get_output_file_uri(self, timeout_sec=None):
"""
@@ -54,7 +139,44 @@ def get_output_file_uri(self, timeout_sec=None):


class IngestionJob(SparkJob):
pass
def __init__(
self,
feature_table: Dict,
source: Dict,
start: datetime,
end: datetime,
jar: str,
**kwargs,
):
super().__init__(**kwargs)
self._feature_table = feature_table
self._source = source
self._start = start
self._end = end
self._jar = jar

def get_name(self) -> str:
return f"BatchIngestion-{self.get_id()}"

def get_main_file_path(self) -> str:
return self._jar

def get_class_name(self) -> Optional[str]:
return "feast.ingestion.IngestionJob"

def get_arguments(self) -> List[str]:
return [
"--mode",
"offline",
"--feature-table",
json.dumps(self._feature_table),
"--source",
json.dumps(self._source),
"--start",
self._start.strftime("%Y-%m-%dT%H:%M:%S"),
"--end",
self._end.strftime("%Y-%m-%dT%H:%M:%S"),
]


class JobLauncher(abc.ABC):
@@ -65,20 +187,16 @@ class JobLauncher(abc.ABC):
@abc.abstractmethod
def historical_feature_retrieval(
self,
pyspark_script: str,
entity_source_conf: Dict,
feature_tables_sources_conf: List[Dict],
feature_tables_conf: List[Dict],
destination_conf: Dict,
job_id: str,
**kwargs,
) -> RetrievalJob:
"""
Submits a historical feature retrieval job to a Spark cluster.
Args:
pyspark_script (str): Local file path to the pyspark script for historical feature
retrieval.
entity_source_conf (Dict): Entity data source configuration.
feature_tables_sources_conf (List[Dict]): List of feature tables data sources configurations.
feature_tables_conf (List[Dict]): List of feature table specification.
@@ -191,3 +309,14 @@ def historical_feature_retrieval(
str: file uri to the result file.
"""
raise NotImplementedError

@abc.abstractmethod
def offline_to_online_ingestion(
self,
jar_path: str,
source_conf: Dict,
feature_table_conf: Dict,
start: datetime,
end: datetime,
) -> IngestionJob:
raise NotImplementedError
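Because every job now exposes its name, main file, optional main class, and CLI arguments, a launcher can assemble a submission generically. A hypothetical helper (not part of this commit) showing how those getters map onto standard spark-submit options:

    from typing import List

    from feast.pyspark.abc import SparkJob

    def build_spark_submit_args(job: SparkJob) -> List[str]:
        # Sketch only: concrete launchers add master, deploy mode, and backend specifics.
        args = ["spark-submit", "--name", job.get_name()]
        main_class = job.get_class_name()
        if main_class:                          # jar-based jobs, e.g. IngestionJob
            args.extend(["--class", main_class])
        args.append(job.get_main_file_path())   # ingestion jar or the pyspark retrieval script
        args.extend(job.get_arguments())        # e.g. ["--mode", "offline", "--feature-table", "{...}", ...]
        return args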
71 changes: 46 additions & 25 deletions sdk/python/feast/pyspark/launcher.py
@@ -1,19 +1,17 @@
import pathlib
from typing import TYPE_CHECKING, List, Union

from datetime import datetime
from urllib.parse import urlparse
import tempfile
import shutil

from feast.config import Config
from feast.constants import (
CONFIG_SPARK_DATAPROC_CLUSTER_NAME,
CONFIG_SPARK_DATAPROC_PROJECT,
CONFIG_SPARK_DATAPROC_REGION,
CONFIG_SPARK_DATAPROC_STAGING_LOCATION,
CONFIG_SPARK_LAUNCHER,
CONFIG_SPARK_STANDALONE_MASTER,
)
from feast.constants import *
from feast.data_source import BigQuerySource, DataSource, FileSource
from feast.feature_table import FeatureTable
from feast.pyspark.abc import JobLauncher, RetrievalJob
from feast.pyspark.abc import JobLauncher, RetrievalJob, IngestionJob
from feast.value_type import ValueType
from feast.staging.storage_client import get_staging_client

if TYPE_CHECKING:
from feast.client import Client
@@ -23,7 +21,7 @@ def _standalone_launcher(config: Config) -> JobLauncher:
from feast.pyspark.launchers import standalone

return standalone.StandaloneClusterLauncher(
config.get(CONFIG_SPARK_STANDALONE_MASTER)
config.get(CONFIG_SPARK_STANDALONE_MASTER), config.get(CONFIG_SPARK_HOME)
)


@@ -51,7 +49,7 @@ def resolve_launcher(config: Config) -> JobLauncher:
}


def source_to_argument(source: DataSource):
def _source_to_argument(source: DataSource):
common_properties = {
"field_mapping": dict(source.field_mapping),
"event_timestamp_column": source.event_timestamp_column,
@@ -72,7 +70,7 @@ def source_to_argument(source: DataSource):
return {kind: properties}


def feature_table_to_argument(client: "Client", feature_table: FeatureTable):
def _feature_table_to_argument(client: "Client", feature_table: FeatureTable):
return {
"features": [
{"name": f.name, "type": ValueType(f.dtype).name}
@@ -102,13 +100,13 @@ def start_historical_feature_retrieval_spark_session(
spark_session = SparkSession.builder.getOrCreate()
return retrieve_historical_features(
spark=spark_session,
entity_source_conf=source_to_argument(entity_source),
entity_source_conf=_source_to_argument(entity_source),
feature_tables_sources_conf=[
source_to_argument(feature_table.batch_source)
_source_to_argument(feature_table.batch_source)
for feature_table in feature_tables
],
feature_tables_conf=[
feature_table_to_argument(client, feature_table)
_feature_table_to_argument(client, feature_table)
for feature_table in feature_tables
],
)
@@ -123,22 +121,45 @@ def start_historical_feature_retrieval_job(
job_id: str,
) -> RetrievalJob:
launcher = resolve_launcher(client._config)
retrieval_job_pyspark_script = str(
pathlib.Path(__file__).parent.absolute()
/ "pyspark"
/ "historical_feature_retrieval_job.py"
)
return launcher.historical_feature_retrieval(
pyspark_script=retrieval_job_pyspark_script,
entity_source_conf=source_to_argument(entity_source),
entity_source_conf=_source_to_argument(entity_source),
feature_tables_sources_conf=[
source_to_argument(feature_table.batch_source)
_source_to_argument(feature_table.batch_source)
for feature_table in feature_tables
],
feature_tables_conf=[
feature_table_to_argument(client, feature_table)
_feature_table_to_argument(client, feature_table)
for feature_table in feature_tables
],
destination_conf={"format": output_format, "path": output_path},
job_id=job_id,
)


def _download_jar(remote_jar: str) -> str:
remote_jar_parts = urlparse(remote_jar)

f = tempfile.NamedTemporaryFile(suffix=".jar", delete=False)
with f:
shutil.copyfileobj(
get_staging_client(remote_jar_parts.scheme).download_file(remote_jar_parts),
f,
)

return f.name


def start_offline_to_online_ingestion(
feature_table: FeatureTable, start: datetime, end: datetime, client: Client
) -> IngestionJob:

launcher = resolve_launcher(client._config)
local_jar_path = _download_jar(client._config.get(CONFIG_SPARK_INGESTION_JOB_JAR))

return launcher.offline_to_online_ingestion(
jar_path=local_jar_path,
source_conf=_source_to_argument(feature_table.batch_source),
feature_table_conf=_feature_table_to_argument(client, feature_table),
start=start,
end=end,
)
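The launcher hands back an IngestionJob immediately rather than waiting for Spark to finish; the new get_status() / SparkJobStatus API is what callers would poll. An illustrative helper (not part of this commit) for blocking until a job settles:

    import time

    from feast.pyspark.abc import SparkJob, SparkJobStatus

    def wait_for_job(job: SparkJob, timeout_sec: int = 600, poll_interval_sec: int = 10) -> SparkJobStatus:
        # Polls until the job leaves IN_PROGRESS or the timeout expires.
        deadline = time.time() + timeout_sec
        while time.time() < deadline:
            status = job.get_status()
            if status != SparkJobStatus.IN_PROGRESS:
                return status
            time.sleep(poll_interval_sec)
        raise TimeoutError(f"Job {job.get_id()} still IN_PROGRESS after {timeout_sec}s")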
(Remaining changed files not shown.)