Skip to content

Commit

Permalink
Java Feature group (#20)
Browse files Browse the repository at this point in the history
  • Loading branch information
SirOibaf authored Jul 1, 2020
1 parent c714d33 commit 055da08
Show file tree
Hide file tree
Showing 24 changed files with 969 additions and 203 deletions.
2 changes: 1 addition & 1 deletion java/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
<handy.version>2.1.8</handy.version>
<lombok.version>1.18.6</lombok.version>
<fasterxml.jackson.databind.version>2.6.7.1</fasterxml.jackson.databind.version>
<spark.version>2.4.3.0</spark.version>
<spark.version>2.4.3.2</spark.version>
</properties>

<dependencies>
Expand Down
19 changes: 19 additions & 0 deletions java/src/main/java/com/logicalclocks/hsfs/Feature.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@
package com.logicalclocks.hsfs;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import org.apache.parquet.Strings;

@AllArgsConstructor
@NoArgsConstructor
Expand Down Expand Up @@ -49,4 +51,21 @@ public Feature(String name, String type) {
this.name = name;
this.type = type;
}

@Builder
public Feature(String name, String type, String onlineType, Boolean primary, Boolean partition)
throws FeatureStoreException {
if (Strings.isNullOrEmpty(name)) {
throw new FeatureStoreException("Name is required when creating a feature");
}
this.name = name;

if (Strings.isNullOrEmpty(type)) {
throw new FeatureStoreException("Type is required when creating a feature");
}
this.type = type;
this.onlineType = onlineType;
this.primary = primary;
this.partition = partition;
}
}
144 changes: 129 additions & 15 deletions java/src/main/java/com/logicalclocks/hsfs/FeatureGroup.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,25 @@
*/
package com.logicalclocks.hsfs;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.logicalclocks.hsfs.engine.FeatureGroupEngine;
import com.logicalclocks.hsfs.metadata.Query;
import lombok.Builder;
import lombok.Getter;
import lombok.Setter;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;

import java.io.IOException;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

