diff --git a/python/hopsworks/connection.py b/python/hopsworks/connection.py index 00ab51a7a4..136a60c8c0 100644 --- a/python/hopsworks/connection.py +++ b/python/hopsworks/connection.py @@ -1,6 +1,7 @@ import os -import functools +from requests.exceptions import ConnectionError +from hopsworks import util, engine from hopsworks.core import client, feature_store_api @@ -31,6 +32,7 @@ def __init__( self._hostname_verification = ( hostname_verification or self.HOSTNAME_VERIFICATION_DEFAULT ) + # what's the difference between trust store path and cert folder self._trust_store_path = trust_store_path self._cert_folder = cert_folder or self.CERT_FOLDER_DEFAULT self._api_key_file = api_key_file @@ -77,40 +79,26 @@ def connect(self): self._cert_folder, self._api_key_file, ) + engine.init( + "hive", self._host, self._cert_folder, self._client._cert_key + ) else: self._client = client.HopsworksClient() - except TypeError: + engine.init("spark") + self._feature_store_api = feature_store_api.FeatureStoreApi(self._client) + except (TypeError, ConnectionError): self._connected = False raise - self._feature_store_api = feature_store_api.FeatureStoreApi(self._client) print("CONNECTED") def close(self): self._client._close() self._feature_store_api = None - self._client = None + engine.stop() self._connected = False print("CONNECTION CLOSED") - def not_connected(fn): - @functools.wraps(fn) - def if_not_connected(inst, *args, **kwargs): - if inst._connected: - raise ConnectionError - return fn(inst, *args, **kwargs) - - return if_not_connected - - def connected(fn): - @functools.wraps(fn) - def if_connected(inst, *args, **kwargs): - if not inst._connected: - raise NoConnectionError - return fn(inst, *args, **kwargs) - - return if_connected - - @connected + @util.connected def get_feature_store(self, name=None): """Get a reference to a feature store, to perform operations on. @@ -124,6 +112,8 @@ def get_feature_store(self, name=None): """ if not name: name = self._client._project_name + "_featurestore" + # TODO: this won't work with multiple feature stores + engine.get_instance()._feature_store = name return self._feature_store_api.get(name) @property @@ -131,7 +121,7 @@ def host(self): return self._host @host.setter - @not_connected + @util.not_connected def host(self, host): self._host = host @@ -140,7 +130,7 @@ def port(self): return self._port @port.setter - @not_connected + @util.not_connected def port(self, port): self._port = port @@ -149,7 +139,7 @@ def project(self): return self._project @project.setter - @not_connected + @util.not_connected def project(self, project): self._project = project @@ -158,7 +148,7 @@ def region_name(self): return self._region_name @region_name.setter - @not_connected + @util.not_connected def region_name(self, region_name): self._region_name = region_name @@ -167,7 +157,7 @@ def secrets_store(self): return self._secrets_store @secrets_store.setter - @not_connected + @util.not_connected def secrets_store(self, secrets_store): self._secrets_store = secrets_store @@ -176,7 +166,7 @@ def hostname_verification(self): return self._hostname_verification @hostname_verification.setter - @not_connected + @util.not_connected def hostname_verification(self, hostname_verification): self._hostname_verification = hostname_verification @@ -185,7 +175,7 @@ def trust_store_path(self): return self._trust_store_path @trust_store_path.setter - @not_connected + @util.not_connected def trust_store_path(self, trust_store_path): self._trust_store_path = trust_store_path @@ -194,7 +184,7 @@ def cert_folder(self): return self._cert_folder @cert_folder.setter - @not_connected + @util.not_connected def cert_folder(self, cert_folder): self._cert_folder = cert_folder @@ -203,7 +193,7 @@ def api_key_file(self): return self._api_key_file @api_key_file.setter - @not_connected + @util.not_connected def api_key_file(self, api_key_file): self._api_key_file = api_key_file @@ -215,7 +205,7 @@ def __exit__(self, type, value, traceback): self.close() -class ConnectionError(Exception): +class HopsworksConnectionError(Exception): """Thrown when attempted to change connection attributes while connected.""" def __init__(self): @@ -224,7 +214,7 @@ def __init__(self): ) -class NoConnectionError(Exception): +class NoHopsworksConnectionError(Exception): """Thrown when attempted to perform operation on connection while not connected.""" def __init__(self): diff --git a/python/hopsworks/core/client.py b/python/hopsworks/core/client.py index 9c80e61887..e39e406f7d 100644 --- a/python/hopsworks/core/client.py +++ b/python/hopsworks/core/client.py @@ -13,6 +13,8 @@ import requests import urllib3 +from hopsworks import util + urllib3.disable_warnings(urllib3.exceptions.SecurityWarning) urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) @@ -102,6 +104,7 @@ def _read_jwt(self): with open(self.TOKEN_FILE, "r") as jwt: return jwt.read() + @util.connected def _send_request( self, method, path_params, query_params=None, headers=None, data=None ): @@ -158,7 +161,7 @@ def _send_request( def _close(self): """Closes a client. Can be implemented for clean up purposes, not mandatory.""" - pass + self._connected = False class HopsworksClient(BaseClient): @@ -172,7 +175,7 @@ class HopsworksClient(BaseClient): def __init__(self): """Initializes a client being run from a job/notebook directly on Hopsworks.""" self._base_url = self._get_hopsworks_rest_endpoint() - host, port = self._get_host_port_pair() + self._host, self._port = self._get_host_port_pair() trust_store_path = ( os.environ[self.DOMAIN_CA_TRUSTSTORE_PEM] if self.DOMAIN_CA_TRUSTSTORE_PEM in os.environ @@ -187,10 +190,12 @@ def __init__(self): self._project_name = self._project_name() self._auth = BearerAuth(self._read_jwt()) self._verify = self._get_verify( - host, port, hostname_verification, trust_store_path + self._host, self._port, hostname_verification, trust_store_path ) self._session = requests.session() + self._connected = True + def _get_hopsworks_rest_endpoint(self): """Get the hopsworks REST endpoint for making requests to the REST API.""" return os.environ[self.REST_ENDPOINT] @@ -240,7 +245,9 @@ def __init__( if not project: raise ExternalClientError("project") - self._base_url = "https://" + host + ":" + str(port) + self._host = host + self._port = port + self._base_url = "https://" + self._host + ":" + str(self._port) self._project_name = project self._region_name = region_name self._cert_folder = cert_folder @@ -250,8 +257,9 @@ def __init__( ) self._session = requests.session() + self._connected = True self._verify = self._get_verify( - host, port, hostname_verification, trust_store_path + self._host, self._port, hostname_verification, trust_store_path ) project_info = self._get_project_info(self._project_name) @@ -259,10 +267,12 @@ def __init__( credentials = self._get_credentials(self._project_id) self._write_b64_cert_to_bytes( - str(credentials["kStore"]), path=os.path.join(cert_folder, "keyStore.jks") + str(credentials["kStore"]), + path=os.path.join(self._cert_folder, "keyStore.jks"), ) self._write_b64_cert_to_bytes( - str(credentials["tStore"]), path=os.path.join(cert_folder, "trustStore.jks") + str(credentials["tStore"]), + path=os.path.join(self._cert_folder, "trustStore.jks"), ) self._cert_key = str(credentials["password"]) @@ -271,6 +281,7 @@ def _close(self): """Closes a client and deletes certificates.""" self._cleanup_file(os.path.join(self._cert_folder, "keyStore.jks")) self._cleanup_file(os.path.join(self._cert_folder, "trustStore.jks")) + self._connected = False def _get_secret(self, secrets_store, secret_key=None, api_key_file=None): """Returns secret value from the AWS Secrets Manager or Parameter Store. diff --git a/python/hopsworks/core/feature_group_api.py b/python/hopsworks/core/feature_group_api.py new file mode 100644 index 0000000000..0f596ea7d9 --- /dev/null +++ b/python/hopsworks/core/feature_group_api.py @@ -0,0 +1,30 @@ +from hopsworks import feature_group + + +class FeatureGroupApi: + def __init__(self, client, feature_store_id): + # or should the client be passed with every call to the API + self._client = client + self._feature_store_id = feature_store_id + + def get(self, name, version): + """Get feature store with specific id or name. + + :param identifier: id or name of the feature store + :type identifier: int, str + :return: the featurestore metadata + :rtype: FeatureStore + """ + path_params = [ + "project", + self._client._project_id, + "featurestores", + self._feature_store_id, + "featuregroups", + name, + ] + query_params = {"version": version} + return feature_group.FeatureGroup.from_response_json( + self._client, + self._client._send_request("GET", path_params, query_params)[0], + ) diff --git a/python/hopsworks/core/feature_store_api.py b/python/hopsworks/core/feature_store_api.py index e5d9d83a91..988b0df9f9 100644 --- a/python/hopsworks/core/feature_store_api.py +++ b/python/hopsworks/core/feature_store_api.py @@ -15,5 +15,5 @@ def get(self, identifier): """ path_params = ["project", self._client._project_id, "featurestores", identifier] return FeatureStore.from_response_json( - self._client._send_request("GET", path_params) + self._client, self._client._send_request("GET", path_params) ) diff --git a/python/hopsworks/core/query.py b/python/hopsworks/core/query.py new file mode 100644 index 0000000000..3f6d603497 --- /dev/null +++ b/python/hopsworks/core/query.py @@ -0,0 +1,24 @@ +import json + +from hopsworks import util, engine + + +class Query: + def __init__( + self, query_constructor_api, left_feature_group, left_features, joins=None, + ): + self._left_feature_group = left_feature_group + self._left_features = left_features + self._joins = joins + self._query_constructor_api = query_constructor_api + + def read(self, dataframe_type="default"): + sql_query = self._query_constructor_api.construct_query(self)["query"] + return engine.get_instance().sql(sql_query, dataframe_type) + + def show(self, n): + sql_query = self._query_constructor_api.construct_query(self)["query"] + return engine.get_instance().show(sql_query, n) + + def json(self): + return json.dumps(self, cls=util.QueryEncoder) diff --git a/python/hopsworks/core/query_constructor_api.py b/python/hopsworks/core/query_constructor_api.py new file mode 100644 index 0000000000..5b29d2ca4c --- /dev/null +++ b/python/hopsworks/core/query_constructor_api.py @@ -0,0 +1,10 @@ +class QueryConstructorApi: + def __init__(self, client): + self._client = client + + def construct_query(self, query): + path_params = ["project", self._client._project_id, "featurestores", "query"] + headers = {"content-type": "application/json"} + return self._client._send_request( + "PUT", path_params, headers=headers, data=query.json() + ) diff --git a/python/hopsworks/engine/__init__.py b/python/hopsworks/engine/__init__.py new file mode 100644 index 0000000000..2b7be58333 --- /dev/null +++ b/python/hopsworks/engine/__init__.py @@ -0,0 +1,24 @@ +from hopsworks.engine import spark, hive + +_engine = None + + +def init(engine_type, host=None, cert_folder=None, cert_key=None): + global _engine + if not _engine: + if engine_type == "spark": + _engine = spark.Engine() + elif engine_type == "hive": + _engine = hive.Engine(host, cert_folder, cert_key) + + +def get_instance(): + global _engine + if _engine: + return _engine + raise Exception("Couldn't find execution engine. Try reconnecting to Hopsworks.") + + +def stop(): + global _engine + _engine = None diff --git a/python/hopsworks/engine/hive.py b/python/hopsworks/engine/hive.py new file mode 100644 index 0000000000..e419e85f82 --- /dev/null +++ b/python/hopsworks/engine/hive.py @@ -0,0 +1,46 @@ +import os +import pandas as pd +from pyhive import hive + + +class Engine: + def __init__(self, host, cert_folder, cert_key): + self._host = host + self._feature_store = None + self._cert_folder = cert_folder + self._cert_key = cert_key + + def sql(self, sql_query, dataframe_type): + print("Lazily executing query: {}".format(sql_query)) + with self._create_hive_connection() as hive_conn: + result_df = pd.read_sql(sql_query, hive_conn) + return self._return_dataframe_type(result_df, dataframe_type) + + def show(self, sql_query, n): + return self.sql(sql_query, "default").head(n) + + def set_job_group(self, group_id, description): + pass + + def _create_hive_connection(self): + return hive.Connection( + host=self._host, + port=9085, + database=self._feature_store, + auth="CERTIFICATES", + truststore=os.path.join(self._cert_folder, "trustStore.jks"), + keystore=os.path.join(self._cert_folder, "keyStore.jks"), + keystore_password=self._cert_key, + ) + + def _return_dataframe_type(self, dataframe, dataframe_type): + if dataframe_type.lower() in ["default", "pandas"]: + return dataframe + if dataframe_type.lower() == "numpy": + return dataframe.values + if dataframe_type == "python": + return dataframe.values.tolist() + + raise TypeError( + "Dataframe type `{}` not supported on this platform.".format(dataframe_type) + ) diff --git a/python/hopsworks/engine/spark.py b/python/hopsworks/engine/spark.py new file mode 100644 index 0000000000..3d82e266ed --- /dev/null +++ b/python/hopsworks/engine/spark.py @@ -0,0 +1,37 @@ +# in case importing in %%local +try: + from pyspark.sql import SparkSession +except ModuleNotFoundError: + pass + + +class Engine: + def __init__(self): + self._spark_session = SparkSession.builder.getOrCreate() + self._feature_store = None + + def sql(self, sql_query, dataframe_type): + print("Lazily executing query: {}".format(sql_query)) + result_df = self._spark_session.sql(sql_query) + self.set_job_group("", "") + return self._return_dataframe_type(result_df, dataframe_type) + + def show(self, sql_query, n): + return self.sql(sql_query, "default").show(n) + + def set_job_group(self, group_id, description): + self._spark_session.sparkContext.setJobGroup(group_id, description) + + def _return_dataframe_type(self, dataframe, dataframe_type): + if dataframe_type.lower() in ["default", "spark"]: + return dataframe + if dataframe_type.lower() == "pandas": + return dataframe.toPandas() + if dataframe_type.lower() == "numpy": + return dataframe.toPandas().values + if dataframe_type == "python": + return dataframe.toPandas().values.tolist() + + raise TypeError( + "Dataframe type `{}` not supported on this platform.".format(dataframe_type) + ) diff --git a/python/hopsworks/feature.py b/python/hopsworks/feature.py new file mode 100644 index 0000000000..2111d660f7 --- /dev/null +++ b/python/hopsworks/feature.py @@ -0,0 +1,3 @@ +class Feature: + def __init__(self, name): + self._name = name diff --git a/python/hopsworks/feature_group.py b/python/hopsworks/feature_group.py index ac3c9d2e2a..8af463c407 100644 --- a/python/hopsworks/feature_group.py +++ b/python/hopsworks/feature_group.py @@ -1,20 +1,113 @@ -import json import humps +from hopsworks.core import query, query_constructor_api +from hopsworks import engine + class FeatureGroup: - def __init__(self): - pass + def __init__( + self, + client, + type, + featurestore_id, + featurestore_name, + description, + created, + creator, + version, + descriptive_statistics, + feature_correlation_matrix, + features_histogram, + cluster_analysis, + name, + id, + features, + location, + jobs, + featuregroup_type, + desc_stats_enabled, + feat_corr_enabled, + feat_hist_enabled, + cluster_analysis_enabled, + statistic_columns, + num_bins, + num_clusters, + corr_method, + hdfs_store_paths, + hive_table_id, + hive_table_type, + inode_id, + input_format, + online_featuregroup_enabled, + ): + self._type = type + self._feature_store_id = featurestore_id + self._feature_store_name = featurestore_name + self._description = description + self._created = created + self._creator = creator + self._version = version + self._descriptive_statistics = descriptive_statistics + self._feature_correlation_matrix = feature_correlation_matrix + self._features_histogram = features_histogram + self._cluster_analysis = cluster_analysis + self._name = name + self._id = id + self._features = features + self._location = location + self._jobs = jobs + self._feature_group_typ = featuregroup_type + self._desc_stats_enabled = desc_stats_enabled + self._feat_corr_enabled = feat_corr_enabled + self._feat_hist_enabled = feat_hist_enabled + self._cluster_analysis_enabled = cluster_analysis_enabled + self._statistic_columns = statistic_columns + self._num_bins = num_bins + self._num_clusters = num_clusters + self._corr_method = corr_method + self._hdfs_store_paths = hdfs_store_paths + self._hive_table_id = hive_table_id + self._hive_table_type = hive_table_type + self._inode_id = inode_id + self._input_format = input_format + self._online_feature_group_enabled = online_featuregroup_enabled - def sample(self, num_rows=None): - pass + self._query_constructor_api = query_constructor_api.QueryConstructorApi(client) - def features(self, feature_list=["*"]): - pass + def read(self, dataframe_type="default"): + """Get the feature group as a DataFrame.""" + engine.get_instance().set_job_group( + "Fetching Feature group", + "Getting feature group: {} from the featurestore {}".format( + self._name, self._feature_store_name + ), + ) + return self.select_all().read(dataframe_type) + + def show(self, n): + """Show the first n rows of the feature group.""" + engine.get_instance().set_job_group( + "Fetching Feature group", + "Getting feature group: {} from the featurestore {}".format( + self._name, self._feature_store_name + ), + ) + return self.select_all().show(n) + + def select_all(self): + """Select all features in the feature group and return a query object.""" + return query.Query(self._query_constructor_api, self, self._features) + + def select(self, features=[]): + return query.Query(self._query_constructor_api, self, features) @classmethod - def from_json(cls, json_str): - json_dict = json.loads(json_str) - # Json is coming from Java, convert it from camel case to snake case - json_dec = humps.decamelize(json_dict) - return cls(**json_dec) + def from_response_json(cls, client, json_dict): + json_decamelized = humps.decamelize(json_dict) + # TODO(Moritz): Later we can add a factory here to generate featuregroups depending on the type in the return json + # i.e. offline, online, on-demand + return cls(client, **json_decamelized) + + @classmethod + def new_featuregroup(cls): + pass diff --git a/python/hopsworks/feature_store.py b/python/hopsworks/feature_store.py index 5db0ff1b5e..5d1d67ec08 100644 --- a/python/hopsworks/feature_store.py +++ b/python/hopsworks/feature_store.py @@ -1,9 +1,12 @@ import humps +from hopsworks.core import feature_group_api + class FeatureStore: def __init__( self, + client, featurestore_id, featurestore_name, created, @@ -21,13 +24,13 @@ def __init__( mysql_server_endpoint, online_enabled, ): - self._feature_store_id = featurestore_id - self._feature_store_name = featurestore_name + self._id = featurestore_id + self._name = featurestore_name self._created = created self._hdfs_store_path = hdfs_store_path self._project_name = project_name self._project_id = project_id - self._feature_store_description = featurestore_description + self._description = featurestore_description self._inode_id = inode_id self._online_feature_store_type = online_featurestore_type self._online_feature_store_name = online_featurestore_name @@ -38,7 +41,12 @@ def __init__( self._mysql_server_endpoint = mysql_server_endpoint self._online_enabled = online_enabled + self._feature_group_api = feature_group_api.FeatureGroupApi(client, self._id) + @classmethod - def from_response_json(cls, json_dict): + def from_response_json(cls, client, json_dict): json_decamelized = humps.decamelize(json_dict) - return cls(**json_decamelized) + return cls(client, **json_decamelized) + + def get_feature_group(self, name, version): + return self._feature_group_api.get(name, version) diff --git a/python/hopsworks/util.py b/python/hopsworks/util.py new file mode 100644 index 0000000000..cbde6ead05 --- /dev/null +++ b/python/hopsworks/util.py @@ -0,0 +1,42 @@ +import json +import functools + + +from hopsworks import feature_group, feature, connection +from hopsworks.core import query + + +class QueryEncoder(json.JSONEncoder): + def default(self, o): + if isinstance(o, feature.Feature): + return {"name": o._name} + elif isinstance(o, feature_group.FeatureGroup): + return {"id": o._id} + elif isinstance(o, query.Query): + return { + "leftFeatureGroup": o._left_feature_group, + "leftFeatures": o._left_features, + # "joins": o._joins, + } + else: + return super().default(o) + + +def not_connected(fn): + @functools.wraps(fn) + def if_not_connected(inst, *args, **kwargs): + if inst._connected: + raise connection.HopsworksConnectionError + return fn(inst, *args, **kwargs) + + return if_not_connected + + +def connected(fn): + @functools.wraps(fn) + def if_connected(inst, *args, **kwargs): + if not inst._connected: + raise connection.NoHopsworksConnectionError + return fn(inst, *args, **kwargs) + + return if_connected diff --git a/python/setup.py b/python/setup.py index 2152ec68f3..f56cd93676 100644 --- a/python/setup.py +++ b/python/setup.py @@ -21,7 +21,10 @@ def read(fname): "pyopenssl", "idna", "furl", - "boto3" + "boto3", + "pandas", + "numpy", + "pyhopshive[thrift]" ], extras_require={ "dev": [