diff --git a/api/src/main/java/org/apache/iceberg/ContentFile.java b/api/src/main/java/org/apache/iceberg/ContentFile.java
index fd8f7de62f1e..092c3b88599b 100644
--- a/api/src/main/java/org/apache/iceberg/ContentFile.java
+++ b/api/src/main/java/org/apache/iceberg/ContentFile.java
@@ -37,6 +37,12 @@ public interface ContentFile {
   /** Returns id of the partition spec used for partition metadata. */
   int specId();
 
+  /** Returns the id of the schema that the file was written with, or null if unknown. */
+  default Integer schemaId() {
+    throw new UnsupportedOperationException(
+        this.getClass().getName() + " doesn't implement schemaId");
+  }
+
   /**
    * Returns type of content stored in the file; one of DATA, POSITION_DELETES, or EQUALITY_DELETES.
    */
diff --git a/api/src/main/java/org/apache/iceberg/DataFile.java b/api/src/main/java/org/apache/iceberg/DataFile.java
index 59b329c500c7..4e9685097825 100644
--- a/api/src/main/java/org/apache/iceberg/DataFile.java
+++ b/api/src/main/java/org/apache/iceberg/DataFile.java
@@ -98,11 +98,12 @@ public interface DataFile extends ContentFile {
   Types.NestedField SORT_ORDER_ID =
       optional(140, "sort_order_id", IntegerType.get(), "Sort order ID");
   Types.NestedField SPEC_ID = optional(141, "spec_id", IntegerType.get(), "Partition spec ID");
+  Types.NestedField SCHEMA_ID = optional(142, "schema_id", IntegerType.get(), "Schema ID");
 
   int PARTITION_ID = 102;
   String PARTITION_NAME = "partition";
   String PARTITION_DOC = "Partition data tuple, schema based on the partition spec";
-  // NEXT ID TO ASSIGN: 142
+  // NEXT ID TO ASSIGN: 143
 
   static StructType getType(StructType partitionType) {
     // IDs start at 100 to leave room for changes to ManifestEntry
@@ -123,7 +124,8 @@ static StructType getType(StructType partitionType) {
         KEY_METADATA,
         SPLIT_OFFSETS,
         EQUALITY_IDS,
-        SORT_ORDER_ID);
+        SORT_ORDER_ID,
+        SCHEMA_ID);
   }
 
   /** @return the content stored in the file; one of DATA, POSITION_DELETES, or EQUALITY_DELETES */
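Note on the API above: `schemaId()` is optional metadata, so callers must tolerate both a null value (files written before this change carry no schema id) and the throwing default (ContentFile implementations that never override it). A minimal defensive-read sketch, assuming only the API introduced in this patch:

    import org.apache.iceberg.ContentFile;

    class SchemaIds {
      // Returns the recorded schema id, or null when the file predates this
      // change or the implementation inherits the throwing default.
      static Integer schemaIdOrNull(ContentFile<?> file) {
        try {
          return file.schemaId(); // may be null for files written without a schema id
        } catch (UnsupportedOperationException e) {
          return null; // implementation does not expose a schema id
        }
      }
    }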
diff --git a/core/src/main/java/org/apache/iceberg/BaseFile.java b/core/src/main/java/org/apache/iceberg/BaseFile.java
index aa05275c0af8..1d3886a6269b 100644
--- a/core/src/main/java/org/apache/iceberg/BaseFile.java
+++ b/core/src/main/java/org/apache/iceberg/BaseFile.java
@@ -78,6 +78,7 @@ public PartitionData copy() {
   private int[] equalityIds = null;
   private byte[] keyMetadata = null;
   private Integer sortOrderId;
+  private Integer schemaId;
 
   // cached schema
   private transient Schema avroSchema = null;
@@ -123,6 +124,7 @@ public PartitionData copy() {
   }
 
   BaseFile(
+      Integer schemaId,
       int specId,
       FileContent content,
       String filePath,
@@ -140,6 +142,7 @@ public PartitionData copy() {
       int[] equalityFieldIds,
       Integer sortOrderId,
       ByteBuffer keyMetadata) {
+    this.schemaId = schemaId;
     this.partitionSpecId = specId;
    this.content = content;
    this.filePath = filePath;
@@ -177,6 +180,7 @@ public PartitionData copy() {
    */
   BaseFile(BaseFile toCopy, boolean fullCopy) {
     this.fileOrdinal = toCopy.fileOrdinal;
+    this.schemaId = toCopy.schemaId;
     this.partitionSpecId = toCopy.partitionSpecId;
     this.content = toCopy.content;
     this.filePath = toCopy.filePath;
@@ -320,6 +324,9 @@ public void put(int i, Object value) {
         this.sortOrderId = (Integer) value;
         return;
       case 17:
+        this.schemaId = (Integer) value;
+        return;
+      case 18:
         this.fileOrdinal = (long) value;
         return;
       default:
@@ -375,6 +382,8 @@ public Object get(int i) {
       case 16:
         return sortOrderId;
       case 17:
+        return schemaId;
+      case 18:
         return fileOrdinal;
       default:
         throw new UnsupportedOperationException("Unknown field ordinal: " + pos);
@@ -396,6 +405,11 @@ public Long pos() {
     return fileOrdinal;
   }
 
+  @Override
+  public Integer schemaId() {
+    return schemaId;
+  }
+
   @Override
   public FileContent content() {
     return content;
@@ -526,6 +540,7 @@ public String toString() {
         .add("sort_order_id", sortOrderId)
         .add("data_sequence_number", dataSequenceNumber == null ? "null" : dataSequenceNumber)
         .add("file_sequence_number", fileSequenceNumber == null ? "null" : fileSequenceNumber)
+        .add("schema_id", schemaId == null ? "null" : schemaId)
         .toString();
   }
 }
diff --git a/core/src/main/java/org/apache/iceberg/ContentFileParser.java b/core/src/main/java/org/apache/iceberg/ContentFileParser.java
index b3edf2927fbc..32f789bce47d 100644
--- a/core/src/main/java/org/apache/iceberg/ContentFileParser.java
+++ b/core/src/main/java/org/apache/iceberg/ContentFileParser.java
@@ -28,6 +28,7 @@ import org.apache.iceberg.util.JsonUtil;
 
 class ContentFileParser {
+  private static final String SCHEMA_ID = "schema-id";
   private static final String SPEC_ID = "spec-id";
   private static final String CONTENT = "content";
   private static final String FILE_PATH = "file-path";
@@ -78,6 +79,10 @@ static void toJson(ContentFile contentFile, PartitionSpec spec, JsonGenerator generator)
     // ignore the ordinal position (ContentFile#pos) of the file in a manifest,
     // as it isn't used and BaseFile constructor doesn't support it.
 
+    if (contentFile.schemaId() != null) {
+      generator.writeNumberField(SCHEMA_ID, contentFile.schemaId());
+    }
+
     generator.writeNumberField(SPEC_ID, contentFile.specId());
     generator.writeStringField(CONTENT, contentFile.content().name());
     generator.writeStringField(FILE_PATH, contentFile.path().toString());
@@ -118,6 +123,7 @@ static ContentFile fromJson(JsonNode jsonNode, PartitionSpec spec) {
         jsonNode.isObject(), "Invalid JSON node for content file: non-object (%s)", jsonNode);
     Preconditions.checkArgument(spec != null, "Invalid partition spec: null");
 
+    Integer schemaId = JsonUtil.getIntOrNull(SCHEMA_ID, jsonNode);
     int specId = JsonUtil.getInt(SPEC_ID, jsonNode);
     FileContent fileContent = FileContent.valueOf(JsonUtil.getString(CONTENT, jsonNode));
     String filePath = JsonUtil.getString(FILE_PATH, jsonNode);
@@ -148,6 +154,7 @@ static ContentFile fromJson(JsonNode jsonNode, PartitionSpec spec) {
 
     if (fileContent == FileContent.DATA) {
       return new GenericDataFile(
+          schemaId,
           specId,
           filePath,
           fileFormat,
@@ -160,6 +167,7 @@ static ContentFile fromJson(JsonNode jsonNode, PartitionSpec spec) {
           sortOrderId);
     } else {
       return new GenericDeleteFile(
+          schemaId,
           specId,
           fileContent,
           filePath,
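Note on the parser change above: the serialization stays backward compatible in both directions, because `schema-id` is written only when present and `JsonUtil.getIntOrNull` tolerates its absence on read. The two resulting JSON shapes, with values taken from the test fixtures later in this patch:

    {"schema-id":0,"spec-id":0,"content":"DATA","file-path":"/path/to/data-a.parquet","file-format":"PARQUET","partition":{},"file-size-in-bytes":10,"record-count":1,"sort-order-id":0}
    {"spec-id":0,"content":"DATA","file-path":"/path/to/data-a.parquet","file-format":"PARQUET","partition":{},"file-size-in-bytes":10,"record-count":1,"sort-order-id":0}

The first line is a file written with a schema id; the second is the unchanged legacy shape.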
diff --git a/core/src/main/java/org/apache/iceberg/DataFiles.java b/core/src/main/java/org/apache/iceberg/DataFiles.java
index a6ea80d3662a..d9c6f12cd0f0 100644
--- a/core/src/main/java/org/apache/iceberg/DataFiles.java
+++ b/core/src/main/java/org/apache/iceberg/DataFiles.java
@@ -156,6 +156,7 @@ public static class Builder {
     private List splitOffsets = null;
     private List equalityFieldIds = null;
     private Integer sortOrderId = SortOrder.unsorted().orderId();
+    private Integer schemaId = null;
 
     public Builder(PartitionSpec spec) {
       this.spec = spec;
@@ -180,6 +181,7 @@ public void clear() {
       this.upperBounds = null;
       this.splitOffsets = null;
       this.sortOrderId = SortOrder.unsorted().orderId();
+      this.schemaId = null;
     }
 
     public Builder copy(DataFile toCopy) {
@@ -203,6 +205,7 @@ public Builder copy(DataFile toCopy) {
       this.splitOffsets =
           toCopy.splitOffsets() == null ? null : ImmutableList.copyOf(toCopy.splitOffsets());
       this.sortOrderId = toCopy.sortOrderId();
+      this.schemaId = toCopy.schemaId();
       return this;
     }
 
@@ -325,6 +328,11 @@ public Builder withSortOrder(SortOrder newSortOrder) {
       return this;
     }
 
+    public Builder withSchemaId(Integer newSchemaId) {
+      this.schemaId = newSchemaId;
+      return this;
+    }
+
     public DataFile build() {
       Preconditions.checkArgument(filePath != null, "File path is required");
       if (format == null) {
@@ -335,6 +343,7 @@ public DataFile build() {
       Preconditions.checkArgument(recordCount >= 0, "Record count is required");
 
       return new GenericDataFile(
+          schemaId,
           specId,
           filePath,
           format,
diff --git a/core/src/main/java/org/apache/iceberg/FileMetadata.java b/core/src/main/java/org/apache/iceberg/FileMetadata.java
index 9a201d1b3b6f..3aef04acb458 100644
--- a/core/src/main/java/org/apache/iceberg/FileMetadata.java
+++ b/core/src/main/java/org/apache/iceberg/FileMetadata.java
@@ -59,6 +59,7 @@ public static class Builder {
     private ByteBuffer keyMetadata = null;
     private Integer sortOrderId = null;
     private List splitOffsets = null;
+    private Integer schemaId = null;
 
     Builder(PartitionSpec spec) {
       this.spec = spec;
@@ -82,6 +83,7 @@ public void clear() {
       this.lowerBounds = null;
       this.upperBounds = null;
       this.sortOrderId = null;
+      this.schemaId = null;
     }
 
     public Builder copy(DeleteFile toCopy) {
@@ -104,6 +106,7 @@ public Builder copy(DeleteFile toCopy) {
       this.keyMetadata =
           toCopy.keyMetadata() == null ? null : ByteBuffers.copy(toCopy.keyMetadata());
       this.sortOrderId = toCopy.sortOrderId();
+      this.schemaId = toCopy.schemaId();
       return this;
     }
 
@@ -220,6 +223,11 @@ public Builder withSortOrder(SortOrder newSortOrder) {
       return this;
     }
 
+    public Builder withSchemaId(Integer newSchemaId) {
+      this.schemaId = newSchemaId;
+      return this;
+    }
+
     public DeleteFile build() {
       Preconditions.checkArgument(filePath != null, "File path is required");
       if (format == null) {
@@ -245,6 +253,7 @@ public DeleteFile build() {
       }
 
       return new GenericDeleteFile(
+          schemaId,
           specId,
           content,
           filePath,
diff --git a/core/src/main/java/org/apache/iceberg/GenericDataFile.java b/core/src/main/java/org/apache/iceberg/GenericDataFile.java
index 07c5172f1b3f..1edb64074f3d 100644
--- a/core/src/main/java/org/apache/iceberg/GenericDataFile.java
+++ b/core/src/main/java/org/apache/iceberg/GenericDataFile.java
@@ -32,6 +32,7 @@ class GenericDataFile extends BaseFile implements DataFile {
   }
 
   GenericDataFile(
+      Integer schemaId,
       int specId,
       String filePath,
       FileFormat format,
@@ -43,6 +44,7 @@ class GenericDataFile extends BaseFile implements DataFile {
       int[] equalityFieldIds,
       Integer sortOrderId) {
     super(
+        schemaId,
         specId,
         FileContent.DATA,
         filePath,
diff --git a/core/src/main/java/org/apache/iceberg/GenericDeleteFile.java b/core/src/main/java/org/apache/iceberg/GenericDeleteFile.java
index cf20d931aa28..a49d42187893 100644
--- a/core/src/main/java/org/apache/iceberg/GenericDeleteFile.java
+++ b/core/src/main/java/org/apache/iceberg/GenericDeleteFile.java
@@ -32,6 +32,7 @@ class GenericDeleteFile extends BaseFile implements DeleteFile {
   }
 
   GenericDeleteFile(
+      Integer schemaId,
       int specId,
       FileContent content,
       String filePath,
@@ -44,6 +45,7 @@ class GenericDeleteFile extends BaseFile implements DeleteFile {
       List splitOffsets,
       ByteBuffer keyMetadata) {
     super(
+        schemaId,
         specId,
         content,
         filePath,
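Usage sketch for the new builder hooks, mirroring how the test helpers later in this patch tag files with the schema id that is current at write time. `table` and `spec` are assumed to exist; paths and counts are placeholders:

    import org.apache.iceberg.DataFile;
    import org.apache.iceberg.DataFiles;
    import org.apache.iceberg.DeleteFile;
    import org.apache.iceberg.FileMetadata;
    import org.apache.iceberg.PartitionSpec;
    import org.apache.iceberg.Table;

    class TaggedFiles {
      static DataFile dataFile(Table table, PartitionSpec spec) {
        return DataFiles.builder(spec)
            .withPath("/path/to/data-a.parquet") // placeholder
            .withFileSizeInBytes(10)
            .withRecordCount(1)
            .withSchemaId(table.schema().schemaId()) // new optional field
            .build();
      }

      static DeleteFile posDeleteFile(Table table, PartitionSpec spec) {
        return FileMetadata.deleteFileBuilder(spec)
            .ofPositionDeletes()
            .withPath("/path/to/deletes.parquet") // placeholder
            .withFileSizeInBytes(10)
            .withRecordCount(1)
            .withSchemaId(table.schema().schemaId()) // new optional field
            .build();
      }
    }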
diff --git a/core/src/main/java/org/apache/iceberg/V1Metadata.java b/core/src/main/java/org/apache/iceberg/V1Metadata.java
index f262614127bd..df14ffd20180 100644
--- a/core/src/main/java/org/apache/iceberg/V1Metadata.java
+++ b/core/src/main/java/org/apache/iceberg/V1Metadata.java
@@ -223,7 +223,8 @@ static Types.StructType dataFileSchema(Types.StructType partitionType) {
         DataFile.UPPER_BOUNDS,
         DataFile.KEY_METADATA,
         DataFile.SPLIT_OFFSETS,
-        DataFile.SORT_ORDER_ID);
+        DataFile.SORT_ORDER_ID,
+        DataFile.SCHEMA_ID);
   }
 
   /** Wrapper used to write a ManifestEntry to v1 metadata. */
@@ -371,6 +372,8 @@ public Object get(int pos) {
           return wrapped.splitOffsets();
         case 14:
           return wrapped.sortOrderId();
+        case 15:
+          return wrapped.schemaId();
       }
       throw new IllegalArgumentException("Unknown field ordinal: " + pos);
     }
@@ -480,6 +483,11 @@ public Long fileSequenceNumber() {
       return wrapped.fileSequenceNumber();
     }
 
+    @Override
+    public Integer schemaId() {
+      return wrapped.schemaId();
+    }
+
     @Override
     public DataFile copy() {
       return wrapped.copy();
diff --git a/core/src/main/java/org/apache/iceberg/V2Metadata.java b/core/src/main/java/org/apache/iceberg/V2Metadata.java
index 7db54e46b95e..3d7383fb6cc9 100644
--- a/core/src/main/java/org/apache/iceberg/V2Metadata.java
+++ b/core/src/main/java/org/apache/iceberg/V2Metadata.java
@@ -272,7 +272,8 @@ static Types.StructType fileType(Types.StructType partitionType) {
         DataFile.KEY_METADATA,
         DataFile.SPLIT_OFFSETS,
         DataFile.EQUALITY_IDS,
-        DataFile.SORT_ORDER_ID);
+        DataFile.SORT_ORDER_ID,
+        DataFile.SCHEMA_ID);
   }
 
   static class IndexedManifestEntry
@@ -446,6 +447,8 @@ public Object get(int pos) {
           return wrapped.equalityFieldIds();
         case 15:
           return wrapped.sortOrderId();
+        case 16:
+          return wrapped.schemaId();
       }
       throw new IllegalArgumentException("Unknown field ordinal: " + pos);
     }
@@ -555,6 +558,11 @@ public Long fileSequenceNumber() {
       return wrapped.fileSequenceNumber();
     }
 
+    @Override
+    public Integer schemaId() {
+      return wrapped.schemaId();
+    }
+
     @Override
     public F copy() {
       throw new UnsupportedOperationException("Cannot copy IndexedDataFile wrapper");
diff --git a/core/src/main/java/org/apache/iceberg/deletes/EqualityDeleteWriter.java b/core/src/main/java/org/apache/iceberg/deletes/EqualityDeleteWriter.java
index 5decabd50d6c..9bd1ee94a580 100644
--- a/core/src/main/java/org/apache/iceberg/deletes/EqualityDeleteWriter.java
+++ b/core/src/main/java/org/apache/iceberg/deletes/EqualityDeleteWriter.java
@@ -41,6 +41,7 @@ public class EqualityDeleteWriter implements FileWriter
   private final ByteBuffer keyMetadata;
   private final int[] equalityFieldIds;
   private final SortOrder sortOrder;
+  private final Integer schemaId;
   private DeleteFile deleteFile = null;
 
   public EqualityDeleteWriter(
@@ -52,6 +53,28 @@ public EqualityDeleteWriter(
       EncryptionKeyMetadata keyMetadata,
       SortOrder sortOrder,
       int... equalityFieldIds) {
+    this(
+        appender,
+        format,
+        location,
+        spec,
+        partition,
+        keyMetadata,
+        sortOrder,
+        null,
+        equalityFieldIds);
+  }
+
+  public EqualityDeleteWriter(
+      FileAppender appender,
+      FileFormat format,
+      String location,
+      PartitionSpec spec,
+      StructLike partition,
+      EncryptionKeyMetadata keyMetadata,
+      SortOrder sortOrder,
+      Integer schemaId,
+      int... equalityFieldIds) {
     this.appender = appender;
     this.format = format;
     this.location = location;
@@ -59,6 +82,7 @@ public EqualityDeleteWriter(
     this.partition = partition;
     this.keyMetadata = keyMetadata != null ? keyMetadata.buffer() : null;
     this.sortOrder = sortOrder;
+    this.schemaId = schemaId;
     this.equalityFieldIds = equalityFieldIds;
   }
 
@@ -87,6 +111,7 @@ public void close() throws IOException {
           .withMetrics(appender.metrics())
           .withSplitOffsets(appender.splitOffsets())
           .withSortOrder(sortOrder)
+          .withSchemaId(schemaId)
           .build();
     }
   }
diff --git a/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteWriter.java b/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteWriter.java
index 4f799b434993..404cfc210a3a 100644
--- a/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteWriter.java
+++ b/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteWriter.java
@@ -57,6 +57,7 @@ public class PositionDeleteWriter implements FileWriter
   private final StructLike partition;
   private final ByteBuffer keyMetadata;
   private final CharSequenceSet referencedDataFiles;
+  private final Integer schemaId;
   private DeleteFile deleteFile = null;
 
   public PositionDeleteWriter(
@@ -66,6 +67,17 @@ public PositionDeleteWriter(
       PartitionSpec spec,
       StructLike partition,
       EncryptionKeyMetadata keyMetadata) {
+    this(appender, format, location, spec, partition, keyMetadata, null);
+  }
+
+  public PositionDeleteWriter(
+      FileAppender appender,
+      FileFormat format,
+      String location,
+      PartitionSpec spec,
+      StructLike partition,
+      EncryptionKeyMetadata keyMetadata,
+      Integer schemaId) {
     this.appender = appender;
     this.format = format;
     this.location = location;
@@ -73,6 +85,7 @@ public PositionDeleteWriter(
     this.partition = partition;
     this.keyMetadata = keyMetadata != null ? keyMetadata.buffer() : null;
     this.referencedDataFiles = CharSequenceSet.empty();
+    this.schemaId = schemaId;
   }
 
   @Override
@@ -100,6 +113,7 @@ public void close() throws IOException {
           .withSplitOffsets(appender.splitOffsets())
           .withFileSizeInBytes(appender.length())
           .withMetrics(metrics())
+          .withSchemaId(schemaId)
           .build();
     }
   }
diff --git a/core/src/main/java/org/apache/iceberg/io/DataWriter.java b/core/src/main/java/org/apache/iceberg/io/DataWriter.java
index dfb372d3a89e..bbe323f0fd9d 100644
--- a/core/src/main/java/org/apache/iceberg/io/DataWriter.java
+++ b/core/src/main/java/org/apache/iceberg/io/DataWriter.java
@@ -37,6 +37,7 @@ public class DataWriter implements FileWriter {
   private final StructLike partition;
   private final ByteBuffer keyMetadata;
   private final SortOrder sortOrder;
+  private final Integer schemaId;
   private DataFile dataFile = null;
 
   public DataWriter(
@@ -57,6 +58,18 @@ public DataWriter(
       StructLike partition,
       EncryptionKeyMetadata keyMetadata,
       SortOrder sortOrder) {
+    this(appender, format, location, spec, partition, keyMetadata, sortOrder, null);
+  }
+
+  public DataWriter(
+      FileAppender appender,
+      FileFormat format,
+      String location,
+      PartitionSpec spec,
+      StructLike partition,
+      EncryptionKeyMetadata keyMetadata,
+      SortOrder sortOrder,
+      Integer schemaId) {
     this.appender = appender;
     this.format = format;
     this.location = location;
@@ -64,6 +77,7 @@ public DataWriter(
     this.partition = partition;
     this.keyMetadata = keyMetadata != null ? keyMetadata.buffer() : null;
     this.sortOrder = sortOrder;
+    this.schemaId = schemaId;
   }
 
   @Override
@@ -90,6 +104,7 @@ public void close() throws IOException {
           .withMetrics(appender.metrics())
           .withSplitOffsets(appender.splitOffsets())
           .withSortOrder(sortOrder)
+          .withSchemaId(schemaId)
           .build();
     }
   }
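Sketch of how the widened writer constructors above are expected to be used: the schema id is passed through the new trailing parameter and lands on the ContentFile that the writer builds on close(). Appender and location setup are elided and assumed:

    import java.io.IOException;
    import org.apache.iceberg.DataFile;
    import org.apache.iceberg.FileFormat;
    import org.apache.iceberg.PartitionSpec;
    import org.apache.iceberg.SortOrder;
    import org.apache.iceberg.StructLike;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.data.Record;
    import org.apache.iceberg.io.DataWriter;
    import org.apache.iceberg.io.FileAppender;

    class WriterSketch {
      static DataFile writeAll(
          Table table,
          FileAppender<Record> appender, // assumed created elsewhere
          String location,
          PartitionSpec spec,
          StructLike partition,
          Iterable<Record> rows)
          throws IOException {
        DataWriter<Record> writer =
            new DataWriter<>(
                appender, FileFormat.PARQUET, location, spec, partition,
                null /* no encryption key metadata */, SortOrder.unsorted(),
                table.schema().schemaId()); // new trailing parameter
        try {
          for (Record row : rows) {
            writer.write(row);
          }
        } finally {
          writer.close(); // builds the DataFile, now carrying the schema id
        }
        return writer.toDataFile();
      }
    }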
diff --git a/core/src/test/java/org/apache/iceberg/TableTestBase.java b/core/src/test/java/org/apache/iceberg/TableTestBase.java
index a800214bc9a7..78e6beade1f1 100644
--- a/core/src/test/java/org/apache/iceberg/TableTestBase.java
+++ b/core/src/test/java/org/apache/iceberg/TableTestBase.java
@@ -598,6 +598,7 @@ protected DataFile newDataFile(String partitionPath) {
         .withFileSizeInBytes(10)
         .withPartitionPath(partitionPath)
         .withRecordCount(1)
+        .withSchemaId(table.schema().schemaId())
         .build();
   }
 
@@ -609,6 +610,7 @@ protected DeleteFile newDeleteFile(int specId, String partitionPath) {
         .withFileSizeInBytes(10)
         .withPartitionPath(partitionPath)
         .withRecordCount(1)
+        .withSchemaId(table.schema().schemaId())
         .build();
   }
 
@@ -620,6 +622,7 @@ protected DeleteFile newEqualityDeleteFile(int specId, String partitionPath, int
         .withFileSizeInBytes(10)
         .withPartitionPath(partitionPath)
         .withRecordCount(1)
+        .withSchemaId(table.schema().schemaId())
         .build();
   }
diff --git a/core/src/test/java/org/apache/iceberg/TestContentFileParser.java b/core/src/test/java/org/apache/iceberg/TestContentFileParser.java
index 9360f571c5bb..5cc2fcd8d4f1 100644
--- a/core/src/test/java/org/apache/iceberg/TestContentFileParser.java
+++ b/core/src/test/java/org/apache/iceberg/TestContentFileParser.java
@@ -108,6 +108,7 @@ private static Stream provideSpecAndDataFile() {
   private static DataFile dataFileWithRequiredOnly(PartitionSpec spec) {
     DataFiles.Builder builder =
         DataFiles.builder(spec)
+            .withSchemaId(0)
             .withPath("/path/to/data-a.parquet")
             .withFileSizeInBytes(10)
             .withRecordCount(1);
@@ -122,17 +123,17 @@ private static DataFile dataFileWithRequiredOnly(PartitionSpec spec) {
 
   private static String dataFileJsonWithRequiredOnly(PartitionSpec spec) {
     if (spec.isUnpartitioned()) {
-      return "{\"spec-id\":0,\"content\":\"DATA\",\"file-path\":\"/path/to/data-a.parquet\",\"file-format\":\"PARQUET\","
+      return "{\"schema-id\":0,\"spec-id\":0,\"content\":\"DATA\",\"file-path\":\"/path/to/data-a.parquet\",\"file-format\":\"PARQUET\","
           + "\"partition\":{},\"file-size-in-bytes\":10,\"record-count\":1,\"sort-order-id\":0}";
     } else {
-      return "{\"spec-id\":0,\"content\":\"DATA\",\"file-path\":\"/path/to/data-a.parquet\",\"file-format\":\"PARQUET\","
+      return "{\"schema-id\":0,\"spec-id\":0,\"content\":\"DATA\",\"file-path\":\"/path/to/data-a.parquet\",\"file-format\":\"PARQUET\","
           + "\"partition\":{\"1000\":1},\"file-size-in-bytes\":10,\"record-count\":1,\"sort-order-id\":0}";
     }
   }
 
   private static String dataFileJsonWithAllOptional(PartitionSpec spec) {
     if (spec.isUnpartitioned()) {
-      return "{\"spec-id\":0,\"content\":\"DATA\",\"file-path\":\"/path/to/data-with-stats.parquet\","
+      return "{\"schema-id\":0,\"spec-id\":0,\"content\":\"DATA\",\"file-path\":\"/path/to/data-with-stats.parquet\","
           + "\"file-format\":\"PARQUET\",\"partition\":{},\"file-size-in-bytes\":350,\"record-count\":10,"
           + "\"column-sizes\":{\"keys\":[3,4],\"values\":[100,200]},"
           + "\"value-counts\":{\"keys\":[3,4],\"values\":[90,180]},"
@@ -143,7 +144,7 @@ private static String dataFileJsonWithAllOptional(PartitionSpec spec) {
           + "\"key-metadata\":\"00000000000000000000000000000000\","
           + "\"split-offsets\":[128,256],\"equality-ids\":[1],\"sort-order-id\":1}";
     } else {
return "{\"spec-id\":0,\"content\":\"DATA\",\"file-path\":\"/path/to/data-with-stats.parquet\"," + return "{\"schema-id\":0,\"spec-id\":0,\"content\":\"DATA\",\"file-path\":\"/path/to/data-with-stats.parquet\"," + "\"file-format\":\"PARQUET\",\"partition\":{\"1000\":1},\"file-size-in-bytes\":350,\"record-count\":10," + "\"column-sizes\":{\"keys\":[3,4],\"values\":[100,200]}," + "\"value-counts\":{\"keys\":[3,4],\"values\":[90,180]}," @@ -159,6 +160,7 @@ private static String dataFileJsonWithAllOptional(PartitionSpec spec) { private static DataFile dataFileWithAllOptional(PartitionSpec spec) { DataFiles.Builder builder = DataFiles.builder(spec) + .withSchemaId(0) .withPath("/path/to/data-with-stats.parquet") .withMetrics( new Metrics( @@ -224,6 +226,7 @@ private static DeleteFile deleteFileWithRequiredOnly(PartitionSpec spec) { } return new GenericDeleteFile( + spec.schema().schemaId(), spec.specId(), FileContent.POSITION_DELETES, "/path/to/delete-a.parquet", @@ -263,6 +266,7 @@ private static DeleteFile deleteFileWithAllOptional(PartitionSpec spec) { ); return new GenericDeleteFile( + spec.schema().schemaId(), spec.specId(), FileContent.EQUALITY_DELETES, "/path/to/delete-with-stats.parquet", @@ -278,17 +282,17 @@ private static DeleteFile deleteFileWithAllOptional(PartitionSpec spec) { private static String deleteFileJsonWithRequiredOnly(PartitionSpec spec) { if (spec.isUnpartitioned()) { - return "{\"spec-id\":0,\"content\":\"POSITION_DELETES\",\"file-path\":\"/path/to/delete-a.parquet\"," + return "{\"schema-id\":0,\"spec-id\":0,\"content\":\"POSITION_DELETES\",\"file-path\":\"/path/to/delete-a.parquet\"," + "\"file-format\":\"PARQUET\",\"partition\":{},\"file-size-in-bytes\":1234,\"record-count\":9}"; } else { - return "{\"spec-id\":0,\"content\":\"POSITION_DELETES\",\"file-path\":\"/path/to/delete-a.parquet\"," + return "{\"schema-id\":0,\"spec-id\":0,\"content\":\"POSITION_DELETES\",\"file-path\":\"/path/to/delete-a.parquet\"," + "\"file-format\":\"PARQUET\",\"partition\":{\"1000\":9},\"file-size-in-bytes\":1234,\"record-count\":9}"; } } private static String deleteFileJsonWithAllOptional(PartitionSpec spec) { if (spec.isUnpartitioned()) { - return "{\"spec-id\":0,\"content\":\"EQUALITY_DELETES\",\"file-path\":\"/path/to/delete-with-stats.parquet\"," + return "{\"schema-id\":0,\"spec-id\":0,\"content\":\"EQUALITY_DELETES\",\"file-path\":\"/path/to/delete-with-stats.parquet\"," + "\"file-format\":\"PARQUET\",\"partition\":{},\"file-size-in-bytes\":1234,\"record-count\":10," + "\"column-sizes\":{\"keys\":[3,4],\"values\":[100,200]}," + "\"value-counts\":{\"keys\":[3,4],\"values\":[90,180]}," @@ -299,7 +303,7 @@ private static String deleteFileJsonWithAllOptional(PartitionSpec spec) { + "\"key-metadata\":\"00000000000000000000000000000000\"," + "\"split-offsets\":[128],\"equality-ids\":[3],\"sort-order-id\":1}"; } else { - return "{\"spec-id\":0,\"content\":\"EQUALITY_DELETES\",\"file-path\":\"/path/to/delete-with-stats.parquet\"," + return "{\"schema-id\":0,\"spec-id\":0,\"content\":\"EQUALITY_DELETES\",\"file-path\":\"/path/to/delete-with-stats.parquet\"," + "\"file-format\":\"PARQUET\",\"partition\":{\"1000\":9},\"file-size-in-bytes\":1234,\"record-count\":10," + "\"column-sizes\":{\"keys\":[3,4],\"values\":[100,200]}," + "\"value-counts\":{\"keys\":[3,4],\"values\":[90,180]}," diff --git a/core/src/test/java/org/apache/iceberg/TestManifestReader.java b/core/src/test/java/org/apache/iceberg/TestManifestReader.java index 09ed1e448560..ffeffac0d308 100644 --- 
diff --git a/core/src/test/java/org/apache/iceberg/TestManifestReader.java b/core/src/test/java/org/apache/iceberg/TestManifestReader.java
index 09ed1e448560..ffeffac0d308 100644
--- a/core/src/test/java/org/apache/iceberg/TestManifestReader.java
+++ b/core/src/test/java/org/apache/iceberg/TestManifestReader.java
@@ -18,14 +18,20 @@
  */
 package org.apache.iceberg;
 
+import static org.apache.iceberg.types.Types.NestedField.required;
+
 import java.io.IOException;
 import java.util.List;
+import java.util.Map;
 import java.util.stream.Collectors;
 import org.apache.iceberg.ManifestEntry.Status;
 import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Streams;
+import org.apache.iceberg.types.Conversions;
 import org.apache.iceberg.types.Types;
 import org.assertj.core.api.Assertions;
 import org.junit.Assert;
@@ -53,6 +59,7 @@ public void testManifestReaderWithEmptyInheritableMetadata() throws IOException
       Assert.assertEquals(Status.EXISTING, entry.status());
       Assert.assertEquals(FILE_A.path(), entry.file().path());
       Assert.assertEquals(1000L, (long) entry.snapshotId());
+      Assertions.assertThat(entry.file().schemaId()).isNull();
     }
   }
 
@@ -130,7 +137,7 @@ public void testDataFilePositions() throws IOException {
       for (DataFile file : reader) {
         Assert.assertEquals("Position should match", (Long) expectedPos, file.pos());
         Assert.assertEquals(
-            "Position from field index should match", expectedPos, ((BaseFile) file).get(17));
+            "Position from field index should match", expectedPos, ((BaseFile) file).get(18));
         expectedPos += 1;
       }
     }
@@ -147,9 +154,135 @@ public void testDeleteFilePositions() throws IOException {
       for (DeleteFile file : reader) {
         Assert.assertEquals("Position should match", (Long) expectedPos, file.pos());
         Assert.assertEquals(
-            "Position from field index should match", expectedPos, ((BaseFile) file).get(17));
+            "Position from field index should match", expectedPos, ((BaseFile) file).get(18));
         expectedPos += 1;
       }
     }
   }
+
+  @Test
+  public void testDataFileWithSchemaId() throws IOException {
+    DataFile fileA = newDataFile("data_bucket=0");
+    DataFile fileB = newDataFile("data_bucket=1");
+
+    // test copy
+    DataFile fileACopy = fileA.copy(true);
+    assertFileEqual(fileA, fileACopy);
+    DataFile fileBCopy = fileB.copy(true);
+    assertFileEqual(fileB, fileBCopy);
+
+    ManifestFile manifest = writeManifest(1000L, fileA, fileB);
+    Map specsById = ImmutableMap.of(SPEC.specId(), SPEC);
+    try (ManifestReader reader = ManifestFiles.read(manifest, FILE_IO, specsById)) {
+      List files = Lists.newArrayList(reader);
+      Assertions.assertThat(files.size()).isEqualTo(2);
+      assertFileEqual(fileA, files.get(0));
+      assertFileEqual(fileB, files.get(1));
+    }
+  }
+
+  @Test
+  public void testDeleteFileWithSchemaId() throws IOException {
+    Assume.assumeTrue("Delete files only work for format version 2", formatVersion == 2);
+    DeleteFile deleteFileA = newDeleteFile(table.spec().specId(), "data_bucket=0");
+    DeleteFile deleteFileB = newDeleteFile(table.spec().specId(), "data_bucket=1");
+
+    // test copy
+    DeleteFile deleteFileACopy = deleteFileA.copy(true);
+    assertFileEqual(deleteFileA, deleteFileACopy);
+    DeleteFile deleteFileBCopy = deleteFileB.copy(true);
+    assertFileEqual(deleteFileB, deleteFileBCopy);
+
+    ManifestFile manifest = writeDeleteManifest(formatVersion, 1000L, deleteFileA, deleteFileB);
+    Map specsById = ImmutableMap.of(SPEC.specId(), SPEC);
+    try (ManifestReader reader =
+        ManifestFiles.readDeleteManifest(manifest, FILE_IO, specsById)) {
+      List files = Lists.newArrayList(reader);
+      Assertions.assertThat(files.size()).isEqualTo(2);
+      assertFileEqual(deleteFileA, files.get(0));
+      assertFileEqual(deleteFileB, files.get(1));
+    }
+  }
+
+  @Test
+  public void testReadOldManifestFile() throws IOException {
+    Schema schema =
+        new Schema(
+            required(1, "id", Types.LongType.get()),
+            required(2, "timestamp", Types.TimestampType.withZone()),
+            required(3, "category", Types.StringType.get()),
+            required(4, "data", Types.StringType.get()),
+            required(5, "double", Types.DoubleType.get()));
+
+    PartitionSpec spec =
+        PartitionSpec.builderFor(schema)
+            .identity("category")
+            .hour("timestamp")
+            .bucket("id", 16)
+            .build();
+    PartitionData partition = DataFiles.data(spec, "category=cheesy/timestamp_hour=10/id_bucket=3");
+
+    String path = "s3://bucket/table/category=cheesy/timestamp_hour=10/id_bucket=3/file.avro";
+    long fileSize = 150972L;
+    FileFormat format = FileFormat.AVRO;
+
+    Metrics metrics =
+        new Metrics(
+            1587L,
+            ImmutableMap.of(1, 15L, 2, 122L, 3, 4021L, 4, 9411L, 5, 15L), // sizes
+            ImmutableMap.of(1, 100L, 2, 100L, 3, 100L, 4, 100L, 5, 100L), // value counts
+            ImmutableMap.of(1, 0L, 2, 0L, 3, 0L, 4, 0L, 5, 0L), // null value counts
+            ImmutableMap.of(5, 10L), // nan value counts
+            ImmutableMap.of(
+                1, Conversions.toByteBuffer(Types.IntegerType.get(), 1)), // lower bounds
+            ImmutableMap.of(
+                1, Conversions.toByteBuffer(Types.IntegerType.get(), 1))); // upper bounds
+    Integer sortOrderId = 2;
+
+    String fileName = String.format("OldManifestFileV%s.avro", formatVersion);
+    InputFile file =
+        FILE_IO.newInputFile(getClass().getClassLoader().getResource(fileName).getPath());
+    Map specsById = ImmutableMap.of(spec.specId(), spec);
+    try (ManifestReader reader =
+        new ManifestReader<>(
+            file,
+            spec.specId(),
+            specsById,
+            InheritableMetadataFactory.empty(),
+            ManifestReader.FileType.DATA_FILES)) {
+      ManifestEntry entry = Iterables.getOnlyElement(reader.entries());
+      DataFile dataFile = entry.file();
+      Assertions.assertThat(dataFile.path()).isEqualTo(path);
+      Assertions.assertThat(dataFile.format()).isEqualTo(format);
+      Assertions.assertThat(dataFile.fileSizeInBytes()).isEqualTo(fileSize);
+      Assertions.assertThat(dataFile.partition()).isEqualTo(partition);
+      Assertions.assertThat(dataFile.keyMetadata()).isNull();
+      Assertions.assertThat(dataFile.recordCount()).isEqualTo(metrics.recordCount());
+      Assertions.assertThat(dataFile.columnSizes()).isEqualTo(metrics.columnSizes());
+      Assertions.assertThat(dataFile.valueCounts()).isEqualTo(metrics.valueCounts());
+      Assertions.assertThat(dataFile.nullValueCounts()).isEqualTo(metrics.nullValueCounts());
+      Assertions.assertThat(dataFile.nanValueCounts()).isEqualTo(metrics.nanValueCounts());
+      Assertions.assertThat(dataFile.lowerBounds()).isEqualTo(metrics.lowerBounds());
+      Assertions.assertThat(dataFile.upperBounds()).isEqualTo(metrics.upperBounds());
+      Assertions.assertThat(dataFile.sortOrderId()).isEqualTo(sortOrderId);
+      Assertions.assertThat(dataFile.schemaId()).isNull();
+    }
+  }
+
+  private void assertFileEqual(ContentFile expected, ContentFile actual) {
+    Assertions.assertThat(actual.schemaId()).isEqualTo(expected.schemaId());
+    Assertions.assertThat(actual.content()).isEqualTo(expected.content());
+    Assertions.assertThat(actual.path().toString()).isEqualTo(expected.path().toString());
+    Assertions.assertThat(actual.format()).isEqualTo(expected.format());
+    Assertions.assertThat(actual.partition()).isEqualTo(expected.partition());
+    Assertions.assertThat(actual.keyMetadata()).isEqualTo(expected.keyMetadata());
+    Assertions.assertThat(actual.recordCount()).isEqualTo(expected.recordCount());
+    Assertions.assertThat(actual.columnSizes()).isEqualTo(expected.columnSizes());
+    Assertions.assertThat(actual.valueCounts()).isEqualTo(expected.valueCounts());
+    Assertions.assertThat(actual.nullValueCounts()).isEqualTo(expected.nullValueCounts());
+    Assertions.assertThat(actual.nanValueCounts()).isEqualTo(expected.nanValueCounts());
+    Assertions.assertThat(actual.lowerBounds()).isEqualTo(expected.lowerBounds());
+    Assertions.assertThat(actual.upperBounds()).isEqualTo(expected.upperBounds());
+    Assertions.assertThat(actual.sortOrderId()).isEqualTo(expected.sortOrderId());
+  }
 }
diff --git a/core/src/test/java/org/apache/iceberg/TestManifestWriterVersions.java b/core/src/test/java/org/apache/iceberg/TestManifestWriterVersions.java
index 08b27d7460da..efe67c8fff03 100644
--- a/core/src/test/java/org/apache/iceberg/TestManifestWriterVersions.java
+++ b/core/src/test/java/org/apache/iceberg/TestManifestWriterVersions.java
@@ -75,16 +75,28 @@ public class TestManifestWriterVersions {
           ImmutableMap.of(1, Conversions.toByteBuffer(Types.IntegerType.get(), 1))); // upper bounds
   private static final List OFFSETS = ImmutableList.of(4L);
   private static final Integer SORT_ORDER_ID = 2;
+  private static final int SCHEMA_ID = 5;
 
   private static final DataFile DATA_FILE =
       new GenericDataFile(
-          0, PATH, FORMAT, PARTITION, 150972L, METRICS, null, OFFSETS, null, SORT_ORDER_ID);
+          SCHEMA_ID,
+          0,
+          PATH,
+          FORMAT,
+          PARTITION,
+          150972L,
+          METRICS,
+          null,
+          OFFSETS,
+          null,
+          SORT_ORDER_ID);
 
   private static final List EQUALITY_IDS = ImmutableList.of(1);
   private static final int[] EQUALITY_ID_ARR = new int[] {1};
 
   private static final DeleteFile DELETE_FILE =
       new GenericDeleteFile(
+          SCHEMA_ID,
           0,
           FileContent.EQUALITY_DELETES,
           PATH,
@@ -245,6 +257,7 @@ void checkDataFile(ContentFile dataFile, FileContent content) {
     Assert.assertEquals("Lower bounds", METRICS.lowerBounds(), dataFile.lowerBounds());
     Assert.assertEquals("Upper bounds", METRICS.upperBounds(), dataFile.upperBounds());
     Assert.assertEquals("Sort order id", SORT_ORDER_ID, dataFile.sortOrderId());
+    Assert.assertEquals("Schema Id", Integer.valueOf(SCHEMA_ID), dataFile.schemaId());
     if (dataFile.content() == FileContent.EQUALITY_DELETES) {
       Assert.assertEquals(EQUALITY_IDS, dataFile.equalityFieldIds());
     } else {
diff --git a/core/src/test/resources/OldManifestFileV1.avro b/core/src/test/resources/OldManifestFileV1.avro
new file mode 100644
index 000000000000..d38bbd0f3ab8
Binary files /dev/null and b/core/src/test/resources/OldManifestFileV1.avro differ
diff --git a/core/src/test/resources/OldManifestFileV2.avro b/core/src/test/resources/OldManifestFileV2.avro
new file mode 100644
index 000000000000..5c9c412bce8c
Binary files /dev/null and b/core/src/test/resources/OldManifestFileV2.avro differ
diff --git a/format/spec.md b/format/spec.md
index 60c0f99c3f90..8070e3b8131e 100644
--- a/format/spec.md
+++ b/format/spec.md
@@ -443,6 +443,7 @@ The schema of a manifest file is a struct called `manifest_entry` with the following fields:
 | _optional_ | _optional_ | **`132 split_offsets`** | `list<133: long>` | Split offsets for the data file. For example, all row group offsets in a Parquet file. Must be sorted ascending |
 | | _optional_ | **`135 equality_ids`** | `list<136: int>` | Field ids used to determine row equality in equality delete files. Required when `content=2` and should be null otherwise. Fields with ids listed in this column must be present in the delete file |
 | _optional_ | _optional_ | **`140 sort_order_id`** | `int` | ID representing sort order for this file [3]. |
+| _optional_ | _optional_ | **`142 schema_id`** | `int` | ID of the table schema that was used to write this file |
 
 Notes:
diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java
index 2cb7f0eba646..7a43d8de02e0 100644
--- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java
+++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java
@@ -34,6 +34,7 @@ public class SparkDataFile implements DataFile {
 
   private final int filePathPosition;
   private final int fileFormatPosition;
+  private final int schemaIdPosition;
   private final int partitionPosition;
   private final int recordCountPosition;
   private final int fileSizeInBytesPosition;
@@ -69,6 +70,7 @@ public SparkDataFile(Types.StructType type, StructType sparkType) {
 
     filePathPosition = positions.get("file_path");
     fileFormatPosition = positions.get("file_format");
+    schemaIdPosition = positions.get("schema_id");
     partitionPosition = positions.get("partition");
     recordCountPosition = positions.get("record_count");
     fileSizeInBytesPosition = positions.get("file_size_in_bytes");
@@ -189,6 +191,11 @@ public Integer sortOrderId() {
     return wrapped.getAs(sortOrderIdPosition);
   }
 
+  @Override
+  public Integer schemaId() {
+    return wrapped.getAs(schemaIdPosition);
+  }
+
   private int fieldPosition(String name, StructType sparkType) {
     try {
       return sparkType.fieldIndex(name);
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java
index 76796825894a..36e0e2734216 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java
@@ -35,6 +35,7 @@ public class SparkDataFile implements DataFile {
 
   private final int filePathPosition;
   private final int fileFormatPosition;
+  private final int schemaIdPosition;
   private final int partitionPosition;
   private final int recordCountPosition;
   private final int fileSizeInBytesPosition;
@@ -86,6 +87,7 @@ public SparkDataFile(
 
     filePathPosition = positions.get("file_path");
     fileFormatPosition = positions.get("file_format");
+    schemaIdPosition = positions.get("schema_id");
     partitionPosition = positions.get("partition");
     recordCountPosition = positions.get("record_count");
     fileSizeInBytesPosition = positions.get("file_size_in_bytes");
@@ -206,6 +208,11 @@ public Integer sortOrderId() {
     return wrapped.getAs(sortOrderIdPosition);
   }
 
+  @Override
+  public Integer schemaId() {
+    return wrapped.getAs(schemaIdPosition);
+  }
+
   private int fieldPosition(String name, StructType sparkType) {
     try {
       return sparkType.fieldIndex(name);
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java
index 76796825894a..36e0e2734216 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java
@@ -35,6 +35,7 @@ public class SparkDataFile implements DataFile {
 
   private final int filePathPosition;
   private final int fileFormatPosition;
+  private final int schemaIdPosition;
   private final int partitionPosition;
   private final int recordCountPosition;
   private final int fileSizeInBytesPosition;
@@ -86,6 +87,7 @@ public SparkDataFile(
 
     filePathPosition = positions.get("file_path");
     fileFormatPosition = positions.get("file_format");
+    schemaIdPosition = positions.get("schema_id");
     partitionPosition = positions.get("partition");
     recordCountPosition = positions.get("record_count");
     fileSizeInBytesPosition = positions.get("file_size_in_bytes");
@@ -206,6 +208,11 @@ public Integer sortOrderId() {
     return wrapped.getAs(sortOrderIdPosition);
   }
 
+  @Override
+  public Integer schemaId() {
+    return wrapped.getAs(schemaIdPosition);
+  }
+
   private int fieldPosition(String name, StructType sparkType) {
     try {
       return sparkType.fieldIndex(name);
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java
index 76796825894a..36e0e2734216 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java
@@ -35,6 +35,7 @@ public class SparkDataFile implements DataFile {
 
   private final int filePathPosition;
   private final int fileFormatPosition;
+  private final int schemaIdPosition;
   private final int partitionPosition;
   private final int recordCountPosition;
   private final int fileSizeInBytesPosition;
@@ -86,6 +87,7 @@ public SparkDataFile(
 
     filePathPosition = positions.get("file_path");
     fileFormatPosition = positions.get("file_format");
+    schemaIdPosition = positions.get("schema_id");
    partitionPosition = positions.get("partition");
     recordCountPosition = positions.get("record_count");
     fileSizeInBytesPosition = positions.get("file_size_in_bytes");
@@ -206,6 +208,11 @@ public Integer sortOrderId() {
     return wrapped.getAs(sortOrderIdPosition);
   }
 
+  @Override
+  public Integer schemaId() {
+    return wrapped.getAs(schemaIdPosition);
+  }
+
   private int fieldPosition(String name, StructType sparkType) {
     try {
       return sparkType.fieldIndex(name);
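End-to-end, once SparkDataFile surfaces the column, the per-file schema id should also be visible through the files metadata table. Whether that table picks the field up automatically depends on the metadata-table schema, so treat this as a hedged sketch; the table name is a placeholder:

    import org.apache.spark.sql.SparkSession;

    class InspectSchemaIds {
      public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().appName("inspect-schema-ids").getOrCreate();
        // files written before this change are expected to show schema_id = NULL
        spark.sql("SELECT file_path, schema_id FROM db.sample.files").show(false);
        spark.stop();
      }
    }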