@JsonIgnoreProperties(ignoreUnknown = true)
public class FeatureGroup {

@Getter @Setter
private Integer id;

Expand All @@ -43,34 +47,68 @@ public class FeatureGroup {
private String description;

@Getter @Setter
private Date created;
private FeatureStore featureStore;

@Getter @Setter
private List<Feature> features;

@Getter
private Date created;

@Getter
private String creator;

@Getter @Setter
private FeatureStore featureStore;
private Storage defaultStorage;

@Getter @Setter
private List<Feature> features;
private Boolean onlineEnabled;

public FeatureGroup(String name, Integer version, String description, Date created, String creator) {
@Getter @Setter
private String type = "cachedFeaturegroupDTO";

@JsonIgnore
// These are only used in the client. In the server they are aggregated in the `features` field
private List<String> primaryKeys;

@JsonIgnore
// These are only used in the client. In the server they are aggregated in the `features` field
private List<String> partitionKeys;

private FeatureGroupEngine featureGroupEngine = new FeatureGroupEngine();

/**
 * Builder-backed constructor used to define a new feature group on the client side.
 *
 * <p>{@code created} and {@code creator} are not builder inputs, so this constructor does not
 * assign them.
 *
 * @param featureStore feature store the group belongs to; stored as given
 * @param name feature group name; must be neither {@code null} nor empty
 * @param version feature group version; must not be {@code null}
 * @param description stored as given, not validated
 * @param primaryKeys names of the primary key features; kept client-side only
 * @param partitionKeys names of the partition key features; kept client-side only
 * @param onlineEnabled stored as given
 * @param defaultStorage storage used by the overloads that take no explicit storage;
 *                       {@link Storage#OFFLINE} is used when {@code null}
 * @param features stored as given, not validated
 * @throws FeatureStoreException if {@code name} is null or empty, or {@code version} is null
 */
@Builder
public FeatureGroup(FeatureStore featureStore, String name, Integer version, String description,
                    List<String> primaryKeys, List<String> partitionKeys,
                    boolean onlineEnabled, Storage defaultStorage, List<Feature> features)
    throws FeatureStoreException {
  // An empty name is as unusable as a missing one; Feature applies the same rule to its name.
  if (name == null || name.isEmpty()) {
    throw new FeatureStoreException("Name is required when creating a feature group");
  }
  if (version == null) {
    throw new FeatureStoreException("Version is required when creating a feature group");
  }

  this.featureStore = featureStore;
  this.name = name;
  this.version = version;
  this.description = description;
  this.primaryKeys = primaryKeys;
  this.partitionKeys = partitionKeys;
  this.onlineEnabled = onlineEnabled;
  // Fall back to offline storage when the caller did not choose one.
  this.defaultStorage = defaultStorage != null ? defaultStorage : Storage.OFFLINE;
  this.features = features;
}

public FeatureGroup() {
}

public Dataset<Row> read() throws FeatureStoreException, IOException {
return selectAll().read();
public Query selectFeatures(List<Feature> features) throws FeatureStoreException, IOException {
return new Query(this, features);
}

public void show(int numRows) throws FeatureStoreException, IOException {
selectAll().show(numRows);
public Query selectAll() throws FeatureStoreException, IOException {
return new Query(this, getFeatures());
}

public Query select(List<String> features) throws FeatureStoreException, IOException {
Expand All @@ -80,11 +118,87 @@ public Query select(List<String> features) throws FeatureStoreException, IOExcep
return selectFeatures(featureObjList);
}

public Query selectFeatures(List<Feature> features) throws FeatureStoreException, IOException {
return new Query(this, features);
public Dataset<Row> read() throws FeatureStoreException, IOException {
return read(this.defaultStorage);
}

public Query selectAll() throws FeatureStoreException, IOException {
return new Query(this, getFeatures());
public Dataset<Row> read(Storage storage) throws FeatureStoreException, IOException {
return selectAll().read(storage);
}

/**
 * Shows rows of this feature group read from its default storage.
 *
 * @param numRows number of rows passed on to {@code Dataset.show}
 * @throws FeatureStoreException propagated from the read
 * @throws IOException propagated from the read
 */
public void show(int numRows) throws FeatureStoreException, IOException {
show(numRows, defaultStorage);
}

/**
 * Reads this feature group from the given storage and calls {@code show(numRows)} on the result.
 *
 * @param numRows number of rows passed on to {@code Dataset.show}
 * @param storage storage to read from
 * @throws FeatureStoreException propagated from {@code read(storage)}
 * @throws IOException propagated from {@code read(storage)}
 */
public void show(int numRows, Storage storage) throws FeatureStoreException, IOException {
read(storage).show(numRows);
}

/**
 * Saves the feature group using the default storage and no write options.
 *
 * @param featureData data handed to the engine together with this feature group
 * @throws FeatureStoreException propagated from the engine
 * @throws IOException propagated from the engine
 */
public void save(Dataset<Row> featureData) throws FeatureStoreException, IOException {
save(featureData, defaultStorage, null);
}

/**
 * Saves the feature group to the given storage with no write options.
 *
 * @param featureData data handed to the engine together with this feature group
 * @param storage storage passed on to the engine
 * @throws FeatureStoreException propagated from the engine
 * @throws IOException propagated from the engine
 */
public void save(Dataset<Row> featureData, Storage storage) throws FeatureStoreException, IOException {
save(featureData, storage, null);
}

/**
 * Saves the feature group using the default storage and the given write options.
 *
 * @param featureData data handed to the engine together with this feature group
 * @param writeOptions options passed on to the engine unchanged
 * @throws FeatureStoreException propagated from the engine
 * @throws IOException propagated from the engine
 */
public void save(Dataset<Row> featureData, Map<String, String> writeOptions)
throws FeatureStoreException, IOException {
save(featureData, defaultStorage, writeOptions);
}

/**
 * Saves the feature group by delegating to {@code featureGroupEngine.saveFeatureGroup}.
 *
 * <p>The client-side {@code primaryKeys} and {@code partitionKeys} lists are forwarded along with
 * this feature group. The other {@code save} overloads all end up here; they pass {@code null}
 * for {@code writeOptions} when none are supplied.
 *
 * @param featureData data handed to the engine
 * @param storage storage passed on to the engine
 * @param writeOptions options passed on to the engine unchanged; may be {@code null}
 * @throws FeatureStoreException propagated from the engine
 * @throws IOException propagated from the engine
 */
public void save(Dataset<Row> featureData, Storage storage, Map<String, String> writeOptions)
throws FeatureStoreException, IOException {
featureGroupEngine.saveFeatureGroup(this, featureData, primaryKeys, partitionKeys, storage, writeOptions);
}

/**
 * Appends the data to the given storage (overwrite is {@code false}, no write options).
 *
 * @param featureData data handed to the engine
 * @param storage storage passed on to the engine
 * @throws IOException propagated from the engine
 * @throws FeatureStoreException propagated from the engine
 */
public void insert(Dataset<Row> featureData, Storage storage) throws IOException, FeatureStoreException {
insert(featureData, storage, false, null);
}

/**
 * Inserts the data into the default storage with no write options.
 *
 * @param featureData data handed to the engine
 * @param overwrite {@code true} selects {@code SaveMode.Overwrite}, {@code false} {@code SaveMode.Append}
 * @throws IOException propagated from the engine
 * @throws FeatureStoreException propagated from the engine
 */
public void insert(Dataset<Row> featureData, boolean overwrite) throws IOException, FeatureStoreException {
insert(featureData, overwrite, null);
}

/**
 * Inserts the data into the given storage with no write options.
 *
 * @param featureData data handed to the engine
 * @param storage storage passed on to the engine
 * @param overwrite {@code true} selects {@code SaveMode.Overwrite}, {@code false} {@code SaveMode.Append}
 * @throws IOException propagated from the engine
 * @throws FeatureStoreException propagated from the engine
 */
public void insert(Dataset<Row> featureData, Storage storage, boolean overwrite)
throws IOException, FeatureStoreException {
insert(featureData, storage, overwrite, null);
}

/**
 * Inserts the data into the default storage with the given write options.
 *
 * @param featureData data handed to the engine
 * @param overwrite {@code true} selects {@code SaveMode.Overwrite}, {@code false} {@code SaveMode.Append}
 * @param writeOptions options passed on to the engine unchanged
 * @throws FeatureStoreException propagated from the engine
 * @throws IOException propagated from the engine
 */
public void insert(Dataset<Row> featureData, boolean overwrite, Map<String, String> writeOptions)
throws FeatureStoreException, IOException {
insert(featureData, defaultStorage, overwrite, writeOptions);
}

/**
 * Inserts the data by delegating to {@code featureGroupEngine.saveDataframe}.
 *
 * <p>All other {@code insert} overloads end up here.
 *
 * @param featureData data handed to the engine
 * @param storage storage passed on to the engine
 * @param overwrite {@code true} selects {@code SaveMode.Overwrite}, {@code false} {@code SaveMode.Append}
 * @param writeOptions options passed on to the engine unchanged; may be {@code null}
 * @throws FeatureStoreException propagated from the engine
 * @throws IOException propagated from the engine
 */
public void insert(Dataset<Row> featureData, Storage storage, boolean overwrite, Map<String, String> writeOptions)
throws FeatureStoreException, IOException {
featureGroupEngine.saveDataframe(this, featureData, storage,
overwrite ? SaveMode.Overwrite : SaveMode.Append, writeOptions);
}

/**
 * Deletes this feature group by delegating to {@code featureGroupEngine.delete}.
 *
 * @throws FeatureStoreException propagated from the engine
 * @throws IOException propagated from the engine
 */
public void delete() throws FeatureStoreException, IOException {
featureGroupEngine.delete(this);
}

/**
 * Adds a tag without a value; equivalent to {@code addTag(name, null)}.
 *
 * @param name tag name
 * @throws FeatureStoreException propagated from the engine
 * @throws IOException propagated from the engine
 */
public void addTag(String name) throws FeatureStoreException, IOException {
addTag(name, null);
}

/**
 * Adds a tag to this feature group by delegating to {@code featureGroupEngine.addTag}.
 *
 * @param name tag name
 * @param value tag value; {@code null} when the single-argument overload is used
 * @throws FeatureStoreException propagated from the engine
 * @throws IOException propagated from the engine
 */
public void addTag(String name, String value) throws FeatureStoreException, IOException {
featureGroupEngine.addTag(this, name, value);
}

/**
 * Returns the tags of this feature group as provided by {@code featureGroupEngine.getTags}.
 *
 * <p>NOTE(review): {@code @JsonIgnore} presumably keeps Jackson from treating this getter-shaped
 * method as a serializable property - confirm.
 *
 * @return map returned by the engine
 * @throws FeatureStoreException propagated from the engine
 * @throws IOException propagated from the engine
 */
@JsonIgnore
public Map<String, String> getTags() throws FeatureStoreException, IOException {
return featureGroupEngine.getTags(this);
}

/**
 * Returns the value of a single tag as provided by {@code featureGroupEngine.getTag}.
 *
 * @param name tag name to look up
 * @return value returned by the engine for {@code name}
 * @throws FeatureStoreException propagated from the engine
 * @throws IOException propagated from the engine
 */
@JsonIgnore
public String getTag(String name) throws FeatureStoreException, IOException {
return featureGroupEngine.getTag(this, name);
}

/**
 * Removes a tag from this feature group by delegating to {@code featureGroupEngine.deleteTag}.
 *
 * @param name name of the tag to remove
 * @throws FeatureStoreException propagated from the engine
 * @throws IOException propagated from the engine
 */
public void deleteTag(String name) throws FeatureStoreException, IOException {
featureGroupEngine.deleteTag(this, name);
}
}
6 changes: 6 additions & 0 deletions java/src/main/java/com/logicalclocks/hsfs/FeatureStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.google.common.base.Strings;
import com.logicalclocks.hsfs.engine.SparkEngine;
import com.logicalclocks.hsfs.metadata.FeatureGroupApi;
import com.logicalclocks.hsfs.metadata.StorageConnectorApi;
import com.logicalclocks.hsfs.metadata.TrainingDatasetApi;
import lombok.Getter;
import lombok.Setter;
Expand Down Expand Up @@ -74,6 +75,11 @@ public StorageConnector getStorageConnector(String name, StorageConnectorType ty
return storageConnectorApi.getByNameAndType(this, name, type);
}

/**
 * Starts the definition of a new feature group that belongs to this feature store.
 *
 * @return the result of setting this feature store on a fresh {@link FeatureGroup} builder
 */
public FeatureGroup.FeatureGroupBuilder createFeatureGroup() {
  FeatureGroup.FeatureGroupBuilder groupBuilder = FeatureGroup.builder();
  return groupBuilder.featureStore(this);
}

public TrainingDataset.TrainingDatasetBuilder createTrainingDataset() {
return TrainingDataset.builder()
.featureStore(this);
Expand Down
19 changes: 19 additions & 0 deletions java/src/main/java/com/logicalclocks/hsfs/FsQuery.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,23 @@
public class FsQuery {
@Getter @Setter
private String query;

@Getter @Setter
private String queryOnline;

/**
 * Replaces every newline character in the offline and online query strings with a single space.
 *
 * <p>Both fields are plain strings with no initializer, so either can still be {@code null} when
 * this method is called. A {@code null} query is left untouched instead of raising a
 * {@link NullPointerException}.
 */
public void removeNewLines() {
  if (query != null) {
    query = query.replace("\n", " ");
  }
  if (queryOnline != null) {
    queryOnline = queryOnline.replace("\n", " ");
  }
}

/**
 * Picks the query string that matches the requested storage.
 *
 * @param storage storage the query should run against
 * @return {@code query} for {@code OFFLINE}, {@code queryOnline} for {@code ONLINE}
 * @throws FeatureStoreException for any other storage value
 */
public String getStorageQuery(Storage storage) throws FeatureStoreException {
  final String selected;
  switch (storage) {
    case OFFLINE:
      selected = query;
      break;
    case ONLINE:
      selected = queryOnline;
      break;
    default:
      throw new FeatureStoreException("Cannot run query on ALL storages");
  }
  return selected;
}
}
11 changes: 11 additions & 0 deletions java/src/main/java/com/logicalclocks/hsfs/MainClass.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class MainClass {
Expand All @@ -34,6 +35,16 @@ public static void main(String[] args) throws Exception {
FeatureStore fs = connection.getFeatureStore();
LOGGER.info("Feature Store " + fs);

FeatureGroup housingFeatureGroup = fs.createFeatureGroup()
.name("housing")
.description("House pricing model features")
.version(1)
.primaryKeys(Arrays.asList("house_id", "date"))
.partitionKeys(Arrays.asList("country"))
.onlineEnabled(true)
.defaultStorage(Storage.OFFLINE)
.build();

FeatureGroup attendance = fs.getFeatureGroup("attendances_features", 1);
FeatureGroup players = fs.getFeatureGroup("players_features", 1);

Expand Down
23 changes: 23 additions & 0 deletions java/src/main/java/com/logicalclocks/hsfs/Storage.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright (c) 2020 Logical Clocks AB
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*
* See the License for the specific language governing permissions and limitations under the License.
*/

package com.logicalclocks.hsfs;

/**
 * Storage targets a feature store operation can address.
 *
 * <p>The declaration order of the constants is kept as is.
 */
public enum Storage {
  /** The offline storage. */
  OFFLINE,

  /** The online storage. */
  ONLINE,

  /** Both storages at once. */
  ALL
}
34 changes: 34 additions & 0 deletions java/src/main/java/com/logicalclocks/hsfs/StorageConnector.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,19 @@
*/
package com.logicalclocks.hsfs;

import com.logicalclocks.hsfs.util.Constants;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

@AllArgsConstructor
@NoArgsConstructor
public class StorageConnector {
Expand All @@ -36,6 +44,32 @@ public class StorageConnector {
@Getter @Setter
private String secretKey;

@Getter @Setter
private String connectionString;

@Getter @Setter
private String arguments;

@Getter @Setter
private StorageConnectorType storageConnectorType;

/**
 * Builds the option map for this connector from its connection string and arguments.
 *
 * <p>{@code arguments} is parsed as a comma-separated list of {@code key=value} entries. Keys are
 * matched case-insensitively against {@code Constants.JDBC_USER} and {@code Constants.JDBC_PWD}.
 * Entries without a {@code '='} are ignored. A value that itself contains a comma is still split
 * by this format and cannot be represented.
 *
 * @return a new map holding {@code Constants.JDBC_URL}, {@code Constants.JDBC_USER} and
 *         {@code Constants.JDBC_PWD}
 * @throws FeatureStoreException if the arguments contain no user or no password entry
 */
public Map<String, String> getSparkOptions() throws FeatureStoreException {
  // Treat missing arguments like an empty list so the caller gets the descriptive
  // FeatureStoreException below rather than a NullPointerException.
  String rawArguments = arguments == null ? "" : arguments;

  // Split on the first '=' only: values such as passwords may legitimately contain '='.
  // Entries that have no '=' at all carry no value and are dropped.
  List<String[]> args = Arrays.stream(rawArguments.split(","))
      .map(arg -> arg.split("=", 2))
      .filter(arg -> arg.length == 2)
      .collect(Collectors.toList());

  String user = findArgument(args, Constants.JDBC_USER, "No user provided for storage connector");
  String password = findArgument(args, Constants.JDBC_PWD, "No password provided for storage connector");

  Map<String, String> options = new HashMap<>();
  options.put(Constants.JDBC_URL, connectionString);
  options.put(Constants.JDBC_USER, user);
  options.put(Constants.JDBC_PWD, password);
  return options;
}

/** Returns the value of the first entry whose key equals {@code key}, ignoring case. */
private static String findArgument(List<String[]> args, String key, String errorMessage)
    throws FeatureStoreException {
  for (String[] arg : args) {
    if (arg[0].equalsIgnoreCase(key)) {
      return arg[1];
    }
  }
  throw new FeatureStoreException(errorMessage);
}
}
Loading

0 comments on commit 055da08

Please sign in to comment.