Commit 1b3dbb6

SPARK: Allow spark catalogs to have hadoop configuration overrides per catalog (#2792)
Previously, Iceberg catalogs loaded into Spark always used the Hadoop Configuration owned by the underlying Spark session. This made it impossible to use a different set of configuration values, which may be required to connect to a remote catalog. This patch allows Spark catalogs to have Hadoop configuration overrides per catalog, permitting different configuration for different underlying Iceberg catalogs.
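For example, the override keys sit alongside the usual catalog properties on the Spark session. A minimal sketch (the catalog name "my_catalog", the warehouse path, and the S3A keys below are illustrative assumptions, not values taken from this commit):

import org.apache.spark.sql.SparkSession;

public class PerCatalogHadoopOverrideExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .master("local[*]")
        // applies to every catalog in the session, as before
        .config("spark.hadoop.fs.s3a.connection.maximum", "100")
        // defines the Iceberg catalog "my_catalog"
        .config("spark.sql.catalog.my_catalog", "org.apache.iceberg.spark.SparkCatalog")
        .config("spark.sql.catalog.my_catalog.type", "hadoop")
        .config("spark.sql.catalog.my_catalog.warehouse", "file:///tmp/iceberg-warehouse")
        // Hadoop override applied only to my_catalog's Configuration
        .config("spark.sql.catalog.my_catalog.hadoop.fs.s3a.endpoint", "http://localhost:9000")
        .getOrCreate();

    spark.stop();
  }
}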
1 parent 0055e85 commit 1b3dbb6

4 files changed: +177 -3 lines changed

spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java

Lines changed: 41 additions & 0 deletions
@@ -24,6 +24,7 @@
 import java.util.function.BiFunction;
 import java.util.function.Function;
 import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.PartitionField;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
@@ -37,6 +38,7 @@
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.Pair;
 import org.apache.spark.sql.RuntimeConfig;
+import org.apache.spark.sql.SparkSession;
 import org.apache.spark.util.SerializableConfiguration;

 public class SparkUtil {
@@ -52,6 +54,12 @@ public class SparkUtil {
   public static final String USE_TIMESTAMP_WITHOUT_TIME_ZONE_IN_NEW_TABLES =
       "spark.sql.iceberg.use-timestamp-without-timezone-in-new-tables";

+  private static final String SPARK_CATALOG_CONF_PREFIX = "spark.sql.catalog";
+  // Format string used as the prefix for spark configuration keys to override hadoop configuration values
+  // for Iceberg tables from a given catalog. These keys can be specified as `spark.sql.catalog.$catalogName.hadoop.*`,
+  // similar to using `spark.hadoop.*` to override hadoop configurations globally for a given spark session.
+  private static final String SPARK_CATALOG_HADOOP_CONF_OVERRIDE_FMT_STR = SPARK_CATALOG_CONF_PREFIX + ".%s.hadoop.";
+
   private SparkUtil() {
   }

@@ -170,4 +178,37 @@ public static boolean useTimestampWithoutZoneInNewTables(RuntimeConfig sessionCo
     return false;
   }

+  /**
+   * Pulls any Catalog specific overrides for the Hadoop conf from the current SparkSession, which can be
+   * set via `spark.sql.catalog.$catalogName.hadoop.*`
+   *
+   * Mirrors the override of hadoop configurations for a given spark session using `spark.hadoop.*`.
+   *
+   * The SparkCatalog allows for hadoop configurations to be overridden per catalog, by setting
+   * them on the SQLConf, where the following will add the property "fs.default.name" with value
+   * "hdfs://hanksnamenode:8020" to the catalog's hadoop configuration.
+   *     SparkSession.builder()
+   *       .config(s"spark.sql.catalog.$catalogName.hadoop.fs.default.name", "hdfs://hanksnamenode:8020")
+   *       .getOrCreate()
+   * @param spark The current Spark session
+   * @param catalogName Name of the catalog to find overrides for.
+   * @return the Hadoop Configuration that should be used for this catalog, with catalog specific overrides applied.
+   */
+  public static Configuration hadoopConfCatalogOverrides(SparkSession spark, String catalogName) {
+    // Find keys for the catalog intended to be hadoop configurations
+    final String hadoopConfCatalogPrefix = hadoopConfPrefixForCatalog(catalogName);
+    final Configuration conf = spark.sessionState().newHadoopConf();
+    spark.sqlContext().conf().settings().forEach((k, v) -> {
+      // These checks are copied from `spark.sessionState().newHadoopConfWithOptions()`, which we
+      // avoid using to not have to convert back and forth between scala / java map types.
+      if (v != null && k != null && k.startsWith(hadoopConfCatalogPrefix)) {
+        conf.set(k.substring(hadoopConfCatalogPrefix.length()), v);
+      }
+    });
+    return conf;
+  }
+
+  private static String hadoopConfPrefixForCatalog(String catalogName) {
+    return String.format(SPARK_CATALOG_HADOOP_CONF_OVERRIDE_FMT_STR, catalogName);
+  }
 }
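In short, hadoopConfCatalogOverrides starts from the session's Hadoop conf, then strips the per-catalog prefix from any matching SQLConf key and applies the remainder as a plain Hadoop property. A rough, self-contained usage sketch (the catalog name "demo", the warehouse path, and the override value are made up for illustration, not part of the patch):

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.spark.SparkUtil;
import org.apache.spark.sql.SparkSession;

public class HadoopConfOverrideCheck {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .master("local[1]")
        .config("spark.sql.catalog.demo", "org.apache.iceberg.spark.SparkCatalog")
        .config("spark.sql.catalog.demo.type", "hadoop")
        .config("spark.sql.catalog.demo.warehouse", "file:///tmp/demo-warehouse")
        .config("spark.sql.catalog.demo.hadoop.fs.s3a.buffer.dir", "/tmp-overridden")
        .getOrCreate();

    // The "spark.sql.catalog.demo.hadoop." prefix is stripped, leaving the plain Hadoop key
    Configuration conf = SparkUtil.hadoopConfCatalogOverrides(spark, "demo");
    System.out.println(conf.get("fs.s3a.buffer.dir"));  // expected: /tmp-overridden

    spark.stop();
  }
}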

spark2/src/main/java/org/apache/iceberg/spark/source/CustomCatalogs.java

Lines changed: 1 addition & 1 deletion
@@ -68,7 +68,7 @@ private static Catalog buildCatalog(Pair<SparkSession, String> sparkAndName) {
     SparkSession spark = sparkAndName.first();
     String name = sparkAndName.second();
     SparkConf sparkConf = spark.sparkContext().getConf();
-    Configuration conf = spark.sessionState().newHadoopConf();
+    Configuration conf = SparkUtil.hadoopConfCatalogOverrides(spark, name);

     String catalogPrefix = String.format("%s.%s", ICEBERG_CATALOG_PREFIX, name);
     if (!name.equals(ICEBERG_DEFAULT_CATALOG) &&

spark3/src/main/java/org/apache/iceberg/spark/SparkCatalog.java

Lines changed: 2 additions & 2 deletions
@@ -97,7 +97,7 @@ public class SparkCatalog extends BaseCatalog {
    * @return an Iceberg catalog
    */
   protected Catalog buildIcebergCatalog(String name, CaseInsensitiveStringMap options) {
-    Configuration conf = SparkSession.active().sessionState().newHadoopConf();
+    Configuration conf = SparkUtil.hadoopConfCatalogOverrides(SparkSession.active(), name);
     Map<String, String> optionsMap = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
     optionsMap.putAll(options);
     optionsMap.put(CatalogProperties.APP_ID, SparkSession.active().sparkContext().applicationId());
@@ -390,7 +390,7 @@ public final void initialize(String name, CaseInsensitiveStringMap options) {
     this.catalogName = name;
     SparkSession sparkSession = SparkSession.active();
     this.useTimestampsWithoutZone = SparkUtil.useTimestampWithoutZoneInNewTables(sparkSession.conf());
-    this.tables = new HadoopTables(sparkSession.sessionState().newHadoopConf());
+    this.tables = new HadoopTables(SparkUtil.hadoopConfCatalogOverrides(SparkSession.active(), name));
     this.icebergCatalog = cacheEnabled ? CachingCatalog.wrap(catalog) : catalog;
     if (catalog instanceof SupportsNamespaces) {
       this.asNamespaceCatalog = (SupportsNamespaces) catalog;
spark3/src/test/java/org/apache/iceberg/spark/source/TestSparkCatalogHadoopOverrides.java

Lines changed: 133 additions & 0 deletions
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.util.Map;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.KryoHelpers;
+import org.apache.iceberg.SerializableTable;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TestHelpers;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.spark.SparkCatalog;
+import org.apache.iceberg.spark.SparkCatalogTestBase;
+import org.apache.iceberg.spark.SparkSessionCatalog;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runners.Parameterized;
+
+public class TestSparkCatalogHadoopOverrides extends SparkCatalogTestBase {
+
+  private static final String configToOverride = "fs.s3a.buffer.dir";
+  // prepend "hadoop." so that the test base formats the SQLConf key correctly
+  // as `spark.sql.catalog.<catalogName>.hadoop.<configToOverride>`
+  private static final String hadoopPrefixedConfigToOverride = "hadoop." + configToOverride;
+  private static final String configOverrideValue = "/tmp-overridden";
+
+  @Parameterized.Parameters(name = "catalogName = {0}, implementation = {1}, config = {2}")
+  public static Object[][] parameters() {
+    return new Object[][] {
+        { "testhive", SparkCatalog.class.getName(),
+            ImmutableMap.of(
+                "type", "hive",
+                "default-namespace", "default",
+                hadoopPrefixedConfigToOverride, configOverrideValue
+            ) },
+        { "testhadoop", SparkCatalog.class.getName(),
+            ImmutableMap.of(
+                "type", "hadoop",
+                hadoopPrefixedConfigToOverride, configOverrideValue
+            ) },
+        { "spark_catalog", SparkSessionCatalog.class.getName(),
+            ImmutableMap.of(
+                "type", "hive",
+                "default-namespace", "default",
+                hadoopPrefixedConfigToOverride, configOverrideValue
+            ) }
+    };
+  }
+
+  public TestSparkCatalogHadoopOverrides(String catalogName,
+                                         String implementation,
+                                         Map<String, String> config) {
+    super(catalogName, implementation, config);
+  }
+
+  @Before
+  public void createTable() {
+    sql("CREATE TABLE IF NOT EXISTS %s (id bigint) USING iceberg", tableName(tableIdent.name()));
+  }
+
+  @After
+  public void dropTable() {
+    sql("DROP TABLE IF EXISTS %s", tableName(tableIdent.name()));
+  }
+
+  @Test
+  public void testTableFromCatalogHasOverrides() throws Exception {
+    Table table = getIcebergTableFromSparkCatalog();
+    Configuration conf = ((Configurable) table.io()).getConf();
+    String actualCatalogOverride = conf.get(configToOverride, "/whammies");
+    Assert.assertEquals(
+        "Iceberg tables from spark should have the overridden hadoop configurations from the spark config",
+        configOverrideValue, actualCatalogOverride);
+  }
+
+  @Test
+  public void ensureRoundTripSerializedTableRetainsHadoopConfig() throws Exception {
+    Table table = getIcebergTableFromSparkCatalog();
+    Configuration originalConf = ((Configurable) table.io()).getConf();
+    String actualCatalogOverride = originalConf.get(configToOverride, "/whammies");
+    Assert.assertEquals(
+        "Iceberg tables from spark should have the overridden hadoop configurations from the spark config",
+        configOverrideValue, actualCatalogOverride);
+
+    // Now convert to SerializableTable and ensure overridden property is still present.
+    Table serializableTable = SerializableTable.copyOf(table);
+    Table kryoSerializedTable = KryoHelpers.roundTripSerialize(SerializableTable.copyOf(table));
+    Configuration configFromKryoSerde = ((Configurable) kryoSerializedTable.io()).getConf();
+    String kryoSerializedCatalogOverride = configFromKryoSerde.get(configToOverride, "/whammies");
+    Assert.assertEquals(
+        "Tables serialized with Kryo serialization should retain overridden hadoop configuration properties",
+        configOverrideValue, kryoSerializedCatalogOverride);
+
+    // Do the same for Java based serde
+    Table javaSerializedTable = TestHelpers.roundTripSerialize(serializableTable);
+    Configuration configFromJavaSerde = ((Configurable) javaSerializedTable.io()).getConf();
+    String javaSerializedCatalogOverride = configFromJavaSerde.get(configToOverride, "/whammies");
+    Assert.assertEquals(
+        "Tables serialized with Java serialization should retain overridden hadoop configuration properties",
+        configOverrideValue, javaSerializedCatalogOverride);
+  }
+
+  @SuppressWarnings("ThrowSpecificity")
+  private Table getIcebergTableFromSparkCatalog() throws Exception {
+    Identifier identifier = Identifier.of(tableIdent.namespace().levels(), tableIdent.name());
+    TableCatalog catalog = (TableCatalog) spark.sessionState().catalogManager().catalog(catalogName);
+    SparkTable sparkTable = (SparkTable) catalog.loadTable(identifier);
+    return sparkTable.table();
+  }
+}
