From d574c698a3ff36ff311d52d9e33e486c361d7923 Mon Sep 17 00:00:00 2001 From: moritzmeister Date: Mon, 21 Dec 2020 10:42:38 +0100 Subject: [PATCH 1/3] fix hopsworks client cert_key --- python/hsfs/client/hopsworks.py | 4 +++- python/hsfs/util.py | 2 +- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/python/hsfs/client/hopsworks.py b/python/hsfs/client/hopsworks.py index 4f77a840df..6092394b96 100644 --- a/python/hsfs/client/hopsworks.py +++ b/python/hsfs/client/hopsworks.py @@ -47,6 +47,8 @@ def __init__(self): """Initializes a client being run from a job/notebook directly on Hopsworks.""" self._base_url = self._get_hopsworks_rest_endpoint() self._host, self._port = self._get_host_port_pair() + + self._cert_key = util.get_cert_pw() trust_store_path = self._get_trust_store_path() hostname_verification = ( os.environ[self.REQUESTS_VERIFY] @@ -76,7 +78,7 @@ def _write_ca_chain(self, ca_chain_path): """ Converts JKS trustore file into PEM to be compatible with Python libraries """ - keystore_pw = util.get_cert_pw() + keystore_pw = self._cert_key keystore_ca_cert = self._convert_jks_to_pem( self._get_jks_key_store_path(), keystore_pw ) diff --git a/python/hsfs/util.py b/python/hsfs/util.py index a116cfc21a..90679c0496 100644 --- a/python/hsfs/util.py +++ b/python/hsfs/util.py @@ -52,7 +52,7 @@ def get_cert_pw(): Returns: Certificate password """ - hadoop_user_name = "hadoop_user_name" + hadoop_user_name = "HADOOP_USER_NAME" crypto_material_password = "material_passwd" material_directory = "MATERIAL_DIRECTORY" password_suffix = "__cert.key" From 88d7ad3bfcf7f8120f3d240206fd7032f3c8d703 Mon Sep 17 00:00:00 2001 From: moritzmeister Date: Mon, 21 Dec 2020 11:04:43 +0100 Subject: [PATCH 2/3] fix keystore truststore suffix --- python/hsfs/client/hopsworks.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/hsfs/client/hopsworks.py b/python/hsfs/client/hopsworks.py index 6092394b96..c802e83947 100644 --- a/python/hsfs/client/hopsworks.py +++ b/python/hsfs/client/hopsworks.py @@ -39,8 +39,8 @@ class Client(base.Client): HDFS_USER = "HDFS_USER" T_CERTIFICATE = "t_certificate" K_CERTIFICATE = "k_certificate" - TRUSTSTORE_SUFFIX = "__tstore.key" - KEYSTORE_SUFFIX = "__kstore.key" + TRUSTSTORE_SUFFIX = "__tstore.jks" + KEYSTORE_SUFFIX = "__kstore.jks" PEM_CA_CHAIN = "ca_chain.pem" def __init__(self): From 7b5f7ec55edb8c9d641f269f3a96d5fc2de53b99 Mon Sep 17 00:00:00 2001 From: moritzmeister Date: Mon, 21 Dec 2020 11:40:24 +0100 Subject: [PATCH 3/3] get cert info only from client --- python/hsfs/client/external.py | 15 +++++++++++---- python/hsfs/connection.py | 8 +------- python/hsfs/engine/__init__.py | 4 ++-- python/hsfs/engine/hive.py | 17 ++++++++--------- 4 files changed, 22 insertions(+), 22 deletions(-) diff --git a/python/hsfs/client/external.py b/python/hsfs/client/external.py index 3ffe52be40..58294bab37 100644 --- a/python/hsfs/client/external.py +++ b/python/hsfs/client/external.py @@ -69,6 +69,7 @@ def __init__( self._project_id = str(project_info["projectId"]) self._cert_key = None + self._cert_folder_base = None if engine == "hive": # On external Spark clients (Databricks, Spark Cluster), @@ -80,11 +81,11 @@ def __init__( credentials = self._get_credentials(self._project_id) self._write_b64_cert_to_bytes( str(credentials["kStore"]), - path=os.path.join(self._cert_folder, "keyStore.jks"), + path=self._get_jks_key_store_path(), ) self._write_b64_cert_to_bytes( str(credentials["tStore"]), - path=os.path.join(self._cert_folder, "trustStore.jks"), + path=self._get_jks_trust_store_path(), ) self._cert_key = str(credentials["password"]) @@ -99,8 +100,8 @@ def _close(self): return # Clean up only on AWS - self._cleanup_file(os.path.join(self._cert_folder, "keyStore.jks")) - self._cleanup_file(os.path.join(self._cert_folder, "trustStore.jks")) + self._cleanup_file(self._get_jks_key_store_path()) + self._cleanup_file(self._get_jks_trust_store_path()) self._cleanup_file(os.path.join(self._cert_folder, "material_passwd")) try: @@ -114,6 +115,12 @@ def _close(self): pass self._connected = False + def _get_jks_trust_store_path(self): + return os.path.join(self._cert_folder, "trustStore.jks") + + def _get_jks_key_store_path(self): + return os.path.join(self._cert_folder, "keyStore.jks") + def _get_secret(self, secrets_store, secret_key=None, api_key_file=None): """Returns secret value from the AWS Secrets Manager or Parameter Store. diff --git a/python/hsfs/connection.py b/python/hsfs/connection.py index 6cd422b634..e53f6f5615 100644 --- a/python/hsfs/connection.py +++ b/python/hsfs/connection.py @@ -210,13 +210,7 @@ def connect(self): client.init("hopsworks") # init engine - engine.init( - self._engine, - self._host, - self._cert_folder, - self._project, - client.get_instance()._cert_key, - ) + engine.init(self._engine) self._feature_store_api = feature_store_api.FeatureStoreApi() self._project_api = project_api.ProjectApi() diff --git a/python/hsfs/engine/__init__.py b/python/hsfs/engine/__init__.py index 30cb5e06f5..a6cf5cb1a1 100644 --- a/python/hsfs/engine/__init__.py +++ b/python/hsfs/engine/__init__.py @@ -20,7 +20,7 @@ _engine = None -def init(engine_type, host=None, cert_folder=None, project=None, cert_key=None): +def init(engine_type): global _engine if not _engine: if engine_type == "spark": @@ -34,7 +34,7 @@ def init(engine_type, host=None, cert_folder=None, project=None, cert_key=None): "missing in HSFS installation. Install with `pip install " "hsfs[hive]`." ) - _engine = hive.Engine(host, cert_folder, project, cert_key) + _engine = hive.Engine() def get_instance(): diff --git a/python/hsfs/engine/hive.py b/python/hsfs/engine/hive.py index 7c3d64b28a..66a77e0ecc 100644 --- a/python/hsfs/engine/hive.py +++ b/python/hsfs/engine/hive.py @@ -14,17 +14,16 @@ # limitations under the License. # -import os import pandas as pd from pyhive import hive from sqlalchemy import create_engine +from hsfs import client + class Engine: - def __init__(self, host, cert_folder, project, cert_key): - self._host = host - self._cert_folder = os.path.join(cert_folder, host, project) - self._cert_key = cert_key + def __init__(self): + pass def sql(self, sql_query, feature_store, online_conn, dataframe_type): if not online_conn: @@ -70,14 +69,14 @@ def set_job_group(self, group_id, description): def _create_hive_connection(self, feature_store): return hive.Connection( - host=self._host, + host=client.get_instance()._host, port=9085, # database needs to be set every time, 'default' doesn't work in pyhive database=feature_store, auth="CERTIFICATES", - truststore=os.path.join(self._cert_folder, "trustStore.jks"), - keystore=os.path.join(self._cert_folder, "keyStore.jks"), - keystore_password=self._cert_key, + truststore=client.get_instance()._get_jks_trust_store_path(), + keystore=client.get_instance()._get_jks_key_store_path(), + keystore_password=client.get_instance()._cert_key, ) def _create_mysql_connection(self, online_conn):