From 055da08dacbb7a18db34f257fdcac5141f3fd603 Mon Sep 17 00:00:00 2001 From: Fabio Buso Date: Wed, 1 Jul 2020 20:22:17 +0200 Subject: [PATCH] Java Feature group (#20) --- java/pom.xml | 2 +- .../java/com/logicalclocks/hsfs/Feature.java | 19 ++ .../com/logicalclocks/hsfs/FeatureGroup.java | 144 ++++++++++-- .../com/logicalclocks/hsfs/FeatureStore.java | 6 + .../java/com/logicalclocks/hsfs/FsQuery.java | 19 ++ .../com/logicalclocks/hsfs/MainClass.java | 11 + .../java/com/logicalclocks/hsfs/Storage.java | 23 ++ .../logicalclocks/hsfs/StorageConnector.java | 34 +++ .../hsfs/engine/FeatureGroupEngine.java | 150 ++++++++++++ .../hsfs/engine/SparkEngine.java | 221 +++++++++++++++++- .../hsfs/engine/TrainingDatasetEngine.java | 154 +----------- .../com/logicalclocks/hsfs/engine/Utils.java | 40 +++- .../hsfs/metadata/AuthorizationHandler.java | 15 +- .../hsfs/metadata/FeatureGroupApi.java | 161 ++++++++++++- .../hsfs/metadata/HopsworksClient.java | 4 + .../metadata/HopsworksExternalClient.java | 2 +- .../metadata/HopsworksInternalClient.java | 4 +- .../hsfs/metadata/InternalException.java | 3 + .../logicalclocks/hsfs/metadata/Query.java | 37 ++- .../hsfs/metadata/QueryConstructorApi.java | 5 +- .../logicalclocks/hsfs/metadata/RestDTO.java | 39 ++++ .../{ => metadata}/StorageConnectorApi.java | 31 ++- .../com/logicalclocks/hsfs/metadata/Tags.java | 41 ++++ .../logicalclocks/hsfs/util/Constants.java | 7 + 24 files changed, 969 insertions(+), 203 deletions(-) create mode 100644 java/src/main/java/com/logicalclocks/hsfs/Storage.java create mode 100644 java/src/main/java/com/logicalclocks/hsfs/engine/FeatureGroupEngine.java create mode 100644 java/src/main/java/com/logicalclocks/hsfs/metadata/RestDTO.java rename java/src/main/java/com/logicalclocks/hsfs/{ => metadata}/StorageConnectorApi.java (63%) create mode 100644 java/src/main/java/com/logicalclocks/hsfs/metadata/Tags.java diff --git a/java/pom.xml b/java/pom.xml index 1a28246a5a..143405bfee 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -19,7 +19,7 @@ 2.1.8 1.18.6 2.6.7.1 - 2.4.3.0 + 2.4.3.2 diff --git a/java/src/main/java/com/logicalclocks/hsfs/Feature.java b/java/src/main/java/com/logicalclocks/hsfs/Feature.java index 45b2ae3cd2..8655fc8503 100644 --- a/java/src/main/java/com/logicalclocks/hsfs/Feature.java +++ b/java/src/main/java/com/logicalclocks/hsfs/Feature.java @@ -16,9 +16,11 @@ package com.logicalclocks.hsfs; import lombok.AllArgsConstructor; +import lombok.Builder; import lombok.Getter; import lombok.NoArgsConstructor; import lombok.Setter; +import org.apache.parquet.Strings; @AllArgsConstructor @NoArgsConstructor @@ -49,4 +51,21 @@ public Feature(String name, String type) { this.name = name; this.type = type; } + + @Builder + public Feature(String name, String type, String onlineType, Boolean primary, Boolean partition) + throws FeatureStoreException { + if (Strings.isNullOrEmpty(name)) { + throw new FeatureStoreException("Name is required when creating a feature"); + } + this.name = name; + + if (Strings.isNullOrEmpty(type)) { + throw new FeatureStoreException("Type is required when creating a feature"); + } + this.type = type; + this.onlineType = onlineType; + this.primary = primary; + this.partition = partition; + } } diff --git a/java/src/main/java/com/logicalclocks/hsfs/FeatureGroup.java b/java/src/main/java/com/logicalclocks/hsfs/FeatureGroup.java index 086297d84e..99b01535f9 100644 --- a/java/src/main/java/com/logicalclocks/hsfs/FeatureGroup.java +++ b/java/src/main/java/com/logicalclocks/hsfs/FeatureGroup.java @@ -15,21 +15,25 
@@ */ package com.logicalclocks.hsfs; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.logicalclocks.hsfs.engine.FeatureGroupEngine; import com.logicalclocks.hsfs.metadata.Query; +import lombok.Builder; import lombok.Getter; import lombok.Setter; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; +import org.apache.spark.sql.SaveMode; import java.io.IOException; import java.util.Date; import java.util.List; +import java.util.Map; import java.util.stream.Collectors; @JsonIgnoreProperties(ignoreUnknown = true) public class FeatureGroup { - @Getter @Setter private Integer id; @@ -43,34 +47,68 @@ public class FeatureGroup { private String description; @Getter @Setter - private Date created; + private FeatureStore featureStore; @Getter @Setter + private List features; + + @Getter + private Date created; + + @Getter private String creator; @Getter @Setter - private FeatureStore featureStore; + private Storage defaultStorage; @Getter @Setter - private List features; + private Boolean onlineEnabled; - public FeatureGroup(String name, Integer version, String description, Date created, String creator) { + @Getter @Setter + private String type = "cachedFeaturegroupDTO"; + + @JsonIgnore + // These are only used in the client. In the server they are aggregated in the `features` field + private List primaryKeys; + + @JsonIgnore + // These are only used in the client. In the server they are aggregated in the `features` field + private List partitionKeys; + + private FeatureGroupEngine featureGroupEngine = new FeatureGroupEngine(); + + @Builder + public FeatureGroup(FeatureStore featureStore, String name, Integer version, String description, + List primaryKeys, List partitionKeys, + boolean onlineEnabled, Storage defaultStorage, List features) + throws FeatureStoreException { + if (name == null) { + throw new FeatureStoreException("Name is required when creating a feature group"); + } + if (version == null) { + throw new FeatureStoreException("Version is required when creating a feature group"); + } + + this.featureStore = featureStore; this.name = name; this.version = version; this.description = description; - this.created = created; - this.creator = creator; + this.primaryKeys = primaryKeys; + this.partitionKeys = partitionKeys; + this.onlineEnabled = onlineEnabled; + this.defaultStorage = defaultStorage != null ? 
defaultStorage : Storage.OFFLINE; + this.features = features; } public FeatureGroup() { } - public Dataset read() throws FeatureStoreException, IOException { - return selectAll().read(); + public Query selectFeatures(List features) throws FeatureStoreException, IOException { + return new Query(this, features); } - public void show(int numRows) throws FeatureStoreException, IOException { - selectAll().show(numRows); + public Query selectAll() throws FeatureStoreException, IOException { + return new Query(this, getFeatures()); } public Query select(List features) throws FeatureStoreException, IOException { @@ -80,11 +118,87 @@ public Query select(List features) throws FeatureStoreException, IOExcep return selectFeatures(featureObjList); } - public Query selectFeatures(List features) throws FeatureStoreException, IOException { - return new Query(this, features); + public Dataset read() throws FeatureStoreException, IOException { + return read(this.defaultStorage); } - public Query selectAll() throws FeatureStoreException, IOException { - return new Query(this, getFeatures()); + public Dataset read(Storage storage) throws FeatureStoreException, IOException { + return selectAll().read(storage); + } + + public void show(int numRows) throws FeatureStoreException, IOException { + show(numRows, defaultStorage); + } + + public void show(int numRows, Storage storage) throws FeatureStoreException, IOException { + read(storage).show(numRows); + } + + public void save(Dataset featureData) throws FeatureStoreException, IOException { + save(featureData, defaultStorage, null); + } + + public void save(Dataset featureData, Storage storage) throws FeatureStoreException, IOException { + save(featureData, storage, null); + } + + public void save(Dataset featureData, Map writeOptions) + throws FeatureStoreException, IOException { + save(featureData, defaultStorage, writeOptions); + } + + public void save(Dataset featureData, Storage storage, Map writeOptions) + throws FeatureStoreException, IOException { + featureGroupEngine.saveFeatureGroup(this, featureData, primaryKeys, partitionKeys, storage, writeOptions); + } + + public void insert(Dataset featureData, Storage storage) throws IOException, FeatureStoreException { + insert(featureData, storage, false, null); + } + + public void insert(Dataset featureData, boolean overwrite) throws IOException, FeatureStoreException { + insert(featureData, overwrite, null); + } + + public void insert(Dataset featureData, Storage storage, boolean overwrite) + throws IOException, FeatureStoreException { + insert(featureData, storage, overwrite, null); + } + + public void insert(Dataset featureData, boolean overwrite, Map writeOptions) + throws FeatureStoreException, IOException { + insert(featureData, defaultStorage, overwrite, writeOptions); + } + + public void insert(Dataset featureData, Storage storage, boolean overwrite, Map writeOptions) + throws FeatureStoreException, IOException { + featureGroupEngine.saveDataframe(this, featureData, storage, + overwrite ? 
SaveMode.Overwrite : SaveMode.Append, writeOptions); + } + + public void delete() throws FeatureStoreException, IOException { + featureGroupEngine.delete(this); + } + + public void addTag(String name) throws FeatureStoreException, IOException { + addTag(name, null); + } + + public void addTag(String name, String value) throws FeatureStoreException, IOException { + featureGroupEngine.addTag(this, name, value); + } + + @JsonIgnore + public Map getTags() throws FeatureStoreException, IOException { + return featureGroupEngine.getTags(this); + } + + @JsonIgnore + public String getTag(String name) throws FeatureStoreException, IOException { + return featureGroupEngine.getTag(this, name); + } + + public void deleteTag(String name) throws FeatureStoreException, IOException { + featureGroupEngine.deleteTag(this, name); } } diff --git a/java/src/main/java/com/logicalclocks/hsfs/FeatureStore.java b/java/src/main/java/com/logicalclocks/hsfs/FeatureStore.java index ad245f8f89..10eaeb0d81 100644 --- a/java/src/main/java/com/logicalclocks/hsfs/FeatureStore.java +++ b/java/src/main/java/com/logicalclocks/hsfs/FeatureStore.java @@ -19,6 +19,7 @@ import com.google.common.base.Strings; import com.logicalclocks.hsfs.engine.SparkEngine; import com.logicalclocks.hsfs.metadata.FeatureGroupApi; +import com.logicalclocks.hsfs.metadata.StorageConnectorApi; import com.logicalclocks.hsfs.metadata.TrainingDatasetApi; import lombok.Getter; import lombok.Setter; @@ -74,6 +75,11 @@ public StorageConnector getStorageConnector(String name, StorageConnectorType ty return storageConnectorApi.getByNameAndType(this, name, type); } + public FeatureGroup.FeatureGroupBuilder createFeatureGroup() { + return FeatureGroup.builder() + .featureStore(this); + } + public TrainingDataset.TrainingDatasetBuilder createTrainingDataset() { return TrainingDataset.builder() .featureStore(this); diff --git a/java/src/main/java/com/logicalclocks/hsfs/FsQuery.java b/java/src/main/java/com/logicalclocks/hsfs/FsQuery.java index 3023d9b107..fa30731a0c 100644 --- a/java/src/main/java/com/logicalclocks/hsfs/FsQuery.java +++ b/java/src/main/java/com/logicalclocks/hsfs/FsQuery.java @@ -27,4 +27,23 @@ public class FsQuery { @Getter @Setter private String query; + + @Getter @Setter + private String queryOnline; + + public void removeNewLines() { + query = query.replace("\n", " "); + queryOnline = queryOnline.replace("\n", " "); + } + + public String getStorageQuery(Storage storage) throws FeatureStoreException { + switch (storage) { + case OFFLINE: + return query; + case ONLINE: + return queryOnline; + default: + throw new FeatureStoreException("Cannot run query on ALL storages"); + } + } } diff --git a/java/src/main/java/com/logicalclocks/hsfs/MainClass.java b/java/src/main/java/com/logicalclocks/hsfs/MainClass.java index 21d6476c8b..cebda56bcf 100644 --- a/java/src/main/java/com/logicalclocks/hsfs/MainClass.java +++ b/java/src/main/java/com/logicalclocks/hsfs/MainClass.java @@ -21,6 +21,7 @@ import org.slf4j.LoggerFactory; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; public class MainClass { @@ -34,6 +35,16 @@ public static void main(String[] args) throws Exception { FeatureStore fs = connection.getFeatureStore(); LOGGER.info("Feature Store " + fs); + FeatureGroup housingFeatureGroup = fs.createFeatureGroup() + .name("housing") + .description("House pricing model features") + .version(1) + .primaryKeys(Arrays.asList("house_id", "date")) + .partitionKeys(Arrays.asList("country")) + .onlineEnabled(true) + 
.defaultStorage(Storage.OFFLINE) + .build(); + FeatureGroup attendance = fs.getFeatureGroup("attendances_features", 1); FeatureGroup players = fs.getFeatureGroup("players_features", 1); diff --git a/java/src/main/java/com/logicalclocks/hsfs/Storage.java b/java/src/main/java/com/logicalclocks/hsfs/Storage.java new file mode 100644 index 0000000000..02672198df --- /dev/null +++ b/java/src/main/java/com/logicalclocks/hsfs/Storage.java @@ -0,0 +1,23 @@ +/* + * Copyright (c) 2020 Logical Clocks AB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * + * See the License for the specific language governing permissions and limitations under the License. + */ + +package com.logicalclocks.hsfs; + +public enum Storage { + OFFLINE, + ONLINE, + ALL +} diff --git a/java/src/main/java/com/logicalclocks/hsfs/StorageConnector.java b/java/src/main/java/com/logicalclocks/hsfs/StorageConnector.java index 4d2144c003..e1dd005e13 100644 --- a/java/src/main/java/com/logicalclocks/hsfs/StorageConnector.java +++ b/java/src/main/java/com/logicalclocks/hsfs/StorageConnector.java @@ -15,11 +15,19 @@ */ package com.logicalclocks.hsfs; +import com.logicalclocks.hsfs.util.Constants; import lombok.AllArgsConstructor; import lombok.Getter; import lombok.NoArgsConstructor; import lombok.Setter; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; + @AllArgsConstructor @NoArgsConstructor public class StorageConnector { @@ -36,6 +44,32 @@ public class StorageConnector { @Getter @Setter private String secretKey; + @Getter @Setter + private String connectionString; + + @Getter @Setter + private String arguments; + @Getter @Setter private StorageConnectorType storageConnectorType; + + public Map getSparkOptions() throws FeatureStoreException{ + List args = Arrays.stream(arguments.split(",")) + .map(arg -> arg.split("=")) + .collect(Collectors.toList()); + + String user = args.stream().filter(arg -> arg[0].equalsIgnoreCase(Constants.JDBC_USER)) + .findFirst() + .orElseThrow(() -> new FeatureStoreException("No user provided for storage connector"))[1]; + + String password = args.stream().filter(arg -> arg[0].equalsIgnoreCase(Constants.JDBC_PWD)) + .findFirst() + .orElseThrow(() -> new FeatureStoreException("No password provided for storage connector"))[1]; + + Map options = new HashMap<>(); + options.put(Constants.JDBC_URL, connectionString); + options.put(Constants.JDBC_USER, user); + options.put(Constants.JDBC_PWD, password); + return options; + } } diff --git a/java/src/main/java/com/logicalclocks/hsfs/engine/FeatureGroupEngine.java b/java/src/main/java/com/logicalclocks/hsfs/engine/FeatureGroupEngine.java new file mode 100644 index 0000000000..ad779f1e34 --- /dev/null +++ b/java/src/main/java/com/logicalclocks/hsfs/engine/FeatureGroupEngine.java @@ -0,0 +1,150 @@ +package com.logicalclocks.hsfs.engine; + +import com.logicalclocks.hsfs.FeatureGroup; +import com.logicalclocks.hsfs.FeatureStoreException; +import com.logicalclocks.hsfs.Storage; +import com.logicalclocks.hsfs.StorageConnector; +import 
com.logicalclocks.hsfs.metadata.StorageConnectorApi; +import com.logicalclocks.hsfs.metadata.FeatureGroupApi; +import com.logicalclocks.hsfs.util.Constants; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SaveMode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class FeatureGroupEngine { + + private FeatureGroupApi featureGroupApi = new FeatureGroupApi(); + private StorageConnectorApi storageConnectorApi = new StorageConnectorApi(); + private Utils utils = new Utils(); + + private static final Logger LOGGER = LoggerFactory.getLogger(FeatureGroupEngine.class); + + //TODO: + // Compute statistics + + /** + * Create the metadata and write the data to the online/offline feature store + * @param featureGroup + * @param dataset + * @param primaryKeys + * @param partitionKeys + * @param storage + * @param writeOptions + * @throws FeatureStoreException + * @throws IOException + */ + public void saveFeatureGroup(FeatureGroup featureGroup, Dataset dataset, + List primaryKeys, List partitionKeys, + Storage storage, Map writeOptions) + throws FeatureStoreException, IOException { + + if (featureGroup.getFeatureStore() != null) { + featureGroup.setFeatures(utils.parseSchema(dataset)); + } + + LOGGER.info("Featuregroup features: " + featureGroup.getFeatures()); + + /* set primary features */ + if (primaryKeys != null) { + primaryKeys.forEach(pk -> + featureGroup.getFeatures().forEach(f -> { + if (f.getName().equals(pk)) { + f.setPrimary(true); + } + })); + } + + /* set partition key features */ + if (partitionKeys != null) { + partitionKeys.forEach(pk -> + featureGroup.getFeatures().forEach(f -> { + if (f.getName().equals(pk)) { + f.setPartition(true); + } + })); + } + + // Send Hopsworks the request to create a new feature group + featureGroupApi.save(featureGroup); + + // Write the dataframe + saveDataframe(featureGroup, dataset, storage, SaveMode.Append, writeOptions); + } + + public void saveDataframe(FeatureGroup featureGroup, Dataset dataset, Storage storage, + SaveMode saveMode, Map writeOptions) + throws IOException, FeatureStoreException { + switch (storage) { + case OFFLINE: + saveOfflineDataframe(featureGroup, dataset, saveMode, writeOptions); + break; + case ONLINE: + saveOnlineDataframe(featureGroup, dataset, saveMode, writeOptions); + break; + case ALL: + saveOfflineDataframe(featureGroup, dataset, saveMode, writeOptions); + saveOnlineDataframe(featureGroup, dataset, saveMode, writeOptions); + } + } + + /** + * Write dataframe to the offline feature store + * @param featureGroup + * @param dataset + * @param saveMode + * @param writeOptions + */ + private void saveOfflineDataframe(FeatureGroup featureGroup, Dataset dataset, + SaveMode saveMode, Map writeOptions) + throws FeatureStoreException, IOException { + + if (saveMode == SaveMode.Overwrite) { + // If we set overwrite, then the directory will be removed and with it all the metadata + // related to the feature group will be lost. We need to keep them. 
+ // So we call Hopsworks to manage to truncate the table and re-create the metadata + // After that it's going to be just a normal append + featureGroupApi.deleteContent(featureGroup); + saveMode = SaveMode.Append; + } + + SparkEngine.getInstance().writeOfflineDataframe(featureGroup, dataset, saveMode, writeOptions); + } + + private void saveOnlineDataframe(FeatureGroup featureGroup, Dataset dataset, + SaveMode saveMode, Map providedWriteOptions) + throws IOException, FeatureStoreException { + StorageConnector storageConnector = storageConnectorApi.getOnlineStorageConnector(featureGroup.getFeatureStore()); + Map writeOptions = + SparkEngine.getInstance().getOnlineOptions(providedWriteOptions, featureGroup, storageConnector); + SparkEngine.getInstance().writeOnlineDataframe(dataset, saveMode, writeOptions); + } + + + public void delete(FeatureGroup featureGroup) throws FeatureStoreException, IOException { + featureGroupApi.delete(featureGroup); + } + + public void addTag(FeatureGroup featureGroup, String name, String value) throws FeatureStoreException, IOException { + featureGroupApi.addTag(featureGroup, name, value); + } + + public Map getTags(FeatureGroup featureGroup) throws FeatureStoreException, IOException { + return featureGroupApi.getTags(featureGroup); + } + + public String getTag(FeatureGroup featureGroup, String name) throws FeatureStoreException, IOException { + return featureGroupApi.getTag(featureGroup, name); + } + + public void deleteTag(FeatureGroup featureGroup, String name) throws FeatureStoreException, IOException { + featureGroupApi.deleteTag(featureGroup, name); + } + +} diff --git a/java/src/main/java/com/logicalclocks/hsfs/engine/SparkEngine.java b/java/src/main/java/com/logicalclocks/hsfs/engine/SparkEngine.java index daaf4ac509..85ddcb9be2 100644 --- a/java/src/main/java/com/logicalclocks/hsfs/engine/SparkEngine.java +++ b/java/src/main/java/com/logicalclocks/hsfs/engine/SparkEngine.java @@ -15,15 +15,26 @@ */ package com.logicalclocks.hsfs.engine; +import com.logicalclocks.hsfs.DataFormat; +import com.logicalclocks.hsfs.FeatureGroup; +import com.logicalclocks.hsfs.FeatureStoreException; +import com.logicalclocks.hsfs.Split; import com.logicalclocks.hsfs.StorageConnector; +import com.logicalclocks.hsfs.TrainingDataset; import com.logicalclocks.hsfs.util.Constants; import lombok.Getter; import org.apache.parquet.Strings; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; +import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SparkSession; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.file.Paths; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; public class SparkEngine { @@ -36,19 +47,33 @@ public static synchronized SparkEngine getInstance() { return INSTANCE; } - private static final Logger LOGGER = LoggerFactory.getLogger(SparkEngine.class); - @Getter private SparkSession sparkSession; + private Utils utils = new Utils(); private SparkEngine() { - sparkSession = SparkSession.builder().enableHiveSupport().getOrCreate(); + sparkSession = SparkSession.builder() + .enableHiveSupport() + .getOrCreate(); + + // Configure the Spark context to allow dynamic partitions + sparkSession.conf().set("hive.exec.dynamic.partition", "true"); + sparkSession.conf().set("hive.exec.dynamic.partition.mode", "nonstrict"); } public Dataset sql(String query) { return sparkSession.sql(query); } + public Dataset jdbc(StorageConnector 
storageConnector, String query) throws FeatureStoreException { + Map readOptions = storageConnector.getSparkOptions(); + readOptions.put("query", query); + return sparkSession.read() + .format(Constants.JDBC_FORMAT) + .options(readOptions) + .load(); + } + public void configureConnector(StorageConnector storageConnector) { switch (storageConnector.getStorageConnectorType()) { case S3: @@ -71,4 +96,190 @@ private void configureS3Connector(StorageConnector storageConnector) { sparkSession.conf().set("fs.s3a.secret.key", storageConnector.getSecretKey()); } } + + /** + * Setup Spark to write the data on the File System + * @param trainingDataset + * @param dataset + * @param writeOptions + * @param saveMode + */ + public void write(TrainingDataset trainingDataset, Dataset dataset, + Map writeOptions, SaveMode saveMode) { + + if (trainingDataset.getStorageConnector() != null) { + SparkEngine.getInstance().configureConnector(trainingDataset.getStorageConnector()); + } + + if (trainingDataset.getSplits() == null) { + // Write a single dataset + + // The actual data will be stored in training_ds_version/training_ds the double directory is needed + // for cases such as tfrecords in which we need to store also the schema + // also in case of multiple splits, the single splits will be stored inside the training dataset dir + String path = Paths.get(trainingDataset.getLocation(), trainingDataset.getName()).toString(); + + writeSingle(dataset, trainingDataset.getDataFormat(), + writeOptions, saveMode, path); + } else { + List splitFactors = trainingDataset.getSplits().stream() + .map(Split::getPercentage) + .collect(Collectors.toList()); + + // The actual data will be stored in training_ds_version/split_name + Dataset[] datasetSplits = null; + if (trainingDataset.getSeed() != null) { + datasetSplits = dataset.randomSplit( + splitFactors.stream().mapToDouble(Float::doubleValue).toArray(), trainingDataset.getSeed()); + } else { + datasetSplits = dataset.randomSplit(splitFactors.stream().mapToDouble(Float::doubleValue).toArray()); + } + + writeSplits(datasetSplits, + trainingDataset.getDataFormat(), writeOptions, saveMode, + trainingDataset.getLocation(), trainingDataset.getSplits()); + } + } + + public Map getWriteOptions(Map providedOptions, DataFormat dataFormat) { + Map writeOptions = new HashMap<>(); + switch (dataFormat) { + case CSV: + writeOptions.put(Constants.HEADER, "true"); + writeOptions.put(Constants.DELIMITER, ","); + break; + case TSV: + writeOptions.put(Constants.HEADER, "true"); + writeOptions.put(Constants.DELIMITER, "\t"); + break; + case TFRECORDS: + writeOptions.put(Constants.TF_CONNECTOR_RECORD_TYPE, "Example"); + } + + if (providedOptions != null && !providedOptions.isEmpty()) { + writeOptions.putAll(providedOptions); + } + + return writeOptions; + } + + public Map getReadOptions(Map providedOptions, DataFormat dataFormat) { + Map readOptions = new HashMap<>(); + switch (dataFormat) { + case CSV: + readOptions.put(Constants.HEADER, "true"); + readOptions.put(Constants.DELIMITER, ","); + readOptions.put(Constants.INFER_SCHEMA, "true"); + break; + case TSV: + readOptions.put(Constants.HEADER, "true"); + readOptions.put(Constants.DELIMITER, "\t"); + readOptions.put(Constants.INFER_SCHEMA, "true"); + break; + case TFRECORDS: + readOptions.put(Constants.TF_CONNECTOR_RECORD_TYPE, "Example"); + } + + if (providedOptions != null && !providedOptions.isEmpty()) { + readOptions.putAll(providedOptions); + } + + return readOptions; + } + + /** + * Write multiple training dataset splits and name 
them. + * @param datasets + * @param dataFormat + * @param writeOptions + * @param saveMode + * @param basePath + * @param splits + */ + private void writeSplits(Dataset[] datasets, DataFormat dataFormat, Map writeOptions, + SaveMode saveMode, String basePath, List splits) { + for (int i=0; i < datasets.length; i++) { + writeSingle(datasets[i], dataFormat, writeOptions, saveMode, + Paths.get(basePath, splits.get(i).getName()).toString()); + } + } + + /** + * Write a single dataset split + * @param dataset + * @param dataFormat + * @param writeOptions + * @param saveMode + * @param path: it should be the full path + */ + private void writeSingle(Dataset dataset, DataFormat dataFormat, + Map writeOptions, SaveMode saveMode, String path) { + dataset + .write() + .format(dataFormat.toString()) + .options(writeOptions) + .mode(saveMode) + .save(SparkEngine.sparkPath(path)); + } + + public Dataset read(DataFormat dataFormat, Map readOptions, String path) { + return SparkEngine.getInstance().getSparkSession() + .read() + .format(dataFormat.toString()) + .options(readOptions) + .load(SparkEngine.sparkPath(path)); + } + + /** + * Build the option maps to write the dataset to the JDBC sink. URL, username and password are taken from the + * storage connector. + * They can however be overwritten by the user if they pass a option map. For instance if they want to change the + * @param providedWriteOptions + * @param featureGroup + * @param storageConnector + * @return + * @throws FeatureStoreException + */ + public Map getOnlineOptions(Map providedWriteOptions, + FeatureGroup featureGroup, + StorageConnector storageConnector) throws FeatureStoreException { + Map writeOptions = storageConnector.getSparkOptions(); + writeOptions.put(Constants.JDBC_TABLE, utils.getFgName(featureGroup)); + + // add user provided configuration + if (providedWriteOptions != null) { + writeOptions.putAll(providedWriteOptions); + } + + return writeOptions; + } + + /** + * Write dataset on the JDBC sink + * @param dataset + * @param saveMode + * @param writeOptions + * @throws FeatureStoreException + */ + public void writeOnlineDataframe(Dataset dataset, SaveMode saveMode, Map writeOptions) { + dataset + .write() + .format(Constants.JDBC_FORMAT) + .options(writeOptions) + .mode(saveMode) + .save(); + } + + + public void writeOfflineDataframe(FeatureGroup featureGroup, Dataset dataset, + SaveMode saveMode, Map writeOptions) { + dataset + .write() + .format(Constants.HIVE_FORMAT) + .mode(saveMode) + // write options cannot be null + .options(writeOptions == null ? 
new HashMap<>() : writeOptions) + .partitionBy(utils.getPartitionColumns(featureGroup)) + .saveAsTable(utils.getTableName(featureGroup)); + } } diff --git a/java/src/main/java/com/logicalclocks/hsfs/engine/TrainingDatasetEngine.java b/java/src/main/java/com/logicalclocks/hsfs/engine/TrainingDatasetEngine.java index 9095225150..6ba55a3f7e 100644 --- a/java/src/main/java/com/logicalclocks/hsfs/engine/TrainingDatasetEngine.java +++ b/java/src/main/java/com/logicalclocks/hsfs/engine/TrainingDatasetEngine.java @@ -15,23 +15,16 @@ */ package com.logicalclocks.hsfs.engine; -import com.google.common.base.Strings; -import com.logicalclocks.hsfs.DataFormat; import com.logicalclocks.hsfs.FeatureStoreException; -import com.logicalclocks.hsfs.Split; import com.logicalclocks.hsfs.TrainingDataset; import com.logicalclocks.hsfs.metadata.TrainingDatasetApi; -import com.logicalclocks.hsfs.util.Constants; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SaveMode; import java.io.IOException; import java.nio.file.Paths; -import java.util.HashMap; -import java.util.List; import java.util.Map; -import java.util.stream.Collectors; public class TrainingDatasetEngine { @@ -62,9 +55,9 @@ public void save(TrainingDataset trainingDataset, Dataset dataset, // Build write options map Map writeOptions = - getWriteOptions(userWriteOptions, trainingDataset.getDataFormat()); + SparkEngine.getInstance().getWriteOptions(userWriteOptions, trainingDataset.getDataFormat()); - write(trainingDataset, dataset, writeOptions, SaveMode.Overwrite); + SparkEngine.getInstance().write(trainingDataset, dataset, writeOptions, SaveMode.Overwrite); } /** @@ -82,53 +75,9 @@ public void insert(TrainingDataset trainingDataset, Dataset dataset, utils.schemaMatches(dataset, trainingDataset.getFeatures()); Map writeOptions = - getWriteOptions(providedOptions, trainingDataset.getDataFormat()); + SparkEngine.getInstance().getWriteOptions(providedOptions, trainingDataset.getDataFormat()); - write(trainingDataset, dataset, writeOptions, saveMode); - } - - /** - * Setup Spark to write the data on the File System - * @param trainingDataset - * @param dataset - * @param writeOptions - * @param saveMode - */ - private void write(TrainingDataset trainingDataset, Dataset dataset, - Map writeOptions, SaveMode saveMode) { - - if (trainingDataset.getStorageConnector() != null) { - SparkEngine.getInstance().configureConnector(trainingDataset.getStorageConnector()); - } - - if (trainingDataset.getSplits() == null) { - // Write a single dataset - - // The actual data will be stored in training_ds_version/training_ds the double directory is needed - // for cases such as tfrecords in which we need to store also the schema - // also in case of multiple splits, the single splits will be stored inside the training dataset dir - String path = Paths.get(trainingDataset.getLocation(), trainingDataset.getName()).toString(); - - writeSingle(dataset, trainingDataset.getDataFormat(), - writeOptions, saveMode, path); - } else { - List splitFactors = trainingDataset.getSplits().stream() - .map(Split::getPercentage) - .collect(Collectors.toList()); - - // The actual data will be stored in training_ds_version/split_name - Dataset[] datasetSplits = null; - if (trainingDataset.getSeed() != null) { - datasetSplits = dataset.randomSplit( - splitFactors.stream().mapToDouble(Float::doubleValue).toArray(), trainingDataset.getSeed()); - } else { - datasetSplits = 
dataset.randomSplit(splitFactors.stream().mapToDouble(Float::doubleValue).toArray()); - } - - writeSplits(datasetSplits, - trainingDataset.getDataFormat(), writeOptions, saveMode, - trainingDataset.getLocation(), trainingDataset.getSplits()); - } + SparkEngine.getInstance().write(trainingDataset, dataset, writeOptions, saveMode); } public Dataset read(TrainingDataset trainingDataset, String split, Map providedOptions) { @@ -137,7 +86,7 @@ public Dataset read(TrainingDataset trainingDataset, String split, Map read(TrainingDataset trainingDataset, String split, Map readOptions = getReadOptions(providedOptions, trainingDataset.getDataFormat()); - return read(trainingDataset.getDataFormat(), readOptions, path); + Map readOptions = + SparkEngine.getInstance().getReadOptions(providedOptions, trainingDataset.getDataFormat()); + return SparkEngine.getInstance().read(trainingDataset.getDataFormat(), readOptions, path); } - private Map getWriteOptions(Map providedOptions, DataFormat dataFormat) { - Map writeOptions = new HashMap<>(); - switch (dataFormat) { - case CSV: - writeOptions.put(Constants.HEADER, "true"); - writeOptions.put(Constants.DELIMITER, ","); - break; - case TSV: - writeOptions.put(Constants.HEADER, "true"); - writeOptions.put(Constants.DELIMITER, "\t"); - break; - case TFRECORDS: - writeOptions.put(Constants.TF_CONNECTOR_RECORD_TYPE, "Example"); - } - - if (providedOptions != null && !providedOptions.isEmpty()) { - writeOptions.putAll(providedOptions); - } - - return writeOptions; - } - - private Map getReadOptions(Map providedOptions, DataFormat dataFormat) { - Map readOptions = new HashMap<>(); - switch (dataFormat) { - case CSV: - readOptions.put(Constants.HEADER, "true"); - readOptions.put(Constants.DELIMITER, ","); - readOptions.put(Constants.INFER_SCHEMA, "true"); - break; - case TSV: - readOptions.put(Constants.HEADER, "true"); - readOptions.put(Constants.DELIMITER, "\t"); - readOptions.put(Constants.INFER_SCHEMA, "true"); - break; - case TFRECORDS: - readOptions.put(Constants.TF_CONNECTOR_RECORD_TYPE, "Example"); - } - - if (providedOptions != null && !providedOptions.isEmpty()) { - readOptions.putAll(providedOptions); - } - - return readOptions; - } - - /** - * Write multiple training dataset splits and name them. 
- * @param datasets - * @param dataFormat - * @param writeOptions - * @param saveMode - * @param basePath - * @param splits - */ - private void writeSplits(Dataset[] datasets, DataFormat dataFormat, Map writeOptions, - SaveMode saveMode, String basePath, List splits) { - for (int i=0; i < datasets.length; i++) { - writeSingle(datasets[i], dataFormat, writeOptions, saveMode, - Paths.get(basePath, splits.get(i).getName()).toString()); - } - } - - /** - * Write a single dataset split - * @param dataset - * @param dataFormat - * @param writeOptions - * @param saveMode - * @param path: it should be the full path - */ - private void writeSingle(Dataset dataset, DataFormat dataFormat, - Map writeOptions, SaveMode saveMode, String path) { - dataset - .write() - .format(dataFormat.toString()) - .options(writeOptions) - .mode(saveMode) - .save(SparkEngine.sparkPath(path)); - } - - private Dataset read(DataFormat dataFormat, Map readOptions, String path) { - return SparkEngine.getInstance().getSparkSession() - .read() - .format(dataFormat.toString()) - .options(readOptions) - .load(SparkEngine.sparkPath(path)); - } } diff --git a/java/src/main/java/com/logicalclocks/hsfs/engine/Utils.java b/java/src/main/java/com/logicalclocks/hsfs/engine/Utils.java index cdfea5411a..c5d792e4b5 100644 --- a/java/src/main/java/com/logicalclocks/hsfs/engine/Utils.java +++ b/java/src/main/java/com/logicalclocks/hsfs/engine/Utils.java @@ -17,25 +17,34 @@ import com.logicalclocks.hsfs.Feature; import com.logicalclocks.hsfs.FeatureStoreException; +import com.logicalclocks.hsfs.FeatureGroup; +import com.logicalclocks.hsfs.StorageConnector; +import io.hops.common.Pair; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.catalyst.parser.CatalystSqlParser; import org.apache.spark.sql.types.Metadata; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; +import scala.collection.JavaConverters; +import scala.collection.Seq; -import java.util.Arrays; +import java.util.ArrayList; import java.util.List; import java.util.stream.Collectors; public class Utils { // TODO(Fabio): make sure we keep save the feature store/feature group for serving - public List parseSchema(Dataset dataset) { - return Arrays.stream(dataset.schema().fields()) - // TODO(Fabio): unit test this one for complext types - .map(f -> new Feature(f.name(), f.dataType().catalogString())) - .collect(Collectors.toList()); + public List parseSchema(Dataset dataset) throws FeatureStoreException { + List features = new ArrayList<>(); + for (StructField structField : dataset.schema().fields()) { + // TODO(Fabio): unit test this one for complext types + features.add(new Feature(structField.name(), structField.dataType().catalogString(), + structField.dataType().catalogString(), false, false)); + } + + return features; } // TODO(Fabio): keep into account the sorting - needs fixing in Hopsworks as well @@ -51,4 +60,23 @@ public void schemaMatches(Dataset dataset, List features) throws F " does not match the training dataset schema: " + tdStructType); } } + + // TODO(Fabio): this should be moved in the backend + public String getTableName(FeatureGroup offlineFeatureGroup) { + return offlineFeatureGroup.getFeatureStore().getName() + "." 
+ + offlineFeatureGroup.getName() + "_" + offlineFeatureGroup.getVersion(); + } + + public Seq getPartitionColumns(FeatureGroup offlineFeatureGroup) { + List jPartitionCols = offlineFeatureGroup.getFeatures().stream() + .filter(Feature::getPartition) + .map(Feature::getName) + .collect(Collectors.toList()); + + return JavaConverters.asScalaIteratorConverter(jPartitionCols.iterator()).asScala().toSeq(); + } + + public String getFgName(FeatureGroup featureGroup) { + return featureGroup.getName() + "_" + featureGroup.getVersion(); + } } diff --git a/java/src/main/java/com/logicalclocks/hsfs/metadata/AuthorizationHandler.java b/java/src/main/java/com/logicalclocks/hsfs/metadata/AuthorizationHandler.java index 6f39c861fa..dd13fdf727 100644 --- a/java/src/main/java/com/logicalclocks/hsfs/metadata/AuthorizationHandler.java +++ b/java/src/main/java/com/logicalclocks/hsfs/metadata/AuthorizationHandler.java @@ -28,8 +28,10 @@ public class AuthorizationHandler implements ResponseHandler { private ResponseHandler originalResponseHandler; - AuthorizationHandler(ResponseHandler originalResponseHandler) { - this.originalResponseHandler = originalResponseHandler; + AuthorizationHandler(ResponseHandler originalResponseHandler) { this.originalResponseHandler = originalResponseHandler; + } + + AuthorizationHandler() { } @Override @@ -40,10 +42,13 @@ public T handleResponse(HttpResponse response) throws ClientProtocolException, I throw new IOException("Error: " + response.getStatusLine().getStatusCode() + EntityUtils.toString(response.getEntity(), Charset.defaultCharset())); } else if (response.getStatusLine().getStatusCode() / 100 == 5) { - // TODO(fabio): Propagate http error upstream - throw new InternalException(); + throw new InternalException("Error: " + response.getStatusLine().getStatusCode() + + EntityUtils.toString(response.getEntity(), Charset.defaultCharset())); } - return originalResponseHandler.handleResponse(response); + if (originalResponseHandler != null) { + return originalResponseHandler.handleResponse(response); + } + return null; } } diff --git a/java/src/main/java/com/logicalclocks/hsfs/metadata/FeatureGroupApi.java b/java/src/main/java/com/logicalclocks/hsfs/metadata/FeatureGroupApi.java index 7208a90833..d86e277b9f 100644 --- a/java/src/main/java/com/logicalclocks/hsfs/metadata/FeatureGroupApi.java +++ b/java/src/main/java/com/logicalclocks/hsfs/metadata/FeatureGroupApi.java @@ -19,24 +19,35 @@ import com.logicalclocks.hsfs.FeatureGroup; import com.logicalclocks.hsfs.FeatureStore; import com.logicalclocks.hsfs.FeatureStoreException; +import org.apache.http.client.methods.HttpDelete; import org.apache.http.client.methods.HttpGet; +import org.apache.http.HttpHeaders; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.client.methods.HttpPut; +import org.apache.http.entity.StringEntity; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.Map; +import java.util.stream.Collectors; + +import static com.logicalclocks.hsfs.metadata.HopsworksClient.*; public class FeatureGroupApi { - public static final String FEATURE_GROUP_PATH = "/featuregroups{/fgName}{?version}"; + public static final String FEATURE_GROUP_ROOT_PATH = "/featuregroups"; + public static final String FEATURE_GROUP_PATH = FEATURE_GROUP_ROOT_PATH + "{/fgName}{?version}"; + public static final String FEATURE_GROUP_ID_PATH = FEATURE_GROUP_ROOT_PATH + "{/fgId}"; + public static final String FEATURE_GROUP_CLEAR_PATH = FEATURE_GROUP_ID_PATH + "/clear"; + public 
static final String FEATURE_GROUP_TAGS_PATH = FEATURE_GROUP_ID_PATH + "/tags{/name}{?value}"; private static final Logger LOGGER = LoggerFactory.getLogger(FeatureGroupApi.class); - public FeatureGroupApi() throws FeatureStoreException { } - public FeatureGroup get(FeatureStore featureStore, String fgName, Integer fgVersion) throws IOException, FeatureStoreException { - HopsworksClient hopsworksClient = HopsworksClient.getInstance(); - String pathTemplate = HopsworksClient.PROJECT_PATH + HopsworksClient hopsworksClient = getInstance(); + String pathTemplate = PROJECT_PATH + FeatureStoreApi.FEATURE_STORE_PATH + FEATURE_GROUP_PATH; @@ -48,12 +59,148 @@ public FeatureGroup get(FeatureStore featureStore, String fgName, Integer fgVers .expand(); LOGGER.info("Sending metadata request: " + uri); - FeatureGroup[] featureGroups = hopsworksClient.handleRequest(new HttpGet(uri), FeatureGroup[].class); + FeatureGroup[] offlineFeatureGroups = hopsworksClient.handleRequest(new HttpGet(uri), FeatureGroup[].class); // There can be only one single feature group with a specific name and version in a feature store // There has to be one otherwise an exception would have been thrown. - FeatureGroup resultFg = featureGroups[0]; + FeatureGroup resultFg = offlineFeatureGroups[0]; resultFg.setFeatureStore(featureStore); return resultFg; } + + public FeatureGroup save(FeatureGroup featureGroup) throws FeatureStoreException, IOException { + HopsworksClient hopsworksClient = getInstance(); + String pathTemplate = PROJECT_PATH + + FeatureStoreApi.FEATURE_STORE_PATH + + FEATURE_GROUP_ROOT_PATH; + + String uri = UriTemplate.fromTemplate(pathTemplate) + .set("projectId", featureGroup.getFeatureStore().getProjectId()) + .set("fsId", featureGroup.getFeatureStore().getId()) + .expand(); + + String featureGroupJson = hopsworksClient.getObjectMapper().writeValueAsString(featureGroup); + HttpPost postRequest = new HttpPost(uri); + postRequest.setHeader(HttpHeaders.CONTENT_TYPE, "application/json"); + postRequest.setEntity(new StringEntity(featureGroupJson)); + + LOGGER.info("Sending metadata request: " + uri); + LOGGER.info(featureGroupJson); + + return hopsworksClient.handleRequest(postRequest, FeatureGroup.class); + } + + public void delete(FeatureGroup featureGroup) throws FeatureStoreException, IOException { + HopsworksClient hopsworksClient = getInstance(); + String pathTemplate = PROJECT_PATH + + FeatureStoreApi.FEATURE_STORE_PATH + + FEATURE_GROUP_ID_PATH; + + String uri = UriTemplate.fromTemplate(pathTemplate) + .set("projectId", featureGroup.getFeatureStore().getProjectId()) + .set("fsId", featureGroup.getFeatureStore().getId()) + .set("fgId", featureGroup.getId()) + .expand(); + + HttpDelete deleteRequest = new HttpDelete(uri); + + LOGGER.info("Sending metadata request: " + uri); + hopsworksClient.handleRequest(deleteRequest); + } + + public void deleteContent(FeatureGroup featureGroup) throws FeatureStoreException, IOException { + HopsworksClient hopsworksClient = getInstance(); + String pathTemplate = PROJECT_PATH + + FeatureStoreApi.FEATURE_STORE_PATH + + FEATURE_GROUP_CLEAR_PATH; + + String uri = UriTemplate.fromTemplate(pathTemplate) + .set("projectId", featureGroup.getFeatureStore().getProjectId()) + .set("fsId", featureGroup.getFeatureStore().getId()) + .set("fgId", featureGroup.getId()) + .expand(); + + LOGGER.info("Sending metadata request: " + uri); + HttpPost postRequest = new HttpPost(uri); + hopsworksClient.handleRequest(postRequest); + } + + public void addTag(FeatureGroup featureGroup, String name, 
String value) throws FeatureStoreException, IOException { + HopsworksClient hopsworksClient = getInstance(); + String pathTemplate = PROJECT_PATH + + FeatureStoreApi.FEATURE_STORE_PATH + + FEATURE_GROUP_TAGS_PATH; + + UriTemplate uriTemplate = UriTemplate.fromTemplate(pathTemplate) + .set("projectId", featureGroup.getFeatureStore().getProjectId()) + .set("fsId", featureGroup.getFeatureStore().getId()) + .set("fgId", featureGroup.getId()) + .set("name", name); + + if (value != null) { + uriTemplate.set("value", value); + } + + LOGGER.info("Sending metadata request: " + uriTemplate.expand()); + HttpPut putRequest = new HttpPut(uriTemplate.expand()); + hopsworksClient.handleRequest(putRequest); + } + + public String getTag(FeatureGroup featureGroup, String name) throws FeatureStoreException, IOException { + HopsworksClient hopsworksClient = getInstance(); + String pathTemplate = PROJECT_PATH + + FeatureStoreApi.FEATURE_STORE_PATH + + FEATURE_GROUP_TAGS_PATH; + + String uri = UriTemplate.fromTemplate(pathTemplate) + .set("projectId", featureGroup.getFeatureStore().getProjectId()) + .set("fsId", featureGroup.getFeatureStore().getId()) + .set("fgId", featureGroup.getId()) + .set("name", name) + .expand(); + + LOGGER.info("Sending metadata request: " + uri); + HttpGet getRequest = new HttpGet(uri); + Tags tags = hopsworksClient.handleRequest(getRequest, Tags.class); + + return tags.getItems().get(0).getValue(); + } + + public Map getTags(FeatureGroup featureGroup) throws FeatureStoreException, IOException { + HopsworksClient hopsworksClient = getInstance(); + String pathTemplate = PROJECT_PATH + + FeatureStoreApi.FEATURE_STORE_PATH + + FEATURE_GROUP_TAGS_PATH; + + String uri = UriTemplate.fromTemplate(pathTemplate) + .set("projectId", featureGroup.getFeatureStore().getProjectId()) + .set("fsId", featureGroup.getFeatureStore().getId()) + .set("fgId", featureGroup.getId()) + .expand(); + + LOGGER.info("Sending metadata request: " + uri); + HttpGet getRequest = new HttpGet(uri); + Tags tags = hopsworksClient.handleRequest(getRequest, Tags.class); + + return tags.getItems().stream() + .collect(Collectors.toMap(Tags::getName, Tags::getValue)); + } + + public void deleteTag(FeatureGroup featureGroup, String name) throws FeatureStoreException, IOException { + HopsworksClient hopsworksClient = getInstance(); + String pathTemplate = PROJECT_PATH + + FeatureStoreApi.FEATURE_STORE_PATH + + FEATURE_GROUP_TAGS_PATH; + + String uri = UriTemplate.fromTemplate(pathTemplate) + .set("projectId", featureGroup.getFeatureStore().getProjectId()) + .set("fsId", featureGroup.getFeatureStore().getId()) + .set("fgId", featureGroup.getId()) + .set("name", name) + .expand(); + + LOGGER.info("Sending metadata request: " + uri); + HttpDelete httpDelete = new HttpDelete(uri); + hopsworksClient.handleRequest(httpDelete); + } } diff --git a/java/src/main/java/com/logicalclocks/hsfs/metadata/HopsworksClient.java b/java/src/main/java/com/logicalclocks/hsfs/metadata/HopsworksClient.java index fd285c99f2..c5e858b91a 100644 --- a/java/src/main/java/com/logicalclocks/hsfs/metadata/HopsworksClient.java +++ b/java/src/main/java/com/logicalclocks/hsfs/metadata/HopsworksClient.java @@ -146,6 +146,10 @@ public T handleRequest(HttpRequest request, Class cls) throws IOException return hopsworksHttpClient.handleRequest(request, new BaseHandler<>(cls, objectMapper)); } + public T handleRequest(HttpRequest request) throws IOException, FeatureStoreException { + return hopsworksHttpClient.handleRequest(request, null); + } + public void 
downloadCredentials(Project project, String certPath) throws IOException, FeatureStoreException { certPwd = hopsworksHttpClient.downloadCredentials(project, certPath); } diff --git a/java/src/main/java/com/logicalclocks/hsfs/metadata/HopsworksExternalClient.java b/java/src/main/java/com/logicalclocks/hsfs/metadata/HopsworksExternalClient.java index 6401e25469..4552195dfb 100644 --- a/java/src/main/java/com/logicalclocks/hsfs/metadata/HopsworksExternalClient.java +++ b/java/src/main/java/com/logicalclocks/hsfs/metadata/HopsworksExternalClient.java @@ -193,7 +193,7 @@ public T handleRequest(HttpRequest request, ResponseHandler responseHandl return httpClient.execute(httpHost, request, authHandler); } catch (InternalException e) { // Internal exception, try one more time - return httpClient.execute(httpHost, request, responseHandler); + return httpClient.execute(httpHost, request, authHandler); } } diff --git a/java/src/main/java/com/logicalclocks/hsfs/metadata/HopsworksInternalClient.java b/java/src/main/java/com/logicalclocks/hsfs/metadata/HopsworksInternalClient.java index 7f767ced23..4350f27890 100644 --- a/java/src/main/java/com/logicalclocks/hsfs/metadata/HopsworksInternalClient.java +++ b/java/src/main/java/com/logicalclocks/hsfs/metadata/HopsworksInternalClient.java @@ -150,10 +150,10 @@ public T handleRequest(HttpRequest request, ResponseHandler responseHandl // re-read the jwt and try one more time refreshJWT(); request.setHeader(HttpHeaders.AUTHORIZATION, "Bearer " + jwt); - return httpClient.execute(httpHost, request, responseHandler); + return httpClient.execute(httpHost, request, authHandler); } catch (InternalException e) { // Internal exception, try one more time - return httpClient.execute(httpHost, request, responseHandler); + return httpClient.execute(httpHost, request, authHandler); } } diff --git a/java/src/main/java/com/logicalclocks/hsfs/metadata/InternalException.java b/java/src/main/java/com/logicalclocks/hsfs/metadata/InternalException.java index 99687d557b..927470316d 100644 --- a/java/src/main/java/com/logicalclocks/hsfs/metadata/InternalException.java +++ b/java/src/main/java/com/logicalclocks/hsfs/metadata/InternalException.java @@ -18,4 +18,7 @@ import org.apache.http.client.ClientProtocolException; public class InternalException extends ClientProtocolException { + public InternalException(String msg) { + super(msg); + } } diff --git a/java/src/main/java/com/logicalclocks/hsfs/metadata/Query.java b/java/src/main/java/com/logicalclocks/hsfs/metadata/Query.java index a32191d340..c47ca77ac6 100644 --- a/java/src/main/java/com/logicalclocks/hsfs/metadata/Query.java +++ b/java/src/main/java/com/logicalclocks/hsfs/metadata/Query.java @@ -42,12 +42,14 @@ public class Query { private List joins = new ArrayList<>(); private QueryConstructorApi queryConstructorApi; + private StorageConnectorApi storageConnectorApi; public Query(FeatureGroup leftFeatureGroup, List leftFeatures) { this.leftFeatureGroup = leftFeatureGroup; this.leftFeatures = leftFeatures; this.queryConstructorApi = new QueryConstructorApi(); + this.storageConnectorApi = new StorageConnectorApi(); } public Query join(Query subquery) { @@ -98,23 +100,42 @@ public Query joinFeatures(Query subquery, List leftOn, List ri } public Dataset read() throws FeatureStoreException, IOException { - String sqlQuery = - queryConstructorApi.constructQuery(leftFeatureGroup.getFeatureStore(), this); - LOGGER.info("Executing query: " + sqlQuery); - return SparkEngine.getInstance().sql(sqlQuery); + return read(Storage.OFFLINE); 
} public void show(int numRows) throws FeatureStoreException, IOException { + show(Storage.OFFLINE, numRows); + } + + public Dataset read(Storage storage) throws FeatureStoreException, IOException { String sqlQuery = - queryConstructorApi.constructQuery(leftFeatureGroup.getFeatureStore(), this); + queryConstructorApi.constructQuery(leftFeatureGroup.getFeatureStore(), this).getStorageQuery(storage); LOGGER.info("Executing query: " + sqlQuery); - SparkEngine.getInstance().sql(sqlQuery).show(numRows); + switch (storage) { + case OFFLINE: + return SparkEngine.getInstance().sql(sqlQuery); + case ONLINE: + StorageConnector onlineConnector + = storageConnectorApi.getOnlineStorageConnector(leftFeatureGroup.getFeatureStore()); + return SparkEngine.getInstance().jdbc(onlineConnector, sqlQuery); + default: + throw new FeatureStoreException("Storage not supported"); + } + } + + public void show(Storage storage, int numRows) throws FeatureStoreException, IOException { + read(storage).show(numRows); } - @Override public String toString() { + return toString(Storage.OFFLINE); + } + + public String toString(Storage storage) { try { - return queryConstructorApi.constructQuery(leftFeatureGroup.getFeatureStore(), this); + return queryConstructorApi + .constructQuery(leftFeatureGroup.getFeatureStore(), this) + .getStorageQuery(storage); } catch (FeatureStoreException | IOException e) { return e.getMessage(); } diff --git a/java/src/main/java/com/logicalclocks/hsfs/metadata/QueryConstructorApi.java b/java/src/main/java/com/logicalclocks/hsfs/metadata/QueryConstructorApi.java index 4797724a40..9b6da4d66a 100644 --- a/java/src/main/java/com/logicalclocks/hsfs/metadata/QueryConstructorApi.java +++ b/java/src/main/java/com/logicalclocks/hsfs/metadata/QueryConstructorApi.java @@ -33,7 +33,7 @@ public class QueryConstructorApi { private static final Logger LOGGER = LoggerFactory.getLogger(QueryConstructorApi.class); - public String constructQuery(FeatureStore featureStore, Query query) throws FeatureStoreException, IOException { + public FsQuery constructQuery(FeatureStore featureStore, Query query) throws FeatureStoreException, IOException { HopsworksClient hopsworksClient = HopsworksClient.getInstance(); String pathTemplate = HopsworksClient.PROJECT_PATH + FeatureStoreApi.FEATURE_STORE_SERVICE_PATH + @@ -51,6 +51,7 @@ public String constructQuery(FeatureStore featureStore, Query query) throws Feat LOGGER.info("Sending metadata request: " + uri); LOGGER.info("Sending query: " + queryJson); FsQuery fsQuery = hopsworksClient.handleRequest(putRequest, FsQuery.class); - return fsQuery.getQuery().replace("\n", " "); + fsQuery.removeNewLines(); + return fsQuery; } } \ No newline at end of file diff --git a/java/src/main/java/com/logicalclocks/hsfs/metadata/RestDTO.java b/java/src/main/java/com/logicalclocks/hsfs/metadata/RestDTO.java new file mode 100644 index 0000000000..795f902b9a --- /dev/null +++ b/java/src/main/java/com/logicalclocks/hsfs/metadata/RestDTO.java @@ -0,0 +1,39 @@ +/* + * Copyright (c) 2020 Logical Clocks AB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * + * See the License for the specific language governing permissions and limitations under the License. + */ + +package com.logicalclocks.hsfs.metadata; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; + +import java.util.ArrayList; +import java.util.List; + +@JsonIgnoreProperties(ignoreUnknown = true) +public abstract class RestDTO { + + protected List items = new ArrayList<>(); + + public RestDTO() { + } + + public List getItems() { + return items; + } + + public void setItems(List items) { + this.items = items; + } +} diff --git a/java/src/main/java/com/logicalclocks/hsfs/StorageConnectorApi.java b/java/src/main/java/com/logicalclocks/hsfs/metadata/StorageConnectorApi.java similarity index 63% rename from java/src/main/java/com/logicalclocks/hsfs/StorageConnectorApi.java rename to java/src/main/java/com/logicalclocks/hsfs/metadata/StorageConnectorApi.java index 1ab395c8a7..5db851c642 100644 --- a/java/src/main/java/com/logicalclocks/hsfs/StorageConnectorApi.java +++ b/java/src/main/java/com/logicalclocks/hsfs/metadata/StorageConnectorApi.java @@ -13,11 +13,13 @@ * * See the License for the specific language governing permissions and limitations under the License. */ -package com.logicalclocks.hsfs; +package com.logicalclocks.hsfs.metadata; import com.damnhandy.uri.template.UriTemplate; -import com.logicalclocks.hsfs.metadata.FeatureStoreApi; -import com.logicalclocks.hsfs.metadata.HopsworksClient; +import com.logicalclocks.hsfs.FeatureStore; +import com.logicalclocks.hsfs.FeatureStoreException; +import com.logicalclocks.hsfs.StorageConnector; +import com.logicalclocks.hsfs.StorageConnectorType; import org.apache.http.client.methods.HttpGet; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -27,7 +29,10 @@ public class StorageConnectorApi { - private static final String CONNECTOR_PATH = "/storageconnectors{/connType}"; + private static final String CONNECTOR_PATH = "/storageconnectors"; + private static final String CONNECTOR_TYPE_PATH = CONNECTOR_PATH + "{/connType}"; + private static final String ONLINE_CONNECTOR_PATH = CONNECTOR_PATH + "/onlinefeaturestore"; + private static final Logger LOGGER = LoggerFactory.getLogger(StorageConnectorApi.class); public StorageConnector getByNameAndType(FeatureStore featureStore, String name, StorageConnectorType type) @@ -35,7 +40,7 @@ public StorageConnector getByNameAndType(FeatureStore featureStore, String name, HopsworksClient hopsworksClient = HopsworksClient.getInstance(); String pathTemplate = HopsworksClient.PROJECT_PATH + FeatureStoreApi.FEATURE_STORE_PATH - + CONNECTOR_PATH; + + CONNECTOR_TYPE_PATH; String uri = UriTemplate.fromTemplate(pathTemplate) .set("projectId", featureStore.getProjectId()) @@ -51,4 +56,20 @@ public StorageConnector getByNameAndType(FeatureStore featureStore, String name, .orElseThrow(() -> new FeatureStoreException("Could not find storage connector " + name + " with type " + type)); } + public StorageConnector getOnlineStorageConnector(FeatureStore featureStore) + throws IOException, FeatureStoreException { + HopsworksClient hopsworksClient = HopsworksClient.getInstance(); + String pathTemplate = HopsworksClient.PROJECT_PATH + + FeatureStoreApi.FEATURE_STORE_PATH + + ONLINE_CONNECTOR_PATH; + + String uri = UriTemplate.fromTemplate(pathTemplate) + .set("projectId", featureStore.getProjectId()) + .set("fsId", featureStore.getId()) + .expand(); + + LOGGER.info("Sending metadata request: " + uri); + return hopsworksClient.handleRequest(new HttpGet(uri), StorageConnector.class); + } + } diff 
--git a/java/src/main/java/com/logicalclocks/hsfs/metadata/Tags.java b/java/src/main/java/com/logicalclocks/hsfs/metadata/Tags.java new file mode 100644 index 0000000000..461aca06b6 --- /dev/null +++ b/java/src/main/java/com/logicalclocks/hsfs/metadata/Tags.java @@ -0,0 +1,41 @@ +/* + * Copyright (c) 2020 Logical Clocks AB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * + * See the License for the specific language governing permissions and limitations under the License. + */ + +package com.logicalclocks.hsfs.metadata; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; + +@JsonIgnoreProperties(ignoreUnknown = true) +public class Tags extends RestDTO { + private String name; + private String value; + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public String getValue() { + return value; + } + + public void setValue(String value) { + this.value = value; + } +} diff --git a/java/src/main/java/com/logicalclocks/hsfs/util/Constants.java b/java/src/main/java/com/logicalclocks/hsfs/util/Constants.java index 0fe2f62394..dd7602e792 100644 --- a/java/src/main/java/com/logicalclocks/hsfs/util/Constants.java +++ b/java/src/main/java/com/logicalclocks/hsfs/util/Constants.java @@ -22,10 +22,17 @@ public class Constants { public static final String FEATURESTORE_SUFFIX = "_featurestore"; + public static final String HIVE_FORMAT = "hive"; + public static final String JDBC_FORMAT = "jdbc"; + // Spark options public static final String DELIMITER = "delimiter"; public static final String HEADER = "header"; public static final String INFER_SCHEMA = "inferSchema"; + public static final String JDBC_USER = "user"; + public static final String JDBC_PWD = "password"; + public static final String JDBC_URL = "url"; + public static final String JDBC_TABLE = "dbtable"; public static final String TF_CONNECTOR_RECORD_TYPE = "recordType";
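
For reference, a typical write path using the new FeatureGroup builder and the save/insert methods introduced above might look like the sketch below. The feature group definition mirrors the one added to MainClass; the class name HousingFeatureGroupSketch, the featureData dataframe, and the write-options map are illustrative assumptions, not part of the patch.

import com.logicalclocks.hsfs.FeatureGroup;
import com.logicalclocks.hsfs.FeatureStore;
import com.logicalclocks.hsfs.Storage;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class HousingFeatureGroupSketch {

  // fs and featureData are assumed to come from an existing Hopsworks
  // connection and a Spark job, as in MainClass above.
  public static void writeHousing(FeatureStore fs, Dataset<Row> featureData) throws Exception {
    FeatureGroup housing = fs.createFeatureGroup()
        .name("housing")                                  // name/keys mirror the MainClass example
        .version(1)
        .description("House pricing model features")
        .primaryKeys(Arrays.asList("house_id", "date"))
        .partitionKeys(Arrays.asList("country"))
        .onlineEnabled(true)
        .defaultStorage(Storage.OFFLINE)
        .build();

    // First save registers the feature group metadata in Hopsworks and then
    // appends the dataframe to the default storage (offline in this case).
    housing.save(featureData);

    // Later inserts can target a specific storage and optionally overwrite;
    // extra Spark write options are passed through to the underlying writer.
    Map<String, String> writeOptions = new HashMap<>();
    housing.insert(featureData, Storage.ONLINE, false, writeOptions);
  }
}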
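
The Query changes above also make reads storage-aware: OFFLINE queries are executed through SparkSQL/Hive, while ONLINE queries run over the JDBC connector returned by getOnlineStorageConnector. A rough sketch of both paths, reusing the feature group names from MainClass; the joined column name "away_team_id" and the class name are placeholders.

import com.logicalclocks.hsfs.FeatureGroup;
import com.logicalclocks.hsfs.FeatureStore;
import com.logicalclocks.hsfs.Storage;
import com.logicalclocks.hsfs.metadata.Query;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

import java.util.Arrays;

public class QueryStorageSketch {

  // fs is assumed to come from an existing Hopsworks connection.
  public static void readExamples(FeatureStore fs) throws Exception {
    FeatureGroup attendance = fs.getFeatureGroup("attendances_features", 1);
    FeatureGroup players = fs.getFeatureGroup("players_features", 1);

    // Offline read: the constructed SQL is run against the Hive-backed store.
    Dataset<Row> offline = attendance.selectAll().read(Storage.OFFLINE);
    offline.show(10);

    // Online read: the online variant of the generated query is executed
    // through the JDBC storage connector of the online feature store.
    Query joined = attendance.select(Arrays.asList("away_team_id"))
        .join(players.selectAll());
    Dataset<Row> online = joined.read(Storage.ONLINE);
    online.show(10);
  }
}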
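
Finally, the new tag endpoints in FeatureGroupApi are surfaced on FeatureGroup as addTag, getTag, getTags and deleteTag. A minimal sketch assuming an already saved or fetched feature group; the tag name and value are made-up examples.

import com.logicalclocks.hsfs.FeatureGroup;

import java.util.Map;

public class TagsSketch {

  // featureGroup is assumed to be a feature group previously saved or
  // retrieved with getFeatureGroup.
  public static void manageTags(FeatureGroup featureGroup) throws Exception {
    // Attach a key/value tag; addTag(name) without a value is also supported.
    featureGroup.addTag("owner", "data-science-team");

    // Read a single tag or all tags back from Hopsworks.
    String owner = featureGroup.getTag("owner");
    Map<String, String> allTags = featureGroup.getTags();

    // Remove the tag again.
    featureGroup.deleteTag("owner");
  }
}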