From eb18f1a0c285f3bd698ea0a0f94aa0becc1374a0 Mon Sep 17 00:00:00 2001
From: Piotr Findeisen
Date: Mon, 17 Jun 2024 17:34:17 +0200
Subject: [PATCH 1/4] Remove unused methods

---
 .../apache/iceberg/expressions/Projections.java   |  4 ----
 .../GenericArrowVectorAccessorFactory.java        |  6 ------
 .../java/org/apache/iceberg/DeleteFileIndex.java  | 16 ----------------
 .../org/apache/iceberg/hadoop/HadoopStreams.java  |  5 -----
 .../org/apache/iceberg/parquet/ParquetAvro.java   |  4 ----
 .../iceberg/parquet/ParquetValueWriters.java      |  8 --------
 6 files changed, 43 deletions(-)

diff --git a/api/src/main/java/org/apache/iceberg/expressions/Projections.java b/api/src/main/java/org/apache/iceberg/expressions/Projections.java
index f873edfff5b1..216f8a739cc3 100644
--- a/api/src/main/java/org/apache/iceberg/expressions/Projections.java
+++ b/api/src/main/java/org/apache/iceberg/expressions/Projections.java
@@ -190,10 +190,6 @@ public <T> Expression predicate(UnboundPredicate<T> pred) {
     PartitionSpec spec() {
       return spec;
     }
-
-    boolean isCaseSensitive() {
-      return caseSensitive;
-    }
   }
 
   private static class InclusiveProjection extends BaseProjectionEvaluator {
diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/GenericArrowVectorAccessorFactory.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/GenericArrowVectorAccessorFactory.java
index a988516bc6df..6350660d57bf 100644
--- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/GenericArrowVectorAccessorFactory.java
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/GenericArrowVectorAccessorFactory.java
@@ -561,12 +561,6 @@ private static class FixedSizeBinaryAccessor<
     private final FixedSizeBinaryVector vector;
     private final StringFactory<Utf8StringT> stringFactory;
 
-    FixedSizeBinaryAccessor(FixedSizeBinaryVector vector) {
-      super(vector);
-      this.vector = vector;
-      this.stringFactory = null;
-    }
-
     FixedSizeBinaryAccessor(
         FixedSizeBinaryVector vector, StringFactory<Utf8StringT> stringFactory) {
       super(vector);
diff --git a/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java b/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java
index e401a8179ea0..c26716481836 100644
--- a/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java
+++ b/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java
@@ -737,22 +737,10 @@ public DeleteFile wrapped() {
       return wrapped;
     }
 
-    public PartitionSpec spec() {
-      return spec;
-    }
-
-    public StructLike partition() {
-      return wrapped.partition();
-    }
-
     public long applySequenceNumber() {
       return applySequenceNumber;
     }
 
-    public FileContent content() {
-      return wrapped.content();
-    }
-
     public List<Types.NestedField> equalityFields() {
       if (equalityFields == null) {
         synchronized (this) {
@@ -778,10 +766,6 @@ public Map<Integer, Long> nullValueCounts() {
       return wrapped.nullValueCounts();
     }
 
-    public Map<Integer, Long> nanValueCounts() {
-      return wrapped.nanValueCounts();
-    }
-
     public boolean hasLowerAndUpperBounds() {
       return wrapped.lowerBounds() != null && wrapped.upperBounds() != null;
     }
diff --git a/core/src/main/java/org/apache/iceberg/hadoop/HadoopStreams.java b/core/src/main/java/org/apache/iceberg/hadoop/HadoopStreams.java
index f9b43b684666..c817b1d90afb 100644
--- a/core/src/main/java/org/apache/iceberg/hadoop/HadoopStreams.java
+++ b/core/src/main/java/org/apache/iceberg/hadoop/HadoopStreams.java
@@ -21,7 +21,6 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
-import java.nio.ByteBuffer;
 import java.util.Arrays;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -123,10 +122,6 @@ public int read(byte[] b, int off, int len) throws IOException {
       return stream.read(b, off, len);
     }
 
-    public int read(ByteBuffer buf) throws IOException {
-      return stream.read(buf);
-    }
-
     @SuppressWarnings("checkstyle:NoFinalizer")
     @Override
     protected void finalize() throws Throwable {
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetAvro.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetAvro.java
index 974c00076b7d..680eb1435967 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetAvro.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetAvro.java
@@ -377,10 +377,6 @@ private static class Pair<K, V> {
       this.second = second;
     }
 
-    public static <K, V> Pair<K, V> of(K first, V second) {
-      return new Pair<>(first, second);
-    }
-
     public K getFirst() {
       return first;
     }
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueWriters.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueWriters.java
index b2d91c99ef44..4eddf91a182f 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueWriters.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueWriters.java
@@ -147,18 +147,10 @@ private UnboxedWriter(ColumnDescriptor desc) {
       super(desc);
     }
 
-    public void writeBoolean(int repetitionLevel, boolean value) {
-      column.writeBoolean(repetitionLevel, value);
-    }
-
     public void writeInteger(int repetitionLevel, int value) {
       column.writeInteger(repetitionLevel, value);
     }
 
-    public void writeLong(int repetitionLevel, long value) {
-      column.writeLong(repetitionLevel, value);
-    }
-
     public void writeFloat(int repetitionLevel, float value) {
       column.writeFloat(repetitionLevel, value);
     }
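Most of the removals above are mechanical dead-code deletion, but the FixedSizeBinaryAccessor hunk also simplifies an invariant: the deleted single-argument constructor stored a null stringFactory, which every accessor built that way had to tolerate. With only the two-argument constructor left, a non-null factory could be enforced at construction time. A minimal sketch of that clean-up, with hypothetical names rather than the actual Iceberg classes:

    import java.util.Objects;

    class BinaryAccessor {
      interface StringFactory {
        String fromBytes(byte[] bytes);
      }

      private final StringFactory stringFactory;

      // With the null-storing overload gone, the one remaining constructor
      // can reject null up front instead of every read path checking for a
      // factory that was never set.
      BinaryAccessor(StringFactory stringFactory) {
        this.stringFactory = Objects.requireNonNull(stringFactory, "stringFactory is null");
      }

      String decode(byte[] bytes) {
        return stringFactory.fromBytes(bytes);
      }
    }

Whether the remaining Iceberg constructor actually adds such a check is a separate question; the sketch only illustrates why removing a null-producing overload is more than cosmetic.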
From 7300b624ded347d7696319c10ee59fd093c25628 Mon Sep 17 00:00:00 2001
From: Piotr Findeisen
Date: Mon, 17 Jun 2024 17:35:04 +0200
Subject: [PATCH 2/4] Remove redundant generics

---
 .../java/org/apache/iceberg/expressions/ExpressionUtil.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/api/src/main/java/org/apache/iceberg/expressions/ExpressionUtil.java b/api/src/main/java/org/apache/iceberg/expressions/ExpressionUtil.java
index 3708dafc4126..bf72e03bc406 100644
--- a/api/src/main/java/org/apache/iceberg/expressions/ExpressionUtil.java
+++ b/api/src/main/java/org/apache/iceberg/expressions/ExpressionUtil.java
@@ -484,7 +484,7 @@ public <T> String predicate(UnboundPredicate<T> pred) {
     }
   }
 
-  private static <T> List<String> abbreviateValues(List<String> sanitizedValues) {
+  private static List<String> abbreviateValues(List<String> sanitizedValues) {
     if (sanitizedValues.size() >= LONG_IN_PREDICATE_ABBREVIATION_THRESHOLD) {
       Set<String> distinctValues = ImmutableSet.copyOf(sanitizedValues);
       if (distinctValues.size()
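For readers skimming the hunk above: the method declared a type parameter <T> that nothing in its signature or body referenced, so javac inferred it as Object at every call site and its removal changes no caller. A small self-contained illustration of the same clean-up (hypothetical names, not the Iceberg code):

    import java.util.List;

    public class RedundantGenerics {
      // Before: <T> is declared but never used; IDE inspections flag it as
      // an unused type parameter.
      private static <T> List<String> abbreviateBefore(List<String> values) {
        return values.subList(0, Math.min(values.size(), 3));
      }

      // After: identical behavior and erasure, one less thing to puzzle over.
      private static List<String> abbreviateAfter(List<String> values) {
        return values.subList(0, Math.min(values.size(), 3));
      }

      public static void main(String[] args) {
        List<String> values = List.of("a", "b", "c", "d");
        System.out.println(abbreviateBefore(values)); // [a, b, c]
        System.out.println(abbreviateAfter(values));  // [a, b, c]
      }
    }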
From 401fac0b5dd5a3c1acdc00817a3c6e59ff16e5a7 Mon Sep 17 00:00:00 2001
From: Piotr Findeisen
Date: Mon, 17 Jun 2024 17:57:49 +0200
Subject: [PATCH 3/4] Remove unused StructReader Setters abstraction

---
 .../iceberg/parquet/ParquetValueReaders.java | 42 ------------------
 1 file changed, 42 deletions(-)

diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java
index c1f76e7bdb9a..a335fe11cf49 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java
@@ -696,28 +696,21 @@ public V setValue(V newValue) {
   }
 
   public abstract static class StructReader<T, I> implements ParquetValueReader<T> {
-    private interface Setter<R> {
-      void set(R record, int pos, Object reuse);
-    }
-
     private final ParquetValueReader<?>[] readers;
     private final TripleIterator<?> column;
     private final List<TripleIterator<?>> children;
 
-    @SuppressWarnings("unchecked")
     protected StructReader(List<Type> types, List<ParquetValueReader<?>> readers) {
       this.readers =
           (ParquetValueReader<?>[]) Array.newInstance(ParquetValueReader.class, readers.size());
       TripleIterator<?>[] columns =
           (TripleIterator<?>[]) Array.newInstance(TripleIterator.class, readers.size());
-      Setter<I>[] setters = (Setter<I>[]) Array.newInstance(Setter.class, readers.size());
 
       ImmutableList.Builder<TripleIterator<?>> columnsBuilder = ImmutableList.builder();
       for (int i = 0; i < readers.size(); i += 1) {
         ParquetValueReader<?> reader = readers.get(i);
         this.readers[i] = readers.get(i);
         columns[i] = reader.column();
-        setters[i] = newSetter(reader, types.get(i));
         columnsBuilder.addAll(reader.columns());
       }
 
@@ -754,41 +747,6 @@ public List<TripleIterator<?>> columns() {
       return children;
     }
 
-    @SuppressWarnings("unchecked")
-    private <E> Setter<I> newSetter(ParquetValueReader<E> reader, Type type) {
-      if (reader instanceof UnboxedReader && type.isPrimitive()) {
-        UnboxedReader<?> unboxed = (UnboxedReader<?>) reader;
-        switch (type.asPrimitiveType().getPrimitiveTypeName()) {
-          case BOOLEAN:
-            return (record, pos, ignored) -> setBoolean(record, pos, unboxed.readBoolean());
-          case INT32:
-            return (record, pos, ignored) -> setInteger(record, pos, unboxed.readInteger());
-          case INT64:
-            return (record, pos, ignored) -> setLong(record, pos, unboxed.readLong());
-          case FLOAT:
-            return (record, pos, ignored) -> setFloat(record, pos, unboxed.readFloat());
-          case DOUBLE:
-            return (record, pos, ignored) -> setDouble(record, pos, unboxed.readDouble());
-          case INT96:
-          case FIXED_LEN_BYTE_ARRAY:
-          case BINARY:
-            return (record, pos, ignored) -> set(record, pos, unboxed.readBinary());
-          default:
-            throw new UnsupportedOperationException("Unsupported type: " + type);
-        }
-      }
-
-      // TODO: Add support for options to avoid the null check
-      return (record, pos, reuse) -> {
-        Object obj = reader.read((E) reuse);
-        if (obj != null) {
-          set(record, pos, obj);
-        } else {
-          setNull(record, pos);
-        }
-      };
-    }
-
     @SuppressWarnings("unchecked")
     private <E> E get(I intermediate, int pos) {
       return (E) getField(intermediate, pos);
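A side effect of deleting the Setter machinery is that the constructor's @SuppressWarnings("unchecked") could go too: Java forbids allocating an array of a type parameter, so Setter<I>[] had to be created through Array.newInstance with an unchecked cast, while the surviving ParquetValueReader<?>[] and TripleIterator<?>[] casts are checked because wildcard array types are reifiable. A minimal sketch of the reflective-allocation idiom (hypothetical names):

    import java.lang.reflect.Array;
    import java.util.function.Supplier;

    public class GenericArrayDemo {
      // `new Supplier<String>[n]` is rejected by javac as generic array
      // creation, so the array is allocated reflectively and cast; the cast
      // is unchecked, hence the annotation.
      @SuppressWarnings("unchecked")
      static <T> T[] newArray(Class<?> componentType, int length) {
        return (T[]) Array.newInstance(componentType, length);
      }

      public static void main(String[] args) {
        Supplier<String>[] suppliers = newArray(Supplier.class, 2);
        suppliers[0] = () -> "hello";
        suppliers[1] = () -> "world";
        System.out.println(suppliers[0].get() + " " + suppliers[1].get());
      }
    }

Note that `new Supplier<?>[2]` would be legal, since wildcard arrays are reifiable; the restriction only bites for concrete or variable type arguments, which is exactly the Setter<I>[] case this patch deletes.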
From f612e96d8f327830cefd0a0be5d06fd9a955ecd0 Mon Sep 17 00:00:00 2001
From: Piotr Findeisen
Date: Tue, 18 Jun 2024 09:39:32 +0200
Subject: [PATCH 4/4] Add javadoc summary where missing

The javadoc summary is expected by error-prone's `MissingSummary` check.
Apparently the error-prone version currently in use could not find these,
but a newer version can.
---
 .../iceberg/flink/sink/shuffle/MapRangePartitioner.java | 8 +++++++-
 .../flink/source/enumerator/EnumerationHistory.java     | 6 +++++-
 .../iceberg/flink/sink/shuffle/MapRangePartitioner.java | 8 +++++++-
 .../flink/source/enumerator/EnumerationHistory.java     | 6 +++++-
 .../iceberg/flink/sink/shuffle/MapRangePartitioner.java | 8 +++++++-
 .../flink/source/enumerator/EnumerationHistory.java     | 6 +++++-
 6 files changed, 36 insertions(+), 6 deletions(-)

diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/MapRangePartitioner.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/MapRangePartitioner.java
index fb1a8f03a65c..dde86b5b6047 100644
--- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/MapRangePartitioner.java
+++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/MapRangePartitioner.java
@@ -163,6 +163,8 @@ Map<SortKey, Long> mapStatistics() {
   }
 
   /**
+   * Returns assignment summary for every subtask.
+   *
    * @return assignment summary for every subtask. Key is subtaskId. Value pair is (weight assigned
    *     to the subtask, number of keys assigned to the subtask)
    */
@@ -328,7 +330,11 @@ static class KeyAssignment {
       }
     }
 
-    /** @return subtask id */
+    /**
+     * Select a subtask for the key.
+     *
+     * @return subtask id
+     */
     int select() {
       if (assignedSubtasks.length == 1) {
         // only choice. no need to run random number generator.
diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/EnumerationHistory.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/EnumerationHistory.java
index ef21dad0199d..ec56a9ecdac1 100644
--- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/EnumerationHistory.java
+++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/EnumerationHistory.java
@@ -78,7 +78,11 @@ synchronized boolean hasFullHistory() {
     return count >= history.length;
   }
 
-  /** @return true if split discovery should pause because assigner has too many splits already. */
+  /**
+   * Checks whether split discovery should be paused.
+   *
+   * @return true if split discovery should pause because assigner has too many splits already.
+   */
   synchronized boolean shouldPauseSplitDiscovery(int pendingSplitCountFromAssigner) {
     if (count < history.length) {
       // only check throttling when full history is obtained.
diff --git a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/MapRangePartitioner.java b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/MapRangePartitioner.java
index fb1a8f03a65c..dde86b5b6047 100644
--- a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/MapRangePartitioner.java
+++ b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/MapRangePartitioner.java
@@ -163,6 +163,8 @@ Map<SortKey, Long> mapStatistics() {
   }
 
   /**
+   * Returns assignment summary for every subtask.
+   *
    * @return assignment summary for every subtask. Key is subtaskId. Value pair is (weight assigned
    *     to the subtask, number of keys assigned to the subtask)
    */
@@ -328,7 +330,11 @@ static class KeyAssignment {
       }
     }
 
-    /** @return subtask id */
+    /**
+     * Select a subtask for the key.
+     *
+     * @return subtask id
+     */
     int select() {
       if (assignedSubtasks.length == 1) {
         // only choice. no need to run random number generator.
diff --git a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/EnumerationHistory.java b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/EnumerationHistory.java
index ef21dad0199d..ec56a9ecdac1 100644
--- a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/EnumerationHistory.java
+++ b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/EnumerationHistory.java
@@ -78,7 +78,11 @@ synchronized boolean hasFullHistory() {
     return count >= history.length;
   }
 
-  /** @return true if split discovery should pause because assigner has too many splits already. */
+  /**
+   * Checks whether split discovery should be paused.
+   *
+   * @return true if split discovery should pause because assigner has too many splits already.
+   */
   synchronized boolean shouldPauseSplitDiscovery(int pendingSplitCountFromAssigner) {
     if (count < history.length) {
       // only check throttling when full history is obtained.
diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/MapRangePartitioner.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/MapRangePartitioner.java
index 4f52915a925e..298426cee872 100644
--- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/MapRangePartitioner.java
+++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/MapRangePartitioner.java
@@ -161,6 +161,8 @@ Map<SortKey, Long> mapStatistics() {
   }
 
   /**
+   * Returns assignment summary for every subtask.
+   *
    * @return assignment summary for every subtask. Key is subtaskId. Value pair is (weight assigned
    *     to the subtask, number of keys assigned to the subtask)
    */
@@ -326,7 +328,11 @@ static class KeyAssignment {
       }
     }
 
-    /** @return subtask id */
+    /**
+     * Select a subtask for the key.
+     *
+     * @return subtask id
+     */
     int select() {
       if (assignedSubtasks.length == 1) {
         // only choice. no need to run random number generator.
diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/EnumerationHistory.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/EnumerationHistory.java
index ef21dad0199d..ec56a9ecdac1 100644
--- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/EnumerationHistory.java
+++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/EnumerationHistory.java
@@ -78,7 +78,11 @@ synchronized boolean hasFullHistory() {
     return count >= history.length;
  }
 
-  /** @return true if split discovery should pause because assigner has too many splits already. */
+  /**
+   * Checks whether split discovery should be paused.
+   *
+   * @return true if split discovery should pause because assigner has too many splits already.
+   */
   synchronized boolean shouldPauseSplitDiscovery(int pendingSplitCountFromAssigner) {
     if (count < history.length) {
       // only check throttling when full history is obtained.
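For context on the check driving this last patch: error-prone's MissingSummary fires on a Javadoc comment that consists only of block tags such as @return or @param, and the fix is the leading summary fragment seen in every hunk above. A minimal illustration of the before and after (hypothetical class, not the Flink code):

    public class JavadocSummaryExample {

      // Flagged by MissingSummary: the comment is nothing but a block tag.
      //
      //   /** @return subtask id */
      //   int select() { ... }

      /**
       * Selects a subtask for the key.
       *
       * @return subtask id
       */
      int select() {
        return 0; // placeholder body for the sketch
      }
    }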