From cafe316c9b025547db79fee75df906380f4b93bf Mon Sep 17 00:00:00 2001
From: Moritz Meister <8422705+moritzmeister@users.noreply.github.com>
Date: Mon, 21 Dec 2020 13:30:47 +0100
Subject: [PATCH] [Append] Hopsworks client to work with Hive Engine (#201)

---
 python/hsfs/client/external.py  | 15 +++++++++++----
 python/hsfs/client/hopsworks.py |  8 +++++---
 python/hsfs/connection.py       |  8 +-------
 python/hsfs/engine/__init__.py  |  4 ++--
 python/hsfs/engine/hive.py      | 17 ++++++++---------
 python/hsfs/util.py             |  2 +-
 6 files changed, 28 insertions(+), 26 deletions(-)

diff --git a/python/hsfs/client/external.py b/python/hsfs/client/external.py
index 3ffe52be40..58294bab37 100644
--- a/python/hsfs/client/external.py
+++ b/python/hsfs/client/external.py
@@ -69,6 +69,7 @@ def __init__(
         self._project_id = str(project_info["projectId"])

         self._cert_key = None
+        self._cert_folder_base = None

         if engine == "hive":
             # On external Spark clients (Databricks, Spark Cluster),
@@ -80,11 +81,11 @@
             credentials = self._get_credentials(self._project_id)
             self._write_b64_cert_to_bytes(
                 str(credentials["kStore"]),
-                path=os.path.join(self._cert_folder, "keyStore.jks"),
+                path=self._get_jks_key_store_path(),
             )
             self._write_b64_cert_to_bytes(
                 str(credentials["tStore"]),
-                path=os.path.join(self._cert_folder, "trustStore.jks"),
+                path=self._get_jks_trust_store_path(),
             )

             self._cert_key = str(credentials["password"])
@@ -99,8 +100,8 @@ def _close(self):
             return

         # Clean up only on AWS
-        self._cleanup_file(os.path.join(self._cert_folder, "keyStore.jks"))
-        self._cleanup_file(os.path.join(self._cert_folder, "trustStore.jks"))
+        self._cleanup_file(self._get_jks_key_store_path())
+        self._cleanup_file(self._get_jks_trust_store_path())
         self._cleanup_file(os.path.join(self._cert_folder, "material_passwd"))

         try:
@@ -114,6 +115,12 @@ def _close(self):
             pass
         self._connected = False

+    def _get_jks_trust_store_path(self):
+        return os.path.join(self._cert_folder, "trustStore.jks")
+
+    def _get_jks_key_store_path(self):
+        return os.path.join(self._cert_folder, "keyStore.jks")
+
     def _get_secret(self, secrets_store, secret_key=None, api_key_file=None):
         """Returns secret value from the AWS Secrets Manager or Parameter Store.
diff --git a/python/hsfs/client/hopsworks.py b/python/hsfs/client/hopsworks.py
index 4f77a840df..c802e83947 100644
--- a/python/hsfs/client/hopsworks.py
+++ b/python/hsfs/client/hopsworks.py
@@ -39,14 +39,16 @@ class Client(base.Client):
     HDFS_USER = "HDFS_USER"
     T_CERTIFICATE = "t_certificate"
     K_CERTIFICATE = "k_certificate"
-    TRUSTSTORE_SUFFIX = "__tstore.key"
-    KEYSTORE_SUFFIX = "__kstore.key"
+    TRUSTSTORE_SUFFIX = "__tstore.jks"
+    KEYSTORE_SUFFIX = "__kstore.jks"
     PEM_CA_CHAIN = "ca_chain.pem"

     def __init__(self):
         """Initializes a client being run from a job/notebook directly on Hopsworks."""
         self._base_url = self._get_hopsworks_rest_endpoint()
         self._host, self._port = self._get_host_port_pair()
+
+        self._cert_key = util.get_cert_pw()
         trust_store_path = self._get_trust_store_path()
         hostname_verification = (
             os.environ[self.REQUESTS_VERIFY]
@@ -76,7 +78,7 @@ def _write_ca_chain(self, ca_chain_path):
         """
         Converts JKS trustore file into PEM to be compatible with Python libraries
         """
-        keystore_pw = util.get_cert_pw()
+        keystore_pw = self._cert_key
         keystore_ca_cert = self._convert_jks_to_pem(
             self._get_jks_key_store_path(), keystore_pw
         )
diff --git a/python/hsfs/connection.py b/python/hsfs/connection.py
index 6cd422b634..e53f6f5615 100644
--- a/python/hsfs/connection.py
+++ b/python/hsfs/connection.py
@@ -210,13 +210,7 @@ def connect(self):
             client.init("hopsworks")

         # init engine
-        engine.init(
-            self._engine,
-            self._host,
-            self._cert_folder,
-            self._project,
-            client.get_instance()._cert_key,
-        )
+        engine.init(self._engine)

         self._feature_store_api = feature_store_api.FeatureStoreApi()
         self._project_api = project_api.ProjectApi()
diff --git a/python/hsfs/engine/__init__.py b/python/hsfs/engine/__init__.py
index 30cb5e06f5..a6cf5cb1a1 100644
--- a/python/hsfs/engine/__init__.py
+++ b/python/hsfs/engine/__init__.py
@@ -20,7 +20,7 @@
 _engine = None


-def init(engine_type, host=None, cert_folder=None, project=None, cert_key=None):
+def init(engine_type):
     global _engine
     if not _engine:
         if engine_type == "spark":
@@ -34,7 +34,7 @@ def init(engine_type, host=None, cert_folder=None, project=None, cert_key=None)
                     "missing in HSFS installation. Install with `pip install "
                     "hsfs[hive]`."
                 )
-            _engine = hive.Engine(host, cert_folder, project, cert_key)
+            _engine = hive.Engine()


 def get_instance():
diff --git a/python/hsfs/engine/hive.py b/python/hsfs/engine/hive.py
index 7c3d64b28a..66a77e0ecc 100644
--- a/python/hsfs/engine/hive.py
+++ b/python/hsfs/engine/hive.py
@@ -14,17 +14,16 @@
 #   limitations under the License.
 #

-import os
 import pandas as pd

 from pyhive import hive
 from sqlalchemy import create_engine

+from hsfs import client
+

 class Engine:
-    def __init__(self, host, cert_folder, project, cert_key):
-        self._host = host
-        self._cert_folder = os.path.join(cert_folder, host, project)
-        self._cert_key = cert_key
+    def __init__(self):
+        pass

     def sql(self, sql_query, feature_store, online_conn, dataframe_type):
         if not online_conn:
@@ -70,14 +69,14 @@ def set_job_group(self, group_id, description):

     def _create_hive_connection(self, feature_store):
         return hive.Connection(
-            host=self._host,
+            host=client.get_instance()._host,
             port=9085,
             # database needs to be set every time, 'default' doesn't work in pyhive
             database=feature_store,
             auth="CERTIFICATES",
-            truststore=os.path.join(self._cert_folder, "trustStore.jks"),
-            keystore=os.path.join(self._cert_folder, "keyStore.jks"),
-            keystore_password=self._cert_key,
+            truststore=client.get_instance()._get_jks_trust_store_path(),
+            keystore=client.get_instance()._get_jks_key_store_path(),
+            keystore_password=client.get_instance()._cert_key,
         )

     def _create_mysql_connection(self, online_conn):
diff --git a/python/hsfs/util.py b/python/hsfs/util.py
index a116cfc21a..90679c0496 100644
--- a/python/hsfs/util.py
+++ b/python/hsfs/util.py
@@ -52,7 +52,7 @@ def get_cert_pw():
     Returns:
         Certificate password
     """
-    hadoop_user_name = "hadoop_user_name"
+    hadoop_user_name = "HADOOP_USER_NAME"
     crypto_material_password = "material_passwd"
     material_directory = "MATERIAL_DIRECTORY"
     password_suffix = "__cert.key"
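
Usage note: after this patch, engine.init() takes only the engine type, and the Hive engine resolves the Hopsworks host, JKS trust/key store paths, and certificate password from the client singleton at connection time rather than carrying its own copies. A minimal sketch of the call path this enables, assuming the hsfs connection API of this release; the host, project, and API key file values below are hypothetical placeholders:

    import hsfs

    # engine="hive" routes Connection.connect() through engine.init("hive"),
    # which now constructs hive.Engine() with no arguments (see
    # engine/__init__.py above).
    conn = hsfs.connection(
        host="my-instance.cloud.hopsworks.ai",  # hypothetical host
        project="my_project",                   # hypothetical project
        engine="hive",
        api_key_file="api_key.txt",             # hypothetical path, read via _get_secret()
    )
    fs = conn.get_feature_store()

    # A subsequent query runs through hive.Engine._create_hive_connection(),
    # which now fetches host, store paths, and the certificate password from
    # client.get_instance()._host, ._get_jks_trust_store_path(),
    # ._get_jks_key_store_path(), and ._cert_key.

The util.py fix belongs to the same flow: the internal client now calls util.get_cert_pw() during __init__, and that helper must read the HADOOP_USER_NAME environment variable by its actual (uppercase) name to locate the crypto material.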