From 083b2f7184ecbd8ef973e29353a5288098669bb6 Mon Sep 17 00:00:00 2001 From: Fabio Buso Date: Sun, 2 Oct 2022 01:15:12 +0200 Subject: [PATCH 1/3] [FSTORE-331] BigQuery connector doesn't work if the application is executed on multiple nodes * Change the BigQuery configuration so that it uses the credentials field (the base64-encoded content of the credentials file) instead of providing the path in the credentialsFile property. * For Java, updated the tests to use JUnit 5 to allow for the @TempDir annotation --- java/pom.xml | 61 ++++++++++++++----- .../logicalclocks/hsfs/StorageConnector.java | 16 +++- .../hsfs/engine/SparkEngine.java | 8 ++- .../logicalclocks/hsfs/util/Constants.java | 2 +- .../hsfs/TestHopsworksExternalClient.java | 6 +- .../hsfs/TestStorageConnector.java | 51 ++++++++++++++++ .../hsfs/metadata/TestTagsApi.java | 22 +++---- java/src/test/resources/system.properties | 1 + python/.pre-commit-config.yaml | 6 +- python/hsfs/engine/__init__.py | 8 +++ python/hsfs/engine/spark.py | 6 +- python/hsfs/storage_connector.py | 12 +++- python/tests/test_storage_connector.py | 31 +++++++++- 13 files changed, 186 insertions(+), 44 deletions(-) create mode 100644 java/src/test/java/com/logicalclocks/hsfs/TestStorageConnector.java create mode 100644 java/src/test/resources/system.properties diff --git a/java/pom.xml b/java/pom.xml index 2b5953156a..ba86f09349 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -26,6 +26,10 @@ 2.12.10 2.12 0.0.5 + 20190722 + 0.12.2 + 5.9.1 + 2.22.0 @@ -200,20 +204,6 @@ provided - - io.specto - hoverfly-java - 0.12.2 - test - - - - junit - junit - 4.13.1 - test - - io.hops.hudi @@ -252,7 +242,35 @@ org.json json - 20190722 + ${json.version} + + + + io.specto + hoverfly-java + ${hoverfly.version} + test + + + + org.junit.jupiter + junit-jupiter-api + ${junit.version} + test + + + + org.junit.jupiter + junit-jupiter-engine + ${junit.version} + test + + + + org.apache.spark + spark-hive_${scala-short.version} + ${spark.version} + test @@ -328,7 +346,20 @@ + + org.apache.maven.plugins + maven-surefire-plugin + ${surefire-plugin.version} + + src/test/resources/system.properties + + + + + src/test/resources + + diff --git a/java/src/main/java/com/logicalclocks/hsfs/StorageConnector.java b/java/src/main/java/com/logicalclocks/hsfs/StorageConnector.java index e376536fc4..cebd274d59 100644 --- a/java/src/main/java/com/logicalclocks/hsfs/StorageConnector.java +++ b/java/src/main/java/com/logicalclocks/hsfs/StorageConnector.java @@ -32,8 +32,11 @@ import javax.ws.rs.NotSupportedException; import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; import java.time.Instant; import java.util.Arrays; +import java.util.Base64; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -86,7 +89,7 @@ public StorageConnector refetch() throws FeatureStoreException, IOException { @JsonIgnore public abstract String getPath(String subPath) throws FeatureStoreException; - public abstract Map sparkOptions(); + public abstract Map sparkOptions() throws IOException; public static class HopsFsConnector extends StorageConnector { @@ -478,7 +481,7 @@ public Object read(String query, String dataFormat, Map options, } public Object readStream(String topic, boolean topicPattern, String messageFormat, String schema, - Map options, boolean includeMetadata) throws FeatureStoreException { + Map options, boolean includeMetadata) throws FeatureStoreException, IOException { if (!Arrays.asList("avro", "json", null).contains(messageFormat.toLowerCase())) {
throw new IllegalArgumentException("Can only read JSON and AVRO encoded records from Kafka."); } @@ -552,9 +555,14 @@ public static class BigqueryConnector extends StorageConnector { * Set spark options specific to BigQuery. * @return Map */ - public Map sparkOptions() { + public Map sparkOptions() throws IOException { Map options = new HashMap<>(); - options.put(Constants.BIGQ_CREDENTIALS_FILE, SparkEngine.getInstance().addFile(keyPath)); + + // Base64 encode the credentials file + String localKeyPath = SparkEngine.getInstance().addFile(keyPath); + byte[] fileContent = Files.readAllBytes(Paths.get(localKeyPath)); + options.put(Constants.BIGQ_CREDENTIALS, Base64.getEncoder().encodeToString(fileContent)); + options.put(Constants.BIGQ_PARENT_PROJECT, parentProject); if (!Strings.isNullOrEmpty(materializationDataset)) { options.put(Constants.BIGQ_MATERIAL_DATASET, materializationDataset); diff --git a/java/src/main/java/com/logicalclocks/hsfs/engine/SparkEngine.java b/java/src/main/java/com/logicalclocks/hsfs/engine/SparkEngine.java index 2595327748..bdff9f0025 100644 --- a/java/src/main/java/com/logicalclocks/hsfs/engine/SparkEngine.java +++ b/java/src/main/java/com/logicalclocks/hsfs/engine/SparkEngine.java @@ -746,13 +746,17 @@ public S sanitizeFeatureNames(S datasetGeneric) { } public String addFile(String filePath) { - sparkSession.sparkContext().addFile("hdfs://" + filePath); + // this is used for unit testing + if (!filePath.startsWith("file://")) { + filePath = "hdfs://" + filePath; + } + sparkSession.sparkContext().addFile(filePath); return SparkFiles.get((new Path(filePath)).getName()); } public Dataset readStream(StorageConnector storageConnector, String dataFormat, String messageFormat, String schema, Map options, boolean includeMetadata) - throws FeatureStoreException { + throws FeatureStoreException, IOException { DataStreamReader stream = sparkSession.readStream().format(dataFormat); // set user options last so that they overwrite any default options diff --git a/java/src/main/java/com/logicalclocks/hsfs/util/Constants.java b/java/src/main/java/com/logicalclocks/hsfs/util/Constants.java index 032ada9a22..be3d454877 100644 --- a/java/src/main/java/com/logicalclocks/hsfs/util/Constants.java +++ b/java/src/main/java/com/logicalclocks/hsfs/util/Constants.java @@ -87,7 +87,7 @@ public class Constants { public static final String PROPERTY_GCS_ACCOUNT_ENABLE = "google.cloud.auth.service.account.enable"; // end gcs // bigquery constants - public static final String BIGQ_CREDENTIALS_FILE = "credentialsFile"; + public static final String BIGQ_CREDENTIALS = "credentials"; public static final String BIGQ_PARENT_PROJECT = "parentProject"; public static final String BIGQ_MATERIAL_DATASET = "materializationDataset"; public static final String BIGQ_VIEWS_ENABLED = "viewsEnabled"; diff --git a/java/src/test/java/com/logicalclocks/hsfs/TestHopsworksExternalClient.java b/java/src/test/java/com/logicalclocks/hsfs/TestHopsworksExternalClient.java index b992b26f4a..883b8c4953 100644 --- a/java/src/test/java/com/logicalclocks/hsfs/TestHopsworksExternalClient.java +++ b/java/src/test/java/com/logicalclocks/hsfs/TestHopsworksExternalClient.java @@ -24,9 +24,9 @@ import org.apache.http.HttpHost; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClients; -import org.junit.Assert; import org.junit.Rule; -import org.junit.Test; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; import java.io.IOException; import 
java.nio.charset.StandardCharsets; @@ -63,6 +63,6 @@ public void testReadAPIKeyFromFile() throws IOException, FeatureStoreException { HopsworksExternalClient hopsworksExternalClient = new HopsworksExternalClient( httpClient, httpHost); String apiKey = hopsworksExternalClient.readApiKey(null, null, apiFilePath.toString()); - Assert.assertEquals("hello", apiKey); + Assertions.assertEquals("hello", apiKey); } } diff --git a/java/src/test/java/com/logicalclocks/hsfs/TestStorageConnector.java b/java/src/test/java/com/logicalclocks/hsfs/TestStorageConnector.java new file mode 100644 index 0000000000..f3495f26a5 --- /dev/null +++ b/java/src/test/java/com/logicalclocks/hsfs/TestStorageConnector.java @@ -0,0 +1,51 @@ +/* + * Copyright (c) 2022 Hopsworks AB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * + * See the License for the specific language governing permissions and limitations under the License. + */ + +package com.logicalclocks.hsfs; + +import com.logicalclocks.hsfs.util.Constants; +import org.apache.spark.sql.SparkSession; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Base64; +import java.util.Map; + +public class TestStorageConnector { + + @Test + public void testBigQueryCredentialsBase64Encoded(@TempDir Path tempDir) throws IOException { + // Arrange + String credentials = "{\"type\": \"service_account\", \"project_id\": \"test\"}"; + Path credentialsFile = tempDir.resolve("bigquery.json"); + Files.write(credentialsFile, credentials.getBytes()); + + StorageConnector.BigqueryConnector bigqueryConnector = new StorageConnector.BigqueryConnector(); + bigqueryConnector.setKeyPath("file://" + credentialsFile); + + // Act + Map sparkOptions = bigqueryConnector.sparkOptions(); + + // Assert + Assertions.assertEquals(credentials, + new String(Base64.getDecoder().decode(sparkOptions.get(Constants.BIGQ_CREDENTIALS)), StandardCharsets.UTF_8)); + } +} diff --git a/java/src/test/java/com/logicalclocks/hsfs/metadata/TestTagsApi.java b/java/src/test/java/com/logicalclocks/hsfs/metadata/TestTagsApi.java index 0726d1d450..324b876eb4 100644 --- a/java/src/test/java/com/logicalclocks/hsfs/metadata/TestTagsApi.java +++ b/java/src/test/java/com/logicalclocks/hsfs/metadata/TestTagsApi.java @@ -17,8 +17,8 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.logicalclocks.hsfs.EntityEndpointType; -import org.junit.Assert; -import org.junit.Test; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; import java.io.IOException; import java.util.Map; @@ -29,7 +29,7 @@ public void testDoubleValueRead() throws IOException { TagsApi tagsApi = new TagsApi(EntityEndpointType.FEATURE_GROUP); ObjectMapper objectMapper = new ObjectMapper(); Object obj = tagsApi.parseTagValue(objectMapper, 4.2d); - Assert.assertTrue(obj instanceof Double); + Assertions.assertTrue(obj instanceof Double); } @Test @@ -44,7 +44,7 @@ public void testIntegerValue() throws 
IOException { TagsApi tagsApi = new TagsApi(EntityEndpointType.FEATURE_GROUP); ObjectMapper objectMapper = new ObjectMapper(); Object obj = tagsApi.parseTagValue(objectMapper, 4); - Assert.assertTrue(obj instanceof Integer); + Assertions.assertTrue(obj instanceof Integer); } @Test @@ -52,7 +52,7 @@ public void testStringValue() throws IOException { TagsApi tagsApi = new TagsApi(EntityEndpointType.FEATURE_GROUP); ObjectMapper objectMapper = new ObjectMapper(); Object obj = tagsApi.parseTagValue(objectMapper, "test"); - Assert.assertTrue(obj instanceof String); + Assertions.assertTrue(obj instanceof String); } @Test @@ -60,7 +60,7 @@ public void testMapValue() throws IOException { TagsApi tagsApi = new TagsApi(EntityEndpointType.FEATURE_GROUP); ObjectMapper objectMapper = new ObjectMapper(); Object obj = tagsApi.parseTagValue(objectMapper, "{\"key\":\"value\"}"); - Assert.assertTrue(obj instanceof Map); + Assertions.assertTrue(obj instanceof Map); } @Test @@ -68,7 +68,7 @@ public void testArrayValue() throws IOException { TagsApi tagsApi = new TagsApi(EntityEndpointType.FEATURE_GROUP); ObjectMapper objectMapper = new ObjectMapper(); Object obj = tagsApi.parseTagValue(objectMapper, "[{\"key\":\"value1\"}, {\"key\":\"value2\"}]"); - Assert.assertTrue(obj.getClass().isArray()); + Assertions.assertTrue(obj.getClass().isArray()); } @Test @@ -76,9 +76,9 @@ public void testInnerPrimitiveTypes() throws IOException { TagsApi tagsApi = new TagsApi(EntityEndpointType.FEATURE_GROUP); ObjectMapper objectMapper = new ObjectMapper(); Object obj = tagsApi.parseTagValue(objectMapper, "{\"key1\":\"value\", \"key2\":4.2, \"key3\":4}"); - Assert.assertTrue(obj instanceof Map); - Assert.assertTrue(((Map) obj).get("key1") instanceof String); - Assert.assertTrue(((Map) obj).get("key2") instanceof Double); - Assert.assertTrue(((Map) obj).get("key3") instanceof Integer); + Assertions.assertTrue(obj instanceof Map); + Assertions.assertTrue(((Map) obj).get("key1") instanceof String); + Assertions.assertTrue(((Map) obj).get("key2") instanceof Double); + Assertions.assertTrue(((Map) obj).get("key3") instanceof Integer); } } diff --git a/java/src/test/resources/system.properties b/java/src/test/resources/system.properties new file mode 100644 index 0000000000..3d39703a4f --- /dev/null +++ b/java/src/test/resources/system.properties @@ -0,0 +1 @@ +spark.master=local[1] diff --git a/python/.pre-commit-config.yaml b/python/.pre-commit-config.yaml index 4249fcb968..ab8e164be3 100644 --- a/python/.pre-commit-config.yaml +++ b/python/.pre-commit-config.yaml @@ -1,18 +1,18 @@ exclude: setup.py repos: - repo: https://github.com/psf/black - rev: 22.3.0 + rev: 22.8.0 hooks: - id: black language_version: python3 - repo: https://gitlab.com/pycqa/flake8 - rev: 3.8.3 + rev: 3.9.2 hooks: - id: flake8 language_version: python3 args: [--config=python/.flake8] - repo: https://github.com/pre-commit/pre-commit-hooks - rev: v2.4.0 + rev: v4.3.0 hooks: - id: trailing-whitespace - id: end-of-file-fixer diff --git a/python/hsfs/engine/__init__.py b/python/hsfs/engine/__init__.py index 185abe996a..204dcee188 100644 --- a/python/hsfs/engine/__init__.py +++ b/python/hsfs/engine/__init__.py @@ -55,6 +55,14 @@ def get_instance(): raise Exception("Couldn't find execution engine. 
Try reconnecting to Hopsworks.") +# Used for testing +def set_instance(engine_type, engine): + global _engine_type + global _engine + _engine_type = engine_type + _engine = engine + + def get_type(): global _engine_type if _engine_type: diff --git a/python/hsfs/engine/spark.py b/python/hsfs/engine/spark.py index 9ae4464f41..549b391356 100644 --- a/python/hsfs/engine/spark.py +++ b/python/hsfs/engine/spark.py @@ -627,7 +627,11 @@ def _read_stream_kafka(self, stream, message_format, schema, include_metadata): return stream.load().select("key", "value") def add_file(self, file): - self._spark_context.addFile("hdfs://" + file) + # This is used for unit testing + if not file.startswith("file://"): + file = "hdfs://" + file + + self._spark_context.addFile(file) return SparkFiles.get(os.path.basename(file)) def profile( diff --git a/python/hsfs/storage_connector.py b/python/hsfs/storage_connector.py index a291c59798..c6a2ac179f 100644 --- a/python/hsfs/storage_connector.py +++ b/python/hsfs/storage_connector.py @@ -18,6 +18,7 @@ from typing import Optional import humps +import base64 from hsfs import engine from hsfs.core import storage_connector_api @@ -1041,7 +1042,7 @@ def prepare_spark(self, path: Optional[str] = None): class BigQueryConnector(StorageConnector): type = StorageConnector.BIGQUERY BIGQUERY_FORMAT = "bigquery" - BIGQ_CREDENTIALS_FILE = "credentialsFile" + BIGQ_CREDENTIALS = "credentials" BIGQ_PARENT_PROJECT = "parentProject" BIGQ_MATERIAL_DATASET = "materializationDataset" BIGQ_VIEWS_ENABLED = "viewsEnabled" @@ -1113,9 +1114,14 @@ def arguments(self): def spark_options(self): """Return spark options to be set for BigQuery spark connector""" properties = self._arguments - local_key_path = engine.get_instance().add_file(self._key_path) - properties[self.BIGQ_CREDENTIALS_FILE] = local_key_path properties[self.BIGQ_PARENT_PROJECT] = self._parent_project + + local_key_path = engine.get_instance().add_file(self._key_path) + with open(local_key_path, "rb") as credentials_file: + properties[self.BIGQ_CREDENTIALS] = base64.b64encode( + credentials_file.read() + ) + if self._materialization_dataset: properties[self.BIGQ_MATERIAL_DATASET] = self._materialization_dataset properties[self.BIGQ_VIEWS_ENABLED] = "true" diff --git a/python/tests/test_storage_connector.py b/python/tests/test_storage_connector.py index 4380e76216..0f3db6049b 100644 --- a/python/tests/test_storage_connector.py +++ b/python/tests/test_storage_connector.py @@ -14,7 +14,11 @@ # limitations under the License. 
# -from hsfs import storage_connector +import base64 + +from hsfs import engine, storage_connector +from hsfs.storage_connector import BigQueryConnector +from hsfs.engine import spark class TestHopsfsConnector: @@ -503,3 +507,28 @@ def test_from_response_json_basic_info(self, backend_fixtures): assert sc.query_project is None assert sc.materialization_dataset is None assert sc.arguments == {} + + def test_credentials_base64_encoded(self, backend_fixtures, tmp_path): + # Arrange + engine.set_instance("spark", spark.Engine()) + + credentials = '{"type": "service_account", "project_id": "test"}' + + credentialsFile = tmp_path / "bigquery.json" + credentialsFile.write_text(credentials) + + json = backend_fixtures["storage_connector"]["get_big_query"]["response"] + json["key_path"] = "file://" + str(credentialsFile.resolve()) + + sc = storage_connector.StorageConnector.from_response_json(json) + + # Act + spark_options = sc.spark_options() + + # Assert - Credentials should be base64 encoded + assert ( + base64.b64decode(spark_options[BigQueryConnector.BIGQ_CREDENTIALS]).decode( + "utf-8" + ) + == credentials + ) From e50d6e2486491c32834063317f1b984238d39d42 Mon Sep 17 00:00:00 2001 From: Fabio Buso Date: Sun, 2 Oct 2022 01:30:50 +0200 Subject: [PATCH 2/3] Fix property encoding to be a string and not byte array --- python/hsfs/storage_connector.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/hsfs/storage_connector.py b/python/hsfs/storage_connector.py index c6a2ac179f..d82ac570ee 100644 --- a/python/hsfs/storage_connector.py +++ b/python/hsfs/storage_connector.py @@ -1118,8 +1118,8 @@ def spark_options(self): local_key_path = engine.get_instance().add_file(self._key_path) with open(local_key_path, "rb") as credentials_file: - properties[self.BIGQ_CREDENTIALS] = base64.b64encode( - credentials_file.read() + properties[self.BIGQ_CREDENTIALS] = str( + base64.b64encode(credentials_file.read()), "utf-8" ) if self._materialization_dataset: From 885c5f8af88a97043f8e9792dadc22e9f64f49ca Mon Sep 17 00:00:00 2001 From: Fabio Buso Date: Mon, 3 Oct 2022 11:39:32 +0200 Subject: [PATCH 3/3] Check if options is null before checking if it's empty --- java/src/main/java/com/logicalclocks/hsfs/StorageConnector.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/java/src/main/java/com/logicalclocks/hsfs/StorageConnector.java b/java/src/main/java/com/logicalclocks/hsfs/StorageConnector.java index cebd274d59..9a778cb579 100644 --- a/java/src/main/java/com/logicalclocks/hsfs/StorageConnector.java +++ b/java/src/main/java/com/logicalclocks/hsfs/StorageConnector.java @@ -600,7 +600,7 @@ public Object read(String query, String dataFormat, Map options, Map readOptions = sparkOptions(); // merge user spark options on top of default spark options - if (!options.isEmpty()) { + if (options != null && !options.isEmpty()) { readOptions.putAll(options); }
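Note on the approach (editorial illustration, not part of the patch series above): the spark-bigquery-connector accepts the service-account key either as a path that must exist on each node ("credentialsFile") or inline as a base64-encoded string ("credentials"). Embedding the encoded key in the read options means it reaches every executor together with the options themselves, instead of relying on a local path that only exists on the node where SparkContext.addFile() placed the file. A minimal PySpark sketch of the same idea, assuming the spark-bigquery-connector package is on the classpath and using illustrative key path, project, dataset and table names:

    import base64

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    # Read the service-account key once on the driver and embed it, base64-encoded,
    # directly in the read options; executors never need access to the key file.
    with open("/tmp/bigquery-key.json", "rb") as key_file:  # illustrative path
        encoded_key = base64.b64encode(key_file.read()).decode("utf-8")

    df = (
        spark.read.format("bigquery")
        .option("credentials", encoded_key)             # inline key, works on every node
        .option("parentProject", "my-gcp-project")      # illustrative project id
        .option("viewsEnabled", "true")
        .option("materializationDataset", "my_tmp_dataset")
        .load("my-gcp-project.my_dataset.my_table")
    )
    df.show()

The .decode("utf-8") matters because base64.b64encode() returns bytes while Spark options must be plain strings, which is what the second patch in the series fixes on the hsfs side.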