Skip to content

Commit

Permalink
Add Connection module in Java/Scala API (#6)
Browse files Browse the repository at this point in the history
  • Loading branch information
SirOibaf authored Feb 5, 2020
1 parent 7f6e235 commit 3c8cf08
Show file tree
Hide file tree
Showing 23 changed files with 862 additions and 200 deletions.
49 changes: 48 additions & 1 deletion java/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,56 @@
<scope>provided</scope>
</dependency>

</dependencies>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>ssm</artifactId>
<version>2.10.40</version>
<exclusions>
<exclusion>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>sts</artifactId>
<version>2.10.40</version>
<exclusions>
<exclusion>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>secretsmanager</artifactId>
<version>2.10.40</version>
<exclusions>
<exclusion>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>io.specto</groupId>
<artifactId>hoverfly-java</artifactId>
<version>0.12.2</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<plugins>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.logicalclocks.featurestore;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Strings;
import com.logicalclocks.featurestore.engine.SparkEngine;
import com.logicalclocks.featurestore.metadata.FeatureGroupApi;
Expand All @@ -13,9 +14,11 @@
public class FeatureStore {

@Getter @Setter
@JsonProperty("featurestoreId")
private Integer id;

@Getter @Setter
@JsonProperty("featurestoreName")
private String name;

@Getter @Setter
Expand Down Expand Up @@ -45,4 +48,14 @@ public FeatureGroup getFeatureGroup(String name, Integer version)
public Dataset<Row> sql(String query) {
return SparkEngine.getInstance().sql(query);
}

@Override
public String toString() {
return "FeatureStore{" +
"id=" + id +
", name='" + name + '\'' +
", projectId=" + projectId +
", featureGroupApi=" + featureGroupApi +
'}';
}
}
15 changes: 15 additions & 0 deletions java/src/main/java/com/logicalclocks/featurestore/FsQuery.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package com.logicalclocks.featurestore;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;

@JsonIgnoreProperties(ignoreUnknown = true)
@AllArgsConstructor
@NoArgsConstructor
public class FsQuery {
@Getter @Setter
private String query;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
package com.logicalclocks.featurestore;

import com.google.common.base.Strings;
import com.logicalclocks.featurestore.metadata.FeatureStoreApi;
import com.logicalclocks.featurestore.metadata.HopsworksClient;
import com.logicalclocks.featurestore.metadata.ProjectApi;
import com.logicalclocks.featurestore.util.Constants;
import lombok.Builder;
import lombok.Getter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.regions.Region;

import java.io.Closeable;
import java.io.IOException;

public class HopsworksConnection implements Closeable {

private static final Logger LOGGER = LoggerFactory.getLogger(HopsworksConnection.class);

@Getter
private String host;

@Getter
private int port;

@Getter
private String project;

@Getter
private Region region;

@Getter
private SecretStore secretStore;

@Getter
private boolean hostnameVerification;

@Getter
private String trustStorePath;

@Getter
private String certPath;

@Getter
private String APIKeyFilePath;


private FeatureStoreApi featureStoreApi = new FeatureStoreApi();
private ProjectApi projectApi = new ProjectApi();

private Project projectObj;

@Builder
public HopsworksConnection(String host, int port, String project, Region region, SecretStore secretStore,
boolean hostnameVerification, String trustStorePath,
String certPath, String APIKeyFilePath) throws IOException, FeatureStoreException {
this.host = host;
this.port = port;
this.project = project;
this.region = region;
this.secretStore = secretStore;
this.hostnameVerification = hostnameVerification;
this.trustStorePath = trustStorePath;
this.certPath = certPath;
this.APIKeyFilePath = APIKeyFilePath;

HopsworksClient hopsworksClient = HopsworksClient.setupHopsworksClient(host, port, region, secretStore,
hostnameVerification, trustStorePath, APIKeyFilePath);
projectObj = getProject();
hopsworksClient.downloadCredentials(projectObj, certPath);
}

/**
* Retrieve the project feature store
* @return
* @throws IOException
* @throws FeatureStoreException
*/
public FeatureStore getFeatureStore() throws IOException, FeatureStoreException{
return getFeatureStore(project + Constants.FEATURESTORE_SUFFIX);
}

/**
* Retrieve a feature store based on name. The feature store needs to be shared with
* the connection's project.
* @param name
* @return
* @throws IOException
* @throws FeatureStoreException
*/
public FeatureStore getFeatureStore(String name) throws IOException, FeatureStoreException {
return featureStoreApi.get(projectObj.getProjectId(), name);
}

/**
* Close the connection and clean up the certificates.
*/
public void close() {
// Close the client
}

private Project getProject() throws IOException, FeatureStoreException {
if (Strings.isNullOrEmpty(project)) {
// User didn't specify a project in the connection construction. Assume they are running
// from within Hopsworks and the project name is available a system property
project = System.getProperty(Constants.PROJECTNAME_ENV);
}
LOGGER.info("Getting information for project name: " + project);
return projectApi.get(project);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,14 @@ public class MainClass {
private final static Logger LOGGER = LoggerFactory.getLogger(MainClass.class);

public static void main(String[] args) throws Exception {
FeatureStore fs = new FeatureStore();
fs.setProjectId(120);
fs.setId(67);

HopsworksConnection connection = HopsworksConnection.builder().build();

FeatureStore fs = connection.getFeatureStore();
LOGGER.info("Feature Store " + fs);

FeatureGroup fg = fs.getFeatureGroup("attendances_features", 1);

LOGGER.info("Name " + fg.getName());

LOGGER.info("Brace yourself, I'm running the query");
Expand Down
20 changes: 20 additions & 0 deletions java/src/main/java/com/logicalclocks/featurestore/Project.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package com.logicalclocks.featurestore;

import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;

@NoArgsConstructor
public class Project {

@Getter @Setter
private Integer projectId;
@Getter @Setter
private String projectName;
@Getter @Setter
private String owner;

public Project(Integer projectId) {
this.projectId = projectId;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package com.logicalclocks.featurestore;

public enum SecretStore {
PARAMETER_STORE,
SECRET_MANAGER
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package com.logicalclocks.featurestore.metadata;

import org.apache.http.HttpResponse;
import org.apache.http.HttpStatus;
import org.apache.http.client.ClientProtocolException;
import org.apache.http.client.ResponseHandler;
import org.apache.http.util.EntityUtils;

import java.io.IOException;
import java.nio.charset.Charset;

public class AuthorizationHandler<T> implements ResponseHandler<T> {

private ResponseHandler<T> originalResponseHandler;

AuthorizationHandler(ResponseHandler<T> originalResponseHandler) {
this.originalResponseHandler = originalResponseHandler;
}

@Override
public T handleResponse(HttpResponse response) throws ClientProtocolException, IOException {
if (response.getStatusLine().getStatusCode() == HttpStatus.SC_UNAUTHORIZED) {
throw new UnauthorizedException();
} else if (response.getStatusLine().getStatusCode() / 100 == 4) {
throw new IOException("Error: " + response.getStatusLine().getStatusCode() +
EntityUtils.toString(response.getEntity(), Charset.defaultCharset()));
} else if (response.getStatusLine().getStatusCode() / 100 == 5) {
// TODO(fabio): Propagate http error upstream
throw new InternalException();
}

return originalResponseHandler.handleResponse(response);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.logicalclocks.featurestore.metadata;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;

@NoArgsConstructor
@AllArgsConstructor
public class Credentials {

@Getter @Setter
private String kStore;
@Getter @Setter
private String tStore;
@Getter @Setter
private String password;
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,12 @@ public class FeatureGroupApi {

private static final Logger LOGGER = LoggerFactory.getLogger(FeatureGroupApi.class);

private HopsworksClient hopsworksClient;

public FeatureGroupApi() throws FeatureStoreException {
hopsworksClient = HopsworksClient.getInstance();
}
public FeatureGroupApi() throws FeatureStoreException { }

public FeatureGroup get(FeatureStore featureStore, String fgName, Integer fgVersion)
throws IOException, FeatureStoreException {
String pathTemplate = HopsworksClient.PROJECT_PATH
HopsworksClient hopsworksClient = HopsworksClient.getInstance();
String pathTemplate = HopsworksClient.PROJECT_PATH
+ FeatureStoreApi.FEATURE_STORE_PATH
+ FEATURE_GROUP_PATH;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,33 @@
package com.logicalclocks.featurestore.metadata;

import com.damnhandy.uri.template.UriTemplate;
import com.logicalclocks.featurestore.FeatureStore;
import com.logicalclocks.featurestore.FeatureStoreException;
import org.apache.http.client.methods.HttpGet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

public class FeatureStoreApi {

public static final String FEATURE_STORE_SERVICE_PATH = "/featurestores";
public static final String FEATURE_STORE_PATH = FEATURE_STORE_SERVICE_PATH + "{/fsId}";
public static final String FEATURE_STORE_NAME_PATH = FEATURE_STORE_SERVICE_PATH + "{/fsName}";

private static final Logger LOGGER = LoggerFactory.getLogger(FeatureStoreApi.class);

public FeatureStore get(int projectId, String name) throws IOException, FeatureStoreException {
HopsworksClient hopsworksClient = HopsworksClient.getInstance();
String pathTemplate = HopsworksClient.PROJECT_PATH
+ FEATURE_STORE_NAME_PATH;

String uri = UriTemplate.fromTemplate(pathTemplate)
.set("projectId", projectId)
.set("fsName", name)
.expand();

LOGGER.info("Sending metadata request: " + uri);
return hopsworksClient.handleRequest(new HttpGet(uri), FeatureStore.class);
}
}
Loading

0 comments on commit 3c8cf08

Please sign in to comment.