Expose 'timePartitionPattern' in Python API [ WIP ] (#714)
* Expose 'timePartitionPattern'

* add test case

* Add test cases and docstring

* delete local files

* quick fix

Co-authored-by: enya-yx <enya@LAPTOP-NBH6175C.redmond.corp.microsoft.com>
Co-authored-by: enya-yx <enya@v-ellinlu-2.fareast.corp.microsoft.com>
3 people authored Oct 17, 2022
1 parent 3d12944 commit 3070a86
Showing 5 changed files with 142 additions and 3 deletions.
21 changes: 20 additions & 1 deletion feathr_project/feathr/definition/source.py
@@ -100,14 +100,30 @@ class HdfsSource(Source):
- `epoch` (seconds since epoch), for example `1647737463`
- `epoch_millis` (milliseconds since epoch), for example `1647737517761`
- Any date formats supported by [SimpleDateFormat](https://docs.oracle.com/javase/8/docs/api/java/text/SimpleDateFormat.html).
registry_tags: A dict of (str, str) that you can pass to feature registry for better organization. For example, you can use {"deprecated": "true"} to indicate this source is deprecated, etc.
time_partition_pattern (Optional[str]): Format of the time-partitioned feature data, e.g. "yyyy/MM/dd". All formats supported by DateTimeFormatter are allowed.
config:
timeSnapshotHdfsSource:
{
location:
{
path: "/data/somePath/daily"
}
timePartitionPattern: "yyyy/MM/dd"
}
Given the above HDFS path /data/somePath/daily, the expectation is that the following subdirectories exist:
/data/somePath/daily/{yyyy}/{MM}/{dd}
"""

def __init__(self, name: str, path: str, preprocessing: Optional[Callable] = None, event_timestamp_column: Optional[str] = None, timestamp_format: Optional[str] = "epoch", registry_tags: Optional[Dict[str, str]] = None) -> None:
def __init__(self, name: str, path: str, preprocessing: Optional[Callable] = None, event_timestamp_column: Optional[str] = None, timestamp_format: Optional[str] = "epoch", registry_tags: Optional[Dict[str, str]] = None, time_partition_pattern: Optional[str] = None) -> None:
super().__init__(name, event_timestamp_column,
timestamp_format, registry_tags=registry_tags)
self.path = path
self.preprocessing = preprocessing
self.time_partition_pattern = time_partition_pattern
if path.startswith("http"):
logger.warning(
"Your input path {} starts with http, which is not supported. Consider using paths starting with wasb[s]/abfs[s]/s3.", path)
@@ -116,6 +132,9 @@ def to_feature_config(self) -> str:
tm = Template("""
{{source.name}}: {
location: {path: "{{source.path}}"}
{% if source.time_partition_pattern %}
timePartitionPattern: "{{source.time_partition_pattern}}"
{% endif %}
{% if source.event_timestamp_column %}
timeWindowParameters: {
timestampColumn: "{{source.event_timestamp_column}}"
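For reference, a minimal usage sketch of the new parameter (the source name and path below are illustrative, not taken from this repository): it builds an HdfsSource with time_partition_pattern and prints the HOCON fragment that to_feature_config() should render per the template above.

from feathr import HdfsSource

# Hypothetical daily-partitioned source; the path is illustrative only.
batch_source = HdfsSource(name="dailyPartitionedSource",
                          path="wasbs://container@account.blob.core.windows.net/features/daily",
                          time_partition_pattern="yyyy/MM/dd")  # expects .../daily/{yyyy}/{MM}/{dd} subdirectories

# Per the template above, the rendered config should look roughly like:
#   dailyPartitionedSource: {
#       location: {path: "wasbs://container@account.blob.core.windows.net/features/daily"}
#       timePartitionPattern: "yyyy/MM/dd"
#   }
print(batch_source.to_feature_config())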
2 changes: 2 additions & 0 deletions feathr_project/feathr/spark_provider/_synapse_submission.py
@@ -432,6 +432,8 @@ def download_file(self, target_adls_directory: str, local_dir_cache: str):

# returns the paths to all the files in the target directory in ADLS
# get all the paths that are not under a directory
test_paths = self.file_system_client.get_paths(
path=parse_result.path, recursive=False)
result_paths = [basename(file_path.name) for file_path in self.file_system_client.get_paths(
path=parse_result.path, recursive=False) if not file_path.is_directory]

36 changes: 35 additions & 1 deletion feathr_project/test/test_azure_spark_e2e.py
@@ -20,7 +20,7 @@
from feathr import ValueType
from feathr.utils.job_utils import get_result_df
from feathrcli.cli import init
from test_fixture import (basic_test_setup, get_online_test_table_name)
from test_fixture import (basic_test_setup, get_online_test_table_name, time_partition_pattern_test_setup)
from test_utils.constants import Constants

# make sure you have run the upload feature script before running these tests
@@ -58,6 +58,40 @@ def test_feathr_materialize_to_offline():
res_df = get_result_df(client, "avro", output_path + "/df0/daily/2020/05/20")
assert res_df.shape[0] > 0

def test_feathr_materialize_with_time_partition_pattern():
"""
Test FeathrClient() using HdfsSource with 'timePartitionPattern'.
"""
test_workspace_dir = Path(
__file__).parent.resolve() / "test_user_workspace"
# os.chdir(test_workspace_dir)

client: FeathrClient = time_partition_pattern_test_setup(os.path.join(test_workspace_dir, "feathr_config.yaml"))

backfill_time = BackfillTime(start=datetime(
2020, 5, 20), end=datetime(2020, 5, 20), step=timedelta(days=1))

now = datetime.now()
if client.spark_runtime == 'databricks':
output_path = ''.join(['dbfs:/feathrazure_cijob_materialize_offline_','_', str(now.minute), '_', str(now.second), ""])
else:
output_path = ''.join(['abfss://feathrazuretest3fs@feathrazuretest3storage.dfs.core.windows.net/demo_data/feathrazure_cijob_materialize_offline_','_', str(now.minute), '_', str(now.second), ""])
offline_sink = HdfsSink(output_path=output_path)
settings = MaterializationSettings("nycTaxiTable",
sinks=[offline_sink],
feature_names=[
"f_location_avg_fare", "f_location_max_fare"],
backfill_time=backfill_time)
client.materialize_features(settings)
# assuming the job can successfully run; otherwise it will throw exception
client.wait_job_to_finish(timeout_sec=Constants.SPARK_JOB_TIMEOUT_SECONDS)

# download result and just assert the returned result is not empty
# by default, it will write to a folder appended with date
res_df = get_result_df(client, "avro", output_path + "/df0/daily/2020/05/20")
assert res_df.shape[0] > 0


def test_feathr_online_store_agg_features():
"""
Test FeathrClient() get_online_features and batch_get can get data correctly.
58 changes: 57 additions & 1 deletion feathr_project/test/test_feature_anchor.py
@@ -167,4 +167,60 @@ def test_agg_anchor_to_config():
}
}
"""
assert ''.join(agg_anchor.to_feature_config().split()) == ''.join(expected_agg_feature_config.split())
assert ''.join(agg_anchor.to_feature_config().split()) == ''.join(expected_agg_feature_config.split())

def test_time_partition_to_config():
batch_source = HdfsSource(name="testTimePartitionSource",
path="abfss://public@azurefeathrstorage.blob.core.windows.net/sample_data/time_partition_pattern",
time_partition_pattern="yyyy/MM/dd"
)
key = TypedKey(key_column="key0",
key_column_type=ValueType.INT32)
agg_features = [
Feature(name="f_loc_avg",
key=[key],
feature_type=FLOAT,
transform="f_location_avg_fare"),
Feature(name="f_loc_max",
feature_type=FLOAT,
key=[key],
transform="f_location_max_fare"),
]
agg_anchor = FeatureAnchor(name="testTimePartitionFeaturesSource",

hh23485 commented on Oct 26, 2022:

I can't pass the test on my dev machine; the anchor name on line 189 differs from the one on line 195. Can you pass the test case?

source=batch_source,
features=agg_features)
expected_time_partition_config = """
anchors: {
testTimePartitionFeatures: {
source: testTimePartitionSource
key.sqlExpr: [key0]
features: {
f_loc_avg: {
def.sqlExpr: "f_location_avg_fare"
type: {
type: TENSOR
tensorCategory: DENSE
dimensionType: []
valType: FLOAT
}
}
f_loc_max: {
def.sqlExpr: "f_location_max_fare"
type: {
type: TENSOR
tensorCategory: DENSE
dimensionType: []
valType: FLOAT
}
}
}
}
}
sources: {
testTimePartitionSource: {
location: {path: "abfss://public@azurefeathrstorage.blob.core.windows.net/sample_data/time_partition_pattern"}
timePartitionPattern: "yyyy/MM/dd"
}
}
"""
assert ''.join(agg_anchor.to_feature_config().split()) == ''.join(expected_time_partition_config.split())
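As the review comment above notes, the anchor is constructed with name="testTimePartitionFeaturesSource" while the expected config keys it as testTimePartitionFeatures, so this assertion fails as written. A minimal fix sketch, assuming the name in the expected config is the intended one, is to align the two:

# Rename the anchor so the generated config matches expected_time_partition_config.
agg_anchor = FeatureAnchor(name="testTimePartitionFeatures",
                           source=batch_source,
                           features=agg_features)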
28 changes: 28 additions & 0 deletions feathr_project/test/test_fixture.py
@@ -88,6 +88,34 @@ def basic_test_setup(config_path: str):

return client

def time_partition_pattern_test_setup(config_path: str):
now = datetime.now()
# set workspace folder by time; make sure we don't have write conflict if there are many CI tests running
os.environ['SPARK_CONFIG__DATABRICKS__WORK_DIR'] = ''.join(['dbfs:/feathrazure_cijob','_', str(now.minute), '_', str(now.second), '_', str(now.microsecond)])
os.environ['SPARK_CONFIG__AZURE_SYNAPSE__WORKSPACE_DIR'] = ''.join(['abfss://feathrazuretest3fs@feathrazuretest3storage.dfs.core.windows.net/feathr_github_ci','_', str(now.minute), '_', str(now.second) ,'_', str(now.microsecond)])
client = FeathrClient(config_path=config_path)
batch_source = HdfsSource(name="testTimePartitionSource",
path="wasbs://public@azurefeathrstorage.blob.core.windows.net/sample_data/time_partition_pattern/daily",
time_partition_pattern="yyyy/MM/dd"
)
key = TypedKey(key_column="key0",
key_column_type=ValueType.INT32)
agg_features = [
Feature(name="f_loc_avg",
key=[key],
feature_type=FLOAT,
transform="f_location_avg_fare"),
Feature(name="f_loc_max",
feature_type=FLOAT,
key=[key],
transform="f_location_max_fare"),
]

agg_anchor = FeatureAnchor(name="testTimePartitionFeatures",
source=batch_source,
features=agg_features)
client.build_features(anchor_list=[agg_anchor])
return client

def snowflake_test_setup(config_path: str):
now = datetime.now()
