extract historical: Add functionality (#277)
* extract historical: Add functionality

* extract historical: Separate REST endpoints
aditya-nambiar authored Sep 12, 2023
1 parent 9ba599c commit 5dc6f87
Showing 10 changed files with 272 additions and 12 deletions.
33 changes: 30 additions & 3 deletions docs/pages/api-reference/client.md
@@ -88,8 +88,14 @@ This api is an asynchronous api that returns a request id and the path to the output.
* `timestamp_column: str` - The name of the column containing the timestamps.
* `format: str` - The format of the input data. Can be either "pandas", "csv", "json" or "parquet". Default is "pandas".
* `input_dataframe: Optional[pd.DataFrame]` - Dataframe containing the input features. Only relevant when format is "pandas".
- * `input_bucket: Optional[str]` - The name of the S3 bucket containing the input data. Only relevant when format is "csv", "json" or "parquet".
- * `input_prefix: Optional[str]` - The prefix of the S3 key containing the input data. Only relevant when format is "csv", "json" or "parquet".
+ * `output_bucket: Optional[str]` - The name of the S3 bucket where the output data should be stored.
+ * `output_prefix: Optional[str]` - The prefix of the S3 key where the output data should be stored.

+ The following parameters are only relevant when format is "csv", "json" or "parquet":
+
+ * `input_bucket: Optional[str]` - The name of the S3 bucket containing the input data.
+ * `input_prefix: Optional[str]` - The prefix of the S3 key containing the input data.
+ * `feature_to_column_map: Optional[Dict[Feature, str]]` - A dictionary mapping features to column names.

**Returns:**

@@ -99,6 +105,7 @@ This api is an asynchronous api that returns a request id and the path to the output.
* output s3 path prefix
* completion rate
* failure rate
* status

A completion rate of 1.0 indicates that all processing has been completed.
A completion rate of 1.0 and a failure rate of 0.0 indicate that all processing has been completed successfully.
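
To make the flow concrete, here is a hedged usage sketch of a bulk S3 extraction. The featureset `UserFeatures`, the feature-list parameter names, and the bucket names are all hypothetical, since the full signature is not reproduced in this section; only the documented parameters are taken from this page:

```python
# Hypothetical sketch -- the feature-list parameter names and the
# UserFeatures featureset are assumptions, not taken from this page.
response = client.extract_historical_features(
    output_feature_list=[UserFeatures],          # hypothetical parameter
    input_feature_list=[UserFeatures.userid],    # hypothetical parameter
    timestamp_column="ts",
    format="csv",
    input_bucket="my-input-bucket",              # illustrative names
    input_prefix="extracts/in/",
    output_bucket="my-output-bucket",
    output_prefix="extracts/out/",
    feature_to_column_map={UserFeatures.userid: "user_id"},
)
print(response["request_id"], response["completion_rate"])
```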
@@ -122,7 +129,6 @@ The response format of this function and the `extract_historical_features` function are identical.

* `request_id: str` - The request ID returned by the `extract_historical_features` method. This ID uniquely identifies the feature extraction operation.


**Returns:**

* `Dict[str, Any]` - A dictionary containing the following information:
@@ -131,6 +137,7 @@ The response format of this function and the `extract_historical_features` function are identical.
* output s3 path prefix
* completion rate
* failure rate
* status

A completion rate of 1.0 indicates that all processing has been completed.
A completion rate of 1.0 and a failure rate of 0.0 indicate that all processing has been completed successfully.
@@ -142,3 +149,23 @@ client.extract_historical_features_progress(request_id='bf5dfe5d-0040-4405-a224-b82c7a5bf085')
>>> {'request_id': 'bf5dfe5d-0040-4405-a224-b82c7a5bf085', 'output_bucket': <bucket_name>, 'output_prefix': <output_prefix>, 'completion_rate': 0.76, 'failure_rate': 0.0}
```
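
A hedged polling sketch built only on the documented return fields (the sleep interval is arbitrary):

```python
import time

# Illustrative polling loop; request_id comes from a prior
# extract_historical_features call.
request_id = response["request_id"]
status = client.extract_historical_features_progress(request_id=request_id)
while status["completion_rate"] < 1.0:
    time.sleep(30)  # arbitrary interval
    status = client.extract_historical_features_progress(request_id=request_id)
assert status["failure_rate"] == 0.0, "some rows failed to process"
```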


### **extract_historical_cancel_request**

The `extract_historical_cancel_request` method allows users to cancel an asynchronous `extract_historical_features` operation.
The response format of this function is identical to that of the `extract_historical_features` function.

**Arguments:**


* `request_id: str` - The request ID returned by the `extract_historical_features` method. This ID uniquely identifies the feature extraction operation.

**Returns:**

* `Dict[str, Any]` - A dictionary containing the following information:
* request_id
* output s3 bucket
* output s3 path prefix
* completion rate
* failure rate
* status
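
A hedged usage sketch mirroring the progress example above; the request id is the illustrative value used earlier, and the `status` string shown is an assumption, since this page does not list the possible status values:

```
client.extract_historical_cancel_request(request_id='bf5dfe5d-0040-4405-a224-b82c7a5bf085')
>>> {'request_id': 'bf5dfe5d-0040-4405-a224-b82c7a5bf085', 'output_bucket': <bucket_name>, 'output_prefix': <output_prefix>, 'completion_rate': 0.76, 'failure_rate': 0.0, 'status': 'CANCELLED'}
```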
3 changes: 3 additions & 0 deletions fennel/CHANGELOG.md
@@ -1,5 +1,8 @@
# Changelog

## [0.18.1] - 2023-09-08
- Add support to specify the output bucket and prefix for extract historical, and to map input columns to features.

## [0.18.0] - 2023-08-30
- Added support for Debezium data in Avro format via Kafka connector

60 changes: 56 additions & 4 deletions fennel/client/client.py
@@ -398,8 +398,11 @@ def extract_historical_features(
        timestamp_column: str,
        format: str = "pandas",
        input_dataframe: Optional[pd.DataFrame] = None,
        output_bucket: Optional[str] = None,
        output_prefix: Optional[str] = None,
        input_bucket: Optional[str] = None,
        input_prefix: Optional[str] = None,
        feature_to_column_map: Optional[Dict[Feature, str]] = None,
    ) -> Dict[str, Any]:
        """
        Extract point in time correct features from a dataframe, where the
@@ -412,14 +415,21 @@ def extract_historical_features(
            format (str): The format of the input data. Can be either "pandas",
                "csv", "json" or "parquet". Default is "pandas".
            input_dataframe (Optional[pd.DataFrame]): Dataframe containing the input features. Only relevant when format is "pandas".
-           input_bucket (Optional[str]): The name of the S3 bucket containing the input data. Only relevant when format is "csv", "json" or "parquet".
-           input_prefix (Optional[str]): The prefix of the S3 key containing the input data. Only relevant when format is "csv", "json" or "parquet".
+           output_bucket (Optional[str]): The name of the S3 bucket to store the output data.
+           output_prefix (Optional[str]): The prefix of the S3 key to store the output data.
+           The following parameters are only relevant when format is "csv", "json" or "parquet".
+           input_bucket (Optional[str]): The name of the S3 bucket containing the input data.
+           input_prefix (Optional[str]): The prefix of the S3 key containing the input data.
+           feature_to_column_map (Optional[Dict[Feature, str]]): A dictionary that maps columns in the S3 data to the required features.
        Returns:
            Dict[str, Any]: A dictionary containing the request_id, the output s3 bucket and prefix, the completion rate and the failure rate.
            A completion rate of 1.0 indicates that all processing has been completed.
            A failure rate of 0.0 indicates that all processing has been completed successfully.
            The status of the request.
        """

        if format not in ["pandas", "csv", "json", "parquet"]:
@@ -487,6 +497,23 @@ def extract_historical_features(
input_info["input_prefix"] = input_prefix
input_info["format"] = format.upper()
input_info["compression"] = "None"

if feature_to_column_map is not None:
if len(feature_to_column_map) != len(input_feature_names):
raise Exception(
"Column mapping does not contain all the required features. "
f"Required features: {input_feature_names}. "
f"Column mapping: {feature_to_column_map}"
)
for input_feature_name in input_feature_names:
if input_feature_name not in feature_to_column_map:
raise Exception(
f"Column mapping does not contain all the required features. Feature: {input_feature_name},"
f" not found in column mapping: {feature_to_column_map}"
)

input_info["column_mapping"] = feature_to_column_map # type: ignore

extract_historical_input["S3"] = input_info

output_feature_names = []
@@ -504,25 +531,50 @@ def extract_historical_features(
"input": extract_historical_input,
"timestamp_column": timestamp_column,
}
if output_bucket is not None:
req["output_bucket"] = output_bucket
if output_prefix is None:
raise Exception(
"Output prefix not specified, but output bucket is specified. "
)
req["output_prefix"] = output_prefix

return self._post_json(
"{}/extract_historical_features".format(V1_API), req
)

    def extract_historical_features_progress(self, request_id):
        """
-       Get progress of extract historical features request.
+       Get the status of extract historical features request.
        :param request_id: The request id returned by extract_historical_features.
        Returns:
            Dict[str, Any]: A dictionary containing the request_id, the output s3 bucket and prefix, the completion rate and the failure rate.
            A completion rate of 1.0 indicates that all processing has been completed.
            A failure rate of 0.0 indicates that all processing has been completed successfully.
            The status of the request.
        """
        req = {"request_id": request_id}
        return self._post_json(
            "{}/extract_historical_request/status".format(V1_API), req
        )

    def extract_historical_cancel_request(self, request_id):
        """
        Cancel the extract historical features request.
        :param request_id: The request id returned by extract_historical_features.
        Returns:
            Dict[str, Any]: A dictionary containing the request_id, the output s3 bucket and prefix, the completion rate and the failure rate.
            A completion rate of 1.0 indicates that all processing has been completed.
            A failure rate of 0.0 indicates that all processing has been completed successfully.
            The status of the request.
        """
        req = {"request_id": request_id}
        return self._post_json(
-           "{}/extract_historical_progress".format(V1_API), req
+           "{}/extract_historical_request/cancel".format(V1_API), req
        )

    def lookup(
10 changes: 10 additions & 0 deletions fennel/client_tests/test_featureset.py
@@ -142,6 +142,16 @@ class TestSimpleExtractor(unittest.TestCase):
    def test_get_age_and_name_features(self):
        age = pd.Series([32, 24])
        name = pd.Series(["John", "Rahul"])
        assert UserInfoMultipleExtractor.all() == [
            "UserInfoMultipleExtractor.userid",
            "UserInfoMultipleExtractor.name",
            "UserInfoMultipleExtractor.country_geoid",
            "UserInfoMultipleExtractor.age",
            "UserInfoMultipleExtractor.age_squared",
            "UserInfoMultipleExtractor.age_cubed",
            "UserInfoMultipleExtractor.is_name_common",
            "UserInfoMultipleExtractor.age_reciprocal",
        ]
        ts = pd.Series([datetime(2020, 1, 1), datetime(2020, 1, 1)])
        df = UserInfoMultipleExtractor.get_age_and_name_features(
            UserInfoMultipleExtractor.original_cls, ts, age, name
3 changes: 3 additions & 0 deletions fennel/featuresets/featureset.py
@@ -380,6 +380,9 @@ def _add_feature_names_as_attributes(self):
        for feature in self._features:
            setattr(self, feature.name, feature)

    def all(self) -> List[Feature]:
        return self._features

    def _get_extractors(self) -> List[Extractor]:
        extractors = []
        for name, method in inspect.getmembers(self.__fennel_original_cls__):
7 changes: 7 additions & 0 deletions fennel/featuresets/test_featureset.py
@@ -59,6 +59,13 @@ def get_user_info(
    view.add(UserInfoDataset)
    view.add(UserInfo)
    view.add(User)
    assert UserInfo.all() == [
        "UserInfo.userid",
        "UserInfo.home_geoid",
        "UserInfo.gender",
        "UserInfo.age",
        "UserInfo.income",
    ]
    sync_request = view._get_sync_request_proto()
    assert len(sync_request.feature_sets) == 2
    assert len(sync_request.extractors) == 1
27 changes: 27 additions & 0 deletions fennel/gen/auth_pb2.py

Some generated files are not rendered by default.

118 changes: 118 additions & 0 deletions fennel/gen/auth_pb2.pyi
@@ -0,0 +1,118 @@
"""
@generated by mypy-protobuf. Do not edit manually!
isort:skip_file
"""
import builtins
import google.protobuf.descriptor
import google.protobuf.internal.enum_type_wrapper
import sys
import typing

if sys.version_info >= (3, 10):
import typing as typing_extensions
else:
import typing_extensions

DESCRIPTOR: google.protobuf.descriptor.FileDescriptor

class _OrgPermission:
    ValueType = typing.NewType("ValueType", builtins.int)
    V: typing_extensions.TypeAlias = ValueType

class _OrgPermissionEnumTypeWrapper(google.protobuf.internal.enum_type_wrapper._EnumTypeWrapper[_OrgPermission.ValueType], builtins.type):
    DESCRIPTOR: google.protobuf.descriptor.EnumDescriptor
    ORG_ALL: _OrgPermission.ValueType  # 0
    INVITE: _OrgPermission.ValueType  # 1
    """Can invite users to the organization."""
    CREATE_ROLE: _OrgPermission.ValueType  # 2
    """Can create roles for the organization."""
    EDIT_ROLE: _OrgPermission.ValueType  # 3
    """Can edit roles for the organization."""
    VIEW_ROLE: _OrgPermission.ValueType  # 4
    """Can view roles for the organization."""
    ASSIGN_ROLE: _OrgPermission.ValueType  # 5
    """Can assign roles to users for the organization."""
    PROVISION_TIER: _OrgPermission.ValueType  # 6
    """Can provision tiers for the organization."""
    DELETE_TIER: _OrgPermission.ValueType  # 7
    """Can delete tiers for the organization."""
    SET_DEFAULT_ROLE: _OrgPermission.ValueType  # 8
    """Can choose a default role for the organization."""
    ASSUME_IDENTITY: _OrgPermission.ValueType  # 9
    """Can assume a user's identity for the organization and perform actions as them."""

class OrgPermission(_OrgPermission, metaclass=_OrgPermissionEnumTypeWrapper): ...

ORG_ALL: OrgPermission.ValueType # 0
INVITE: OrgPermission.ValueType # 1
"""Can invite users to the organization."""
CREATE_ROLE: OrgPermission.ValueType # 2
"""Can create roles for the organization."""
EDIT_ROLE: OrgPermission.ValueType # 3
"""Can edit roles for the organization."""
VIEW_ROLE: OrgPermission.ValueType # 4
"""Can view roles for the organization."""
ASSIGN_ROLE: OrgPermission.ValueType # 5
"""Can assign roles to users for the organization."""
PROVISION_TIER: OrgPermission.ValueType # 6
"""Can provision tiers for the organization."""
DELETE_TIER: OrgPermission.ValueType # 7
"""Can delete tiers for the organization."""
SET_DEFAULT_ROLE: OrgPermission.ValueType # 8
"""Can choose a default role for the organization."""
ASSUME_IDENTITY: OrgPermission.ValueType # 9
"""Can assume a user's identity for the organization and perform actions as them."""
global___OrgPermission = OrgPermission

class _TierPermission:
    ValueType = typing.NewType("ValueType", builtins.int)
    V: typing_extensions.TypeAlias = ValueType

class _TierPermissionEnumTypeWrapper(google.protobuf.internal.enum_type_wrapper._EnumTypeWrapper[_TierPermission.ValueType], builtins.type):
    DESCRIPTOR: google.protobuf.descriptor.EnumDescriptor
    TIER_ALL: _TierPermission.ValueType  # 0
    MODIFY_ENVIRONMENT: _TierPermission.ValueType  # 1
    """Can modify the tier's environment."""
    ADD_TAGS: _TierPermission.ValueType  # 2
    """Can add new tags to the tier."""
    DELETE_TIER_ONLY: _TierPermission.ValueType  # 3
    """Can delete ONLY the given tier."""
    VIEW_TIER: _TierPermission.ValueType  # 4
    """Can view the tier."""
    TIER_ACCESS: _TierPermission.ValueType  # 5
    """Can give other users access to the tier."""
    VIEW_ENTITY_DEFINITION: _TierPermission.ValueType  # 6
    """Can view definitions of an entity that has a given tag."""
    EDIT_ENTITY_DEFINITION: _TierPermission.ValueType  # 7
    """Can edit definitions of an entity that has a given tag."""
    READ_ENTITY_DATA: _TierPermission.ValueType  # 8
    """Can read data for a given entity that has a given tag."""
    WRITE_ENTITY_DATA: _TierPermission.ValueType  # 9
    """Can write data for a given entity that has a given tag."""
    EXTRACT_HISTORICAL_FEATURES: _TierPermission.ValueType  # 10
    """Can run extract_historical_features on the tier."""

class TierPermission(_TierPermission, metaclass=_TierPermissionEnumTypeWrapper): ...

TIER_ALL: TierPermission.ValueType # 0
MODIFY_ENVIRONMENT: TierPermission.ValueType # 1
"""Can modify the tier's environment."""
ADD_TAGS: TierPermission.ValueType # 2
"""Can add new tags to the tier."""
DELETE_TIER_ONLY: TierPermission.ValueType # 3
"""Can delete ONLY the given tier."""
VIEW_TIER: TierPermission.ValueType # 4
"""Can view the tier."""
TIER_ACCESS: TierPermission.ValueType # 5
"""Can give other user's access to the tier."""
VIEW_ENTITY_DEFINITION: TierPermission.ValueType # 6
"""Can view definitions of an entity that has a given tag."""
EDIT_ENTITY_DEFINITION: TierPermission.ValueType # 7
"""Can edit definitions of an entity that has a given tag."""
READ_ENTITY_DATA: TierPermission.ValueType # 8
"""Can read data for a given entity that has a given tag."""
WRITE_ENTITY_DATA: TierPermission.ValueType # 9
"""Can write data for a given entity that has a given tag."""
EXTRACT_HISTORICAL_FEATURES: TierPermission.ValueType # 10
"""Can run extract_historical_features on the tier."""
global___TierPermission = TierPermission
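
For orientation, a minimal sketch (assumed usage, not part of this commit) of how such generated enum values are typically consumed; the import path is inferred from the file location above, and treating TIER_ALL as a wildcard is an assumption about its semantics:

```python
from fennel.gen import auth_pb2  # import path inferred from the file location


def may_extract_historical(granted: set) -> bool:
    # Generated protobuf enum values behave like plain ints, so membership
    # checks work directly. TIER_ALL as a wildcard is an assumption.
    return (
        auth_pb2.TIER_ALL in granted
        or auth_pb2.EXTRACT_HISTORICAL_FEATURES in granted
    )
```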
