Add aerospike sink #632

Merged · 7 commits · Sep 19, 2022
Changes from 1 commit
3 changes: 2 additions & 1 deletion build.sbt
@@ -47,7 +47,8 @@ val localAndCloudCommonDependencies = Seq(
"com.github.changvvb" %% "jackson-module-caseclass" % "1.1.1",
"com.azure.cosmos.spark" % "azure-cosmos-spark_3-1_2-12" % "4.11.1",
"org.elasticsearch" % "elasticsearch-spark-30_2.12" % "7.15.2",
"org.eclipse.jetty" % "jetty-util" % "9.3.24.v20180605"
"org.eclipse.jetty" % "jetty-util" % "9.3.24.v20180605",
"com.aerospike" % "aerospike-spark_2.12" % "3.3.0_spark3.1-allshaded"
) // Common deps

val jdbcDrivers = Seq(
34 changes: 33 additions & 1 deletion docs/dev_guide/aerospike_setup_guide.md
@@ -64,4 +64,36 @@ Next we will verify the functionality of Aerospike database by performing basic

# View the config
asadm -e "show config"
```
```

# Configure Feathr's Spark job to connect with Aerospike

1. To connect to Aerospike (with the Aerospike SDK or Spark), a username and password need to be configured.
Guidance for setting up the username and password:
https://docs.aerospike.com/server/operations/configure/security/access-control

2. To connect to Aerospike with Spark, the Aerospike Spark connector jar needs to be submitted to your Spark runtime.
Link to the Spark connector:
https://docs.aerospike.com/connect/spark

3. To operate on data in the Aerospike database via Spark, the Spark job should have the following configurations (a Python sketch using the new `AerospikeSink` follows the block below):
```
"aerospike__seedhost": <your Aerospike server ip>,
"aerospike__port": "3000",
"aerospike__namespace": <the namespace on Aerospike>,
"aerospike__set": <Aerospike set name>,
"aerospike__user": <username>,
"aerospike__password": <password>,
"aerospike__updatebykey": "__key",
```
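
For reference, here is a minimal sketch of building the equivalent sink with the `AerospikeSink` class added in this PR. The host, namespace, and set values are placeholders; the credentials are resolved from the `<NAME>_USER` / `<NAME>_PASSWORD` environment variables declared in `get_required_properties`.

```python
import os

from feathr import AerospikeSink

# Placeholder credentials; because the sink is named "aerospike",
# AerospikeSink looks them up as AEROSPIKE_USER / AEROSPIKE_PASSWORD.
os.environ["AEROSPIKE_USER"] = "feathr_user"
os.environ["AEROSPIKE_PASSWORD"] = "<password>"

as_sink = AerospikeSink(name="aerospike",
                        seedhost="<your Aerospike server ip>",
                        port="3000",
                        namespace="<the namespace on Aerospike>",
                        setname="<Aerospike set name>")
```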

# Known limitations for Aerospike
Aerospike has its own limitations on the data it stores.
One limitation worth attention: for any incoming data row, no column name should be longer than 15 bytes.

So when using Feathr, do not define feature names longer than 15 ASCII characters; a minimal length check is sketched below.

Check https://docs.aerospike.com/guide/limitations for more details.
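
A minimal sketch of guarding against this limit before building features (the feature names are examples, not values required by Feathr):

```python
# Sketch: reject feature names longer than Aerospike's 15-byte bin-name limit.
feature_names = ["avgfare", "maxfare", "f_trip_time_duration_rounded"]

too_long = [name for name in feature_names if len(name.encode("ascii")) > 15]
if too_long:
    # "f_trip_time_duration_rounded" is 28 bytes, so it is reported here.
    raise ValueError(f"Feature names exceed Aerospike's 15-byte limit: {too_long}")
```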


1 change: 1 addition & 0 deletions feathr_project/feathr/__init__.py
@@ -46,6 +46,7 @@
'RedisSink',
'HdfsSink',
'MonitoringSqlSink',
'AerospikeSink',
'FeatureQuery',
'LookupFeature',
'Aggregation',
24 changes: 24 additions & 0 deletions feathr_project/feathr/definition/sink.py
@@ -333,3 +333,27 @@ def get_required_properties(self) -> List[str]:
if self.auth:
return [self.name.upper() + "_USER", self.name.upper() + "_PASSWORD"]
return []

class AerospikeSink(GenericSink):
def __init__(self,name:str,seedhost:str,port:str,namespace:str,setname:str,auth: bool = True):
super().__init__(format="aerospike",mode="APPEND",options = {
"aerospike.seedhost":seedhost,
"aerospike.port":port,
"aerospike.namespace":namespace,
"aerospike.user":"${%s_USER}" % name.upper(),
"aerospike.password":"${%s_PASSWORD}" % name.upper(),
"aerospike.set":setname
})
self.auth = auth
self.name = name

def support_offline(self) -> bool:
return False

def support_online(self) -> bool:
return True

def get_required_properties(self) -> List[str]:
if self.auth:
return [self.name.upper() + "_USER", self.name.upper() + "_PASSWORD"]
return []
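
As a usage note, here is a condensed sketch of materializing features into the new sink, based on the `test_feathr_materialize_to_aerospike` test added later in this PR. Host, credentials, and feature names are placeholders, the top-level imports are assumed from Feathr's package, and the features are assumed to have already been registered via `client.build_features(...)`.

```python
import os
from datetime import datetime, timedelta

from feathr import AerospikeSink, BackfillTime, FeathrClient, MaterializationSettings

os.environ["AEROSPIKE_USER"] = "feathr_user"      # placeholder credentials
os.environ["AEROSPIKE_PASSWORD"] = "<password>"

client = FeathrClient(config_path="feathr_config.yaml")
# Features such as "avgfare" and "maxfare" are assumed to be defined already
# via client.build_features(...), as in the test added in this PR.

as_sink = AerospikeSink(name="aerospike", seedhost="<server ip>", port="3000",
                        namespace="test", setname="test")
settings = MaterializationSettings("nycTaxiTable",
                                   sinks=[as_sink],
                                   feature_names=["avgfare", "maxfare"],
                                   backfill_time=BackfillTime(start=datetime(2020, 5, 20),
                                                              end=datetime(2020, 5, 20),
                                                              step=timedelta(days=1)))
client.materialize_features(settings)
```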
4 changes: 4 additions & 0 deletions feathr_project/feathr/registry/registry_utils.py
@@ -87,13 +87,17 @@ def transformation_to_def(v: Transformation) -> dict:
elif isinstance(v, WindowAggTransformation):
ret = {
"defExpr": v.def_expr,
"def_expr": v.def_expr,
}
if v.agg_func:
ret["aggFunc"] = v.agg_func
ret["agg_func"] = v.agg_func

if v.window:
ret["window"] = v.window
if v.group_by:
ret["groupBy"] = v.group_by
ret["group_by"] = v.group_by
if v.filter:
ret["filter"] = v.filter
if v.limit:
102 changes: 100 additions & 2 deletions feathr_project/test/test_azure_spark_e2e.py
@@ -1,7 +1,12 @@
import os
from datetime import datetime, timedelta
from pathlib import Path
from feathr import (BOOLEAN, FLOAT, INPUT_CONTEXT, INT32, STRING,
DerivedFeature, Feature, FeatureAnchor, HdfsSource,
TypedKey, ValueType, WindowAggTransformation)
from feathr import FeathrClient
from feathr.definition.sink import CosmosDbSink, ElasticSearchSink
from feathr.definition.source import HdfsSource

import pytest
from click.testing import CliRunner
@@ -10,7 +15,7 @@
from feathr import FeathrClient
from feathr import FeatureQuery
from feathr import ObservationSettings
from feathr import RedisSink, HdfsSink, JdbcSink
from feathr import RedisSink, HdfsSink, JdbcSink, AerospikeSink
from feathr import TypedKey
from feathr import ValueType
from feathr.utils.job_utils import get_result_df
@@ -265,7 +270,6 @@ def test_feathr_materialize_to_cosmosdb():
# assuming the job can successfully run; otherwise it will throw exception
client.wait_job_to_finish(timeout_sec=Constants.SPARK_JOB_TIMEOUT_SECONDS)


@pytest.mark.skip(reason="Marked as skipped as we need to setup resources for this test")
def test_feathr_materialize_to_es():
"""
@@ -295,7 +299,101 @@ def test_feathr_materialize_to_es():
# assuming the job can successfully run; otherwise it will throw exception
client.wait_job_to_finish(timeout_sec=Constants.SPARK_JOB_TIMEOUT_SECONDS)

@pytest.mark.skip(reason="Marked as skipped as we need to setup resources for this test")
def test_feathr_materialize_to_aerospike():
"""
Test FeathrClient() AerospikeSink.
"""
test_workspace_dir = Path(
__file__).parent.resolve() / "test_user_workspace"
# os.chdir(test_workspace_dir)
now = datetime.now()
# set workspace folder by time; make sure we don't have write conflict if there are many CI tests running
os.environ['SPARK_CONFIG__DATABRICKS__WORK_DIR'] = ''.join(['dbfs:/feathrazure_cijob','_', str(now.minute), '_', str(now.second), '_', str(now.microsecond)])
os.environ['SPARK_CONFIG__AZURE_SYNAPSE__WORKSPACE_DIR'] = ''.join(['abfss://feathrazuretest3fs@feathrazuretest3storage.dfs.core.windows.net/feathr_github_ci','_', str(now.minute), '_', str(now.second) ,'_', str(now.microsecond)])

client = FeathrClient(config_path="feathr_config.yaml")
batch_source = HdfsSource(name="nycTaxiBatchSource",
path="wasbs://public@azurefeathrstorage.blob.core.windows.net/sample_data/green_tripdata_2020-04.csv",
event_timestamp_column="lpep_dropoff_datetime",
timestamp_format="yyyy-MM-dd HH:mm:ss")

f_trip_distance = Feature(name="f_trip_distance",
feature_type=FLOAT, transform="trip_distance")
f_trip_time_duration = Feature(name="f_trip_time_duration",
feature_type=INT32,
transform="(to_unix_timestamp(lpep_dropoff_datetime) - to_unix_timestamp(lpep_pickup_datetime))/60")

features = [
f_trip_distance,
f_trip_time_duration,
Feature(name="f_is_long_trip_distance",
feature_type=BOOLEAN,
transform="cast_float(trip_distance)>30"),
Feature(name="f_day_of_week",
feature_type=INT32,
transform="dayofweek(lpep_dropoff_datetime)"),
]


request_anchor = FeatureAnchor(name="request_features",
source=INPUT_CONTEXT,
features=features)

f_trip_time_distance = DerivedFeature(name="f_trip_time_distance",
feature_type=FLOAT,
input_features=[
f_trip_distance, f_trip_time_duration],
transform="f_trip_distance * f_trip_time_duration")

f_trip_time_rounded = DerivedFeature(name="f_trip_time_rounded",
feature_type=INT32,
input_features=[f_trip_time_duration],
transform="f_trip_time_duration % 10")

location_id = TypedKey(key_column="DOLocationID",
key_column_type=ValueType.INT32,
description="location id in NYC",
full_name="nyc_taxi.location_id")
agg_features = [Feature(name="avgfare",
key=location_id,
feature_type=FLOAT,
transform=WindowAggTransformation(agg_expr="cast_float(fare_amount)",
agg_func="AVG",
window="90d",
)),
Feature(name="maxfare",
key=location_id,
feature_type=FLOAT,
transform=WindowAggTransformation(agg_expr="cast_float(fare_amount)",
agg_func="MAX",
window="90d"))
]

agg_anchor = FeatureAnchor(name="aggregationFeatures",
source=batch_source,
features=agg_features)

client.build_features(anchor_list=[agg_anchor, request_anchor], derived_feature_list=[
f_trip_time_distance, f_trip_time_rounded])


backfill_time = BackfillTime(start=datetime(
2020, 5, 20), end=datetime(2020, 5, 20), step=timedelta(days=1))

now = datetime.now()
os.environ[f"aerospike_USER"] = "feathruser"
os.environ[f"aerospike_PASSWORD"] = "feathr"
as_sink = AerospikeSink(name="aerospike",seedhost="20.57.186.153", port="3000", namespace="test", setname="test")
settings = MaterializationSettings("nycTaxiTable",
sinks=[as_sink],
feature_names=[
"avgfare", "maxfea"],
backfill_time=backfill_time)
client.materialize_features(settings)
# assuming the job can successfully run; otherwise it will throw exception
client.wait_job_to_finish(timeout_sec=Constants.SPARK_JOB_TIMEOUT_SECONDS)
if __name__ == "__main__":
test_feathr_materialize_to_aerospike()
test_feathr_get_offline_features_to_sql()
test_feathr_materialize_to_cosmosdb()
46 changes: 45 additions & 1 deletion feathr_project/test/test_feature_registry.py
@@ -62,6 +62,51 @@ def test_feathr_register_features_e2e():
output_path=output_path)
client.wait_job_to_finish(timeout_sec=Constants.SPARK_JOB_TIMEOUT_SECONDS)

def test_feathr_register_features_purview_e2e():
"""
This test will register features, get all the registered features, then query a set of already registered features.
"""

test_workspace_dir = Path(
__file__).parent.resolve() / "test_user_workspace"
client: FeathrClient = registry_test_setup(os.path.join(test_workspace_dir, "feathr_config_purview.yaml"))

# set output folder based on different runtime
now = datetime.now()
if client.spark_runtime == 'databricks':
output_path = ''.join(['dbfs:/feathrazure_cijob','_', str(now.minute), '_', str(now.second), ".parquet"])
else:
output_path = ''.join(['abfss://feathrazuretest3fs@feathrazuretest3storage.dfs.core.windows.net/demo_data/output','_', str(now.minute), '_', str(now.second), ".parquet"])


client.register_features()
# Allow purview to process a bit
time.sleep(5)
# in CI test, the project name is set by the CI pipeline so we read it here
all_features = client.list_registered_features(project_name=client.project_name)
all_feature_names = [x['name'] for x in all_features]

assert 'f_is_long_trip_distance' in all_feature_names # test regular ones
assert 'f_trip_time_rounded' in all_feature_names # make sure derived features are there
assert 'f_location_avg_fare' in all_feature_names # make sure aggregated features are there
assert 'f_trip_time_rounded_plus' in all_feature_names # make sure derived features are there
assert 'f_trip_time_distance' in all_feature_names # make sure derived features are there

# Sync workspace from registry, will get all conf files back
client.get_features_from_registry(client.project_name)

feature_query = FeatureQuery(
feature_list=["f_location_avg_fare", "f_trip_time_rounded", "f_is_long_trip_distance"],
key=TypedKey(key_column="DOLocationID",key_column_type=ValueType.INT32))
settings = ObservationSettings(
observation_path="wasbs://public@azurefeathrstorage.blob.core.windows.net/sample_data/green_tripdata_2020-04_with_index.csv",
event_timestamp_column="lpep_dropoff_datetime",
timestamp_format="yyyy-MM-dd HH:mm:ss")
client.get_offline_features(observation_settings=settings,
feature_query=feature_query,
output_path=output_path)
client.wait_job_to_finish(timeout_sec=Constants.SPARK_JOB_TIMEOUT_SECONDS)

def test_feathr_register_features_partially():
"""
This test will register full set of features into one project, then register another project in two partial registrations.
@@ -182,4 +227,3 @@ def test_feathr_get_features_from_registry():
assert len(total_conf_files) == 3


