
Commit 78f8c50

mccheah authored and rdblue committed
Allow custom hadoop properties to be loaded in the Spark data source (#7)
Properties that start with iceberg.hadoop are copied into the Hadoop Configuration used in the Spark source. These may be set in table properties or in the read and write options passed to the Spark operation; read and write options take precedence over table properties. Support for these custom Hadoop properties should be added to the other Iceberg integrations in subsequent patches.
1 parent 5684ac1 commit 78f8c50
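
For illustration, a minimal usage sketch of how a caller might pass such a property through the Spark read and write options. This snippet is not part of the commit; the property names, endpoint value, and table paths are made up, and the source is assumed to be registered under the short name "iceberg" as in this module:

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SaveMode;
    import org.apache.spark.sql.SparkSession;

    public class CustomHadoopConfExample {
      public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().getOrCreate();

        // Keys starting with "iceberg.hadoop" are copied into the Hadoop
        // Configuration used by the source before the table is loaded.
        Dataset<Row> df = spark.read()
            .format("iceberg")
            .option("iceberg.hadoop.fs.s3a.endpoint", "http://localhost:9000")
            .load("hdfs://nn:8020/warehouse/db/table");

        // The same prefix works for write options; per the commit message,
        // these override any iceberg.hadoop keys stored in table properties.
        df.write()
            .format("iceberg")
            .mode(SaveMode.Append)
            .option("iceberg.hadoop.dfs.replication", "2")
            .save("hdfs://nn:8020/warehouse/db/table");
      }
    }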

File tree

2 files changed: +33, −10 lines

spark/src/main/java/com/netflix/iceberg/spark/source/IcebergSource.java
Lines changed: 31 additions & 9 deletions

@@ -36,6 +36,7 @@
 import org.apache.spark.sql.types.StructType;
 import java.util.List;
 import java.util.Locale;
+import java.util.Map;
 import java.util.Optional;
 
 import static com.netflix.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
@@ -53,16 +54,18 @@ public String shortName() {
 
   @Override
   public DataSourceReader createReader(DataSourceOptions options) {
-    Table table = findTable(options);
-    return new Reader(table, lazyConf());
+    Configuration conf = new Configuration(lazyBaseConf());
+    Table table = getTableAndResolveHadoopConfiguration(options, conf);
+
+    return new Reader(table, conf);
   }
 
   @Override
   public Optional<DataSourceWriter> createWriter(String jobId, StructType dfStruct, SaveMode mode,
                                                  DataSourceOptions options) {
     Preconditions.checkArgument(mode == SaveMode.Append, "Save mode %s is not supported", mode);
-
-    Table table = findTable(options);
+    Configuration conf = new Configuration(lazyBaseConf());
+    Table table = getTableAndResolveHadoopConfiguration(options, conf);
 
     Schema dfSchema = SparkSchemaUtil.convert(table.schema(), dfStruct);
     List<String> errors = CheckCompatibility.writeCompatibilityErrors(table.schema(), dfSchema);
@@ -86,30 +89,49 @@ public Optional<DataSourceWriter> createWriter(String jobId, StructType dfStruct
           .toUpperCase(Locale.ENGLISH));
     }
 
-    return Optional.of(new Writer(table, lazyConf(), format));
+    return Optional.of(new Writer(table, conf, format));
   }
 
-  protected Table findTable(DataSourceOptions options) {
+  protected Table findTable(DataSourceOptions options, Configuration conf) {
     Optional<String> location = options.get("path");
     Preconditions.checkArgument(location.isPresent(),
         "Cannot open table without a location: path is not set");
 
-    HadoopTables tables = new HadoopTables(lazyConf());
+    HadoopTables tables = new HadoopTables(conf);
 
     return tables.load(location.get());
   }
 
-  protected SparkSession lazySparkSession() {
+  private SparkSession lazySparkSession() {
     if (lazySpark == null) {
       this.lazySpark = SparkSession.builder().getOrCreate();
     }
     return lazySpark;
   }
 
-  protected Configuration lazyConf() {
+  private Configuration lazyBaseConf() {
     if (lazyConf == null) {
       this.lazyConf = lazySparkSession().sparkContext().hadoopConfiguration();
     }
     return lazyConf;
   }
+
+  private Table getTableAndResolveHadoopConfiguration(
+      DataSourceOptions options, Configuration conf) {
+    // Overwrite configurations from the Spark Context with configurations from the options.
+    mergeIcebergHadoopConfs(conf, options.asMap());
+    Table table = findTable(options, conf);
+    // Set confs from table properties
+    mergeIcebergHadoopConfs(conf, table.properties());
+    // Re-overwrite values set in options and table properties but were not in the environment.
+    mergeIcebergHadoopConfs(conf, options.asMap());
+    return table;
+  }
+
+  private static void mergeIcebergHadoopConfs(
+      Configuration baseConf, Map<String, String> options) {
+    options.keySet().stream()
+        .filter(key -> key.startsWith("iceberg.hadoop"))
+        .forEach(key -> baseConf.set(key.replaceFirst("iceberg.hadoop", ""), options.get(key)));
+  }
 }
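
A note on the merge order in getTableAndResolveHadoopConfiguration above: the options are merged before the table is loaded (so custom properties can influence table resolution itself) and again afterwards, so a key present in both the options and the table properties ends up with the option's value. A minimal sketch of that precedence, with the iceberg.hadoop prefix handling elided and made-up keys and values:

    // Sketch only: demonstrates options > table properties > base environment.
    import org.apache.hadoop.conf.Configuration;
    import java.util.Collections;
    import java.util.Map;

    public class PrecedenceSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration(false);
        conf.set("some.key", "from-environment");  // value from the base Hadoop conf

        Map<String, String> options = Collections.singletonMap("some.key", "from-options");
        Map<String, String> tableProps = Collections.singletonMap("some.key", "from-table-properties");

        options.forEach(conf::set);     // 1. merge options, then load the table
        tableProps.forEach(conf::set);  // 2. merge the loaded table's properties
        options.forEach(conf::set);     // 3. re-merge options to restore their precedence

        System.out.println(conf.get("some.key"));  // prints "from-options"
      }
    }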

spark/src/test/java/com/netflix/iceberg/spark/source/TestIcebergSource.java
Lines changed: 2 additions & 1 deletion

@@ -17,6 +17,7 @@
 package com.netflix.iceberg.spark.source;
 
 import com.netflix.iceberg.Table;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.spark.sql.sources.v2.DataSourceOptions;
 
 public class TestIcebergSource extends IcebergSource {
@@ -26,7 +27,7 @@ public String shortName() {
   }
 
   @Override
-  protected Table findTable(DataSourceOptions options) {
+  protected Table findTable(DataSourceOptions options, Configuration conf) {
     return TestTables.load(options.get("iceberg.table.name").get());
   }
 }
