Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[HOPSWORKS-2177] Refactor storage connectors so that they belong to t… #786

Merged
merged 1 commit into from
Dec 15, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
=begin
This file is part of Hopsworks
Copyright (C) 2020, Logical Clocks AB. All rights reserved

Hopsworks is free software: you can redistribute it and/or modify it under the terms of
the GNU Affero General Public License as published by the Free Software Foundation,
either version 3 of the License, or (at your option) any later version.

Hopsworks is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
PURPOSE. See the GNU Affero General Public License for more details.

You should have received a copy of the GNU Affero General Public License along with this program.
If not, see <https://www.gnu.org/licenses/>.
=end
# ActiveRecord model backed by the "feature_store_connector" table.
class FeatureStoreConnector < ActiveRecord::Base
  # Point the inheritance (STI discriminator) column at 'xname'.
  # NOTE(review): presumably done so that a column carrying the default
  # discriminator name is read as plain data -- confirm against the schema.
  self.inheritance_column = 'xname'

  class << self
    # Name of the database table this model reads from and writes to.
    def table_name
      "feature_store_connector"
    end
  end
end
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
You should have received a copy of the GNU Affero General Public License along with this program.
If not, see <https://www.gnu.org/licenses/>.
=end
class FeatureStoreJDBCConnector < ActiveRecord::Base
class FeatureStoreJDBCConnector< ActiveRecord::Base
def self.table_name
"feature_store_jdbc_connector"
end
Expand Down
27 changes: 20 additions & 7 deletions hopsworks-IT/src/test/ruby/spec/helpers/featurestore_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,9 @@ def update_hopsfs_training_dataset_metadata(project_id, featurestore_id, trainin
version: 1,
dataFormat: dataFormat,
trainingDatasetType: trainingDatasetType,
storageConnectorId: hopsfs_connector.id
storageConnector: {
id: hopsfs_connector.id
}
}
json_data = json_data.to_json
json_result = put update_training_dataset_metadata_endpoint, json_data
Expand All @@ -274,7 +276,9 @@ def update_external_training_dataset_metadata(project_id, featurestore_id, train
version: 1,
dataFormat: "parquet",
trainingDatasetType: trainingDatasetType,
storageConnectorId: s3_connector_id
storageConnector: {
id: s3_connector_id
}
}
json_data = json_data.to_json
json_result = put update_training_dataset_metadata_endpoint, json_data
Expand All @@ -296,8 +300,6 @@ def create_hopsfs_training_dataset(project_id, featurestore_id, hopsfs_connector
create_training_dataset_endpoint = "#{ENV['HOPSWORKS_API']}/project/" + project_id.to_s + "/featurestores/" + featurestore_id.to_s + "/trainingdatasets"
name = name == nil ? "training_dataset_#{random_id}" : name
data_format = data_format == nil ? "tfrecords" : data_format
connector_id = hopsfs_connector == nil ? nil : hopsfs_connector.id
connector_name = hopsfs_connector == nil ? nil : hopsfs_connector.name
description = description == nil ? "testtrainingdatasetdescription" : description
if features == nil && query == nil
features = [
Expand All @@ -318,13 +320,18 @@ def create_hopsfs_training_dataset(project_id, featurestore_id, hopsfs_connector
version: version,
dataFormat: data_format,
trainingDatasetType: trainingDatasetType,
storageConnectorId: connector_id,
storageConnectorName: connector_name,
features: features,
splits: splits,
seed: 1234,
queryDTO: query
}

unless hopsfs_connector.nil?
json_data["storageConnector"] = {
id: hopsfs_connector.id
}
end

json_result = post create_training_dataset_endpoint, json_data.to_json
[json_result, name]
end
Expand Down Expand Up @@ -356,11 +363,17 @@ def create_external_training_dataset(project_id, featurestore_id, s3_connector_i
dataFormat: "tfrecords",
location: location,
trainingDatasetType: trainingDatasetType,
storageConnectorId: s3_connector_id,
features: features == nil ? default_features : features,
splits: splits,
seed: 1234
}

unless s3_connector_id.nil?
json_data["storageConnector"] = {
id: s3_connector_id
}
end

json_data = json_data.to_json
json_result = post create_training_dataset_endpoint, json_data
return json_result, training_dataset_name
Expand Down
180 changes: 55 additions & 125 deletions hopsworks-IT/src/test/ruby/spec/helpers/storage_connector_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,29 +17,19 @@
module StorageConnectorHelper

def get_storage_connectors(project_id, featurestore_id, type)
get "#{ENV['HOPSWORKS_API']}/project/#{project_id}/featurestores/#{featurestore_id}/storageconnectors/#{type}"
json_result = get "#{ENV['HOPSWORKS_API']}/project/#{project_id}/featurestores/#{featurestore_id}/storageconnectors/"
connectors = JSON.parse(json_result)
connectors.select{|c| c['storageConnectorType'].eql?(type)}.map { |c| c.with_indifferent_access }
end

def get_jdbc_storate_connector(project_id, featurestore_id, name)
get_storage_connector(project_id, featurestore_id, "JDBC", name)
end

def get_storage_connector(project_id, featurestore_id, type, name)
connectors_json = get "#{ENV['HOPSWORKS_API']}/project/#{project_id}/featurestores/#{featurestore_id}/storageconnectors/#{type}"
expect_status(200)
connectors = JSON.parse(connectors_json)

connectors.select{|connector| connector['name'] == name}[0]
end

def get_storage_connector_by_name(project_id, featurestore_id, type, name)
get "#{ENV['HOPSWORKS_API']}/project/#{project_id}/featurestores/#{featurestore_id}/storageconnectors/#{type}/#{name}"
def get_storage_connector(project_id, featurestore_id, name)
get "#{ENV['HOPSWORKS_API']}/project/#{project_id}/featurestores/#{featurestore_id}/storageconnectors/#{name}"
end

def create_hopsfs_connector(project_id, featurestore_id, datasetName: "Resources")
type = "featurestoreHopsfsConnectorDTO"
storageConnectorType = "HopsFS"
create_hopsfs_connector_endpoint = "#{ENV['HOPSWORKS_API']}/project/" + project_id.to_s + "/featurestores/" + featurestore_id.to_s + "/storageconnectors/HOPSFS"
storageConnectorType = "HOPSFS"
create_hopsfs_connector_endpoint = "#{ENV['HOPSWORKS_API']}/project/#{project_id}/featurestores/#{featurestore_id}/storageconnectors"
hopsfs_connector_name = "hopsfs_connector_#{random_id}"
json_data = {
name: hopsfs_connector_name,
Expand All @@ -48,32 +38,29 @@ def create_hopsfs_connector(project_id, featurestore_id, datasetName: "Resources
storageConnectorType: storageConnectorType,
datasetName: datasetName
}
json_data = json_data.to_json
json_result = post create_hopsfs_connector_endpoint, json_data
return json_result, hopsfs_connector_name
json_result = post create_hopsfs_connector_endpoint, json_data.to_json
[json_result, hopsfs_connector_name]
end

def update_hopsfs_connector(project_id, featurestore_id, connector_name, datasetName: "Resources")
type = "featurestoreHopsfsConnectorDTO"
storageConnectorType = "HopsFS"
update_hopsfs_connector_endpoint = "#{ENV['HOPSWORKS_API']}/project/" + project_id.to_s + "/featurestores/" + featurestore_id.to_s + "/storageconnectors/HOPSFS/" + connector_name
hopsfs_connector_name = "hopsfs_connector_#{random_id}"
storageConnectorType = "HOPSFS"
update_hopsfs_connector_endpoint = "#{ENV['HOPSWORKS_API']}/project/#{project_id}/featurestores/#{featurestore_id}/storageconnectors/#{connector_name}"
json_data = {
name: hopsfs_connector_name,
name: connector_name,
description: "testhopsfsconnectordescription",
type: type,
storageConnectorType: storageConnectorType,
datasetName: datasetName
}
json_data = json_data.to_json
json_result = put update_hopsfs_connector_endpoint, json_data
return json_result, hopsfs_connector_name
json_result = put update_hopsfs_connector_endpoint, json_data.to_json
[json_result, connector_name]
end

def create_jdbc_connector(project_id, featurestore_id, connectionString: "jdbc://test")
type = "featurestoreJdbcConnectorDTO"
storageConnectorType = "JDBC"
create_jdbc_connector_endpoint = "#{ENV['HOPSWORKS_API']}/project/" + project_id.to_s + "/featurestores/" + featurestore_id.to_s + "/storageconnectors/JDBC"
create_jdbc_connector_endpoint = "#{ENV['HOPSWORKS_API']}/project/#{project_id}/featurestores/#{featurestore_id}/storageconnectors"
jdbc_connector_name = "jdbc_connector_#{random_id}"
json_data = {
name: jdbc_connector_name,
Expand All @@ -91,120 +78,72 @@ def create_jdbc_connector(project_id, featurestore_id, connectionString: "jdbc:/
def update_jdbc_connector(project_id, featurestore_id, connector_name, connectionString: "jdbc://test")
type = "featurestoreJdbcConnectorDTO"
storageConnectorType = "JDBC"
update_jdbc_connector_endpoint = "#{ENV['HOPSWORKS_API']}/project/" + project_id.to_s + "/featurestores/" + featurestore_id.to_s + "/storageconnectors/JDBC/" + connector_name
jdbc_connector_name = "jdbc_connector_#{random_id}"
update_jdbc_connector_endpoint = "#{ENV['HOPSWORKS_API']}/project/#{project_id}/featurestores/#{featurestore_id}/storageconnectors/#{connector_name}"
json_data = {
name: jdbc_connector_name,
name: connector_name,
description: "testfeaturegroupdescription",
type: type,
storageConnectorType: storageConnectorType,
connectionString: connectionString,
arguments: "test1,test2"
}
json_data = json_data.to_json
json_result = put update_jdbc_connector_endpoint, json_data
return json_result, jdbc_connector_name
end

def create_s3_connector_with_or_without_access_key_and_secret_key(project_id, featurestore_id, with_access_and_secret_key,
access_key, secret_key, bucket: "test")
type = "featurestoreS3ConnectorDTO"
storageConnectorType = "S3"
create_s3_connector_endpoint = "#{ENV['HOPSWORKS_API']}/project/" + project_id.to_s + "/featurestores/" + featurestore_id.to_s + "/storageconnectors/S3"
s3_connector_name = "s3_connector_#{random_id}"
json_data = {
name: s3_connector_name,
description: "tests3connectordescription",
type: type,
storageConnectorType: storageConnectorType,
bucket: bucket
}
if with_access_and_secret_key
json_data["secretKey"] = access_key
json_data["accessKey"] = secret_key
else

end
json_data = json_data.to_json
json_result = post create_s3_connector_endpoint, json_data
return json_result, s3_connector_name
end

def create_s3_connector_without_encryption(project_id, featurestore_id, bucket: "test")
type = "featurestoreS3ConnectorDTO"
storageConnectorType = "S3"
create_s3_connector_endpoint = "#{ENV['HOPSWORKS_API']}/project/" + project_id.to_s + "/featurestores/" + featurestore_id.to_s + "/storageconnectors/S3"
s3_connector_name = "s3_connector_#{random_id}"
json_data = {
name: s3_connector_name,
description: "tests3connectordescription",
type: type,
storageConnectorType: storageConnectorType,
bucket: bucket,
secretKey: "test",
accessKey: "test"
}
json_data = json_data.to_json
json_result = post create_s3_connector_endpoint, json_data
return json_result, s3_connector_name
put update_jdbc_connector_endpoint, json_data.to_json
end

def create_s3_connector_with_encryption(project_id, featurestore_id, with_encryption_key, encryption_algorithm,
encryption_key, access_key, secret_key,
bucket: "test")
def create_s3_connector(project_id, featurestore_id, encryption_algorithm: nil, encryption_key: nil,
access_key: nil, secret_key: nil, bucket: "test")
type = "featurestoreS3ConnectorDTO"
storageConnectorType = "S3"
create_s3_connector_endpoint = "#{ENV['HOPSWORKS_API']}/project/" + project_id.to_s + "/featurestores/" + featurestore_id.to_s + "/storageconnectors/S3"
create_s3_connector_endpoint = "#{ENV['HOPSWORKS_API']}/project/#{project_id}/featurestores/#{featurestore_id}/storageconnectors"
s3_connector_name = "s3_connector_#{random_id}"
json_data = {
name: s3_connector_name,
description: "tests3connectordescription",
type: type,
storageConnectorType: storageConnectorType,
bucket: bucket,
secretKey: access_key,
accessKey: secret_key
}

if with_encryption_key
json_data["serverEncryptionAlgorithm"] = encryption_algorithm
json_data["serverEncryptionAlgorithm"] = encryption_algorithm
unless encryption_key.nil?
json_data["serverEncryptionKey"] = encryption_key
else
json_data["serverEncryptionAlgorithm"] = encryption_algorithm
end

json_data = json_data.to_json
json_result = post create_s3_connector_endpoint, json_data
return json_result, s3_connector_name
end
unless access_key.nil?
json_data["secretKey"] = access_key
json_data["accessKey"] = secret_key
end

json_result = post create_s3_connector_endpoint, json_data.to_json
[json_result, s3_connector_name]
end

def update_s3_connector(project_id, featurestore_id, connector_name, s3_connector_name, with_access_keys, bucket:
"test")
def update_s3_connector(project_id, featurestore_id, connector_name, access_key: nil, secret_key: nil, bucket: "test")
type = "featurestoreS3ConnectorDTO"
storageConnectorType = "S3"
update_s3_connector_endpoint = "#{ENV['HOPSWORKS_API']}/project/" + project_id.to_s + "/featurestores/" + featurestore_id.to_s + "/storageconnectors/S3/" + connector_name
update_s3_connector_endpoint = "#{ENV['HOPSWORKS_API']}/project/#{project_id}/featurestores/#{featurestore_id}/storageconnectors/#{connector_name}"
json_data = {
name: s3_connector_name,
name: connector_name,
description: "tests3connectordescription",
type: type,
storageConnectorType: storageConnectorType,
bucket: bucket
}
if with_access_keys
json_data["secretKey"] = "test2"
json_data["accessKey"] = "test2"
unless secret_key.nil?
json_data["secretKey"] = secret_key
end
json_data = json_data.to_json
json_result = put update_s3_connector_endpoint, json_data
return json_result, s3_connector_name
unless access_key.nil?
json_data["accessKey"] = access_key
end
json_result = put update_s3_connector_endpoint, json_data.to_json
[json_result, connector_name]
end

def create_redshift_connector(project_id, featurestore_id, redshift_connector_name: nil, clusterIdentifier: "redshift-connector",
databaseUserName: "awsUser", databasePassword: nil, iamRole: nil)
type = "featurestoreRedshiftConnectorDTO"
storageConnectorType = "REDSHIFT"
create_redshift_connector_endpoint = "#{ENV['HOPSWORKS_API']}/project/#{project_id}/featurestores/#{featurestore_id}/storageconnectors/#{storageConnectorType}"
create_redshift_connector_endpoint = "#{ENV['HOPSWORKS_API']}/project/#{project_id}/featurestores/#{featurestore_id}/storageconnectors/"
redshift_connector_name ||= "redshift_connector_#{random_id}"
json_data = {
name: redshift_connector_name,
Expand All @@ -222,20 +161,15 @@ def create_redshift_connector(project_id, featurestore_id, redshift_connector_na
iamRole: iamRole,
arguments: "test1,test2"
}
json_data = json_data.to_json
json_result = post create_redshift_connector_endpoint, json_data
return json_result, redshift_connector_name
json_result = post create_redshift_connector_endpoint, json_data.to_json
[json_result, redshift_connector_name]
end

def update_redshift_connector(project_id, featurestore_id, connector_name, redshift_connector_json)
type = "featurestoreRedshiftConnectorDTO"
storageConnectorType = "REDSHIFT"
update_redshift_connector_endpoint = "#{ENV['HOPSWORKS_API']}/project/#{project_id}/featurestores/#{featurestore_id}/storageconnectors/#{storageConnectorType}/#{connector_name}"
redshift_connector_json["type"] = type
redshift_connector_json["storageConnectorType"] = storageConnectorType
json_data = redshift_connector_json.to_json
json_result = put update_redshift_connector_endpoint, json_data
return json_result
update_redshift_connector_endpoint = "#{ENV['HOPSWORKS_API']}/project/#{project_id}/featurestores/#{featurestore_id}/storageconnectors/#{connector_name}"
redshift_connector_json["type"] = "featurestoreRedshiftConnectorDTO"
redshift_connector_json["storageConnectorType"] = "REDSHIFT"
put update_redshift_connector_endpoint, redshift_connector_json.to_json
end

def with_jdbc_connector(project_id)
Expand All @@ -253,32 +187,28 @@ def get_jdbc_connector_id

def get_hopsfs_training_datasets_connector(project_name)
connector_name = project_name + "_Training_Datasets"
return FeatureStoreHopsfsConnector.find_by(name: connector_name)
return FeatureStoreConnector.find_by(name: connector_name)
end

# Returns the current value of the @s3_connector_id instance variable
# (assigned by with_s3_connector after it creates an S3 connector).
def get_s3_connector_id
@s3_connector_id
end

def with_s3_connector(project_id)
encryption_algorithm = "AES256"
encryption_key = "Test"
secret_key = "test"
access_key = "test"
with_encryption_key = true
featurestore_id = get_featurestore_id(project_id)
json_result, connector_name = create_s3_connector_with_encryption(project_id, featurestore_id,
with_encryption_key, encryption_algorithm,
encryption_key, access_key, secret_key, bucket:
"testbucket")
json_result, _ = create_s3_connector(project_id, featurestore_id,
encryption_algorithm: "AES256",
access_key: "test", secret_key: "test",
bucket: "testbucket")

parsed_json = JSON.parse(json_result)
expect_status_details(201)
connector_id = parsed_json["id"]
@s3_connector_id = connector_id
end

def delete_connector(project_id, featurestore_id, type, name)
delete "#{ENV['HOPSWORKS_API']}/project/#{project_id}/featurestores/#{featurestore_id}/storageconnectors/#{type}/#{name}"
def delete_connector(project_id, featurestore_id, name)
delete "#{ENV['HOPSWORKS_API']}/project/#{project_id}/featurestores/#{featurestore_id}/storageconnectors/#{name}"
end

def with_redshift_connectors
Expand Down
Loading