Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 113 additions & 12 deletions core/src/main/java/org/apache/iceberg/ContentFileParser.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,97 @@ class ContentFileParser {

private ContentFileParser() {}

static void unboundContentFileToJson(
ContentFile<?> contentFile, PartitionSpec spec, JsonGenerator generator) throws IOException {
Preconditions.checkArgument(contentFile != null, "Invalid content file: null");
Preconditions.checkArgument(spec != null, "Invalid partition spec: null");
Preconditions.checkArgument(generator != null, "Invalid JSON generator: null");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We generally try to avoid exposing JsonGenerator or other Jackson classes in public methods. Can this be package-private? See SnapshotParser for an example of how other parsers handle visibility.

Preconditions.checkArgument(
contentFile.specId() == spec.specId(),
"Invalid partition spec id from content file: expected = %s, actual = %s",
spec.specId(),
contentFile.specId());

generator.writeStartObject();
// ignore the ordinal position (ContentFile#pos) of the file in a manifest,
// as it isn't used and BaseFile constructor doesn't support it.
Comment on lines +63 to +64
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't think we need this comment, we can remove it

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@amogh-jahagirdar I actually am not the original author of that comment.
Screenshot 2024-11-27 at 11 15 47 AM

This was the pr that added it: b8db3f0. Do you still want me to remove it?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is okay since it is pre-existing. It's also good context since the position isn't in the spec and is used to help streaming readers keep incremental state.


generator.writeNumberField(SPEC_ID, contentFile.specId());
generator.writeStringField(CONTENT, contentFile.content().name());
generator.writeStringField(FILE_PATH, contentFile.path().toString());
generator.writeStringField(FILE_FORMAT, contentFile.format().name());

if (contentFile.partition() != null) {
generator.writeFieldName(PARTITION);
SingleValueParser.toJson(spec.partitionType(), contentFile.partition(), generator);
}

generator.writeNumberField(FILE_SIZE, contentFile.fileSizeInBytes());

metricsToJson(contentFile, generator);

if (contentFile.keyMetadata() != null) {
generator.writeFieldName(KEY_METADATA);
SingleValueParser.toJson(DataFile.KEY_METADATA.type(), contentFile.keyMetadata(), generator);
}

if (contentFile.splitOffsets() != null) {
JsonUtil.writeLongArray(SPLIT_OFFSETS, contentFile.splitOffsets(), generator);
}

if (contentFile.equalityFieldIds() != null) {
JsonUtil.writeIntegerArray(EQUALITY_IDS, contentFile.equalityFieldIds(), generator);
}

if (contentFile.sortOrderId() != null) {
generator.writeNumberField(SORT_ORDER_ID, contentFile.sortOrderId());
}

generator.writeEndObject();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like this method basically duplicates toJson below. What is the difference? Can we share the code?

}

static ContentFile<?> unboundContentFileFromJson(JsonNode jsonNode) {
Preconditions.checkArgument(jsonNode != null, "Invalid JSON node for content file: null");

int specId = JsonUtil.getInt(SPEC_ID, jsonNode);
FileContent fileContent = FileContent.valueOf(JsonUtil.getString(CONTENT, jsonNode));
String filePath = JsonUtil.getString(FILE_PATH, jsonNode);
FileFormat fileFormat = FileFormat.fromString(JsonUtil.getString(FILE_FORMAT, jsonNode));

long fileSizeInBytes = JsonUtil.getLong(FILE_SIZE, jsonNode);
Metrics metrics = metricsFromJson(jsonNode);
ByteBuffer keyMetadata = JsonUtil.getByteBufferOrNull(KEY_METADATA, jsonNode);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't match the write side, which uses SingleValueParser. Can you also use SingleValueParser here? Or use JsonUtil above?

List<Long> splitOffsets = JsonUtil.getLongListOrNull(SPLIT_OFFSETS, jsonNode);
int[] equalityFieldIds = JsonUtil.getIntArrayOrNull(EQUALITY_IDS, jsonNode);
Integer sortOrderId = JsonUtil.getIntOrNull(SORT_ORDER_ID, jsonNode);

if (fileContent == FileContent.DATA) {
return new UnboundGenericDataFile(
specId,
filePath,
fileFormat,
jsonNode.get(PARTITION),
fileSizeInBytes,
metrics,
keyMetadata,
splitOffsets,
sortOrderId);
} else {
return new UnboundGenericDeleteFile(
specId,
fileContent,
filePath,
fileFormat,
jsonNode.get(PARTITION),
fileSizeInBytes,
metrics,
equalityFieldIds,
sortOrderId,
splitOffsets,
keyMetadata);
}
}

private static boolean hasPartitionData(StructLike partitionData) {
return partitionData != null && partitionData.size() > 0;
}
Expand Down Expand Up @@ -125,18 +216,7 @@ static ContentFile<?> fromJson(JsonNode jsonNode, PartitionSpec spec) {

PartitionData partitionData = null;
if (jsonNode.has(PARTITION)) {
partitionData = new PartitionData(spec.partitionType());
StructLike structLike =
(StructLike) SingleValueParser.fromJson(spec.partitionType(), jsonNode.get(PARTITION));
Preconditions.checkState(
partitionData.size() == structLike.size(),
"Invalid partition data size: expected = %s, actual = %s",
partitionData.size(),
structLike.size());
for (int pos = 0; pos < partitionData.size(); ++pos) {
Class<?> javaClass = spec.partitionType().fields().get(pos).type().typeId().javaClass();
partitionData.set(pos, structLike.get(pos, javaClass));
}
partitionData = partitionDataFromRawValue(jsonNode.get(PARTITION), spec);
}

long fileSizeInBytes = JsonUtil.getLong(FILE_SIZE, jsonNode);
Expand Down Expand Up @@ -173,6 +253,27 @@ static ContentFile<?> fromJson(JsonNode jsonNode, PartitionSpec spec) {
}
}

static PartitionData partitionDataFromRawValue(JsonNode rawPartitionValue, PartitionSpec spec) {
if (rawPartitionValue == null) {
return null;
}

PartitionData partitionData = new PartitionData(spec.partitionType());
StructLike structLike =
(StructLike) SingleValueParser.fromJson(spec.partitionType(), rawPartitionValue);
Preconditions.checkState(
partitionData.size() == structLike.size(),
"Invalid partition data size: expected = %s, actual = %s",
partitionData.size(),
structLike.size());
for (int pos = 0; pos < partitionData.size(); ++pos) {
Class<?> javaClass = spec.partitionType().fields().get(pos).type().typeId().javaClass();
partitionData.set(pos, structLike.get(pos, javaClass));
}

return partitionData;
}

private static void metricsToJson(ContentFile<?> contentFile, JsonGenerator generator)
throws IOException {
generator.writeNumberField(RECORD_COUNT, contentFile.recordCount());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ static FileScanTask fromJson(JsonNode jsonNode, boolean caseSensitive) {

DataFile dataFile = null;
if (jsonNode.has(DATA_FILE)) {
dataFile = (DataFile) ContentFileParser.fromJson(jsonNode.get(DATA_FILE), spec);
dataFile = (DataFile) ContentFileParser.fromJson(JsonUtil.get(DATA_FILE, jsonNode), spec);
}

long start = JsonUtil.getLong(START, jsonNode);
Expand Down
89 changes: 89 additions & 0 deletions core/src/main/java/org/apache/iceberg/RESTFileScanTaskParser.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg;

import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.JsonNode;
import java.io.IOException;
import java.util.List;
import java.util.Set;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.ExpressionParser;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.util.JsonUtil;

class RESTFileScanTaskParser {
private static final String DATA_FILE = "data-file";
private static final String DELETE_FILE_REFERENCES = "delete-file-references";
private static final String RESIDUAL = "residual-filter";

private RESTFileScanTaskParser() {}

static void toJson(
FileScanTask fileScanTask,
Set<Integer> deleteFileReferences,
PartitionSpec partitionSpec,
JsonGenerator generator)
throws IOException {
Preconditions.checkArgument(fileScanTask != null, "Invalid file scan task: null");
Preconditions.checkArgument(generator != null, "Invalid JSON generator: null");

generator.writeStartObject();
generator.writeFieldName(DATA_FILE);
ContentFileParser.unboundContentFileToJson(fileScanTask.file(), partitionSpec, generator);
if (deleteFileReferences != null) {
JsonUtil.writeIntegerArray(DELETE_FILE_REFERENCES, deleteFileReferences, generator);
}

if (fileScanTask.residual() != null) {
generator.writeFieldName(RESIDUAL);
ExpressionParser.toJson(fileScanTask.residual(), generator);
}
generator.writeEndObject();
}

static FileScanTask fromJson(JsonNode jsonNode, List<DeleteFile> allDeleteFiles) {
Preconditions.checkArgument(jsonNode != null, "Invalid JSON node for file scan task: null");
Preconditions.checkArgument(
jsonNode.isObject(), "Invalid JSON node for file scan task: non-object (%s)", jsonNode);

UnboundGenericDataFile dataFile =
(UnboundGenericDataFile)
ContentFileParser.unboundContentFileFromJson(JsonUtil.get(DATA_FILE, jsonNode));

UnboundGenericDeleteFile[] deleteFiles = null;
Set<Integer> deleteFileReferences = Sets.newHashSet();
if (jsonNode.has(DELETE_FILE_REFERENCES)) {
deleteFileReferences.addAll(JsonUtil.getIntegerList(DELETE_FILE_REFERENCES, jsonNode));
ImmutableList.Builder<UnboundGenericDeleteFile> builder = ImmutableList.builder();
deleteFileReferences.forEach(
delIdx -> builder.add((UnboundGenericDeleteFile) allDeleteFiles.get(delIdx)));
deleteFiles = builder.build().toArray(new UnboundGenericDeleteFile[0]);
}

Expression filter = null;
if (jsonNode.has(RESIDUAL)) {
filter = ExpressionParser.fromJson(jsonNode.get(RESIDUAL));
}

return new UnboundBaseFileScanTask(dataFile, deleteFiles, filter);
}
}
114 changes: 114 additions & 0 deletions core/src/main/java/org/apache/iceberg/TableScanResponseParser.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg;

import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.JsonNode;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.util.JsonUtil;

public class TableScanResponseParser {

private TableScanResponseParser() {}

static final String FILE_SCAN_TASKS = "file-scan-tasks";
static final String DELETE_FILES = "delete-files";

public static List<DeleteFile> parseDeleteFiles(JsonNode node) {
if (node.has(DELETE_FILES)) {
JsonNode deleteFiles = JsonUtil.get(DELETE_FILES, node);
Preconditions.checkArgument(
deleteFiles.isArray(), "Cannot parse delete files from non-array: %s", deleteFiles);
ImmutableList.Builder<DeleteFile> deleteFilesBuilder = ImmutableList.builder();
for (JsonNode deleteFileNode : deleteFiles) {
DeleteFile deleteFile =
(DeleteFile) ContentFileParser.unboundContentFileFromJson(deleteFileNode);
deleteFilesBuilder.add(deleteFile);
}
return deleteFilesBuilder.build();
}

return null;
}

public static List<FileScanTask> parseFileScanTasks(JsonNode node, List<DeleteFile> deleteFiles) {
if (node.has(FILE_SCAN_TASKS)) {
JsonNode scanTasks = JsonUtil.get(FILE_SCAN_TASKS, node);
Preconditions.checkArgument(
scanTasks.isArray(), "Cannot parse file scan tasks from non-array: %s", scanTasks);
ImmutableList.Builder<FileScanTask> fileScanTaskBuilder = ImmutableList.builder();
for (JsonNode fileScanTaskNode : scanTasks) {
FileScanTask fileScanTask = RESTFileScanTaskParser.fromJson(fileScanTaskNode, deleteFiles);
fileScanTaskBuilder.add(fileScanTask);
}

return fileScanTaskBuilder.build();
}

return null;
}

public static void serializeScanTasks(
List<FileScanTask> fileScanTasks,
List<DeleteFile> deleteFiles,
Map<Integer, PartitionSpec> specsById,
JsonGenerator gen)
throws IOException {
Map<String, Integer> deleteFilePathToIndex = Maps.newHashMap();
if (deleteFiles != null) {
Preconditions.checkArgument(
specsById != null, "Cannot serialize response without specs by ID defined");
gen.writeArrayFieldStart(DELETE_FILES);
for (int i = 0; i < deleteFiles.size(); i++) {
DeleteFile deleteFile = deleteFiles.get(i);
deleteFilePathToIndex.put(String.valueOf(deleteFile.path()), i);
ContentFileParser.unboundContentFileToJson(
deleteFiles.get(i), specsById.get(deleteFile.specId()), gen);
}
gen.writeEndArray();
}

if (fileScanTasks != null) {
gen.writeArrayFieldStart(FILE_SCAN_TASKS);
Set<Integer> deleteFileReferences = Sets.newHashSet();
for (FileScanTask fileScanTask : fileScanTasks) {
if (deleteFiles != null) {
for (DeleteFile taskDelete : fileScanTask.deletes()) {
deleteFileReferences.add(deleteFilePathToIndex.get(taskDelete.path().toString()));
}
}

PartitionSpec spec = specsById.get(fileScanTask.file().specId());
Preconditions.checkArgument(
spec != null,
"Cannot serialize scan task with unknown spec %s",
fileScanTask.file().specId());
RESTFileScanTaskParser.toJson(fileScanTask, deleteFileReferences, spec, gen);
}
gen.writeEndArray();
}
}
}
Loading