diff --git a/sdk/python/feast/constants.py b/sdk/python/feast/constants.py
index e998377ff7..ef6e5cf037 100644
--- a/sdk/python/feast/constants.py
+++ b/sdk/python/feast/constants.py
@@ -140,7 +140,7 @@ class ConfigOptions(metaclass=ConfigMeta):
 
     #: Feast Spark Job ingestion jobs staging location. The choice of storage is connected to the choice of SPARK_LAUNCHER.
     #:
-    #: Eg. gs://some-bucket/output/, s3://some-bucket/output/, file://data/subfolder/
+    #: Eg. gs://some-bucket/output/, s3://some-bucket/output/, file:///data/subfolder/
     SPARK_STAGING_LOCATION: Optional[str] = None
 
     #: Feast Spark Job ingestion jar file. The choice of storage is connected to the choice of SPARK_LAUNCHER.
@@ -206,7 +206,7 @@ class ConfigOptions(metaclass=ConfigMeta):
 
     #: Ingestion Job DeadLetter Destination. The choice of storage is connected to the choice of SPARK_LAUNCHER.
     #:
-    #: Eg. gs://some-bucket/output/, s3://some-bucket/output/, file://data/subfolder/
+    #: Eg. gs://some-bucket/output/, s3://some-bucket/output/, file:///data/subfolder/
     DEADLETTER_PATH: str = ""
 
     #: ProtoRegistry Address (currently only Stencil Server is supported as registry)
diff --git a/sdk/python/feast/loaders/file.py b/sdk/python/feast/loaders/file.py
index b0692457ee..f2a0f702ca 100644
--- a/sdk/python/feast/loaders/file.py
+++ b/sdk/python/feast/loaders/file.py
@@ -77,6 +77,7 @@ def export_source_to_staging_location(
             )
         else:
             # gs, s3 file provided as a source.
+            assert source_uri.hostname is not None
             return get_staging_client(source_uri.scheme).list_files(
                 bucket=source_uri.hostname, path=source_uri.path
             )
@@ -87,9 +88,12 @@ def export_source_to_staging_location(
         )
 
     # Push data to required staging location
-    get_staging_client(uri.scheme).upload_file(
-        source_path, uri.hostname, str(uri.path).strip("/") + "/" + file_name,
-    )
+    with open(source_path, "rb") as f:
+        get_staging_client(uri.scheme).upload_fileobj(
+            f,
+            source_path,
+            remote_uri=uri._replace(path=str(uri.path).strip("/") + "/" + file_name),
+        )
 
     # Clean up, remove local staging file
     if dir_path and isinstance(source, pd.DataFrame) and len(dir_path) > 4:
diff --git a/sdk/python/feast/loaders/ingest.py b/sdk/python/feast/loaders/ingest.py
index 67c8f5f726..820d53317a 100644
--- a/sdk/python/feast/loaders/ingest.py
+++ b/sdk/python/feast/loaders/ingest.py
@@ -187,16 +187,28 @@ def _upload_to_file_source(
         for path in glob.glob(os.path.join(dest_path, "**/*")):
             file_name = path.split("/")[-1]
             partition_col = path.split("/")[-2]
-            staging_client.upload_file(
-                path,
-                uri.hostname,
-                str(uri.path).strip("/") + "/" + partition_col + "/" + file_name,
-            )
+            with open(path, "rb") as f:
+                staging_client.upload_fileobj(
+                    f,
+                    path,
+                    remote_uri=uri._replace(
+                        path=str(uri.path).rstrip("/")
+                        + "/"
+                        + partition_col
+                        + "/"
+                        + file_name
+                    ),
+                )
     else:
         file_name = dest_path.split("/")[-1]
-        staging_client.upload_file(
-            dest_path, uri.hostname, str(uri.path).strip("/") + "/" + file_name,
-        )
+        with open(dest_path, "rb") as f:
+            staging_client.upload_fileobj(
+                f,
+                dest_path,
+                remote_uri=uri._replace(
+                    path=str(uri.path).rstrip("/") + "/" + file_name
+                ),
+            )
 
 
 def _upload_to_bq_source(
diff --git a/sdk/python/feast/pyspark/launchers/aws/emr.py b/sdk/python/feast/pyspark/launchers/aws/emr.py
index 42b5348c29..100d25a240 100644
--- a/sdk/python/feast/pyspark/launchers/aws/emr.py
+++ b/sdk/python/feast/pyspark/launchers/aws/emr.py
@@ -2,6 +2,7 @@
 import tempfile
 from io import BytesIO
 from typing import Any, Dict, List, Optional
+from urllib.parse import urlunparse
 
 import boto3
 import pandas
@@ -21,6 +22,7 @@
     StreamIngestionJob,
     StreamIngestionJobParameters,
 )
+from feast.staging.storage_client import get_staging_client
 
 from .emr_utils import (
     FAILED_STEP_STATES,
@@ -39,7 +41,6 @@
     _list_jobs,
     _load_new_cluster_template,
     _random_string,
-    _s3_upload,
     _stream_ingestion_step,
     _sync_offline_to_online_step,
     _upload_jar,
@@ -221,11 +222,13 @@ def historical_feature_retrieval(
         with open(job_params.get_main_file_path()) as f:
             pyspark_script = f.read()
 
-        pyspark_script_path = _s3_upload(
-            BytesIO(pyspark_script.encode("utf8")),
-            local_path="historical_retrieval.py",
-            remote_path_prefix=self._staging_location,
-            remote_path_suffix=".py",
+        pyspark_script_path = urlunparse(
+            get_staging_client("s3").upload_fileobj(
+                BytesIO(pyspark_script.encode("utf8")),
+                local_path="historical_retrieval.py",
+                remote_path_prefix=self._staging_location,
+                remote_path_suffix=".py",
+            )
         )
 
         step = _historical_retrieval_step(
@@ -304,12 +307,18 @@ def start_stream_to_online_ingestion(
     def stage_dataframe(self, df: pandas.DataFrame, event_timestamp: str) -> FileSource:
         with tempfile.NamedTemporaryFile() as f:
             df.to_parquet(f)
-            file_url = _s3_upload(
-                f,
-                f.name,
-                remote_path_prefix=os.path.join(self._staging_location, "dataframes"),
-                remote_path_suffix=".parquet",
+
+            file_url = urlunparse(
+                get_staging_client("s3").upload_fileobj(
+                    f,
+                    f.name,
+                    remote_path_prefix=os.path.join(
+                        self._staging_location, "dataframes"
+                    ),
+                    remote_path_suffix=".parquet",
+                )
             )
+
             return FileSource(
                 event_timestamp_column=event_timestamp,
                 file_format=ParquetFormat(),
diff --git a/sdk/python/feast/pyspark/launchers/aws/emr_utils.py b/sdk/python/feast/pyspark/launchers/aws/emr_utils.py
index 712871ffbf..db39dd1f30 100644
--- a/sdk/python/feast/pyspark/launchers/aws/emr_utils.py
+++ b/sdk/python/feast/pyspark/launchers/aws/emr_utils.py
@@ -1,13 +1,11 @@
-import hashlib
 import logging
 import os
 import random
 import string
 import time
-from typing import IO, Any, Dict, List, NamedTuple, Optional, Tuple
+from typing import Any, Dict, List, NamedTuple, Optional
+from urllib.parse import urlparse, urlunparse
 
-import boto3
-import botocore
 import yaml
 
 __all__ = [
@@ -27,12 +25,12 @@
     "_list_jobs",
     "_load_new_cluster_template",
     "_random_string",
-    "_s3_upload",
     "_stream_ingestion_step",
     "_sync_offline_to_online_step",
    "_upload_jar",
     "_wait_for_job_state",
 ]
+from feast.staging.storage_client import get_staging_client
 
 log = logging.getLogger("aws")
 
@@ -77,82 +75,13 @@ def _random_string(length) -> str:
     return "".join(random.choice(string.ascii_lowercase) for _ in range(length))
 
 
-def _s3_split_path(path: str) -> Tuple[str, str]:
-    """ Convert s3:// url to (bucket, key) """
-    assert path.startswith("s3://")
-    _, _, bucket, key = path.split("/", 3)
-    return bucket, key
-
-
-def _hash_fileobj(fileobj: IO[bytes]) -> str:
-    """ Compute sha256 hash of a file. File pointer will be reset to 0 on return. """
-    fileobj.seek(0)
-    h = hashlib.sha256()
-    for block in iter(lambda: fileobj.read(2 ** 20), b""):
-        h.update(block)
-    fileobj.seek(0)
-    return h.hexdigest()
-
-
-def _s3_upload(
-    fileobj: IO[bytes],
-    local_path: str,
-    *,
-    remote_path: Optional[str] = None,
-    remote_path_prefix: Optional[str] = None,
-    remote_path_suffix: Optional[str] = None,
-) -> str:
-    """
-    Upload a local file to S3. We store the file sha256 sum in S3 metadata and skip the upload
-    if the file hasn't changed.
-
-    You can either specify remote_path or remote_path_prefix+remote_path_suffix. In the latter case,
-    the remote path will be computed as $remote_path_prefix/$sha256$remote_path_suffix
-    """
-
-    assert (remote_path is not None) or (
-        remote_path_prefix is not None and remote_path_suffix is not None
-    )
-
-    sha256sum = _hash_fileobj(fileobj)
-
-    if remote_path is None:
-        assert remote_path_prefix is not None
-        remote_path = os.path.join(
-            remote_path_prefix, f"{sha256sum}{remote_path_suffix}"
-        )
-
-    bucket, key = _s3_split_path(remote_path)
-    client = boto3.client("s3")
-
-    try:
-        head_response = client.head_object(Bucket=bucket, Key=key)
-        if head_response["Metadata"]["sha256sum"] == sha256sum:
-            # File already exists
-            return remote_path
-        else:
-            log.info("Uploading {local_path} to {remote_path}")
-            client.upload_fileobj(
-                fileobj, bucket, key, ExtraArgs={"Metadata": {"sha256sum": sha256sum}},
-            )
-            return remote_path
-    except botocore.exceptions.ClientError as e:
-        if e.response["Error"]["Code"] == "404":
-            log.info("Uploading {local_path} to {remote_path}")
-            client.upload_fileobj(
-                fileobj, bucket, key, ExtraArgs={"Metadata": {"sha256sum": sha256sum}},
-            )
-            return remote_path
-        else:
-            raise
-
-
 def _upload_jar(jar_s3_prefix: str, local_path: str) -> str:
     with open(local_path, "rb") as f:
-        return _s3_upload(
-            f,
-            local_path,
-            remote_path=os.path.join(jar_s3_prefix, os.path.basename(local_path)),
+        uri = urlparse(os.path.join(jar_s3_prefix, os.path.basename(local_path)))
+        return urlunparse(
+            get_staging_client(uri.scheme).upload_fileobj(
+                f, local_path, remote_uri=uri,
+            )
         )
 
 
diff --git a/sdk/python/feast/pyspark/launchers/gcloud/dataproc.py b/sdk/python/feast/pyspark/launchers/gcloud/dataproc.py
index cfbfb82810..47cc116f96 100644
--- a/sdk/python/feast/pyspark/launchers/gcloud/dataproc.py
+++ b/sdk/python/feast/pyspark/launchers/gcloud/dataproc.py
@@ -253,10 +253,16 @@ def _stage_file(self, file_path: str, job_id: str) -> str:
             return file_path
 
         staging_client = get_staging_client("gs")
-        blob_path = os.path.join(self.remote_path, job_id, os.path.basename(file_path),)
-        staging_client.upload_file(file_path, self.staging_bucket, blob_path)
+        blob_path = os.path.join(
+            self.remote_path, job_id, os.path.basename(file_path),
+        ).lstrip("/")
+        blob_uri_str = f"gs://{self.staging_bucket}/{blob_path}"
+        with open(file_path, "rb") as f:
+            staging_client.upload_fileobj(
+                f, file_path, remote_uri=urlparse(blob_uri_str)
+            )
 
-        return f"gs://{self.staging_bucket}/{blob_path}"
+        return blob_uri_str
 
     def dataproc_submit(
         self, job_params: SparkJobParameters
diff --git a/sdk/python/feast/staging/entities.py b/sdk/python/feast/staging/entities.py
index 242e144c35..461dc546dc 100644
--- a/sdk/python/feast/staging/entities.py
+++ b/sdk/python/feast/staging/entities.py
@@ -35,12 +35,11 @@ def stage_entities_to_fs(
         )
 
         entity_source.to_parquet(df_export_path.name)
-        bucket = (
-            None if entity_staging_uri.scheme == "file" else entity_staging_uri.netloc
-        )
-        staging_client.upload_file(
-            df_export_path.name, bucket, entity_staging_uri.path.lstrip("/")
-        )
+
+        with open(df_export_path.name, "rb") as f:
+            staging_client.upload_fileobj(
+                f, df_export_path.name, remote_uri=entity_staging_uri
+            )
 
     # ToDo: support custom event_timestamp_column
     return FileSource(
diff --git a/sdk/python/feast/staging/storage_client.py b/sdk/python/feast/staging/storage_client.py
index 9d6c9f52a4..d7c4055f40 100644
--- a/sdk/python/feast/staging/storage_client.py
+++ b/sdk/python/feast/staging/storage_client.py
@@ -18,9 +18,9 @@
 import shutil
 from abc import ABC, ABCMeta, abstractmethod
 from tempfile import TemporaryFile
-from typing import List
+from typing import List, Optional, Tuple
 from typing.io import IO
-from urllib.parse import ParseResult
+from urllib.parse import ParseResult, urlparse
 
 from google.auth.exceptions import DefaultCredentialsError
 
@@ -32,6 +32,36 @@
 LOCAL_FILE = "file"
 
 
+def _hash_fileobj(fileobj: IO[bytes]) -> str:
+    """ Compute sha256 hash of a file. File pointer will be reset to 0 on return. """
+    fileobj.seek(0)
+    h = hashlib.sha256()
+    for block in iter(lambda: fileobj.read(2 ** 20), b""):
+        h.update(block)
+    fileobj.seek(0)
+    return h.hexdigest()
+
+
+def _gen_remote_uri(
+    fileobj: IO[bytes],
+    remote_uri: Optional[ParseResult],
+    remote_path_prefix: Optional[str],
+    remote_path_suffix: Optional[str],
+    sha256sum: Optional[str],
+) -> ParseResult:
+    if remote_uri is None:
+        assert remote_path_prefix is not None and remote_path_suffix is not None
+
+        if sha256sum is None:
+            sha256sum = _hash_fileobj(fileobj)
+
+        return urlparse(
+            os.path.join(remote_path_prefix, f"{sha256sum}{remote_path_suffix}")
+        )
+    else:
+        return remote_uri
+
+
 class AbstractStagingClient(ABC):
     """
     Client used to stage files in order to upload or download datasets into a historical store.
@@ -58,9 +88,36 @@ def list_files(self, bucket: str, path: str) -> List[str]:
         pass
 
     @abstractmethod
-    def upload_file(self, local_path: str, bucket: str, remote_path: str):
+    def upload_fileobj(
+        self,
+        fileobj: IO[bytes],
+        local_path: str,
+        *,
+        remote_uri: Optional[ParseResult] = None,
+        remote_path_prefix: Optional[str] = None,
+        remote_path_suffix: Optional[str] = None,
+    ) -> ParseResult:
         """
-        Uploads a file to an object store.
+        Uploads a file to an object store. You can either specify the destination object URI,
+        or a destination prefix+suffix. In the latter case, this interface works as
+        content-addressable storage and the remote path is computed from the sha256 of the
+        uploaded content as `$remote_path_prefix/$sha256$remote_path_suffix`
+
+        Args:
+            fileobj (IO[bytes]): file-like object containing the data to be uploaded. It needs to
+                support seek() in addition to read/write.
+            local_path (str): a file name associated with fileobj. This param is only used for
+                diagnostic messages. If `fileobj` is a local file, pass its filename here.
+            remote_uri (ParseResult or None): destination object URI to upload to
+            remote_path_prefix (str or None): destination path prefix to upload to when using
+                content-addressable storage mode
+            remote_path_suffix (str or None): destination path suffix to upload to when using
+                content-addressable storage mode
+
+        Returns:
+            ParseResult: the URI of the uploaded file. It will be the same as `remote_uri` if
+                `remote_uri` was passed in. Otherwise it will be the path computed from
+                `remote_path_prefix` and `remote_path_suffix`.
         """
         pass
 
@@ -128,18 +185,27 @@ def list_files(self, bucket: str, path: str) -> List[str]:
         else:
             return [f"{GS}://{bucket}/{path.lstrip('/')}"]
 
-    def upload_file(self, local_path: str, bucket: str, remote_path: str):
-        """
-        Uploads file to google cloud storage.
-
-        Args:
-            local_path (str): Path to the local file that needs to be uploaded/staged
-            bucket (str): gs Bucket name
-            remote_path (str): relative path to the folder to which the files need to be uploaded
-        """
+    def _uri_to_bucket_key(self, remote_path: ParseResult) -> Tuple[str, str]:
+        assert remote_path.hostname is not None
+        return remote_path.hostname, remote_path.path.lstrip("/")
+
+    def upload_fileobj(
+        self,
+        fileobj: IO[bytes],
+        local_path: str,
+        *,
+        remote_uri: Optional[ParseResult] = None,
+        remote_path_prefix: Optional[str] = None,
+        remote_path_suffix: Optional[str] = None,
+    ) -> ParseResult:
+        remote_uri = _gen_remote_uri(
+            fileobj, remote_uri, remote_path_prefix, remote_path_suffix, None
+        )
+        bucket, key = self._uri_to_bucket_key(remote_uri)
         gs_bucket = self.gcs_client.get_bucket(bucket)
-        blob = gs_bucket.blob(remote_path.lstrip("/"))
-        blob.upload_from_filename(local_path)
+        blob = gs_bucket.blob(key)
+        blob.upload_from_file(fileobj)
+        return remote_uri
 
 
 class S3Client(AbstractStagingClient):
@@ -199,52 +265,50 @@ def list_files(self, bucket: str, path: str) -> List[str]:
         else:
             return [f"{S3}://{bucket}/{path.lstrip('/')}"]
 
-    def _hash_file(self, local_path: str):
-        h = hashlib.sha256()
-        with open(local_path, "rb") as f:
-            for block in iter(lambda: f.read(2 ** 20), b""):
-                h.update(block)
-        return h.hexdigest()
-
-    def upload_file(self, local_path: str, bucket: str, remote_path: str):
-        """
-        Uploads file to s3.
-
-        Args:
-            local_path (str): Path to the local file that needs to be uploaded/staged
-            bucket (str): s3 Bucket name
-            remote_path (str): relative path to the folder to which the files need to be uploaded
-        """
-
-        sha256sum = self._hash_file(local_path)
+    def _uri_to_bucket_key(self, remote_path: ParseResult) -> Tuple[str, str]:
+        assert remote_path.hostname is not None
+        return remote_path.hostname, remote_path.path.lstrip("/")
+
+    def upload_fileobj(
+        self,
+        fileobj: IO[bytes],
+        local_path: str,
+        *,
+        remote_uri: Optional[ParseResult] = None,
+        remote_path_prefix: Optional[str] = None,
+        remote_path_suffix: Optional[str] = None,
+    ) -> ParseResult:
+        sha256sum = _hash_fileobj(fileobj)
+        remote_uri = _gen_remote_uri(
+            fileobj, remote_uri, remote_path_prefix, remote_path_suffix, sha256sum
+        )
 
         import botocore
 
+        bucket, key = self._uri_to_bucket_key(remote_uri)
+
         try:
-            head_response = self.s3_client.head_object(Bucket=bucket, Key=remote_path)
+            head_response = self.s3_client.head_object(Bucket=bucket, Key=key)
             if head_response["Metadata"]["sha256sum"] == sha256sum:
                 # File already exists
-                return remote_path
+                return remote_uri
             else:
-                print(f"Uploading {local_path} to {remote_path}")
-                self.s3_client.upload_file(
-                    local_path,
+                print(f"Uploading {local_path} to {remote_uri}")
+                self.s3_client.upload_fileobj(
+                    fileobj,
                     bucket,
-                    remote_path,
+                    key,
                     ExtraArgs={"Metadata": {"sha256sum": sha256sum}},
                 )
-                return remote_path
+                return remote_uri
         except botocore.exceptions.ClientError as e:
             if e.response["Error"]["Code"] != "404":
                 raise
 
-            self.s3_client.upload_file(
-                local_path,
-                bucket,
-                remote_path,
-                ExtraArgs={"Metadata": {"sha256sum": sha256sum}},
-            )
-            return remote_path
+            self.s3_client.upload_fileobj(
+                fileobj, bucket, key, ExtraArgs={"Metadata": {"sha256sum": sha256sum}},
+            )
+            return remote_uri
 
 
 class LocalFSClient(AbstractStagingClient):
@@ -272,10 +336,27 @@ def download_file(self, uri: ParseResult) -> IO[bytes]:
     def list_files(self, bucket: str, path: str) -> List[str]:
         raise NotImplementedError("list files not implemented for Local file")
 
-    def upload_file(self, local_path: str, bucket: str, remote_path: str):
-        dest_fpath = remote_path if remote_path.startswith("/") else "/" + remote_path
-        os.makedirs(os.path.dirname(dest_fpath), exist_ok=True)
-        shutil.copy(local_path, dest_fpath)
+    def _uri_to_path(self, uri: ParseResult) -> str:
+        return uri.path
+
+    def upload_fileobj(
+        self,
+        fileobj: IO[bytes],
+        local_path: str,
+        *,
+        remote_uri: Optional[ParseResult] = None,
+        remote_path_prefix: Optional[str] = None,
+        remote_path_suffix: Optional[str] = None,
+    ) -> ParseResult:
+
+        remote_uri = _gen_remote_uri(
+            fileobj, remote_uri, remote_path_prefix, remote_path_suffix, None
+        )
+        remote_file_path = self._uri_to_path(remote_uri)
+        os.makedirs(os.path.dirname(remote_file_path), exist_ok=True)
+        with open(remote_file_path, "wb") as fdest:
+            shutil.copyfileobj(fileobj, fdest)
+        return remote_uri
 
 
 def _s3_client(config: Config = None):
@@ -297,7 +378,7 @@ def _local_fs_client(config: Config = None):
 storage_clients = {GS: _gcs_client, S3: _s3_client, LOCAL_FILE: _local_fs_client}
 
 
-def get_staging_client(scheme, config: Config = None):
+def get_staging_client(scheme, config: Config = None) -> AbstractStagingClient:
     """
     Initialization of a specific client object(GCSClient, S3Client etc.)
 
diff --git a/sdk/python/tests/loaders/test_file.py b/sdk/python/tests/loaders/test_file.py
index 9d02447ab3..dae1006aae 100644
--- a/sdk/python/tests/loaders/test_file.py
+++ b/sdk/python/tests/loaders/test_file.py
@@ -32,7 +32,7 @@
 FOLDER_NAME = "test_folder"
 FILE_NAME = "test.avro"
 
-LOCAL_FILE = "file://tmp/tmp"
+LOCAL_FILE = "file:///tmp/tmp"
 S3_LOCATION = f"s3://{BUCKET}/{FOLDER_NAME}"
 
 TEST_DATA_FRAME = pd.DataFrame(
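Illustrative usage note (not part of the patch): with this change a staging client takes an open binary file object and returns the destination as a ParseResult, either for an explicit remote_uri or, in content-addressable mode, for a path derived from the sha256 of the uploaded bytes. A minimal sketch of the new call pattern, based on the upload_fileobj signature above; the bucket and file names are hypothetical.

    from urllib.parse import urlparse, urlunparse

    from feast.staging.storage_client import get_staging_client

    # Explicit destination URI (hypothetical bucket and path).
    with open("entities.parquet", "rb") as f:
        uri = get_staging_client("s3").upload_fileobj(
            f,
            "entities.parquet",
            remote_uri=urlparse("s3://some-bucket/staging/entities.parquet"),
        )

    # Content-addressable mode: destination becomes <prefix>/<sha256><suffix>.
    with open("entities.parquet", "rb") as f:
        uri = get_staging_client("s3").upload_fileobj(
            f,
            "entities.parquet",
            remote_path_prefix="s3://some-bucket/staging/dataframes",
            remote_path_suffix=".parquet",
        )

    print(urlunparse(uri))  # full URI string of the uploaded object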