Add support for Delta Lake #95

Merged
merged 12 commits on Apr 5, 2022
4 changes: 2 additions & 2 deletions docs/how-to-guides/feathr_job_configuration.md
@@ -4,5 +4,5 @@ Since Feathr uses Spark as the underlying execution engine, there's a way to ove

| Property Name | Default | Meaning | Since Version |
| ------------------------- | ------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------------- |
| spark.feathr.outputFormat | None | Specify the output format. "avro" is the default behavior if this value is not set. Currently can only be set for Spark built-in short names, including `json`, `parquet`, `jdbc`, `orc`, `libsvm`, `csv`, `text`. For more details, see ["Manually Specifying Options"](https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html#manually-specifying-options) | 0.2.1 |
| spark.feathr.inputFormat | None | Specify the input format if the file format cannot be determined automatically. By default, Feathr reads files by parsing the file extension; however, if the file/folder name has no extension, this configuration can be set to tell Feathr which format it should use to read the data. Currently can only be set for Spark built-in short names, including `json`, `parquet`, `jdbc`, `orc`, `libsvm`, `csv`, `text`. For more details, see ["Manually Specifying Options"](https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html#manually-specifying-options) | 0.2.1 |
| spark.feathr.inputFormat | None | Specify the input format if the file format cannot be determined automatically. By default, Feathr reads files by parsing the file extension; however, if the file/folder name has no extension, this configuration can be set to tell Feathr which format it should use to read the data. Currently can only be set for Spark built-in short names, including `json`, `parquet`, `jdbc`, `orc`, `libsvm`, `csv`, `text`. For more details, see ["Manually Specifying Options"](https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html#manually-specifying-options). Additionally, `delta` is also supported if users want to read Delta Lake tables. | 0.2.1 |
| spark.feathr.outputFormat | None | Specify the output format. "avro" is the default behavior if this value is not set. Currently can only be set for Spark built-in short names, including `json`, `parquet`, `jdbc`, `orc`, `libsvm`, `csv`, `text`. For more details, see ["Manually Specifying Options"](https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html#manually-specifying-options). Additionally, `delta` is also supported if users want to write Delta Lake tables. | 0.2.1 |
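For context, these two settings appear to map onto Spark's DataFrame reader/writer format selection. A minimal PySpark sketch of the equivalent read/write calls (the paths are hypothetical, and the Delta Lake connector is assumed to be available on the cluster, e.g. on Databricks or via the `delta-spark` package):

```python
from pyspark.sql import SparkSession

# Sketch only: assumes a Spark session where the Delta Lake connector is installed.
spark = SparkSession.builder.appName("feathr-delta-format-sketch").getOrCreate()

# What spark.feathr.inputFormat=delta implies for reading source data:
df = spark.read.format("delta").load("/data/feathr_input_delta")  # hypothetical path

# What spark.feathr.outputFormat=delta implies for writing results:
df.write.format("delta").mode("overwrite").save("/data/feathr_output_delta")  # hypothetical path
```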
Binary file modified docs/images/architecture.png
41 changes: 29 additions & 12 deletions feathr_project/feathr/_databricks_submission.py
@@ -5,6 +5,7 @@
import traceback
import urllib
from collections import namedtuple
from os.path import basename
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple, Union
from urllib.parse import urlparse
@@ -220,29 +221,45 @@ def download_result(self, result_path: str, local_folder: str):
"""
Supports downloading files from the result folder. Only paths starting with `dbfs:/` are supported, and only files within a single folder are downloaded (per Spark's design, everything is written to the result folder in a flat manner).
"""
if not result_path.startswith('dfbs'):
RuntimeError('Currently only paths starting with dbfs is supported for downloading results from a databricks cluster. The path should start with \"dbfs:\" .')
if not result_path.startswith('dbfs'):
raise RuntimeError('Currently only paths starting with dbfs is supported for downloading results from a databricks cluster. The path should start with \"dbfs:\" .')

try:
# listing all the files in a folder: https://docs.microsoft.com/en-us/azure/databricks/dev-tools/api/latest/dbfs#--list
result = requests.get(url=self.workspace_instance_url+'/api/2.0/dbfs/list',
headers=self.auth_headers, params={ 'path': result_path})
# see here for response structure: https://docs.microsoft.com/en-us/azure/databricks/dev-tools/api/latest/dbfs#--response-structure-2
dbfs_files = result.json()['files']
for file_path in tqdm(dbfs_files, desc="Downloading result files: "):
# each file_path would be a dict of this type: https://docs.microsoft.com/en-us/azure/databricks/dev-tools/api/latest/dbfs#dbfsfileinfo
dbfs_file_path, dbfs_file_size, local_file_path = file_path['path'], file_path['file_size'], os.path.join(local_folder, os.path.basename(file_path['path']))
with open(local_file_path, 'wb') as file_obj:
downloaded_size = 0
# Loop until we've downloaded the whole file
while downloaded_size < dbfs_file_size:
chunk = self._read_single_chunk(path=dbfs_file_path, offset=downloaded_size, length=MB_BYTES)
file_obj.write(base64.b64decode(chunk.data))
downloaded_size += chunk.bytes_read
# Logging once all the download is finished.
if not file_path['is_dir']:
# if it's not a directory
self._download_single_file(file_path, local_folder)
else:
# if the result is a directory
# use this path as the new folder path
folder_path = file_path['path']
folder_name = basename(folder_path)
result = requests.get(url=self.workspace_instance_url+'/api/2.0/dbfs/list',
headers=self.auth_headers, params={ 'path': folder_path})
dbfs_files = result.json()['files']
for file_path in dbfs_files:
os.makedirs(os.path.join(local_folder,folder_name), exist_ok=True)
self._download_single_file(file_path, os.path.join(local_folder,folder_name))
# Log once the entire download has finished.
logger.info('Finished downloading files from {} to {}.', result_path, local_folder)
except requests.exceptions.RequestException as e: # This is the correct syntax
raise SystemExit(e)

def _download_single_file(self, file_path, local_folder):
dbfs_file_path, dbfs_file_size, local_file_path = file_path['path'], file_path['file_size'], os.path.join(local_folder, os.path.basename(file_path['path']))
with open(local_file_path, 'wb') as file_obj:
downloaded_size = 0
# Loop until we've downloaded the whole file
while downloaded_size < dbfs_file_size:
chunk = self._read_single_chunk(path=dbfs_file_path, offset=downloaded_size, length=MB_BYTES)
file_obj.write(base64.b64decode(chunk.data))
downloaded_size += chunk.bytes_read


def _read_single_chunk(self, path, offset, length=MB_BYTES):
Expand All @@ -258,4 +275,4 @@ def _read_single_chunk(self, path, offset, length=MB_BYTES):
if resp.status_code == 200:
return FileReadInfo(**resp.json())
else:
RuntimeError("Files cannot be downloaded.")
raise RuntimeError("Files cannot be downloaded.")
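For context, a hedged usage sketch of the Databricks download path added above (the import path, config file name, and result path are assumptions; `feathr_spark_laucher` is spelled as it appears in the test further down):

```python
# Sketch only: assumes a FeathrClient configured against a Databricks Spark runtime.
from feathr.client import FeathrClient

client = FeathrClient(config_path="feathr_config.yaml")  # hypothetical config file
if client.spark_runtime.casefold() == "databricks":
    # Only dbfs: paths are accepted; anything else raises RuntimeError.
    client.feathr_spark_laucher.download_result(
        result_path="dbfs:/feathrazure_test/output",  # hypothetical result folder
        local_folder="/tmp/feathr_output",
    )
```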
46 changes: 36 additions & 10 deletions feathr_project/feathr/_synapse_submission.py
@@ -5,6 +5,7 @@
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
from urllib.parse import urlparse
from os.path import basename

from azure.identity import (ChainedTokenCredential, DefaultAzureCredential,
DeviceCodeCredential, EnvironmentCredential,
@@ -298,7 +299,7 @@ def upload_file_to_workdir(self, src_file_path: str) -> str:

src_parse_result = urlparse(src_file_path)
if src_parse_result.scheme.startswith('http'):
file_name = os.path.basename(src_file_path)
file_name = basename(src_file_path)
file_client = self.dir_client.create_file(file_name)
# returned paths for the uploaded file
returned_path = self.datalake_dir + file_name
@@ -324,7 +325,7 @@ def upload_file_to_workdir(self, src_file_path: str) -> str:
return returned_path

def upload_file(self, src_file_path)-> str:
file_name = os.path.basename(src_file_path)
file_name = basename(src_file_path)
logger.info("Uploading file {}", file_name)
file_client = self.dir_client.create_file(file_name)
returned_path = self.datalake_dir + file_name
@@ -336,7 +337,8 @@ def upload_file(self, src_file_path)-> str:

def download_file(self, target_adls_directory: str, local_dir_cache: str):
"""
Download file to a local cache
Download files to a local cache. Supports downloading a folder and the contents of its subfolders.
Note that the code only downloads the contents of the root folder and of the folders one level below it (rather than recursing through all levels of folders).

Args:
target_adls_directory (str): target ADLS directory
@@ -347,14 +349,40 @@ def download_file(self, target_adls_directory: str, local_dir_cache: str):
parse_result = urlparse(target_adls_directory)
directory_client = self.file_system_client.get_directory_client(
parse_result.path)

# returns the paths to all the files in the target directory in ADLS
adls_paths = [file_path.name.split("/")[-1] for file_path in self.file_system_client.get_paths(
path=parse_result.path) if not file_path.is_directory][1:]
# need to generate list of local paths to write the files to
# get all the paths that are not under a directory
result_paths = [basename(file_path.name) for file_path in self.file_system_client.get_paths(
path=parse_result.path, recursive=False) if not file_path.is_directory]

# get all the paths that are directories and download them
result_folders = [file_path.name for file_path in self.file_system_client.get_paths(
path=parse_result.path) if file_path.is_directory]

# list all the files under each folder, and download them preserving the hierarchy
for folder in result_folders:
folder_name = basename(folder)
file_in_folder = [os.path.join(folder_name, basename(file_path.name)) for file_path in self.file_system_client.get_paths(
path=folder, recursive=False) if not file_path.is_directory]
local_paths = [os.path.join(local_dir_cache, file_name)
for file_name in file_in_folder]
self._download_file_list(local_paths, file_in_folder, directory_client)

# download files that are in the result folder
local_paths = [os.path.join(local_dir_cache, file_name)
for file_name in adls_paths]
for idx, file_to_write in enumerate(tqdm(adls_paths,desc="Downloading result files: ")):
for file_name in result_paths]
self._download_file_list(local_paths, result_paths, directory_client)

logger.info('Finished downloading files from {} to {}.',
target_adls_directory, local_dir_cache)

def _download_file_list(self, local_paths: List[str], result_paths, directory_client):
'''
Download a list of files to the local folder.
'''
for idx, file_to_write in enumerate(tqdm(result_paths,desc="Downloading result files: ")):
try:
os.makedirs(os.path.dirname(local_paths[idx]), exist_ok=True)
local_file = open(local_paths[idx], 'wb')
file_client = directory_client.get_file_client(file_to_write)
download = file_client.download_file()
@@ -363,5 +391,3 @@ def download_file(self, target_adls_directory: str, local_dir_cache: str):
local_file.close()
except Exception as e:
logger.error(e)
logger.info('Finish downloading files from {} to {}.',
target_adls_directory,local_dir_cache)
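One level of folder depth is enough for the Delta case because a Delta table directory contains flat data files plus a single `_delta_log/` subfolder of transaction logs. A small sketch (the local cache path is hypothetical) for inspecting what the download leaves on disk:

```python
import os

# Hypothetical local cache directory passed to download_file(...).
local_dir_cache = "/tmp/feathr_output"

# Print the downloaded layout: files in the root plus one level of subfolders
# (e.g. a Delta table's _delta_log folder with its JSON commit files).
for name in sorted(os.listdir(local_dir_cache)):
    path = os.path.join(local_dir_cache, name)
    if os.path.isdir(path):
        for inner in sorted(os.listdir(path)):
            print(os.path.join(name, inner))
    else:
        print(name)
```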
24 changes: 18 additions & 6 deletions feathr_project/feathr/job_utils.py
@@ -2,10 +2,10 @@
import os
import glob
from feathr.constants import OUTPUT_FORMAT
import pandavro as pdx
from loguru import logger
import pandas as pd
import tempfile
from pyarrow.parquet import ParquetDataset


def get_result_df(client: FeathrClient) -> pd.DataFrame:
"""Download the job result dataset from cloud as a Pandas dataframe."""
@@ -17,13 +17,25 @@ def get_result_df(client: FeathrClient) -> pd.DataFrame:
# by default the result are in avro format
if format:
# helper function for only parquet and avro
if format.lower()=="parquet":
if format.casefold()=="parquet":
files = glob.glob(os.path.join(tmp_dir.name, '*.parquet'))
from pyarrow.parquet import ParquetDataset
ds = ParquetDataset(files)
vertical_concat_df = ds.read().to_pandas()
result_df = ds.read().to_pandas()
if format.casefold()=="delta":
from deltalake import DeltaTable
delta = DeltaTable(tmp_dir.name)
if not client.spark_runtime == 'azure_synapse':
# don't read the Synapse result with Delta locally, as there's a problem with the underlying system
# The issue is tracked here: https://github.com/delta-io/delta-rs/issues/582
result_df = delta.to_pyarrow_table().to_pandas()
else:
logger.info("Please use Azure Synapse to read the result in the Azure Synapse cluster. Reading local results is not supported for Azure Synapse. Emtpy DataFrame is returned.")
result_df = pd.DataFrame()
else:
import pandavro as pdx
for file in glob.glob(os.path.join(tmp_dir.name, '*.avro')):
dataframe_list.append(pdx.read_avro(file))
vertical_concat_df = pd.concat(dataframe_list, axis=0)
result_df = pd.concat(dataframe_list, axis=0)
tmp_dir.cleanup()
return vertical_concat_df
return result_df
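For reference, a minimal sketch of reading a downloaded Delta folder with the `deltalake` (delta-rs) package directly, outside of `get_result_df` (the folder path is hypothetical):

```python
import pandas as pd
from deltalake import DeltaTable

# Point DeltaTable at the folder holding the Delta data files and its _delta_log/.
delta = DeltaTable("/tmp/feathr_output")  # hypothetical path
result_df: pd.DataFrame = delta.to_pyarrow_table().to_pandas()
print(result_df.head())
```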
1 change: 1 addition & 0 deletions feathr_project/setup.py
@@ -37,6 +37,7 @@
"tqdm",
"pyarrow",
"python-snappy",
"deltalake",
"google>=3.0.0",
"google-api-python-client>=2.41.0",
],
10 changes: 10 additions & 0 deletions feathr_project/test/test_azure_spark_e2e.py
@@ -120,6 +120,16 @@ def _validate_constant_feature(feature):
assert feature[7] == ([1, 2, 3], [1, 2, 3])


def test_dbfs_path():
test_workspace_dir = Path(
__file__).parent.resolve() / "test_user_workspace"
client = basic_test_setup(os.path.join(test_workspace_dir, "feathr_config.yaml"))
if client.spark_runtime.casefold() == "databricks":
# expect this to raise an error since the result path is not in dbfs: format
with pytest.raises(RuntimeError):
client.feathr_spark_laucher.download_result(result_path="wasb://res_url", local_folder="/tmp")


def test_feathr_get_offline_features():
"""
Test get_offline_features() can get data correctly.