Skip to content

Commit

Permalink
Python feature group (#43)
Browse files Browse the repository at this point in the history
  • Loading branch information
SirOibaf authored Jun 30, 2020
1 parent ccf36a0 commit c714d33
Show file tree
Hide file tree
Showing 19 changed files with 860 additions and 154 deletions.
52 changes: 9 additions & 43 deletions python/hsfs/client/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,6 @@

import os
import socket
from OpenSSL import SSL
from cryptography import x509
from cryptography.x509.oid import NameOID
import idna
import furl
from abc import ABC, abstractmethod

Expand All @@ -44,57 +40,25 @@ def __init__(self):
"""To be implemented by clients."""
pass

def _get_verify(self, host, port, verify, trust_store_path):
def _get_verify(self, verify, trust_store_path):
"""Get verification method for sending HTTP requests to Hopsworks.
Credit to https://gist.github.com/gdamjan/55a8b9eec6cf7b771f92021d93b87b2c
:param host: hopsworks hostname
:type host: str
:param port: hopsworks port
:type port: str or int
:param verify: perform hostname verification, 'true' or 'false'
:type verify: str
:param trust_store_path: path of the truststore locally if it was uploaded manually to
the external environment such as AWS Sagemaker
:type trust_store_path: str
:return: if env var HOPS_UTIL_VERIFY is not false
then if hopsworks certificate is self-signed, return the path to the truststore (PEM)
else if hopsworks is not self-signed, return true
return false
:return: if verify is true and the truststore is provided, then return the trust store location
if verify is true but the truststore wasn't provided, then return true
if verify is false, then return false
:rtype: str or boolean
"""
if verify == "true":

hostname_idna = idna.encode(host)
sock = socket.socket()

sock.connect((host, int(port)))
ctx = SSL.Context(SSL.SSLv23_METHOD)
ctx.check_hostname = False
ctx.verify_mode = SSL.VERIFY_NONE

sock_ssl = SSL.Connection(ctx, sock)
sock_ssl.set_connect_state()
sock_ssl.set_tlsext_host_name(hostname_idna)
sock_ssl.do_handshake()
cert = sock_ssl.get_peer_certificate()
crypto_cert = cert.to_cryptography()
sock_ssl.close()
sock.close()

try:
commonname = crypto_cert.subject.get_attributes_for_oid(
NameOID.COMMON_NAME
)[0].value
issuer = crypto_cert.issuer.get_attributes_for_oid(NameOID.COMMON_NAME)[
0
].value
if commonname == issuer and trust_store_path:
return trust_store_path
else:
return True
except x509.ExtensionNotFound:
if trust_store_path != None:
return trust_store_path
else:
return True

return False
Expand Down Expand Up @@ -171,6 +135,8 @@ def _send_request(
if response.status_code == 401 and self.REST_ENDPOINT in os.environ:
# refresh token and retry request - only on hopsworks
self._auth = auth.BearerAuth(self._read_jwt())
# Update request with the new token
request.auth = self._auth
prepped = self._session.prepare_request(request)
response = self._session.send(prepped, verify=self._verify, stream=stream)

Expand Down
4 changes: 4 additions & 0 deletions python/hsfs/client/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ class UnknownSecretStorageError(Exception):
"""This exception will be raised if an unused secrets storage is passed as a parameter."""


class FeatureStoreException(Exception):
    """Generic exception type for feature store errors."""


class ExternalClientError(TypeError):
"""Raised when external client cannot be initialized due to missing arguments."""

Expand Down
4 changes: 1 addition & 3 deletions python/hsfs/client/external.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,7 @@ def __init__(

self._session = requests.session()
self._connected = True
self._verify = self._get_verify(
self._host, self._port, hostname_verification, trust_store_path
)
self._verify = self._get_verify(self._host, trust_store_path)

project_info = self._get_project_info(self._project_name)
self._project_id = str(project_info["projectId"])
Expand Down
138 changes: 130 additions & 8 deletions python/hsfs/client/hopsworks.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,27 +16,39 @@

import os
import requests
import textwrap
import base64

from pathlib import Path
from hsfs.client import base, auth

try:
    import jks
except ImportError:
    # pyjks is an optional dependency: in the code visible here it is only
    # needed by `_convert_jks_to_pem`. Catch ImportError specifically so that
    # unrelated failures (and KeyboardInterrupt/SystemExit) are not swallowed.
    pass


class Client(base.Client):
REQUESTS_VERIFY = "REQUESTS_VERIFY"
DOMAIN_CA_TRUSTSTORE_PEM = "DOMAIN_CA_TRUSTSTORE_PEM"
PROJECT_ID = "HOPSWORKS_PROJECT_ID"
PROJECT_NAME = "HOPSWORKS_PROJECT_NAME"
HADOOP_USER_NAME = "HADOOP_USER_NAME"
MATERIAL_DIRECTORY = "MATERIAL_DIRECTORY"
CRYPTO_MATERIAL_PASSWORD = "material_passwd"
HDFS_USER = "HDFS_USER"
T_CERTIFICATE = "t_certificate"
K_CERTIFICATE = "k_certificate"
PASSWORD_SUFFIX = "__cert.key"
TRUSTSTORE_SUFFIX = "__tstore.key"
KEYSTORE_SUFFIX = "__kstore.key"
PEM_CA_CHAIN = "ca_chain.pem"

def __init__(self):
"""Initializes a client being run from a job/notebook directly on Hopsworks."""
self._base_url = self._get_hopsworks_rest_endpoint()
self._host, self._port = self._get_host_port_pair()
trust_store_path = (
os.environ[self.DOMAIN_CA_TRUSTSTORE_PEM]
if self.DOMAIN_CA_TRUSTSTORE_PEM in os.environ
else None
)
trust_store_path = self._get_trust_store_path()
hostname_verification = (
os.environ[self.REQUESTS_VERIFY]
if self.REQUESTS_VERIFY in os.environ
Expand All @@ -45,9 +57,7 @@ def __init__(self):
self._project_id = os.environ[self.PROJECT_ID]
self._project_name = self._project_name()
self._auth = auth.BearerAuth(self._read_jwt())
self._verify = self._get_verify(
self._host, self._port, hostname_verification, trust_store_path
)
self._verify = self._get_verify(hostname_verification, trust_store_path)
self._session = requests.session()

self._connected = True
Expand All @@ -56,6 +66,118 @@ def _get_hopsworks_rest_endpoint(self):
"""Get the hopsworks REST endpoint for making requests to the REST API."""
return os.environ[self.REST_ENDPOINT]

def _get_trust_store_path(self):
"""Convert truststore from jks to pem and return the location
"""
ca_chain_path = Path(self.PEM_CA_CHAIN)
if not ca_chain_path.exists():
self._write_ca_chain(ca_chain_path)
return str(ca_chain_path)

def _write_ca_chain(self, ca_chain_path):
    """
    Converts the JKS keystore and truststore into PEM to be compatible with
    Python libraries, and writes the result to ``ca_chain_path``.

    The certificates from the keystore are written first, followed by the
    ones from the truststore. Both stores are opened with the same password.

    Args:
        :ca_chain_path: pathlib.Path of the PEM file to (over)write
    """
    store_password = self._get_cert_pw()

    key_store_pem = self._convert_jks_to_pem(
        self._get_jks_key_store_path(), store_password
    )
    trust_store_pem = self._convert_jks_to_pem(
        self._get_jks_trust_store_path(), store_password
    )

    with ca_chain_path.open("w") as pem_file:
        pem_file.write(key_store_pem + trust_store_pem)

def _convert_jks_to_pem(self, jks_path, keystore_pw):
    """
    Loads a JKS store and returns all of its certificate entries
    converted to PEM format.
    Args:
    :jks_path: path to the JKS file
    :keystore_pw: password for decrypting the JKS file
    Returns:
    str: the concatenated PEM "CERTIFICATE" blocks, one per entry of the
    store's ``certs`` mapping (empty string when there are none)
    """
    # load the keystore and decrypt it with password
    ks = jks.KeyStore.load(jks_path, keystore_pw, try_decrypt_keys=True)

    # Convert each certificate entry into PEM format; only the entries are
    # needed (not their aliases), and join avoids repeated concatenation.
    return "".join(
        self._bytes_to_pem_str(entry.cert, "CERTIFICATE")
        for entry in ks.certs.values()
    )

def _bytes_to_pem_str(self, der_bytes, pem_type):
"""
Utility function for creating PEM files
Args:
der_bytes: DER encoded bytes
pem_type: type of PEM, e.g Certificate, Private key, or RSA private key
Returns:
PEM String for a DER-encoded certificate or private key
"""
pem_str = ""
pem_str = pem_str + "-----BEGIN {}-----".format(pem_type) + "\n"
pem_str = (
pem_str
+ "\r\n".join(
textwrap.wrap(base64.b64encode(der_bytes).decode("ascii"), 64)
)
+ "\n"
)
pem_str = pem_str + "-----END {}-----".format(pem_type) + "\n"
return pem_str

def _get_cert_pw(self):
"""
Get keystore password from local container
Returns:
Certificate password
"""
pwd_path = Path(self.CRYPTO_MATERIAL_PASSWORD)
if not pwd_path.exists():
username = os.environ[self.HADOOP_USER_NAME]
material_directory = Path(os.environ[self.MATERIAL_DIRECTORY])
pwd_path = material_directory.joinpath(username + self.PASSWORD_SUFFIX)

with pwd_path.open() as f:
return f.read()

def _get_jks_trust_store_path(self):
"""
Get truststore location
Returns:
truststore location
"""
t_certificate = Path(self.T_CERTIFICATE)
if t_certificate.exists():
return str(t_certificate)
else:
username = os.environ[self.HADOOP_USER_NAME]
material_directory = Path(os.environ[self.MATERIAL_DIRECTORY])
return str(material_directory.joinpath(username + self.TRUSTSTORE_SUFFIX))

def _get_jks_key_store_path(self):
"""
Get keystore location
Returns:
keystore location
"""
k_certificate = Path(self.K_CERTIFICATE)
if k_certificate.exists():
return str(k_certificate)
else:
username = os.environ[self.HADOOP_USER_NAME]
material_directory = Path(os.environ[self.MATERIAL_DIRECTORY])
return str(material_directory.joinpath(username + self.KEYSTORE_SUFFIX))

def _project_name(self):
try:
return os.environ[self.PROJECT_NAME]
Expand Down
1 change: 1 addition & 0 deletions python/hsfs/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ def connect(self):
"hive",
self._host,
self._cert_folder,
self._project,
client.get_instance()._cert_key,
)
else:
Expand Down
Loading

0 comments on commit c714d33

Please sign in to comment.