From 2abc91fc45ee7f88b406e8a6a550ffba704e17db Mon Sep 17 00:00:00 2001
From: moritzmeister
Date: Wed, 28 Oct 2020 16:45:25 +0100
Subject: [PATCH 1/2] add label and docs

---
 auto_doc.py                                 |   7 +
 python/hsfs/core/training_dataset_api.py    |   5 +-
 python/hsfs/core/training_dataset_engine.py |   4 +-
 python/hsfs/feature.py                      |  12 ++
 python/hsfs/feature_store.py                |  96 +++++++++++--
 python/hsfs/training_dataset.py             | 146 ++++++++++++++++++--
 python/hsfs/training_dataset_feature.py     |  25 +++-
 7 files changed, 264 insertions(+), 31 deletions(-)

diff --git a/auto_doc.py b/auto_doc.py
index 6a638076b6..52b1cd9fa3 100644
--- a/auto_doc.py
+++ b/auto_doc.py
@@ -88,6 +88,13 @@ def generate(dest_dir):
         project_url="https://github.com/logicalclocks/feature-store-api/blob/master/python",
         template_dir="./docs/templates",
         titles_size="###",
+        extra_aliases={
+            "hsfs.core.query.Query": "hsfs.Query",
+            "hsfs.storage_connector.StorageConnector": "hsfs.StorageConnector",
+            "hsfs.statistics_config.StatisticsConfig": "hsfs.StatisticsConfig",
+            "hsfs.training_dataset_feature.TrainingDatasetFeature": "hsfs.TrainingDatasetFeature",
+            "pandas.core.frame.DataFrame": "pandas.DataFrame",
+        },
     )
     shutil.copyfile(hsfs_dir / "CONTRIBUTING.md", dest_dir / "CONTRIBUTING.md")
     shutil.copyfile(hsfs_dir / "README.md", dest_dir / "index.md")
diff --git a/python/hsfs/core/training_dataset_api.py b/python/hsfs/core/training_dataset_api.py
index ef5fb7d46a..519db3b98c 100644
--- a/python/hsfs/core/training_dataset_api.py
+++ b/python/hsfs/core/training_dataset_api.py
@@ -55,7 +55,7 @@ def get(self, name, version):
             _client._send_request("GET", path_params, query_params)[0],
         )
 
-    def get_query(self, training_dataset_instance):
+    def get_query(self, training_dataset_instance, with_label):
         _client = client.get_instance()
         path_params = [
             "project",
@@ -66,4 +66,5 @@ def get_query(self, training_dataset_instance):
             training_dataset_instance.id,
             "query",
         ]
-        return _client._send_request("GET", path_params)
+        query_params = {"withLabel": with_label}
+        return _client._send_request("GET", path_params, query_params)
diff --git a/python/hsfs/core/training_dataset_engine.py b/python/hsfs/core/training_dataset_engine.py
index e26e228842..b357f28add 100644
--- a/python/hsfs/core/training_dataset_engine.py
+++ b/python/hsfs/core/training_dataset_engine.py
@@ -74,8 +74,8 @@ def read(self, training_dataset, split, user_read_options):
             path,
         )
 
-    def query(self, training_dataset, online):
-        return self._training_dataset_api.get_query(training_dataset)[
+    def query(self, training_dataset, online, with_label):
+        return self._training_dataset_api.get_query(training_dataset, with_label)[
             "queryOnline" if online else "query"
         ]
diff --git a/python/hsfs/feature.py b/python/hsfs/feature.py
index 3898b67e48..a403071b08 100644
--- a/python/hsfs/feature.py
+++ b/python/hsfs/feature.py
@@ -55,6 +55,7 @@ def from_response_json(cls, json_dict):
 
     @property
     def name(self):
+        """Name of the feature."""
         return self._name
 
     @name.setter
@@ -63,6 +64,13 @@ def name(self, name):
 
     @property
     def type(self):
+        """Data type of the feature in the feature store.
+
+        !!! danger "Not a Python type"
+            This type property is not to be confused with Python types.
+            The type property represents the actual data type of the feature in
+            the feature store.
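+
+        !!! example
+            A minimal sketch; the feature group handle `fg` is an assumed,
+            previously retrieved feature group, not part of this change:
+
+            ```python
+            for feat in fg.features:
+                print(feat.name, feat.type)  # e.g. "amount", "bigint", not Python's `int`
+            ```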
+ """ return self._type @type.setter @@ -71,6 +79,7 @@ def type(self, type): @property def primary(self): + """Whether the feature is part of the primary key of the feature group.""" return self._primary @primary.setter @@ -79,6 +88,7 @@ def primary(self, primary): @property def partition(self): + """Whether the feature is part of the partition key of the feature group.""" return self._partition @partition.setter @@ -87,6 +97,8 @@ def partition(self, partition): @property def default_value(self): + """Default value of the feature as string, if the feature was appended to the + feature group.""" return self._default_value @default_value.setter diff --git a/python/hsfs/feature_store.py b/python/hsfs/feature_store.py index 246938bc83..3cc0103078 100644 --- a/python/hsfs/feature_store.py +++ b/python/hsfs/feature_store.py @@ -16,9 +16,15 @@ import warnings import humps -from typing import Optional, Union, List +from typing import Optional, Union, List, Dict -from hsfs import training_dataset, feature_group, util, training_dataset_feature +from hsfs import ( + training_dataset, + feature_group, + util, + storage_connector, + training_dataset_feature, +) from hsfs.core import ( feature_group_api, storage_connector_api, @@ -163,10 +169,10 @@ def create_feature_group( DataFrame. # Arguments - name: Name of the feature group to get. + name: Name of the feature group to create. version: Version of the feature group to retrieve, defaults to `None` and will create the feature group with incremented version from the last - verison in the feature store. + version in the feature store. description: A string describing the contents of the feature group to improve discoverability for Data Scientists, defaults to empty string `""`. @@ -209,16 +215,79 @@ def create_feature_group( def create_training_dataset( self, - name, - version=None, - description="", - data_format="tfrecords", - storage_connector=None, - splits={}, - location="", - seed=None, - statistics_config=None, + name: str, + version: Optional[int] = None, + description: Optional[str] = "", + data_format: Optional[str] = "tfrecords", + storage_connector: Optional[storage_connector.StorageConnector] = None, + splits: Optional[Dict[str, float]] = {}, + location: Optional[str] = "", + seed: Optional[int] = None, + statistics_config: Optional[Union[StatisticsConfig, bool, dict]] = None, + label: Optional[List[str]] = [], ): + """Create a training dataset metadata object. + + !!! note "Lazy" + This method is lazy and does not persist any metadata or feature data in the + feature store on its own. To materialize the training dataset and save + feature data along the metadata in the feature store, call the `save()` + method with a `DataFrame` or `Query`. + + !!! info "Data Formats" + The feature store currently supports the following data formats for + training datasets: + + 1. tfrecord + 2. csv + 3. tsv + 4. parquet + 5. avro + 6. orc + + Currently not supported petastorm, hdf5 and npy file formats. + + + # Arguments + name: Name of the training dataset to create. + version: Version of the training dataset to retrieve, defaults to `None` and + will create the training dataset with incremented version from the last + version in the feature store. + description: A string describing the contents of the training dataset to + improve discoverability for Data Scientists, defaults to empty string + `""`. + data_format: The data format used to save the training dataset, + defaults to `"tfrecords"`-format. 
+        # Arguments
+            name: Name of the training dataset to create.
+            version: Version of the training dataset to create, defaults to `None` and
+                will create the training dataset with incremented version from the last
+                version in the feature store.
+            description: A string describing the contents of the training dataset to
+                improve discoverability for Data Scientists, defaults to empty string
+                `""`.
+            data_format: The data format used to save the training dataset,
+                defaults to `"tfrecords"`-format.
+            storage_connector: Storage connector defining the sink location for the
+                training dataset, defaults to `None`, and materializes the training
+                dataset on HopsFS.
+            splits: A dictionary defining training dataset splits to be created. Keys in
+                the dictionary define the name of the split as `str`, values represent
+                percentage of samples in the split as `float`. Currently, only random
+                splits are supported. Defaults to empty dict `{}`, creating only a single
+                training dataset without splits.
+            location: Path to complement the sink storage connector with, e.g. if the
+                storage connector points to an S3 bucket, this path can be used to
+                define a sub-directory inside the bucket to place the training dataset.
+                Defaults to `""`, saving the training dataset at the root defined by the
+                storage connector.
+            seed: Optionally, define a seed to create the random splits with, in order
+                to guarantee reproducibility, defaults to `None`.
+            statistics_config: A configuration object, or a dictionary with keys
+                `"enabled"` to generally enable descriptive statistics computation for
+                this training dataset, `"correlations"` to turn on feature correlation
+                computation and `"histograms"` to compute feature value frequencies. The
+                values should be booleans indicating the setting. To fully turn off
+                statistics computation pass `statistics_config=False`. Defaults to
+                `None` and will compute only descriptive statistics.
+            label: A list of feature names constituting the prediction label/feature of
+                the training dataset. When replaying a `Query` during model inference,
+                the label features can be omitted from the feature vector retrieval.
+                Defaults to `[]`, no label.
+
+        # Returns
+            `TrainingDataset`: The training dataset metadata object.
+        """
         return training_dataset.TrainingDataset(
             name=name,
             version=version,
@@ -230,4 +299,5 @@ def create_training_dataset(
             splits=splits,
             seed=seed,
             statistics_config=statistics_config,
+            label=label,
         )
diff --git a/python/hsfs/training_dataset.py b/python/hsfs/training_dataset.py
index e65ccaccf3..7094a60b69 100644
--- a/python/hsfs/training_dataset.py
+++ b/python/hsfs/training_dataset.py
@@ -14,10 +14,13 @@
 # limitations under the License.
 #
 
-import humps
 import json
 import warnings
-from typing import Optional
+from typing import Optional, Union, Any, Dict, List, TypeVar
+
+import humps
+import pandas as pd
+import numpy as np
 
 from hsfs import util, engine, training_dataset_feature
 from hsfs.statistics_config import StatisticsConfig
@@ -30,6 +33,7 @@
     tfdata_engine,
     statistics_engine,
 )
+from hsfs.client import exceptions
 
 
 class TrainingDataset:
@@ -62,6 +66,7 @@ def __init__(
         training_dataset_type=None,
         from_query=None,
         querydto=None,
+        label=None,
     ):
         self._id = id
         self._name = name
@@ -96,6 +101,7 @@ def __init__(
             self.storage_connector = storage_connector
             self.splits = splits
             self.statistics_config = statistics_config
+            self._label = label
         else:
             # type available -> init from backend response
             # make rest call to get all connector information, description etc.
@@ -109,9 +115,39 @@ def __init__(
             self._splits = splits
             self._training_dataset_type = training_dataset_type
             self.statistics_config = None
+            self._label = [feat.name for feat in self._features if feat.label]
+
+    def save(
+        self,
+        features: Union[
+            query.Query,
+            pd.DataFrame,
+            TypeVar("pyspark.sql.DataFrame"),  # noqa: F821
+            TypeVar("pyspark.RDD"),  # noqa: F821
+            np.ndarray,
+            List[list],
+        ],
+        write_options: Optional[Dict[Any, Any]] = {},
+    ):
+        """Materialize the training dataset to storage.
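+
+        !!! example
+            A minimal sketch; `td` is assumed to come from
+            `fs.create_training_dataset()` and `fg` is an assumed existing
+            feature group:
+
+            ```python
+            query = fg.select_all()
+            td.save(query)
+            ```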
+
+        This method materializes the training dataset either from a Feature Store
+        `Query`, a Spark or Pandas `DataFrame`, a Spark RDD, two-dimensional Python
+        lists or Numpy ndarrays.
+
+        # Arguments
+            features: Feature data to be materialized.
+            write_options: Additional write options as key/value pairs.
+                Defaults to `{}`.
+
+        # Returns
+            `TrainingDataset`: The updated training dataset metadata object; the
+                previous `TrainingDataset` object on which you call `save` is also
+                updated.
-    def save(self, features, write_options={}):
-        # TODO: Decide if we want to have potentially dangerous defaults like {}
+
+        # Raises
+            `RestAPIError`: Unable to create training dataset metadata.
+        """
         if isinstance(features, query.Query):
             feature_dataframe = features.read()
             self._querydto = features
@@ -119,9 +155,12 @@ def save(self, features, write_options={}):
             feature_dataframe = engine.get_instance().convert_to_default_dataframe(
                 features
             )
-        self._features = engine.get_instance().parse_schema_training_dataset(
-            feature_dataframe
-        )
+
+        self._features = engine.get_instance().parse_schema_training_dataset(
+            feature_dataframe
+        )
+
+        self._set_label_features()
 
         user_version = self._version
         user_stats_config = self._statistics_config
@@ -139,7 +178,41 @@ def save(self, features, write_options={}):
         )
         return self
 
-    def insert(self, features, overwrite, write_options={}):
+    def insert(
+        self,
+        features: Union[
+            query.Query,
+            pd.DataFrame,
+            TypeVar("pyspark.sql.DataFrame"),  # noqa: F821
+            TypeVar("pyspark.RDD"),  # noqa: F821
+            np.ndarray,
+            List[list],
+        ],
+        overwrite: bool,
+        write_options: Optional[Dict[Any, Any]] = {},
+    ):
+        """Insert additional feature data into the training dataset.
+
+        This method appends data to the training dataset either from a Feature Store
+        `Query`, a Spark or Pandas `DataFrame`, a Spark RDD, two-dimensional Python
+        lists or Numpy ndarrays. The schemas must match for this operation.
+
+        This can also be used to overwrite all data in an existing training dataset.
+
+        # Arguments
+            features: Feature data to be materialized.
+            overwrite: Whether to overwrite the entire data in the training dataset.
+            write_options: Additional write options as key/value pairs.
+                Defaults to `{}`.
+
+        # Returns
+            `TrainingDataset`: The updated training dataset metadata object; the
+                previous `TrainingDataset` object on which you call `insert` is also
+                updated.
+
+        # Raises
+            `RestAPIError`: Unable to create training dataset metadata.
+        """
         if isinstance(features, query.Query):
             feature_dataframe = features.read()
         else:
@@ -153,6 +226,18 @@ def insert(self, features, overwrite, write_options={}):
         self.compute_statistics()
 
     def read(self, split=None, read_options={}):
+        """Read the training dataset into a dataframe.
+
+        It is also possible to read only a specific split.
+
+        # Arguments
+            split: Name of the split to read, defaults to `None`, reading the entire
+                training dataset.
+            read_options: Additional read options as key/value pairs, defaults to `{}`.
+
+        # Returns
+            `DataFrame`: The Spark dataframe containing the feature data of the
+                training dataset.
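+
+        !!! example
+            A sketch of reading a single split, assuming the training dataset
+            was created with `"train"` and `"test"` splits:
+
+            ```python
+            train_df = td.read(split="train")
+            ```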
+        """
         return self._training_dataset_engine.read(self, split, read_options)
 
     def compute_statistics(self):
@@ -196,7 +281,16 @@ def tf_data(
             cycle_length=cycle_length,
         )
 
-    def show(self, n, split=None):
+    def show(self, n: int, split: str = None):
+        """Show the first `n` rows of the training dataset.
+
+        You can specify a split from which to retrieve the rows.
+
+        # Arguments
+            n: Number of rows to show.
+            split: Name of the split to show, defaults to `None`, showing the first rows
+                when taking all splits together.
+        """
         self.read(split).show(n)
 
     def add_tag(self, name: str, value: str = None):
@@ -438,7 +532,7 @@ def get_statistics(self, commit_time: str = None):
 
     @property
     def query(self):
         """Query to generate this training dataset from online feature store."""
-        return self._training_dataset_engine.query(self, "online")
+        return self._training_dataset_engine.query(self, True)
 
     def get_query(self, online: bool = True):
         """Returns the query used to generate this training dataset.
 
         # Arguments
             online: boolean, optional. Return the query for the online storage, else
                 for offline storage, defaults to `True` for online storage.
+            with_label: Indicator whether the query should contain features which were
+                marked as prediction label/feature when the training dataset was
+                created, defaults to `"False"`.
 
         # Returns
             `str`. Query string for the chosen storage used to generate this training
                 dataset.
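+
+        !!! example
+            A sketch retrieving the offline query string including the label
+            features, e.g. to reproduce the training data with another tool
+            (assumes the dataset was created from a `Query`):
+
+            ```python
+            td.get_query(online=False, with_label=True)
+            ```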
+ """ return self._type @property def index(self): + """Index of the feature in the training dataset, required to restore the correct + order of features.""" return self._index + + @property + def label(self): + """Indicator if the feature is part of the prediction label.""" + return self._label + + @label.setter + def label(self, label): + self._label = label From b8437760d4371f22c832dd3530a1fce70cefe646 Mon Sep 17 00:00:00 2001 From: moritzmeister Date: Fri, 6 Nov 2020 14:58:17 +0100 Subject: [PATCH 2/2] fix rebase --- python/hsfs/core/feature_group_engine.py | 6 ++++-- python/hsfs/training_dataset.py | 6 +++--- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/python/hsfs/core/feature_group_engine.py b/python/hsfs/core/feature_group_engine.py index d890d98052..c3018a992e 100644 --- a/python/hsfs/core/feature_group_engine.py +++ b/python/hsfs/core/feature_group_engine.py @@ -69,7 +69,9 @@ def save(self, feature_group, feature_dataframe, write_options): feature_group, feature_dataframe, self.APPEND, - hudi_engine.HudiEngine.HUDI_BULK_INSERT if feature_group.time_travel_format == "HUDI" else None, + hudi_engine.HudiEngine.HUDI_BULK_INSERT + if feature_group.time_travel_format == "HUDI" + else None, feature_group.online_enabled, None, offline_write_options, @@ -134,7 +136,7 @@ def commit_delete(feature_group, delete_df, write_options): def update_statistics_config(self, feature_group): """Update the statistics configuration of a feature group.""" - self._feature_group_api.update_statistics_config( + self._feature_group_api.update_metadata( feature_group, feature_group, "updateStatsSettings" ) diff --git a/python/hsfs/training_dataset.py b/python/hsfs/training_dataset.py index 7094a60b69..f747e0297b 100644 --- a/python/hsfs/training_dataset.py +++ b/python/hsfs/training_dataset.py @@ -532,9 +532,9 @@ def get_statistics(self, commit_time: str = None): @property def query(self): """Query to generate this training dataset from online feature store.""" - return self._training_dataset_engine.query(self, True) + return self._training_dataset_engine.query(self, True, True) - def get_query(self, online: bool = True): + def get_query(self, online: bool = True, with_label: bool = False): """Returns the query used to generate this training dataset # Arguments @@ -542,7 +542,7 @@ def get_query(self, online: bool = True): for offline storage, defaults to `True` - for online storage. with_label: Indicator whether the query should contain features which were marked as prediction label/feature when the training dataset was - created, defaults to `"False"`. + created, defaults to `False`. # Returns `str`. Query string for the chosen storage used to generate this training