Add streaming support #137

Merged
merged 5 commits on Apr 21, 2022
Changes from 2 commits
50 changes: 47 additions & 3 deletions README.md
@@ -156,7 +156,7 @@ batch_source = HdfsSource(
timestamp_format="yyyy-MM-dd HH:mm:ss") # Supports various formats including epoch
```

### Beyond Features on Raw Data Sources - Derived Features
### Define features on top of other features - Derived Features

```python
# Compute a new feature (a.k.a. derived feature) on top of an existing feature
@@ -177,6 +177,49 @@ user_item_similarity = DerivedFeature(name="user_item_similarity",
transform="cosine_similarity(user_embedding, item_embedding)")
```

## Define Streaming Features

```python
# Define input data schema
schema = AvroJsonSchema(schemaStr="""
{
"type": "record",
"name": "DriverTrips",
"fields": [
{"name": "driver_id", "type": "long"},
{"name": "trips_today", "type": "int"},
{
"name": "datetime",
"type": {"type": "long", "logicalType": "timestamp-micros"}
}
]
}
""")
stream_source = KafKaSource(name="kafkaStreamingSource",
kafkaConfig=KafkaConfig(brokers=["feathrazureci.servicebus.windows.net:9093"],
topics=["feathrcieventhub"],
schema=schema)
)

driver_id = TypedKey(key_column="driver_id",
key_column_type=ValueType.INT64,
description="driver id",
full_name="nyc driver id")

kafkaAnchor = FeatureAnchor(name="kafkaAnchor",
source=stream_source,
features=[Feature(name="f_modified_streaming_count",
feature_type=INT32,
transform="trips_today + 1",
key=driver_id),
Feature(name="f_modified_streaming_count2",
feature_type=INT32,
transform="trips_today + 2",
key=driver_id)]
)

```

## Running Feathr Examples

Follow the [quick start Jupyter Notebook](./feathr_project/feathrcli/data/feathr_user_workspace/nyc_driver_demo.ipynb) to try it out. There is also a companion [quick start guide](./docs/quickstart.md) containing a bit more explanation on the notebook.
@@ -200,9 +243,10 @@ Follow the [quick start Jupyter Notebook](./feathr_project/feathrcli/data/feathr
- [x] Private Preview release
- [x] Public Preview release
- [ ] Future release
- [ ] Support streaming and online transformation
- [x] Support streaming
- [x] Support common data sources
- [ ] Support online transformation
- [ ] Support feature versioning
- [ ] Support more data sources

## Community Guidelines

4 changes: 2 additions & 2 deletions build.sbt
@@ -1,4 +1,3 @@

ThisBuild / resolvers += Resolver.mavenLocal
ThisBuild / scalaVersion := "2.12.15"
ThisBuild / version := "0.1.0"
@@ -18,6 +17,7 @@ val localAndCloudDiffDependencies = Seq(
"org.apache.hadoop" % "hadoop-common" % "2.7.2",
"org.apache.avro" % "avro" % "1.8.2",
"org.apache.xbean" % "xbean-asm6-shaded" % "4.10",
"org.apache.spark" % "spark-sql-kafka-0-10_2.12" % "3.1.2"
)

val cloudProvidedDeps = localAndCloudDiffDependencies.map(x => x % "provided")
@@ -35,7 +35,7 @@ val localAndCloudCommonDependencies = Seq(
"com.fasterxml.jackson.dataformat" % "jackson-dataformat-yaml" % "2.6.5",
"com.fasterxml.jackson.dataformat" % "jackson-dataformat-csv" % "2.4.4",
"com.jasonclawson" % "jackson-dataformat-hocon" % "1.1.0",
"com.redislabs" %% "spark-redis" % "2.6.0",
"com.redislabs" %% "spark-redis" % "3.0.0",
"org.scalatest" %% "scalatest" % "3.0.0" % "test",
"org.apache.xbean" % "xbean-asm6-shaded" % "4.10",
"com.google.protobuf" % "protobuf-java" % "3.19.4",
45 changes: 44 additions & 1 deletion docs/README.md
@@ -150,7 +150,7 @@ batch_source = HdfsSource(
timestamp_format="yyyy-MM-dd HH:mm:ss") # Supports various formats including epoch
```

## Beyond Features on Raw Data Sources - Derived Features
## Define features on top of other features - Derived Features

```python
# Compute a new feature (a.k.a. derived feature) on top of an existing feature
@@ -171,6 +171,49 @@ user_item_similarity = DerivedFeature(name="user_item_similarity",
transform="cosine_similarity(user_embedding, item_embedding)")
```

## Define Streaming Features

```python
# Define input data schema
schema = AvroJsonSchema(schemaStr="""
{
"type": "record",
"name": "DriverTrips",
"fields": [
{"name": "driver_id", "type": "long"},
{"name": "trips_today", "type": "int"},
{
"name": "datetime",
"type": {"type": "long", "logicalType": "timestamp-micros"}
}
]
}
""")
stream_source = KafKaSource(name="kafkaStreamingSource",
kafkaConfig=KafkaConfig(brokers=["feathrazureci.servicebus.windows.net:9093"],
topics=["feathrcieventhub"],
schema=schema)
)

driver_id = TypedKey(key_column="driver_id",
key_column_type=ValueType.INT64,
description="driver id",
full_name="nyc driver id")

kafkaAnchor = FeatureAnchor(name="kafkaAnchor",
source=stream_source,
features=[Feature(name="f_modified_streaming_count",
feature_type=INT32,
transform="trips_today + 1",
key=driver_id),
Feature(name="f_modified_streaming_count2",
feature_type=INT32,
transform="trips_today + 2",
key=driver_id)]
)

```

## Cloud Architecture

Feathr has native integration with Azure and other cloud services, and here's the high-level architecture to help you get started.
45 changes: 44 additions & 1 deletion docs/concepts/feathr-capabilities.md
@@ -102,7 +102,7 @@ batch_source = HdfsSource(
timestamp_format="yyyy-MM-dd HH:mm:ss") # Supports various formats including epoch
```

### Beyond Features on Raw Data Sources - Derived Features
### Define features on top of other features - Derived Features

```python
# Compute a new feature (a.k.a. derived feature) on top of an existing feature
@@ -122,3 +122,46 @@ user_item_similarity = DerivedFeature(name="user_item_similarity",
input_features=[user_embedding, item_embedding],
transform="cosine_similarity(user_embedding, item_embedding)")
```

### Define streaming features

```python
# Define input data schema
schema = AvroJsonSchema(schemaStr="""
{
"type": "record",
"name": "DriverTrips",
"fields": [
{"name": "driver_id", "type": "long"},
{"name": "trips_today", "type": "int"},
{
"name": "datetime",
"type": {"type": "long", "logicalType": "timestamp-micros"}
}
]
}
""")
stream_source = KafKaSource(name="kafkaStreamingSource",
kafkaConfig=KafkaConfig(brokers=["feathrazureci.servicebus.windows.net:9093"],
topics=["feathrcieventhub"],
schema=schema)
)

driver_id = TypedKey(key_column="driver_id",
key_column_type=ValueType.INT64,
description="driver id",
full_name="nyc driver id")

kafkaAnchor = FeatureAnchor(name="kafkaAnchor",
source=stream_source,
features=[Feature(name="f_modified_streaming_count",
feature_type=INT32,
transform="trips_today + 1",
key=driver_id),
Feature(name="f_modified_streaming_count2",
feature_type=INT32,
transform="trips_today + 2",
key=driver_id)]
)

```
74 changes: 74 additions & 0 deletions docs/concepts/feature-generation.md
@@ -44,3 +44,77 @@ res = client.get_online_features('nycTaxiDemoFeature', '265', [
```

After the materialization job finishes, we can get the online features by querying the feature names with the corresponding keys. In the example above, we query the online features called `f_location_avg_fare` and `f_location_max_fare` with the key `265` (which is the location ID).
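For illustration, here is a minimal sketch of that lookup. The feature names and the key come from the paragraph above; the table name `nycTaxiDemoFeature` and the call signature are assumed from the truncated snippet and the other examples in this document.

```python
# Sketch only: assumes `client` is an initialized FeathrClient and that the
# materialization job for 'nycTaxiDemoFeature' has already completed.
res = client.get_online_features('nycTaxiDemoFeature', '265',
                                 ['f_location_avg_fare', 'f_location_max_fare'])
# `res` holds the latest materialized values for location ID 265,
# one value per requested feature name.
```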

## Streaming feature ingestion

1. Define Kafka streaming input source

```python
# Define input data schema
schema = AvroJsonSchema(schemaStr="""
{
"type": "record",
"name": "DriverTrips",
"fields": [
{"name": "driver_id", "type": "long"},
{"name": "trips_today", "type": "int"},
{
"name": "datetime",
"type": {"type": "long", "logicalType": "timestamp-micros"}
}
]
}
""")
stream_source = KafKaSource(name="kafkaStreamingSource",
kafkaConfig=KafkaConfig(brokers=["feathrazureci.servicebus.windows.net:9093"],
topics=["feathrcieventhub"],
schema=schema)
)
```

2. Define the feature anchor with the Kafka source

```python
driver_id = TypedKey(key_column="driver_id",
key_column_type=ValueType.INT64,
description="driver id",
full_name="nyc driver id")

kafkaAnchor = FeatureAnchor(name="kafkaAnchor",
source=stream_source,
features=[Feature(name="f_modified_streaming_count",
feature_type=INT32,
transform="trips_today + 1",
key=driver_id),
Feature(name="f_modified_streaming_count2",
feature_type=INT32,
transform="trips_today + 2",
key=driver_id)]
)

```
Note that only Feathr ExpressionTransformation is allowed in streaming anchors at the moment.
Support for other transformations is on the roadmap (see the sketch below).
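A minimal sketch of what this constraint means in practice, reusing `driver_id` and `stream_source` from the snippet above (`WindowAggTransformation` is named here only as a counter-example and is not part of the original doc):

```python
# Supported today: a plain expression transform on the streaming source.
streaming_ok = FeatureAnchor(name="streamingExpressionAnchor",
                             source=stream_source,
                             features=[Feature(name="f_trips_plus_ten",
                                               feature_type=INT32,
                                               transform="trips_today + 10",  # expression transformation
                                               key=driver_id)])

# Not yet supported on streaming anchors: aggregation-style transforms such as
# WindowAggTransformation(agg_expr="trips_today", agg_func="SUM", window="1d"),
# which currently only work with batch (offline) sources.
```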

3. Start streaming job

```python
redisSink = RedisSink(table_name="kafkaSampleDemoFeature", streaming=True, streamingTimeoutMs=10000)
settings = MaterializationSettings(name="kafkaSampleDemo",
sinks=[redisSink],
feature_names=['f_modified_streaming_count']
)
client.materialize_features(settings) # Will stream for 10 seconds since streamingTimeoutMs is 10000
```
4. Fetch streaming feature values

```python

res = client.get_online_features('kafkaSampleDemoFeature', '1',
['f_modified_streaming_count'])
# Get features for multiple feature keys
res = client.multi_get_online_features('kafkaSampleDemoFeature',
['1', '2'],
['f_modified_streaming_count'])

```
5 changes: 2 additions & 3 deletions feathr_project/feathr/__init__.py
@@ -2,7 +2,6 @@
from .feature_derivations import *
from .anchor import *
from .feature import *
from .source import *
from .dtype import *
from .source import *
from .transformation import *
@@ -12,5 +11,5 @@
from .sink import RedisSink
from .query_feature_list import FeatureQuery
from .lookup_feature import LookupFeature
from .aggregation import Aggregation
from .feathr_configurations import SparkExecutionConfiguration
from .aggregation import Aggregation
from .feathr_configurations import SparkExecutionConfiguration
5 changes: 4 additions & 1 deletion feathr_project/feathr/_envvariableutil.py
@@ -29,7 +29,7 @@ def get_environment_variable_with_default(self, *args):
try:
assert os.path.exists(os.path.abspath(self.config_path))
except:
logger.info("{} is not set and configuration file {} cannot be found. One of those shoudl be set." , env_keyword, self.config_path)
logger.info("{} is not set and configuration file {} cannot be found. One of those should be set." , env_keyword, self.config_path)

with open(os.path.abspath(self.config_path), 'r') as stream:
try:
@@ -42,6 +42,9 @@ def get_environment_variable_with_default(self, *args):
for arg in args:
yaml_layer = yaml_layer[arg]
return yaml_layer
except KeyError as exc:
logger.info(exc)
return ""
except yaml.YAMLError as exc:
logger.info(exc)

28 changes: 27 additions & 1 deletion feathr_project/feathr/client.py
@@ -117,6 +117,10 @@ def __init__(self, config_path:str = "./feathr_config.yaml", local_workspace_dir
self.s3_endpoint = envutils.get_environment_variable_with_default(
'offline_store', 's3', 's3_endpoint')

# Kafka configs
self.kafka_endpoint = envutils.get_environment_variable_with_default(
'streaming', 'kafka', 'kafka_endpoint')

# spark configs
self.output_num_parts = envutils.get_environment_variable_with_default(
'spark_config', 'spark_result_output_parts')
@@ -576,7 +580,8 @@ def _materialize_features_with_config(self, feature_gen_conf_path: str = 'featur
'--adls-config', self._get_adls_config_str(),
'--blob-config', self._get_blob_config_str(),
'--sql-config', self._get_sql_config_str(),
'--snowflake-config', self._get_snowflake_config_str()
'--snowflake-config', self._get_snowflake_config_str(),
'--kafka-config', self._get_kafka_config_str()
],
reference_files_path=[],
configuration=execution_configuratons,
@@ -685,3 +690,24 @@ def _get_snowflake_config_str(self):
JDBC_SF_PASSWORD: {JDBC_SF_PASSWORD}
""".format(JDBC_SF_URL=sf_url, JDBC_SF_USER=sf_user, JDBC_SF_PASSWORD=sf_password, JDBC_SF_ROLE=sf_role)
return config_str

def _get_kafka_config_str(self):
"""Construct the Kafka config string. The endpoint, access key, secret key, and other parameters can be set via
environment variables."""
kafka_endpoint = self.kafka_endpoint
# if the kafka endpoint is set in the feathr_config, then we also need the other environment variables
# keys can only be accessed through environment variables
access_key_name = _EnvVaraibleUtil.get_environment_variable('KAFKA_SHARED_ACCESS_KEY_NAME')
access_key = _EnvVaraibleUtil.get_environment_variable('KAFKA_SHARED_ACCESS_KEY')
username = _EnvVaraibleUtil.get_environment_variable('KAFKA_USERNAME')
# HOCON format will be parsed by the Feathr job
config_str = """
ENDPOINT: "{ENDPOINT}"
SHARED_ACCESS_KEY_NAME: "{SHARED_ACCESS_KEY_NAME}"
SHARED_ACCESS_KEY: "{SHARED_ACCESS_KEY}"
USERNAME: "{USERNAME}"
""".format(ENDPOINT=kafka_endpoint,
SHARED_ACCESS_KEY_NAME=access_key_name,
SHARED_ACCESS_KEY=access_key,
USERNAME=username)
return config_str
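As a hedged usage sketch (not part of this PR's diff): the endpoint is expected to come from the `streaming: kafka: kafka_endpoint` entry in `feathr_config.yaml`, while the credentials are read from the environment variables named in the code above. The values and the import path below are placeholders/assumptions, not prescribed by the PR.

```python
import os

# Placeholder credentials for an Event Hubs Kafka-compatible endpoint; real values are secrets.
os.environ['KAFKA_SHARED_ACCESS_KEY_NAME'] = 'RootManageSharedAccessKey'
os.environ['KAFKA_SHARED_ACCESS_KEY'] = '<shared-access-key>'
os.environ['KAFKA_USERNAME'] = '$ConnectionString'

from feathr.client import FeathrClient  # import path assumed for this version

client = FeathrClient(config_path='./feathr_config.yaml')
# The client reads streaming.kafka.kafka_endpoint from the config file and passes
# the rendered Kafka config string to the Spark job when materializing streaming features.
```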