diff --git a/docs/how-to-guides/deploy-feathr-api-as-webapp.md b/docs/how-to-guides/deploy-feathr-api-as-webapp.md
new file mode 100644
index 000000000..87ee70a19
--- /dev/null
+++ b/docs/how-to-guides/deploy-feathr-api-as-webapp.md
@@ -0,0 +1,114 @@
+---
+layout: default
+title: Feathr API Deployment
+parent: Feathr How-to Guides
+---
+
+# Feathr API
+The API currently supports the following functionality:
+
+1. Get Feature by Qualified Name
+2. Get Feature by GUID
+3. Get List of Features
+4. Get Lineage for a Feature
+
+
+## Build and run locally
+### Install
+__NOTE:__ You can run the following command in your local Python environment or in an Azure virtual machine.
+You can install the dependencies through the requirements file:
+```bash
+$ pip install -r requirements.txt
+```
+
+### Run
+This command starts the uvicorn server locally and automatically reloads it when your code changes.
+```bash
+uvicorn api:app --port 8080 --reload
+```
+
+## Build and deploy on Azure
+Here are the steps to build the API as a Docker container, push it to Azure Container Registry (ACR), and then deploy it as a web app. The instructions below are for Mac/Linux but should work on Windows too; you might have to use the sudo command or run Docker as administrator on Windows if you don't have the right privileges.
+
+1. Install the Azure CLI by following the instructions [here](https://docs.microsoft.com/en-us/cli/azure/install-azure-cli?view=azure-cli-latest)
+
+1. Create an Azure Container Registry. First create the resource group:
+    ```bash
+    az group create --name <resource-group-name> --location <location>
+    ```
+
+    Then create the container registry:
+    ```bash
+    az acr create --resource-group <resource-group-name> --name <registry-name> --sku Basic
+    ```
+
+1. Log in to your Azure Container Registry (ACR) account:
+    ```bash
+    $ az acr login --name <registry-name>
+    ```
+
+1. Clone the repository and navigate to the api folder:
+    ```bash
+    $ git clone git@github.com:linkedin/feathr.git
+
+    $ cd feathr_project/feathr/api
+
+    ```
+
+1. Build the Docker container locally. You need to have Docker installed and running on your machine; to set it up, follow the instructions [here](https://docs.docker.com/get-started/).
+__Note: The `feathr/api` naming used below is not a mandatory format for specifying the name of the image. It's just a useful convention that avoids having to re-tag your image when you need to push it to a registry; the name can be anything you want in the format below.__
+
+    ```bash
+    $ docker build -t feathr/api .
+    ```
+
+1. Run the docker images command and you will see your newly created image:
+    ```bash
+    $ docker images
+
+    REPOSITORY            TAG       IMAGE ID       CREATED         SIZE
+    feathr/api            latest    a647ea749b9b   5 minutes ago   529MB
+    ```
+
+1. Before you can push an image to your registry, you must tag it with the fully qualified name of your ACR login server. The login server name is in the format <registry-name>.azurecr.io (all lowercase), for example, mycontainerregistry007.azurecr.io. Tag the image:
+    ```bash
+    $ docker tag feathr/api:latest feathracr.azurecr.io/feathr/api:latest
+    ```
+1. Push the image to the registry:
+    ```bash
+    $ docker push feathracr.azurecr.io/feathr/api:latest
+    ```
+1. List the images from your registry to see your recently pushed image:
+    ```
+    az acr repository list --name feathracr --output table
+    ```
+    Output:
+    ```
+    Result
+    ----------
+    feathr/api
+    ```
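+
+If you want to double-check exactly what was pushed, you can also list the tags for the repository. This is an optional step, assuming the `feathracr` registry and `feathr/api` repository names used above:
+
+```bash
+az acr repository show-tags --name feathracr --repository feathr/api --output table
+```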
+
+## Deploy image to Azure WebApp for Containers
+
+1. Go to the [Azure portal](https://portal.azure.com) and search for your container registry
+1. Select repositories from the left pane and click the latest tag. Click the three dots on the right side of the tag and select the __Deploy to WebApp__ option. If you see the __Deploy to WebApp__ option greyed out, you have to enable the Admin user on the registry by updating it.
+
+    ![Container Image 1](../images/feathr_api_image_latest.png)
+
+    ![Container Image 2](../images/feathr_api_image_latest_options.png)
+
+
+1. Provide a name for the deployed web app, along with the subscription to deploy the app into, the resource group, and the App Service plan
+
+    ![Container Image](../images/feathr_api_image_latest_deployment.png)
+
+1. You will get a notification that your app has been successfully deployed; click the __Go to Resource__ button.
+
+
+1. On the app overview page, go to the URL for the deployed app (https://<app-name>.azurewebsites.net/docs, listed under URL on the overview page) and you should see the API documentation.
+
+    ![API docs](../images/api-docs.png)
+
+Congratulations, you have successfully deployed the Feathr API.
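+
+As a quick smoke test, you can call one of the endpoints from the command line. This is a sketch; substitute the `<app-name>`, `<project-name>`, and `<AppServiceKey>` values from your own deployment:
+
+```bash
+curl "https://<app-name>.azurewebsites.net/projects/<project-name>/features?code=<AppServiceKey>"
+```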
diff --git a/docs/images/api-docs.png b/docs/images/api-docs.png
new file mode 100644
index 000000000..1dd670bf4
Binary files /dev/null and b/docs/images/api-docs.png differ
diff --git a/docs/images/feathr_api_image_latest.png b/docs/images/feathr_api_image_latest.png
new file mode 100644
index 000000000..34a1fcda3
Binary files /dev/null and b/docs/images/feathr_api_image_latest.png differ
diff --git a/docs/images/feathr_api_image_latest_deployment.png b/docs/images/feathr_api_image_latest_deployment.png
new file mode 100644
index 000000000..32b7f8998
Binary files /dev/null and b/docs/images/feathr_api_image_latest_deployment.png differ
diff --git a/docs/images/feathr_api_image_latest_options.png b/docs/images/feathr_api_image_latest_options.png
new file mode 100644
index 000000000..fa23f99eb
Binary files /dev/null and b/docs/images/feathr_api_image_latest_options.png differ
diff --git a/feathr_project/feathr/_feature_registry.py b/feathr_project/feathr/_feature_registry.py
index 4c601890a..a7e52de14 100644
--- a/feathr_project/feathr/_feature_registry.py
+++ b/feathr_project/feathr/_feature_registry.py
@@ -23,7 +23,6 @@
 from pyapacheatlas.core.util import GuidTracker
 from pyhocon import ConfigFactory
 
-from feathr._envvariableutil import _EnvVaraibleUtil
 from feathr._file_utils import write_to_file
 from feathr.anchor import FeatureAnchor
 from feathr.constants import *
@@ -52,9 +51,9 @@ def __init__(self, project_name: str, azure_purview_name: str, registry_delimite
         self.credential = DefaultAzureCredential(exclude_interactive_browser_credential=False) if credential is None else credential
         self.oauth = AzCredentialWrapper(credential=self.credential)
         self.purview_client = PurviewClient(
-            account_name=self.azure_purview_name,
-            authentication=self.oauth
-        )
+                account_name=self.azure_purview_name,
+                authentication=self.oauth
+            )
         self.guid = GuidTracker(starting=-1000)
         self.entity_batch_queue = []
@@ -657,6 +656,7 @@
         logger.info(
             "Finished registering features. See {} to access the Purview web interface", webinterface_path)
 
+
     def _purge_feathr_registry(self):
         """
        Delete all the feathr related entities and type definitions in feathr registry.
        For internal use only
@@ -704,7 +704,8 @@ def _delete_all_feathr_entitties(self):
             self.purview_client.delete_entity(
                 guid=guid_list[i:i+batch_delte_size])
             logger.info("{} feathr entities deleted", batch_delte_size)
-
+
+    @classmethod
     def _get_registry_client(self):
         """
         Return a client object and users can operate more on it (like doing search)
@@ -733,12 +734,66 @@ def list_registered_features(self, project_name: str = None, limit=50, starting_
                 feature_list.append(entity["name"])
         return feature_list
 
+
+    def get_feature_by_fqdn_type(self, qualifiedName, typeName):
+        """
+        Get a single feature by its qualified name and type.
+        Returns the feature, else throws an AtlasException with a 400 error code
+        """
+        response = self.purview_client.get_entity(qualifiedName=qualifiedName, typeName=typeName)
+        entities = response.get('entities')
+        for entity in entities:
+            if entity.get('typeName') == typeName and entity.get('attributes').get('qualifiedName') == qualifiedName:
+                return entity
+
+    def get_feature_by_fqdn(self, qualifiedName):
+        """
+        Get a feature by its qualified name.
+        Returns the feature, else throws an AtlasException with a 400 error code
+        """
+        guid = self.get_feature_guid(qualifiedName)
+        return self.get_feature_by_guid(guid)
+
+    def get_feature_by_guid(self, guid):
+        """
+        Get a single feature by its GUID.
+        Returns the feature, else throws an AtlasException with a 400 error code
+        """
+        response = self.purview_client.get_single_entity(guid=guid)
+        return response
+
+    def get_feature_lineage(self, guid):
+        """
+        Get a feature's lineage by its GUID.
+        Returns the lineage, else throws an AtlasException with a 400 error code
+        """
+        return self.purview_client.get_entity_lineage(guid=guid)
+
+    def get_feature_guid(self, qualifiedName):
+        """
+        Get the GUID of a feature given its qualified name
+        """
+        search_term = "qualifiedName:{0}".format(qualifiedName)
+        entities = self.purview_client.discovery.search_entities(search_term)
+        for entity in entities:
+            if entity.get('qualifiedName') == qualifiedName:
+                return entity.get('id')
+
+    def search_features(self, searchTerm):
+        """
+        Search the registry for the given query term.
+        For a ride-hailing company, a few examples could be "taxi", "passenger", "fare", etc.
+        It's a keyword search on the registry metadata
+        """
+        search_term = "qualifiedName:{0}".format(searchTerm)
+        entities = self.purview_client.discovery.search_entities(search_term)
+        return entities
+
     def _list_registered_entities_with_details(self, project_name: str = None, entity_type: Union[str, List[str]] = None, limit=50, starting_offset=0,) -> List[Dict]:
         """
         List all the already registered entities. entity_type should be one of: SOURCE, DERIVED_FEATURE, ANCHOR, ANCHOR_FEATURE, FEATHR_PROJECT, or a list of those values
         limit: a maximum 1000 will be enforced at the underlying API
-
+
         returns a list of the result entities.
         """
         entity_type_list = [entity_type] if isinstance(
diff --git a/feathr_project/feathr/_synapse_submission.py b/feathr_project/feathr/_synapse_submission.py
index 0acb7d1a5..727eebe52 100644
--- a/feathr_project/feathr/_synapse_submission.py
+++ b/feathr_project/feathr/_synapse_submission.py
@@ -6,20 +6,38 @@
 from typing import Any, Dict, List, Optional, Tuple
 from urllib.parse import urlparse
 from os.path import basename
+from enum import Enum
 
 from azure.identity import (ChainedTokenCredential, DefaultAzureCredential,
                             DeviceCodeCredential, EnvironmentCredential,
                             ManagedIdentityCredential)
 from azure.storage.filedatalake import DataLakeServiceClient
 from azure.synapse.spark import SparkClient
-from azure.synapse.spark.models import (LivyStates, SparkBatchJob,
-                                        SparkBatchJobOptions)
+from azure.synapse.spark.models import SparkBatchJobOptions
 from loguru import logger
 from tqdm import tqdm
 
 from feathr._abc import SparkJobLauncher
 from feathr.constants import *
 
+class LivyStates(Enum):
+    """ Adapted from LivyStates to relax the dependency on the azure-synapse-spark package.
+    The original definition is here:
+    https://github.com/Azure/azure-sdk-for-python/blob/main/sdk/synapse/azure-synapse-spark/azure/synapse/spark/models/_spark_client_enums.py#L38
+    """
+
+    NOT_STARTED = "not_started"
+    STARTING = "starting"
+    IDLE = "idle"
+    BUSY = "busy"
+    SHUTTING_DOWN = "shutting_down"
+    ERROR = "error"
+    DEAD = "dead"
+    KILLED = "killed"
+    SUCCESS = "success"
+    RUNNING = "running"
+    RECOVERING = "recovering"
+
 
 class _FeathrSynapseJobLauncher(SparkJobLauncher):
     """
@@ -111,9 +129,9 @@ def wait_for_completion(self, timeout_seconds: Optional[float]) -> bool:
         while (timeout_seconds is None) or (time.time() - start_time < timeout_seconds):
             status = self.get_status()
             logger.info('Current Spark job status: {}', status)
-            if status in {LivyStates.SUCCESS}:
+            if status in {LivyStates.SUCCESS.value}:
                 return True
-            elif status in {LivyStates.ERROR, LivyStates.DEAD, LivyStates.KILLED}:
+            elif status in {LivyStates.ERROR.value, LivyStates.DEAD.value, LivyStates.KILLED.value}:
                 return False
             else:
                 time.sleep(30)
diff --git a/feathr_project/feathr/api/api.py b/feathr_project/feathr/api/api.py
new file mode 100644
index 000000000..c6a2235e1
--- /dev/null
+++ b/feathr_project/feathr/api/api.py
@@ -0,0 +1,192 @@
+import os
+import logging
+from typing import List, Optional
+from fastapi import FastAPI, HTTPException, Response, status
+from pyapacheatlas.core import AtlasException
+from pydantic import BaseModel
+from opencensus.ext.azure.log_exporter import AzureLogHandler
+from feathr._feature_registry import _FeatureRegistry
+from feathr.constants import *
+
+
+app = FastAPI()
+
+# Log level
+log_level = os.getenv("logLevel", "INFO")
+
+logger = logging.getLogger(__name__)
+handler = logging.StreamHandler()
+# Set the Application Insights connection string to enable the logger to write to Application Insights directly
+app_insights_connection_string = os.getenv("APPLICATIONINSIGHTS_CONNECTION_STRING")
+formatter = logging.Formatter("[%(asctime)s] [%(name)s:%(lineno)s - %(funcName)5s()] %(levelname)s - %(message)s")
+handler.setFormatter(formatter)
+logger.addHandler(handler)
+if app_insights_connection_string:
+    azure_handler = AzureLogHandler(connection_string=app_insights_connection_string)
+    logger.addHandler(azure_handler)
+else:
+    logger.warning("APPLICATIONINSIGHTS_CONNECTION_STRING is not set - will NOT log to AppInsights!")
+logger.setLevel(log_level)
+
+logger.info("starting %s", __file__)
+
+"""
+This is to enable authentication for the API and not keep it wide open.
+You can set the AppServiceKey variable as an environment variable,
+and make sure to pass the variable as a query parameter when you access the API,
+e.g. /projects/<project_name>/features?code=<AppServiceKey>
+"""
+appServiceKey = os.getenv("AppServiceKey")
+project_name = os.getenv("ProjectName")
+azure_purview_name = os.getenv("AZURE_PURVIEW_NAME")
+registry_delimiter = os.getenv("REGISTRY_DELIMITER")
+
+
+def getRegistry():
+    return _FeatureRegistry(project_name=project_name, azure_purview_name=azure_purview_name, registry_delimiter=registry_delimiter)
+
+
+@app.get("/")
+async def root(code):
+    """
+    Root endpoint
+    """
+    if code != appServiceKey:
+        raise HTTPException(status_code=403, detail="You are not allowed to access this resource")
+    return {"message": "Welcome to Feature Store APIs. Please call specific APIs for your use case"}
+
+class Features(BaseModel):
+    """
+    Defines the contract for the features response
+    """
+
+    features: List[str]
+
+
+@app.get("/projects/{project_name}/features", response_model=Features)
+def list_registered_features(code, project_name: str, response: Response):
+    """List all the already registered features. If project_name is not provided or is None, it will return all
+    the registered features; otherwise it will only return features under this project
+    """
+    if code != appServiceKey:
+        raise HTTPException(status_code=403, detail="You are not allowed to access this resource")
+    try:
+        registry = getRegistry()
+        logger.info("Retrieved registry client successfully")
+        response.status_code = status.HTTP_200_OK
+        result = registry.list_registered_features(project_name)
+        return {"features" : result}
+    except AtlasException as ae:
+        logger.error("Error retrieving feature: %s", ae.args[0])
+        raise HTTPException(status_code=400, detail="Error: " + ae.args[0])
+    except Exception as err:
+        logger.error("Error: %s", err.args[0])
+        raise HTTPException(status_code=400, detail="Error: " + err.args[0])
+
+@app.get("/projects/{project_name}/features/{feature_name}")
+def get_feature_by_qualified_name(code: str, project_name: str, feature_name: str, response: Response, type_name: Optional[str] = None):
+    """Get a single feature by its qualified name. If type_name is provided,
+    the feature is looked up by both qualified name and entity type
+    """
+    if code != appServiceKey:
+        raise HTTPException(status_code=403, detail="You are not allowed to access this resource")
+    try:
+        registry = getRegistry()
+        logger.info("Retrieved registry client successfully")
+        response.status_code = status.HTTP_200_OK
+        result = None
+        if type_name: # Type is provided
+            result = registry.get_feature_by_fqdn_type(feature_name, type_name)
+        else:
+            result = registry.get_feature_by_fqdn(feature_name)
+        return result
+    except AtlasException as ae:
+        logger.error("Error retrieving feature: %s", ae.args[0])
+        raise HTTPException(status_code=400, detail=ae.args[0])
+    except Exception as err:
+        logger.error("Error: %s", err.args[0])
+        raise HTTPException(status_code=400, detail=err.args[0])
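+
+# Example request shapes for the feature endpoints above and below
+# (illustrative values; the qualified-name format depends on your registry delimiter):
+#   GET /projects/<project_name>/features/<feature_qualified_name>?code=<AppServiceKey>
+#   GET /projects/<project_name>/features/lineage/<feature_qualified_name>?code=<AppServiceKey>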
+@app.get("/projects/{project_name}/features/lineage/{feature_name}")
+def get_feature_lineage_by_qualified_name(code: str, project_name: str, feature_name: str, response: Response):
+    """Get the lineage for a single feature by its qualified name
+    """
+    if code != appServiceKey:
+        raise HTTPException(status_code=403, detail="You are not allowed to access this resource")
+    try:
+        registry = getRegistry()
+        logger.info("Retrieved registry client successfully")
+        response.status_code = status.HTTP_200_OK
+        guid = registry.get_feature_guid(feature_name)
+        if guid:
+            result = registry.get_feature_lineage(guid)
+            return result
+    except AtlasException as ae:
+        logger.error("Error retrieving feature: %s", ae.args[0])
+        raise HTTPException(status_code=400, detail=ae.args[0])
+    except Exception as err:
+        logger.error("Error: %s", err.args[0])
+        raise HTTPException(status_code=400, detail=err.args[0])
+
+@app.get("/featurestore/search")
+def search_registry(code: str, query: str, response: Response):
+    """Search the registry for the given query term;
+    it's a keyword search on the registry metadata
+    """
+    if code != appServiceKey:
+        raise HTTPException(status_code=403, detail="You are not allowed to access this resource")
+    try:
+        registry = getRegistry()
+        logger.info("Retrieved registry client successfully")
+        response.status_code = status.HTTP_200_OK
+        result = registry.search_features(query)
+        return result
+    except AtlasException as ae:
+        logger.error("Error retrieving feature: %s", ae.args[0])
+        raise HTTPException(status_code=400, detail=ae.args[0])
+    except Exception as err:
+        logger.error("Error: %s", err.args[0])
+        raise HTTPException(status_code=400, detail=err.args[0])
+
+@app.get("/projects/{project_name}/sources/{source_name}")
+def get_source_by_qualifiedName(code: str, project_name: str, source_name: str, response: Response):
+    """Get a single source by its qualified name
+    """
+    if code != appServiceKey:
+        raise HTTPException(status_code=403, detail="You are not allowed to access this resource")
+    try:
+        registry = getRegistry()
+        logger.info("Retrieved registry client successfully")
+        response.status_code = status.HTTP_200_OK
+        type_name = "feathr_source_v1"
+        result = registry.get_feature_by_fqdn_type(source_name, type_name)
+        return result
+    except AtlasException as ae:
+        logger.error("Error retrieving feature: %s", ae.args[0])
+        raise HTTPException(status_code=400, detail=ae.args[0])
+    except Exception as err:
+        logger.error("Error: %s", err.args[0])
+        raise HTTPException(status_code=400, detail=err.args[0])
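+
+# Note: the hard-coded type names ("feathr_source_v1" above, "feathr_anchor" below)
+# must match the entity type definitions registered in the Purview registry.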
+@app.get("/projects/{project_name}/anchors/{anchor_name}")
+def get_anchor_by_qualifiedName(code: str, project_name: str, anchor_name: str, response: Response):
+    """Get a single anchor by its qualified name
+    """
+    if code != appServiceKey:
+        raise HTTPException(status_code=403, detail="You are not allowed to access this resource")
+    try:
+        registry = getRegistry()
+        logger.info("Retrieved registry client successfully")
+        response.status_code = status.HTTP_200_OK
+        type_name = "feathr_anchor"
+        result = registry.get_feature_by_fqdn_type(anchor_name, type_name)
+        return result
+    except AtlasException as ae:
+        logger.error("Error retrieving feature: %s", ae.args[0])
+        raise HTTPException(status_code=400, detail=ae.args[0])
+    except Exception as err:
+        logger.error("Error: %s", err.args[0])
+        raise HTTPException(status_code=400, detail=err.args[0])
diff --git a/feathr_project/feathr/api/dockerfile b/feathr_project/feathr/api/dockerfile
new file mode 100644
index 000000000..1b1b351be
--- /dev/null
+++ b/feathr_project/feathr/api/dockerfile
@@ -0,0 +1,14 @@
+FROM ubuntu:latest
+RUN apt-get update -y
+RUN apt-get install -y build-essential python3.8 python3.8-dev python3-pip python3.8-venv git
+# update pip
+RUN python3.8 -m pip install pip --upgrade
+RUN python3.8 -m pip install wheel
+EXPOSE 80
+
+COPY . /app
+WORKDIR /app
+RUN pip install -r requirements.txt
+ENV LC_ALL=C.UTF-8
+ENV LANG=C.UTF-8
+CMD ["uvicorn", "api:app", "--host", "0.0.0.0", "--port", "80"]
\ No newline at end of file
diff --git a/feathr_project/feathr/api/requirements.txt b/feathr_project/feathr/api/requirements.txt
new file mode 100644
index 000000000..d50cf3911
--- /dev/null
+++ b/feathr_project/feathr/api/requirements.txt
@@ -0,0 +1,8 @@
+azure-core
+azure-purview-catalog==1.0.0b2
+fastapi
+opencensus-ext-azure
+pyapacheatlas
+pydantic
+uvicorn
+git+https://github.com/linkedin/feathr.git@main#subdirectory=feathr_project
diff --git a/feathr_project/feathr/client.py b/feathr_project/feathr/client.py
index 76361859b..40aab9631 100644
--- a/feathr_project/feathr/client.py
+++ b/feathr_project/feathr/client.py
@@ -91,7 +91,9 @@ def __init__(self, config_path:str = "./feathr_config.yaml", local_workspace_dir
         if local_workspace_dir:
             self.local_workspace_dir = local_workspace_dir
         else:
-            self.local_workspace_dir = tempfile.TemporaryDirectory().name
+            # keep a reference to the TemporaryDirectory object; this is required for Windows
+            temp_dir_obj = tempfile.TemporaryDirectory()
+            self.local_workspace_dir = temp_dir_obj.name
 
         self.envutils = envutils
diff --git a/feathr_project/feathrcli/data/feathr_user_workspace/features/agg_features.py b/feathr_project/feathrcli/data/feathr_user_workspace/features/agg_features.py
index 5fd6d72e3..aa166a221 100644
--- a/feathr_project/feathrcli/data/feathr_user_workspace/features/agg_features.py
+++ b/feathr_project/feathrcli/data/feathr_user_workspace/features/agg_features.py
@@ -6,7 +6,7 @@ from feathr.typed_key import TypedKey
 
 batch_source = HdfsSource(name="nycTaxiBatchSource",
-                          path="abfss://feathrazuretest3fs@feathrazuretest3storage.dfs.core.windows.net/demo_data/green_tripdata_2020-04.csv",
+                          path="wasbs://public@azurefeathrstorage.blob.core.windows.net/sample_data/green_tripdata_2020-04.csv",
                           event_timestamp_column="lpep_dropoff_datetime",
                           timestamp_format="yyyy-MM-dd HH:mm:ss")
diff --git a/feathr_project/feathrcli/data/feathr_user_workspace/features/non_agg_features.py b/feathr_project/feathrcli/data/feathr_user_workspace/features/non_agg_features.py
index 8926f5d3b..8d7d7c93b 100644
--- a/feathr_project/feathrcli/data/feathr_user_workspace/features/non_agg_features.py
+++
b/feathr_project/feathrcli/data/feathr_user_workspace/features/non_agg_features.py @@ -5,7 +5,7 @@ from feathr.source import HdfsSource batch_source = HdfsSource(name="nycTaxiBatchSource", - path="abfss://feathrazuretest3fs@feathrazuretest3storage.dfs.core.windows.net/demo_data/green_tripdata_2020-04.csv", + path="wasbs://public@azurefeathrstorage.blob.core.windows.net/sample_data/green_tripdata_2020-04.csv", event_timestamp_column="lpep_dropoff_datetime", timestamp_format="yyyy-MM-dd HH:mm:ss") diff --git a/feathr_project/setup.py b/feathr_project/setup.py index 1826ecfbc..9a5f6ab8b 100644 --- a/feathr_project/setup.py +++ b/feathr_project/setup.py @@ -22,7 +22,7 @@ install_requires=[ 'Click', "azure-storage-file-datalake>=12.5.0", - "azure-synapse-spark>=0.7.0", + "azure-synapse-spark", "azure-identity", "py4j", "loguru", diff --git a/feathr_project/test/test_azure_snowflake_e2e.py b/feathr_project/test/test_azure_snowflake_e2e.py index b5f588827..b58229493 100644 --- a/feathr_project/test/test_azure_snowflake_e2e.py +++ b/feathr_project/test/test_azure_snowflake_e2e.py @@ -31,7 +31,7 @@ def test_feathr_online_store_agg_features(): client.materialize_features(settings) # just assume the job is successful without validating the actual result in Redis. Might need to consolidate # this part with the test_feathr_online_store test case - client.wait_job_to_finish(timeout_sec=600) + client.wait_job_to_finish(timeout_sec=900) res = client.get_online_features('snowflakeSampleDemoFeature', '1', ['f_snowflake_call_center_division_name', 'f_snowflake_call_center_zipcode']) diff --git a/feathr_project/test/test_azure_spark_e2e.py b/feathr_project/test/test_azure_spark_e2e.py index be1357a4a..adfba0aef 100644 --- a/feathr_project/test/test_azure_spark_e2e.py +++ b/feathr_project/test/test_azure_spark_e2e.py @@ -44,7 +44,7 @@ def test_feathr_online_store_agg_features(): client.materialize_features(settings) # just assume the job is successful without validating the actual result in Redis. Might need to consolidate # this part with the test_feathr_online_store test case - client.wait_job_to_finish(timeout_sec=600) + client.wait_job_to_finish(timeout_sec=900) res = client.get_online_features(online_test_table, '265', [ 'f_location_avg_fare', 'f_location_max_fare']) @@ -85,7 +85,7 @@ def test_feathr_online_store_non_agg_features(): client.materialize_features(settings) # just assume the job is successful without validating the actual result in Redis. 
Might need to consolidate # this part with the test_feathr_online_store test case - client.wait_job_to_finish(timeout_sec=600) + client.wait_job_to_finish(timeout_sec=900) res = client.get_online_features(online_test_table, '111', ['f_gen_trip_distance', 'f_gen_is_long_trip_distance', 'f1', 'f2', 'f3', 'f4', 'f5', 'f6']) @@ -148,7 +148,7 @@ def test_feathr_get_offline_features(): feature_query = FeatureQuery( feature_list=["f_location_avg_fare"], key=location_id) settings = ObservationSettings( - observation_path="abfss://feathrazuretest3fs@feathrazuretest3storage.dfs.core.windows.net/demo_data/green_tripdata_2020-04.csv", + observation_path="wasbs://public@azurefeathrstorage.blob.core.windows.net/sample_data/green_tripdata_2020-04.csv", event_timestamp_column="lpep_dropoff_datetime", timestamp_format="yyyy-MM-dd HH:mm:ss") diff --git a/feathr_project/test/test_feature_anchor.py b/feathr_project/test/test_feature_anchor.py index 96c86612c..d5e6701b8 100644 --- a/feathr_project/test/test_feature_anchor.py +++ b/feathr_project/test/test_feature_anchor.py @@ -60,7 +60,7 @@ def test_request_feature_anchor_to_config(): def test_non_agg_feature_anchor_to_config(): batch_source = HdfsSource(name="nycTaxiBatchSource", - path="abfss://feathrazuretest3fs@feathrazuretest3storage.dfs.core.windows.net/demo_data/green_tripdata_2020-04.csv", + path="wasbs://public@azurefeathrstorage.blob.core.windows.net/sample_data/green_tripdata_2020-04.csv", event_timestamp_column="lpep_dropoff_datetime", timestamp_format="yyyy-MM-dd HH:mm:ss") @@ -111,7 +111,7 @@ def test_non_agg_feature_anchor_to_config(): def test_agg_anchor_to_config(): batch_source = HdfsSource(name="nycTaxiBatchSource", - path="abfss://feathrazuretest3fs@feathrazuretest3storage.dfs.core.windows.net/demo_data/green_tripdata_2020-04.csv", + path="wasbs://public@azurefeathrstorage.blob.core.windows.net/sample_data/green_tripdata_2020-04.csv", event_timestamp_column="lpep_dropoff_datetime", timestamp_format="yyyy-MM-dd HH:mm:ss") diff --git a/feathr_project/test/test_fixture.py b/feathr_project/test/test_fixture.py index a2c3680a4..d0a0d7656 100644 --- a/feathr_project/test/test_fixture.py +++ b/feathr_project/test/test_fixture.py @@ -18,7 +18,7 @@ def basic_test_setup(config_path: str): client = FeathrClient(config_path=config_path) batch_source = HdfsSource(name="nycTaxiBatchSource", - path="abfss://feathrazuretest3fs@feathrazuretest3storage.dfs.core.windows.net/demo_data/green_tripdata_2020-04.csv", + path="wasbs://public@azurefeathrstorage.blob.core.windows.net/sample_data/green_tripdata_2020-04.csv", event_timestamp_column="lpep_dropoff_datetime", timestamp_format="yyyy-MM-dd HH:mm:ss") @@ -135,7 +135,7 @@ def add_new_dropoff_and_fare_amount_column(df: DataFrame): return df batch_source = HdfsSource(name="nycTaxiBatchSource", - path="abfss://feathrazuretest3fs@feathrazuretest3storage.dfs.core.windows.net/demo_data/green_tripdata_2020-04.csv", + path="wasbs://public@azurefeathrstorage.blob.core.windows.net/sample_data/green_tripdata_2020-04.csv", event_timestamp_column="lpep_dropoff_datetime", timestamp_format="yyyy-MM-dd HH:mm:ss", preprocessing=add_new_dropoff_and_fare_amount_column, diff --git a/feathr_project/test/test_observation_setting.py b/feathr_project/test/test_observation_setting.py index bef11d1a8..1460d7e7b 100644 --- a/feathr_project/test/test_observation_setting.py +++ b/feathr_project/test/test_observation_setting.py @@ -3,7 +3,7 @@ def test_observation_setting_with_timestamp(): observation_settings = ObservationSettings( - 
observation_path="abfss://feathrazuretest3fs@feathrazuretest3storage.dfs.core.windows.net/demo_data/green_tripdata_2020-04.csv", + observation_path="wasbs://public@azurefeathrstorage.blob.core.windows.net/sample_data/green_tripdata_2020-04.csv", event_timestamp_column="lpep_dropoff_datetime", timestamp_format="yyyy-MM-dd HH:mm:ss") config = observation_settings.to_config() @@ -17,7 +17,7 @@ def test_observation_setting_with_timestamp(): } } - observationPath: "abfss://feathrazuretest3fs@feathrazuretest3storage.dfs.core.windows.net/demo_data/green_tripdata_2020-04.csv" + observationPath: "wasbs://public@azurefeathrstorage.blob.core.windows.net/sample_data/green_tripdata_2020-04.csv" """ assert ''.join(config.split()) == ''.join(expected_config.split()) diff --git a/feathr_project/test/test_pyduf_preprocessing_e2e.py b/feathr_project/test/test_pyduf_preprocessing_e2e.py index 747f0c7d1..f9e7cf602 100644 --- a/feathr_project/test/test_pyduf_preprocessing_e2e.py +++ b/feathr_project/test/test_pyduf_preprocessing_e2e.py @@ -59,7 +59,7 @@ def test_non_swa_feature_gen_with_offline_preprocessing(): client = basic_test_setup(os.path.join(test_workspace_dir, "feathr_config.yaml")) batch_source = HdfsSource(name="nycTaxiBatchSource_add_new_fare_amount", - path="abfss://feathrazuretest3fs@feathrazuretest3storage.dfs.core.windows.net/demo_data/green_tripdata_2020-04.csv", + path="wasbs://public@azurefeathrstorage.blob.core.windows.net/sample_data/green_tripdata_2020-04.csv", preprocessing=add_new_fare_amount, event_timestamp_column="lpep_dropoff_datetime", timestamp_format="yyyy-MM-dd HH:mm:ss") @@ -103,7 +103,7 @@ def test_non_swa_feature_gen_with_offline_preprocessing(): client.materialize_features(settings) # just assume the job is successful without validating the actual result in Redis. Might need to consolidate # this part with the test_feathr_online_store test case - client.wait_job_to_finish(timeout_sec=600) + client.wait_job_to_finish(timeout_sec=900) res = client.get_online_features(online_test_table, '2020-04-01 07:21:51', [ 'f_is_long_trip_distance', 'f_day_of_week']) @@ -118,7 +118,7 @@ def test_feature_swa_feature_gen_with_preprocessing(): client = basic_test_setup(os.path.join(test_workspace_dir, "feathr_config.yaml")) batch_source = HdfsSource(name="nycTaxiBatchSource", - path="abfss://feathrazuretest3fs@feathrazuretest3storage.dfs.core.windows.net/demo_data/green_tripdata_2020-04.csv", + path="wasbs://public@azurefeathrstorage.blob.core.windows.net/sample_data/green_tripdata_2020-04.csv", preprocessing=add_new_dropoff_and_fare_amount_column, event_timestamp_column="new_lpep_dropoff_datetime", timestamp_format="yyyy-MM-dd HH:mm:ss") @@ -165,7 +165,7 @@ def test_feature_swa_feature_gen_with_preprocessing(): client.materialize_features(settings) # just assume the job is successful without validating the actual result in Redis. 
Might need to consolidate # this part with the test_feathr_online_store test case - client.wait_job_to_finish(timeout_sec=600) + client.wait_job_to_finish(timeout_sec=900) res = client.get_online_features(online_test_table, '265', ['f_location_avg_fare', 'f_location_max_fare']) assert res == [1000041.625, 1000100.0] @@ -180,14 +180,14 @@ def test_feathr_get_offline_features_hdfs_source(): client = basic_test_setup(os.path.join(test_workspace_dir, "feathr_config.yaml")) batch_source1 = HdfsSource(name="nycTaxiBatchSource_add_new_dropoff_and_fare_amount_column", - path="abfss://feathrazuretest3fs@feathrazuretest3storage.dfs.core.windows.net/demo_data/green_tripdata_2020-04.csv", + path="wasbs://public@azurefeathrstorage.blob.core.windows.net/sample_data/green_tripdata_2020-04.csv", preprocessing=add_new_dropoff_and_fare_amount_column, event_timestamp_column="new_lpep_dropoff_datetime", # event_timestamp_column="lpep_dropoff_datetime", timestamp_format="yyyy-MM-dd HH:mm:ss") batch_source2 = HdfsSource(name="nycTaxiBatchSource_add_new_fare_amount", - path="abfss://feathrazuretest3fs@feathrazuretest3storage.dfs.core.windows.net/demo_data/green_tripdata_2020-04.csv", + path="wasbs://public@azurefeathrstorage.blob.core.windows.net/sample_data/green_tripdata_2020-04.csv", preprocessing=add_new_fare_amount, event_timestamp_column="lpep_dropoff_datetime", timestamp_format="yyyy-MM-dd HH:mm:ss") @@ -248,7 +248,7 @@ def test_feathr_get_offline_features_hdfs_source(): ] settings = ObservationSettings( - observation_path="abfss://feathrazuretest3fs@feathrazuretest3storage.dfs.core.windows.net/demo_data/green_tripdata_2020-04.csv", + observation_path="wasbs://public@azurefeathrstorage.blob.core.windows.net/sample_data/green_tripdata_2020-04.csv", event_timestamp_column="lpep_dropoff_datetime", timestamp_format="yyyy-MM-dd HH:mm:ss") @@ -281,7 +281,7 @@ def test_get_offline_feature_two_swa_with_diff_preprocessing(): client = basic_test_setup(os.path.join(test_workspace_dir, "feathr_config.yaml")) swa_source_1 = HdfsSource(name="nycTaxiBatchSource1", - path="abfss://feathrazuretest3fs@feathrazuretest3storage.dfs.core.windows.net/demo_data/green_tripdata_2020-04.csv", + path="wasbs://public@azurefeathrstorage.blob.core.windows.net/sample_data/green_tripdata_2020-04.csv", preprocessing=add_new_dropoff_and_fare_amount_column, event_timestamp_column="new_lpep_dropoff_datetime", timestamp_format="yyyy-MM-dd HH:mm:ss") @@ -313,7 +313,7 @@ def test_get_offline_feature_two_swa_with_diff_preprocessing(): swa_source_2 = HdfsSource(name="nycTaxiBatchSource2", - path="abfss://feathrazuretest3fs@feathrazuretest3storage.dfs.core.windows.net/demo_data/green_tripdata_2020-04.csv", + path="wasbs://public@azurefeathrstorage.blob.core.windows.net/sample_data/green_tripdata_2020-04.csv", preprocessing=add_new_surcharge_amount_and_pickup_column, event_timestamp_column="new_lpep_pickup_datetime", timestamp_format="yyyy-MM-dd HH:mm:ss") @@ -337,7 +337,7 @@ def test_get_offline_feature_two_swa_with_diff_preprocessing(): ) swa_source_3 = HdfsSource(name="nycTaxiBatchSource3", - path="abfss://feathrazuretest3fs@feathrazuretest3storage.dfs.core.windows.net/demo_data/green_tripdata_2020-04_old.csv", + path="wasbs://public@azurefeathrstorage.blob.core.windows.net/sample_data/green_tripdata_2020-04_old.csv", preprocessing=add_old_lpep_dropoff_datetime, event_timestamp_column="old_lpep_dropoff_datetime", timestamp_format="yyyy-MM-dd HH:mm:ss") @@ -365,7 +365,7 @@ def test_get_offline_feature_two_swa_with_diff_preprocessing(): ] settings 
= ObservationSettings( - observation_path="abfss://feathrazuretest3fs@feathrazuretest3storage.dfs.core.windows.net/demo_data/green_tripdata_2020-04.csv", + observation_path="wasbs://public@azurefeathrstorage.blob.core.windows.net/sample_data/green_tripdata_2020-04.csv", event_timestamp_column="lpep_dropoff_datetime", timestamp_format="yyyy-MM-dd HH:mm:ss")
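For reference, the recurring change across these tests swaps the private abfss:// test path for the public wasbs:// sample dataset and raises the job timeout to 900 seconds. A minimal sketch of the ObservationSettings usage the tests exercise (assuming ObservationSettings is importable from the feathr package, as in the tests):

```python
from feathr import ObservationSettings

# Same public sample data path the tests above switch to.
settings = ObservationSettings(
    observation_path="wasbs://public@azurefeathrstorage.blob.core.windows.net/sample_data/green_tripdata_2020-04.csv",
    event_timestamp_column="lpep_dropoff_datetime",
    timestamp_format="yyyy-MM-dd HH:mm:ss")
```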