Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add aerospike sink #632

Merged
merged 7 commits into from
Sep 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ val localAndCloudCommonDependencies = Seq(
"com.github.changvvb" %% "jackson-module-caseclass" % "1.1.1",
"com.azure.cosmos.spark" % "azure-cosmos-spark_3-1_2-12" % "4.11.1",
"org.elasticsearch" % "elasticsearch-spark-30_2.12" % "7.15.2",
"org.eclipse.jetty" % "jetty-util" % "9.3.24.v20180605"
"org.eclipse.jetty" % "jetty-util" % "9.3.24.v20180605",
"com.aerospike" % "aerospike-spark_2.12" % "3.3.0_spark3.1-allshaded"
) // Common deps

val jdbcDrivers = Seq(
Expand Down
34 changes: 33 additions & 1 deletion docs/dev_guide/aerospike_setup_guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,4 +64,36 @@ Next we will verify the functionality of Aerospike database by performing basic

# View the config
asadm -e "show config"
```
```

# Configure feathr core Spark to connect with Aerospike

1. To connect to Aerospike (with Aerospike SDK, or Spark), username and password need to be configured.
Guidance for setting up username and password:
https://docs.aerospike.com/server/operations/configure/security/access-control

2. To connect to Aerospike with Spark, a spark conector jar needs to be submitted to your Spark runtime.
Link to spark connector:
https://docs.aerospike.com/connect/spark

3. To use Aerospike as the online store, create `AerospikeSink` and add it to the `MaterializationSettings`, then use it with `FeathrClient.materialize_features`, e.g..

```
name = 'aerospike_output'
os.environ[f"{name.upper()}_USER"] = "as_user_name"
os.environ[f"{name.upper()}_PASSWORD"] = "some_magic_word"
as_sink = AerospikeSink(name=name,seedhost="ip_address", port=3000, namespace="test", setname="test")
client.materialize_features(..., materialization_settings=MaterializationSettings(..., sinks=[as_sink]))
```


# Known limitations for Aerospike:
Aerospike has its own limitations on the data .
One limitation is that worth attention is, for any incoming data row, ANY column name should not be longer than 15 bytes.
YihuiGuo marked this conversation as resolved.
Show resolved Hide resolved

So when using feathr, do not define feature names longer than 15 ascii characters.

Check
https://docs.aerospike.com/guide/limitations for more details.


1 change: 1 addition & 0 deletions feathr_project/feathr/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
'RedisSink',
'HdfsSink',
'MonitoringSqlSink',
'AerospikeSink',
'FeatureQuery',
'LookupFeature',
'Aggregation',
Expand Down
21 changes: 21 additions & 0 deletions feathr_project/feathr/definition/sink.py
Original file line number Diff line number Diff line change
Expand Up @@ -333,3 +333,24 @@ def get_required_properties(self) -> List[str]:
if self.auth:
return [self.name.upper() + "_USER", self.name.upper() + "_PASSWORD"]
return []

class AerospikeSink(GenericSink):
def __init__(self,name:str,seedhost:str,port:int,namespace:str,setname:str):
super().__init__(format="aerospike",mode="APPEND",options = {
"aerospike.seedhost":seedhost,
"aerospike.port":str(port),
"aerospike.namespace":namespace,
"aerospike.user":"${%s_USER}" % name.upper(),
"aerospike.password":"${%s_PASSWORD}" % name.upper(),
"aerospike.set":setname
})
self.name = name

def support_offline(self) -> bool:
return False

def support_online(self) -> bool:
return True

def get_required_properties(self) -> List[str]:
return [self.name.upper() + "_USER", self.name.upper() + "_PASSWORD"]
102 changes: 100 additions & 2 deletions feathr_project/test/test_azure_spark_e2e.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
import os
from datetime import datetime, timedelta
from pathlib import Path
from feathr import (BOOLEAN, FLOAT, INPUT_CONTEXT, INT32, STRING,
DerivedFeature, Feature, FeatureAnchor, HdfsSource,
TypedKey, ValueType, WindowAggTransformation)
from feathr import FeathrClient
from feathr.definition.sink import CosmosDbSink, ElasticSearchSink
from feathr.definition.source import HdfsSource

import pytest
from click.testing import CliRunner
Expand All @@ -10,7 +15,7 @@
from feathr import FeathrClient
from feathr import FeatureQuery
from feathr import ObservationSettings
from feathr import RedisSink, HdfsSink, JdbcSink
from feathr import RedisSink, HdfsSink, JdbcSink,AerospikeSink
from feathr import TypedKey
from feathr import ValueType
from feathr.utils.job_utils import get_result_df
Expand Down Expand Up @@ -265,7 +270,6 @@ def test_feathr_materialize_to_cosmosdb():
# assuming the job can successfully run; otherwise it will throw exception
client.wait_job_to_finish(timeout_sec=Constants.SPARK_JOB_TIMEOUT_SECONDS)


@pytest.mark.skip(reason="Marked as skipped as we need to setup resources for this test")
def test_feathr_materialize_to_es():
"""
Expand Down Expand Up @@ -295,7 +299,101 @@ def test_feathr_materialize_to_es():
# assuming the job can successfully run; otherwise it will throw exception
client.wait_job_to_finish(timeout_sec=Constants.SPARK_JOB_TIMEOUT_SECONDS)

@pytest.mark.skip(reason="Marked as skipped as we need to setup resources for this test")
def test_feathr_materialize_to_aerospike():
"""
Test FeathrClient() CosmosDbSink.
"""
test_workspace_dir = Path(
__file__).parent.resolve() / "test_user_workspace"
# os.chdir(test_workspace_dir)
now = datetime.now()
# set workspace folder by time; make sure we don't have write conflict if there are many CI tests running
os.environ['SPARK_CONFIG__DATABRICKS__WORK_DIR'] = ''.join(['dbfs:/feathrazure_cijob','_', str(now.minute), '_', str(now.second), '_', str(now.microsecond)])
os.environ['SPARK_CONFIG__AZURE_SYNAPSE__WORKSPACE_DIR'] = ''.join(['abfss://feathrazuretest3fs@feathrazuretest3storage.dfs.core.windows.net/feathr_github_ci','_', str(now.minute), '_', str(now.second) ,'_', str(now.microsecond)])

client = FeathrClient(config_path="feathr_config.yaml")
xiaoyongzhu marked this conversation as resolved.
Show resolved Hide resolved
batch_source = HdfsSource(name="nycTaxiBatchSource",
path="wasbs://public@azurefeathrstorage.blob.core.windows.net/sample_data/green_tripdata_2020-04.csv",
event_timestamp_column="lpep_dropoff_datetime",
timestamp_format="yyyy-MM-dd HH:mm:ss")

f_trip_distance = Feature(name="f_trip_distance",
feature_type=FLOAT, transform="trip_distance")
f_trip_time_duration = Feature(name="f_trip_time_duration",
feature_type=INT32,
transform="(to_unix_timestamp(lpep_dropoff_datetime) - to_unix_timestamp(lpep_pickup_datetime))/60")

features = [
f_trip_distance,
f_trip_time_duration,
Feature(name="f_is_long_trip_distance",
feature_type=BOOLEAN,
transform="cast_float(trip_distance)>30"),
Feature(name="f_day_of_week",
feature_type=INT32,
transform="dayofweek(lpep_dropoff_datetime)"),
]


request_anchor = FeatureAnchor(name="request_features",
source=INPUT_CONTEXT,
features=features)

f_trip_time_distance = DerivedFeature(name="f_trip_time_distance",
feature_type=FLOAT,
input_features=[
f_trip_distance, f_trip_time_duration],
transform="f_trip_distance * f_trip_time_duration")

f_trip_time_rounded = DerivedFeature(name="f_trip_time_rounded",
feature_type=INT32,
input_features=[f_trip_time_duration],
transform="f_trip_time_duration % 10")

location_id = TypedKey(key_column="DOLocationID",
key_column_type=ValueType.INT32,
description="location id in NYC",
full_name="nyc_taxi.location_id")
agg_features = [Feature(name="avgfare",
key=location_id,
feature_type=FLOAT,
transform=WindowAggTransformation(agg_expr="cast_float(fare_amount)",
agg_func="AVG",
window="90d",
)),
Feature(name="maxfare",
key=location_id,
feature_type=FLOAT,
transform=WindowAggTransformation(agg_expr="cast_float(fare_amount)",
agg_func="MAX",
window="90d"))
]

agg_anchor = FeatureAnchor(name="aggregationFeatures",
source=batch_source,
features=agg_features)

client.build_features(anchor_list=[agg_anchor, request_anchor], derived_feature_list=[
f_trip_time_distance, f_trip_time_rounded])


backfill_time = BackfillTime(start=datetime(
2020, 5, 20), end=datetime(2020, 5, 20), step=timedelta(days=1))

now = datetime.now()
os.environ[f"aerospike_USER"] = "feathruser"
os.environ[f"aerospike_PASSWORD"] = "feathr"
as_sink = AerospikeSink(name="aerospike",seedhost="20.57.186.153", port=3000, namespace="test", setname="test")
settings = MaterializationSettings("nycTaxiTable",
sinks=[as_sink],
feature_names=[
"avgfare", "maxfare"],
backfill_time=backfill_time)
client.materialize_features(settings)
# assuming the job can successfully run; otherwise it will throw exception
client.wait_job_to_finish(timeout_sec=Constants.SPARK_JOB_TIMEOUT_SECONDS)
if __name__ == "__main__":
test_feathr_materialize_to_aerospike()
test_feathr_get_offline_features_to_sql()
test_feathr_materialize_to_cosmosdb()
46 changes: 45 additions & 1 deletion feathr_project/test/test_feature_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,51 @@ def test_feathr_register_features_e2e():
output_path=output_path)
client.wait_job_to_finish(timeout_sec=Constants.SPARK_JOB_TIMEOUT_SECONDS)

def test_feathr_register_features_purview_e2e():
"""
This test will register features, get all the registered features, then query a set of already registered features.
"""

test_workspace_dir = Path(
__file__).parent.resolve() / "test_user_workspace"
client: FeathrClient = registry_test_setup(os.path.join(test_workspace_dir, "feathr_config_purview.yaml"))

# set output folder based on different runtime
now = datetime.now()
if client.spark_runtime == 'databricks':
output_path = ''.join(['dbfs:/feathrazure_cijob','_', str(now.minute), '_', str(now.second), ".parquet"])
else:
output_path = ''.join(['abfss://feathrazuretest3fs@feathrazuretest3storage.dfs.core.windows.net/demo_data/output','_', str(now.minute), '_', str(now.second), ".parquet"])


client.register_features()
# Allow purview to process a bit
time.sleep(5)
# in CI test, the project name is set by the CI pipeline so we read it here
all_features = client.list_registered_features(project_name=client.project_name)
all_feature_names = [x['name'] for x in all_features]

assert 'f_is_long_trip_distance' in all_feature_names # test regular ones
assert 'f_trip_time_rounded' in all_feature_names # make sure derived features are there
assert 'f_location_avg_fare' in all_feature_names # make sure aggregated features are there
assert 'f_trip_time_rounded_plus' in all_feature_names # make sure derived features are there
assert 'f_trip_time_distance' in all_feature_names # make sure derived features are there

# Sync workspace from registry, will get all conf files back
client.get_features_from_registry(client.project_name)

feature_query = FeatureQuery(
feature_list=["f_location_avg_fare", "f_trip_time_rounded", "f_is_long_trip_distance"],
key=TypedKey(key_column="DOLocationID",key_column_type=ValueType.INT32))
settings = ObservationSettings(
observation_path="wasbs://public@azurefeathrstorage.blob.core.windows.net/sample_data/green_tripdata_2020-04_with_index.csv",
event_timestamp_column="lpep_dropoff_datetime",
timestamp_format="yyyy-MM-dd HH:mm:ss")
client.get_offline_features(observation_settings=settings,
feature_query=feature_query,
output_path=output_path)
client.wait_job_to_finish(timeout_sec=Constants.SPARK_JOB_TIMEOUT_SECONDS)

def test_feathr_register_features_partially():
"""
This test will register full set of features into one project, then register another project in two partial registrations.
Expand Down Expand Up @@ -182,4 +227,3 @@ def test_feathr_get_features_from_registry():
assert len(total_conf_files) == 3



113 changes: 113 additions & 0 deletions feathr_project/test/test_user_workspace/feathr_config_purview.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
# DO NOT MOVE OR DELETE THIS FILE

# This file contains the configurations that are used by Feathr
# All the configurations can be overwritten by environment variables with concatenation of `__` for different layers of this config file.
# For example, `feathr_runtime_location` for databricks can be overwritten by setting this environment variable:
# SPARK_CONFIG__DATABRICKS__FEATHR_RUNTIME_LOCATION
# Another example would be overwriting Redis host with this config: `ONLINE_STORE__REDIS__HOST`
# For example if you want to override this setting in a shell environment:
# export ONLINE_STORE__REDIS__HOST=feathrazure.redis.cache.windows.net

# version of API settings
api_version: 1
project_config:
project_name: 'project_feathr_integration_test'
# Information that are required to be set via environment variables.
required_environment_variables:
# the environemnt variables are required to run Feathr
# Redis password for your online store
- 'REDIS_PASSWORD'
# client IDs and client Secret for the service principal. Read the getting started docs on how to get those information.
- 'AZURE_CLIENT_ID'
- 'AZURE_TENANT_ID'
- 'AZURE_CLIENT_SECRET'
optional_environment_variables:
# the environemnt variables are optional, however you will need them if you want to use some of the services:
- ADLS_ACCOUNT
- ADLS_KEY
- WASB_ACCOUNT
- WASB_KEY
- S3_ACCESS_KEY
- S3_SECRET_KEY
- JDBC_TABLE
- JDBC_USER
- JDBC_PASSWORD
- KAFKA_SASL_JAAS_CONFIG

offline_store:
# paths starts with abfss:// or abfs://
# ADLS_ACCOUNT and ADLS_KEY should be set in environment variable if this is set to true
adls:
adls_enabled: true

# paths starts with wasb:// or wasbs://
# WASB_ACCOUNT and WASB_KEY should be set in environment variable
wasb:
wasb_enabled: true

# paths starts with s3a://
# S3_ACCESS_KEY and S3_SECRET_KEY should be set in environment variable
s3:
s3_enabled: true
# S3 endpoint. If you use S3 endpoint, then you need to provide access key and secret key in the environment variable as well.
s3_endpoint: 's3.amazonaws.com'

# jdbc endpoint
jdbc:
jdbc_enabled: true
jdbc_database: 'feathrtestdb'
jdbc_table: 'feathrtesttable'

# snowflake endpoint
snowflake:
snowflake_enabled: true
url: "dqllago-ol19457.snowflakecomputing.com"
user: "feathrintegration"
role: "ACCOUNTADMIN"

spark_config:
# choice for spark runtime. Currently support: azure_synapse, databricks
# The `databricks` configs will be ignored if `azure_synapse` is set and vice versa.
spark_cluster: 'databricks'
# configure number of parts for the spark output for feature generation job
spark_result_output_parts: '1'

azure_synapse:
dev_url: 'https://feathrazuretest3synapse.dev.azuresynapse.net'
pool_name: 'spark3'
# workspace dir for storing all the required configuration files and the jar resources
workspace_dir: 'abfss://feathrazuretest3fs@feathrazuretest3storage.dfs.core.windows.net/feathr_test_workspace'
executor_size: 'Small'
executor_num: 1
# Feathr Job configuration. Support local paths, path start with http(s)://, and paths start with abfs(s)://
# this is the default location so end users don't have to compile the runtime again.
# feathr_runtime_location: wasbs://public@azurefeathrstorage.blob.core.windows.net/feathr-assembly-LATEST.jar
feathr_runtime_location: "../../target/scala-2.12/feathr-assembly-0.7.2.jar"
databricks:
# workspace instance
workspace_instance_url: 'https://adb-2474129336842816.16.azuredatabricks.net/'
workspace_token_value: 'dapid8ddd83000dc2863763b7d47f0e8f3db'
# config string including run time information, spark version, machine size, etc.
# the config follows the format in the databricks documentation: https://docs.microsoft.com/en-us/azure/databricks/dev-tools/api/2.0/jobs
config_template: {"run_name":"FEATHR_FILL_IN","new_cluster":{"spark_version":"9.1.x-scala2.12","num_workers":1,"spark_conf":{"FEATHR_FILL_IN":"FEATHR_FILL_IN"},"instance_pool_id":"0403-214809-inlet434-pool-l9dj3kwz"},"libraries":[{"jar":"FEATHR_FILL_IN"}],"spark_jar_task":{"main_class_name":"FEATHR_FILL_IN","parameters":["FEATHR_FILL_IN"]}}
# Feathr Job location. Support local paths, path start with http(s)://, and paths start with dbfs:/
work_dir: 'dbfs:/feathr_getting_started'
# this is the default location so end users don't have to compile the runtime again.
feathr_runtime_location: "../../target/scala-2.12/feathr-assembly-0.7.2.jar"

online_store:
redis:
# Redis configs to access Redis cluster
host: 'feathrazuretest3redis.redis.cache.windows.net'
port: 6380
ssl_enabled: True

feature_registry:
# The API endpoint of the registry service
api_endpoint: "https://feathr-sql-registry.azurewebsites.net/api/v1"

monitoring:
database:
sql:
url: 'jdbc:postgresql://featuremonitoring.postgres.database.azure.com:5432/postgres'
user: "demo"
Loading