Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions api/src/main/java/org/apache/iceberg/ContentFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,12 @@ public interface ContentFile<F> {
/** Returns id of the partition spec used for partition metadata. */
int specId();

/** Return id of the schema when write this file. */
default Integer schemaId() {
throw new UnsupportedOperationException(
this.getClass().getName() + " doesn't implement schemaId");
}

/**
* Returns type of content stored in the file; one of DATA, POSITION_DELETES, or EQUALITY_DELETES.
*/
Expand Down
6 changes: 4 additions & 2 deletions api/src/main/java/org/apache/iceberg/DataFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -98,11 +98,12 @@ public interface DataFile extends ContentFile<DataFile> {
Types.NestedField SORT_ORDER_ID =
optional(140, "sort_order_id", IntegerType.get(), "Sort order ID");
Types.NestedField SPEC_ID = optional(141, "spec_id", IntegerType.get(), "Partition spec ID");
Types.NestedField SCHEMA_ID = optional(142, "schema_id", IntegerType.get(), "Schema ID");

int PARTITION_ID = 102;
String PARTITION_NAME = "partition";
String PARTITION_DOC = "Partition data tuple, schema based on the partition spec";
// NEXT ID TO ASSIGN: 142
// NEXT ID TO ASSIGN: 143

static StructType getType(StructType partitionType) {
// IDs start at 100 to leave room for changes to ManifestEntry
Expand All @@ -123,7 +124,8 @@ static StructType getType(StructType partitionType) {
KEY_METADATA,
SPLIT_OFFSETS,
EQUALITY_IDS,
SORT_ORDER_ID);
SORT_ORDER_ID,
SCHEMA_ID);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Put it in the last one to reduce the code changes.

}

/** @return the content stored in the file; one of DATA, POSITION_DELETES, or EQUALITY_DELETES */
Expand Down
15 changes: 15 additions & 0 deletions core/src/main/java/org/apache/iceberg/BaseFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ public PartitionData copy() {
private int[] equalityIds = null;
private byte[] keyMetadata = null;
private Integer sortOrderId;
private Integer schemaId;

// cached schema
private transient Schema avroSchema = null;
Expand Down Expand Up @@ -123,6 +124,7 @@ public PartitionData copy() {
}

BaseFile(
Integer schemaId,
int specId,
FileContent content,
String filePath,
Expand All @@ -140,6 +142,7 @@ public PartitionData copy() {
int[] equalityFieldIds,
Integer sortOrderId,
ByteBuffer keyMetadata) {
this.schemaId = schemaId;
this.partitionSpecId = specId;
this.content = content;
this.filePath = filePath;
Expand Down Expand Up @@ -177,6 +180,7 @@ public PartitionData copy() {
*/
BaseFile(BaseFile<F> toCopy, boolean fullCopy) {
this.fileOrdinal = toCopy.fileOrdinal;
this.schemaId = toCopy.schemaId;
this.partitionSpecId = toCopy.partitionSpecId;
this.content = toCopy.content;
this.filePath = toCopy.filePath;
Expand Down Expand Up @@ -320,6 +324,9 @@ public void put(int i, Object value) {
this.sortOrderId = (Integer) value;
return;
case 17:
this.schemaId = (Integer) value;
return;
case 18:
this.fileOrdinal = (long) value;
return;
default:
Expand Down Expand Up @@ -375,6 +382,8 @@ public Object get(int i) {
case 16:
return sortOrderId;
case 17:
return schemaId;
case 18:
return fileOrdinal;
default:
throw new UnsupportedOperationException("Unknown field ordinal: " + pos);
Expand All @@ -396,6 +405,11 @@ public Long pos() {
return fileOrdinal;
}

@Override
public Integer schemaId() {
return schemaId;
}

@Override
public FileContent content() {
return content;
Expand Down Expand Up @@ -526,6 +540,7 @@ public String toString() {
.add("sort_order_id", sortOrderId)
.add("data_sequence_number", dataSequenceNumber == null ? "null" : dataSequenceNumber)
.add("file_sequence_number", fileSequenceNumber == null ? "null" : fileSequenceNumber)
.add("schema_id", schemaId == null ? "null" : schemaId)
.toString();
}
}
8 changes: 8 additions & 0 deletions core/src/main/java/org/apache/iceberg/ContentFileParser.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.iceberg.util.JsonUtil;

class ContentFileParser {
private static final String SCHEMA_ID = "schema-id";
private static final String SPEC_ID = "spec-id";
private static final String CONTENT = "content";
private static final String FILE_PATH = "file-path";
Expand Down Expand Up @@ -78,6 +79,10 @@ static void toJson(ContentFile<?> contentFile, PartitionSpec spec, JsonGenerator
// ignore the ordinal position (ContentFile#pos) of the file in a manifest,
// as it isn't used and BaseFile constructor doesn't support it.

if (contentFile.schemaId() != null) {
generator.writeNumberField(SCHEMA_ID, contentFile.schemaId());
}

generator.writeNumberField(SPEC_ID, contentFile.specId());
generator.writeStringField(CONTENT, contentFile.content().name());
generator.writeStringField(FILE_PATH, contentFile.path().toString());
Expand Down Expand Up @@ -118,6 +123,7 @@ static ContentFile<?> fromJson(JsonNode jsonNode, PartitionSpec spec) {
jsonNode.isObject(), "Invalid JSON node for content file: non-object (%s)", jsonNode);
Preconditions.checkArgument(spec != null, "Invalid partition spec: null");

Integer schemaId = JsonUtil.getIntOrNull(SCHEMA_ID, jsonNode);
int specId = JsonUtil.getInt(SPEC_ID, jsonNode);
FileContent fileContent = FileContent.valueOf(JsonUtil.getString(CONTENT, jsonNode));
String filePath = JsonUtil.getString(FILE_PATH, jsonNode);
Expand Down Expand Up @@ -148,6 +154,7 @@ static ContentFile<?> fromJson(JsonNode jsonNode, PartitionSpec spec) {

if (fileContent == FileContent.DATA) {
return new GenericDataFile(
schemaId,
specId,
filePath,
fileFormat,
Expand All @@ -160,6 +167,7 @@ static ContentFile<?> fromJson(JsonNode jsonNode, PartitionSpec spec) {
sortOrderId);
} else {
return new GenericDeleteFile(
schemaId,
specId,
fileContent,
filePath,
Expand Down
9 changes: 9 additions & 0 deletions core/src/main/java/org/apache/iceberg/DataFiles.java
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ public static class Builder {
private List<Long> splitOffsets = null;
private List<Integer> equalityFieldIds = null;
private Integer sortOrderId = SortOrder.unsorted().orderId();
private Integer schemaId = null;

public Builder(PartitionSpec spec) {
this.spec = spec;
Expand All @@ -180,6 +181,7 @@ public void clear() {
this.upperBounds = null;
this.splitOffsets = null;
this.sortOrderId = SortOrder.unsorted().orderId();
this.schemaId = null;
}

public Builder copy(DataFile toCopy) {
Expand All @@ -203,6 +205,7 @@ public Builder copy(DataFile toCopy) {
this.splitOffsets =
toCopy.splitOffsets() == null ? null : ImmutableList.copyOf(toCopy.splitOffsets());
this.sortOrderId = toCopy.sortOrderId();
this.schemaId = toCopy.schemaId();
return this;
}

Expand Down Expand Up @@ -325,6 +328,11 @@ public Builder withSortOrder(SortOrder newSortOrder) {
return this;
}

public Builder withSchemaId(Integer newSchemaId) {
this.schemaId = newSchemaId;
return this;
}

public DataFile build() {
Preconditions.checkArgument(filePath != null, "File path is required");
if (format == null) {
Expand All @@ -335,6 +343,7 @@ public DataFile build() {
Preconditions.checkArgument(recordCount >= 0, "Record count is required");

return new GenericDataFile(
schemaId,
specId,
filePath,
format,
Expand Down
9 changes: 9 additions & 0 deletions core/src/main/java/org/apache/iceberg/FileMetadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ public static class Builder {
private ByteBuffer keyMetadata = null;
private Integer sortOrderId = null;
private List<Long> splitOffsets = null;
private Integer schemaId = null;

Builder(PartitionSpec spec) {
this.spec = spec;
Expand All @@ -82,6 +83,7 @@ public void clear() {
this.lowerBounds = null;
this.upperBounds = null;
this.sortOrderId = null;
this.schemaId = null;
}

public Builder copy(DeleteFile toCopy) {
Expand All @@ -104,6 +106,7 @@ public Builder copy(DeleteFile toCopy) {
this.keyMetadata =
toCopy.keyMetadata() == null ? null : ByteBuffers.copy(toCopy.keyMetadata());
this.sortOrderId = toCopy.sortOrderId();
this.schemaId = toCopy.schemaId();
return this;
}

Expand Down Expand Up @@ -220,6 +223,11 @@ public Builder withSortOrder(SortOrder newSortOrder) {
return this;
}

public Builder withSchemaId(Integer newSchemaId) {
this.schemaId = newSchemaId;
return this;
}

public DeleteFile build() {
Preconditions.checkArgument(filePath != null, "File path is required");
if (format == null) {
Expand All @@ -245,6 +253,7 @@ public DeleteFile build() {
}

return new GenericDeleteFile(
schemaId,
specId,
content,
filePath,
Expand Down
2 changes: 2 additions & 0 deletions core/src/main/java/org/apache/iceberg/GenericDataFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ class GenericDataFile extends BaseFile<DataFile> implements DataFile {
}

GenericDataFile(
Integer schemaId,
int specId,
String filePath,
FileFormat format,
Expand All @@ -43,6 +44,7 @@ class GenericDataFile extends BaseFile<DataFile> implements DataFile {
int[] equalityFieldIds,
Integer sortOrderId) {
super(
schemaId,
specId,
FileContent.DATA,
filePath,
Expand Down
2 changes: 2 additions & 0 deletions core/src/main/java/org/apache/iceberg/GenericDeleteFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ class GenericDeleteFile extends BaseFile<DeleteFile> implements DeleteFile {
}

GenericDeleteFile(
Integer schemaId,
int specId,
FileContent content,
String filePath,
Expand All @@ -44,6 +45,7 @@ class GenericDeleteFile extends BaseFile<DeleteFile> implements DeleteFile {
List<Long> splitOffsets,
ByteBuffer keyMetadata) {
super(
schemaId,
specId,
content,
filePath,
Expand Down
10 changes: 9 additions & 1 deletion core/src/main/java/org/apache/iceberg/V1Metadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,8 @@ static Types.StructType dataFileSchema(Types.StructType partitionType) {
DataFile.UPPER_BOUNDS,
DataFile.KEY_METADATA,
DataFile.SPLIT_OFFSETS,
DataFile.SORT_ORDER_ID);
DataFile.SORT_ORDER_ID,
DataFile.SCHEMA_ID);
}

/** Wrapper used to write a ManifestEntry to v1 metadata. */
Expand Down Expand Up @@ -371,6 +372,8 @@ public Object get(int pos) {
return wrapped.splitOffsets();
case 14:
return wrapped.sortOrderId();
case 15:
return wrapped.schemaId();
}
throw new IllegalArgumentException("Unknown field ordinal: " + pos);
}
Expand Down Expand Up @@ -480,6 +483,11 @@ public Long fileSequenceNumber() {
return wrapped.fileSequenceNumber();
}

@Override
public Integer schemaId() {
return wrapped.schemaId();
}

@Override
public DataFile copy() {
return wrapped.copy();
Expand Down
10 changes: 9 additions & 1 deletion core/src/main/java/org/apache/iceberg/V2Metadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,8 @@ static Types.StructType fileType(Types.StructType partitionType) {
DataFile.KEY_METADATA,
DataFile.SPLIT_OFFSETS,
DataFile.EQUALITY_IDS,
DataFile.SORT_ORDER_ID);
DataFile.SORT_ORDER_ID,
DataFile.SCHEMA_ID);
}

static class IndexedManifestEntry<F extends ContentFile<F>>
Expand Down Expand Up @@ -446,6 +447,8 @@ public Object get(int pos) {
return wrapped.equalityFieldIds();
case 15:
return wrapped.sortOrderId();
case 16:
return wrapped.schemaId();
}
throw new IllegalArgumentException("Unknown field ordinal: " + pos);
}
Expand Down Expand Up @@ -555,6 +558,11 @@ public Long fileSequenceNumber() {
return wrapped.fileSequenceNumber();
}

@Override
public Integer schemaId() {
return wrapped.schemaId();
}

@Override
public F copy() {
throw new UnsupportedOperationException("Cannot copy IndexedDataFile wrapper");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public class EqualityDeleteWriter<T> implements FileWriter<T, DeleteWriteResult>
private final ByteBuffer keyMetadata;
private final int[] equalityFieldIds;
private final SortOrder sortOrder;
private final Integer schemaId;
private DeleteFile deleteFile = null;

public EqualityDeleteWriter(
Expand All @@ -52,13 +53,36 @@ public EqualityDeleteWriter(
EncryptionKeyMetadata keyMetadata,
SortOrder sortOrder,
int... equalityFieldIds) {
this(
appender,
format,
location,
spec,
partition,
keyMetadata,
sortOrder,
null,
equalityFieldIds);
}

public EqualityDeleteWriter(
FileAppender<T> appender,
FileFormat format,
String location,
PartitionSpec spec,
StructLike partition,
EncryptionKeyMetadata keyMetadata,
SortOrder sortOrder,
Integer schemaId,
int... equalityFieldIds) {
this.appender = appender;
this.format = format;
this.location = location;
this.spec = spec;
this.partition = partition;
this.keyMetadata = keyMetadata != null ? keyMetadata.buffer() : null;
this.sortOrder = sortOrder;
this.schemaId = schemaId;
this.equalityFieldIds = equalityFieldIds;
}

Expand Down Expand Up @@ -87,6 +111,7 @@ public void close() throws IOException {
.withMetrics(appender.metrics())
.withSplitOffsets(appender.splitOffsets())
.withSortOrder(sortOrder)
.withSchemaId(schemaId)
.build();
}
}
Expand Down
Loading