Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Append] Hopsworks client to work with Hive Engine #201

Merged
merged 3 commits into from
Dec 21, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 11 additions & 4 deletions python/hsfs/client/external.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ def __init__(
self._project_id = str(project_info["projectId"])

self._cert_key = None
self._cert_folder_base = None

if engine == "hive":
# On external Spark clients (Databricks, Spark Cluster),
Expand All @@ -80,11 +81,11 @@ def __init__(
credentials = self._get_credentials(self._project_id)
self._write_b64_cert_to_bytes(
str(credentials["kStore"]),
path=os.path.join(self._cert_folder, "keyStore.jks"),
path=self._get_jks_key_store_path(),
)
self._write_b64_cert_to_bytes(
str(credentials["tStore"]),
path=os.path.join(self._cert_folder, "trustStore.jks"),
path=self._get_jks_trust_store_path(),
)

self._cert_key = str(credentials["password"])
Expand All @@ -99,8 +100,8 @@ def _close(self):
return

# Clean up only on AWS
self._cleanup_file(os.path.join(self._cert_folder, "keyStore.jks"))
self._cleanup_file(os.path.join(self._cert_folder, "trustStore.jks"))
self._cleanup_file(self._get_jks_key_store_path())
self._cleanup_file(self._get_jks_trust_store_path())
self._cleanup_file(os.path.join(self._cert_folder, "material_passwd"))

try:
Expand All @@ -114,6 +115,12 @@ def _close(self):
pass
self._connected = False

def _get_jks_trust_store_path(self):
return os.path.join(self._cert_folder, "trustStore.jks")

def _get_jks_key_store_path(self):
return os.path.join(self._cert_folder, "keyStore.jks")

def _get_secret(self, secrets_store, secret_key=None, api_key_file=None):
"""Returns secret value from the AWS Secrets Manager or Parameter Store.

Expand Down
8 changes: 5 additions & 3 deletions python/hsfs/client/hopsworks.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,16 @@ class Client(base.Client):
HDFS_USER = "HDFS_USER"
T_CERTIFICATE = "t_certificate"
K_CERTIFICATE = "k_certificate"
TRUSTSTORE_SUFFIX = "__tstore.key"
KEYSTORE_SUFFIX = "__kstore.key"
TRUSTSTORE_SUFFIX = "__tstore.jks"
KEYSTORE_SUFFIX = "__kstore.jks"
PEM_CA_CHAIN = "ca_chain.pem"

def __init__(self):
"""Initializes a client being run from a job/notebook directly on Hopsworks."""
self._base_url = self._get_hopsworks_rest_endpoint()
self._host, self._port = self._get_host_port_pair()

self._cert_key = util.get_cert_pw()
trust_store_path = self._get_trust_store_path()
hostname_verification = (
os.environ[self.REQUESTS_VERIFY]
Expand Down Expand Up @@ -76,7 +78,7 @@ def _write_ca_chain(self, ca_chain_path):
"""
Converts JKS truststore file into PEM to be compatible with Python libraries
"""
keystore_pw = util.get_cert_pw()
keystore_pw = self._cert_key
keystore_ca_cert = self._convert_jks_to_pem(
self._get_jks_key_store_path(), keystore_pw
)
Expand Down
8 changes: 1 addition & 7 deletions python/hsfs/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,13 +210,7 @@ def connect(self):
client.init("hopsworks")

# init engine
engine.init(
self._engine,
self._host,
self._cert_folder,
self._project,
client.get_instance()._cert_key,
)
engine.init(self._engine)

self._feature_store_api = feature_store_api.FeatureStoreApi()
self._project_api = project_api.ProjectApi()
Expand Down
4 changes: 2 additions & 2 deletions python/hsfs/engine/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
_engine = None


def init(engine_type, host=None, cert_folder=None, project=None, cert_key=None):
def init(engine_type):
global _engine
if not _engine:
if engine_type == "spark":
Expand All @@ -34,7 +34,7 @@ def init(engine_type, host=None, cert_folder=None, project=None, cert_key=None):
"missing in HSFS installation. Install with `pip install "
"hsfs[hive]`."
)
_engine = hive.Engine(host, cert_folder, project, cert_key)
_engine = hive.Engine()


def get_instance():
Expand Down
17 changes: 8 additions & 9 deletions python/hsfs/engine/hive.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,16 @@
# limitations under the License.
#

import os
import pandas as pd
from pyhive import hive
from sqlalchemy import create_engine

from hsfs import client


class Engine:
def __init__(self, host, cert_folder, project, cert_key):
self._host = host
self._cert_folder = os.path.join(cert_folder, host, project)
self._cert_key = cert_key
def __init__(self):
pass

def sql(self, sql_query, feature_store, online_conn, dataframe_type):
if not online_conn:
Expand Down Expand Up @@ -70,14 +69,14 @@ def set_job_group(self, group_id, description):

def _create_hive_connection(self, feature_store):
    """Open a certificate-authenticated Hive connection to the feature store.

    Host, JKS store paths and the certificate password are all taken from
    the client singleton rather than from engine state.
    """
    hopsworks_client = client.get_instance()
    return hive.Connection(
        host=hopsworks_client._host,
        port=9085,
        # database needs to be set every time, 'default' doesn't work in pyhive
        database=feature_store,
        auth="CERTIFICATES",
        truststore=hopsworks_client._get_jks_trust_store_path(),
        keystore=hopsworks_client._get_jks_key_store_path(),
        keystore_password=hopsworks_client._cert_key,
    )

def _create_mysql_connection(self, online_conn):
Expand Down
2 changes: 1 addition & 1 deletion python/hsfs/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ def get_cert_pw():
Returns:
Certificate password
"""
hadoop_user_name = "hadoop_user_name"
hadoop_user_name = "HADOOP_USER_NAME"
crypto_material_password = "material_passwd"
material_directory = "MATERIAL_DIRECTORY"
password_suffix = "__cert.key"
Expand Down