Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add feature group query/join capabilities #3

Merged
merged 3 commits into from
Jan 23, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 122 additions & 1 deletion java/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,70 @@
<version>1.2.0-SNAPSHOT</version>

<!-- Compiler level and dependency versions; the visible dependencies below pick these up through ${...} placeholders. -->
<properties>
<!-- Compile for Java 8 source level and Java 8 bytecode. -->
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<!-- NOTE(review): guava.version is not referenced in the visible part of this POM; presumably used by a dependency outside this view. Confirm. -->
<guava.version>14.0.1</guava.version>
<httpclient.version>4.5.6</httpclient.version>
<!-- Shared by slf4j-api and slf4j-log4j12 so the API and the binding stay on the same version. -->
<slf4j.version>1.7.16</slf4j.version>
<log4j.version>1.2.17</log4j.version>
<handy.version>2.1.8</handy.version>
<lombok.version>1.18.6</lombok.version>
<fasterxml.jackson.databind.version>2.6.7.1</fasterxml.jackson.databind.version>
<!-- Shared by spark-core_2.11 and spark-sql_2.11. -->
<spark.version>2.4.3.0</spark.version>
</properties>

<dependencies>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.6</version>
<version>${lombok.version}</version>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${fasterxml.jackson.databind.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>com.damnhandy</groupId>
<artifactId>handy-uri-templates</artifactId>
<version>${handy.version}</version>
</dependency>

<dependency>
Expand All @@ -33,5 +88,71 @@
<version>${httpclient.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>${slf4j.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>${log4j.version}</version>
<scope>provided</scope>
</dependency>

</dependencies>



<!-- Runs the assembly plugin's "single" goal during the package phase, using the predefined jar-with-dependencies descriptor. -->
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.4.1</version>
<configuration>
<!-- get all project dependencies -->
<!-- NOTE(review): most dependencies above are declared with provided scope; verify which of them this descriptor actually bundles. -->
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<!-- bind to the packaging phase -->
<phase>package</phase>
<goals>
<!-- "single" is the assembly goal executed by this binding. -->
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>

<!-- Extra artifact repository for this build; both release and snapshot resolution are enabled for it. -->
<repositories>
<repository>
<id>Hops</id>
<name>Hops Repo</name>
<url>https://bbc1.sics.se/archiva/repository/Hops/</url>
<releases>
<enabled>true</enabled>
</releases>
<!-- Snapshots are enabled as well as releases. -->
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
</repositories>


</project>
32 changes: 32 additions & 0 deletions java/src/main/java/com/logicalclocks/featurestore/Feature.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package com.logicalclocks.featurestore;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;

/**
 * Plain data holder describing one feature of a feature group.
 *
 * <p>Lombok generates a getter and a setter for every field, a no-argument constructor and a
 * constructor taking all fields in declaration order
 * ({@code name, type, onlineType, description, primary, partition}).
 */
@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
public class Feature {

  /** Name of the feature. */
  private String name;

  /**
   * Type of the feature, kept as a string.
   *
   * <p>The field was previously spelled {@code Type}; it is lower-cased to follow Java naming
   * conventions. The Lombok accessors are still {@code getType()} / {@code setType(String)}.
   */
  private String type;

  /** Online type of the feature (presumably the type used by an online store; confirm). */
  private String onlineType;

  /** Free-text description of the feature. */
  private String description;

  /** Primary flag (presumably marks a primary-key feature; confirm). May be {@code null}. */
  private Boolean primary;

  /** Partition flag (presumably marks a partition column; confirm). May be {@code null}. */
  private Boolean partition;

  /**
   * Creates a feature that is identified by name only; every other field is left {@code null}.
   *
   * @param name the name of the feature
   */
  public Feature(String name) {
    this.name = name;
  }
}
Original file line number Diff line number Diff line change
@@ -1,17 +1,67 @@
package com.logicalclocks.featurestore;

import lombok.AllArgsConstructor;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.logicalclocks.featurestore.metadata.Query;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

@AllArgsConstructor
@NoArgsConstructor
import java.io.IOException;
import java.util.Date;
import java.util.List;

/**
 * Feature group metadata plus convenience entry points for querying its features.
 *
 * <p>Unknown JSON properties are ignored when this class is deserialized with Jackson. All
 * query methods build a {@link Query} over this feature group.
 *
 * <p>NOTE(review): this view interleaves pre- and post-change lines of a diff (a bare
 * {@code @Getter} directly followed by {@code @Getter @Setter} on {@code id} and
 * {@code version}); confirm against the merged file before relying on the annotations.
 */
@JsonIgnoreProperties(ignoreUnknown = true)
public class FeatureGroup {

// Identifier of the feature group.
@Getter
@Getter @Setter
private Integer id;

// Name of the feature group.
@Getter @Setter
private String name;

// Version of the feature group.
@Getter
@Getter @Setter
private Integer version;

// Free-text description.
@Getter @Setter
private String description;

// Creation date.
@Getter @Setter
private Date created;

// Creator of the feature group.
@Getter @Setter
private String creator;

// Feature store this group is associated with; not set by either constructor below.
@Getter @Setter
private FeatureStore featureStore;

// Features of this group; this is the list selectAll() passes to the Query.
@Getter @Setter
private List<Feature> features;

/**
 * Creates a feature group from its descriptive metadata. The {@code id}, {@code featureStore}
 * and {@code features} fields are not assigned here.
 *
 * @param name the name of the feature group
 * @param version the version of the feature group
 * @param description the description of the feature group
 * @param created the creation date
 * @param creator the creator
 */
public FeatureGroup(String name, Integer version, String description, Date created, String creator) {
this.name = name;
this.version = version;
this.description = description;
this.created = created;
this.creator = creator;
}

/**
 * Creates an empty feature group; fields are populated afterwards through the setters.
 */
public FeatureGroup() {
}

/**
 * Reads all features of this group; shorthand for {@code selectAll().read()}.
 *
 * @return the dataset returned by the query's {@code read()}
 * @throws FeatureStoreException propagated from building or reading the query
 * @throws IOException propagated from building or reading the query
 */
public Dataset<Row> read() throws FeatureStoreException, IOException {
return selectAll().read();
}

/**
 * Shorthand for {@code selectAll().head(numRows)}.
 *
 * @param numRows the row count passed on to the query's {@code head}
 * @return whatever the query's {@code head} returns, exposed here as {@code Object}
 * @throws FeatureStoreException propagated from building or executing the query
 * @throws IOException propagated from building or executing the query
 */
public Object head(int numRows) throws FeatureStoreException, IOException {
return selectAll().head(numRows);
}

/**
 * Builds a query over this feature group restricted to the given features.
 *
 * @param features the features to select
 * @return a new {@link Query} for this group and the given features
 * @throws FeatureStoreException propagated from the {@link Query} constructor
 * @throws IOException propagated from the {@link Query} constructor
 */
public Query select(List<Feature> features) throws FeatureStoreException, IOException {
return new Query(this, features);
}

/**
 * Builds a query over this feature group using the list returned by {@code getFeatures()}.
 *
 * @return a new {@link Query} for this group and its current feature list
 * @throws FeatureStoreException propagated from the {@link Query} constructor
 * @throws IOException propagated from the {@link Query} constructor
 */
public Query selectAll() throws FeatureStoreException, IOException {
return new Query(this, getFeatures());
}
}
Original file line number Diff line number Diff line change
@@ -1,31 +1,48 @@
package com.logicalclocks.featurestore;

import com.google.common.base.Strings;
import lombok.AllArgsConstructor;
import com.logicalclocks.featurestore.engine.SparkEngine;
import com.logicalclocks.featurestore.metadata.FeatureGroupApi;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

import java.io.IOException;

/**
 * Handle to a feature store: holds its identifiers and looks up feature groups through a
 * {@code FeatureGroupApi}.
 *
 * <p>NOTE(review): this view interleaves pre- and post-change lines of a diff (the Lombok
 * constructor annotations, the duplicated {@code @Getter} lines, the two
 * {@code getFeatureGroup} signatures and the two {@code return} statements); confirm against
 * the merged file.
 */
@AllArgsConstructor
@NoArgsConstructor
public class FeatureStore {

// Identifier of the feature store.
@Getter
@Getter @Setter
private Integer id;

// Name of the feature store.
@Getter
@Getter @Setter
private String name;

// Identifier of the project; presumably the project owning this feature store (confirm).
@Getter @Setter
private Integer projectId;

// API object used by getFeatureGroup; created in the constructor.
private FeatureGroupApi featureGroupApi;

/**
 * Creates a feature store handle and its {@code FeatureGroupApi}. The identifiers are left
 * unset and are expected to be supplied through the setters.
 *
 * @throws FeatureStoreException propagated from the {@code FeatureGroupApi} constructor
 */
public FeatureStore() throws FeatureStoreException {
featureGroupApi = new FeatureGroupApi();
}

/**
 * Get a feature group from the feature store.
 *
 * @param name the name of the feature group; must not be null or empty
 * @param version the version of the feature group; must not be null
 * @return the result of {@code featureGroupApi.get(this, name, version)}
 * @throws FeatureStoreException if {@code name} is null or empty, or {@code version} is null
 * @throws IOException declared by the updated signature; presumably raised by the API call (confirm)
 */
public FeatureGroup getFeatureGroup(String name, Integer version) throws FeatureStoreException {
public FeatureGroup getFeatureGroup(String name, Integer version)
throws FeatureStoreException, IOException {
// Reject incomplete lookups up front, before calling the API.
if (Strings.isNullOrEmpty(name) || version == null) {
throw new FeatureStoreException("Both name and version are required");
}
return null;
return featureGroupApi.get(this, name, version);
}

/**
 * Runs a SQL query by delegating to {@code SparkEngine.getInstance().sql(query)}.
 *
 * @param query the SQL text to execute
 * @return the dataset returned by the Spark engine
 */
public Dataset<Row> sql(String query) {
return SparkEngine.getInstance().sql(query);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,7 @@ public FeatureStoreException(String msg) {
super(msg);
}

/**
 * Creates an exception with a detail message and the underlying cause, both of which are
 * handed to the superclass constructor.
 *
 * @param msg the detail message
 * @param cause the throwable that led to this exception
 */
public FeatureStoreException(String msg, Throwable cause) {
super(msg, cause);
}
}
11 changes: 11 additions & 0 deletions java/src/main/java/com/logicalclocks/featurestore/JoinType.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package com.logicalclocks.featurestore;

/**
 * Join kinds available in this package.
 *
 * <p>The constant names presumably mirror SQL join keywords; the code that consumes them is
 * not part of this file, so confirm the exact mapping there. Declaration order is kept stable
 * so that ordinals do not change.
 */
public enum JoinType {
  /** {@code INNER} join kind. */
  INNER,
  /** {@code FULL} join kind. */
  FULL,
  /** {@code CROSS} join kind. */
  CROSS,
  /** {@code LEFT} join kind. */
  LEFT,
  /** {@code RIGHT} join kind. */
  RIGHT,
  /** {@code LEFT_SEMI_JOIN} join kind. */
  LEFT_SEMI_JOIN,
  /** {@code COMMA} join kind. */
  COMMA
}
23 changes: 23 additions & 0 deletions java/src/main/java/com/logicalclocks/featurestore/MainClass.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package com.logicalclocks.featurestore;

import com.logicalclocks.featurestore.engine.SparkEngine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Manual smoke test: looks up one hard-coded feature group and prints its first rows.
 *
 * <p>The project id, feature store id and feature group name are hard-coded sample values.
 */
public class MainClass {

  private static final Logger LOGGER = LoggerFactory.getLogger(MainClass.class);

  /**
   * Fetches feature group {@code attendances_features} version 1, logs its name, shows the
   * first 10 rows and finally closes the Spark session.
   *
   * @param args ignored
   * @throws Exception if the lookup, the read or closing the session fails
   */
  public static void main(String[] args) throws Exception {
    FeatureStore fs = new FeatureStore();
    fs.setProjectId(120);
    fs.setId(67);

    FeatureGroup fg = fs.getFeatureGroup("attendances_features", 1);
    // Parameterized form: SLF4J only builds the message if INFO is enabled.
    LOGGER.info("Name {}", fg.getName());

    LOGGER.info("Brace yourself, I'm running the query");
    try {
      fg.read().show(10);
    } finally {
      // Release the Spark session even when the read fails, instead of only on success.
      SparkEngine.getInstance().getSparkSession().close();
    }
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package com.logicalclocks.featurestore.engine;

import lombok.Getter;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Singleton wrapper around a Hive-enabled {@link SparkSession}.
 *
 * <p>The single instance is created on the first call to {@link #getInstance()}, which is
 * {@code synchronized}.
 */
public class SparkEngine {

  private static final Logger LOGGER = LoggerFactory.getLogger(SparkEngine.class);

  /** The lazily created singleton; {@code null} until {@link #getInstance()} is first called. */
  private static SparkEngine instance = null;

  /** Session obtained from the builder with Hive support enabled. */
  @Getter
  private SparkSession sparkSession;

  /** Obtains the session via {@code getOrCreate()}; only reachable through {@link #getInstance()}. */
  private SparkEngine() {
    this.sparkSession = SparkSession.builder()
        .enableHiveSupport()
        .getOrCreate();
  }

  /**
   * Returns the shared engine, creating it on first use.
   *
   * @return the singleton instance
   */
  public static synchronized SparkEngine getInstance() {
    if (instance != null) {
      return instance;
    }
    instance = new SparkEngine();
    return instance;
  }

  /**
   * Logs the query and passes it to the Spark session; behaves exactly like {@link #sql(String)}.
   *
   * @param query the SQL text
   * @return the dataset produced by {@code sparkSession.sql(query)}
   */
  public Dataset<Row> read(String query) {
    return sql(query);
  }

  /**
   * Logs the query and passes it to the Spark session.
   *
   * @param query the SQL text
   * @return the dataset produced by {@code sparkSession.sql(query)}
   */
  public Dataset<Row> sql(String query) {
    LOGGER.info("Lazily executing query: " + query);
    Dataset<Row> result = sparkSession.sql(query);
    return result;
  }
}
Loading