From e91521007e01c5d548f79a1e6a66f809876932bd Mon Sep 17 00:00:00 2001
From: Yihui Guo
Date: Tue, 13 Sep 2022 22:50:20 +0800
Subject: [PATCH 1/6] squash commit and avoid conflict

---
 build.sbt                                     |   3 +-
 docs/dev_guide/aerospike_setup_guide.md       |  34 +++++-
 feathr_project/feathr/__init__.py             |   1 +
 feathr_project/feathr/definition/sink.py      |  24 ++++
 .../feathr/registry/registry_utils.py         |   4 +
 feathr_project/test/test_azure_spark_e2e.py   | 102 +++++++++++++++-
 feathr_project/test/test_feature_registry.py  |  46 ++++++-
 .../feathr_config_purview.yaml                | 113 ++++++++++++++++++
 .../config/location/GenericLocation.scala     |  12 ++
 9 files changed, 334 insertions(+), 5 deletions(-)
 create mode 100644 feathr_project/test/test_user_workspace/feathr_config_purview.yaml

diff --git a/build.sbt b/build.sbt
index 52c9da927..fdab63f4f 100644
--- a/build.sbt
+++ b/build.sbt
@@ -47,7 +47,8 @@ val localAndCloudCommonDependencies = Seq(
   "com.github.changvvb" %% "jackson-module-caseclass" % "1.1.1",
   "com.azure.cosmos.spark" % "azure-cosmos-spark_3-1_2-12" % "4.11.1",
   "org.elasticsearch" % "elasticsearch-spark-30_2.12" % "7.15.2",
-  "org.eclipse.jetty" % "jetty-util" % "9.3.24.v20180605"
+  "org.eclipse.jetty" % "jetty-util" % "9.3.24.v20180605",
+  "com.aerospike" % "aerospike-spark_2.12" % "3.3.0_spark3.1-allshaded"
 ) // Common deps
 val jdbcDrivers = Seq(
diff --git a/docs/dev_guide/aerospike_setup_guide.md b/docs/dev_guide/aerospike_setup_guide.md
index d5f8a1784..4bf26130d 100644
--- a/docs/dev_guide/aerospike_setup_guide.md
+++ b/docs/dev_guide/aerospike_setup_guide.md
@@ -64,4 +64,36 @@ Next we will verify the functionality of Aerospike database by performing basic
 # View the config
 asadm -e "show config"
-```
\ No newline at end of file
+```
+
+# Configure Feathr's Spark job to connect with Aerospike
+
+1. To connect to Aerospike (with the Aerospike SDK or Spark), a username and password need to be configured.
+Guidance for setting up the username and password:
+https://docs.aerospike.com/server/operations/configure/security/access-control
+
+2. To connect to Aerospike with Spark, a Spark connector jar needs to be submitted to your Spark runtime.
+Link to spark connector:
+https://docs.aerospike.com/connect/spark
+
+3. To operate data in Aerospike database via Spark, spark job should have following configurations:
+```
+    "aerospike__seedhost": <seed host>,
+    "aerospike__port": "3000",
+    "aerospike__namespace": <namespace>,
+    "aerospike__set": <set name>,
+    "aerospike__user": <user>,
+    "aerospike__password": <password>,
+    "aerospike__updatebykey": "__key",
+```
+
+# Known limitations for Aerospike
+Aerospike has its own limitations on the data it stores.
+One limitation worth attention is that, for any incoming data row, no column name should be longer than 15 bytes.
+
+So when using Feathr, do not define feature names longer than 15 ASCII characters.
+
+Check
+https://docs.aerospike.com/guide/limitations for more details.
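To illustrate the bin-name limit called out above, here is a minimal sketch (not part of the patch itself) of a pre-flight check one could run before materializing features to Aerospike; the helper name and the 15-byte threshold are assumptions taken from the guidance above:

```
from typing import List

AEROSPIKE_BIN_NAME_LIMIT_BYTES = 15  # limit described in the guide above (assumed)

def assert_feature_names_fit_aerospike(feature_names: List[str],
                                       limit: int = AEROSPIKE_BIN_NAME_LIMIT_BYTES) -> None:
    """Raise if any feature name would exceed Aerospike's bin (column) name length limit."""
    too_long = [name for name in feature_names if len(name.encode("utf-8")) > limit]
    if too_long:
        raise ValueError(
            f"These feature names exceed the {limit}-byte Aerospike bin name limit: {too_long}")

# Example: short aggregation feature names pass; a 23-character name would raise.
assert_feature_names_fit_aerospike(["avgfare", "maxfare"])
```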
+ + diff --git a/feathr_project/feathr/__init__.py b/feathr_project/feathr/__init__.py index 981a705f6..fae0cb60c 100644 --- a/feathr_project/feathr/__init__.py +++ b/feathr_project/feathr/__init__.py @@ -46,6 +46,7 @@ 'RedisSink', 'HdfsSink', 'MonitoringSqlSink', + 'AerospikeSink', 'FeatureQuery', 'LookupFeature', 'Aggregation', diff --git a/feathr_project/feathr/definition/sink.py b/feathr_project/feathr/definition/sink.py index c287ba714..fb3e738a8 100644 --- a/feathr_project/feathr/definition/sink.py +++ b/feathr_project/feathr/definition/sink.py @@ -333,3 +333,27 @@ def get_required_properties(self) -> List[str]: if self.auth: return [self.name.upper() + "_USER", self.name.upper() + "_PASSWORD"] return [] + +class AerospikeSink(GenericSink): + def __init__(self,name:str,seedhost:str,port:str,namespace:str,setname:str,auth: bool = True): + super().__init__(format="aerospike",mode="APPEND",options = { + "aerospike.seedhost":seedhost, + "aerospike.port":port, + "aerospike.namespace":namespace, + "aerospike.user":"${%s_USER}" % name.upper(), + "aerospike.password":"${%s_PASSWORD}" % name.upper(), + "aerospike.set":setname + }) + self.auth = auth + self.name = name + + def support_offline(self) -> bool: + return False + + def support_online(self) -> bool: + return True + + def get_required_properties(self) -> List[str]: + if self.auth: + return [self.name.upper() + "_USER", self.name.upper() + "_PASSWORD"] + return [] diff --git a/feathr_project/feathr/registry/registry_utils.py b/feathr_project/feathr/registry/registry_utils.py index cc064259d..44e31fe4d 100644 --- a/feathr_project/feathr/registry/registry_utils.py +++ b/feathr_project/feathr/registry/registry_utils.py @@ -87,13 +87,17 @@ def transformation_to_def(v: Transformation) -> dict: elif isinstance(v, WindowAggTransformation): ret = { "defExpr": v.def_expr, + "def_expr": v.def_expr, } if v.agg_func: ret["aggFunc"] = v.agg_func + ret["agg_func"] = v.agg_func + if v.window: ret["window"] = v.window if v.group_by: ret["groupBy"] = v.group_by + ret["group_by"] = v.group_by if v.filter: ret["filter"] = v.filter if v.limit: diff --git a/feathr_project/test/test_azure_spark_e2e.py b/feathr_project/test/test_azure_spark_e2e.py index 48a5ca6fe..5f63ffebb 100644 --- a/feathr_project/test/test_azure_spark_e2e.py +++ b/feathr_project/test/test_azure_spark_e2e.py @@ -1,7 +1,12 @@ import os from datetime import datetime, timedelta from pathlib import Path +from feathr import (BOOLEAN, FLOAT, INPUT_CONTEXT, INT32, STRING, + DerivedFeature, Feature, FeatureAnchor, HdfsSource, + TypedKey, ValueType, WindowAggTransformation) +from feathr import FeathrClient from feathr.definition.sink import CosmosDbSink, ElasticSearchSink +from feathr.definition.source import HdfsSource import pytest from click.testing import CliRunner @@ -10,7 +15,7 @@ from feathr import FeathrClient from feathr import FeatureQuery from feathr import ObservationSettings -from feathr import RedisSink, HdfsSink, JdbcSink +from feathr import RedisSink, HdfsSink, JdbcSink,AerospikeSink from feathr import TypedKey from feathr import ValueType from feathr.utils.job_utils import get_result_df @@ -265,7 +270,6 @@ def test_feathr_materialize_to_cosmosdb(): # assuming the job can successfully run; otherwise it will throw exception client.wait_job_to_finish(timeout_sec=Constants.SPARK_JOB_TIMEOUT_SECONDS) - @pytest.mark.skip(reason="Marked as skipped as we need to setup resources for this test") def test_feathr_materialize_to_es(): """ @@ -295,7 +299,101 @@ def 
test_feathr_materialize_to_es():
     # assuming the job can successfully run; otherwise it will throw exception
     client.wait_job_to_finish(timeout_sec=Constants.SPARK_JOB_TIMEOUT_SECONDS)
+@pytest.mark.skip(reason="Marked as skipped as we need to setup resources for this test")
+def test_feathr_materialize_to_aerospike():
+    """
+    Test FeathrClient() AerospikeSink.
+    """
+    test_workspace_dir = Path(
+        __file__).parent.resolve() / "test_user_workspace"
+    # os.chdir(test_workspace_dir)
+    now = datetime.now()
+    # set workspace folder by time; make sure we don't have write conflict if there are many CI tests running
+    os.environ['SPARK_CONFIG__DATABRICKS__WORK_DIR'] = ''.join(['dbfs:/feathrazure_cijob','_', str(now.minute), '_', str(now.second), '_', str(now.microsecond)])
+    os.environ['SPARK_CONFIG__AZURE_SYNAPSE__WORKSPACE_DIR'] = ''.join(['abfss://feathrazuretest3fs@feathrazuretest3storage.dfs.core.windows.net/feathr_github_ci','_', str(now.minute), '_', str(now.second) ,'_', str(now.microsecond)])
+
+    client = FeathrClient(config_path="feathr_config.yaml")
+    batch_source = HdfsSource(name="nycTaxiBatchSource",
+                              path="wasbs://public@azurefeathrstorage.blob.core.windows.net/sample_data/green_tripdata_2020-04.csv",
+                              event_timestamp_column="lpep_dropoff_datetime",
+                              timestamp_format="yyyy-MM-dd HH:mm:ss")
+
+    f_trip_distance = Feature(name="f_trip_distance",
+                              feature_type=FLOAT, transform="trip_distance")
+    f_trip_time_duration = Feature(name="f_trip_time_duration",
+                                   feature_type=INT32,
+                                   transform="(to_unix_timestamp(lpep_dropoff_datetime) - to_unix_timestamp(lpep_pickup_datetime))/60")
+
+    features = [
+        f_trip_distance,
+        f_trip_time_duration,
+        Feature(name="f_is_long_trip_distance",
+                feature_type=BOOLEAN,
+                transform="cast_float(trip_distance)>30"),
+        Feature(name="f_day_of_week",
+                feature_type=INT32,
+                transform="dayofweek(lpep_dropoff_datetime)"),
+    ]
+
+
+    request_anchor = FeatureAnchor(name="request_features",
+                                   source=INPUT_CONTEXT,
+                                   features=features)
+
+    f_trip_time_distance = DerivedFeature(name="f_trip_time_distance",
+                                          feature_type=FLOAT,
+                                          input_features=[
+                                              f_trip_distance, f_trip_time_duration],
+                                          transform="f_trip_distance * f_trip_time_duration")
+
+    f_trip_time_rounded = DerivedFeature(name="f_trip_time_rounded",
+                                         feature_type=INT32,
+                                         input_features=[f_trip_time_duration],
+                                         transform="f_trip_time_duration % 10")
+
+    location_id = TypedKey(key_column="DOLocationID",
+                           key_column_type=ValueType.INT32,
+                           description="location id in NYC",
+                           full_name="nyc_taxi.location_id")
+    agg_features = [Feature(name="avgfare",
+                            key=location_id,
+                            feature_type=FLOAT,
+                            transform=WindowAggTransformation(agg_expr="cast_float(fare_amount)",
+                                                              agg_func="AVG",
+                                                              window="90d",
+                                                              )),
+                    Feature(name="maxfare",
+                            key=location_id,
+                            feature_type=FLOAT,
+                            transform=WindowAggTransformation(agg_expr="cast_float(fare_amount)",
+                                                              agg_func="MAX",
+                                                              window="90d"))
+                    ]
+
+    agg_anchor = FeatureAnchor(name="aggregationFeatures",
+                               source=batch_source,
+                               features=agg_features)
+
+    client.build_features(anchor_list=[agg_anchor, request_anchor], derived_feature_list=[
+        f_trip_time_distance, f_trip_time_rounded])
+
+    backfill_time = BackfillTime(start=datetime(
+        2020, 5, 20), end=datetime(2020, 5, 20), step=timedelta(days=1))
+
+    now = datetime.now()
+    os.environ[f"aerospike_USER"] = "feathruser"
+    os.environ[f"aerospike_PASSWORD"] = "feathr"
+    as_sink = AerospikeSink(name="aerospike",seedhost="20.57.186.153", port="3000", namespace="test", setname="test")
+    settings = 
MaterializationSettings("nycTaxiTable", + sinks=[as_sink], + feature_names=[ + "avgfare", "maxfea"], + backfill_time=backfill_time) + client.materialize_features(settings) + # assuming the job can successfully run; otherwise it will throw exception + client.wait_job_to_finish(timeout_sec=Constants.SPARK_JOB_TIMEOUT_SECONDS) if __name__ == "__main__": + test_feathr_materialize_to_aerospike() test_feathr_get_offline_features_to_sql() test_feathr_materialize_to_cosmosdb() \ No newline at end of file diff --git a/feathr_project/test/test_feature_registry.py b/feathr_project/test/test_feature_registry.py index 699b5cc9a..e68f2a949 100644 --- a/feathr_project/test/test_feature_registry.py +++ b/feathr_project/test/test_feature_registry.py @@ -62,6 +62,51 @@ def test_feathr_register_features_e2e(): output_path=output_path) client.wait_job_to_finish(timeout_sec=Constants.SPARK_JOB_TIMEOUT_SECONDS) +def test_feathr_register_features_purview_e2e(): + """ + This test will register features, get all the registered features, then query a set of already registered features. + """ + + test_workspace_dir = Path( + __file__).parent.resolve() / "test_user_workspace" + client: FeathrClient = registry_test_setup(os.path.join(test_workspace_dir, "feathr_config_purview.yaml")) + + # set output folder based on different runtime + now = datetime.now() + if client.spark_runtime == 'databricks': + output_path = ''.join(['dbfs:/feathrazure_cijob','_', str(now.minute), '_', str(now.second), ".parquet"]) + else: + output_path = ''.join(['abfss://feathrazuretest3fs@feathrazuretest3storage.dfs.core.windows.net/demo_data/output','_', str(now.minute), '_', str(now.second), ".parquet"]) + + + client.register_features() + # Allow purview to process a bit + time.sleep(5) + # in CI test, the project name is set by the CI pipeline so we read it here + all_features = client.list_registered_features(project_name=client.project_name) + all_feature_names = [x['name'] for x in all_features] + + assert 'f_is_long_trip_distance' in all_feature_names # test regular ones + assert 'f_trip_time_rounded' in all_feature_names # make sure derived features are there + assert 'f_location_avg_fare' in all_feature_names # make sure aggregated features are there + assert 'f_trip_time_rounded_plus' in all_feature_names # make sure derived features are there + assert 'f_trip_time_distance' in all_feature_names # make sure derived features are there + + # Sync workspace from registry, will get all conf files back + client.get_features_from_registry(client.project_name) + + feature_query = FeatureQuery( + feature_list=["f_location_avg_fare", "f_trip_time_rounded", "f_is_long_trip_distance"], + key=TypedKey(key_column="DOLocationID",key_column_type=ValueType.INT32)) + settings = ObservationSettings( + observation_path="wasbs://public@azurefeathrstorage.blob.core.windows.net/sample_data/green_tripdata_2020-04_with_index.csv", + event_timestamp_column="lpep_dropoff_datetime", + timestamp_format="yyyy-MM-dd HH:mm:ss") + client.get_offline_features(observation_settings=settings, + feature_query=feature_query, + output_path=output_path) + client.wait_job_to_finish(timeout_sec=Constants.SPARK_JOB_TIMEOUT_SECONDS) + def test_feathr_register_features_partially(): """ This test will register full set of features into one project, then register another project in two partial registrations. 
@@ -182,4 +227,3 @@ def test_feathr_get_features_from_registry(): assert len(total_conf_files) == 3 - \ No newline at end of file diff --git a/feathr_project/test/test_user_workspace/feathr_config_purview.yaml b/feathr_project/test/test_user_workspace/feathr_config_purview.yaml new file mode 100644 index 000000000..2d52dda22 --- /dev/null +++ b/feathr_project/test/test_user_workspace/feathr_config_purview.yaml @@ -0,0 +1,113 @@ +# DO NOT MOVE OR DELETE THIS FILE + +# This file contains the configurations that are used by Feathr +# All the configurations can be overwritten by environment variables with concatenation of `__` for different layers of this config file. +# For example, `feathr_runtime_location` for databricks can be overwritten by setting this environment variable: +# SPARK_CONFIG__DATABRICKS__FEATHR_RUNTIME_LOCATION +# Another example would be overwriting Redis host with this config: `ONLINE_STORE__REDIS__HOST` +# For example if you want to override this setting in a shell environment: +# export ONLINE_STORE__REDIS__HOST=feathrazure.redis.cache.windows.net + +# version of API settings +api_version: 1 +project_config: + project_name: 'project_feathr_integration_test' + # Information that are required to be set via environment variables. + required_environment_variables: + # the environemnt variables are required to run Feathr + # Redis password for your online store + - 'REDIS_PASSWORD' + # client IDs and client Secret for the service principal. Read the getting started docs on how to get those information. + - 'AZURE_CLIENT_ID' + - 'AZURE_TENANT_ID' + - 'AZURE_CLIENT_SECRET' + optional_environment_variables: + # the environemnt variables are optional, however you will need them if you want to use some of the services: + - ADLS_ACCOUNT + - ADLS_KEY + - WASB_ACCOUNT + - WASB_KEY + - S3_ACCESS_KEY + - S3_SECRET_KEY + - JDBC_TABLE + - JDBC_USER + - JDBC_PASSWORD + - KAFKA_SASL_JAAS_CONFIG + +offline_store: + # paths starts with abfss:// or abfs:// + # ADLS_ACCOUNT and ADLS_KEY should be set in environment variable if this is set to true + adls: + adls_enabled: true + + # paths starts with wasb:// or wasbs:// + # WASB_ACCOUNT and WASB_KEY should be set in environment variable + wasb: + wasb_enabled: true + + # paths starts with s3a:// + # S3_ACCESS_KEY and S3_SECRET_KEY should be set in environment variable + s3: + s3_enabled: true + # S3 endpoint. If you use S3 endpoint, then you need to provide access key and secret key in the environment variable as well. + s3_endpoint: 's3.amazonaws.com' + + # jdbc endpoint + jdbc: + jdbc_enabled: true + jdbc_database: 'feathrtestdb' + jdbc_table: 'feathrtesttable' + + # snowflake endpoint + snowflake: + snowflake_enabled: true + url: "dqllago-ol19457.snowflakecomputing.com" + user: "feathrintegration" + role: "ACCOUNTADMIN" + +spark_config: + # choice for spark runtime. Currently support: azure_synapse, databricks + # The `databricks` configs will be ignored if `azure_synapse` is set and vice versa. + spark_cluster: 'databricks' + # configure number of parts for the spark output for feature generation job + spark_result_output_parts: '1' + + azure_synapse: + dev_url: 'https://feathrazuretest3synapse.dev.azuresynapse.net' + pool_name: 'spark3' + # workspace dir for storing all the required configuration files and the jar resources + workspace_dir: 'abfss://feathrazuretest3fs@feathrazuretest3storage.dfs.core.windows.net/feathr_test_workspace' + executor_size: 'Small' + executor_num: 1 + # Feathr Job configuration. 
Support local paths, path start with http(s)://, and paths start with abfs(s):// + # this is the default location so end users don't have to compile the runtime again. + # feathr_runtime_location: wasbs://public@azurefeathrstorage.blob.core.windows.net/feathr-assembly-LATEST.jar + feathr_runtime_location: "../../target/scala-2.12/feathr-assembly-0.7.2.jar" + databricks: + # workspace instance + workspace_instance_url: 'https://adb-2474129336842816.16.azuredatabricks.net/' + workspace_token_value: 'dapid8ddd83000dc2863763b7d47f0e8f3db' + # config string including run time information, spark version, machine size, etc. + # the config follows the format in the databricks documentation: https://docs.microsoft.com/en-us/azure/databricks/dev-tools/api/2.0/jobs + config_template: {"run_name":"FEATHR_FILL_IN","new_cluster":{"spark_version":"9.1.x-scala2.12","num_workers":1,"spark_conf":{"FEATHR_FILL_IN":"FEATHR_FILL_IN"},"instance_pool_id":"0403-214809-inlet434-pool-l9dj3kwz"},"libraries":[{"jar":"FEATHR_FILL_IN"}],"spark_jar_task":{"main_class_name":"FEATHR_FILL_IN","parameters":["FEATHR_FILL_IN"]}} + # Feathr Job location. Support local paths, path start with http(s)://, and paths start with dbfs:/ + work_dir: 'dbfs:/feathr_getting_started' + # this is the default location so end users don't have to compile the runtime again. + feathr_runtime_location: "../../target/scala-2.12/feathr-assembly-0.7.2.jar" + +online_store: + redis: + # Redis configs to access Redis cluster + host: 'feathrazuretest3redis.redis.cache.windows.net' + port: 6380 + ssl_enabled: True + +feature_registry: + # The API endpoint of the registry service + api_endpoint: "https://feathr-sql-registry.azurewebsites.net/api/v1" + +monitoring: + database: + sql: + url: 'jdbc:postgresql://featuremonitoring.postgres.database.azure.com:5432/postgres' + user: "demo" diff --git a/src/main/scala/com/linkedin/feathr/offline/config/location/GenericLocation.scala b/src/main/scala/com/linkedin/feathr/offline/config/location/GenericLocation.scala index 80fa47b22..9a4bbb33a 100644 --- a/src/main/scala/com/linkedin/feathr/offline/config/location/GenericLocation.scala +++ b/src/main/scala/com/linkedin/feathr/offline/config/location/GenericLocation.scala @@ -186,6 +186,18 @@ object GenericLocationAdHocPatches { .mode(location.mode.getOrElse("overwrite")) // I don't see if ElasticSearch uses it in any doc .save() } + case "aerospike" => + val keyDf = if (!df.columns.contains("__key")) { + df.withColumn("__key", (monotonically_increasing_id().cast("string"))) + } + else { + df + } + keyDf.write.format(location.format) + .option("aerospike.updatebykey", "__key") + .options(location.options) + .mode(location.mode.getOrElse("append")) + .save() case _ => // Normal writing procedure, just set format and options then write df.write.format(location.format) From bf952bb38a58f231a6984fb57766e3b468fdbd2b Mon Sep 17 00:00:00 2001 From: Yihui Guo Date: Wed, 14 Sep 2022 21:11:09 +0800 Subject: [PATCH 2/6] Revert legacy purview client issue --- feathr_project/feathr/registry/registry_utils.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/feathr_project/feathr/registry/registry_utils.py b/feathr_project/feathr/registry/registry_utils.py index 44e31fe4d..cc064259d 100644 --- a/feathr_project/feathr/registry/registry_utils.py +++ b/feathr_project/feathr/registry/registry_utils.py @@ -87,17 +87,13 @@ def transformation_to_def(v: Transformation) -> dict: elif isinstance(v, WindowAggTransformation): ret = { "defExpr": v.def_expr, - "def_expr": v.def_expr, } if 
v.agg_func:
             ret["aggFunc"] = v.agg_func
-            ret["agg_func"] = v.agg_func
-
         if v.window:
             ret["window"] = v.window
         if v.group_by:
             ret["groupBy"] = v.group_by
-            ret["group_by"] = v.group_by
         if v.filter:
             ret["filter"] = v.filter
         if v.limit:

From a8037f0815d370e9e8be3b22b99b20baa58572b7 Mon Sep 17 00:00:00 2001
From: Yihui Guo
Date: Wed, 14 Sep 2022 22:30:56 +0800
Subject: [PATCH 3/6] Fix typo

---
 feathr_project/test/test_azure_spark_e2e.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/feathr_project/test/test_azure_spark_e2e.py b/feathr_project/test/test_azure_spark_e2e.py
index 5f63ffebb..43d5f4594 100644
--- a/feathr_project/test/test_azure_spark_e2e.py
+++ b/feathr_project/test/test_azure_spark_e2e.py
@@ -388,7 +388,7 @@ def test_feathr_materialize_to_aerospike():
     settings = MaterializationSettings("nycTaxiTable",
                                        sinks=[as_sink],
                                        feature_names=[
-                                           "avgfare", "maxfea"],
+                                           "avgfare", "maxfare"],
                                        backfill_time=backfill_time)
     client.materialize_features(settings)
     # assuming the job can successfully run; otherwise it will throw exception

From 5a64cd40deefd64327f4303422fd7806605c2aca Mon Sep 17 00:00:00 2001
From: Yihui Guo
Date: Wed, 14 Sep 2022 22:49:44 +0800
Subject: [PATCH 4/6] Remove auth from assink

---
 feathr_project/feathr/definition/sink.py | 7 ++-----
 1 file changed, 2 insertions(+), 5 deletions(-)

diff --git a/feathr_project/feathr/definition/sink.py b/feathr_project/feathr/definition/sink.py
index fb3e738a8..dba809874 100644
--- a/feathr_project/feathr/definition/sink.py
+++ b/feathr_project/feathr/definition/sink.py
@@ -335,7 +335,7 @@ def get_required_properties(self) -> List[str]:
         return []
 
 class AerospikeSink(GenericSink):
-    def __init__(self,name:str,seedhost:str,port:str,namespace:str,setname:str,auth: bool = True):
+    def __init__(self,name:str,seedhost:str,port:str,namespace:str,setname:str):
         super().__init__(format="aerospike",mode="APPEND",options = {
             "aerospike.seedhost":seedhost,
             "aerospike.port":port,
@@ -344,7 +344,6 @@ def __init__(self,name:str,seedhost:str,port:str,namespace:str,setname:str,auth:
             "aerospike.password":"${%s_PASSWORD}" % name.upper(),
             "aerospike.set":setname
         })
-        self.auth = auth
         self.name = name
 
     def support_offline(self) -> bool:
@@ -354,6 +353,4 @@ def support_online(self) -> bool:
         return True
 
     def get_required_properties(self) -> List[str]:
-        if self.auth:
-            return [self.name.upper() + "_USER", self.name.upper() + "_PASSWORD"]
-        return []
+        return [self.name.upper() + "_USER", self.name.upper() + "_PASSWORD"]

From b995ba79fca8d60a6ce14cb04fdee8d69f821687 Mon Sep 17 00:00:00 2001
From: Yihui Guo
Date: Wed, 14 Sep 2022 23:14:04 +0800
Subject: [PATCH 5/6] Update aerospike guidance document

---
 docs/dev_guide/aerospike_setup_guide.md | 16 ++++++++--------
 1 file changed, 8 insertions(+), 8 deletions(-)

diff --git a/docs/dev_guide/aerospike_setup_guide.md b/docs/dev_guide/aerospike_setup_guide.md
index 4bf26130d..fa8623a96 100644
--- a/docs/dev_guide/aerospike_setup_guide.md
+++ b/docs/dev_guide/aerospike_setup_guide.md
@@ -76,17 +76,17 @@ https://docs.aerospike.com/server/operations/configure/security/access-control
 Link to spark connector:
 https://docs.aerospike.com/connect/spark
-3. To operate data in Aerospike database via Spark, spark job should have following configurations:
+3. To use Aerospike as the online store, create an `AerospikeSink` and add it to the `MaterializationSettings`, then use it with `FeathrClient.materialize_features`, e.g.:
+
 ```
+name = 'aerospike_output'
+os.environ[f"{name.upper()}_USER"] = "as_user_name"
+os.environ[f"{name.upper()}_PASSWORD"] = "some_magic_word"
+as_sink = AerospikeSink(name=name,seedhost="ip_address", port="3000", namespace="test", setname="test")
+client.materialize_features(..., materialization_settings=MaterializationSettings(..., sinks=[as_sink]))
 ```
+
 # Known limitations for Aerospike
 Aerospike has its own limitations on the data it stores.
 One limitation worth attention is that, for any incoming data row, no column name should be longer than 15 bytes.

From c57e0f04fb0444693c08397233a0365e1696c8e1 Mon Sep 17 00:00:00 2001
From: Yihui Guo
Date: Wed, 14 Sep 2022 23:25:04 +0800
Subject: [PATCH 6/6] Change port param to int

---
 docs/dev_guide/aerospike_setup_guide.md     | 2 +-
 feathr_project/feathr/definition/sink.py    | 4 ++--
 feathr_project/test/test_azure_spark_e2e.py | 2 +-
 3 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/docs/dev_guide/aerospike_setup_guide.md b/docs/dev_guide/aerospike_setup_guide.md
index fa8623a96..f7af48131 100644
--- a/docs/dev_guide/aerospike_setup_guide.md
+++ b/docs/dev_guide/aerospike_setup_guide.md
@@ -82,7 +82,7 @@ https://docs.aerospike.com/connect/spark
 name = 'aerospike_output'
 os.environ[f"{name.upper()}_USER"] = "as_user_name"
 os.environ[f"{name.upper()}_PASSWORD"] = "some_magic_word"
-as_sink = AerospikeSink(name=name,seedhost="ip_address", port="3000", namespace="test", setname="test")
+as_sink = AerospikeSink(name=name,seedhost="ip_address", port=3000, namespace="test", setname="test")
 client.materialize_features(..., materialization_settings=MaterializationSettings(..., sinks=[as_sink]))
 ```
diff --git a/feathr_project/feathr/definition/sink.py b/feathr_project/feathr/definition/sink.py
index dba809874..56a5b236a 100644
--- a/feathr_project/feathr/definition/sink.py
+++ b/feathr_project/feathr/definition/sink.py
@@ -335,10 +335,10 @@ def get_required_properties(self) -> List[str]:
         return []
 
 class AerospikeSink(GenericSink):
-    def __init__(self,name:str,seedhost:str,port:str,namespace:str,setname:str):
+    def __init__(self,name:str,seedhost:str,port:int,namespace:str,setname:str):
         super().__init__(format="aerospike",mode="APPEND",options = {
             "aerospike.seedhost":seedhost,
-            "aerospike.port":port,
+            "aerospike.port":str(port),
             "aerospike.namespace":namespace,
             "aerospike.user":"${%s_USER}" % name.upper(),
             "aerospike.password":"${%s_PASSWORD}" % name.upper(),
             "aerospike.set":setname
         })
         self.name = name
diff --git a/feathr_project/test/test_azure_spark_e2e.py b/feathr_project/test/test_azure_spark_e2e.py
index 43d5f4594..a7fdf8ae5 100644
--- a/feathr_project/test/test_azure_spark_e2e.py
+++ b/feathr_project/test/test_azure_spark_e2e.py
@@ -384,7 +384,7 @@ def test_feathr_materialize_to_aerospike():
     now = datetime.now()
     os.environ[f"aerospike_USER"] = "feathruser"
     os.environ[f"aerospike_PASSWORD"] = "feathr"
-    as_sink = AerospikeSink(name="aerospike",seedhost="20.57.186.153", port="3000", namespace="test", setname="test")
+    as_sink = AerospikeSink(name="aerospike",seedhost="20.57.186.153", port=3000, namespace="test", setname="test")
     settings = MaterializationSettings("nycTaxiTable",
                                        sinks=[as_sink],
                                        feature_names=[