Refactor staging client uploader and use it in EMR launcher
Signed-off-by: Oleg Avdeev <oleg.v.avdeev@gmail.com>
oavdeev committed Dec 8, 2020
1 parent 96b082b commit 0a2b6ec
Showing 9 changed files with 205 additions and 165 deletions.
4 changes: 2 additions & 2 deletions sdk/python/feast/constants.py
@@ -140,7 +140,7 @@ class ConfigOptions(metaclass=ConfigMeta):

#: Feast Spark Job ingestion jobs staging location. The choice of storage is connected to the choice of SPARK_LAUNCHER.
#:
#: Eg. gs://some-bucket/output/, s3://some-bucket/output/, file://data/subfolder/
#: Eg. gs://some-bucket/output/, s3://some-bucket/output/, file:///data/subfolder/
SPARK_STAGING_LOCATION: Optional[str] = None

#: Feast Spark Job ingestion jar file. The choice of storage is connected to the choice of SPARK_LAUNCHER.
@@ -206,7 +206,7 @@ class ConfigOptions(metaclass=ConfigMeta):

#: Ingestion Job DeadLetter Destination. The choice of storage is connected to the choice of SPARK_LAUNCHER.
#:
#: Eg. gs://some-bucket/output/, s3://some-bucket/output/, file://data/subfolder/
#: Eg. gs://some-bucket/output/, s3://some-bucket/output/, file:///data/subfolder/
DEADLETTER_PATH: str = ""

#: ProtoRegistry Address (currently only Stencil Server is supported as registry)
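The extra slash in the corrected examples is significant: with `file://data/subfolder/`, `urlparse` treats `data` as the host (netloc) rather than as part of the path. A quick illustration of the difference:

```python
from urllib.parse import urlparse

# file://data/subfolder/ -- "data" is parsed as the netloc (host), not the path
print(urlparse("file://data/subfolder/"))
# ParseResult(scheme='file', netloc='data', path='/subfolder/', params='', query='', fragment='')

# file:///data/subfolder/ -- empty netloc, the full /data/subfolder/ stays in the path
print(urlparse("file:///data/subfolder/"))
# ParseResult(scheme='file', netloc='', path='/data/subfolder/', params='', query='', fragment='')
```
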
10 changes: 7 additions & 3 deletions sdk/python/feast/loaders/file.py
@@ -77,6 +77,7 @@ def export_source_to_staging_location(
)
else:
# gs, s3 file provided as a source.
assert source_uri.hostname is not None
return get_staging_client(source_uri.scheme).list_files(
bucket=source_uri.hostname, path=source_uri.path
)
@@ -87,9 +88,12 @@
)

# Push data to required staging location
get_staging_client(uri.scheme).upload_file(
source_path, uri.hostname, str(uri.path).strip("/") + "/" + file_name,
)
with open(source_path, "rb") as f:
get_staging_client(uri.scheme).upload_fileobj(
f,
source_path,
remote_uri=uri._replace(path=str(uri.path).strip("/") + "/" + file_name),
)

# Clean up, remove local staging file
if dir_path and isinstance(source, pd.DataFrame) and len(dir_path) > 4:
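The call now builds the destination as a parsed URI instead of passing bucket and path separately. A minimal sketch of that construction, using a hypothetical bucket and file name, shows what `uri._replace(...)` produces:

```python
from urllib.parse import urlparse, urlunparse

# Hypothetical staging location and file name, for illustration only
uri = urlparse("s3://example-staging-bucket/output")
file_name = "part-00000.avro"

# Same pattern as the call above: rewrite only the path component of the URI
remote_uri = uri._replace(path=str(uri.path).strip("/") + "/" + file_name)
print(urlunparse(remote_uri))  # s3://example-staging-bucket/output/part-00000.avro
```
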
28 changes: 20 additions & 8 deletions sdk/python/feast/loaders/ingest.py
@@ -187,16 +187,28 @@ def _upload_to_file_source(
for path in glob.glob(os.path.join(dest_path, "**/*")):
file_name = path.split("/")[-1]
partition_col = path.split("/")[-2]
staging_client.upload_file(
path,
uri.hostname,
str(uri.path).strip("/") + "/" + partition_col + "/" + file_name,
)
with open(path, "rb") as f:
staging_client.upload_fileobj(
f,
path,
remote_uri=uri._replace(
path=str(uri.path).rstrip("/")
+ "/"
+ partition_col
+ "/"
+ file_name
),
)
else:
file_name = dest_path.split("/")[-1]
staging_client.upload_file(
dest_path, uri.hostname, str(uri.path).strip("/") + "/" + file_name,
)
with open(dest_path, "rb") as f:
staging_client.upload_fileobj(
f,
dest_path,
remote_uri=uri._replace(
path=str(uri.path).rstrip("/") + "/" + file_name
),
)


def _upload_to_bq_source(
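In the partitioned branch above, `partition_col` and `file_name` come straight from the exported directory layout; assuming Hive-style partition directories, the pieces combine roughly like this (bucket and paths are hypothetical):

```python
from urllib.parse import urlparse, urlunparse

# Hypothetical values, for illustration only
path = "/tmp/feast-staging-abc/created_timestamp_day=2020-12-08/part-00000.parquet"
uri = urlparse("s3://example-bucket/ingest")

file_name = path.split("/")[-1]      # 'part-00000.parquet'
partition_col = path.split("/")[-2]  # 'created_timestamp_day=2020-12-08'

remote_uri = uri._replace(
    path=str(uri.path).rstrip("/") + "/" + partition_col + "/" + file_name
)
print(urlunparse(remote_uri))
# s3://example-bucket/ingest/created_timestamp_day=2020-12-08/part-00000.parquet
```
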
31 changes: 20 additions & 11 deletions sdk/python/feast/pyspark/launchers/aws/emr.py
@@ -2,6 +2,7 @@
import tempfile
from io import BytesIO
from typing import Any, Dict, List, Optional
from urllib.parse import urlunparse

import boto3
import pandas
@@ -21,6 +22,7 @@
StreamIngestionJob,
StreamIngestionJobParameters,
)
from feast.staging.storage_client import get_staging_client

from .emr_utils import (
FAILED_STEP_STATES,
@@ -39,7 +41,6 @@
_list_jobs,
_load_new_cluster_template,
_random_string,
_s3_upload,
_stream_ingestion_step,
_sync_offline_to_online_step,
_upload_jar,
@@ -221,11 +222,13 @@ def historical_feature_retrieval(
with open(job_params.get_main_file_path()) as f:
pyspark_script = f.read()

pyspark_script_path = _s3_upload(
BytesIO(pyspark_script.encode("utf8")),
local_path="historical_retrieval.py",
remote_path_prefix=self._staging_location,
remote_path_suffix=".py",
pyspark_script_path = urlunparse(
get_staging_client("s3").upload_fileobj(
BytesIO(pyspark_script.encode("utf8")),
local_path="historical_retrieval.py",
remote_path_prefix=self._staging_location,
remote_path_suffix=".py",
)
)

step = _historical_retrieval_step(
@@ -304,12 +307,18 @@ def start_stream_to_online_ingestion(
def stage_dataframe(self, df: pandas.DataFrame, event_timestamp: str) -> FileSource:
with tempfile.NamedTemporaryFile() as f:
df.to_parquet(f)
file_url = _s3_upload(
f,
f.name,
remote_path_prefix=os.path.join(self._staging_location, "dataframes"),
remote_path_suffix=".parquet",

file_url = urlunparse(
get_staging_client("s3").upload_fileobj(
f,
f.name,
remote_path_prefix=os.path.join(
self._staging_location, "dataframes"
),
remote_path_suffix=".parquet",
)
)

return FileSource(
event_timestamp_column=event_timestamp,
file_format=ParquetFormat(),
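Both EMR call sites wrap the result in `urlunparse`, which suggests the refactored `upload_fileobj` returns the final destination as a parsed URI and accepts either a full `remote_uri` or a `remote_path_prefix`/`remote_path_suffix` pair. An approximate sketch of that interface, inferred from these call sites rather than from the storage client implementation itself:

```python
from typing import IO, Optional
from urllib.parse import ParseResult


class AbstractStagingClient:  # assumed shape, inferred from the call sites in this commit
    def upload_fileobj(
        self,
        fileobj: IO[bytes],
        local_path: str,
        *,
        remote_uri: Optional[ParseResult] = None,
        remote_path_prefix: Optional[str] = None,
        remote_path_suffix: Optional[str] = None,
    ) -> ParseResult:
        """Upload fileobj and return the parsed URI of the uploaded object.

        Either remote_uri, or remote_path_prefix together with
        remote_path_suffix (content-addressed naming), is expected.
        """
        raise NotImplementedError
```
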
87 changes: 8 additions & 79 deletions sdk/python/feast/pyspark/launchers/aws/emr_utils.py
@@ -1,13 +1,11 @@
import hashlib
import logging
import os
import random
import string
import time
from typing import IO, Any, Dict, List, NamedTuple, Optional, Tuple
from typing import Any, Dict, List, NamedTuple, Optional
from urllib.parse import urlparse, urlunparse

import boto3
import botocore
import yaml

__all__ = [
@@ -27,12 +25,12 @@
"_list_jobs",
"_load_new_cluster_template",
"_random_string",
"_s3_upload",
"_stream_ingestion_step",
"_sync_offline_to_online_step",
"_upload_jar",
"_wait_for_job_state",
]
from feast.staging.storage_client import get_staging_client

log = logging.getLogger("aws")

@@ -77,82 +75,13 @@ def _random_string(length) -> str:
return "".join(random.choice(string.ascii_lowercase) for _ in range(length))


def _s3_split_path(path: str) -> Tuple[str, str]:
""" Convert s3:// url to (bucket, key) """
assert path.startswith("s3://")
_, _, bucket, key = path.split("/", 3)
return bucket, key


def _hash_fileobj(fileobj: IO[bytes]) -> str:
""" Compute sha256 hash of a file. File pointer will be reset to 0 on return. """
fileobj.seek(0)
h = hashlib.sha256()
for block in iter(lambda: fileobj.read(2 ** 20), b""):
h.update(block)
fileobj.seek(0)
return h.hexdigest()


def _s3_upload(
fileobj: IO[bytes],
local_path: str,
*,
remote_path: Optional[str] = None,
remote_path_prefix: Optional[str] = None,
remote_path_suffix: Optional[str] = None,
) -> str:
"""
Upload a local file to S3. We store the file sha256 sum in S3 metadata and skip the upload
if the file hasn't changed.
You can either specify remote_path or remote_path_prefix+remote_path_suffix. In the latter case,
the remote path will be computed as $remote_path_prefix/$sha256$remote_path_suffix
"""

assert (remote_path is not None) or (
remote_path_prefix is not None and remote_path_suffix is not None
)

sha256sum = _hash_fileobj(fileobj)

if remote_path is None:
assert remote_path_prefix is not None
remote_path = os.path.join(
remote_path_prefix, f"{sha256sum}{remote_path_suffix}"
)

bucket, key = _s3_split_path(remote_path)
client = boto3.client("s3")

try:
head_response = client.head_object(Bucket=bucket, Key=key)
if head_response["Metadata"]["sha256sum"] == sha256sum:
# File already exists
return remote_path
else:
log.info("Uploading {local_path} to {remote_path}")
client.upload_fileobj(
fileobj, bucket, key, ExtraArgs={"Metadata": {"sha256sum": sha256sum}},
)
return remote_path
except botocore.exceptions.ClientError as e:
if e.response["Error"]["Code"] == "404":
log.info("Uploading {local_path} to {remote_path}")
client.upload_fileobj(
fileobj, bucket, key, ExtraArgs={"Metadata": {"sha256sum": sha256sum}},
)
return remote_path
else:
raise


def _upload_jar(jar_s3_prefix: str, local_path: str) -> str:
with open(local_path, "rb") as f:
return _s3_upload(
f,
local_path,
remote_path=os.path.join(jar_s3_prefix, os.path.basename(local_path)),
uri = urlparse(os.path.join(jar_s3_prefix, os.path.basename(local_path)))
return urlunparse(
get_staging_client(uri.scheme).upload_fileobj(
f, local_path, remote_uri=uri,
)
)


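The deleted `_s3_upload` helper skipped re-uploads by storing a sha256 of the payload in the object metadata and using the hash as the remote name when only a prefix/suffix was given; that content-hashing step presumably now lives behind `upload_fileobj`. A condensed, self-contained sketch of the hashing itself, matching the removed `_hash_fileobj`:

```python
import hashlib
from io import BytesIO
from typing import IO


def hash_fileobj(fileobj: IO[bytes]) -> str:
    """sha256 of a file object; the pointer is reset to 0 before and after reading."""
    fileobj.seek(0)
    h = hashlib.sha256()
    for block in iter(lambda: fileobj.read(2 ** 20), b""):
        h.update(block)
    fileobj.seek(0)
    return h.hexdigest()


# The hash doubles as a deterministic remote name: <prefix>/<sha256><suffix>
digest = hash_fileobj(BytesIO(b"print('hello')\n"))
print(f"s3://example-bucket/artifacts/{digest}.py")  # bucket and prefix are hypothetical
```
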
12 changes: 9 additions & 3 deletions sdk/python/feast/pyspark/launchers/gcloud/dataproc.py
@@ -253,10 +253,16 @@ def _stage_file(self, file_path: str, job_id: str) -> str:
return file_path

staging_client = get_staging_client("gs")
blob_path = os.path.join(self.remote_path, job_id, os.path.basename(file_path),)
staging_client.upload_file(file_path, self.staging_bucket, blob_path)
blob_path = os.path.join(
self.remote_path, job_id, os.path.basename(file_path),
).lstrip("/")
blob_uri_str = f"gs://{self.staging_bucket}/{blob_path}"
with open(file_path, "rb") as f:
staging_client.upload_fileobj(
f, file_path, remote_uri=urlparse(blob_uri_str)
)

return f"gs://{self.staging_bucket}/{blob_path}"
return blob_uri_str

def dataproc_submit(
self, job_params: SparkJobParameters
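The added `lstrip("/")` keeps a leading slash out of the GCS object name, so the assembled `gs://` URI has no double slash before the blob path; a small check with hypothetical values:

```python
import os
from urllib.parse import urlparse

# Hypothetical staging settings, for illustration only
staging_bucket = "example-dataproc-staging"
remote_path = "/artifacts"
job_id = "job-1234"
file_path = "/home/user/main.py"

blob_path = os.path.join(remote_path, job_id, os.path.basename(file_path)).lstrip("/")
blob_uri_str = f"gs://{staging_bucket}/{blob_path}"
print(blob_uri_str)                 # gs://example-dataproc-staging/artifacts/job-1234/main.py
print(urlparse(blob_uri_str).path)  # /artifacts/job-1234/main.py
```
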
11 changes: 5 additions & 6 deletions sdk/python/feast/staging/entities.py
@@ -35,12 +35,11 @@ def stage_entities_to_fs(
)

entity_source.to_parquet(df_export_path.name)
bucket = (
None if entity_staging_uri.scheme == "file" else entity_staging_uri.netloc
)
staging_client.upload_file(
df_export_path.name, bucket, entity_staging_uri.path.lstrip("/")
)

with open(df_export_path.name, "rb") as f:
staging_client.upload_fileobj(
f, df_export_path.name, remote_uri=entity_staging_uri
)

# ToDo: support custom event_timestamp_column
return FileSource(