From f4503b082fb0ace95bc09f3e951fb441360c12e4 Mon Sep 17 00:00:00 2001 From: Fabio Buso Date: Tue, 21 Jan 2020 17:00:13 +0100 Subject: [PATCH 1/3] Feature store querying --- java/pom.xml | 123 ++++++++++- .../logicalclocks/featurestore/Feature.java | 32 +++ .../featurestore/FeatureGroup.java | 83 +++++++- .../featurestore/FeatureStore.java | 33 ++- .../featurestore/FeatureStoreException.java | 3 + .../logicalclocks/featurestore/JoinType.java | 11 + .../logicalclocks/featurestore/MainClass.java | 23 +++ .../featurestore/engine/SparkEngine.java | 39 ++++ .../metadata/FeatureGroupApi.java | 52 +++++ .../metadata/FeatureStoreApi.java | 8 + .../metadata/HopsworksClient.java | 191 ++++++++++++++++-- .../featurestore/metadata/Join.java | 42 ++++ .../featurestore/metadata/MetadataClient.java | 4 + .../featurestore/metadata/Query.java | 77 +++++++ .../metadata/QueryConstructorApi.java | 47 +++++ 15 files changed, 738 insertions(+), 30 deletions(-) create mode 100644 java/src/main/java/com/logicalclocks/featurestore/Feature.java create mode 100644 java/src/main/java/com/logicalclocks/featurestore/JoinType.java create mode 100644 java/src/main/java/com/logicalclocks/featurestore/MainClass.java create mode 100644 java/src/main/java/com/logicalclocks/featurestore/engine/SparkEngine.java create mode 100644 java/src/main/java/com/logicalclocks/featurestore/metadata/FeatureGroupApi.java create mode 100644 java/src/main/java/com/logicalclocks/featurestore/metadata/FeatureStoreApi.java create mode 100644 java/src/main/java/com/logicalclocks/featurestore/metadata/Join.java create mode 100644 java/src/main/java/com/logicalclocks/featurestore/metadata/MetadataClient.java create mode 100644 java/src/main/java/com/logicalclocks/featurestore/metadata/Query.java create mode 100644 java/src/main/java/com/logicalclocks/featurestore/metadata/QueryConstructorApi.java diff --git a/java/pom.xml b/java/pom.xml index 79fcb5c9ba..d77eb5bfab 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -9,15 
+9,70 @@ 1.2.0-SNAPSHOT + 1.8 + 1.8 14.0.1 4.5.6 + 1.7.16 + 1.2.17 + 2.1.8 + 1.18.6 + 2.6.7.1 + 2.4.3.0 org.projectlombok lombok - 1.18.6 + ${lombok.version} + + + + org.apache.spark + spark-core_2.11 + ${spark.version} + provided + + + com.fasterxml.jackson.core + jackson-core + + + com.fasterxml.jackson.core + jackson-databind + + + + + + org.apache.spark + spark-sql_2.11 + ${spark.version} + provided + + + com.fasterxml.jackson.core + jackson-databind + + + com.fasterxml.jackson.core + jackson-core + + + + + + com.fasterxml.jackson.core + jackson-databind + ${fasterxml.jackson.databind.version} + provided + + + + com.damnhandy + handy-uri-templates + ${handy.version} @@ -33,5 +88,71 @@ ${httpclient.version} provided + + + org.slf4j + slf4j-api + ${slf4j.version} + provided + + + + org.slf4j + slf4j-log4j12 + ${slf4j.version} + provided + + + + log4j + log4j + ${log4j.version} + provided + + + + + + + + + org.apache.maven.plugins + maven-assembly-plugin + 2.4.1 + + + + jar-with-dependencies + + + + + make-assembly + + package + + single + + + + + + + + + + Hops + Hops Repo + https://bbc1.sics.se/archiva/repository/Hops/ + + true + + + true + + + + + \ No newline at end of file diff --git a/java/src/main/java/com/logicalclocks/featurestore/Feature.java b/java/src/main/java/com/logicalclocks/featurestore/Feature.java new file mode 100644 index 0000000000..6df0d7a862 --- /dev/null +++ b/java/src/main/java/com/logicalclocks/featurestore/Feature.java @@ -0,0 +1,32 @@ +package com.logicalclocks.featurestore; + +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; + +@AllArgsConstructor +@NoArgsConstructor +public class Feature { + @Getter @Setter + private String name; + + @Getter @Setter + private String Type; + + @Getter @Setter + private String onlineType; + + @Getter @Setter + private String description; + + @Getter @Setter + private Boolean primary; + + @Getter @Setter + private Boolean partition; + + 
public Feature(String name) { + this.name = name; + } +} diff --git a/java/src/main/java/com/logicalclocks/featurestore/FeatureGroup.java b/java/src/main/java/com/logicalclocks/featurestore/FeatureGroup.java index 9bc2c97a8c..63cb57f5b9 100644 --- a/java/src/main/java/com/logicalclocks/featurestore/FeatureGroup.java +++ b/java/src/main/java/com/logicalclocks/featurestore/FeatureGroup.java @@ -1,17 +1,88 @@ package com.logicalclocks.featurestore; -import lombok.AllArgsConstructor; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.logicalclocks.featurestore.engine.SparkEngine; +import com.logicalclocks.featurestore.metadata.Query; +import com.logicalclocks.featurestore.metadata.QueryConstructorApi; import lombok.Getter; -import lombok.NoArgsConstructor; +import lombok.Setter; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -@AllArgsConstructor -@NoArgsConstructor +import java.io.IOException; +import java.util.Arrays; +import java.util.Date; +import java.util.List; + +@JsonIgnoreProperties(ignoreUnknown = true) public class FeatureGroup { - @Getter + private static final Logger LOGGER = LoggerFactory.getLogger(FeatureGroup.class); + + @Getter @Setter + private Integer id; + + @Getter @Setter private String name; - @Getter + @Getter @Setter private Integer version; + @Getter @Setter + private String description; + + @Getter @Setter + private Date created; + + @Getter @Setter + private String creator; + + @Getter @Setter + private FeatureStore featureStore; + + @Getter @Setter + private List features; + + // API + private QueryConstructorApi queryConstructorApi; + + public FeatureGroup(String name, Integer version, String description, Date created, + String creator) + throws FeatureStoreException { + this.name = name; + this.version = version; + this.description = description; + this.created = created; + this.creator = creator; + + this.queryConstructorApi = new 
QueryConstructorApi(); + } + + public FeatureGroup() throws FeatureStoreException { + this.queryConstructorApi = new QueryConstructorApi(); + } + + public Dataset read() throws FeatureStoreException, IOException { + String sqlQuery = queryConstructorApi.constructQuery(featureStore, + new Query(this, Arrays.asList(new Feature("*")))); + LOGGER.info("Executing query: " + sqlQuery); + return SparkEngine.getInstance().read(sqlQuery); + } + + public Object head(int numRows) throws FeatureStoreException, IOException { + String sqlQuery = queryConstructorApi.constructQuery(featureStore, + new Query(this, Arrays.asList(new Feature("*")))); + LOGGER.info("Executing query: " + sqlQuery); + return SparkEngine.getInstance().read(sqlQuery).head(numRows); + } + + public Query select(List features) throws FeatureStoreException, IOException { + return new Query(this, features); + } + + public Query selectAll() throws FeatureStoreException, IOException { + return new Query(this, getFeatures()); + } } diff --git a/java/src/main/java/com/logicalclocks/featurestore/FeatureStore.java b/java/src/main/java/com/logicalclocks/featurestore/FeatureStore.java index edffa960bf..131ddb43fc 100644 --- a/java/src/main/java/com/logicalclocks/featurestore/FeatureStore.java +++ b/java/src/main/java/com/logicalclocks/featurestore/FeatureStore.java @@ -1,20 +1,32 @@ package com.logicalclocks.featurestore; import com.google.common.base.Strings; -import lombok.AllArgsConstructor; +import com.logicalclocks.featurestore.engine.SparkEngine; +import com.logicalclocks.featurestore.metadata.FeatureGroupApi; import lombok.Getter; -import lombok.NoArgsConstructor; +import lombok.Setter; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; + +import java.io.IOException; -@AllArgsConstructor -@NoArgsConstructor public class FeatureStore { - @Getter + @Getter @Setter private Integer id; - @Getter + @Getter @Setter private String name; + @Getter @Setter + private Integer projectId; + + private 
FeatureGroupApi featureGroupApi; + + public FeatureStore() throws FeatureStoreException { + featureGroupApi = new FeatureGroupApi(); + } + /** * Get a feature group from the feature store * @param name: the name of the feature group @@ -22,10 +34,15 @@ public class FeatureStore { * @return * @throws FeatureStoreException */ - public FeatureGroup getFeatureGroup(String name, Integer version) throws FeatureStoreException { + public FeatureGroup getFeatureGroup(String name, Integer version) + throws FeatureStoreException, IOException { if (Strings.isNullOrEmpty(name) || version == null) { throw new FeatureStoreException("Both name and version are required"); } - return null; + return featureGroupApi.get(this, name, version); + } + + public Dataset sql(String query) { + return SparkEngine.getInstance().sql(query); } } diff --git a/java/src/main/java/com/logicalclocks/featurestore/FeatureStoreException.java b/java/src/main/java/com/logicalclocks/featurestore/FeatureStoreException.java index f9bbd536e5..9c0df8a403 100644 --- a/java/src/main/java/com/logicalclocks/featurestore/FeatureStoreException.java +++ b/java/src/main/java/com/logicalclocks/featurestore/FeatureStoreException.java @@ -6,4 +6,7 @@ public FeatureStoreException(String msg) { super(msg); } + public FeatureStoreException(String msg, Throwable cause) { + super(msg, cause); + } } diff --git a/java/src/main/java/com/logicalclocks/featurestore/JoinType.java b/java/src/main/java/com/logicalclocks/featurestore/JoinType.java new file mode 100644 index 0000000000..fcd2530c82 --- /dev/null +++ b/java/src/main/java/com/logicalclocks/featurestore/JoinType.java @@ -0,0 +1,11 @@ +package com.logicalclocks.featurestore; + +public enum JoinType { + INNER, + FULL, + CROSS, + LEFT, + RIGHT, + LEFT_SEMI_JOIN, + COMMA; +} diff --git a/java/src/main/java/com/logicalclocks/featurestore/MainClass.java b/java/src/main/java/com/logicalclocks/featurestore/MainClass.java new file mode 100644 index 0000000000..2919a72922 --- 
/dev/null +++ b/java/src/main/java/com/logicalclocks/featurestore/MainClass.java @@ -0,0 +1,23 @@ +package com.logicalclocks.featurestore; + +import com.logicalclocks.featurestore.engine.SparkEngine; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MainClass { + + private final static Logger LOGGER = LoggerFactory.getLogger(MainClass.class); + + public static void main(String[] args) throws Exception { + FeatureStore fs = new FeatureStore(); + fs.setProjectId(120); + fs.setId(67); + + FeatureGroup fg = fs.getFeatureGroup("attendances_features", 1); + LOGGER.info("Name " + fg.getName()); + + LOGGER.info("Brace yourself, I'm running the query"); + fg.read().show(10); + SparkEngine.getInstance().getSparkSession().close(); + } +} diff --git a/java/src/main/java/com/logicalclocks/featurestore/engine/SparkEngine.java b/java/src/main/java/com/logicalclocks/featurestore/engine/SparkEngine.java new file mode 100644 index 0000000000..1c554b7824 --- /dev/null +++ b/java/src/main/java/com/logicalclocks/featurestore/engine/SparkEngine.java @@ -0,0 +1,39 @@ +package com.logicalclocks.featurestore.engine; + +import lombok.Getter; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class SparkEngine { + + private static SparkEngine INSTANCE = null; + + public static synchronized SparkEngine getInstance() { + if (INSTANCE == null) { + INSTANCE = new SparkEngine(); + } + return INSTANCE; + } + + private static final Logger LOGGER = LoggerFactory.getLogger(SparkEngine.class); + + @Getter + private SparkSession sparkSession; + + private SparkEngine() { + sparkSession = SparkSession.builder().enableHiveSupport().getOrCreate(); + } + + public Dataset read(String query) { + LOGGER.info("Lazily executing query: " + query); + return sparkSession.sql(query); + } + + public Dataset sql(String query) { + LOGGER.info("Lazily executing 
query: " + query); + return sparkSession.sql(query); + } +} diff --git a/java/src/main/java/com/logicalclocks/featurestore/metadata/FeatureGroupApi.java b/java/src/main/java/com/logicalclocks/featurestore/metadata/FeatureGroupApi.java new file mode 100644 index 0000000000..ebffc438e7 --- /dev/null +++ b/java/src/main/java/com/logicalclocks/featurestore/metadata/FeatureGroupApi.java @@ -0,0 +1,52 @@ +package com.logicalclocks.featurestore.metadata; + +import com.damnhandy.uri.template.UriTemplate; +import com.logicalclocks.featurestore.FeatureGroup; +import com.logicalclocks.featurestore.FeatureStore; +import com.logicalclocks.featurestore.FeatureStoreException; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.util.EntityUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.charset.Charset; + +public class FeatureGroupApi { + + public static final String FEATURE_GROUP_PATH = "/featuregroups{/fgName}{?version}"; + + private static final Logger LOGGER = LoggerFactory.getLogger(FeatureGroupApi.class); + + private HopsworksClient hopsworksClient; + + public FeatureGroupApi() throws FeatureStoreException { + hopsworksClient = HopsworksClient.getInstance(); + } + + public FeatureGroup get(FeatureStore featureStore, String fgName, Integer fgVersion) + throws IOException, FeatureStoreException { + String pathTemplate = HopsworksClient.PROJECT_PATH + + FeatureStoreApi.FEATURE_STORE_PATH + + FEATURE_GROUP_PATH; + + String uri = UriTemplate.fromTemplate(pathTemplate) + .set("projectId", featureStore.getProjectId()) + .set("fsId", featureStore.getId()) + .set("fgName", fgName) + .set("version", fgVersion) + .expand(); + + LOGGER.info("Sending metadata request: " + uri); + FeatureGroup[] featureGroups = hopsworksClient.handleRequest(new HttpGet(uri), response -> { + String responseJSON = EntityUtils.toString(response.getEntity(), Charset.defaultCharset()); + return 
hopsworksClient.getObjectMapper().readValue(responseJSON, FeatureGroup[].class); + }); + + // There can be only one single feature group with a specific name and version in a feature store + // There has to be one otherwise an exception would have been thrown. + FeatureGroup resultFg = featureGroups[0]; + resultFg.setFeatureStore(featureStore); + return resultFg; + } +} diff --git a/java/src/main/java/com/logicalclocks/featurestore/metadata/FeatureStoreApi.java b/java/src/main/java/com/logicalclocks/featurestore/metadata/FeatureStoreApi.java new file mode 100644 index 0000000000..c44583e2cd --- /dev/null +++ b/java/src/main/java/com/logicalclocks/featurestore/metadata/FeatureStoreApi.java @@ -0,0 +1,8 @@ +package com.logicalclocks.featurestore.metadata; + +public class FeatureStoreApi { + + public static final String FEATURE_STORE_SERVICE_PATH = "/featurestores"; + public static final String FEATURE_STORE_PATH = FEATURE_STORE_SERVICE_PATH + "{/fsId}"; + +} diff --git a/java/src/main/java/com/logicalclocks/featurestore/metadata/HopsworksClient.java b/java/src/main/java/com/logicalclocks/featurestore/metadata/HopsworksClient.java index e702e7d3eb..d9f91db176 100644 --- a/java/src/main/java/com/logicalclocks/featurestore/metadata/HopsworksClient.java +++ b/java/src/main/java/com/logicalclocks/featurestore/metadata/HopsworksClient.java @@ -1,24 +1,39 @@ package com.logicalclocks.featurestore.metadata; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.logicalclocks.featurestore.FeatureStoreException; +import lombok.Getter; +import org.apache.http.*; +import org.apache.http.client.ClientProtocolException; +import org.apache.http.client.ResponseHandler; import org.apache.http.config.Registry; import org.apache.http.config.RegistryBuilder; import org.apache.http.conn.socket.ConnectionSocketFactory; import org.apache.http.conn.socket.PlainConnectionSocketFactory; -import 
org.apache.http.conn.ssl.NoopHostnameVerifier; import org.apache.http.conn.ssl.SSLConnectionSocketFactory; import org.apache.http.conn.ssl.TrustSelfSignedStrategy; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClients; import org.apache.http.impl.conn.PoolingHttpClientConnectionManager; import org.apache.http.ssl.SSLContexts; +import org.apache.http.util.EntityUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import javax.net.ssl.HostnameVerifier; import javax.net.ssl.SSLContext; -import java.io.FileInputStream; +import javax.net.ssl.SSLSession; import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.channels.FileLock; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; import java.nio.file.Path; import java.nio.file.Paths; +import java.nio.file.StandardOpenOption; import java.security.KeyManagementException; -import java.security.KeyStore; import java.security.KeyStoreException; import java.security.NoSuchAlgorithmException; import java.security.cert.CertificateException; @@ -26,12 +41,17 @@ public class HopsworksClient { + private static final Logger LOGGER = LoggerFactory.getLogger(HopsworksClient.class.getName()); + private final static String DOMAIN_CA_TRUSTSTORE = "hopsworks.domain.truststore"; private final static String TOKEN_PATH = "token.jwt"; + public final static String API_PATH = "/hopsworks-api/api"; + public final static String PROJECT_PATH = API_PATH + "/project{/projectId}"; + private static HopsworksClient hopsworksClientInstance = null; - public static HopsworksClient getInstance() { + public static HopsworksClient getInstance() throws FeatureStoreException { if (hopsworksClientInstance == null) { setupHopsworksClient(); } @@ -39,17 +59,36 @@ public static HopsworksClient getInstance() { return hopsworksClientInstance; } - private synchronized static void setupHopsworksClient() { + private 
synchronized static void setupHopsworksClient() throws FeatureStoreException { if (hopsworksClientInstance != null) { return; } - hopsworksClientInstance = new HopsworksClient(); + try { + hopsworksClientInstance = new HopsworksClient(); + } catch (Exception e) { + throw new FeatureStoreException("Could not setup Hopsworks client", e); + } } private PoolingHttpClientConnectionManager connectionPool = null; + private HttpHost httpHost = null; private CloseableHttpClient httpClient = null; - private HopsworksClient() { + private String hopsworksEndpoint = ""; + private String jwt = ""; + + @Getter + private ObjectMapper objectMapper; + + private HopsworksClient() throws IOException, KeyStoreException, CertificateException, + NoSuchAlgorithmException, KeyManagementException, FeatureStoreException { + + Properties systemProperties = System.getProperties(); + hopsworksEndpoint = systemProperties.getProperty("hopsworks.restendpoint"); + httpHost = HttpHost.create(hopsworksEndpoint); + + jwt = readContainerJwt(); + connectionPool = new PoolingHttpClientConnectionManager(createConnectionFactory()); connectionPool.setMaxTotal(10); connectionPool.setDefaultMaxPerRoute(10); @@ -58,6 +97,29 @@ private HopsworksClient() { .setConnectionManager(connectionPool) .setKeepAliveStrategy((httpResponse, httpContext) -> 30 * 1000) .build(); + + objectMapper = new ObjectMapper(); + objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + objectMapper.configure(DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES, false); + } + + private static class HopsworksHostnameVerifier implements HostnameVerifier { + + private boolean insecure = false; + private String hopsworksHost = ""; + + public HopsworksHostnameVerifier(boolean insecure, String restEndpoint) { + this.insecure = insecure; + this.hopsworksHost = restEndpoint.split(":")[0]; + } + + @Override + public boolean verify(String string, SSLSession ssls) { + return true; + //LOGGER.info("Verifying: " + string); + 
//LOGGER.info("against: " + hopsworksHost); + //return insecure || string.equals(hopsworksHost); + } } private Registry createConnectionFactory() @@ -65,21 +127,120 @@ private Registry createConnectionFactory() Properties systemProperties = System.getProperties(); Path trustStorePath = Paths.get(systemProperties.getProperty(DOMAIN_CA_TRUSTSTORE)); - KeyStore truststore = KeyStore.getInstance(KeyStore.getDefaultType()); - - try (FileInputStream trustStoreIS = new FileInputStream(trustStorePath.toFile())) { - truststore.load(trustStoreIS, null); - } - + LOGGER.info("Trust store path: " + trustStorePath); SSLContext sslCtx = SSLContexts.custom() - .loadTrustMaterial(truststore, new TrustSelfSignedStrategy()) + .loadTrustMaterial(trustStorePath.toFile(), null, new TrustSelfSignedStrategy()) .build(); - SSLConnectionSocketFactory sslsf = new SSLConnectionSocketFactory(sslCtx, NoopHostnameVerifier.INSTANCE); + boolean insecure = Boolean.parseBoolean(systemProperties.getProperty("hopsutil.insecure")); + + SSLConnectionSocketFactory sslsf = new SSLConnectionSocketFactory(sslCtx, + new HopsworksHostnameVerifier(insecure, hopsworksEndpoint)); + return RegistryBuilder.create() .register("https", sslsf) .register("http", PlainConnectionSocketFactory.getSocketFactory()) .build(); } + private String readContainerJwt() throws FeatureStoreException { + String jwt = null; + try (FileChannel fc = FileChannel.open(Paths.get(TOKEN_PATH), StandardOpenOption.READ)) { + FileLock fileLock = fc.tryLock(0, Long.MAX_VALUE, true); + try { + short numRetries = 5; + short retries = 0; + while (fileLock == null && retries < numRetries) { + LOGGER.debug("Waiting for lock on jwt file at:" + TOKEN_PATH); + Thread.sleep(1000); + fileLock = fc.tryLock(0, Long.MAX_VALUE, true); + retries++; + } + //If could not acquire lock in reasonable time, throw exception + if (fileLock == null) { + throw new FeatureStoreException("Could not read jwt token from local container, possibly another process has" + + " 
acquired the lock"); + } + ByteBuffer buf = ByteBuffer.allocateDirect(512); + fc.read(buf); + buf.flip(); + jwt = StandardCharsets.UTF_8.decode(buf).toString(); + } catch (InterruptedException e) { + LOGGER.warn("JWT waiting thread was interrupted.", e); + } finally { + if (fileLock != null) { + fileLock.release(); + } + } + } catch (IOException e) { + throw new FeatureStoreException("Could not read jwt token from local container." + e.getMessage(), e); + } + return jwt; + } + + private static class AuthorizationHandler implements ResponseHandler { + + private ResponseHandler originalResponseHandler; + + AuthorizationHandler(ResponseHandler originalResponseHandler) { + this.originalResponseHandler = originalResponseHandler; + } + + @Override + public T handleResponse(HttpResponse response) throws ClientProtocolException, IOException { + if (response.getStatusLine().getStatusCode() == HttpStatus.SC_UNAUTHORIZED) { + throw new UnauthorizedException(); + } else if (response.getStatusLine().getStatusCode() / 100 == 4) { + throw new IOException("Error: " + response.getStatusLine().getStatusCode() + + EntityUtils.toString(response.getEntity(), Charset.defaultCharset())); + } else if (response.getStatusLine().getStatusCode() / 100 == 5) { + // TODO(fabio): Propagate http error upstream + throw new InternalException(); + } + + return originalResponseHandler.handleResponse(response); + } + } + + private static class BaseHandler implements ResponseHandler { + + private Class cls; + private ObjectMapper objectMapper; + + public BaseHandler(Class cls, ObjectMapper objectMapper) { + this.cls = cls; + this.objectMapper = objectMapper; + } + + @Override + public T handleResponse(HttpResponse response) throws ClientProtocolException, IOException { + String responseJSON = EntityUtils.toString(response.getEntity(), Charset.defaultCharset()); + return objectMapper.readValue(responseJSON, cls); + } + } + + private static class UnauthorizedException extends ClientProtocolException {} + 
private static class InternalException extends ClientProtocolException {} + + public T handleRequest(HttpRequest request, ResponseHandler responseHandler) + throws IOException, FeatureStoreException { + LOGGER.debug("Handling metadata request: " + request); + AuthorizationHandler authHandler = new AuthorizationHandler<>(responseHandler); + request.setHeader(HttpHeaders.AUTHORIZATION, "Bearer " + jwt); + try { + return httpClient.execute(httpHost, request, authHandler); + } catch (UnauthorizedException e) { + // 401: the token on disk may have been rotated; reload it, refresh the cached copy and retry once + jwt = readContainerJwt(); + request.setHeader(HttpHeaders.AUTHORIZATION, "Bearer " + jwt); + return httpClient.execute(httpHost, request, authHandler); + } catch (InternalException e) { + // 5xx: retry once, still through authHandler so a second failure is raised instead of parsed as a result + return httpClient.execute(httpHost, request, authHandler); + } + } + + public T handleRequest(HttpRequest request, Class cls) throws IOException, FeatureStoreException { + return handleRequest(request, new BaseHandler<>(cls, objectMapper)); + } } diff --git a/java/src/main/java/com/logicalclocks/featurestore/metadata/Join.java b/java/src/main/java/com/logicalclocks/featurestore/metadata/Join.java new file mode 100644 index 0000000000..33fbb2e477 --- /dev/null +++ b/java/src/main/java/com/logicalclocks/featurestore/metadata/Join.java @@ -0,0 +1,42 @@ +package com.logicalclocks.featurestore.metadata; + +import com.logicalclocks.featurestore.Feature; +import com.logicalclocks.featurestore.JoinType; +import lombok.Getter; +import lombok.Setter; + +import java.util.List; + +public class Join { + + @Getter @Setter + private Query query; + + @Getter @Setter + private List on; + @Getter @Setter + private List leftOn; + @Getter @Setter + private List rightOn; + + @Getter @Setter + private JoinType joinType; + + public Join(Query query, JoinType joinType) { + this.query = query; + this.joinType = joinType; + } + + public Join(Query query, List on, JoinType joinType) { + this.query = query; + this.on = on; + 
this.joinType = joinType; + } + + public Join(Query query, List leftOn, List rightOn, JoinType joinType) { + this.query = query; + this.leftOn = leftOn; + this.rightOn = rightOn; + this.joinType = joinType; + } +} diff --git a/java/src/main/java/com/logicalclocks/featurestore/metadata/MetadataClient.java b/java/src/main/java/com/logicalclocks/featurestore/metadata/MetadataClient.java new file mode 100644 index 0000000000..1de6c72639 --- /dev/null +++ b/java/src/main/java/com/logicalclocks/featurestore/metadata/MetadataClient.java @@ -0,0 +1,4 @@ +package com.logicalclocks.featurestore.metadata; + +public class MetadataClient { +} diff --git a/java/src/main/java/com/logicalclocks/featurestore/metadata/Query.java b/java/src/main/java/com/logicalclocks/featurestore/metadata/Query.java new file mode 100644 index 0000000000..92c9b2222d --- /dev/null +++ b/java/src/main/java/com/logicalclocks/featurestore/metadata/Query.java @@ -0,0 +1,77 @@ +package com.logicalclocks.featurestore.metadata; + +import com.logicalclocks.featurestore.*; +import com.logicalclocks.featurestore.engine.SparkEngine; +import lombok.Getter; +import lombok.Setter; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +public class Query { + + private static final Logger LOGGER = LoggerFactory.getLogger(Query.class); + + @Getter @Setter + private FeatureGroup leftFeatureGroup; + @Getter @Setter + private List leftFeatures; + + @Getter @Setter + private List joins = new ArrayList<>(); + + private QueryConstructorApi queryConstructorApi; + + public Query(FeatureGroup leftFeatureGroup, List leftFeatures) throws FeatureStoreException { + this.leftFeatureGroup = leftFeatureGroup; + this.leftFeatures = leftFeatures; + + this.queryConstructorApi = new QueryConstructorApi(); + } + + public Query join(Query subquery) { + return 
join(subquery, JoinType.INNER); + } + + public Query join(Query subquery, List on) { + return join(subquery, on, JoinType.INNER); + } + + public Query join(Query subquery, List leftOn, List rightOn) { + return join(subquery, leftOn, rightOn, JoinType.INNER); + } + + public Query join(Query subquery, JoinType joinType) { + joins.add(new Join(subquery, joinType)); + return this; + } + + public Query join(Query subquery, List on, JoinType joinType) { + joins.add(new Join(subquery, on, joinType)); + return this; + } + + public Query join(Query subquery, List leftOn, List rightOn, JoinType joinType) { + joins.add(new Join(subquery, leftOn, rightOn, joinType)); + return this; + } + + public Dataset read() throws FeatureStoreException, IOException { + String sqlQuery = + queryConstructorApi.constructQuery(leftFeatureGroup.getFeatureStore(), this); + LOGGER.info("Executing query: " + sqlQuery); + return SparkEngine.getInstance().read(sqlQuery); + } + + public Object head(int numRows) throws FeatureStoreException, IOException { + String sqlQuery = + queryConstructorApi.constructQuery(leftFeatureGroup.getFeatureStore(), this); + LOGGER.info("Executing query: " + sqlQuery); + return SparkEngine.getInstance().read(sqlQuery).head(numRows); + } +} diff --git a/java/src/main/java/com/logicalclocks/featurestore/metadata/QueryConstructorApi.java b/java/src/main/java/com/logicalclocks/featurestore/metadata/QueryConstructorApi.java new file mode 100644 index 0000000000..13bada6446 --- /dev/null +++ b/java/src/main/java/com/logicalclocks/featurestore/metadata/QueryConstructorApi.java @@ -0,0 +1,47 @@ +package com.logicalclocks.featurestore.metadata; + +import com.damnhandy.uri.template.UriTemplate; +import com.logicalclocks.featurestore.FeatureStore; +import com.logicalclocks.featurestore.FeatureStoreException; +import org.apache.http.HttpHeaders; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.entity.StringEntity; +import org.apache.http.util.EntityUtils; 
+import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.charset.Charset; + +public class QueryConstructorApi { + + public static final String QUERY_CONSTRUCTOR_PATH = "/query"; + + private static final Logger LOGGER = LoggerFactory.getLogger(QueryConstructorApi.class); + + private HopsworksClient hopsworksClient; + + public QueryConstructorApi() throws FeatureStoreException { + hopsworksClient = HopsworksClient.getInstance(); + } + + public String constructQuery(FeatureStore featureStore, Query query) throws FeatureStoreException, IOException { + String pathTemplate = HopsworksClient.PROJECT_PATH + + FeatureStoreApi.FEATURE_STORE_SERVICE_PATH + + QUERY_CONSTRUCTOR_PATH; + + String uri = UriTemplate .fromTemplate(pathTemplate) + .set("projectId", featureStore.getProjectId()) + .expand(); + + String queryJson = hopsworksClient.getObjectMapper().writeValueAsString(query); + HttpPost postRequest = new HttpPost(uri); + postRequest.setHeader(HttpHeaders.CONTENT_TYPE, "application/json"); + postRequest.setEntity(new StringEntity(queryJson)); + + LOGGER.info("Sending metadata request: " + uri); + LOGGER.info("Sending query: " + queryJson); + return hopsworksClient.handleRequest(postRequest, + response -> EntityUtils.toString(response.getEntity(), Charset.defaultCharset()).replace("\n", " ")); + } +} \ No newline at end of file From ea2a62c8f023e622d36ddb319538966acf13baaf Mon Sep 17 00:00:00 2001 From: Fabio Buso Date: Thu, 23 Jan 2020 16:01:40 +0100 Subject: [PATCH 2/3] remove duplicated code in feature group --- .../com/logicalclocks/featurestore/FeatureGroup.java | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/java/src/main/java/com/logicalclocks/featurestore/FeatureGroup.java b/java/src/main/java/com/logicalclocks/featurestore/FeatureGroup.java index 63cb57f5b9..9df84b26e7 100644 --- a/java/src/main/java/com/logicalclocks/featurestore/FeatureGroup.java +++ 
b/java/src/main/java/com/logicalclocks/featurestore/FeatureGroup.java @@ -65,17 +65,11 @@ public FeatureGroup() throws FeatureStoreException { } public Dataset read() throws FeatureStoreException, IOException { - String sqlQuery = queryConstructorApi.constructQuery(featureStore, - new Query(this, Arrays.asList(new Feature("*")))); - LOGGER.info("Executing query: " + sqlQuery); - return SparkEngine.getInstance().read(sqlQuery); + return selectAll().read(); } public Object head(int numRows) throws FeatureStoreException, IOException { - String sqlQuery = queryConstructorApi.constructQuery(featureStore, - new Query(this, Arrays.asList(new Feature("*")))); - LOGGER.info("Executing query: " + sqlQuery); - return SparkEngine.getInstance().read(sqlQuery).head(numRows); + return selectAll().head(numRows); } public Query select(List features) throws FeatureStoreException, IOException { From e0e7f36649b12fec52581fa52ad57b82e1042342 Mon Sep 17 00:00:00 2001 From: Fabio Buso Date: Thu, 23 Jan 2020 16:02:53 +0100 Subject: [PATCH 3/3] more code removed --- .../featurestore/FeatureGroup.java | 19 ++----------------- 1 file changed, 2 insertions(+), 17 deletions(-) diff --git a/java/src/main/java/com/logicalclocks/featurestore/FeatureGroup.java b/java/src/main/java/com/logicalclocks/featurestore/FeatureGroup.java index 9df84b26e7..71d6e37add 100644 --- a/java/src/main/java/com/logicalclocks/featurestore/FeatureGroup.java +++ b/java/src/main/java/com/logicalclocks/featurestore/FeatureGroup.java @@ -1,26 +1,19 @@ package com.logicalclocks.featurestore; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; -import com.logicalclocks.featurestore.engine.SparkEngine; import com.logicalclocks.featurestore.metadata.Query; -import com.logicalclocks.featurestore.metadata.QueryConstructorApi; import lombok.Getter; import lombok.Setter; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import 
java.io.IOException; -import java.util.Arrays; import java.util.Date; import java.util.List; @JsonIgnoreProperties(ignoreUnknown = true) public class FeatureGroup { - private static final Logger LOGGER = LoggerFactory.getLogger(FeatureGroup.class); - @Getter @Setter private Integer id; @@ -45,23 +38,15 @@ public class FeatureGroup { @Getter @Setter private List features; - // API - private QueryConstructorApi queryConstructorApi; - - public FeatureGroup(String name, Integer version, String description, Date created, - String creator) - throws FeatureStoreException { + public FeatureGroup(String name, Integer version, String description, Date created, String creator) { this.name = name; this.version = version; this.description = description; this.created = created; this.creator = creator; - - this.queryConstructorApi = new QueryConstructorApi(); } - public FeatureGroup() throws FeatureStoreException { - this.queryConstructorApi = new QueryConstructorApi(); + public FeatureGroup() { } public Dataset read() throws FeatureStoreException, IOException {