def import_fg(job_conf: Dict[Any, Any]) -> None:
    """
    Import data into a feature group using a storage connector.

    Reads a dataframe through the storage connector named in ``job_conf``
    (optionally restricted by a query), then creates — or fetches an
    existing version of — the target feature group and inserts the data.

    NOTE(review): the keys of ``job_conf`` are assumed to follow the
    Hopsworks import-job JSON payload schema; confirm against the caller
    that produces this configuration.

    :param job_conf: job configuration dict; consumed destructively
        (several keys are ``pop``-ed off).
    :return: None
    """
    feature_store = job_conf.pop("feature_store")
    fs = get_feature_store_handle(feature_store)

    # Retrieve the storage connector that points at the external data.
    st = fs.get_storage_connector(name=job_conf["storageConnectorName"])

    # Read the data. "query" is optional (e.g. object-store connectors
    # take none), so fall back to an empty string instead of KeyError.
    spark_options = job_conf.pop("options")
    df = st.read(query=job_conf.pop("query", "") or "", options=spark_options)

    # Statistics config is optional; deserialize only when present.
    # (Direct indexing here previously raised KeyError for payloads that
    # omitted the key, unlike every other optional field in this function.)
    stats_json = job_conf.get("statisticsConfig")
    stat_config = (
        StatisticsConfig.from_response_json(stats_json) if stats_json else None
    )

    # Create the feature group (or get an existing one) and insert the data.
    fg = fs.get_or_create_feature_group(
        name=job_conf["featureGroupName"],
        version=job_conf["version"],
        primary_key=job_conf["primaryKey"],
        # Normalize None/absent to False instead of the "... or False" idiom.
        online_enabled=bool(job_conf.pop("onlineEnabled", False)),
        statistics_config=stat_config,
        partition_key=job_conf.pop("partitionKey", []) or [],
        # Optional as well — don't crash on payloads without a description.
        description=job_conf.get("description", ""),
        # ".pop(..., None) or None" was redundant; pop alone suffices.
        event_time=job_conf.pop("eventTime", None),
    )
    fg.insert(df)