[HOPSWORKS-2085] redshift connector #177

Merged: 16 commits, Dec 5, 2020
80 changes: 78 additions & 2 deletions java/src/main/java/com/logicalclocks/hsfs/StorageConnector.java
@@ -22,8 +22,11 @@
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.ToString;
import org.apache.parquet.Strings;

import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;

@@ -60,6 +63,54 @@ public class StorageConnector {
@Setter
private String bucket;

@Getter
@Setter
private String clusterIdentifier;

@Getter
@Setter
private String databaseDriver;

@Getter
@Setter
private String databaseEndpoint;

@Getter
@Setter
private String databaseName;

@Getter
@Setter
private Integer databasePort;

@Getter
@Setter
private String tableName;

@Getter
@Setter
private String databaseUserName;

@Getter
@Setter
private Boolean autoCreate;

@Getter
@Setter
private String databaseGroup;

@Getter
@Setter
private Date expiration;

@Getter
@Setter
private String databasePassword;

@Getter
@Setter
private String sessionToken;

@Getter
@Setter
private String connectionString;
@@ -72,12 +123,37 @@ public class StorageConnector {
@Setter
private StorageConnectorType storageConnectorType;

public Map<String, String> getSparkOptions() {
public Map<String, String> getSparkOptions() throws FeatureStoreException {
if (StorageConnectorType.JDBC.equals(storageConnectorType)) {
return getJdbcOptions();
} else if (StorageConnectorType.REDSHIFT.equals(storageConnectorType)) {
return getRedshiftOptions();
}
throw new FeatureStoreException("Spark options are not supported for connector " + storageConnectorType);
}

private Map<String, String> getJdbcOptions() throws FeatureStoreException {
Map<String, String> options = Arrays.stream(arguments.split(","))
.map(arg -> arg.split("="))
.collect(Collectors.toMap(a -> a[0], a -> a[1]));

options.put(Constants.JDBC_URL, connectionString);
return options;
}

private Map<String, String> getRedshiftOptions() {
String constr =
"jdbc:redshift://" + clusterIdentifier + "." + databaseEndpoint + ":" + databasePort + "/" + databaseName;
if (!Strings.isNullOrEmpty(arguments)) {
constr += "?" + arguments;
}
Map<String, String> options = new HashMap<>();
options.put(Constants.JDBC_DRIVER, databaseDriver);
options.put(Constants.JDBC_URL, constr);
options.put(Constants.JDBC_USER, databaseUserName);
options.put(Constants.JDBC_PWD, databasePassword);
if (!Strings.isNullOrEmpty(tableName)) {
options.put(Constants.JDBC_TABLE, tableName);
}
return options;
}
}
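
To make the new Redshift path concrete: getRedshiftOptions() assembles a standard JDBC URL from the cluster metadata and returns the options that Spark's JDBC source expects. Below is a minimal Python sketch of the same assembly (the helper and the field values are illustrative, not part of the PR); the option keys correspond to the JDBC_* constants added to Constants.java further down.

def redshift_spark_options(conn, arguments=None):
    # Mirrors StorageConnector.getRedshiftOptions(): build the JDBC URL from the
    # cluster metadata and return the options handed to spark.read.format("jdbc").
    url = "jdbc:redshift://{}.{}:{}/{}".format(
        conn["cluster_identifier"],    # e.g. "analytics-cluster" (hypothetical)
        conn["database_endpoint"],     # e.g. "abc123xyz.eu-west-1.redshift.amazonaws.com"
        conn["database_port"],         # e.g. 5439
        conn["database_name"],         # e.g. "dev"
    )
    if arguments:                      # optional extra JDBC arguments, e.g. "ssl=true"
        url += "?" + arguments
    options = {
        "driver": conn["database_driver"],     # e.g. "com.amazon.redshift.jdbc42.Driver"
        "url": url,
        "user": conn["database_user_name"],
        "password": conn["database_password"],
    }
    if conn.get("table_name"):                 # dbtable only when a table is configured
        options["dbtable"] = conn["table_name"]
    return options
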
java/src/main/java/com/logicalclocks/hsfs/StorageConnectorType.java
@@ -19,5 +19,6 @@
public enum StorageConnectorType {
HOPSFS,
S3,
JDBC
JDBC,
REDSHIFT
}
19 changes: 14 additions & 5 deletions java/src/main/java/com/logicalclocks/hsfs/engine/SparkEngine.java
@@ -26,8 +26,8 @@
import com.logicalclocks.hsfs.Split;
import com.logicalclocks.hsfs.StorageConnector;
import com.logicalclocks.hsfs.StorageConnectorType;
import com.logicalclocks.hsfs.TrainingDataset;
import com.logicalclocks.hsfs.TimeTravelFormat;
import com.logicalclocks.hsfs.TrainingDataset;
import com.logicalclocks.hsfs.util.Constants;
import lombok.Getter;
import org.apache.hadoop.fs.Path;
@@ -79,7 +79,9 @@ public Dataset<Row> sql(String query) {

public Dataset<Row> jdbc(StorageConnector storageConnector, String query) throws FeatureStoreException {
Map<String, String> readOptions = storageConnector.getSparkOptions();
readOptions.put("query", query);
if (!Strings.isNullOrEmpty(query)) {
readOptions.put("query", query);
}
return sparkSession.read()
.format(Constants.JDBC_FORMAT)
.options(readOptions)
@@ -112,9 +114,16 @@ public static String sparkPath(String path) {
}

private void configureS3Connector(StorageConnector storageConnector) {
if (!Strings.isNullOrEmpty(storageConnector.getAccessKey())) {
sparkSession.conf().set("fs.s3a.access.key", storageConnector.getAccessKey());
sparkSession.conf().set("fs.s3a.secret.key", storageConnector.getSecretKey());
if (!Strings.isNullOrEmpty(storageConnector.getAccessKey())
&& Strings.isNullOrEmpty(storageConnector.getSessionToken())) {
sparkSession.conf().set(Constants.S3_ACCESS_KEY_ENV, storageConnector.getAccessKey());
sparkSession.conf().set(Constants.S3_SECRET_KEY_ENV, storageConnector.getSecretKey());
}
if (!Strings.isNullOrEmpty(storageConnector.getSessionToken())) {
sparkSession.conf().set(Constants.S3_CREDENTIAL_PROVIDER_ENV, Constants.S3_TEMPORARY_CREDENTIAL_PROVIDER);
sparkSession.conf().set(Constants.S3_ACCESS_KEY_ENV, storageConnector.getAccessKey());
sparkSession.conf().set(Constants.S3_SECRET_KEY_ENV, storageConnector.getSecretKey());
sparkSession.conf().set(Constants.S3_SESSION_KEY_ENV, storageConnector.getSessionToken());
}
if (!Strings.isNullOrEmpty(storageConnector.getServerEncryptionAlgorithm())) {
sparkSession.conf().set(
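Written out in PySpark for readability, the session configuration above behaves roughly like the sketch below (spark and the credential variables are assumed to come from an existing SparkSession and connector; the s3a property names are the ones added to Constants.java):

if session_token:    # temporary credentials were issued for this connector
    spark.conf.set("fs.s3a.aws.credentials.provider",
                   "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider")
    spark.conf.set("fs.s3a.access.key", access_key)
    spark.conf.set("fs.s3a.secret.key", secret_key)
    spark.conf.set("fs.s3a.session.token", session_token)
elif access_key:     # long-lived key pair, no token
    spark.conf.set("fs.s3a.access.key", access_key)
    spark.conf.set("fs.s3a.secret.key", secret_key)
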
java/src/main/java/com/logicalclocks/hsfs/metadata/StorageConnectorApi.java
@@ -31,7 +31,8 @@
public class StorageConnectorApi {

private static final String CONNECTOR_PATH = "/storageconnectors";
private static final String CONNECTOR_TYPE_PATH = CONNECTOR_PATH + "{/connType}";
private static final String CONNECTOR_TYPE_PATH =
CONNECTOR_PATH + "{/connType}{/name}{?temporaryCredentials}";
private static final String ONLINE_CONNECTOR_PATH = CONNECTOR_PATH + "/onlinefeaturestore";
private static final String ONLINE_FEATURE_STORE_CONNECTOR_SUFFIX = "_onlinefeaturestore";

@@ -48,15 +49,12 @@ public StorageConnector getByNameAndType(FeatureStore featureStore, String name,
.set("projectId", featureStore.getProjectId())
.set("fsId", featureStore.getId())
.set("connType", type)
.set("name", name)
.set("temporaryCredentials", true)
.expand();

LOGGER.info("Sending metadata request: " + uri);
StorageConnector[] storageConnectors = hopsworksClient.handleRequest(new HttpGet(uri), StorageConnector[].class);

return Arrays.stream(storageConnectors).filter(s -> s.getName().equals(name))
.findFirst()
.orElseThrow(() ->
new FeatureStoreException("Could not find storage connector " + name + " with type " + type));
return hopsworksClient.handleRequest(new HttpGet(uri), StorageConnector.class);
}

public StorageConnector getOnlineStorageConnector(FeatureStore featureStore)
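With the widened URI template, the backend resolves the connector by type and name and returns temporary credentials in the same call, so the client no longer filters the full connector list. An expanded request looks roughly like this (project and feature store ids and the connector name are hypothetical):

# Expanded from CONNECTOR_TYPE_PATH, relative to the Hopsworks REST base path:
uri = ("/project/119/featurestores/67/storageconnectors/"
       "REDSHIFT/my_redshift?temporaryCredentials=true")
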
8 changes: 8 additions & 0 deletions java/src/main/java/com/logicalclocks/hsfs/util/Constants.java
@@ -34,9 +34,17 @@ public class Constants {
public static final String JDBC_PWD = "password";
public static final String JDBC_URL = "url";
public static final String JDBC_TABLE = "dbtable";
public static final String JDBC_DRIVER = "driver";

public static final String TF_CONNECTOR_RECORD_TYPE = "recordType";

public static final String S3_SCHEME = "s3://";
public static final String S3_SPARK_SCHEME = "s3a://";

public static final String S3_ACCESS_KEY_ENV = "fs.s3a.access.key";
public static final String S3_SECRET_KEY_ENV = "fs.s3a.secret.key";
public static final String S3_SESSION_KEY_ENV = "fs.s3a.session.token";
public static final String S3_CREDENTIAL_PROVIDER_ENV = "fs.s3a.aws.credentials.provider";
public static final String S3_TEMPORARY_CREDENTIAL_PROVIDER =
"org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider";
}
30 changes: 3 additions & 27 deletions python/hsfs/core/storage_connector_api.py
@@ -41,35 +41,11 @@ def get(self, name, connector_type):
self._feature_store_id,
"storageconnectors",
connector_type,
name,
]
result = [
conn
for conn in _client._send_request("GET", path_params)
if conn["name"] == name
]

if len(result) == 1:
return storage_connector.StorageConnector.from_response_json(result[0])
else:
raise Exception(
"Could not find the storage connector `{}` with type `{}`.".format(
name, connector_type
)
)

def get_by_id(self, connector_id, connector_type):
_client = client.get_instance()
path_params = [
"project",
_client._project_id,
"featurestores",
self._feature_store_id,
"storageconnectors",
connector_type,
connector_id,
]
query_params = {"temporaryCredentials": True}
return storage_connector.StorageConnector.from_response_json(
_client._send_request("GET", path_params)
_client._send_request("GET", path_params, query_params=query_params)
)

def get_online_connector(self):
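End to end, the Python client can now fetch a Redshift connector with credentials already resolved and hand its options straight to Spark. A usage sketch, assuming the feature store accessor get_storage_connector(name, connector_type) and a running SparkSession (connector and table names are hypothetical):

redshift_conn = fs.get_storage_connector("telco_redshift", "REDSHIFT")  # assumed accessor
options = redshift_conn.spark_options()   # driver, url, user, password (+ dbtable if configured)
df = spark.read.format("jdbc").options(**options).load()   # reads the configured dbtable
# When no table is configured on the connector, pass a query instead:
df = (spark.read.format("jdbc")
      .options(**options)
      .option("query", "SELECT customer_id, churn FROM telco LIMIT 100")
      .load())
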
26 changes: 22 additions & 4 deletions python/hsfs/engine/spark.py
@@ -65,7 +65,8 @@ def _sql_offline(self, sql_query, feature_store):

def _jdbc(self, sql_query, connector):
options = connector.spark_options()
options["query"] = sql_query
if sql_query:
options["query"] = sql_query

return (
self._spark_session.read.format(self.JDBC_FORMAT).options(**options).load()
@@ -184,7 +185,13 @@ def save_dataframe(
)

def _save_offline_dataframe(
self, table_name, feature_group, dataframe, save_mode, operation, write_options,
self,
table_name,
feature_group,
dataframe,
save_mode,
operation,
write_options,
):
if feature_group.time_travel_format == "HUDI":
hudi_engine_instance = hudi_engine.HudiEngine(
@@ -239,8 +246,10 @@ def read(self, storage_connector, data_format, read_options, path):

def profile(self, dataframe, relevant_columns, correlations, histograms):
"""Profile a dataframe with Deequ."""
return self._jvm.com.logicalclocks.hsfs.engine.SparkEngine.getInstance().profile(
dataframe._jdf, relevant_columns, correlations, histograms
return (
self._jvm.com.logicalclocks.hsfs.engine.SparkEngine.getInstance().profile(
dataframe._jdf, relevant_columns, correlations, histograms
)
)

def write_options(self, data_format, provided_options):
@@ -347,6 +356,15 @@ def _setup_s3(self, storage_connector, path):
"fs.s3a.server-side-encryption-key",
storage_connector.server_encryption_key,
)
if storage_connector.session_token:
self._spark_context._jsc.hadoopConfiguration().set(
"fs.s3a.aws.credentials.provider",
"org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider",
)
self._spark_context._jsc.hadoopConfiguration().set(
"fs.s3a.session.token",
storage_connector.session_token,
)
return path.replace("s3", "s3a", 1)

def _setup_pydoop(self):
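On the Python side the S3 setup follows the same pattern: when the connector carries a session token, _setup_s3 switches the Hadoop configuration to the temporary credentials provider before rewriting the path to the s3a scheme. A rough sketch of the effect (bucket, prefix, and variable names are hypothetical):

hconf = spark.sparkContext._jsc.hadoopConfiguration()
hconf.set("fs.s3a.aws.credentials.provider",
          "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider")
hconf.set("fs.s3a.session.token", session_token)   # plus access key and secret key as above
path = "s3://my-bucket/raw/events".replace("s3", "s3a", 1)  # -> "s3a://my-bucket/raw/events"
df = spark.read.parquet(path)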