diff --git a/core/src/main/java/org/apache/iceberg/ContentFileParser.java b/core/src/main/java/org/apache/iceberg/ContentFileParser.java index dd08c5c69e7d..1ebdbb463d69 100644 --- a/core/src/main/java/org/apache/iceberg/ContentFileParser.java +++ b/core/src/main/java/org/apache/iceberg/ContentFileParser.java @@ -48,6 +48,97 @@ class ContentFileParser { private ContentFileParser() {} + static void unboundContentFileToJson( + ContentFile contentFile, PartitionSpec spec, JsonGenerator generator) throws IOException { + Preconditions.checkArgument(contentFile != null, "Invalid content file: null"); + Preconditions.checkArgument(spec != null, "Invalid partition spec: null"); + Preconditions.checkArgument(generator != null, "Invalid JSON generator: null"); + Preconditions.checkArgument( + contentFile.specId() == spec.specId(), + "Invalid partition spec id from content file: expected = %s, actual = %s", + spec.specId(), + contentFile.specId()); + + generator.writeStartObject(); + // ignore the ordinal position (ContentFile#pos) of the file in a manifest, + // as it isn't used and BaseFile constructor doesn't support it. + + generator.writeNumberField(SPEC_ID, contentFile.specId()); + generator.writeStringField(CONTENT, contentFile.content().name()); + generator.writeStringField(FILE_PATH, contentFile.path().toString()); + generator.writeStringField(FILE_FORMAT, contentFile.format().name()); + + if (contentFile.partition() != null) { + generator.writeFieldName(PARTITION); + SingleValueParser.toJson(spec.partitionType(), contentFile.partition(), generator); + } + + generator.writeNumberField(FILE_SIZE, contentFile.fileSizeInBytes()); + + metricsToJson(contentFile, generator); + + if (contentFile.keyMetadata() != null) { + generator.writeFieldName(KEY_METADATA); + SingleValueParser.toJson(DataFile.KEY_METADATA.type(), contentFile.keyMetadata(), generator); + } + + if (contentFile.splitOffsets() != null) { + JsonUtil.writeLongArray(SPLIT_OFFSETS, contentFile.splitOffsets(), generator); + } + + if (contentFile.equalityFieldIds() != null) { + JsonUtil.writeIntegerArray(EQUALITY_IDS, contentFile.equalityFieldIds(), generator); + } + + if (contentFile.sortOrderId() != null) { + generator.writeNumberField(SORT_ORDER_ID, contentFile.sortOrderId()); + } + + generator.writeEndObject(); + } + + static ContentFile unboundContentFileFromJson(JsonNode jsonNode) { + Preconditions.checkArgument(jsonNode != null, "Invalid JSON node for content file: null"); + + int specId = JsonUtil.getInt(SPEC_ID, jsonNode); + FileContent fileContent = FileContent.valueOf(JsonUtil.getString(CONTENT, jsonNode)); + String filePath = JsonUtil.getString(FILE_PATH, jsonNode); + FileFormat fileFormat = FileFormat.fromString(JsonUtil.getString(FILE_FORMAT, jsonNode)); + + long fileSizeInBytes = JsonUtil.getLong(FILE_SIZE, jsonNode); + Metrics metrics = metricsFromJson(jsonNode); + ByteBuffer keyMetadata = JsonUtil.getByteBufferOrNull(KEY_METADATA, jsonNode); + List splitOffsets = JsonUtil.getLongListOrNull(SPLIT_OFFSETS, jsonNode); + int[] equalityFieldIds = JsonUtil.getIntArrayOrNull(EQUALITY_IDS, jsonNode); + Integer sortOrderId = JsonUtil.getIntOrNull(SORT_ORDER_ID, jsonNode); + + if (fileContent == FileContent.DATA) { + return new UnboundGenericDataFile( + specId, + filePath, + fileFormat, + jsonNode.get(PARTITION), + fileSizeInBytes, + metrics, + keyMetadata, + splitOffsets, + sortOrderId); + } else { + return new UnboundGenericDeleteFile( + specId, + fileContent, + filePath, + fileFormat, + jsonNode.get(PARTITION), + 
fileSizeInBytes, + metrics, + equalityFieldIds, + sortOrderId, + splitOffsets, + keyMetadata); + } + } + private static boolean hasPartitionData(StructLike partitionData) { return partitionData != null && partitionData.size() > 0; } @@ -125,18 +216,7 @@ static ContentFile fromJson(JsonNode jsonNode, PartitionSpec spec) { PartitionData partitionData = null; if (jsonNode.has(PARTITION)) { - partitionData = new PartitionData(spec.partitionType()); - StructLike structLike = - (StructLike) SingleValueParser.fromJson(spec.partitionType(), jsonNode.get(PARTITION)); - Preconditions.checkState( - partitionData.size() == structLike.size(), - "Invalid partition data size: expected = %s, actual = %s", - partitionData.size(), - structLike.size()); - for (int pos = 0; pos < partitionData.size(); ++pos) { - Class javaClass = spec.partitionType().fields().get(pos).type().typeId().javaClass(); - partitionData.set(pos, structLike.get(pos, javaClass)); - } + partitionData = partitionDataFromRawValue(jsonNode.get(PARTITION), spec); } long fileSizeInBytes = JsonUtil.getLong(FILE_SIZE, jsonNode); @@ -173,6 +253,27 @@ static ContentFile fromJson(JsonNode jsonNode, PartitionSpec spec) { } } + static PartitionData partitionDataFromRawValue(JsonNode rawPartitionValue, PartitionSpec spec) { + if (rawPartitionValue == null) { + return null; + } + + PartitionData partitionData = new PartitionData(spec.partitionType()); + StructLike structLike = + (StructLike) SingleValueParser.fromJson(spec.partitionType(), rawPartitionValue); + Preconditions.checkState( + partitionData.size() == structLike.size(), + "Invalid partition data size: expected = %s, actual = %s", + partitionData.size(), + structLike.size()); + for (int pos = 0; pos < partitionData.size(); ++pos) { + Class javaClass = spec.partitionType().fields().get(pos).type().typeId().javaClass(); + partitionData.set(pos, structLike.get(pos, javaClass)); + } + + return partitionData; + } + private static void metricsToJson(ContentFile contentFile, JsonGenerator generator) throws IOException { generator.writeNumberField(RECORD_COUNT, contentFile.recordCount()); diff --git a/core/src/main/java/org/apache/iceberg/FileScanTaskParser.java b/core/src/main/java/org/apache/iceberg/FileScanTaskParser.java index 7ae7dc74a72e..f06d6f60ed02 100644 --- a/core/src/main/java/org/apache/iceberg/FileScanTaskParser.java +++ b/core/src/main/java/org/apache/iceberg/FileScanTaskParser.java @@ -86,7 +86,7 @@ static FileScanTask fromJson(JsonNode jsonNode, boolean caseSensitive) { DataFile dataFile = null; if (jsonNode.has(DATA_FILE)) { - dataFile = (DataFile) ContentFileParser.fromJson(jsonNode.get(DATA_FILE), spec); + dataFile = (DataFile) ContentFileParser.fromJson(JsonUtil.get(DATA_FILE, jsonNode), spec); } long start = JsonUtil.getLong(START, jsonNode); diff --git a/core/src/main/java/org/apache/iceberg/RESTFileScanTaskParser.java b/core/src/main/java/org/apache/iceberg/RESTFileScanTaskParser.java new file mode 100644 index 000000000000..d7fba3d56c55 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/RESTFileScanTaskParser.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonNode; +import java.io.IOException; +import java.util.List; +import java.util.Set; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.expressions.ExpressionParser; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.util.JsonUtil; + +class RESTFileScanTaskParser { + private static final String DATA_FILE = "data-file"; + private static final String DELETE_FILE_REFERENCES = "delete-file-references"; + private static final String RESIDUAL = "residual-filter"; + + private RESTFileScanTaskParser() {} + + static void toJson( + FileScanTask fileScanTask, + Set deleteFileReferences, + PartitionSpec partitionSpec, + JsonGenerator generator) + throws IOException { + Preconditions.checkArgument(fileScanTask != null, "Invalid file scan task: null"); + Preconditions.checkArgument(generator != null, "Invalid JSON generator: null"); + + generator.writeStartObject(); + generator.writeFieldName(DATA_FILE); + ContentFileParser.unboundContentFileToJson(fileScanTask.file(), partitionSpec, generator); + if (deleteFileReferences != null) { + JsonUtil.writeIntegerArray(DELETE_FILE_REFERENCES, deleteFileReferences, generator); + } + + if (fileScanTask.residual() != null) { + generator.writeFieldName(RESIDUAL); + ExpressionParser.toJson(fileScanTask.residual(), generator); + } + generator.writeEndObject(); + } + + static FileScanTask fromJson(JsonNode jsonNode, List allDeleteFiles) { + Preconditions.checkArgument(jsonNode != null, "Invalid JSON node for file scan task: null"); + Preconditions.checkArgument( + jsonNode.isObject(), "Invalid JSON node for file scan task: non-object (%s)", jsonNode); + + UnboundGenericDataFile dataFile = + (UnboundGenericDataFile) + ContentFileParser.unboundContentFileFromJson(JsonUtil.get(DATA_FILE, jsonNode)); + + UnboundGenericDeleteFile[] deleteFiles = null; + Set deleteFileReferences = Sets.newHashSet(); + if (jsonNode.has(DELETE_FILE_REFERENCES)) { + deleteFileReferences.addAll(JsonUtil.getIntegerList(DELETE_FILE_REFERENCES, jsonNode)); + ImmutableList.Builder builder = ImmutableList.builder(); + deleteFileReferences.forEach( + delIdx -> builder.add((UnboundGenericDeleteFile) allDeleteFiles.get(delIdx))); + deleteFiles = builder.build().toArray(new UnboundGenericDeleteFile[0]); + } + + Expression filter = null; + if (jsonNode.has(RESIDUAL)) { + filter = ExpressionParser.fromJson(jsonNode.get(RESIDUAL)); + } + + return new UnboundBaseFileScanTask(dataFile, deleteFiles, filter); + } +} diff --git a/core/src/main/java/org/apache/iceberg/TableScanResponseParser.java b/core/src/main/java/org/apache/iceberg/TableScanResponseParser.java new file mode 100644 index 000000000000..a8dd7a897e58 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/TableScanResponseParser.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache 
Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonNode; +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.util.JsonUtil; + +public class TableScanResponseParser { + + private TableScanResponseParser() {} + + static final String FILE_SCAN_TASKS = "file-scan-tasks"; + static final String DELETE_FILES = "delete-files"; + + public static List<DeleteFile> parseDeleteFiles(JsonNode node) { + if (node.has(DELETE_FILES)) { + JsonNode deleteFiles = JsonUtil.get(DELETE_FILES, node); + Preconditions.checkArgument( + deleteFiles.isArray(), "Cannot parse delete files from non-array: %s", deleteFiles); + ImmutableList.Builder<DeleteFile> deleteFilesBuilder = ImmutableList.builder(); + for (JsonNode deleteFileNode : deleteFiles) { + DeleteFile deleteFile = + (DeleteFile) ContentFileParser.unboundContentFileFromJson(deleteFileNode); + deleteFilesBuilder.add(deleteFile); + } + return deleteFilesBuilder.build(); + } + + return null; + } + + public static List<FileScanTask> parseFileScanTasks(JsonNode node, List<DeleteFile> deleteFiles) { + if (node.has(FILE_SCAN_TASKS)) { + JsonNode scanTasks = JsonUtil.get(FILE_SCAN_TASKS, node); + Preconditions.checkArgument( + scanTasks.isArray(), "Cannot parse file scan tasks from non-array: %s", scanTasks); + ImmutableList.Builder<FileScanTask> fileScanTaskBuilder = ImmutableList.builder(); + for (JsonNode fileScanTaskNode : scanTasks) { + FileScanTask fileScanTask = RESTFileScanTaskParser.fromJson(fileScanTaskNode, deleteFiles); + fileScanTaskBuilder.add(fileScanTask); + } + + return fileScanTaskBuilder.build(); + } + + return null; + } + + public static void serializeScanTasks( + List<FileScanTask> fileScanTasks, + List<DeleteFile> deleteFiles, + Map<Integer, PartitionSpec> specsById, + JsonGenerator gen) + throws IOException { + Map<String, Integer> deleteFilePathToIndex = Maps.newHashMap(); + if (deleteFiles != null) { + Preconditions.checkArgument( + specsById != null, "Cannot serialize response without specs by ID defined"); + gen.writeArrayFieldStart(DELETE_FILES); + for (int i = 0; i < deleteFiles.size(); i++) { + DeleteFile deleteFile = deleteFiles.get(i); + deleteFilePathToIndex.put(String.valueOf(deleteFile.path()), i); + ContentFileParser.unboundContentFileToJson( + deleteFiles.get(i), specsById.get(deleteFile.specId()), gen); + } + gen.writeEndArray(); + } + + if (fileScanTasks != null) { + gen.writeArrayFieldStart(FILE_SCAN_TASKS); + Set<Integer> deleteFileReferences = 
Sets.newHashSet(); + for (FileScanTask fileScanTask : fileScanTasks) { + if (deleteFiles != null) { + for (DeleteFile taskDelete : fileScanTask.deletes()) { + deleteFileReferences.add(deleteFilePathToIndex.get(taskDelete.path().toString())); + } + } + + PartitionSpec spec = specsById.get(fileScanTask.file().specId()); + Preconditions.checkArgument( + spec != null, + "Cannot serialize scan task with unknown spec %s", + fileScanTask.file().specId()); + RESTFileScanTaskParser.toJson(fileScanTask, deleteFileReferences, spec, gen); + } + gen.writeEndArray(); + } + } +} diff --git a/core/src/main/java/org/apache/iceberg/UnboundBaseFileScanTask.java b/core/src/main/java/org/apache/iceberg/UnboundBaseFileScanTask.java new file mode 100644 index 000000000000..9905e5be4c21 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/UnboundBaseFileScanTask.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.expressions.ResidualEvaluator; + +class UnboundBaseFileScanTask extends BaseFileScanTask { + private UnboundGenericDataFile unboundDataFile; + private UnboundGenericDeleteFile[] unboundDeleteFiles; + private Expression filter; + + UnboundBaseFileScanTask( + UnboundGenericDataFile unboundDataFile, + UnboundGenericDeleteFile[] unboundDeleteFiles, + Expression filter) { + super(unboundDataFile, unboundDeleteFiles, null, null, ResidualEvaluator.unpartitioned(filter)); + this.unboundDataFile = unboundDataFile; + this.unboundDeleteFiles = unboundDeleteFiles; + this.filter = filter; + } + + @Override + public Schema schema() { + throw new UnsupportedOperationException("schema() is not supported in UnboundBaseFileScanTask"); + } + + @Override + public PartitionSpec spec() { + throw new UnsupportedOperationException("spec() is not supported in UnboundBaseFileScanTask"); + } + + public FileScanTask bind(PartitionSpec spec, boolean caseSensitive) { + GenericDataFile boundDataFile = unboundDataFile.bindToSpec(spec); + DeleteFile[] boundDeleteFiles = new DeleteFile[unboundDeleteFiles.length]; + for (int i = 0; i < unboundDeleteFiles.length; i++) { + boundDeleteFiles[i] = unboundDeleteFiles[i].bindToSpec(spec); + } + + String schemaString = SchemaParser.toJson(spec.schema()); + String specString = PartitionSpecParser.toJson(spec); + ResidualEvaluator boundResidual = ResidualEvaluator.of(spec, filter, caseSensitive); + + return new BaseFileScanTask( + boundDataFile, boundDeleteFiles, schemaString, specString, boundResidual); + } +} diff --git a/core/src/main/java/org/apache/iceberg/UnboundGenericDataFile.java b/core/src/main/java/org/apache/iceberg/UnboundGenericDataFile.java new file mode 100644 index 000000000000..fa3b59fe0f9e --- 
/dev/null +++ b/core/src/main/java/org/apache/iceberg/UnboundGenericDataFile.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import com.fasterxml.jackson.databind.JsonNode; +import java.nio.ByteBuffer; +import java.util.List; + +/** + * An UnboundGenericDataFile is a GenericDataFile which keeps track of the raw partition value + * represented as JSON + */ +class UnboundGenericDataFile extends GenericDataFile { + private final JsonNode rawPartitionValue; + + UnboundGenericDataFile( + int specId, + String filePath, + FileFormat format, + JsonNode rawPartitionValue, + long fileSizeInBytes, + Metrics metrics, + ByteBuffer keyMetadata, + List splitOffsets, + Integer sortOrderId) { + super( + specId, + filePath, + format, + null, + fileSizeInBytes, + metrics, + keyMetadata, + splitOffsets, + sortOrderId); + this.rawPartitionValue = rawPartitionValue; + } + + GenericDataFile bindToSpec(PartitionSpec spec) { + return new GenericDataFile( + specId(), + path().toString(), + format(), + ContentFileParser.partitionDataFromRawValue(rawPartitionValue, spec), + fileSizeInBytes(), + new Metrics( + recordCount(), + columnSizes(), + valueCounts(), + nullValueCounts(), + nanValueCounts(), + lowerBounds(), + upperBounds()), + keyMetadata(), + splitOffsets(), + sortOrderId()); + } +} diff --git a/core/src/main/java/org/apache/iceberg/UnboundGenericDeleteFile.java b/core/src/main/java/org/apache/iceberg/UnboundGenericDeleteFile.java new file mode 100644 index 000000000000..bff9c576f589 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/UnboundGenericDeleteFile.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg; + +import com.fasterxml.jackson.databind.JsonNode; +import java.nio.ByteBuffer; +import java.util.List; + +/** + * An UnboundGenericDeleteFile is a GenericDeleteFile which keeps track of the raw partition value + * represented as JSON. + */ +class UnboundGenericDeleteFile extends GenericDeleteFile { + private final JsonNode rawPartitionValue; + + UnboundGenericDeleteFile( + int specId, + FileContent content, + String filePath, + FileFormat format, + JsonNode rawPartitionValue, + long fileSizeInBytes, + Metrics metrics, + int[] equalityFieldIds, + Integer sortOrderId, + List<Long> splitOffsets, + ByteBuffer keyMetadata) { + super( + specId, + content, + filePath, + format, + null, + fileSizeInBytes, + metrics, + equalityFieldIds, + sortOrderId, + splitOffsets, + keyMetadata); + this.rawPartitionValue = rawPartitionValue; + } + + GenericDeleteFile bindToSpec(PartitionSpec spec) { + return new GenericDeleteFile( + specId(), + content(), + path().toString(), + format(), + ContentFileParser.partitionDataFromRawValue(rawPartitionValue, spec), + fileSizeInBytes(), + new Metrics( + recordCount(), + columnSizes(), + valueCounts(), + nullValueCounts(), + nanValueCounts(), + lowerBounds(), + upperBounds()), + equalityFieldIds().stream().mapToInt(Integer::intValue).toArray(), + sortOrderId(), + splitOffsets(), + keyMetadata()); + } +} diff --git a/core/src/main/java/org/apache/iceberg/rest/PlanStatus.java b/core/src/main/java/org/apache/iceberg/rest/PlanStatus.java new file mode 100644 index 000000000000..5603d51e9aa2 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/rest/PlanStatus.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.rest; + +import java.util.Locale; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; + +public enum PlanStatus { + COMPLETED("completed"), + SUBMITTED("submitted"), + CANCELLED("cancelled"), + FAILED("failed"); + + private final String status; + + PlanStatus(String status) { + this.status = status; + } + + public String status() { + return status; + } + + public static PlanStatus fromName(String status) { + Preconditions.checkArgument(status != null, "Status is null"); + try { + return PlanStatus.valueOf(status.toUpperCase(Locale.ENGLISH)); + } catch (IllegalArgumentException e) { + throw new IllegalArgumentException(String.format("Invalid status name: %s", status), e); + } + } +} diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTSerializers.java b/core/src/main/java/org/apache/iceberg/rest/RESTSerializers.java index 7f39d0bc1f5e..02bd59657570 100644 --- a/core/src/main/java/org/apache/iceberg/rest/RESTSerializers.java +++ b/core/src/main/java/org/apache/iceberg/rest/RESTSerializers.java @@ -46,9 +46,13 @@ import org.apache.iceberg.rest.requests.CommitTransactionRequestParser; import org.apache.iceberg.rest.requests.CreateViewRequest; import org.apache.iceberg.rest.requests.CreateViewRequestParser; +import org.apache.iceberg.rest.requests.FetchScanTasksRequest; +import org.apache.iceberg.rest.requests.FetchScanTasksRequestParser; import org.apache.iceberg.rest.requests.ImmutableCreateViewRequest; import org.apache.iceberg.rest.requests.ImmutableRegisterTableRequest; import org.apache.iceberg.rest.requests.ImmutableReportMetricsRequest; +import org.apache.iceberg.rest.requests.PlanTableScanRequest; +import org.apache.iceberg.rest.requests.PlanTableScanRequestParser; import org.apache.iceberg.rest.requests.RegisterTableRequest; import org.apache.iceberg.rest.requests.RegisterTableRequestParser; import org.apache.iceberg.rest.requests.ReportMetricsRequest; @@ -59,12 +63,18 @@ import org.apache.iceberg.rest.responses.ConfigResponseParser; import org.apache.iceberg.rest.responses.ErrorResponse; import org.apache.iceberg.rest.responses.ErrorResponseParser; +import org.apache.iceberg.rest.responses.FetchPlanningResultResponse; +import org.apache.iceberg.rest.responses.FetchPlanningResultResponseParser; +import org.apache.iceberg.rest.responses.FetchScanTasksResponse; +import org.apache.iceberg.rest.responses.FetchScanTasksResponseParser; import org.apache.iceberg.rest.responses.ImmutableLoadViewResponse; import org.apache.iceberg.rest.responses.LoadTableResponse; import org.apache.iceberg.rest.responses.LoadTableResponseParser; import org.apache.iceberg.rest.responses.LoadViewResponse; import org.apache.iceberg.rest.responses.LoadViewResponseParser; import org.apache.iceberg.rest.responses.OAuthTokenResponse; +import org.apache.iceberg.rest.responses.PlanTableScanResponse; +import org.apache.iceberg.rest.responses.PlanTableScanResponseParser; import org.apache.iceberg.util.JsonUtil; public class RESTSerializers { @@ -119,8 +129,19 @@ public static void registerAll(ObjectMapper mapper) { .addSerializer(ConfigResponse.class, new ConfigResponseSerializer<>()) .addDeserializer(ConfigResponse.class, new ConfigResponseDeserializer<>()) .addSerializer(LoadTableResponse.class, new LoadTableResponseSerializer<>()) - .addDeserializer(LoadTableResponse.class, new LoadTableResponseDeserializer<>()); - + .addDeserializer(LoadTableResponse.class, new LoadTableResponseDeserializer<>()) + .addSerializer(PlanTableScanRequest.class, new 
PlanTableScanRequestSerializer<>()) + .addDeserializer(PlanTableScanRequest.class, new PlanTableScanRequestDeserializer<>()) + .addSerializer(FetchScanTasksRequest.class, new FetchScanTasksRequestSerializer<>()) + .addDeserializer(FetchScanTasksRequest.class, new FetchScanTasksRequestDeserializer<>()) + .addSerializer(PlanTableScanResponse.class, new PlanTableScanResponseSerializer<>()) + .addDeserializer(PlanTableScanResponse.class, new PlanTableScanResponseDeserializer<>()) + .addSerializer( + FetchPlanningResultResponse.class, new FetchPlanningResultResponseSerializer<>()) + .addDeserializer( + FetchPlanningResultResponse.class, new FetchPlanningResultResponseDeserializer<>()) + .addSerializer(FetchScanTasksResponse.class, new FetchScanTaskResponseSerializer<>()) + .addDeserializer(FetchScanTasksResponse.class, new FetchScanTaskResponseDeserializer<>()); mapper.registerModule(module); } @@ -443,4 +464,94 @@ public T deserialize(JsonParser p, DeserializationContext context) throws IOExce return (T) LoadTableResponseParser.fromJson(jsonNode); } } + + static class PlanTableScanRequestSerializer + extends JsonSerializer { + @Override + public void serialize(T request, JsonGenerator gen, SerializerProvider serializers) + throws IOException { + PlanTableScanRequestParser.toJson(request, gen); + } + } + + static class PlanTableScanRequestDeserializer + extends JsonDeserializer { + @Override + public T deserialize(JsonParser p, DeserializationContext context) throws IOException { + JsonNode jsonNode = p.getCodec().readTree(p); + return (T) PlanTableScanRequestParser.fromJson(jsonNode); + } + } + + static class FetchScanTasksRequestSerializer + extends JsonSerializer { + @Override + public void serialize(T request, JsonGenerator gen, SerializerProvider serializers) + throws IOException { + FetchScanTasksRequestParser.toJson(request, gen); + } + } + + static class FetchScanTasksRequestDeserializer + extends JsonDeserializer { + @Override + public T deserialize(JsonParser p, DeserializationContext context) throws IOException { + JsonNode jsonNode = p.getCodec().readTree(p); + return (T) FetchScanTasksRequestParser.fromJson(jsonNode); + } + } + + static class PlanTableScanResponseSerializer + extends JsonSerializer { + @Override + public void serialize(T response, JsonGenerator gen, SerializerProvider serializers) + throws IOException { + PlanTableScanResponseParser.toJson(response, gen); + } + } + + static class PlanTableScanResponseDeserializer + extends JsonDeserializer { + @Override + public T deserialize(JsonParser p, DeserializationContext context) throws IOException { + JsonNode jsonNode = p.getCodec().readTree(p); + return (T) PlanTableScanResponseParser.fromJson(jsonNode); + } + } + + static class FetchPlanningResultResponseSerializer + extends JsonSerializer { + @Override + public void serialize(T response, JsonGenerator gen, SerializerProvider serializers) + throws IOException { + FetchPlanningResultResponseParser.toJson(response, gen); + } + } + + static class FetchPlanningResultResponseDeserializer + extends JsonDeserializer { + @Override + public T deserialize(JsonParser p, DeserializationContext context) throws IOException { + JsonNode jsonNode = p.getCodec().readTree(p); + return (T) FetchPlanningResultResponseParser.fromJson(jsonNode); + } + } + + static class FetchScanTaskResponseSerializer + extends JsonSerializer { + @Override + public void serialize(T response, JsonGenerator gen, SerializerProvider serializers) + throws IOException { + 
FetchScanTasksResponseParser.toJson(response, gen); + } + } + + static class FetchScanTaskResponseDeserializer + extends JsonDeserializer { + @Override + public T deserialize(JsonParser p, DeserializationContext context) throws IOException { + JsonNode jsonNode = p.getCodec().readTree(p); + return (T) FetchScanTasksResponseParser.fromJson(jsonNode); + } + } } diff --git a/core/src/main/java/org/apache/iceberg/rest/requests/FetchScanTasksRequest.java b/core/src/main/java/org/apache/iceberg/rest/requests/FetchScanTasksRequest.java new file mode 100644 index 000000000000..622edc356864 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/rest/requests/FetchScanTasksRequest.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.rest.requests; + +import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.rest.RESTRequest; + +@SuppressWarnings("checkstyle:VisibilityModifier") +public class FetchScanTasksRequest implements RESTRequest { + + private final String planTask; + + private FetchScanTasksRequest(String planTask) { + this.planTask = planTask; + validate(); + } + + public String planTask() { + return planTask; + } + + @Override + public void validate() { + Preconditions.checkArgument(planTask != null, "Invalid plan task: null"); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this).add("planTask", planTask).toString(); + } + + public static Builder builder() { + return new Builder(); + } + + public static class Builder { + private Builder() {} + + String planTask; + + public Builder withPlanTask(String task) { + this.planTask = task; + return this; + } + + public FetchScanTasksRequest build() { + return new FetchScanTasksRequest(planTask); + } + } +} diff --git a/core/src/main/java/org/apache/iceberg/rest/requests/FetchScanTasksRequestParser.java b/core/src/main/java/org/apache/iceberg/rest/requests/FetchScanTasksRequestParser.java new file mode 100644 index 000000000000..32f79623a9e0 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/rest/requests/FetchScanTasksRequestParser.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.rest.requests; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonNode; +import java.io.IOException; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.util.JsonUtil; + +public class FetchScanTasksRequestParser { + private static final String PLAN_TASK = "plan-task"; + + private FetchScanTasksRequestParser() {} + + public static String toJson(FetchScanTasksRequest request) { + return toJson(request, false); + } + + public static String toJson(FetchScanTasksRequest request, boolean pretty) { + return JsonUtil.generate(gen -> toJson(request, gen), pretty); + } + + public static void toJson(FetchScanTasksRequest request, JsonGenerator gen) throws IOException { + Preconditions.checkArgument(null != request, "Invalid fetch scan tasks request: null"); + gen.writeStartObject(); + gen.writeStringField(PLAN_TASK, request.planTask()); + gen.writeEndObject(); + } + + public static FetchScanTasksRequest fromJson(String json) { + return JsonUtil.parse(json, FetchScanTasksRequestParser::fromJson); + } + + public static FetchScanTasksRequest fromJson(JsonNode json) { + Preconditions.checkArgument(null != json, "Invalid fetch scan tasks request: null"); + + String planTask = JsonUtil.getString(PLAN_TASK, json); + return FetchScanTasksRequest.builder().withPlanTask(planTask).build(); + } +} diff --git a/core/src/main/java/org/apache/iceberg/rest/requests/PlanTableScanRequest.java b/core/src/main/java/org/apache/iceberg/rest/requests/PlanTableScanRequest.java new file mode 100644 index 000000000000..720e5c74e67a --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/rest/requests/PlanTableScanRequest.java @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.rest.requests; + +import java.util.List; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.rest.RESTRequest; + +public class PlanTableScanRequest implements RESTRequest { + private final Long snapshotId; + private final List select; + private final Expression filter; + private final boolean caseSensitive; + private final boolean useSnapshotSchema; + private final Long startSnapshotId; + private final Long endSnapshotId; + private final List statsFields; + + public Long snapshotId() { + return snapshotId; + } + + public List select() { + return select; + } + + public Expression filter() { + return filter; + } + + public boolean caseSensitive() { + return caseSensitive; + } + + public boolean useSnapshotSchema() { + return useSnapshotSchema; + } + + public Long startSnapshotId() { + return startSnapshotId; + } + + public Long endSnapshotId() { + return endSnapshotId; + } + + public List statsFields() { + return statsFields; + } + + private PlanTableScanRequest( + Long snapshotId, + List select, + Expression filter, + boolean caseSensitive, + boolean useSnapshotSchema, + Long startSnapshotId, + Long endSnapshotId, + List statsFields) { + this.snapshotId = snapshotId; + this.select = select; + this.filter = filter; + this.caseSensitive = caseSensitive; + this.useSnapshotSchema = useSnapshotSchema; + this.startSnapshotId = startSnapshotId; + this.endSnapshotId = endSnapshotId; + this.statsFields = statsFields; + validate(); + } + + @Override + public void validate() { + if (snapshotId != null || startSnapshotId != null || endSnapshotId != null) { + Preconditions.checkArgument( + snapshotId != null ^ (startSnapshotId != null && endSnapshotId != null), + "Either snapshotId must be provided or both startSnapshotId and endSnapshotId must be provided"); + } + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("snapshotId", snapshotId) + .add("caseSensitive", caseSensitive) + .add("useSnapshotSchema", useSnapshotSchema) + .add("startSnapshotId", startSnapshotId) + .add("endSnapshotId", endSnapshotId) + .toString(); + } + + public static class Builder { + private Long snapshotId; + private List select; + private Expression filter; + private boolean caseSensitive = true; + private boolean useSnapshotSchema = false; + private Long startSnapshotId; + private Long endSnapshotId; + private List statsFields; + + public Builder() {} + + public Builder withSnapshotId(Long withSnapshotId) { + this.snapshotId = withSnapshotId; + return this; + } + + public Builder withSelect(List projection) { + this.select = projection; + return this; + } + + public Builder withFilter(Expression expression) { + this.filter = expression; + return this; + } + + public Builder withCaseSensitive(boolean value) { + this.caseSensitive = value; + return this; + } + + public Builder withUseSnapshotSchema(boolean snapshotSchema) { + this.useSnapshotSchema = snapshotSchema; + return this; + } + + public Builder withStartSnapshotId(Long startingSnapshotId) { + this.startSnapshotId = startingSnapshotId; + return this; + } + + public Builder withEndSnapshotId(Long endingSnapshotId) { + this.endSnapshotId = endingSnapshotId; + return this; + } + + public Builder withStatsFields(List fields) { + this.statsFields = fields; + return this; + } + + public PlanTableScanRequest build() { + return new 
PlanTableScanRequest( + snapshotId, + select, + filter, + caseSensitive, + useSnapshotSchema, + startSnapshotId, + endSnapshotId, + statsFields); + } + } +} diff --git a/core/src/main/java/org/apache/iceberg/rest/requests/PlanTableScanRequestParser.java b/core/src/main/java/org/apache/iceberg/rest/requests/PlanTableScanRequestParser.java new file mode 100644 index 000000000000..9bbfaa85e7d4 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/rest/requests/PlanTableScanRequestParser.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.rest.requests; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonNode; +import java.io.IOException; +import java.util.List; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.expressions.ExpressionParser; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.util.JsonUtil; + +public class PlanTableScanRequestParser { + private static final String SNAPSHOT_ID = "snapshot-id"; + private static final String SELECT = "select"; + private static final String FILTER = "filter"; + private static final String CASE_SENSITIVE = "case-sensitive"; + private static final String USE_SNAPSHOT_SCHEMA = "use-snapshot-schema"; + private static final String START_SNAPSHOT_ID = "start-snapshot-id"; + private static final String END_SNAPSHOT_ID = "end-snapshot-id"; + private static final String STATS_FIELDS = "stats-fields"; + + private PlanTableScanRequestParser() {} + + public static String toJson(PlanTableScanRequest request) { + return toJson(request, false); + } + + public static String toJson(PlanTableScanRequest request, boolean pretty) { + return JsonUtil.generate(gen -> toJson(request, gen), pretty); + } + + @SuppressWarnings("checkstyle:CyclomaticComplexity") + public static void toJson(PlanTableScanRequest request, JsonGenerator gen) throws IOException { + Preconditions.checkArgument(null != request, "Invalid plan table scan request: null"); + + gen.writeStartObject(); + serializeSnapshotIdForScan(gen, request); + + if (request.select() != null && !request.select().isEmpty()) { + JsonUtil.writeStringArray(SELECT, request.select(), gen); + } + + if (request.filter() != null) { + gen.writeStringField(FILTER, ExpressionParser.toJson(request.filter())); + } + + gen.writeBooleanField(CASE_SENSITIVE, request.caseSensitive()); + gen.writeBooleanField(USE_SNAPSHOT_SCHEMA, request.useSnapshotSchema()); + + if (request.statsFields() != null && !request.statsFields().isEmpty()) { + JsonUtil.writeStringArray(STATS_FIELDS, request.statsFields(), gen); + } + + gen.writeEndObject(); + } + + private static void serializeSnapshotIdForScan(JsonGenerator gen, 
PlanTableScanRequest request) + throws IOException { + if (request.snapshotId() != null) { + gen.writeNumberField(SNAPSHOT_ID, request.snapshotId()); + } else if (request.startSnapshotId() != null) { + gen.writeNumberField(START_SNAPSHOT_ID, request.startSnapshotId()); + gen.writeNumberField(END_SNAPSHOT_ID, request.endSnapshotId()); + } + } + + public static PlanTableScanRequest fromJson(String json) { + return JsonUtil.parse(json, PlanTableScanRequestParser::fromJson); + } + + public static PlanTableScanRequest fromJson(JsonNode json) { + Preconditions.checkArgument(null != json, "Invalid plan table scan request: null"); + + Long snapshotId = JsonUtil.getLongOrNull(SNAPSHOT_ID, json); + Long startSnapshotId = JsonUtil.getLongOrNull(START_SNAPSHOT_ID, json); + Long endSnapshotId = JsonUtil.getLongOrNull(END_SNAPSHOT_ID, json); + + List select = JsonUtil.getStringListOrNull(SELECT, json); + + Expression filter = null; + if (json.has(FILTER)) { + filter = ExpressionParser.fromJson(JsonUtil.getString(FILTER, json)); + } + + boolean caseSensitive = true; + if (json.has(CASE_SENSITIVE)) { + caseSensitive = JsonUtil.getBool(CASE_SENSITIVE, json); + } + + boolean useSnapshotSchema = false; + if (json.has(USE_SNAPSHOT_SCHEMA)) { + useSnapshotSchema = JsonUtil.getBool(USE_SNAPSHOT_SCHEMA, json); + } + + List statsFields = JsonUtil.getStringListOrNull(STATS_FIELDS, json); + + return new PlanTableScanRequest.Builder() + .withSnapshotId(snapshotId) + .withSelect(select) + .withFilter(filter) + .withCaseSensitive(caseSensitive) + .withUseSnapshotSchema(useSnapshotSchema) + .withStartSnapshotId(startSnapshotId) + .withEndSnapshotId(endSnapshotId) + .withStatsFields(statsFields) + .build(); + } +} diff --git a/core/src/main/java/org/apache/iceberg/rest/responses/FetchPlanningResultResponse.java b/core/src/main/java/org/apache/iceberg/rest/responses/FetchPlanningResultResponse.java new file mode 100644 index 000000000000..fd14be8c208d --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/rest/responses/FetchPlanningResultResponse.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.rest.responses; + +import java.util.List; +import java.util.Map; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.rest.PlanStatus; +import org.apache.iceberg.rest.RESTResponse; + +public class FetchPlanningResultResponse implements RESTResponse { + private final PlanStatus planStatus; + private final List planTasks; + private final List fileScanTasks; + private final List deleteFiles; + private final Map specsById; + + private FetchPlanningResultResponse( + PlanStatus planStatus, + List planTasks, + List fileScanTasks, + List deleteFiles, + Map specsById) { + this.planStatus = planStatus; + this.planTasks = planTasks; + this.fileScanTasks = fileScanTasks; + this.deleteFiles = deleteFiles; + this.specsById = specsById; + validate(); + } + + public PlanStatus planStatus() { + return planStatus; + } + + public List planTasks() { + return planTasks; + } + + public List fileScanTasks() { + return fileScanTasks; + } + + public List deleteFiles() { + return deleteFiles; + } + + public Map specsById() { + return specsById; + } + + public static Builder builder() { + return new Builder(); + } + + @Override + public void validate() { + Preconditions.checkArgument(planStatus() != null, "Invalid status: null"); + Preconditions.checkArgument( + planStatus() == PlanStatus.COMPLETED || (planTasks() == null && fileScanTasks() == null), + "Invalid response: tasks can only be returned in a 'completed' status"); + if (fileScanTasks() == null || fileScanTasks.isEmpty()) { + Preconditions.checkArgument( + (deleteFiles() == null || deleteFiles().isEmpty()), + "Invalid response: deleteFiles should only be returned with fileScanTasks that reference them"); + } + } + + public static class Builder { + private Builder() {} + + private PlanStatus planStatus; + private List planTasks; + private List fileScanTasks; + private List deleteFiles; + private Map specsById; + + public Builder withPlanStatus(PlanStatus status) { + this.planStatus = status; + return this; + } + + public Builder withPlanTasks(List tasks) { + this.planTasks = tasks; + return this; + } + + public Builder withFileScanTasks(List tasks) { + this.fileScanTasks = tasks; + return this; + } + + public Builder withDeleteFiles(List deletes) { + this.deleteFiles = deletes; + return this; + } + + public Builder withSpecsById(Map specs) { + this.specsById = specs; + return this; + } + + public FetchPlanningResultResponse build() { + return new FetchPlanningResultResponse( + planStatus, planTasks, fileScanTasks, deleteFiles, specsById); + } + } +} diff --git a/core/src/main/java/org/apache/iceberg/rest/responses/FetchPlanningResultResponseParser.java b/core/src/main/java/org/apache/iceberg/rest/responses/FetchPlanningResultResponseParser.java new file mode 100644 index 000000000000..8b8c6d9cbee3 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/rest/responses/FetchPlanningResultResponseParser.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.rest.responses; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonNode; +import java.io.IOException; +import java.util.List; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.TableScanResponseParser; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.rest.PlanStatus; +import org.apache.iceberg.util.JsonUtil; + +public class FetchPlanningResultResponseParser { + private static final String PLAN_STATUS = "plan-status"; + private static final String PLAN_TASKS = "plan-tasks"; + + private FetchPlanningResultResponseParser() {} + + public static String toJson(FetchPlanningResultResponse response) { + return toJson(response, false); + } + + public static String toJson(FetchPlanningResultResponse response, boolean pretty) { + return JsonUtil.generate(gen -> toJson(response, gen), pretty); + } + + public static void toJson(FetchPlanningResultResponse response, JsonGenerator gen) + throws IOException { + Preconditions.checkArgument(null != response, "Invalid fetch scan planning response: null"); + Preconditions.checkArgument( + response.specsById() != null + || (response.fileScanTasks() == null || response.fileScanTasks().isEmpty()), + "Cannot serialize fileScanTasks in fetchingPlanningResultResponse without specsById"); + gen.writeStartObject(); + gen.writeStringField(PLAN_STATUS, response.planStatus().status()); + if (response.planTasks() != null) { + JsonUtil.writeStringArray(PLAN_TASKS, response.planTasks(), gen); + } + + TableScanResponseParser.serializeScanTasks( + response.fileScanTasks(), response.deleteFiles(), response.specsById(), gen); + gen.writeEndObject(); + } + + public static FetchPlanningResultResponse fromJson(String json) { + Preconditions.checkArgument(json != null, "Invalid fetch scan planning response: null"); + return JsonUtil.parse(json, FetchPlanningResultResponseParser::fromJson); + } + + public static FetchPlanningResultResponse fromJson(JsonNode json) { + Preconditions.checkArgument( + json != null && !json.isEmpty(), "Invalid fetch scan planning response: null or empty"); + + PlanStatus planStatus = PlanStatus.fromName(JsonUtil.getString(PLAN_STATUS, json)); + List planTasks = JsonUtil.getStringListOrNull(PLAN_TASKS, json); + List deleteFiles = TableScanResponseParser.parseDeleteFiles(json); + List fileScanTasks = + TableScanResponseParser.parseFileScanTasks(json, deleteFiles); + return FetchPlanningResultResponse.builder() + .withPlanStatus(planStatus) + .withPlanTasks(planTasks) + .withFileScanTasks(fileScanTasks) + .withDeleteFiles(deleteFiles) + .build(); + } +} diff --git a/core/src/main/java/org/apache/iceberg/rest/responses/FetchScanTasksResponse.java b/core/src/main/java/org/apache/iceberg/rest/responses/FetchScanTasksResponse.java new file mode 100644 index 000000000000..464dcc3f46c5 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/rest/responses/FetchScanTasksResponse.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) 
under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.rest.responses; + +import java.util.List; +import java.util.Map; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.rest.RESTResponse; + +public class FetchScanTasksResponse implements RESTResponse { + private final List<String> planTasks; + private final List<FileScanTask> fileScanTasks; + private final List<DeleteFile> deleteFiles; + private final Map<Integer, PartitionSpec> specsById; + + private FetchScanTasksResponse( + List<String> planTasks, + List<FileScanTask> fileScanTasks, + List<DeleteFile> deleteFiles, + Map<Integer, PartitionSpec> specsById) { + this.planTasks = planTasks; + this.fileScanTasks = fileScanTasks; + this.deleteFiles = deleteFiles; + this.specsById = specsById; + validate(); + } + + public List<String> planTasks() { + return planTasks; + } + + public List<FileScanTask> fileScanTasks() { + return fileScanTasks; + } + + public List<DeleteFile> deleteFiles() { + return deleteFiles; + } + + public Map<Integer, PartitionSpec> specsById() { + return specsById; + } + + public static Builder builder() { + return new Builder(); + } + + @Override + public void validate() { + if (fileScanTasks() == null || fileScanTasks.isEmpty()) { + Preconditions.checkArgument( + (deleteFiles() == null || deleteFiles().isEmpty()), + "Invalid response: deleteFiles should only be returned with fileScanTasks that reference them"); + } + + Preconditions.checkArgument( + planTasks() != null || fileScanTasks() != null, + "Invalid response: planTasks and fileScanTasks cannot both be null"); + } + + public static class Builder { + private Builder() {} + + private List<String> planTasks; + private List<FileScanTask> fileScanTasks; + private List<DeleteFile> deleteFiles; + private Map<Integer, PartitionSpec> specsById; + + public Builder withPlanTasks(List<String> tasks) { + this.planTasks = tasks; + return this; + } + + public Builder withFileScanTasks(List<FileScanTask> tasks) { + this.fileScanTasks = tasks; + return this; + } + + public Builder withDeleteFiles(List<DeleteFile> deletes) { + this.deleteFiles = deletes; + return this; + } + + public Builder withSpecsById(Map<Integer, PartitionSpec> specs) { + this.specsById = specs; + return this; + } + + public FetchScanTasksResponse build() { + return new FetchScanTasksResponse(planTasks, fileScanTasks, deleteFiles, specsById); + } + } +} diff --git a/core/src/main/java/org/apache/iceberg/rest/responses/FetchScanTasksResponseParser.java b/core/src/main/java/org/apache/iceberg/rest/responses/FetchScanTasksResponseParser.java new file mode 100644 index 000000000000..d2d910b2197e --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/rest/responses/FetchScanTasksResponseParser.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.rest.responses; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonNode; +import java.io.IOException; +import java.util.List; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.TableScanResponseParser; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.util.JsonUtil; + +public class FetchScanTasksResponseParser { + private static final String PLAN_TASKS = "plan-tasks"; + + private FetchScanTasksResponseParser() {} + + public static String toJson(FetchScanTasksResponse response) { + return toJson(response, false); + } + + public static String toJson(FetchScanTasksResponse response, boolean pretty) { + return JsonUtil.generate(gen -> toJson(response, gen), pretty); + } + + public static void toJson(FetchScanTasksResponse response, JsonGenerator gen) throws IOException { + Preconditions.checkArgument(response != null, "Invalid fetch scan tasks response: null"); + Preconditions.checkArgument( + response.specsById() != null + || (response.fileScanTasks() == null || response.fileScanTasks().isEmpty()), + "Cannot serialize fileScanTasks in fetchScanTasksResponse without specsById"); + gen.writeStartObject(); + if (response.planTasks() != null) { + JsonUtil.writeStringArray(PLAN_TASKS, response.planTasks(), gen); + } + + TableScanResponseParser.serializeScanTasks( + response.fileScanTasks(), response.deleteFiles(), response.specsById(), gen); + gen.writeEndObject(); + } + + public static FetchScanTasksResponse fromJson(String json) { + Preconditions.checkArgument(json != null, "Invalid fetch scan tasks response: null"); + return JsonUtil.parse(json, FetchScanTasksResponseParser::fromJson); + } + + public static FetchScanTasksResponse fromJson(JsonNode json) { + Preconditions.checkArgument( + json != null && !json.isEmpty(), "Invalid fetch scan tasks response: null or empty"); + List planTasks = JsonUtil.getStringListOrNull(PLAN_TASKS, json); + List deleteFiles = TableScanResponseParser.parseDeleteFiles(json); + List fileScanTasks = + TableScanResponseParser.parseFileScanTasks(json, deleteFiles); + return FetchScanTasksResponse.builder() + .withPlanTasks(planTasks) + .withFileScanTasks(fileScanTasks) + .withDeleteFiles(deleteFiles) + .build(); + } +} diff --git a/core/src/main/java/org/apache/iceberg/rest/responses/PlanTableScanResponse.java b/core/src/main/java/org/apache/iceberg/rest/responses/PlanTableScanResponse.java new file mode 100644 index 000000000000..57dd90d8d986 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/rest/responses/PlanTableScanResponse.java @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.rest.responses; + +import java.util.List; +import java.util.Map; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.rest.PlanStatus; +import org.apache.iceberg.rest.RESTResponse; + +public class PlanTableScanResponse implements RESTResponse { + private final PlanStatus planStatus; + private final String planId; + private final List planTasks; + private final List fileScanTasks; + private final List deleteFiles; + private final Map specsById; + + private PlanTableScanResponse( + PlanStatus planStatus, + String planId, + List planTasks, + List fileScanTasks, + List deleteFiles, + Map specsById) { + this.planStatus = planStatus; + this.planId = planId; + this.planTasks = planTasks; + this.fileScanTasks = fileScanTasks; + this.deleteFiles = deleteFiles; + this.specsById = specsById; + validate(); + } + + public PlanStatus planStatus() { + return planStatus; + } + + public String planId() { + return planId; + } + + public List planTasks() { + return planTasks; + } + + public List fileScanTasks() { + return fileScanTasks; + } + + public List deleteFiles() { + return deleteFiles; + } + + public Map specsById() { + return specsById; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("planStatus", planStatus) + .add("planId", planId) + .toString(); + } + + @Override + public void validate() { + Preconditions.checkArgument( + planStatus() != null, "Invalid response: plan status must be defined"); + Preconditions.checkArgument( + planStatus() != PlanStatus.SUBMITTED || planId() != null, + "Invalid response: plan id should be defined when status is 'submitted'"); + Preconditions.checkArgument( + planStatus() != PlanStatus.CANCELLED, + "Invalid response: 'cancelled' is not a valid status for planTableScan"); + Preconditions.checkArgument( + planStatus() == PlanStatus.COMPLETED || (planTasks() == null && fileScanTasks() == null), + "Invalid response: tasks can only be returned in a 'completed' status"); + Preconditions.checkArgument( + planStatus() == PlanStatus.SUBMITTED || planId() == null, + "Invalid response: plan id can only be returned in a 'submitted' status"); + if (fileScanTasks() == null || fileScanTasks.isEmpty()) { + Preconditions.checkArgument( + (deleteFiles() == null || deleteFiles().isEmpty()), + "Invalid response: deleteFiles should only be returned with fileScanTasks that reference them"); + } + } + + public static Builder builder() { + return new Builder(); + } + + public static class Builder { + private Builder() {} + + private PlanStatus planStatus; + private String planId; 
+ private List planTasks; + private List fileScanTasks; + private List deleteFiles; + private Map specsById; + + public Builder withPlanStatus(PlanStatus status) { + this.planStatus = status; + return this; + } + + public Builder withPlanId(String id) { + this.planId = id; + return this; + } + + public Builder withPlanTasks(List tasks) { + this.planTasks = tasks; + return this; + } + + public Builder withFileScanTasks(List tasks) { + this.fileScanTasks = tasks; + return this; + } + + public Builder withDeleteFiles(List deletes) { + this.deleteFiles = deletes; + return this; + } + + public Builder withSpecsById(Map specs) { + this.specsById = specs; + return this; + } + + public PlanTableScanResponse build() { + return new PlanTableScanResponse( + planStatus, planId, planTasks, fileScanTasks, deleteFiles, specsById); + } + } +} diff --git a/core/src/main/java/org/apache/iceberg/rest/responses/PlanTableScanResponseParser.java b/core/src/main/java/org/apache/iceberg/rest/responses/PlanTableScanResponseParser.java new file mode 100644 index 000000000000..c8137edf13aa --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/rest/responses/PlanTableScanResponseParser.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.rest.responses; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonNode; +import java.io.IOException; +import java.util.List; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.TableScanResponseParser; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.rest.PlanStatus; +import org.apache.iceberg.util.JsonUtil; + +public class PlanTableScanResponseParser { + private static final String PLAN_STATUS = "plan-status"; + private static final String PLAN_ID = "plan-id"; + private static final String PLAN_TASKS = "plan-tasks"; + + private PlanTableScanResponseParser() {} + + public static String toJson(PlanTableScanResponse response) { + return toJson(response, false); + } + + public static String toJson(PlanTableScanResponse response, boolean pretty) { + return JsonUtil.generate(gen -> toJson(response, gen), pretty); + } + + public static void toJson(PlanTableScanResponse response, JsonGenerator gen) throws IOException { + Preconditions.checkArgument(null != response, "Invalid response: planTableScanResponse null"); + Preconditions.checkArgument( + response.planStatus() != null, "Invalid response: status can not be null"); + Preconditions.checkArgument( + response.specsById() != null, "Cannot serialize planTableScanResponse without specsById"); + + gen.writeStartObject(); + gen.writeStringField(PLAN_STATUS, response.planStatus().status()); + + if (response.planId() != null) { + gen.writeStringField(PLAN_ID, response.planId()); + } + if (response.planTasks() != null) { + JsonUtil.writeStringArray(PLAN_TASKS, response.planTasks(), gen); + } + + TableScanResponseParser.serializeScanTasks( + response.fileScanTasks(), response.deleteFiles(), response.specsById(), gen); + + gen.writeEndObject(); + } + + public static PlanTableScanResponse fromJson(String json) { + Preconditions.checkArgument( + json != null, "Cannot parse planTableScan response from empty or null object"); + return JsonUtil.parse(json, PlanTableScanResponseParser::fromJson); + } + + public static PlanTableScanResponse fromJson(JsonNode json) { + Preconditions.checkArgument( + json != null && !json.isEmpty(), + "Cannot parse planTableScan response from empty or null object"); + + PlanStatus planStatus = PlanStatus.fromName(JsonUtil.getString(PLAN_STATUS, json)); + String planId = JsonUtil.getStringOrNull(PLAN_ID, json); + List planTasks = JsonUtil.getStringListOrNull(PLAN_TASKS, json); + List deleteFiles = TableScanResponseParser.parseDeleteFiles(json); + List fileScanTasks = + TableScanResponseParser.parseFileScanTasks(json, deleteFiles); + + return PlanTableScanResponse.builder() + .withPlanId(planId) + .withPlanStatus(planStatus) + .withPlanTasks(planTasks) + .withFileScanTasks(fileScanTasks) + .withDeleteFiles(deleteFiles) + .build(); + } +} diff --git a/core/src/test/java/org/apache/iceberg/TestBase.java b/core/src/test/java/org/apache/iceberg/TestBase.java index a0b52b346bf3..78680801433e 100644 --- a/core/src/test/java/org/apache/iceberg/TestBase.java +++ b/core/src/test/java/org/apache/iceberg/TestBase.java @@ -30,6 +30,7 @@ import java.util.Collection; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.UUID; import org.apache.iceberg.avro.AvroSchemaUtil; @@ -63,7 +64,9 @@ public class TestBase { public static final PartitionSpec SPEC = PartitionSpec.builderFor(SCHEMA).bucket("data", 
BUCKETS_NUMBER).build(); - static final DataFile FILE_A = + public static final Map<Integer, PartitionSpec> PARTITION_SPECS_BY_ID = Map.of(0, SPEC); + + public static final DataFile FILE_A = DataFiles.builder(SPEC) .withPath("/path/to/data-a.parquet") .withFileSizeInBytes(10) @@ -77,7 +80,7 @@ public class TestBase { .withPartitionPath("data_bucket=0") // easy way to set partition data for now .withRecordCount(1) .build(); - static final DeleteFile FILE_A_DELETES = + public static final DeleteFile FILE_A_DELETES = FileMetadata.deleteFileBuilder(SPEC) .ofPositionDeletes() .withPath("/path/to/data-a-deletes.parquet") @@ -86,7 +89,7 @@ public class TestBase { .withRecordCount(1) .build(); // Equality delete files. - static final DeleteFile FILE_A2_DELETES = + public static final DeleteFile FILE_A2_DELETES = FileMetadata.deleteFileBuilder(SPEC) .ofEqualityDeletes(1) .withPath("/path/to/data-a2-deletes.parquet") diff --git a/core/src/test/java/org/apache/iceberg/rest/requests/TestFetchScanTasksRequestParser.java b/core/src/test/java/org/apache/iceberg/rest/requests/TestFetchScanTasksRequestParser.java new file mode 100644 index 000000000000..db498642b7e7 --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/rest/requests/TestFetchScanTasksRequestParser.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ +package org.apache.iceberg.rest.requests; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import com.fasterxml.jackson.databind.JsonNode; +import org.junit.jupiter.api.Test; + +public class TestFetchScanTasksRequestParser { + + @Test + public void nullAndEmptyCheck() { + assertThatThrownBy(() -> FetchScanTasksRequestParser.toJson(null)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid fetch scan tasks request: null"); + + assertThatThrownBy(() -> FetchScanTasksRequestParser.fromJson((JsonNode) null)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid fetch scan tasks request: null"); + + assertThatThrownBy(() -> FetchScanTasksRequestParser.fromJson("{}")) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Cannot parse missing string: plan-task"); + } + + @Test + public void missingRequiredField() { + String missingRequiredFieldJson = "{\"x\": \"val\"}"; + assertThatThrownBy(() -> FetchScanTasksRequestParser.fromJson(missingRequiredFieldJson)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Cannot parse missing string: plan-task"); + } + + @Test + public void roundTripSerdeWithPlanTask() { + FetchScanTasksRequest request = + FetchScanTasksRequest.builder().withPlanTask("somePlanTask").build(); + String expectedJson = "{\"plan-task\":\"somePlanTask\"}"; + String json = FetchScanTasksRequestParser.toJson(request, false); + assertThat(json).isEqualTo(expectedJson); + + // can't do an equality comparison on FetchScanTasksRequest because we don't implement + // equals/hashcode + assertThat( + FetchScanTasksRequestParser.toJson(FetchScanTasksRequestParser.fromJson(json), false)) + .isEqualTo(expectedJson); + } +} diff --git a/core/src/test/java/org/apache/iceberg/rest/requests/TestPlanTableScanRequestParser.java b/core/src/test/java/org/apache/iceberg/rest/requests/TestPlanTableScanRequestParser.java new file mode 100644 index 000000000000..30876584c748 --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/rest/requests/TestPlanTableScanRequestParser.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.rest.requests; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import com.fasterxml.jackson.databind.JsonNode; +import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.junit.jupiter.api.Test; + +public class TestPlanTableScanRequestParser { + @Test + public void nullCheck() { + assertThatThrownBy(() -> PlanTableScanRequestParser.toJson(null)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid plan table scan request: null"); + + assertThatThrownBy(() -> PlanTableScanRequestParser.fromJson((JsonNode) null)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid plan table scan request: null"); + } + + @Test + public void roundTripSerdeWithEmptyRequestAndDefaultsPresent() { + PlanTableScanRequest request = new PlanTableScanRequest.Builder().build(); + + String expectedJson = "{" + "\"case-sensitive\":true," + "\"use-snapshot-schema\":false}"; + + String json = PlanTableScanRequestParser.toJson(request, false); + assertThat(json).isEqualTo(expectedJson); + + // can't do an equality comparison on PlanTableScanRequest because we don't implement + // equals/hashcode + assertThat(PlanTableScanRequestParser.toJson(PlanTableScanRequestParser.fromJson(json), false)) + .isEqualTo(expectedJson); + } + + @Test + public void roundTripSerdeWithSelectField() { + PlanTableScanRequest request = + new PlanTableScanRequest.Builder() + .withSnapshotId(1L) + .withSelect(Lists.newArrayList("col1", "col2")) + .build(); + + String expectedJson = + "{\"snapshot-id\":1," + + "\"select\":[\"col1\",\"col2\"]," + + "\"case-sensitive\":true," + + "\"use-snapshot-schema\":false}"; + + String json = PlanTableScanRequestParser.toJson(request, false); + assertThat(json).isEqualTo(expectedJson); + + // can't do an equality comparison on PlanTableScanRequest because we don't implement + // equals/hashcode + assertThat(PlanTableScanRequestParser.toJson(PlanTableScanRequestParser.fromJson(json), false)) + .isEqualTo(expectedJson); + } + + @Test + public void roundTripSerdeWithFilterField() { + PlanTableScanRequest request = + new PlanTableScanRequest.Builder() + .withSnapshotId(1L) + .withFilter(Expressions.alwaysFalse()) + .build(); + + String expectedJson = + "{\"snapshot-id\":1," + + "\"filter\":\"false\"," + + "\"case-sensitive\":true," + + "\"use-snapshot-schema\":false}"; + + String json = PlanTableScanRequestParser.toJson(request, false); + assertThat(json).isEqualTo(expectedJson); + + // can't do an equality comparison on PlanTableScanRequest because we don't implement + // equals/hashcode + assertThat(PlanTableScanRequestParser.toJson(PlanTableScanRequestParser.fromJson(json), false)) + .isEqualTo(expectedJson); + } + + @Test + public void roundTripSerdeWithAllFieldsInvalidRequest() { + PlanTableScanRequest.Builder request = + new PlanTableScanRequest.Builder() + .withSnapshotId(1L) + .withSelect(Lists.newArrayList("col1", "col2")) + .withFilter(Expressions.alwaysTrue()) + .withStartSnapshotId(1L) + .withEndSnapshotId(2L) + .withCaseSensitive(false) + .withUseSnapshotSchema(true) + .withStatsFields(Lists.newArrayList("col1", "col2")); + + assertThatThrownBy(request::build) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage( + "Either snapshotId must be provided or both startSnapshotId and endSnapshotId must be provided"); + } + + @Test + public void 
roundTripSerdeWithAllFieldsExceptSnapShotId() { + PlanTableScanRequest request = + new PlanTableScanRequest.Builder() + .withSelect(Lists.newArrayList("col1", "col2")) + .withFilter(Expressions.alwaysTrue()) + .withStartSnapshotId(1L) + .withEndSnapshotId(2L) + .withCaseSensitive(false) + .withUseSnapshotSchema(true) + .withStatsFields(Lists.newArrayList("col1", "col2")) + .build(); + + String expectedJson = + "{\"start-snapshot-id\":1," + + "\"end-snapshot-id\":2," + + "\"select\":[\"col1\",\"col2\"]," + + "\"filter\":\"true\"," + + "\"case-sensitive\":false," + + "\"use-snapshot-schema\":true," + + "\"stats-fields\":[\"col1\",\"col2\"]}"; + + String json = PlanTableScanRequestParser.toJson(request, false); + assertThat(json).isEqualTo(expectedJson); + + // can't do an equality comparison on PlanTableScanRequest because we don't implement + // equals/hashcode + assertThat(PlanTableScanRequestParser.toJson(PlanTableScanRequestParser.fromJson(json), false)) + .isEqualTo(expectedJson); + } +} diff --git a/core/src/test/java/org/apache/iceberg/rest/responses/TestFetchPlanningResultResponseParser.java b/core/src/test/java/org/apache/iceberg/rest/responses/TestFetchPlanningResultResponseParser.java new file mode 100644 index 000000000000..9334a8e57e0a --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/rest/responses/TestFetchPlanningResultResponseParser.java @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.rest.responses; + +import static org.apache.iceberg.TestBase.FILE_A; +import static org.apache.iceberg.TestBase.FILE_A_DELETES; +import static org.apache.iceberg.TestBase.PARTITION_SPECS_BY_ID; +import static org.apache.iceberg.TestBase.SCHEMA; +import static org.apache.iceberg.TestBase.SPEC; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import com.fasterxml.jackson.databind.JsonNode; +import java.util.List; +import org.apache.iceberg.BaseFileScanTask; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.PartitionSpecParser; +import org.apache.iceberg.SchemaParser; +import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.expressions.ResidualEvaluator; +import org.apache.iceberg.rest.PlanStatus; +import org.junit.jupiter.api.Test; + +public class TestFetchPlanningResultResponseParser { + + @Test + public void nullAndEmptyCheck() { + assertThatThrownBy(() -> FetchPlanningResultResponseParser.toJson(null)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid fetch scan planning response: null"); + + assertThatThrownBy(() -> FetchPlanningResultResponseParser.fromJson((JsonNode) null)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid fetch scan planning response: null or empty"); + } + + @Test + public void serdeWithEmptyObject() { + assertThatThrownBy( + () -> + FetchPlanningResultResponseParser.toJson( + FetchPlanningResultResponse.builder().build())) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid status: null"); + + String emptyJson = "{ }"; + assertThatThrownBy(() -> FetchPlanningResultResponseParser.fromJson(emptyJson)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid fetch scan planning response: null or empty"); + } + + @Test + public void missingRequiredField() { + String missingRequiredFieldJson = "{\"x\": \"val\"}"; + assertThatThrownBy(() -> FetchPlanningResultResponseParser.fromJson(missingRequiredFieldJson)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Cannot parse missing string: plan-status"); + } + + @Test + public void serdeWithInvalidPlanStatus() { + String invalidStatusJson = "{\"plan-status\": \"someStatus\"}"; + assertThatThrownBy(() -> FetchPlanningResultResponseParser.fromJson(invalidStatusJson)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid status name: someStatus"); + } + + @Test + public void serdeWithValidSubmittedStatus() { + PlanStatus planStatus = PlanStatus.fromName("submitted"); + FetchPlanningResultResponse response = + FetchPlanningResultResponse.builder().withPlanStatus(planStatus).build(); + + String expectedJson = "{\"plan-status\":\"submitted\"}"; + String json = FetchPlanningResultResponseParser.toJson(response); + assertThat(json).isEqualTo(expectedJson); + + FetchPlanningResultResponse fromResponse = FetchPlanningResultResponseParser.fromJson(json); + assertThat(FetchPlanningResultResponseParser.toJson(fromResponse)).isEqualTo(expectedJson); + } + + @Test + public void serdeWithInvalidPlanStatusSubmittedWithTasksPresent() { + PlanStatus planStatus = PlanStatus.fromName("submitted"); + assertThatThrownBy( + () -> + FetchPlanningResultResponse.builder() + .withPlanStatus(planStatus) + .withPlanTasks(List.of("task1", "task2")) + .build()) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid response: tasks can only be 
returned in a 'completed' status"); + + String invalidJson = + "{\"plan-status\":\"submitted\"," + "\"plan-tasks\":[\"task1\",\"task2\"]}"; + + assertThatThrownBy(() -> FetchPlanningResultResponseParser.fromJson(invalidJson)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid response: tasks can only be returned in a 'completed' status"); + } + + @Test + public void serdeWithInvalidPlanStatusSubmittedWithDeleteFilesNoFileScanTasksPresent() { + PlanStatus planStatus = PlanStatus.fromName("submitted"); + assertThatThrownBy( + () -> + FetchPlanningResultResponse.builder() + .withPlanStatus(planStatus) + .withDeleteFiles(List.of(FILE_A_DELETES)) + .build()) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage( + "Invalid response: deleteFiles should only be returned with fileScanTasks that reference them"); + + String invalidJson = + "{\"plan-status\":\"submitted\"," + + "\"delete-files\":[{\"spec-id\":0,\"content\":\"POSITION_DELETES\"," + + "\"file-path\":\"/path/to/data-a-deletes.parquet\",\"file-format\":\"PARQUET\"," + + "\"partition\":{\"1000\":0},\"file-size-in-bytes\":10,\"record-count\":1}]" + + "}"; + + assertThatThrownBy(() -> FetchPlanningResultResponseParser.fromJson(invalidJson)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage( + "Invalid response: deleteFiles should only be returned with fileScanTasks that reference them"); + } + + @Test + public void serdeWithValidStatusAndFileScanTasks() { + ResidualEvaluator residualEvaluator = + ResidualEvaluator.of(SPEC, Expressions.equal("id", 1), true); + FileScanTask fileScanTask = + new BaseFileScanTask( + FILE_A, + new DeleteFile[] {FILE_A_DELETES}, + SchemaParser.toJson(SCHEMA), + PartitionSpecParser.toJson(SPEC), + residualEvaluator); + + PlanStatus planStatus = PlanStatus.fromName("completed"); + FetchPlanningResultResponse response = + FetchPlanningResultResponse.builder() + .withPlanStatus(planStatus) + .withFileScanTasks(List.of(fileScanTask)) + .withDeleteFiles(List.of(FILE_A_DELETES)) + // assume this has been set + .withSpecsById(PARTITION_SPECS_BY_ID) + .build(); + + String expectedToJson = + "{\"plan-status\":\"completed\"," + + "\"delete-files\":[{\"spec-id\":0,\"content\":\"POSITION_DELETES\"," + + "\"file-path\":\"/path/to/data-a-deletes.parquet\",\"file-format\":\"PARQUET\"," + + "\"partition\":{\"1000\":0},\"file-size-in-bytes\":10,\"record-count\":1}]," + + "\"file-scan-tasks\":[" + + "{\"data-file\":{\"spec-id\":0,\"content\":\"DATA\",\"file-path\":\"/path/to/data-a.parquet\"," + + "\"file-format\":\"PARQUET\",\"partition\":{\"1000\":0}," + + "\"file-size-in-bytes\":10,\"record-count\":1,\"sort-order-id\":0}," + + "\"delete-file-references\":[0]," + + "\"residual-filter\":{\"type\":\"eq\",\"term\":\"id\",\"value\":1}}]" + + "}"; + + String json = FetchPlanningResultResponseParser.toJson(response, false); + assertThat(json).isEqualTo(expectedToJson); + + // make an unbound json where you expect to not have partitions for the data file, + // delete files as service does not send partition spec + String expectedFromJson = + "{\"plan-status\":\"completed\"," + + "\"delete-files\":[{\"spec-id\":0,\"content\":\"POSITION_DELETES\"," + + "\"file-path\":\"/path/to/data-a-deletes.parquet\",\"file-format\":\"PARQUET\"," + + "\"partition\":{},\"file-size-in-bytes\":10,\"record-count\":1}]," + + "\"file-scan-tasks\":[" + + "{\"data-file\":{\"spec-id\":0,\"content\":\"DATA\",\"file-path\":\"/path/to/data-a.parquet\"," + + "\"file-format\":\"PARQUET\",\"partition\":{}," + + 
"\"file-size-in-bytes\":10,\"record-count\":1,\"sort-order-id\":0}," + + "\"delete-file-references\":[0]," + + "\"residual-filter\":{\"type\":\"eq\",\"term\":\"id\",\"value\":1}}]" + + "}"; + + FetchPlanningResultResponse fromResponse = FetchPlanningResultResponseParser.fromJson(json); + FetchPlanningResultResponse copyResponse = + FetchPlanningResultResponse.builder() + .withPlanStatus(fromResponse.planStatus()) + .withPlanTasks(fromResponse.planTasks()) + .withDeleteFiles(fromResponse.deleteFiles()) + .withFileScanTasks(fromResponse.fileScanTasks()) + .withSpecsById(PARTITION_SPECS_BY_ID) + .build(); + + // can't do an equality comparison on PlanTableScanRequest because we don't implement + // equals/hashcode + assertThat(FetchPlanningResultResponseParser.toJson(copyResponse, false)) + .isEqualTo(expectedFromJson); + } +} diff --git a/core/src/test/java/org/apache/iceberg/rest/responses/TestFetchScanTasksResponseParser.java b/core/src/test/java/org/apache/iceberg/rest/responses/TestFetchScanTasksResponseParser.java new file mode 100644 index 000000000000..aab433b427b4 --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/rest/responses/TestFetchScanTasksResponseParser.java @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.rest.responses; + +import static org.apache.iceberg.TestBase.FILE_A; +import static org.apache.iceberg.TestBase.FILE_A_DELETES; +import static org.apache.iceberg.TestBase.PARTITION_SPECS_BY_ID; +import static org.apache.iceberg.TestBase.SCHEMA; +import static org.apache.iceberg.TestBase.SPEC; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import com.fasterxml.jackson.databind.JsonNode; +import java.util.List; +import org.apache.iceberg.BaseFileScanTask; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.PartitionSpecParser; +import org.apache.iceberg.SchemaParser; +import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.expressions.ResidualEvaluator; +import org.junit.jupiter.api.Test; + +public class TestFetchScanTasksResponseParser { + + @Test + public void nullAndEmptyCheck() { + assertThatThrownBy(() -> FetchScanTasksResponseParser.toJson(null)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid fetch scan tasks response: null"); + + assertThatThrownBy(() -> FetchScanTasksResponseParser.fromJson((JsonNode) null)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid fetch scan tasks response: null or empty"); + } + + @Test + public void serdeWithEmptyObject() { + assertThatThrownBy( + () -> FetchScanTasksResponseParser.toJson(FetchScanTasksResponse.builder().build())) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid response: planTasks and fileScanTask cannot both be null"); + + String emptyJson = "{ }"; + assertThatThrownBy(() -> FetchScanTasksResponseParser.fromJson(emptyJson)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid fetch scan tasks response: null or empty"); + } + + @Test + public void missingRequiredField() { + String missingRequiredFieldJson = "{\"x\": \"val\"}"; + assertThatThrownBy(() -> FetchScanTasksResponseParser.fromJson(missingRequiredFieldJson)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid response: planTasks and fileScanTask cannot both be null"); + } + + @Test + public void serdeWithPlanTasks() { + String expectedJson = "{\"plan-tasks\":[\"task1\",\"task2\"]}"; + String json = + FetchScanTasksResponseParser.toJson( + FetchScanTasksResponse.builder().withPlanTasks(List.of("task1", "task2")).build()); + assertThat(json).isEqualTo(expectedJson); + + FetchScanTasksResponse response = FetchScanTasksResponseParser.fromJson(json); + + // can't do an equality comparison on PlanTableScanRequest because we don't implement + // equals/hashcode + assertThat(FetchScanTasksResponseParser.toJson(response, false)).isEqualTo(expectedJson); + } + + @Test + public void serdeWithDeleteFilesNoFileScanTasksPresent() { + assertThatThrownBy( + () -> + FetchScanTasksResponse.builder() + .withPlanTasks(List.of("task1", "task2")) + .withDeleteFiles(List.of(FILE_A_DELETES)) + .build()) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage( + "Invalid response: deleteFiles should only be returned with fileScanTasks that reference them"); + + String invalidJson = + "{\"plan-tasks\":[\"task1\",\"task2\"]," + + "\"delete-files\":[{\"spec-id\":0,\"content\":\"POSITION_DELETES\"," + + "\"file-path\":\"/path/to/data-a-deletes.parquet\",\"file-format\":\"PARQUET\"," + + "\"partition\":{\"1000\":0},\"file-size-in-bytes\":10,\"record-count\":1}]" + + "}"; + + assertThatThrownBy(() -> 
FetchScanTasksResponseParser.fromJson(invalidJson)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage( + "Invalid response: deleteFiles should only be returned with fileScanTasks that reference them"); + } + + @Test + public void serdeWithFileScanTasks() { + ResidualEvaluator residualEvaluator = + ResidualEvaluator.of(SPEC, Expressions.equal("id", 1), true); + FileScanTask fileScanTask = + new BaseFileScanTask( + FILE_A, + new DeleteFile[] {FILE_A_DELETES}, + SchemaParser.toJson(SCHEMA), + PartitionSpecParser.toJson(SPEC), + residualEvaluator); + + FetchScanTasksResponse response = + FetchScanTasksResponse.builder() + .withFileScanTasks(List.of(fileScanTask)) + .withDeleteFiles(List.of(FILE_A_DELETES)) + .withSpecsById(PARTITION_SPECS_BY_ID) + .build(); + + String expectedToJson = + "{" + + "\"delete-files\":[{\"spec-id\":0,\"content\":\"POSITION_DELETES\"," + + "\"file-path\":\"/path/to/data-a-deletes.parquet\",\"file-format\":\"PARQUET\"," + + "\"partition\":{\"1000\":0},\"file-size-in-bytes\":10,\"record-count\":1}]," + + "\"file-scan-tasks\":[" + + "{\"data-file\":{\"spec-id\":0,\"content\":\"DATA\",\"file-path\":\"/path/to/data-a.parquet\"," + + "\"file-format\":\"PARQUET\",\"partition\":{\"1000\":0}," + + "\"file-size-in-bytes\":10,\"record-count\":1,\"sort-order-id\":0}," + + "\"delete-file-references\":[0]," + + "\"residual-filter\":{\"type\":\"eq\",\"term\":\"id\",\"value\":1}}]" + + "}"; + + String json = FetchScanTasksResponseParser.toJson(response, false); + assertThat(json).isEqualTo(expectedToJson); + + // create a response where the file scan tasks are unbound + String expectedFromJson = + "{" + + "\"delete-files\":[{\"spec-id\":0,\"content\":\"POSITION_DELETES\"," + + "\"file-path\":\"/path/to/data-a-deletes.parquet\",\"file-format\":\"PARQUET\"," + + "\"partition\":{},\"file-size-in-bytes\":10,\"record-count\":1}]," + + "\"file-scan-tasks\":[" + + "{\"data-file\":{\"spec-id\":0,\"content\":\"DATA\",\"file-path\":\"/path/to/data-a.parquet\"," + + "\"file-format\":\"PARQUET\",\"partition\":{}," + + "\"file-size-in-bytes\":10,\"record-count\":1,\"sort-order-id\":0}," + + "\"delete-file-references\":[0]," + + "\"residual-filter\":{\"type\":\"eq\",\"term\":\"id\",\"value\":1}}]" + + "}"; + + FetchScanTasksResponse deserializedResponse = FetchScanTasksResponseParser.fromJson(json); + FetchScanTasksResponse responseWithSpecs = + FetchScanTasksResponse.builder() + .withPlanTasks(deserializedResponse.planTasks()) + .withDeleteFiles(deserializedResponse.deleteFiles()) + .withFileScanTasks(deserializedResponse.fileScanTasks()) + .withSpecsById(PARTITION_SPECS_BY_ID) + .build(); + + assertThat(FetchScanTasksResponseParser.toJson(responseWithSpecs, false)) + .isEqualTo(expectedFromJson); + } +} diff --git a/core/src/test/java/org/apache/iceberg/rest/responses/TestPlanTableScanResponseParser.java b/core/src/test/java/org/apache/iceberg/rest/responses/TestPlanTableScanResponseParser.java new file mode 100644 index 000000000000..31b3ba3e63c1 --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/rest/responses/TestPlanTableScanResponseParser.java @@ -0,0 +1,244 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.rest.responses; + +import static org.apache.iceberg.TestBase.FILE_A; +import static org.apache.iceberg.TestBase.FILE_A_DELETES; +import static org.apache.iceberg.TestBase.PARTITION_SPECS_BY_ID; +import static org.apache.iceberg.TestBase.SCHEMA; +import static org.apache.iceberg.TestBase.SPEC; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import com.fasterxml.jackson.databind.JsonNode; +import java.util.List; +import org.apache.iceberg.BaseFileScanTask; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.PartitionSpecParser; +import org.apache.iceberg.SchemaParser; +import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.expressions.ResidualEvaluator; +import org.apache.iceberg.rest.PlanStatus; +import org.junit.jupiter.api.Test; + +public class TestPlanTableScanResponseParser { + @Test + public void nullAndEmptyCheck() { + assertThatThrownBy(() -> PlanTableScanResponseParser.toJson(null)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid response: planTableScanResponse null"); + + assertThatThrownBy(() -> PlanTableScanResponseParser.fromJson((JsonNode) null)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Cannot parse planTableScan response from empty or null object"); + } + + @Test + public void serdeWithEmptyObject() { + + assertThatThrownBy(() -> PlanTableScanResponse.builder().build()) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid response: plan status must be defined"); + + String emptyJson = "{ }"; + assertThatThrownBy(() -> PlanTableScanResponseParser.fromJson(emptyJson)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Cannot parse planTableScan response from empty or null object"); + } + + @Test + public void missingRequiredField() { + String missingRequiredFieldJson = "{\"x\": \"val\"}"; + assertThatThrownBy(() -> PlanTableScanResponseParser.fromJson(missingRequiredFieldJson)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Cannot parse missing string: plan-status"); + } + + @Test + public void serdeWithInvalidPlanStatus() { + String invalidStatusJson = "{\"plan-status\": \"someStatus\"}"; + assertThatThrownBy(() -> PlanTableScanResponseParser.fromJson(invalidStatusJson)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid status name: someStatus"); + } + + @Test + public void serdeWithInvalidPlanStatusSubmittedWithoutPlanId() { + PlanStatus planStatus = PlanStatus.fromName("submitted"); + + assertThatThrownBy(() -> PlanTableScanResponse.builder().withPlanStatus(planStatus).build()) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid response: plan id should be defined when status is 'submitted'"); + + String invalidJson = "{\"plan-status\":\"submitted\"}"; + assertThatThrownBy(() -> PlanTableScanResponseParser.fromJson(invalidJson)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid response: plan id should be defined when 
status is 'submitted'"); + } + + @Test + public void serdeWithInvalidPlanStatusCancelled() { + PlanStatus planStatus = PlanStatus.fromName("cancelled"); + assertThatThrownBy(() -> PlanTableScanResponse.builder().withPlanStatus(planStatus).build()) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid response: 'cancelled' is not a valid status for planTableScan"); + + String invalidJson = "{\"plan-status\":\"cancelled\"}"; + assertThatThrownBy(() -> PlanTableScanResponseParser.fromJson(invalidJson)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid response: 'cancelled' is not a valid status for planTableScan"); + } + + @Test + public void serdeWithInvalidPlanStatusSubmittedWithTasksPresent() { + PlanStatus planStatus = PlanStatus.fromName("submitted"); + assertThatThrownBy( + () -> + PlanTableScanResponse.builder() + .withPlanStatus(planStatus) + .withPlanId("somePlanId") + .withPlanTasks(List.of("task1", "task2")) + .build()) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid response: tasks can only be returned in a 'completed' status"); + + String invalidJson = + "{\"plan-status\":\"submitted\"," + + "\"plan-id\":\"somePlanId\"," + + "\"plan-tasks\":[\"task1\",\"task2\"]}"; + + assertThatThrownBy(() -> PlanTableScanResponseParser.fromJson(invalidJson)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid response: tasks can only be returned in a 'completed' status"); + } + + @Test + public void serdeWithInvalidPlanIdWithIncorrectStatus() { + PlanStatus planStatus = PlanStatus.fromName("failed"); + assertThatThrownBy( + () -> + PlanTableScanResponse.builder() + .withPlanStatus(planStatus) + .withPlanId("somePlanId") + .build()) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid response: plan id can only be returned in a 'submitted' status"); + + String invalidJson = "{\"plan-status\":\"failed\"," + "\"plan-id\":\"somePlanId\"}"; + + assertThatThrownBy(() -> PlanTableScanResponseParser.fromJson(invalidJson)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid response: plan id can only be returned in a 'submitted' status"); + } + + @Test + public void serdeWithInvalidPlanStatusSubmittedWithDeleteFilesNoFileScanTasksPresent() { + PlanStatus planStatus = PlanStatus.fromName("submitted"); + assertThatThrownBy( + () -> + PlanTableScanResponse.builder() + .withPlanStatus(planStatus) + .withPlanId("somePlanId") + .withDeleteFiles(List.of(FILE_A_DELETES)) + .build()) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage( + "Invalid response: deleteFiles should only be returned with fileScanTasks that reference them"); + + String invalidJson = + "{\"plan-status\":\"submitted\"," + + "\"plan-id\":\"somePlanId\"," + + "\"delete-files\":[{\"spec-id\":0,\"content\":\"POSITION_DELETES\"," + + "\"file-path\":\"/path/to/data-a-deletes.parquet\",\"file-format\":\"PARQUET\"," + + "\"partition\":{\"1000\":0},\"file-size-in-bytes\":10,\"record-count\":1}]" + + "}"; + + assertThatThrownBy(() -> PlanTableScanResponseParser.fromJson(invalidJson)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage( + "Invalid response: deleteFiles should only be returned with fileScanTasks that reference them"); + } + + @Test + public void serdeWithValidStatusAndFileScanTasks() { + ResidualEvaluator residualEvaluator = + ResidualEvaluator.of(SPEC, Expressions.equal("id", 1), true); + FileScanTask fileScanTask = + new BaseFileScanTask( + FILE_A, + new DeleteFile[] {FILE_A_DELETES}, + 
SchemaParser.toJson(SCHEMA), + PartitionSpecParser.toJson(SPEC), + residualEvaluator); + + PlanStatus planStatus = PlanStatus.fromName("completed"); + PlanTableScanResponse response = + PlanTableScanResponse.builder() + .withPlanStatus(planStatus) + .withFileScanTasks(List.of(fileScanTask)) + .withDeleteFiles(List.of(FILE_A_DELETES)) + .withSpecsById(PARTITION_SPECS_BY_ID) + .build(); + + String expectedToJson = + "{\"plan-status\":\"completed\"," + + "\"delete-files\":[{\"spec-id\":0,\"content\":\"POSITION_DELETES\"," + + "\"file-path\":\"/path/to/data-a-deletes.parquet\",\"file-format\":\"PARQUET\"," + + "\"partition\":{\"1000\":0},\"file-size-in-bytes\":10,\"record-count\":1}]," + + "\"file-scan-tasks\":[" + + "{\"data-file\":{\"spec-id\":0,\"content\":\"DATA\",\"file-path\":\"/path/to/data-a.parquet\"," + + "\"file-format\":\"PARQUET\",\"partition\":{\"1000\":0}," + + "\"file-size-in-bytes\":10,\"record-count\":1,\"sort-order-id\":0}," + + "\"delete-file-references\":[0]," + + "\"residual-filter\":{\"type\":\"eq\",\"term\":\"id\",\"value\":1}}]" + + "}"; + + String json = PlanTableScanResponseParser.toJson(response); + assertThat(json).isEqualTo(expectedToJson); + + String expectedFromJson = + "{\"plan-status\":\"completed\"," + + "\"delete-files\":[{\"spec-id\":0,\"content\":\"POSITION_DELETES\"," + + "\"file-path\":\"/path/to/data-a-deletes.parquet\",\"file-format\":\"PARQUET\"," + + "\"partition\":{},\"file-size-in-bytes\":10,\"record-count\":1}]," + + "\"file-scan-tasks\":[" + + "{\"data-file\":{\"spec-id\":0,\"content\":\"DATA\",\"file-path\":\"/path/to/data-a.parquet\"," + + "\"file-format\":\"PARQUET\",\"partition\":{}," + + "\"file-size-in-bytes\":10,\"record-count\":1,\"sort-order-id\":0}," + + "\"delete-file-references\":[0]," + + "\"residual-filter\":{\"type\":\"eq\",\"term\":\"id\",\"value\":1}}]" + + "}"; + + PlanTableScanResponse fromResponse = PlanTableScanResponseParser.fromJson(json); + PlanTableScanResponse copyResponse = + PlanTableScanResponse.builder() + .withPlanStatus(fromResponse.planStatus()) + .withPlanId(fromResponse.planId()) + .withPlanTasks(fromResponse.planTasks()) + .withDeleteFiles(fromResponse.deleteFiles()) + .withFileScanTasks(fromResponse.fileScanTasks()) + .withSpecsById(PARTITION_SPECS_BY_ID) + .build(); + + // can't do an equality comparison on PlanTableScanRequest because we don't implement + // equals/hashcode + assertThat(PlanTableScanResponseParser.toJson(copyResponse)).isEqualTo(expectedFromJson); + } +}
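For context, a minimal usage sketch (not part of the patch above) of how a REST client might stitch the new parsers together once it already has the raw JSON payloads in hand. The class name ScanPlanningClientSketch and the fetchScanTasks callback standing in for the HTTP layer are assumptions for illustration; only the parser and response APIs are taken from this change.

import java.util.List;
import java.util.function.Function;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.rest.PlanStatus;
import org.apache.iceberg.rest.responses.FetchScanTasksResponse;
import org.apache.iceberg.rest.responses.FetchScanTasksResponseParser;
import org.apache.iceberg.rest.responses.PlanTableScanResponse;
import org.apache.iceberg.rest.responses.PlanTableScanResponseParser;

public class ScanPlanningClientSketch {

  // planJson is the raw body of a planTableScan response; fetchScanTasks maps a
  // plan task string to the raw body of a fetchScanTasks response. Both are
  // assumed to come from the REST client layer, which is not shown here.
  static void consume(String planJson, Function<String, String> fetchScanTasks) {
    PlanTableScanResponse plan = PlanTableScanResponseParser.fromJson(planJson);

    if (plan.planStatus() == PlanStatus.SUBMITTED) {
      // asynchronous planning: poll the plan id via the fetchPlanningResult endpoint
      System.out.println("planning still running, poll plan id " + plan.planId());
      return;
    }

    // completed plans may carry file scan tasks directly ...
    if (plan.fileScanTasks() != null) {
      for (FileScanTask task : plan.fileScanTasks()) {
        System.out.println("data file: " + task.file().path());
      }
    }

    // ... and/or opaque plan tasks that are resolved through fetchScanTasks
    if (plan.planTasks() != null) {
      for (String planTask : plan.planTasks()) {
        FetchScanTasksResponse tasks =
            FetchScanTasksResponseParser.fromJson(fetchScanTasks.apply(planTask));
        List<FileScanTask> resolved = tasks.fileScanTasks();
        System.out.println(planTask + " -> " + (resolved == null ? 0 : resolved.size()) + " tasks");
      }
    }
  }
}

The status handling mirrors the rules encoded in PlanTableScanResponse.validate(): tasks may only appear on a 'completed' plan, and a plan id only on a 'submitted' one.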