
Commit 2e8b743

Wenye Zhang committed

remove hiveberg dependency in iceberg-spark2 module

1 parent 76bda7b · commit 2e8b743

File tree

6 files changed (+162 −32 lines)


build.gradle

Lines changed: 4 additions & 1 deletion

@@ -498,12 +498,16 @@ project(':iceberg-hive-metastore') {
 project(':iceberg-hiveberg') {
   dependencies {
     compile project(':iceberg-hive-metastore')
+    compile project(':iceberg-spark2')
 
     compileOnly "org.apache.avro:avro"
     compileOnly("org.apache.hadoop:hadoop-client") {
       exclude group: 'org.apache.avro', module: 'avro'
       exclude group: 'org.slf4j', module: 'slf4j-log4j12'
     }
+    compileOnly("org.apache.spark:spark-hive_2.11") {
+      exclude group: 'org.apache.avro', module: 'avro'
+    }
 
     compileOnly("org.apache.hive:hive-metastore") {
       exclude group: 'org.apache.avro', module: 'avro'
@@ -872,7 +876,6 @@ if (jdkVersion == '8') {
     compile project(':iceberg-arrow')
     compile project(':iceberg-hive-metastore')
     compile project(':iceberg-spark')
-    compile project(':iceberg-hiveberg')
 
     compileOnly "org.apache.avro:avro"
     compileOnly("org.apache.spark:spark-hive_2.11") {
org.apache.iceberg.hiveberg.spark2.IcebergSource (new file)

Lines changed: 54 additions & 0 deletions

@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.hiveberg.spark2;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.spark.SparkUtil;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.sources.v2.DataSourceOptions;
+import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
+import org.apache.spark.sql.types.StructType;
+
+
+public class IcebergSource extends org.apache.iceberg.spark.source.IcebergSource {
+
+  @Override
+  public DataSourceReader createReader(StructType readSchema, DataSourceOptions options) {
+    Configuration conf = new Configuration(lazyBaseConf());
+    Table table = getTableAndResolveHadoopConfiguration(options, conf);
+    String caseSensitive = lazySparkSession().conf().get("spark.sql.caseSensitive");
+
+    Broadcast<FileIO> io = lazySparkContext().broadcast(SparkUtil.serializableFileIO(table));
+    Broadcast<EncryptionManager> encryptionManager = lazySparkContext().broadcast(table.encryption());
+
+    Reader reader = new Reader(table, io, encryptionManager, Boolean.parseBoolean(caseSensitive), options);
+    if (readSchema != null) {
+      // convert() will fail if readSchema contains fields not in table.schema()
+      SparkSchemaUtil.convert(table.schema(), readSchema);
+      reader.pruneColumns(readSchema);
+    }
+
+    return reader;
+  }
+}
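
Since the new class is an ordinary DataSourceV2 provider, Spark can select it by its fully qualified class name. A minimal read sketch, assuming the class is on the classpath; the app name and the table identifier "db.sample" are hypothetical:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class HivebergReadExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("hiveberg-read")   // hypothetical app name
        .enableHiveSupport()        // assumes a Hive metastore is configured
        .getOrCreate();

    // Selecting the source by class name routes the read through the
    // createReader(...) override above, which resolves the table and
    // builds the hiveberg Reader.
    Dataset<Row> df = spark.read()
        .format("org.apache.iceberg.hiveberg.spark2.IcebergSource")
        .load("db.sample");         // hypothetical table identifier

    df.show();
  }
}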
org.apache.iceberg.hiveberg.spark2.Reader (new file)

Lines changed: 89 additions & 0 deletions

@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.hiveberg.spark2;
+
+import java.util.OptionalLong;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.hiveberg.LegacyHiveTable;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.spark.source.Stats;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.sources.v2.DataSourceOptions;
+import org.apache.spark.sql.sources.v2.reader.Statistics;
+
+
+class Reader extends org.apache.iceberg.spark.source.Reader {
+  Reader(Table table, Broadcast<FileIO> io, Broadcast<EncryptionManager> encryptionManager,
+      boolean caseSensitive, DataSourceOptions options) {
+    super(table, io, encryptionManager, caseSensitive, options);
+  }
+
+  @Override
+  public Statistics estimateStatistics() {
+    Table table = super.getTable();
+    if (table instanceof LegacyHiveTable) {
+      // We currently don't have reliable stats for Hive tables
+      return EMPTY_STATS;
+    }
+
+    // its a fresh table, no data
+    if (table.currentSnapshot() == null) {
+      return new Stats(0L, 0L);
+    }
+
+    // estimate stats using snapshot summary only for partitioned tables (metadata tables are unpartitioned)
+    if (!table.spec().isUnpartitioned() && filterExpression() == Expressions.alwaysTrue()) {
+      long totalRecords = PropertyUtil.propertyAsLong(table.currentSnapshot().summary(),
+          SnapshotSummary.TOTAL_RECORDS_PROP, Long.MAX_VALUE);
+      return new Stats(SparkSchemaUtil.estimateSize(lazyType(), totalRecords), totalRecords);
+    }
+
+    long sizeInBytes = 0L;
+    long numRows = 0L;
+
+    for (CombinedScanTask task : tasks()) {
+      for (FileScanTask file : task.files()) {
+        sizeInBytes += file.length();
+        numRows += file.file().recordCount();
+      }
+    }
+
+    return new Stats(sizeInBytes, numRows);
+  }
+
+  private static final Statistics EMPTY_STATS = new Statistics() {
+    @Override
+    public OptionalLong sizeInBytes() {
+      return OptionalLong.empty();
+    }
+
+    @Override
+    public OptionalLong numRows() {
+      return OptionalLong.empty();
+    }
+  };
+}
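
Returning empty OptionalLong values from EMPTY_STATS tells Spark that no estimate is available, so the optimizer falls back to its defaults; new Stats(0L, 0L) would instead claim the table is known to hold zero rows. A standalone sketch of the same idiom (the class name is illustrative, not part of this commit):

import java.util.OptionalLong;
import org.apache.spark.sql.sources.v2.reader.Statistics;

// Illustrative only: mirrors the EMPTY_STATS anonymous class above.
// An empty OptionalLong means "unknown", which is very different from
// a present value of zero.
final class UnknownStats implements Statistics {
  @Override
  public OptionalLong sizeInBytes() {
    return OptionalLong.empty();  // size unknown; optimizer uses defaults
  }

  @Override
  public OptionalLong numRows() {
    return OptionalLong.empty();  // row count unknown
  }
}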

spark2/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java

Lines changed: 4 additions & 5 deletions

@@ -139,29 +139,28 @@ protected Table findTable(DataSourceOptions options, Configuration conf) {
     }
   }
 
-  private SparkSession lazySparkSession() {
+  protected SparkSession lazySparkSession() {
     if (lazySpark == null) {
      this.lazySpark = SparkSession.builder().getOrCreate();
     }
     return lazySpark;
   }
 
-  private JavaSparkContext lazySparkContext() {
+  protected JavaSparkContext lazySparkContext() {
     if (lazySparkContext == null) {
       this.lazySparkContext = new JavaSparkContext(lazySparkSession().sparkContext());
     }
     return lazySparkContext;
   }
 
-  private Configuration lazyBaseConf() {
+  protected Configuration lazyBaseConf() {
     if (lazyConf == null) {
       this.lazyConf = lazySparkSession().sessionState().newHadoopConf();
     }
     return lazyConf;
   }
 
-  private Table getTableAndResolveHadoopConfiguration(
-      DataSourceOptions options, Configuration conf) {
+  protected Table getTableAndResolveHadoopConfiguration(DataSourceOptions options, Configuration conf) {
     // Overwrite configurations from the Spark Context with configurations from the options.
     mergeIcebergHadoopConfs(conf, options.asMap());
     Table table = findTable(options, conf);
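
Widening these members from private to protected is what lets the org.apache.iceberg.hiveberg.spark2.IcebergSource above extend this class from another package. A minimal sketch of the pattern; the com.example package and ConfCopyingSource class are hypothetical:

package com.example;  // hypothetical package, outside org.apache.iceberg.spark.source

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.spark.source.IcebergSource;

// In Java, protected members are visible to subclasses even across
// packages, so this compiles once lazyBaseConf() is protected.
public class ConfCopyingSource extends IcebergSource {
  protected Configuration baseConfCopy() {
    // Copy instead of mutating the shared, lazily built Hadoop conf.
    return new Configuration(lazyBaseConf());
  }
}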

spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java

Lines changed: 9 additions & 24 deletions

@@ -24,7 +24,6 @@
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
-import java.util.OptionalLong;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -44,7 +43,6 @@
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.hadoop.HadoopFileIO;
 import org.apache.iceberg.hadoop.Util;
-import org.apache.iceberg.hiveberg.LegacyHiveTable;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.orc.OrcRowFilterUtils;
@@ -80,7 +78,7 @@
 
 import static org.apache.iceberg.TableProperties.DEFAULT_NAME_MAPPING;
 
-class Reader implements DataSourceReader, SupportsScanColumnarBatch, SupportsPushDownFilters,
+public class Reader implements DataSourceReader, SupportsScanColumnarBatch, SupportsPushDownFilters,
     SupportsPushDownRequiredColumns, SupportsReportStatistics {
   private static final Logger LOG = LoggerFactory.getLogger(Reader.class);
 
@@ -112,7 +110,7 @@ class Reader implements DataSourceReader, SupportsScanColumnarBatch, SupportsPus
   private List<CombinedScanTask> tasks = null; // lazy cache of tasks
   private Boolean readUsingBatch = null;
 
-  Reader(Table table, Broadcast<FileIO> io, Broadcast<EncryptionManager> encryptionManager,
+  protected Reader(Table table, Broadcast<FileIO> io, Broadcast<EncryptionManager> encryptionManager,
       boolean caseSensitive, DataSourceOptions options) {
     this.table = table;
     this.snapshotId = options.get(SparkReadOptions.SNAPSHOT_ID).map(Long::parseLong).orElse(null);
@@ -202,14 +200,14 @@ private Schema lazySchema() {
     return schema;
   }
 
-  private Expression filterExpression() {
+  protected Expression filterExpression() {
     if (filterExpressions != null) {
       return filterExpressions.stream().reduce(Expressions.alwaysTrue(), Expressions::and);
     }
     return Expressions.alwaysTrue();
   }
 
-  private StructType lazyType() {
+  protected StructType lazyType() {
     if (type == null) {
       Preconditions.checkArgument(readTimestampWithoutZone || !hasTimestampWithoutZone(lazySchema()),
           "Spark does not support timestamp without time zone fields");
@@ -310,11 +308,6 @@ public void pruneColumns(StructType newRequestedSchema) {
 
   @Override
   public Statistics estimateStatistics() {
-    if (table instanceof LegacyHiveTable) {
-      // We currently don't have reliable stats for Hive tables
-      return EMPTY_STATS;
-    }
-
     // its a fresh table, no data
     if (table.currentSnapshot() == null) {
       return new Stats(0L, 0L);
@@ -340,18 +333,6 @@ public Statistics estimateStatistics() {
     return new Stats(sizeInBytes, numRows);
   }
 
-  private static final Statistics EMPTY_STATS = new Statistics() {
-    @Override
-    public OptionalLong sizeInBytes() {
-      return OptionalLong.empty();
-    }
-
-    @Override
-    public OptionalLong numRows() {
-      return OptionalLong.empty();
-    }
-  };
-
   @Override
   public boolean enableBatchRead() {
     if (readUsingBatch == null) {
@@ -402,7 +383,7 @@ private static void mergeIcebergHadoopConfs(
         .forEach(key -> baseConf.set(key.replaceFirst("hadoop.", ""), options.get(key)));
   }
 
-  private List<CombinedScanTask> tasks() {
+  protected List<CombinedScanTask> tasks() {
    if (tasks == null) {
      TableScan scan = table
          .newScan()
@@ -588,4 +569,8 @@ private static class BatchReader extends BatchDataReader implements InputPartiti
       super(task, expectedSchema, nameMapping, io, encryptionManager, caseSensitive, size);
     }
   }
+
+  public Table getTable() {
+    return table;
+  }
 }
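
The newly protected filterExpression() folds whatever filters Spark pushed down into a single predicate, reducing with Expressions::and from the identity alwaysTrue(). A self-contained illustration of that fold; the filters themselves are made up:

import java.util.Arrays;
import java.util.List;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;

public class FilterFoldExample {
  public static void main(String[] args) {
    // Hypothetical pushed-down filters; in the Reader these come from
    // Spark's SupportsPushDownFilters callback.
    List<Expression> filters = Arrays.asList(
        Expressions.greaterThan("id", 100),
        Expressions.equal("category", "books"));

    // The same reduction the Reader performs: AND everything together,
    // starting from alwaysTrue() so an empty list yields a no-op filter.
    Expression combined = filters.stream()
        .reduce(Expressions.alwaysTrue(), Expressions::and);

    System.out.println(combined);
  }
}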

spark2/src/main/java/org/apache/iceberg/spark/source/Stats.java

Lines changed: 2 additions & 2 deletions

@@ -22,11 +22,11 @@
 import java.util.OptionalLong;
 import org.apache.spark.sql.sources.v2.reader.Statistics;
 
-class Stats implements Statistics {
+public class Stats implements Statistics {
   private final OptionalLong sizeInBytes;
   private final OptionalLong numRows;
 
-  Stats(long sizeInBytes, long numRows) {
+  public Stats(long sizeInBytes, long numRows) {
     this.sizeInBytes = OptionalLong.of(sizeInBytes);
     this.numRows = OptionalLong.of(numRows);
   }
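
With Stats and its constructor now public, code outside org.apache.iceberg.spark.source (such as the hiveberg Reader above) can report concrete estimates. A small sketch; the byte and row counts are illustrative:

import org.apache.iceberg.spark.source.Stats;
import org.apache.spark.sql.sources.v2.reader.Statistics;

public class StatsExample {
  public static void main(String[] args) {
    // Valid from any package now that the class and constructor are public.
    Statistics stats = new Stats(64L * 1024 * 1024, 1_000_000L);
    System.out.println("bytes: " + stats.sizeInBytes().getAsLong());
    System.out.println("rows:  " + stats.numRows().getAsLong());
  }
}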
