[FSTORE-331] BigQuery connector doesn't work if the application is ex… #809

Merged · 4 commits · Oct 3, 2022
61 changes: 46 additions & 15 deletions java/pom.xml
@@ -26,6 +26,10 @@
<scala.version>2.12.10</scala.version>
<scala-short.version>2.12</scala-short.version>
<dbutils.version>0.0.5</dbutils.version>
<json.version>20190722</json.version>
<hoverfly.version>0.12.2</hoverfly.version>
<junit.version>5.9.1</junit.version>
<surefire-plugin.version>2.22.0</surefire-plugin.version>
</properties>

<dependencies>
@@ -200,20 +204,6 @@
<scope>provided</scope>
</dependency>

<dependency>
<groupId>io.specto</groupId>
<artifactId>hoverfly-java</artifactId>
<version>0.12.2</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.1</version>
<scope>test</scope>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.hudi/hudi-spark-bundle -->
<dependency>
<groupId>io.hops.hudi</groupId>
@@ -252,7 +242,35 @@
<dependency>
<groupId>org.json</groupId>
<artifactId>json</artifactId>
<version>20190722</version>
<version>${json.version}</version>
</dependency>

<dependency>
<groupId>io.specto</groupId>
<artifactId>hoverfly-java</artifactId>
<version>${hoverfly.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<version>${junit.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
<version>${junit.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_${scala-short.version}</artifactId>
<version>${spark.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

@@ -328,7 +346,20 @@
</sourceDirectories>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>${surefire-plugin.version}</version>
<configuration>
<systemPropertiesFile>src/test/resources/system.properties</systemPropertiesFile>
</configuration>
</plugin>
</plugins>
<testResources>
<testResource>
<directory>src/test/resources</directory>
</testResource>
</testResources>
</build>

<repositories>
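Note: a minimal sketch of the kind of test the new junit-jupiter dependencies and surefire configuration make possible. The class name is illustrative and not part of this PR; it only assumes that surefire loads src/test/resources/system.properties (added further down) into the test JVM before the tests run.

```java
package com.logicalclocks.hsfs; // package mirrors the new tests in this PR; class name is illustrative

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

public class TestJUnit5Setup {

  @Test
  public void surefireLoadsSystemProperties() {
    // maven-surefire-plugin 2.22+ discovers JUnit 5 (Jupiter) tests natively, and the
    // systemPropertiesFile configuration loads src/test/resources/system.properties
    // into the test JVM before this method runs.
    Assertions.assertEquals("local[1]", System.getProperty("spark.master"));
  }
}
```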
18 changes: 13 additions & 5 deletions java/src/main/java/com/logicalclocks/hsfs/StorageConnector.java
@@ -32,8 +32,11 @@

import javax.ws.rs.NotSupportedException;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.time.Instant;
import java.util.Arrays;
import java.util.Base64;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -86,7 +89,7 @@ public StorageConnector refetch() throws FeatureStoreException, IOException {
@JsonIgnore
public abstract String getPath(String subPath) throws FeatureStoreException;

public abstract Map<String, String> sparkOptions();
public abstract Map<String, String> sparkOptions() throws IOException;

public static class HopsFsConnector extends StorageConnector {

@@ -478,7 +481,7 @@ public Object read(String query, String dataFormat, Map<String, String> options,
}

public Object readStream(String topic, boolean topicPattern, String messageFormat, String schema,
Map<String, String> options, boolean includeMetadata) throws FeatureStoreException {
Map<String, String> options, boolean includeMetadata) throws FeatureStoreException, IOException {
if (!Arrays.asList("avro", "json", null).contains(messageFormat.toLowerCase())) {
throw new IllegalArgumentException("Can only read JSON and AVRO encoded records from Kafka.");
}
@@ -552,9 +555,14 @@ public static class BigqueryConnector extends StorageConnector {
* Set spark options specific to BigQuery.
* @return Map
*/
public Map<String, String> sparkOptions() {
public Map<String, String> sparkOptions() throws IOException {
Map<String, String> options = new HashMap<>();
options.put(Constants.BIGQ_CREDENTIALS_FILE, SparkEngine.getInstance().addFile(keyPath));

// Base64 encode the credentials file
String localKeyPath = SparkEngine.getInstance().addFile(keyPath);
byte[] fileContent = Files.readAllBytes(Paths.get(localKeyPath));
options.put(Constants.BIGQ_CREDENTIALS, Base64.getEncoder().encodeToString(fileContent));

options.put(Constants.BIGQ_PARENT_PROJECT, parentProject);
if (!Strings.isNullOrEmpty(materializationDataset)) {
options.put(Constants.BIGQ_MATERIAL_DATASET, materializationDataset);
@@ -592,7 +600,7 @@ public Object read(String query, String dataFormat, Map<String, String> options,

Map<String, String> readOptions = sparkOptions();
// merge user spark options on top of default spark options
if (!options.isEmpty()) {
if (options != null && !options.isEmpty()) {
readOptions.putAll(options);
}

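Note: the change above replaces the `credentialsFile` option (a path that an externally running application's executors cannot resolve) with the `credentials` option carrying the key content itself. Below is a minimal standalone sketch of that encoding step, assuming the spark-bigquery connector accepts base64-encoded service-account JSON under `credentials`, as the renamed constant suggests; the key path and project/dataset names are placeholders, not values from this PR.

```java
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Base64;

public class BigQueryCredentialsSketch {

  public static void main(String[] args) throws Exception {
    // Read the service-account key once where it is accessible (e.g. on the driver)...
    byte[] keyJson = Files.readAllBytes(Paths.get("/path/to/bigquery-key.json")); // placeholder path

    // ...and pass the content inline, base64-encoded, instead of a file path,
    // so executors never need the key file on their local file system.
    String credentials = Base64.getEncoder().encodeToString(keyJson);

    // spark.read().format("bigquery")
    //     .option("credentials", credentials)              // was: .option("credentialsFile", localPath)
    //     .option("parentProject", "my-gcp-project")       // placeholder
    //     .option("materializationDataset", "tmp_dataset") // placeholder
    //     .load("my-gcp-project.dataset.table");           // placeholder
    System.out.println("encoded key length: " + credentials.length());
  }
}
```

The added `options != null` guard in `read()` also keeps the merge of user options onto the defaults from failing when no options map is supplied.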
@@ -746,13 +746,17 @@ public <S> S sanitizeFeatureNames(S datasetGeneric) {
}

public String addFile(String filePath) {
sparkSession.sparkContext().addFile("hdfs://" + filePath);
// this is used for unit testing
if (!filePath.startsWith("file://")) {
filePath = "hdfs://" + filePath;
}
sparkSession.sparkContext().addFile(filePath);
return SparkFiles.get((new Path(filePath)).getName());
}

public Dataset<Row> readStream(StorageConnector storageConnector, String dataFormat, String messageFormat,
String schema, Map<String, String> options, boolean includeMetadata)
throws FeatureStoreException {
throws FeatureStoreException, IOException {
DataStreamReader stream = sparkSession.readStream().format(dataFormat);

// set user options last so that they overwrite any default options
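Note: with the change above, `addFile()` only prefixes the path with `hdfs://` when no explicit scheme is given, so unit tests can hand in a local `file://` URI. A small self-contained sketch of that behaviour against a local SparkSession; the helper below mirrors the patched method rather than calling SparkEngine itself, and all paths are temporary or illustrative.

```java
import org.apache.spark.SparkFiles;
import org.apache.spark.sql.SparkSession;

import java.nio.file.Files;
import java.nio.file.Path;

public class AddFileSketch {

  // Mirrors the patched addFile(): only prefix with hdfs:// when the caller
  // did not already pass an explicit file:// URI (the case the unit tests rely on).
  static String addFile(SparkSession spark, String filePath) {
    if (!filePath.startsWith("file://")) {
      filePath = "hdfs://" + filePath;
    }
    spark.sparkContext().addFile(filePath);
    return SparkFiles.get(filePath.substring(filePath.lastIndexOf('/') + 1));
  }

  public static void main(String[] args) throws Exception {
    SparkSession spark = SparkSession.builder()
        .master("local[1]") // same setting the new system.properties file uses for tests
        .appName("addFile-sketch")
        .getOrCreate();

    Path tmp = Files.createTempFile("bigquery", ".json");
    Files.write(tmp, "{}".getBytes());

    // With an explicit file:// prefix the path is used as-is, so no HDFS is required.
    System.out.println(addFile(spark, "file://" + tmp));

    spark.stop();
  }
}
```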
@@ -87,7 +87,7 @@ public class Constants {
public static final String PROPERTY_GCS_ACCOUNT_ENABLE = "google.cloud.auth.service.account.enable";
// end gcs
// bigquery constants
public static final String BIGQ_CREDENTIALS_FILE = "credentialsFile";
public static final String BIGQ_CREDENTIALS = "credentials";
public static final String BIGQ_PARENT_PROJECT = "parentProject";
public static final String BIGQ_MATERIAL_DATASET = "materializationDataset";
public static final String BIGQ_VIEWS_ENABLED = "viewsEnabled";
@@ -24,9 +24,9 @@
import org.apache.http.HttpHost;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
@@ -63,6 +63,6 @@ public void testReadAPIKeyFromFile() throws IOException, FeatureStoreException {
HopsworksExternalClient hopsworksExternalClient = new HopsworksExternalClient(
httpClient, httpHost);
String apiKey = hopsworksExternalClient.readApiKey(null, null, apiFilePath.toString());
Assert.assertEquals("hello", apiKey);
Assertions.assertEquals("hello", apiKey);
}
}
@@ -0,0 +1,51 @@
/*
* Copyright (c) 2022 Hopsworks AB
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*
* See the License for the specific language governing permissions and limitations under the License.
*/

package com.logicalclocks.hsfs;

import com.logicalclocks.hsfs.util.Constants;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Base64;
import java.util.Map;

public class TestStorageConnector {

@Test
public void testBigQueryCredentialsBase64Encoded(@TempDir Path tempDir) throws IOException {
// Arrange
String credentials = "{\"type\": \"service_account\", \"project_id\": \"test\"}";
Path credentialsFile = tempDir.resolve("bigquery.json");
Files.write(credentialsFile, credentials.getBytes());

StorageConnector.BigqueryConnector bigqueryConnector = new StorageConnector.BigqueryConnector();
bigqueryConnector.setKeyPath("file://" + credentialsFile);

// Act
Map<String, String> sparkOptions = bigqueryConnector.sparkOptions();

// Assert
Assertions.assertEquals(credentials,
new String(Base64.getDecoder().decode(sparkOptions.get(Constants.BIGQ_CREDENTIALS)), StandardCharsets.UTF_8));
}
}
22 changes: 11 additions & 11 deletions java/src/test/java/com/logicalclocks/hsfs/metadata/TestTagsApi.java
@@ -17,8 +17,8 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import com.logicalclocks.hsfs.EntityEndpointType;
import org.junit.Assert;
import org.junit.Test;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.util.Map;
@@ -29,7 +29,7 @@ public void testDoubleValueRead() throws IOException {
TagsApi tagsApi = new TagsApi(EntityEndpointType.FEATURE_GROUP);
ObjectMapper objectMapper = new ObjectMapper();
Object obj = tagsApi.parseTagValue(objectMapper, 4.2d);
Assert.assertTrue(obj instanceof Double);
Assertions.assertTrue(obj instanceof Double);
}

@Test
@@ -44,41 +44,41 @@ public void testIntegerValue() throws IOException {
TagsApi tagsApi = new TagsApi(EntityEndpointType.FEATURE_GROUP);
ObjectMapper objectMapper = new ObjectMapper();
Object obj = tagsApi.parseTagValue(objectMapper, 4);
Assert.assertTrue(obj instanceof Integer);
Assertions.assertTrue(obj instanceof Integer);
}

@Test
public void testStringValue() throws IOException {
TagsApi tagsApi = new TagsApi(EntityEndpointType.FEATURE_GROUP);
ObjectMapper objectMapper = new ObjectMapper();
Object obj = tagsApi.parseTagValue(objectMapper, "test");
Assert.assertTrue(obj instanceof String);
Assertions.assertTrue(obj instanceof String);
}

@Test
public void testMapValue() throws IOException {
TagsApi tagsApi = new TagsApi(EntityEndpointType.FEATURE_GROUP);
ObjectMapper objectMapper = new ObjectMapper();
Object obj = tagsApi.parseTagValue(objectMapper, "{\"key\":\"value\"}");
Assert.assertTrue(obj instanceof Map);
Assertions.assertTrue(obj instanceof Map);
}

@Test
public void testArrayValue() throws IOException {
TagsApi tagsApi = new TagsApi(EntityEndpointType.FEATURE_GROUP);
ObjectMapper objectMapper = new ObjectMapper();
Object obj = tagsApi.parseTagValue(objectMapper, "[{\"key\":\"value1\"}, {\"key\":\"value2\"}]");
Assert.assertTrue(obj.getClass().isArray());
Assertions.assertTrue(obj.getClass().isArray());
}

@Test
public void testInnerPrimitiveTypes() throws IOException {
TagsApi tagsApi = new TagsApi(EntityEndpointType.FEATURE_GROUP);
ObjectMapper objectMapper = new ObjectMapper();
Object obj = tagsApi.parseTagValue(objectMapper, "{\"key1\":\"value\", \"key2\":4.2, \"key3\":4}");
Assert.assertTrue(obj instanceof Map);
Assert.assertTrue(((Map) obj).get("key1") instanceof String);
Assert.assertTrue(((Map) obj).get("key2") instanceof Double);
Assert.assertTrue(((Map) obj).get("key3") instanceof Integer);
Assertions.assertTrue(obj instanceof Map);
Assertions.assertTrue(((Map) obj).get("key1") instanceof String);
Assertions.assertTrue(((Map) obj).get("key2") instanceof Double);
Assertions.assertTrue(((Map) obj).get("key3") instanceof Integer);
}
}
1 change: 1 addition & 0 deletions java/src/test/resources/system.properties
@@ -0,0 +1 @@
spark.master=local[1]
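Note: SparkConf copies JVM system properties prefixed with "spark." into the configuration, so setting spark.master=local[1] through surefire's systemPropertiesFile is enough for a SparkSession built inside a test to run locally. A short sketch of that mechanism; the System.setProperty call only simulates what surefire does when it loads the file above.

```java
import org.apache.spark.sql.SparkSession;

public class LocalSparkSketch {

  public static void main(String[] args) {
    // Simulate surefire loading src/test/resources/system.properties into the test JVM.
    System.setProperty("spark.master", "local[1]");

    // SparkConf picks up system properties that start with "spark.", so no explicit
    // .master(...) call is needed when the session is created inside a unit test.
    SparkSession spark = SparkSession.builder().appName("unit-test-sketch").getOrCreate();
    System.out.println(spark.sparkContext().master()); // prints local[1]
    spark.stop();
  }
}
```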
6 changes: 3 additions & 3 deletions python/.pre-commit-config.yaml
@@ -1,18 +1,18 @@
exclude: setup.py
repos:
- repo: https://github.com/psf/black
rev: 22.3.0
rev: 22.8.0
hooks:
- id: black
language_version: python3
- repo: https://gitlab.com/pycqa/flake8
rev: 3.8.3
rev: 3.9.2
hooks:
- id: flake8
language_version: python3
args: [--config=python/.flake8]
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v2.4.0
rev: v4.3.0
hooks:
- id: trailing-whitespace
- id: end-of-file-fixer
8 changes: 8 additions & 0 deletions python/hsfs/engine/__init__.py
@@ -55,6 +55,14 @@ def get_instance():
raise Exception("Couldn't find execution engine. Try reconnecting to Hopsworks.")


# Used for testing
def set_instance(engine_type, engine):
global _engine_type
global _engine
_engine_type = engine_type
_engine = engine


def get_type():
global _engine_type
if _engine_type:
6 changes: 5 additions & 1 deletion python/hsfs/engine/spark.py
@@ -627,7 +627,11 @@ def _read_stream_kafka(self, stream, message_format, schema, include_metadata):
return stream.load().select("key", "value")

def add_file(self, file):
self._spark_context.addFile("hdfs://" + file)
# This is used for unit testing
if not file.startswith("file://"):
file = "hdfs://" + file

self._spark_context.addFile(file)
return SparkFiles.get(os.path.basename(file))

def profile(