Skip to content

Commit

Permalink
Add the possibility to execute pure SQL on the feature store (#18)
Browse files Browse the repository at this point in the history
  • Loading branch information
moritzmeister authored Mar 13, 2020
1 parent b9b73ad commit 58483eb
Show file tree
Hide file tree
Showing 6 changed files with 28 additions and 18 deletions.
2 changes: 0 additions & 2 deletions python/hopsworks/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,6 @@ def get_feature_store(self, name=None):
"""
if not name:
name = self._client._project_name + "_featurestore"
# TODO: this won't work with multiple feature stores
engine.get_instance()._feature_store = name
return self._feature_store_api.get(name)

@property
Expand Down
9 changes: 6 additions & 3 deletions python/hopsworks/core/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,22 @@


class Query:
def __init__(
    self, feature_store, query_constructor_api, left_feature_group, left_features
):
    """Build a query rooted at *left_feature_group*.

    :param feature_store: name of the feature store database to run against
    :param query_constructor_api: backend API used to construct the SQL string
    :param left_feature_group: feature group on the left side of any joins
    :param left_features: features selected from the left feature group
    """
    self._feature_store = feature_store
    self._left_feature_group = left_feature_group
    self._left_features = util.parse_features(left_features)
    self._query_constructor_api = query_constructor_api
    self._joins = []

def read(self, dataframe_type="default"):
    """Materialize the query and return it as a dataframe of *dataframe_type*."""
    constructed = self._query_constructor_api.construct_query(self)
    sql_query = constructed["query"]
    return engine.get_instance().sql(sql_query, self._feature_store, dataframe_type)

def show(self, n):
    """Show the first *n* rows of the query result."""
    constructed = self._query_constructor_api.construct_query(self)
    sql_query = constructed["query"]
    return engine.get_instance().show(sql_query, self._feature_store, n)

def join(self, sub_query, on=[], left_on=[], right_on=[], join_type="inner"):
self._joins.append(
Expand Down
14 changes: 7 additions & 7 deletions python/hopsworks/engine/hive.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,27 +6,27 @@
class Engine:
def __init__(self, host, cert_folder, cert_key):
    """Remember the Hive host and the client certificate material.

    No connection is opened here; connections are created per query so the
    target feature store database can vary between calls.
    """
    self._host = host
    self._cert_key = cert_key
    self._cert_folder = cert_folder

def sql(self, sql_query, feature_store, dataframe_type):
    """Execute *sql_query* against *feature_store* over Hive.

    :param sql_query: SQL string to run
    :param feature_store: database name the connection should be scoped to
    :param dataframe_type: format to convert the resulting frame into
    :return: the query result converted via ``_return_dataframe_type``
    """
    message = "Lazily executing query: {}".format(sql_query)
    print(message)
    connection = self._create_hive_connection(feature_store)
    with connection as hive_conn:
        result_df = pd.read_sql(sql_query, hive_conn)
    return self._return_dataframe_type(result_df, dataframe_type)

def show(self, sql_query, feature_store, n):
    """Return the first *n* rows of *sql_query* run against *feature_store*."""
    full_result = self.sql(sql_query, feature_store, "default")
    return full_result.head(n)

def set_job_group(self, group_id, description):
    # Intentional no-op: job groups are a Spark concept and have no Hive
    # equivalent; kept so both engines expose the same interface.
    pass

def _create_hive_connection(self):
def _create_hive_connection(self, feature_store):
return hive.Connection(
host=self._host,
port=9085,
database=self._feature_store,
# database needs to be set every time, 'default' doesn't work in pyhive
database=feature_store,
auth="CERTIFICATES",
truststore=os.path.join(self._cert_folder, "trustStore.jks"),
keystore=os.path.join(self._cert_folder, "keyStore.jks"),
Expand Down
9 changes: 5 additions & 4 deletions python/hopsworks/engine/spark.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,17 @@
class Engine:
def __init__(self):
    """Attach the engine to the active Spark session (creating one if needed)."""
    session = SparkSession.builder.getOrCreate()
    self._spark_session = session

def sql(self, sql_query, feature_store, dataframe_type):
    """Execute *sql_query* in *feature_store*'s database on the Spark session.

    :param sql_query: SQL string to run
    :param feature_store: database that unqualified table names resolve in
    :param dataframe_type: format to convert the resulting frame into
    :return: the query result converted via ``_return_dataframe_type``
    """
    print("Lazily executing query: {}".format(sql_query))
    session = self._spark_session
    # Scope unqualified table names to the feature store's database.
    session.sql("USE {}".format(feature_store))
    result_df = session.sql(sql_query)
    self.set_job_group("", "")
    return self._return_dataframe_type(result_df, dataframe_type)

def show(self, sql_query, feature_store, n):
    """Show the first *n* rows of *sql_query* run against *feature_store*."""
    result = self.sql(sql_query, feature_store, "default")
    return result.show(n)

def set_job_group(self, group_id, description):
    """Tag subsequently submitted Spark jobs with *group_id* / *description*."""
    context = self._spark_session.sparkContext
    context.setJobGroup(group_id, description)
Expand Down
8 changes: 6 additions & 2 deletions python/hopsworks/feature_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,14 @@ def show(self, n):

def select_all(self):
    """Return a Query selecting every feature of this feature group."""
    all_features = self._features
    return query.Query(
        self._feature_store_name,
        self._query_constructor_api,
        self,
        all_features,
    )

def select(self, features=None):
    """Return a Query selecting the given subset of features.

    :param features: features to select; defaults to an empty selection.
        Uses ``None`` as the default instead of ``[]`` to avoid the shared
        mutable-default-argument pitfall; an empty list is still what gets
        passed on when the caller omits the argument.
    :return: a ``query.Query`` over this feature group
    """
    selected = [] if features is None else features
    return query.Query(
        self._feature_store_name, self._query_constructor_api, self, selected
    )

@classmethod
def from_response_json(cls, client, json_dict):
Expand Down
4 changes: 4 additions & 0 deletions python/hopsworks/feature_store.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import humps

from hopsworks import engine
from hopsworks.core import feature_group_api


Expand Down Expand Up @@ -50,3 +51,6 @@ def from_response_json(cls, client, json_dict):

def get_feature_group(self, name, version):
    """Fetch the feature group *name* at *version* from the metadata API."""
    api = self._feature_group_api
    return api.get(name, version)

def sql(self, query, dataframe_type="default"):
    """Run a raw SQL string against this feature store via the active engine."""
    active_engine = engine.get_instance()
    return active_engine.sql(query, self._name, dataframe_type)

0 comments on commit 58483eb

Please sign in to comment.