Training dataset api #25

Merged
merged 21 commits on Apr 21, 2020
9 changes: 9 additions & 0 deletions python/hopsworks/core/join.py
@@ -17,3 +17,12 @@ def __init__(self, query, on, left_on, right_on, join_type):
self._left_on = util.parse_features(left_on)
self._right_on = util.parse_features(right_on)
self._join_type = join_type or self.INNER

def to_dict(self):
return {
"query": self._query,
"on": self._on,
"leftOn": self._left_on,
"rightOn": self._right_on,
"type": self._join_type,
}
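A note on serialization: the camelCase keys in to_dict mirror what the Hopsworks backend REST API expects. Purely as illustration, a Join might serialize to a payload like the following (the values are hypothetical, and "query" would hold a nested Query object in practice):

# Hypothetical payload produced by Join.to_dict(); values are
# illustrative only.
join_payload = {
    "query": None,
    "on": ["customer_id"],
    "leftOn": [],
    "rightOn": [],
    "type": "inner",
}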
9 changes: 8 additions & 1 deletion python/hopsworks/core/query.py
@@ -27,4 +27,11 @@ def join(self, sub_query, on=[], left_on=[], right_on=[], join_type="inner"):
return self

def json(self):
-        return json.dumps(self, cls=util.QueryEncoder)
+        return json.dumps(self, cls=util.FeatureStoreEncoder)

def to_dict(self):
return {
"leftFeatureGroup": self._left_feature_group,
"leftFeatures": self._left_features,
"joins": self._joins,
}
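The rename from QueryEncoder to FeatureStoreEncoder suggests a single shared JSON encoder for all feature-store objects. A minimal sketch of such an encoder, assuming it simply delegates to each object's to_dict (the actual implementation in util.py may differ):

import json

class FeatureStoreEncoder(json.JSONEncoder):
    # Sketch: serialize any object that exposes a to_dict() method.
    def default(self, obj):
        if hasattr(obj, "to_dict"):
            return obj.to_dict()
        # Defer to the base class, which raises TypeError for
        # unserializable objects.
        return super().default(obj)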
55 changes: 55 additions & 0 deletions python/hopsworks/core/storage_connector_api.py
@@ -0,0 +1,55 @@
from hopsworks import client, storage_connector


class StorageConnectorApi:
def __init__(self, feature_store_id):
self._feature_store_id = feature_store_id

def get(self, name, connector_type):
"""Get storage connector with name and type.

:param name: name of the storage connector
:type name: str
:param connector_type: connector type
:type connector_type: str
:return: the storage connector
:rtype: StorageConnector
"""
_client = client.get_instance()
path_params = [
"project",
_client._project_id,
"featurestores",
self._feature_store_id,
"storageconnectors",
connector_type,
]
result = [
conn
for conn in _client._send_request("GET", path_params)
if conn["name"] == name
]

if len(result) == 1:
return storage_connector.StorageConnector.from_response_json(result[0])
else:
raise Exception(
"Could not find the storage connector `{}` with type `{}`.".format(
name, connector_type
)
)

    def get_by_id(self, connector_id, connector_type):
        """Get storage connector by id and type."""
_client = client.get_instance()
path_params = [
"project",
_client._project_id,
"featurestores",
self._feature_store_id,
"storageconnectors",
connector_type,
connector_id,
]
return storage_connector.StorageConnector.from_response_json(
_client._send_request("GET", path_params)
)
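A hedged usage sketch for the new API class (the feature store id, connector name, and connector type below are hypothetical, and an already-configured, authenticated client is assumed):

from hopsworks.core import storage_connector_api

sc_api = storage_connector_api.StorageConnectorApi(feature_store_id=67)
connector = sc_api.get(name="my_s3_bucket", connector_type="S3")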
42 changes: 42 additions & 0 deletions python/hopsworks/core/training_dataset_api.py
@@ -0,0 +1,42 @@
from hopsworks import client, training_dataset


class TrainingDatasetApi:
def __init__(self, feature_store_id):
self._feature_store_id = feature_store_id

def post(self, training_dataset_instance):
_client = client.get_instance()
path_params = [
"project",
_client._project_id,
"featurestores",
self._feature_store_id,
"trainingdatasets",
]
headers = {"content-type": "application/json"}
return training_dataset_instance.update_from_response_json(
_client._send_request(
"POST",
path_params,
headers=headers,
data=training_dataset_instance.json(),
),
)

def get(self, name, version):
_client = client.get_instance()
path_params = [
"project",
_client._project_id,
"featurestores",
self._feature_store_id,
"trainingdatasets",
name,
]
query_params = {"version": version}
return training_dataset.TrainingDataset.from_response_json(
_client._send_request("GET", path_params, query_params)[0],
)
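Similarly, an illustrative sketch of fetching a training dataset through the new API (names and ids are hypothetical; an authenticated client is assumed):

from hopsworks.core import training_dataset_api

td_api = training_dataset_api.TrainingDatasetApi(feature_store_id=67)
td = td_api.get(name="sales_model", version=1)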
119 changes: 119 additions & 0 deletions python/hopsworks/core/training_dataset_engine.py
@@ -0,0 +1,119 @@
from hopsworks import engine
from hopsworks.core import training_dataset_api


class TrainingDatasetEngine:
OVERWRITE = "overwrite"
APPEND = "append"

def __init__(self, feature_store_id):
self._training_dataset_api = training_dataset_api.TrainingDatasetApi(
feature_store_id
)

def create(self, training_dataset, feature_dataframe, user_write_options):
self._training_dataset_api.post(training_dataset)

write_options = engine.get_instance().write_options(
training_dataset.data_format, user_write_options
)

self._write(training_dataset, feature_dataframe, write_options, self.OVERWRITE)

def insert(
self, training_dataset, feature_dataframe, user_write_options, overwrite
):
# validate matching schema
engine.get_instance().schema_matches(feature_dataframe, training_dataset.schema)

write_options = engine.get_instance().write_options(
training_dataset.data_format, user_write_options
)

self._write(
training_dataset,
feature_dataframe,
write_options,
self.OVERWRITE if overwrite else self.APPEND,
)

def read(self, training_dataset, split, user_read_options):
if split is None:
path = training_dataset.location + "/" + "**"
else:
path = training_dataset.location + "/" + str(split)

read_options = engine.get_instance().read_options(
training_dataset.data_format, user_read_options
)

return engine.get_instance().read(
training_dataset.storage_connector,
training_dataset.data_format,
read_options,
path,
)

def _write(self, training_dataset, dataset, write_options, save_mode):
if len(training_dataset.splits) == 0:
path = training_dataset.location + "/" + training_dataset.name
self._write_single(
dataset,
training_dataset.storage_connector,
training_dataset.data_format,
write_options,
save_mode,
path,
)
else:
split_names = sorted([*training_dataset.splits])
split_weights = [training_dataset.splits[i] for i in split_names]
self._write_splits(
dataset.randomSplit(split_weights, training_dataset.seed),
training_dataset.storage_connector,
training_dataset.data_format,
write_options,
save_mode,
training_dataset.location,
split_names,
)

def _write_splits(
self,
feature_dataframe_list,
storage_connector,
data_format,
write_options,
save_mode,
path,
split_names,
):
for i in range(len(feature_dataframe_list)):
split_path = path + "/" + str(split_names[i])
self._write_single(
feature_dataframe_list[i],
storage_connector,
data_format,
write_options,
save_mode,
split_path,
)

def _write_single(
self,
feature_dataframe,
storage_connector,
data_format,
write_options,
save_mode,
path,
):
        # TODO: petastorm, hdf5, and npy file formats are currently not supported
engine.get_instance().write(
feature_dataframe,
storage_connector,
data_format,
save_mode,
write_options,
path,
)
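To make the splits logic concrete: _write sorts the split names so that the weights line up deterministically with the list of DataFrames returned by Spark's randomSplit, and each split is then written to its own subdirectory. A small sketch of that mapping, assuming a two-way split:

# Assuming training_dataset.splits == {"train": 0.8, "test": 0.2}
splits = {"train": 0.8, "test": 0.2}
split_names = sorted([*splits])                   # ["test", "train"]
split_weights = [splits[n] for n in split_names]  # [0.2, 0.8]
# dataset.randomSplit(split_weights, seed) returns one DataFrame per
# weight; _write_splits writes each to <location>/<split_name>.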