[FSTORE-331] BigQuery connector doesn't work if the application is executed on multiple nodes (logicalclocks#809)

* [FSTORE-331] BigQuery connector doesn't work if the application is executed on multiple nodes

* Change the BigQuery configuration so that it uses the credentials option
  (the Base64 encoding of the contents of the credentials file) instead of
  providing the path in the credentialsFile property; a short sketch of the
  idea follows this list.

* For Java, updated the tests to use JUnit 5 to allow for the @TempDir
  annotation

* Check if options is null before checking if it's empty
  and try again.
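
Below is a rough, self-contained sketch of the credentials change described above; it is not code from this repository. The option keys "credentials" and "parentProject" correspond to the spark-bigquery connector options referenced in the diff, while the key path and project id are placeholder values.

import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;

public class BigQueryCredentialsSketch {
  public static void main(String[] args) throws Exception {
    // Placeholder path to a service-account key file that exists locally.
    String keyPath = "/tmp/bigquery-key.json";

    // Read the key file once and Base64-encode its contents, so the credentials can be
    // passed inline as an option value instead of as a path to a node-local file.
    byte[] keyBytes = Files.readAllBytes(Paths.get(keyPath));
    String encodedKey = Base64.getEncoder().encodeToString(keyBytes);

    Map<String, String> options = new HashMap<>();
    options.put("credentials", encodedKey);         // replaces the old "credentialsFile" path option
    options.put("parentProject", "my-gcp-project"); // placeholder project id

    System.out.println(options.keySet());
  }
}

Passing the encoded contents rather than a file path means every executor receives the same value through the Spark options, which sidesteps the kind of multi-node failure described in the commit title, where a file path valid on one node is not necessarily valid on the others.
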
SirOibaf committed Oct 3, 2022
1 parent c6e14a5 commit 186d2a4
Showing 13 changed files with 160 additions and 149 deletions.
61 changes: 46 additions & 15 deletions java/pom.xml
@@ -26,6 +26,10 @@
<scala.version>2.12.10</scala.version>
<scala-short.version>2.12</scala-short.version>
<dbutils.version>0.0.5</dbutils.version>
<json.version>20190722</json.version>
<hoverfly.version>0.12.2</hoverfly.version>
<junit.version>5.9.1</junit.version>
<surefire-plugin.version>2.22.0</surefire-plugin.version>
</properties>

<dependencies>
@@ -200,20 +204,6 @@
<scope>provided</scope>
</dependency>

<dependency>
<groupId>io.specto</groupId>
<artifactId>hoverfly-java</artifactId>
<version>0.12.2</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.1</version>
<scope>test</scope>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.hudi/hudi-spark-bundle -->
<dependency>
<groupId>io.hops.hudi</groupId>
@@ -252,7 +242,35 @@
<dependency>
<groupId>org.json</groupId>
<artifactId>json</artifactId>
<version>20190722</version>
<version>${json.version}</version>
</dependency>

<dependency>
<groupId>io.specto</groupId>
<artifactId>hoverfly-java</artifactId>
<version>${hoverfly.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<version>${junit.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
<version>${junit.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_${scala-short.version}</artifactId>
<version>${spark.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

@@ -328,7 +346,20 @@
</sourceDirectories>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>${surefire-plugin.version}</version>
<configuration>
<systemPropertiesFile>src/test/resources/system.properties</systemPropertiesFile>
</configuration>
</plugin>
</plugins>
<testResources>
<testResource>
<directory>src/test/resources</directory>
</testResource>
</testResources>
</build>

<repositories>
18 changes: 13 additions & 5 deletions java/src/main/java/com/logicalclocks/hsfs/StorageConnector.java
@@ -32,8 +32,11 @@

import javax.ws.rs.NotSupportedException;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.time.Instant;
import java.util.Arrays;
import java.util.Base64;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -86,7 +89,7 @@ public StorageConnector refetch() throws FeatureStoreException, IOException {
@JsonIgnore
public abstract String getPath(String subPath) throws FeatureStoreException;

public abstract Map<String, String> sparkOptions();
public abstract Map<String, String> sparkOptions() throws IOException;

public static class HopsFsConnector extends StorageConnector {

@@ -478,7 +481,7 @@ public Object read(String query, String dataFormat, Map<String, String> options,
}

public Object readStream(String topic, boolean topicPattern, String messageFormat, String schema,
Map<String, String> options, boolean includeMetadata) throws FeatureStoreException {
Map<String, String> options, boolean includeMetadata) throws FeatureStoreException, IOException {
if (!Arrays.asList("avro", "json", null).contains(messageFormat.toLowerCase())) {
throw new IllegalArgumentException("Can only read JSON and AVRO encoded records from Kafka.");
}
@@ -552,9 +555,14 @@ public static class BigqueryConnector extends StorageConnector {
* Set spark options specific to BigQuery.
* @return Map
*/
public Map<String, String> sparkOptions() {
public Map<String, String> sparkOptions() throws IOException {
Map<String, String> options = new HashMap<>();
options.put(Constants.BIGQ_CREDENTIALS_FILE, SparkEngine.getInstance().addFile(keyPath));

// Base64 encode the credentials file
String localKeyPath = SparkEngine.getInstance().addFile(keyPath);
byte[] fileContent = Files.readAllBytes(Paths.get(localKeyPath));
options.put(Constants.BIGQ_CREDENTIALS, Base64.getEncoder().encodeToString(fileContent));

options.put(Constants.BIGQ_PARENT_PROJECT, parentProject);
if (!Strings.isNullOrEmpty(materializationDataset)) {
options.put(Constants.BIGQ_MATERIAL_DATASET, materializationDataset);
@@ -592,7 +600,7 @@ public Object read(String query, String dataFormat, Map<String, String> options,

Map<String, String> readOptions = sparkOptions();
// merge user spark options on top of default spark options
if (!options.isEmpty()) {
if (options != null && !options.isEmpty()) {
readOptions.putAll(options);
}

java/src/main/java/com/logicalclocks/hsfs/engine/SparkEngine.java
@@ -625,7 +625,7 @@ public void streamToHudiTable(StreamFeatureGroup streamFeatureGroup, Map<String,
writeOptions = utils.getKafkaConfig(streamFeatureGroup, writeOptions);
hudiEngine.streamToHoodieTable(sparkSession, streamFeatureGroup, writeOptions);
}

public <S> List<Feature> parseFeatureGroupSchema(S datasetGeneric,
TimeTravelFormat timeTravelFormat) throws FeatureStoreException {
List<Feature> features = new ArrayList<>();
@@ -656,12 +656,12 @@ public <S> List<Feature> parseFeatureGroupSchema(S datasetGeneric,
throw new FeatureStoreException("Feature '" + structField.name().toLowerCase() + "': "
+ "spark type " + structField.dataType().catalogString() + " not supported.");
}

Feature f = new Feature(structField.name().toLowerCase(), featureType, false, false);
if (structField.metadata().contains("description")) {
f.setDescription(structField.metadata().getString("description"));
}

features.add(f);
}

@@ -675,13 +675,17 @@ public <S> S sanitizeFeatureNames(S datasetGeneric) {
}

public String addFile(String filePath) {
sparkSession.sparkContext().addFile("hdfs://" + filePath);
// this is used for unit testing
if (!filePath.startsWith("file://")) {
filePath = "hdfs://" + filePath;
}
sparkSession.sparkContext().addFile(filePath);
return SparkFiles.get((new Path(filePath)).getName());
}

public Dataset<Row> readStream(StorageConnector storageConnector, String dataFormat, String messageFormat,
String schema, Map<String, String> options, boolean includeMetadata)
throws FeatureStoreException {
throws FeatureStoreException, IOException {
DataStreamReader stream = sparkSession.readStream().format(dataFormat);

// set user options last so that they overwrite any default options
java/src/main/java/com/logicalclocks/hsfs/util/Constants.java
@@ -87,7 +87,7 @@ public class Constants {
public static final String PROPERTY_GCS_ACCOUNT_ENABLE = "google.cloud.auth.service.account.enable";
// end gcs
// bigquery constants
public static final String BIGQ_CREDENTIALS_FILE = "credentialsFile";
public static final String BIGQ_CREDENTIALS = "credentials";
public static final String BIGQ_PARENT_PROJECT = "parentProject";
public static final String BIGQ_MATERIAL_DATASET = "materializationDataset";
public static final String BIGQ_VIEWS_ENABLED = "viewsEnabled";
java/src/test/java/com/logicalclocks/hsfs/metadata/TestHopsworksExternalClient.java
@@ -24,9 +24,9 @@
import org.apache.http.HttpHost;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
@@ -63,6 +63,6 @@ public void testReadAPIKeyFromFile() throws IOException, FeatureStoreException {
HopsworksExternalClient hopsworksExternalClient = new HopsworksExternalClient(
httpClient, httpHost);
String apiKey = hopsworksExternalClient.readApiKey(null, null, apiFilePath.toString());
Assert.assertEquals("hello", apiKey);
Assertions.assertEquals("hello", apiKey);
}
}
java/src/test/java/com/logicalclocks/hsfs/TestStorageConnector.java
@@ -0,0 +1,51 @@
/*
* Copyright (c) 2022 Hopsworks AB
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*
* See the License for the specific language governing permissions and limitations under the License.
*/

package com.logicalclocks.hsfs;

import com.logicalclocks.hsfs.util.Constants;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Base64;
import java.util.Map;

public class TestStorageConnector {

@Test
public void testBigQueryCredentialsBase64Encoded(@TempDir Path tempDir) throws IOException {
// Arrange
String credentials = "{\"type\": \"service_account\", \"project_id\": \"test\"}";
Path credentialsFile = tempDir.resolve("bigquery.json");
Files.write(credentialsFile, credentials.getBytes());

StorageConnector.BigqueryConnector bigqueryConnector = new StorageConnector.BigqueryConnector();
bigqueryConnector.setKeyPath("file://" + credentialsFile);

// Act
Map<String, String> sparkOptions = bigqueryConnector.sparkOptions();

// Assert
Assertions.assertEquals(credentials,
new String(Base64.getDecoder().decode(sparkOptions.get(Constants.BIGQ_CREDENTIALS)), StandardCharsets.UTF_8));
}
}
22 changes: 11 additions & 11 deletions java/src/test/java/com/logicalclocks/hsfs/metadata/TestTagsApi.java
@@ -17,8 +17,8 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import com.logicalclocks.hsfs.EntityEndpointType;
import org.junit.Assert;
import org.junit.Test;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.util.Map;
@@ -29,7 +29,7 @@ public void testDoubleValueRead() throws IOException {
TagsApi tagsApi = new TagsApi(EntityEndpointType.FEATURE_GROUP);
ObjectMapper objectMapper = new ObjectMapper();
Object obj = tagsApi.parseTagValue(objectMapper, 4.2d);
Assert.assertTrue(obj instanceof Double);
Assertions.assertTrue(obj instanceof Double);
}

@Test
@@ -44,41 +44,41 @@ public void testIntegerValue() throws IOException {
TagsApi tagsApi = new TagsApi(EntityEndpointType.FEATURE_GROUP);
ObjectMapper objectMapper = new ObjectMapper();
Object obj = tagsApi.parseTagValue(objectMapper, 4);
Assert.assertTrue(obj instanceof Integer);
Assertions.assertTrue(obj instanceof Integer);
}

@Test
public void testStringValue() throws IOException {
TagsApi tagsApi = new TagsApi(EntityEndpointType.FEATURE_GROUP);
ObjectMapper objectMapper = new ObjectMapper();
Object obj = tagsApi.parseTagValue(objectMapper, "test");
Assert.assertTrue(obj instanceof String);
Assertions.assertTrue(obj instanceof String);
}

@Test
public void testMapValue() throws IOException {
TagsApi tagsApi = new TagsApi(EntityEndpointType.FEATURE_GROUP);
ObjectMapper objectMapper = new ObjectMapper();
Object obj = tagsApi.parseTagValue(objectMapper, "{\"key\":\"value\"}");
Assert.assertTrue(obj instanceof Map);
Assertions.assertTrue(obj instanceof Map);
}

@Test
public void testArrayValue() throws IOException {
TagsApi tagsApi = new TagsApi(EntityEndpointType.FEATURE_GROUP);
ObjectMapper objectMapper = new ObjectMapper();
Object obj = tagsApi.parseTagValue(objectMapper, "[{\"key\":\"value1\"}, {\"key\":\"value2\"}]");
Assert.assertTrue(obj.getClass().isArray());
Assertions.assertTrue(obj.getClass().isArray());
}

@Test
public void testInnerPrimitiveTypes() throws IOException {
TagsApi tagsApi = new TagsApi(EntityEndpointType.FEATURE_GROUP);
ObjectMapper objectMapper = new ObjectMapper();
Object obj = tagsApi.parseTagValue(objectMapper, "{\"key1\":\"value\", \"key2\":4.2, \"key3\":4}");
Assert.assertTrue(obj instanceof Map);
Assert.assertTrue(((Map) obj).get("key1") instanceof String);
Assert.assertTrue(((Map) obj).get("key2") instanceof Double);
Assert.assertTrue(((Map) obj).get("key3") instanceof Integer);
Assertions.assertTrue(obj instanceof Map);
Assertions.assertTrue(((Map) obj).get("key1") instanceof String);
Assertions.assertTrue(((Map) obj).get("key2") instanceof Double);
Assertions.assertTrue(((Map) obj).get("key3") instanceof Integer);
}
}
1 change: 1 addition & 0 deletions java/src/test/resources/system.properties
@@ -0,0 +1 @@
spark.master=local[1]
6 changes: 3 additions & 3 deletions python/.pre-commit-config.yaml
@@ -1,18 +1,18 @@
exclude: setup.py
repos:
- repo: https://github.com/psf/black
rev: 22.3.0
rev: 22.8.0
hooks:
- id: black
language_version: python3
- repo: https://gitlab.com/pycqa/flake8
rev: 3.8.3
rev: 3.9.2
hooks:
- id: flake8
language_version: python3
args: [--config=python/.flake8]
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v2.4.0
rev: v4.3.0
hooks:
- id: trailing-whitespace
- id: end-of-file-fixer
8 changes: 8 additions & 0 deletions python/hsfs/engine/__init__.py
@@ -55,6 +55,14 @@ def get_instance():
raise Exception("Couldn't find execution engine. Try reconnecting to Hopsworks.")


# Used for testing
def set_instance(engine_type, engine):
global _engine_type
global _engine
_engine_type = engine_type
_engine = engine


def get_type():
global _engine_type
if _engine_type:
6 changes: 5 additions & 1 deletion python/hsfs/engine/spark.py
@@ -570,7 +570,11 @@ def _read_stream_kafka(self, stream, message_format, schema, include_metadata):
return stream.load().select("key", "value")

def add_file(self, file):
self._spark_context.addFile("hdfs://" + file)
# This is used for unit testing
if not file.startswith("file://"):
file = "hdfs://" + file

self._spark_context.addFile(file)
return SparkFiles.get(os.path.basename(file))

def profile(

