Merge branch 'main' into separate-snowflake-source
aabbasi-hbo authored Nov 11, 2022
2 parents 78695d1 + 88a0348 commit ec4012c
Showing 39 changed files with 754 additions and 236 deletions.
112 changes: 59 additions & 53 deletions docs/how-to-guides/feathr-configuration-and-env.md

Large diffs are not rendered by default.

8 changes: 4 additions & 4 deletions docs/samples/customer360/Customer360.ipynb
@@ -194,8 +194,8 @@
" - 'REDIS_PASSWORD'\n",
" - 'ADLS_ACCOUNT'\n",
" - 'ADLS_KEY'\n",
" - 'WASB_ACCOUNT'\n",
" - 'WASB_KEY'\n",
" - 'BLOB_ACCOUNT'\n",
" - 'BLOB_KEY'\n",
" - 'DATABRICKS_WORKSPACE_TOKEN_VALUE '\n",
" \n",
"offline_store:\n",
@@ -328,8 +328,8 @@
"os.environ['REDIS_PASSWORD'] = ''\n",
"os.environ['ADLS_ACCOUNT'] = ''\n",
"os.environ['ADLS_KEY'] = ''\n",
"os.environ['WASB_ACCOUNT'] = \"\"\n",
"os.environ['WASB_KEY'] = ''\n",
"os.environ['BLOB_ACCOUNT'] = \"\"\n",
"os.environ['BLOB_KEY'] = ''\n",
"os.environ['DATABRICKS_WORKSPACE_TOKEN_VALUE'] = ''"
]
},
12 changes: 10 additions & 2 deletions feathr_project/feathr/client.py
@@ -32,6 +32,7 @@
from feathr.utils._file_utils import write_to_file
from feathr.utils.feature_printer import FeaturePrinter
from feathr.utils.spark_job_params import FeatureGenerationJobParams, FeatureJoinJobParams
from feathr.definition.source import InputContext


class FeathrClient(object):
@@ -633,8 +634,15 @@ def materialize_features(self, settings: MaterializationSettings, execution_conf
allow_materialize_non_agg_feature: Materializing non-aggregated features (the features without WindowAggTransformation) doesn't output meaningful results so it's by default set to False, but if you really want to materialize non-aggregated features, set this to True.
"""
feature_list = settings.feature_names
if len(feature_list) > 0 and not self._valid_materialize_keys(feature_list):
raise RuntimeError(f"Invalid materialization features: {feature_list}, since they have different keys. Currently Feathr only supports materializing features of the same keys.")
if len(feature_list) > 0:
if 'anchor_list' in dir(self):
anchors = [anchor for anchor in self.anchor_list if isinstance(anchor.source, InputContext)]
anchor_feature_names = set(feature.name for anchor in anchors for feature in anchor.features)
for feature in feature_list:
if feature in anchor_feature_names:
raise RuntimeError(f"Materializing features that are defined on INPUT_CONTEXT is not supported. {feature} is defined on INPUT_CONTEXT so you should remove it from the feature list in MaterializationSettings.")
if not self._valid_materialize_keys(feature_list):
raise RuntimeError(f"Invalid materialization features: {feature_list}, since they have different keys. Currently Feathr only supports materializing features of the same keys.")

if not allow_materialize_non_agg_feature:
# Check if there are non-aggregation features in the list
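Illustration only (not part of this commit's diff): with the guard added above, asking to materialize a feature anchored on INPUT_CONTEXT now fails fast. A minimal sketch, assuming a client whose f_day_of_week feature is defined on INPUT_CONTEXT and a hypothetical Redis table name:

    # Hypothetical usage; names are illustrative, error text matches the new check.
    settings = MaterializationSettings(name="example_materialization",
                                       sinks=[RedisSink(table_name="exampleOnlineTable")],
                                       feature_names=["f_location_avg_fare", "f_day_of_week"])
    client.materialize_features(settings, allow_materialize_non_agg_feature=True)
    # RuntimeError: Materializing features that are defined on INPUT_CONTEXT is not supported.
    # f_day_of_week is defined on INPUT_CONTEXT so you should remove it from the feature list
    # in MaterializationSettings.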
5 changes: 0 additions & 5 deletions feathr_project/feathr/constants.py
@@ -28,11 +28,6 @@
TYPEDEF_ARRAY_DERIVED_FEATURE=f"array<feathr_derived_feature_{REGISTRY_TYPEDEF_VERSION}>"
TYPEDEF_ARRAY_ANCHOR_FEATURE=f"array<feathr_anchor_feature_{REGISTRY_TYPEDEF_VERSION}>"

# Decouple Feathr MAVEN Version from Feathr Python SDK Version
import os
from feathr.version import __version__
FEATHR_MAVEN_VERSION = os.environ.get("FEATHR_MAVEN_VERSION", __version__)
FEATHR_MAVEN_ARTIFACT=f"com.linkedin.feathr:feathr_2.12:{FEATHR_MAVEN_VERSION}"

JOIN_CLASS_NAME="com.linkedin.feathr.offline.job.FeatureJoinJob"
GEN_CLASS_NAME="com.linkedin.feathr.offline.job.FeatureGenJob"
3 changes: 1 addition & 2 deletions feathr_project/feathr/registry/_feature_registry_purview.py
@@ -758,8 +758,7 @@ def upload_single_entity_to_purview(self,entity:Union[AtlasEntity,AtlasProcess])
"""
Try to find existing entity/process first, if found, return the existing entity's GUID
"""
id = self.get_entity_id(entity.qualifiedName)
response = self.purview_client.get_entity(id)['entities'][0]
response = self.purview_client.get_entity(qualifiedName=entity.qualifiedName)['entities'][0]
j = entity.to_json()
if j["typeName"] == response["typeName"]:
if j["typeName"] == "Process":
@@ -15,6 +15,7 @@
from databricks_cli.runs.api import RunsApi
from databricks_cli.sdk.api_client import ApiClient
from feathr.constants import *
from feathr.version import get_maven_artifact_fullname
from feathr.spark_provider._abc import SparkJobLauncher
from loguru import logger
from requests.structures import CaseInsensitiveDict
@@ -166,8 +167,8 @@ def submit_feathr_job(self, job_name: str, main_jar_path: str, main_class_name:

# the feathr main jar file is anyway needed regardless it's pyspark or scala spark
if not main_jar_path:
logger.info(f"Main JAR file is not set, using default package '{FEATHR_MAVEN_ARTIFACT}' from Maven")
submission_params['libraries'][0]['maven'] = { "coordinates": FEATHR_MAVEN_ARTIFACT }
logger.info(f"Main JAR file is not set, using default package '{get_maven_artifact_fullname()}' from Maven")
submission_params['libraries'][0]['maven'] = { "coordinates": get_maven_artifact_fullname() }
else:
submission_params['libraries'][0]['jar'] = self.upload_or_get_cloud_path(main_jar_path)
# see here for the submission parameter definition https://docs.microsoft.com/en-us/azure/databricks/dev-tools/api/2.0/jobs#--request-structure-6
@@ -10,7 +10,7 @@
from loguru import logger
from pyspark import *

from feathr.constants import FEATHR_MAVEN_ARTIFACT
from feathr.version import get_maven_artifact_fullname
from feathr.spark_provider._abc import SparkJobLauncher


@@ -77,7 +77,7 @@ def submit_feathr_job(

# Get conf and package arguments
cfg = configuration.copy() if configuration else {}
maven_dependency = f"{cfg.pop('spark.jars.packages', self.packages)},{FEATHR_MAVEN_ARTIFACT}"
maven_dependency = f"{cfg.pop('spark.jars.packages', self.packages)},{get_maven_artifact_fullname()}"
spark_args = self._init_args(job_name=job_name, confs=cfg)

if not main_jar_path:
@@ -86,7 +86,7 @@ def submit_feathr_job(
# This is a JAR job
# Azure Synapse/Livy doesn't allow JAR job starts from Maven directly, we must have a jar file uploaded.
# so we have to use a dummy jar as the main file.
logger.info(f"Main JAR file is not set, using default package '{FEATHR_MAVEN_ARTIFACT}' from Maven")
logger.info(f"Main JAR file is not set, using default package '{get_maven_artifact_fullname()}' from Maven")
# Use the no-op jar as the main file
# This is a dummy jar which contains only one `org.example.Noop` class with one empty `main` function
# which does nothing
7 changes: 4 additions & 3 deletions feathr_project/feathr/spark_provider/_synapse_submission.py
@@ -22,6 +22,7 @@

from feathr.spark_provider._abc import SparkJobLauncher
from feathr.constants import *
from feathr.version import get_maven_artifact_fullname

class LivyStates(Enum):
""" Adapt LivyStates over to relax the dependency for azure-synapse-spark pacakge.
@@ -114,12 +115,12 @@ def submit_feathr_job(self, job_name: str, main_jar_path: str = None, main_clas
if not main_jar_path:
# We don't have the main jar, use Maven
# Add Maven dependency to the job configuration
logger.info(f"Main JAR file is not set, using default package '{FEATHR_MAVEN_ARTIFACT}' from Maven")
logger.info(f"Main JAR file is not set, using default package '{get_maven_artifact_fullname()}' from Maven")
if "spark.jars.packages" in cfg:
cfg["spark.jars.packages"] = ",".join(
[cfg["spark.jars.packages"], FEATHR_MAVEN_ARTIFACT])
[cfg["spark.jars.packages"], get_maven_artifact_fullname()])
else:
cfg["spark.jars.packages"] = FEATHR_MAVEN_ARTIFACT
cfg["spark.jars.packages"] = get_maven_artifact_fullname()

if not python_files:
# This is a JAR job
11 changes: 10 additions & 1 deletion feathr_project/feathr/version.py
@@ -1 +1,10 @@
__version__ = "0.9.0-rc2"
__version__ = "0.9.0-rc2"

def get_version():
return __version__

# Decouple Feathr MAVEN Version from Feathr Python SDK Version
import os
def get_maven_artifact_fullname():
maven_artifact_version = os.environ.get("MAVEN_ARTIFACT_VERSION", __version__)
return f"com.linkedin.feathr:feathr_2.12:{maven_artifact_version}"
@@ -25,8 +25,8 @@ project_config:
# the environemnt variables are optional, however you will need them if you want to use some of the services:
- ADLS_ACCOUNT
- ADLS_KEY
- WASB_ACCOUNT
- WASB_KEY
- BLOB_ACCOUNT
- BLOB_KEY
- S3_ACCESS_KEY
- S3_SECRET_KEY
- JDBC_TABLE
@@ -41,7 +41,7 @@ offline_store:
adls_enabled: true

# paths starts with wasb:// or wasbs://
# WASB_ACCOUNT and WASB_KEY should be set in environment variable
# BLOB_ACCOUNT and BLOB_KEY should be set in environment variable
wasb:
wasb_enabled: true

@@ -118,8 +118,8 @@ feature_registry:
delimiter: "__"
# controls whether the type system will be initialized or not. Usually this is only required to be executed once.
type_system_initialization: false


secrets:
azure_key_vault:
name: feathrazuretest3-kv
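For context (not part of the diff): the renamed BLOB_ACCOUNT and BLOB_KEY variables replace WASB_ACCOUNT and WASB_KEY and are read from the environment like the other credentials. A minimal sketch with placeholder values:

    import os
    os.environ["BLOB_ACCOUNT"] = "<storage account name>"  # formerly WASB_ACCOUNT
    os.environ["BLOB_KEY"] = "<storage account key>"       # formerly WASB_KEY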
21 changes: 20 additions & 1 deletion feathr_project/test/test_feature_materialization.py
@@ -18,6 +18,8 @@
from test_fixture import basic_test_setup
from test_fixture import get_online_test_table_name
from test_utils.constants import Constants
from logging import raiseExceptions
import pytest

def test_feature_materialization_config():
backfill_time = BackfillTime(start=datetime(2020, 5, 20), end=datetime(2020, 5,20), step=timedelta(days=1))
@@ -255,4 +257,21 @@ def test_delete_feature_from_redis():
res = client.get_online_features(online_test_table, '265', ['f_location_avg_fare'])

assert len(res) == 1
assert res[0] == None
assert res[0] == None

def test_feature_list_on_input_context():
with pytest.raises(RuntimeError) as e_info:
test_workspace_dir = Path(__file__).parent.resolve() / "test_user_workspace"

client: FeathrClient = basic_test_setup(os.path.join(test_workspace_dir, "feathr_config.yaml"))
online_test_table = get_online_test_table_name('nycTaxiCITableDeletion')
redisSink = RedisSink(table_name=online_test_table)
settings = MaterializationSettings(name="py_udf",
sinks=[redisSink],
feature_names=[
"f_location_avg_fare",
"f_day_of_week"
])
client.materialize_features(settings, allow_materialize_non_agg_feature=True)
assert e_info is not None
assert e_info.value.args[0] == "Materializing features that are defined on INPUT_CONTEXT is not supported. f_day_of_week is defined on INPUT_CONTEXT so you should remove it from the feature list in MaterializationSettings."
6 changes: 3 additions & 3 deletions feathr_project/test/test_user_workspace/feathr_config.yaml
@@ -25,8 +25,8 @@ project_config:
# the environemnt variables are optional, however you will need them if you want to use some of the services:
- ADLS_ACCOUNT
- ADLS_KEY
- WASB_ACCOUNT
- WASB_KEY
- BLOB_ACCOUNT
- BLOB_KEY
- S3_ACCESS_KEY
- S3_SECRET_KEY
- JDBC_TABLE
@@ -41,7 +41,7 @@ offline_store:
adls_enabled: true

# paths starts with wasb:// or wasbs://
# WASB_ACCOUNT and WASB_KEY should be set in environment variable
# BLOB_ACCOUNT and BLOB_KEY should be set in environment variable
wasb:
wasb_enabled: true

@@ -17,8 +17,8 @@ project_config:
# the environemnt variables are optional, however you will need them if you want to use some of the services:
- ADLS_ACCOUNT
- ADLS_KEY
- WASB_ACCOUNT
- WASB_KEY
- BLOB_ACCOUNT
- BLOB_KEY
- S3_ACCESS_KEY
- S3_SECRET_KEY
- JDBC_TABLE
@@ -25,8 +25,8 @@ project_config:
# the environemnt variables are optional, however you will need them if you want to use some of the services:
- ADLS_ACCOUNT
- ADLS_KEY
- WASB_ACCOUNT
- WASB_KEY
- BLOB_ACCOUNT
- BLOB_KEY
- S3_ACCESS_KEY
- S3_SECRET_KEY
- JDBC_TABLE
@@ -41,7 +41,7 @@ offline_store:
adls_enabled: true

# paths starts with wasb:// or wasbs://
# WASB_ACCOUNT and WASB_KEY should be set in environment variable
# BLOB_ACCOUNT and BLOB_KEY should be set in environment variable
wasb:
wasb_enabled: true

Empty file.