diff --git a/core/src/main/java/org/apache/iceberg/ContentFileParser.java b/core/src/main/java/org/apache/iceberg/ContentFileParser.java index 63cd606356db..b48334d8222d 100644 --- a/core/src/main/java/org/apache/iceberg/ContentFileParser.java +++ b/core/src/main/java/org/apache/iceberg/ContentFileParser.java @@ -27,7 +27,7 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.util.JsonUtil; -class ContentFileParser { +public class ContentFileParser { private static final String SPEC_ID = "spec-id"; private static final String CONTENT = "content"; private static final String FILE_PATH = "file-path"; @@ -56,12 +56,12 @@ private static boolean hasPartitionData(StructLike partitionData) { return partitionData != null && partitionData.size() > 0; } - static String toJson(ContentFile contentFile, PartitionSpec spec) { + public static String toJson(ContentFile contentFile, PartitionSpec spec) { return JsonUtil.generate( generator -> ContentFileParser.toJson(contentFile, spec, generator), false); } - static void toJson(ContentFile contentFile, PartitionSpec spec, JsonGenerator generator) + public static void toJson(ContentFile contentFile, PartitionSpec spec, JsonGenerator generator) throws IOException { Preconditions.checkArgument(contentFile != null, "Invalid content file: null"); Preconditions.checkArgument(spec != null, "Invalid partition spec: null"); @@ -134,13 +134,18 @@ static void toJson(ContentFile contentFile, PartitionSpec spec, JsonGenerator generator.writeEndObject(); } - static ContentFile fromJson(JsonNode jsonNode, PartitionSpec spec) { + public static ContentFile fromJson(JsonNode jsonNode, PartitionSpec spec) { + return fromJson(jsonNode, spec == null ? null : Map.of(spec.specId(), spec)); + } + + public static ContentFile fromJson(JsonNode jsonNode, Map specsById) { Preconditions.checkArgument(jsonNode != null, "Invalid JSON node for content file: null"); Preconditions.checkArgument( jsonNode.isObject(), "Invalid JSON node for content file: non-object (%s)", jsonNode); - Preconditions.checkArgument(spec != null, "Invalid partition spec: null"); - + Preconditions.checkArgument(specsById != null, "Invalid partition spec: null"); int specId = JsonUtil.getInt(SPEC_ID, jsonNode); + PartitionSpec spec = specsById.get(specId); + Preconditions.checkArgument(spec != null, "Invalid partition specId: %s", specId); FileContent fileContent = FileContent.valueOf(JsonUtil.getString(CONTENT, jsonNode)); String filePath = JsonUtil.getString(FILE_PATH, jsonNode); FileFormat fileFormat = FileFormat.fromString(JsonUtil.getString(FILE_FORMAT, jsonNode)); diff --git a/core/src/main/java/org/apache/iceberg/rest/PlanStatus.java b/core/src/main/java/org/apache/iceberg/rest/PlanStatus.java new file mode 100644 index 000000000000..7ad1291b5140 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/rest/PlanStatus.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.rest; + +import java.util.Locale; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; + +public enum PlanStatus { + COMPLETED("completed"), + SUBMITTED("submitted"), + CANCELLED("cancelled"), + FAILED("failed"); + + private final String status; + + PlanStatus(String status) { + this.status = status; + } + + public String status() { + return status; + } + + public static PlanStatus fromName(String status) { + Preconditions.checkArgument(status != null, "Status is null"); + try { + return PlanStatus.valueOf(status.toUpperCase(Locale.ROOT)); + } catch (IllegalArgumentException e) { + throw new IllegalArgumentException(String.format("Invalid status name: %s", status), e); + } + } +} diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTFileScanTaskParser.java b/core/src/main/java/org/apache/iceberg/rest/RESTFileScanTaskParser.java new file mode 100644 index 000000000000..0ada9083eea6 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/rest/RESTFileScanTaskParser.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.rest; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonNode; +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.iceberg.BaseFileScanTask; +import org.apache.iceberg.ContentFileParser; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.PartitionSpecParser; +import org.apache.iceberg.SchemaParser; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.expressions.ExpressionParser; +import org.apache.iceberg.expressions.ResidualEvaluator; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.util.JsonUtil; + +class RESTFileScanTaskParser { + private static final String DATA_FILE = "data-file"; + private static final String DELETE_FILE_REFERENCES = "delete-file-references"; + private static final String RESIDUAL_FILTER = "residual-filter"; + + private RESTFileScanTaskParser() {} + + public static void toJson( + FileScanTask fileScanTask, + Set deleteFileReferences, + PartitionSpec partitionSpec, + JsonGenerator generator) + throws IOException { + Preconditions.checkArgument(fileScanTask != null, "Invalid file scan task: null"); + Preconditions.checkArgument(generator != null, "Invalid JSON generator: null"); + + generator.writeStartObject(); + generator.writeFieldName(DATA_FILE); + ContentFileParser.toJson(fileScanTask.file(), partitionSpec, generator); + if (deleteFileReferences != null) { + JsonUtil.writeIntegerArray(DELETE_FILE_REFERENCES, deleteFileReferences, generator); + } + + if (fileScanTask.residual() != null) { + generator.writeFieldName(RESIDUAL_FILTER); + ExpressionParser.toJson(fileScanTask.residual(), generator); + } + + generator.writeEndObject(); + } + + public static FileScanTask fromJson( + JsonNode jsonNode, + List allDeleteFiles, + Map specsById, + boolean isCaseSensitive) { + Preconditions.checkArgument(jsonNode != null, "Invalid JSON node for file scan task: null"); + Preconditions.checkArgument( + jsonNode.isObject(), "Invalid JSON node for file scan task: non-object (%s)", jsonNode); + + DataFile dataFile = + (DataFile) ContentFileParser.fromJson(JsonUtil.get(DATA_FILE, jsonNode), specsById); + int specId = dataFile.specId(); + + DeleteFile[] deleteFiles = null; + if (jsonNode.has(DELETE_FILE_REFERENCES)) { + List indices = JsonUtil.getIntegerList(DELETE_FILE_REFERENCES, jsonNode); + Preconditions.checkArgument( + Collections.max(indices) < allDeleteFiles.size(), + "Invalid delete file references: %s, expected indices < %s", + indices, + allDeleteFiles.size()); + deleteFiles = indices.stream().map(allDeleteFiles::get).toArray(DeleteFile[]::new); + } + + Expression filter = null; + if (jsonNode.has(RESIDUAL_FILTER)) { + filter = ExpressionParser.fromJson(jsonNode.get(RESIDUAL_FILTER)); + } + + String schemaString = SchemaParser.toJson(specsById.get(specId).schema()); + String specString = PartitionSpecParser.toJson(specsById.get(specId)); + ResidualEvaluator boundResidual = + ResidualEvaluator.of(specsById.get(specId), filter, isCaseSensitive); + + return new BaseFileScanTask(dataFile, deleteFiles, schemaString, specString, boundResidual); + } +} diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTSerializers.java b/core/src/main/java/org/apache/iceberg/rest/RESTSerializers.java index 667142698633..3e0e1750115f 100644 --- a/core/src/main/java/org/apache/iceberg/rest/RESTSerializers.java +++ b/core/src/main/java/org/apache/iceberg/rest/RESTSerializers.java @@ -28,8 +28,10 @@ import com.fasterxml.jackson.databind.SerializerProvider; import com.fasterxml.jackson.databind.module.SimpleModule; import java.io.IOException; +import java.util.Map; import org.apache.iceberg.MetadataUpdate; import org.apache.iceberg.MetadataUpdateParser; +import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.PartitionSpecParser; import org.apache.iceberg.Schema; import org.apache.iceberg.SchemaParser; @@ -46,9 +48,13 @@ import org.apache.iceberg.rest.requests.CommitTransactionRequestParser; import org.apache.iceberg.rest.requests.CreateViewRequest; import org.apache.iceberg.rest.requests.CreateViewRequestParser; +import org.apache.iceberg.rest.requests.FetchScanTasksRequest; +import org.apache.iceberg.rest.requests.FetchScanTasksRequestParser; import org.apache.iceberg.rest.requests.ImmutableCreateViewRequest; import org.apache.iceberg.rest.requests.ImmutableRegisterTableRequest; import org.apache.iceberg.rest.requests.ImmutableReportMetricsRequest; +import org.apache.iceberg.rest.requests.PlanTableScanRequest; +import org.apache.iceberg.rest.requests.PlanTableScanRequestParser; import org.apache.iceberg.rest.requests.RegisterTableRequest; import org.apache.iceberg.rest.requests.RegisterTableRequestParser; import org.apache.iceberg.rest.requests.ReportMetricsRequest; @@ -59,6 +65,10 @@ import org.apache.iceberg.rest.responses.ConfigResponseParser; import org.apache.iceberg.rest.responses.ErrorResponse; import org.apache.iceberg.rest.responses.ErrorResponseParser; +import org.apache.iceberg.rest.responses.FetchPlanningResultResponse; +import org.apache.iceberg.rest.responses.FetchPlanningResultResponseParser; +import org.apache.iceberg.rest.responses.FetchScanTasksResponse; +import org.apache.iceberg.rest.responses.FetchScanTasksResponseParser; import org.apache.iceberg.rest.responses.ImmutableLoadCredentialsResponse; import org.apache.iceberg.rest.responses.ImmutableLoadViewResponse; import org.apache.iceberg.rest.responses.LoadCredentialsResponse; @@ -68,6 +78,8 @@ import org.apache.iceberg.rest.responses.LoadViewResponse; import org.apache.iceberg.rest.responses.LoadViewResponseParser; import org.apache.iceberg.rest.responses.OAuthTokenResponse; +import org.apache.iceberg.rest.responses.PlanTableScanResponse; +import org.apache.iceberg.rest.responses.PlanTableScanResponseParser; import org.apache.iceberg.util.JsonUtil; public class RESTSerializers { @@ -123,6 +135,18 @@ public static void registerAll(ObjectMapper mapper) { .addDeserializer(ConfigResponse.class, new ConfigResponseDeserializer<>()) .addSerializer(LoadTableResponse.class, new LoadTableResponseSerializer<>()) .addDeserializer(LoadTableResponse.class, new LoadTableResponseDeserializer<>()) + .addSerializer(PlanTableScanRequest.class, new PlanTableScanRequestSerializer<>()) + .addDeserializer(PlanTableScanRequest.class, new PlanTableScanRequestDeserializer<>()) + .addSerializer(FetchScanTasksRequest.class, new FetchScanTasksRequestSerializer<>()) + .addDeserializer(FetchScanTasksRequest.class, new FetchScanTasksRequestDeserializer<>()) + .addSerializer(PlanTableScanResponse.class, new PlanTableScanResponseSerializer<>()) + .addDeserializer(PlanTableScanResponse.class, new PlanTableScanResponseDeserializer<>()) + .addSerializer( + FetchPlanningResultResponse.class, new FetchPlanningResultResponseSerializer<>()) + .addDeserializer( + FetchPlanningResultResponse.class, new FetchPlanningResultResponseDeserializer<>()) + .addSerializer(FetchScanTasksResponse.class, new FetchScanTaskResponseSerializer<>()) + .addDeserializer(FetchScanTasksResponse.class, new FetchScanTaskResponseDeserializer<>()) .addSerializer(LoadCredentialsResponse.class, new LoadCredentialsResponseSerializer<>()) .addSerializer( ImmutableLoadCredentialsResponse.class, new LoadCredentialsResponseSerializer<>()) @@ -470,4 +494,133 @@ public T deserialize(JsonParser p, DeserializationContext context) throws IOExce return (T) LoadCredentialsResponseParser.fromJson(jsonNode); } } + + static class PlanTableScanRequestSerializer + extends JsonSerializer { + @Override + public void serialize(T request, JsonGenerator gen, SerializerProvider serializers) + throws IOException { + PlanTableScanRequestParser.toJson(request, gen); + } + } + + static class PlanTableScanRequestDeserializer + extends JsonDeserializer { + @Override + public T deserialize(JsonParser p, DeserializationContext context) throws IOException { + JsonNode jsonNode = p.getCodec().readTree(p); + return (T) PlanTableScanRequestParser.fromJson(jsonNode); + } + } + + static class FetchScanTasksRequestSerializer + extends JsonSerializer { + @Override + public void serialize(T request, JsonGenerator gen, SerializerProvider serializers) + throws IOException { + FetchScanTasksRequestParser.toJson(request, gen); + } + } + + static class FetchScanTasksRequestDeserializer + extends JsonDeserializer { + @Override + public T deserialize(JsonParser p, DeserializationContext context) throws IOException { + JsonNode jsonNode = p.getCodec().readTree(p); + return (T) FetchScanTasksRequestParser.fromJson(jsonNode); + } + } + + static class PlanTableScanResponseSerializer + extends JsonSerializer { + @Override + public void serialize(T response, JsonGenerator gen, SerializerProvider serializers) + throws IOException { + PlanTableScanResponseParser.toJson(response, gen); + } + } + + static class PlanTableScanResponseDeserializer + extends JsonDeserializer { + @Override + public T deserialize(JsonParser p, DeserializationContext context) throws IOException { + JsonNode jsonNode = p.getCodec().readTree(p); + TableScanResponseContext scanContext = parseScanResponseContext(context); + + return (T) + PlanTableScanResponseParser.fromJson( + jsonNode, scanContext.getSpecsById(), scanContext.isCaseSensitive()); + } + } + + static class FetchPlanningResultResponseSerializer + extends JsonSerializer { + @Override + public void serialize(T response, JsonGenerator gen, SerializerProvider serializers) + throws IOException { + FetchPlanningResultResponseParser.toJson(response, gen); + } + } + + static class FetchPlanningResultResponseDeserializer + extends JsonDeserializer { + @Override + public T deserialize(JsonParser p, DeserializationContext context) throws IOException { + JsonNode jsonNode = p.getCodec().readTree(p); + + TableScanResponseContext scanContext = parseScanResponseContext(context); + return (T) + FetchPlanningResultResponseParser.fromJson( + jsonNode, scanContext.getSpecsById(), scanContext.isCaseSensitive()); + } + } + + static class FetchScanTaskResponseSerializer + extends JsonSerializer { + @Override + public void serialize(T response, JsonGenerator gen, SerializerProvider serializers) + throws IOException { + FetchScanTasksResponseParser.toJson(response, gen); + } + } + + static class FetchScanTaskResponseDeserializer + extends JsonDeserializer { + @Override + public T deserialize(JsonParser p, DeserializationContext context) throws IOException { + JsonNode jsonNode = p.getCodec().readTree(p); + + TableScanResponseContext scanContext = parseScanResponseContext(context); + return (T) + FetchScanTasksResponseParser.fromJson( + jsonNode, scanContext.getSpecsById(), scanContext.isCaseSensitive()); + } + } + + private static TableScanResponseContext parseScanResponseContext(DeserializationContext context) + throws IOException { + @SuppressWarnings("unchecked") + Map specsById = + (Map) context.findInjectableValue("specsById", null, null); + boolean caseSensitive = (boolean) context.findInjectableValue("caseSensitive", null, null); + return new TableScanResponseContext(specsById, caseSensitive); + } + + static class TableScanResponseContext { + private final Map specsById; + private final boolean caseSensitive; + + TableScanResponseContext(Map specs, boolean isCaseSensitive) { + this.specsById = specs; + this.caseSensitive = isCaseSensitive; + } + + Map getSpecsById() { + return specsById; + } + + boolean isCaseSensitive() { + return caseSensitive; + } + } } diff --git a/core/src/main/java/org/apache/iceberg/rest/TableScanResponseParser.java b/core/src/main/java/org/apache/iceberg/rest/TableScanResponseParser.java new file mode 100644 index 000000000000..67f71c418440 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/rest/TableScanResponseParser.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.rest; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonNode; +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.iceberg.ContentFileParser; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.util.JsonUtil; + +public class TableScanResponseParser { + + private TableScanResponseParser() {} + + static final String FILE_SCAN_TASKS = "file-scan-tasks"; + static final String DELETE_FILES = "delete-files"; + + public static List parseDeleteFiles( + JsonNode node, Map specsById) { + if (node.has(DELETE_FILES)) { + JsonNode deleteFiles = JsonUtil.get(DELETE_FILES, node); + Preconditions.checkArgument( + deleteFiles.isArray(), "Cannot parse delete files from non-array: %s", deleteFiles); + ImmutableList.Builder deleteFilesBuilder = ImmutableList.builder(); + for (JsonNode deleteFileNode : deleteFiles) { + DeleteFile deleteFile = (DeleteFile) ContentFileParser.fromJson(deleteFileNode, specsById); + deleteFilesBuilder.add(deleteFile); + } + + return deleteFilesBuilder.build(); + } + + return Lists.newArrayList(); + } + + public static List parseFileScanTasks( + JsonNode node, + List deleteFiles, + Map specsById, + boolean caseSensitive) { + if (node.has(FILE_SCAN_TASKS)) { + JsonNode scanTasks = JsonUtil.get(FILE_SCAN_TASKS, node); + Preconditions.checkArgument( + scanTasks.isArray(), "Cannot parse file scan tasks from non-array: %s", scanTasks); + List fileScanTaskList = Lists.newArrayList(); + for (JsonNode fileScanTaskNode : scanTasks) { + FileScanTask fileScanTask = + RESTFileScanTaskParser.fromJson( + fileScanTaskNode, deleteFiles, specsById, caseSensitive); + fileScanTaskList.add(fileScanTask); + } + + return fileScanTaskList; + } + + return null; + } + + public static void serializeScanTasks( + List fileScanTasks, + List deleteFiles, + Map specsById, + JsonGenerator gen) + throws IOException { + Map deleteFilePathToIndex = Maps.newHashMap(); + if (deleteFiles != null && !deleteFiles.isEmpty()) { + Preconditions.checkArgument( + specsById != null, "Cannot serialize response without specs by ID defined"); + gen.writeArrayFieldStart(DELETE_FILES); + for (int i = 0; i < deleteFiles.size(); i++) { + DeleteFile deleteFile = deleteFiles.get(i); + deleteFilePathToIndex.put(String.valueOf(deleteFile.path()), i); + ContentFileParser.toJson(deleteFiles.get(i), specsById.get(deleteFile.specId()), gen); + } + + gen.writeEndArray(); + } + + if (fileScanTasks != null) { + gen.writeArrayFieldStart(FILE_SCAN_TASKS); + Set deleteFileReferences = Sets.newHashSet(); + for (FileScanTask fileScanTask : fileScanTasks) { + if (deleteFiles != null) { + for (DeleteFile taskDelete : fileScanTask.deletes()) { + deleteFileReferences.add(deleteFilePathToIndex.get(taskDelete.path().toString())); + } + } + + PartitionSpec spec = specsById.get(fileScanTask.file().specId()); + Preconditions.checkArgument( + spec != null, + "Cannot serialize scan task with unknown spec %s", + fileScanTask.file().specId()); + RESTFileScanTaskParser.toJson(fileScanTask, deleteFileReferences, spec, gen); + } + + gen.writeEndArray(); + } + } +} diff --git a/core/src/main/java/org/apache/iceberg/rest/requests/FetchScanTasksRequest.java b/core/src/main/java/org/apache/iceberg/rest/requests/FetchScanTasksRequest.java new file mode 100644 index 000000000000..2293baac999e --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/rest/requests/FetchScanTasksRequest.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.rest.requests; + +import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.rest.RESTRequest; + +public class FetchScanTasksRequest implements RESTRequest { + + private final String planTask; + + public FetchScanTasksRequest(String planTask) { + this.planTask = planTask; + validate(); + } + + public String planTask() { + return planTask; + } + + @Override + public void validate() { + Preconditions.checkArgument(planTask != null, "Invalid planTask: null"); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this).add("planTask", planTask).toString(); + } +} diff --git a/core/src/main/java/org/apache/iceberg/rest/requests/FetchScanTasksRequestParser.java b/core/src/main/java/org/apache/iceberg/rest/requests/FetchScanTasksRequestParser.java new file mode 100644 index 000000000000..81f559e47b4f --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/rest/requests/FetchScanTasksRequestParser.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.rest.requests; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonNode; +import java.io.IOException; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.util.JsonUtil; + +public class FetchScanTasksRequestParser { + private static final String PLAN_TASK = "plan-task"; + + private FetchScanTasksRequestParser() {} + + public static String toJson(FetchScanTasksRequest request) { + return toJson(request, false); + } + + public static String toJson(FetchScanTasksRequest request, boolean pretty) { + return JsonUtil.generate(gen -> toJson(request, gen), pretty); + } + + public static void toJson(FetchScanTasksRequest request, JsonGenerator gen) throws IOException { + Preconditions.checkArgument(null != request, "Invalid fetchScanTasks request: null"); + gen.writeStartObject(); + gen.writeStringField(PLAN_TASK, request.planTask()); + gen.writeEndObject(); + } + + public static FetchScanTasksRequest fromJson(String json) { + return JsonUtil.parse(json, FetchScanTasksRequestParser::fromJson); + } + + public static FetchScanTasksRequest fromJson(JsonNode json) { + Preconditions.checkArgument(null != json, "Invalid fetchScanTasks request: null"); + + String planTask = JsonUtil.getString(PLAN_TASK, json); + return new FetchScanTasksRequest(planTask); + } +} diff --git a/core/src/main/java/org/apache/iceberg/rest/requests/PlanTableScanRequest.java b/core/src/main/java/org/apache/iceberg/rest/requests/PlanTableScanRequest.java new file mode 100644 index 000000000000..14e14eab4bc7 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/rest/requests/PlanTableScanRequest.java @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.rest.requests; + +import java.util.List; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.rest.RESTRequest; + +public class PlanTableScanRequest implements RESTRequest { + private final Long snapshotId; + private final List select; + private final Expression filter; + private final boolean caseSensitive; + private final boolean useSnapshotSchema; + private final Long startSnapshotId; + private final Long endSnapshotId; + private final List statsFields; + + public Long snapshotId() { + return snapshotId; + } + + public List select() { + return select; + } + + public Expression filter() { + return filter; + } + + public boolean caseSensitive() { + return caseSensitive; + } + + public boolean useSnapshotSchema() { + return useSnapshotSchema; + } + + public Long startSnapshotId() { + return startSnapshotId; + } + + public Long endSnapshotId() { + return endSnapshotId; + } + + public List statsFields() { + return statsFields; + } + + private PlanTableScanRequest( + Long snapshotId, + List select, + Expression filter, + boolean caseSensitive, + boolean useSnapshotSchema, + Long startSnapshotId, + Long endSnapshotId, + List statsFields) { + this.snapshotId = snapshotId; + this.select = select; + this.filter = filter; + this.caseSensitive = caseSensitive; + this.useSnapshotSchema = useSnapshotSchema; + this.startSnapshotId = startSnapshotId; + this.endSnapshotId = endSnapshotId; + this.statsFields = statsFields; + validate(); + } + + @Override + public void validate() { + Preconditions.checkArgument( + snapshotId != null ^ (startSnapshotId != null && endSnapshotId != null), + "Either snapshotId must be provided or both startSnapshotId and endSnapshotId must be provided"); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("snapshotId", snapshotId) + .add("select", select) + .add("filter", filter) + .add("caseSensitive", caseSensitive) + .add("useSnapshotSchema", useSnapshotSchema) + .add("startSnapshotId", startSnapshotId) + .add("endSnapshotId", endSnapshotId) + .add("statsFields", statsFields) + .toString(); + } + + public static class Builder { + private Long snapshotId; + private List select; + private Expression filter; + private boolean caseSensitive = true; + private boolean useSnapshotSchema = false; + private Long startSnapshotId; + private Long endSnapshotId; + private List statsFields; + + public Builder() {} + + public Builder withSnapshotId(Long withSnapshotId) { + this.snapshotId = withSnapshotId; + return this; + } + + public Builder withSelect(List projection) { + this.select = projection; + return this; + } + + public Builder withFilter(Expression expression) { + this.filter = expression; + return this; + } + + public Builder withCaseSensitive(boolean value) { + this.caseSensitive = value; + return this; + } + + public Builder withUseSnapshotSchema(boolean snapshotSchema) { + this.useSnapshotSchema = snapshotSchema; + return this; + } + + public Builder withStartSnapshotId(Long startingSnapshotId) { + this.startSnapshotId = startingSnapshotId; + return this; + } + + public Builder withEndSnapshotId(Long endingSnapshotId) { + this.endSnapshotId = endingSnapshotId; + return this; + } + + public Builder withStatsFields(List fields) { + this.statsFields = fields; + return this; + } + + public PlanTableScanRequest build() { + return new PlanTableScanRequest( + snapshotId, + select, + filter, + caseSensitive, + useSnapshotSchema, + startSnapshotId, + endSnapshotId, + statsFields); + } + } +} diff --git a/core/src/main/java/org/apache/iceberg/rest/requests/PlanTableScanRequestParser.java b/core/src/main/java/org/apache/iceberg/rest/requests/PlanTableScanRequestParser.java new file mode 100644 index 000000000000..9b2eb9adb4e1 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/rest/requests/PlanTableScanRequestParser.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.rest.requests; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonNode; +import java.io.IOException; +import java.util.List; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.expressions.ExpressionParser; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.util.JsonUtil; + +public class PlanTableScanRequestParser { + private static final String SNAPSHOT_ID = "snapshot-id"; + private static final String SELECT = "select"; + private static final String FILTER = "filter"; + private static final String CASE_SENSITIVE = "case-sensitive"; + private static final String USE_SNAPSHOT_SCHEMA = "use-snapshot-schema"; + private static final String START_SNAPSHOT_ID = "start-snapshot-id"; + private static final String END_SNAPSHOT_ID = "end-snapshot-id"; + private static final String STATS_FIELDS = "stats-fields"; + + private PlanTableScanRequestParser() {} + + public static String toJson(PlanTableScanRequest request) { + return toJson(request, false); + } + + public static String toJson(PlanTableScanRequest request, boolean pretty) { + return JsonUtil.generate(gen -> toJson(request, gen), pretty); + } + + @SuppressWarnings("checkstyle:CyclomaticComplexity") + public static void toJson(PlanTableScanRequest request, JsonGenerator gen) throws IOException { + Preconditions.checkArgument(null != request, "Invalid planTableScanRequest: null"); + + if (request.snapshotId() != null + || request.startSnapshotId() != null + || request.endSnapshotId() != null) { + Preconditions.checkArgument( + request.snapshotId() != null + ^ (request.startSnapshotId() != null && request.endSnapshotId() != null), + "Either snapshotId must be provided or both startSnapshotId and endSnapshotId must be provided"); + } + + gen.writeStartObject(); + if (request.snapshotId() != null) { + gen.writeNumberField(SNAPSHOT_ID, request.snapshotId()); + } + + if (request.startSnapshotId() != null) { + gen.writeNumberField(START_SNAPSHOT_ID, request.startSnapshotId()); + } + + if (request.endSnapshotId() != null) { + gen.writeNumberField(END_SNAPSHOT_ID, request.endSnapshotId()); + } + + if (request.select() != null && !request.select().isEmpty()) { + JsonUtil.writeStringArray(SELECT, request.select(), gen); + } + + if (request.filter() != null) { + gen.writeStringField(FILTER, ExpressionParser.toJson(request.filter())); + } + + gen.writeBooleanField(CASE_SENSITIVE, request.caseSensitive()); + gen.writeBooleanField(USE_SNAPSHOT_SCHEMA, request.useSnapshotSchema()); + + if (request.statsFields() != null && !request.statsFields().isEmpty()) { + JsonUtil.writeStringArray(STATS_FIELDS, request.statsFields(), gen); + } + + gen.writeEndObject(); + } + + public static PlanTableScanRequest fromJson(String json) { + return JsonUtil.parse(json, PlanTableScanRequestParser::fromJson); + } + + public static PlanTableScanRequest fromJson(JsonNode json) { + Preconditions.checkArgument(null != json, "Invalid planTableScanRequest: null"); + + Long snapshotId = JsonUtil.getLongOrNull(SNAPSHOT_ID, json); + Long startSnapshotId = JsonUtil.getLongOrNull(START_SNAPSHOT_ID, json); + Long endSnapshotId = JsonUtil.getLongOrNull(END_SNAPSHOT_ID, json); + + List select = JsonUtil.getStringListOrNull(SELECT, json); + + Expression filter = null; + if (json.has(FILTER)) { + filter = ExpressionParser.fromJson(json.get(FILTER).textValue()); + } + + boolean caseSensitive = true; + if (json.has(CASE_SENSITIVE)) { + caseSensitive = JsonUtil.getBool(CASE_SENSITIVE, json); + } + + boolean useSnapshotSchema = false; + if (json.has(USE_SNAPSHOT_SCHEMA)) { + useSnapshotSchema = JsonUtil.getBool(USE_SNAPSHOT_SCHEMA, json); + } + + List statsFields = JsonUtil.getStringListOrNull(STATS_FIELDS, json); + + return new PlanTableScanRequest.Builder() + .withSnapshotId(snapshotId) + .withSelect(select) + .withFilter(filter) + .withCaseSensitive(caseSensitive) + .withUseSnapshotSchema(useSnapshotSchema) + .withStartSnapshotId(startSnapshotId) + .withEndSnapshotId(endSnapshotId) + .withStatsFields(statsFields) + .build(); + } +} diff --git a/core/src/main/java/org/apache/iceberg/rest/responses/BaseScanTaskResponse.java b/core/src/main/java/org/apache/iceberg/rest/responses/BaseScanTaskResponse.java new file mode 100644 index 000000000000..b7649618e809 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/rest/responses/BaseScanTaskResponse.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.rest.responses; + +import java.util.List; +import java.util.Map; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.rest.RESTResponse; + +public abstract class BaseScanTaskResponse implements RESTResponse { + + private final List planTasks; + private final List fileScanTasks; + private final List deleteFiles; + private final Map specsById; + + protected BaseScanTaskResponse( + List planTasks, + List fileScanTasks, + List deleteFiles, + Map specsById) { + this.planTasks = planTasks; + this.fileScanTasks = fileScanTasks; + this.deleteFiles = deleteFiles; + this.specsById = specsById; + } + + public List planTasks() { + return planTasks; + } + + public List fileScanTasks() { + return fileScanTasks; + } + + public List deleteFiles() { + return deleteFiles; + } + + public Map specsById() { + return specsById; + } + + public abstract static class Builder, R extends BaseScanTaskResponse> { + private List planTasks; + private List fileScanTasks; + private List deleteFiles; + private Map specsById; + + protected Builder() {} + + @SuppressWarnings("unchecked") + public B self() { + return (B) this; + } + + public B withPlanTasks(List tasks) { + this.planTasks = tasks; + return self(); + } + + public B withFileScanTasks(List tasks) { + this.fileScanTasks = tasks; + return self(); + } + + public B withDeleteFiles(List deleteFilesList) { + this.deleteFiles = deleteFilesList; + return self(); + } + + public B withSpecsById(Map specs) { + this.specsById = specs; + return self(); + } + + public List planTasks() { + return planTasks; + } + + public List fileScanTasks() { + return fileScanTasks; + } + + public List deleteFiles() { + return deleteFiles; + } + + public Map specsById() { + return specsById; + } + + public abstract R build(); + } +} diff --git a/core/src/main/java/org/apache/iceberg/rest/responses/FetchPlanningResultResponse.java b/core/src/main/java/org/apache/iceberg/rest/responses/FetchPlanningResultResponse.java new file mode 100644 index 000000000000..05d64a235891 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/rest/responses/FetchPlanningResultResponse.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.rest.responses; + +import java.util.List; +import java.util.Map; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.rest.PlanStatus; + +public class FetchPlanningResultResponse extends BaseScanTaskResponse { + private final PlanStatus planStatus; + + private FetchPlanningResultResponse( + PlanStatus planStatus, + List planTasks, + List fileScanTasks, + List deleteFiles, + Map specsById) { + super(planTasks, fileScanTasks, deleteFiles, specsById); + this.planStatus = planStatus; + validate(); + } + + public PlanStatus planStatus() { + return planStatus; + } + + public static Builder builder() { + return new Builder(); + } + + @Override + public void validate() { + Preconditions.checkArgument(planStatus() != null, "Invalid status: null"); + Preconditions.checkArgument( + planStatus() == PlanStatus.COMPLETED || (planTasks() == null && fileScanTasks() == null), + "Invalid response: tasks can only be returned in a 'completed' status"); + if (fileScanTasks() == null || fileScanTasks().isEmpty()) { + Preconditions.checkArgument( + (deleteFiles() == null || deleteFiles().isEmpty()), + "Invalid response: deleteFiles should only be returned with fileScanTasks that reference them"); + } + } + + public static class Builder + extends BaseScanTaskResponse.Builder { + private Builder() {} + + private PlanStatus planStatus; + + public Builder withPlanStatus(PlanStatus status) { + this.planStatus = status; + return this; + } + + @Override + public FetchPlanningResultResponse build() { + return new FetchPlanningResultResponse( + planStatus, planTasks(), fileScanTasks(), deleteFiles(), specsById()); + } + } +} diff --git a/core/src/main/java/org/apache/iceberg/rest/responses/FetchPlanningResultResponseParser.java b/core/src/main/java/org/apache/iceberg/rest/responses/FetchPlanningResultResponseParser.java new file mode 100644 index 000000000000..5dcfe2d59a2f --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/rest/responses/FetchPlanningResultResponseParser.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.rest.responses; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonNode; +import java.io.IOException; +import java.util.List; +import java.util.Map; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.rest.PlanStatus; +import org.apache.iceberg.rest.TableScanResponseParser; +import org.apache.iceberg.util.JsonUtil; + +public class FetchPlanningResultResponseParser { + private static final String PLAN_STATUS = "plan-status"; + private static final String PLAN_TASKS = "plan-tasks"; + + private FetchPlanningResultResponseParser() {} + + public static String toJson(FetchPlanningResultResponse response) { + return toJson(response, false); + } + + public static String toJson(FetchPlanningResultResponse response, boolean pretty) { + return JsonUtil.generate(gen -> toJson(response, gen), pretty); + } + + public static void toJson(FetchPlanningResultResponse response, JsonGenerator gen) + throws IOException { + Preconditions.checkArgument(null != response, "Invalid fetchPlanningResult response: null"); + Preconditions.checkArgument( + response.specsById() != null + || (response.fileScanTasks() == null || response.fileScanTasks().isEmpty()), + "Cannot serialize fileScanTasks in fetchingPlanningResultResponse without specsById"); + gen.writeStartObject(); + gen.writeStringField(PLAN_STATUS, response.planStatus().status()); + if (response.planTasks() != null) { + JsonUtil.writeStringArray(PLAN_TASKS, response.planTasks(), gen); + } + + TableScanResponseParser.serializeScanTasks( + response.fileScanTasks(), response.deleteFiles(), response.specsById(), gen); + gen.writeEndObject(); + } + + @VisibleForTesting + static FetchPlanningResultResponse fromJson( + String json, Map specsById, boolean caseSensitive) { + Preconditions.checkArgument( + json != null, "Invalid fetchPlanningResult response: null or empty"); + return JsonUtil.parse( + json, + node -> { + return fromJson(node, specsById, caseSensitive); + }); + } + + public static FetchPlanningResultResponse fromJson( + JsonNode json, Map specsById, boolean caseSensitive) { + Preconditions.checkArgument( + json != null && !json.isEmpty(), "Invalid fetchPlanningResult response: null or empty"); + + PlanStatus planStatus = PlanStatus.fromName(JsonUtil.getString(PLAN_STATUS, json)); + List planTasks = JsonUtil.getStringListOrNull(PLAN_TASKS, json); + List deleteFiles = TableScanResponseParser.parseDeleteFiles(json, specsById); + List fileScanTasks = + TableScanResponseParser.parseFileScanTasks(json, deleteFiles, specsById, caseSensitive); + return FetchPlanningResultResponse.builder() + .withPlanStatus(planStatus) + .withPlanTasks(planTasks) + .withFileScanTasks(fileScanTasks) + .withDeleteFiles(deleteFiles) + .build(); + } +} diff --git a/core/src/main/java/org/apache/iceberg/rest/responses/FetchScanTasksResponse.java b/core/src/main/java/org/apache/iceberg/rest/responses/FetchScanTasksResponse.java new file mode 100644 index 000000000000..6dcd85e6d307 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/rest/responses/FetchScanTasksResponse.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.rest.responses; + +import java.util.List; +import java.util.Map; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; + +public class FetchScanTasksResponse extends BaseScanTaskResponse { + + private FetchScanTasksResponse( + List planTasks, + List fileScanTasks, + List deleteFiles, + Map specsById) { + super(planTasks, fileScanTasks, deleteFiles, specsById); + validate(); + } + + @Override + public void validate() { + if (fileScanTasks() == null || fileScanTasks().isEmpty()) { + Preconditions.checkArgument( + (deleteFiles() == null || deleteFiles().isEmpty()), + "Invalid response: deleteFiles should only be returned with fileScanTasks that reference them"); + } + + Preconditions.checkArgument( + planTasks() != null || fileScanTasks() != null, + "Invalid response: planTasks and fileScanTask cannot both be null"); + } + + public static Builder builder() { + return new Builder(); + } + + public static class Builder + extends BaseScanTaskResponse.Builder { + private Builder() {} + + @Override + public FetchScanTasksResponse build() { + return new FetchScanTasksResponse(planTasks(), fileScanTasks(), deleteFiles(), specsById()); + } + } +} diff --git a/core/src/main/java/org/apache/iceberg/rest/responses/FetchScanTasksResponseParser.java b/core/src/main/java/org/apache/iceberg/rest/responses/FetchScanTasksResponseParser.java new file mode 100644 index 000000000000..f54243b8772e --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/rest/responses/FetchScanTasksResponseParser.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.rest.responses; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonNode; +import java.io.IOException; +import java.util.List; +import java.util.Map; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.rest.TableScanResponseParser; +import org.apache.iceberg.util.JsonUtil; + +public class FetchScanTasksResponseParser { + private static final String PLAN_TASKS = "plan-tasks"; + + private FetchScanTasksResponseParser() {} + + public static String toJson(FetchScanTasksResponse response) { + return toJson(response, false); + } + + public static String toJson(FetchScanTasksResponse response, boolean pretty) { + return JsonUtil.generate(gen -> toJson(response, gen), pretty); + } + + public static void toJson(FetchScanTasksResponse response, JsonGenerator gen) throws IOException { + Preconditions.checkArgument(response != null, "Invalid response: fetchScanTasksResponse null"); + Preconditions.checkArgument( + response.specsById() != null + || (response.fileScanTasks() == null || response.fileScanTasks().isEmpty()), + "Cannot serialize fileScanTasks in fetchScanTasksResponse without specsById"); + gen.writeStartObject(); + if (response.planTasks() != null) { + JsonUtil.writeStringArray(PLAN_TASKS, response.planTasks(), gen); + } + + TableScanResponseParser.serializeScanTasks( + response.fileScanTasks(), response.deleteFiles(), response.specsById(), gen); + gen.writeEndObject(); + } + + @VisibleForTesting + static FetchScanTasksResponse fromJson( + String json, Map specsById, boolean caseSensitive) { + Preconditions.checkArgument(json != null, "Cannot parse fetchScanTasks response from null"); + return JsonUtil.parse( + json, + node -> { + return fromJson(node, specsById, caseSensitive); + }); + } + + public static FetchScanTasksResponse fromJson( + JsonNode json, Map specsById, boolean caseSensitive) { + Preconditions.checkArgument( + json != null && !json.isEmpty(), "Invalid response: fetchScanTasksResponse null"); + List planTasks = JsonUtil.getStringListOrNull(PLAN_TASKS, json); + List deleteFiles = TableScanResponseParser.parseDeleteFiles(json, specsById); + List fileScanTasks = + TableScanResponseParser.parseFileScanTasks(json, deleteFiles, specsById, caseSensitive); + return FetchScanTasksResponse.builder() + .withPlanTasks(planTasks) + .withFileScanTasks(fileScanTasks) + .withDeleteFiles(deleteFiles) + .build(); + } +} diff --git a/core/src/main/java/org/apache/iceberg/rest/responses/PlanTableScanResponse.java b/core/src/main/java/org/apache/iceberg/rest/responses/PlanTableScanResponse.java new file mode 100644 index 000000000000..4596f8d5cda2 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/rest/responses/PlanTableScanResponse.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.rest.responses; + +import java.util.List; +import java.util.Map; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.rest.PlanStatus; + +public class PlanTableScanResponse extends BaseScanTaskResponse { + private final PlanStatus planStatus; + private final String planId; + + private PlanTableScanResponse( + PlanStatus planStatus, + String planId, + List planTasks, + List fileScanTasks, + List deleteFiles, + Map specsById) { + super(planTasks, fileScanTasks, deleteFiles, specsById); + this.planStatus = planStatus; + this.planId = planId; + validate(); + } + + public PlanStatus planStatus() { + return planStatus; + } + + public String planId() { + return planId; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("planStatus", planStatus()) + .add("planId", planId()) + .toString(); + } + + @Override + public void validate() { + Preconditions.checkArgument( + planStatus() != null, "Invalid response: plan status must be defined"); + Preconditions.checkArgument( + planStatus() != PlanStatus.SUBMITTED || planId() != null, + "Invalid response: plan id should be defined when status is 'submitted'"); + Preconditions.checkArgument( + planStatus() != PlanStatus.CANCELLED, + "Invalid response: 'cancelled' is not a valid status for planTableScan"); + Preconditions.checkArgument( + planStatus() == PlanStatus.COMPLETED || (planTasks() == null && fileScanTasks() == null), + "Invalid response: tasks can only be defined when status is 'completed'"); + Preconditions.checkArgument( + planStatus() == PlanStatus.SUBMITTED || planId() == null, + "Invalid response: plan id can only be defined when status is 'submitted'"); + if (fileScanTasks() == null || fileScanTasks().isEmpty()) { + Preconditions.checkArgument( + (deleteFiles() == null || deleteFiles().isEmpty()), + "Invalid response: deleteFiles should only be returned with fileScanTasks that reference them"); + } + } + + public static Builder builder() { + return new Builder(); + } + + public static class Builder extends BaseScanTaskResponse.Builder { + private PlanStatus planStatus; + private String planId; + + public Builder withPlanStatus(PlanStatus status) { + this.planStatus = status; + return this; + } + + public Builder withPlanId(String id) { + this.planId = id; + return this; + } + + @Override + public PlanTableScanResponse build() { + return new PlanTableScanResponse( + planStatus, planId, planTasks(), fileScanTasks(), deleteFiles(), specsById()); + } + } +} diff --git a/core/src/main/java/org/apache/iceberg/rest/responses/PlanTableScanResponseParser.java b/core/src/main/java/org/apache/iceberg/rest/responses/PlanTableScanResponseParser.java new file mode 100644 index 000000000000..523770e6cc36 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/rest/responses/PlanTableScanResponseParser.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.rest.responses; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonNode; +import java.io.IOException; +import java.util.List; +import java.util.Map; +import org.apache.hadoop.classification.VisibleForTesting; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.rest.PlanStatus; +import org.apache.iceberg.rest.TableScanResponseParser; +import org.apache.iceberg.util.JsonUtil; + +public class PlanTableScanResponseParser { + private static final String PLAN_STATUS = "plan-status"; + private static final String PLAN_ID = "plan-id"; + private static final String PLAN_TASKS = "plan-tasks"; + + private PlanTableScanResponseParser() {} + + public static String toJson(PlanTableScanResponse response) { + return toJson(response, false); + } + + public static String toJson(PlanTableScanResponse response, boolean pretty) { + return JsonUtil.generate(gen -> toJson(response, gen), pretty); + } + + public static void toJson(PlanTableScanResponse response, JsonGenerator gen) throws IOException { + Preconditions.checkArgument(null != response, "Invalid response: planTableScanResponse null"); + Preconditions.checkArgument( + response.planStatus() != null, "Invalid response: status can not be null"); + Preconditions.checkArgument( + response.specsById() != null, "Cannot serialize planTableScanResponse without specsById"); + + gen.writeStartObject(); + gen.writeStringField(PLAN_STATUS, response.planStatus().status()); + + if (response.planId() != null) { + gen.writeStringField(PLAN_ID, response.planId()); + } + if (response.planTasks() != null) { + JsonUtil.writeStringArray(PLAN_TASKS, response.planTasks(), gen); + } + + TableScanResponseParser.serializeScanTasks( + response.fileScanTasks(), response.deleteFiles(), response.specsById(), gen); + + gen.writeEndObject(); + } + + @VisibleForTesting + static PlanTableScanResponse fromJson( + String json, Map specsById, boolean caseSensitive) { + Preconditions.checkArgument( + json != null, "Cannot parse planTableScan response from empty or null object"); + return JsonUtil.parse( + json, + node -> { + return PlanTableScanResponseParser.fromJson(node, specsById, caseSensitive); + }); + } + + public static PlanTableScanResponse fromJson( + JsonNode json, Map specsById, boolean caseSensitive) { + Preconditions.checkArgument( + json != null && !json.isEmpty(), + "Cannot parse planTableScan response from empty or null object"); + + PlanStatus planStatus = PlanStatus.fromName(JsonUtil.getString(PLAN_STATUS, json)); + String planId = JsonUtil.getStringOrNull(PLAN_ID, json); + List planTasks = JsonUtil.getStringListOrNull(PLAN_TASKS, json); + List deleteFiles = TableScanResponseParser.parseDeleteFiles(json, specsById); + List fileScanTasks = + TableScanResponseParser.parseFileScanTasks(json, deleteFiles, specsById, caseSensitive); + + return PlanTableScanResponse.builder() + .withPlanId(planId) + .withPlanStatus(planStatus) + .withPlanTasks(planTasks) + .withFileScanTasks(fileScanTasks) + .withDeleteFiles(deleteFiles) + .build(); + } +} diff --git a/core/src/test/java/org/apache/iceberg/TestBase.java b/core/src/test/java/org/apache/iceberg/TestBase.java index 8930be7a36ba..30c1fb7191fd 100644 --- a/core/src/test/java/org/apache/iceberg/TestBase.java +++ b/core/src/test/java/org/apache/iceberg/TestBase.java @@ -31,6 +31,7 @@ import java.util.Collection; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.UUID; import org.apache.iceberg.avro.AvroSchemaUtil; @@ -65,7 +66,9 @@ public class TestBase { public static final PartitionSpec SPEC = PartitionSpec.builderFor(SCHEMA).bucket("data", BUCKETS_NUMBER).build(); - static final DataFile FILE_A = + public static final Map PARTITION_SPECS_BY_ID = Map.of(0, SPEC); + + public static final DataFile FILE_A = DataFiles.builder(SPEC) .withPath("/path/to/data-a.parquet") .withFileSizeInBytes(10) @@ -79,7 +82,7 @@ public class TestBase { .withPartitionPath("data_bucket=0") // easy way to set partition data for now .withRecordCount(1) .build(); - static final DeleteFile FILE_A_DELETES = + public static final DeleteFile FILE_A_DELETES = FileMetadata.deleteFileBuilder(SPEC) .ofPositionDeletes() .withPath("/path/to/data-a-deletes.parquet") diff --git a/core/src/test/java/org/apache/iceberg/TestContentFileParser.java b/core/src/test/java/org/apache/iceberg/TestContentFileParser.java index 759f2f8ecd0b..3f463f722e9e 100644 --- a/core/src/test/java/org/apache/iceberg/TestContentFileParser.java +++ b/core/src/test/java/org/apache/iceberg/TestContentFileParser.java @@ -25,6 +25,7 @@ import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Collections; +import java.util.Map; import java.util.stream.Stream; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.types.Comparators; @@ -51,13 +52,14 @@ public void testNullArguments() throws Exception { .isInstanceOf(IllegalArgumentException.class) .hasMessage("Invalid JSON generator: null"); - assertThatThrownBy(() -> ContentFileParser.fromJson(null, TestBase.SPEC)) + assertThatThrownBy( + () -> ContentFileParser.fromJson(null, Map.of(TestBase.SPEC.specId(), TestBase.SPEC))) .isInstanceOf(IllegalArgumentException.class) .hasMessage("Invalid JSON node for content file: null"); String jsonStr = ContentFileParser.toJson(TestBase.FILE_A, TestBase.SPEC); JsonNode jsonNode = JsonUtil.mapper().readTree(jsonStr); - assertThatThrownBy(() -> ContentFileParser.fromJson(jsonNode, null)) + assertThatThrownBy(() -> ContentFileParser.fromJson(jsonNode, (PartitionSpec) null)) .isInstanceOf(IllegalArgumentException.class) .hasMessage("Invalid partition spec: null"); } @@ -69,7 +71,8 @@ public void testDataFile(PartitionSpec spec, DataFile dataFile, String expectedJ String jsonStr = ContentFileParser.toJson(dataFile, spec); assertThat(jsonStr).isEqualTo(expectedJson); JsonNode jsonNode = JsonUtil.mapper().readTree(jsonStr); - ContentFile deserializedContentFile = ContentFileParser.fromJson(jsonNode, spec); + ContentFile deserializedContentFile = + ContentFileParser.fromJson(jsonNode, Map.of(TestBase.SPEC.specId(), spec)); assertThat(deserializedContentFile).isInstanceOf(DataFile.class); assertContentFileEquals(dataFile, deserializedContentFile, spec); } @@ -81,7 +84,8 @@ public void testDeleteFile(PartitionSpec spec, DeleteFile deleteFile, String exp String jsonStr = ContentFileParser.toJson(deleteFile, spec); assertThat(jsonStr).isEqualTo(expectedJson); JsonNode jsonNode = JsonUtil.mapper().readTree(jsonStr); - ContentFile deserializedContentFile = ContentFileParser.fromJson(jsonNode, spec); + ContentFile deserializedContentFile = + ContentFileParser.fromJson(jsonNode, Map.of(spec.specId(), TestBase.SPEC)); assertThat(deserializedContentFile).isInstanceOf(DeleteFile.class); assertContentFileEquals(deleteFile, deserializedContentFile, spec); } diff --git a/core/src/test/java/org/apache/iceberg/rest/requests/TestFetchScanTasksRequest.java b/core/src/test/java/org/apache/iceberg/rest/requests/TestFetchScanTasksRequest.java new file mode 100644 index 000000000000..a911bb5ae403 --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/rest/requests/TestFetchScanTasksRequest.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.rest.requests; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import com.fasterxml.jackson.databind.JsonNode; +import org.junit.jupiter.api.Test; + +public class TestFetchScanTasksRequest { + + @Test + public void nullAndEmptyCheck() { + assertThatThrownBy(() -> FetchScanTasksRequestParser.toJson(null)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid fetchScanTasks request: null"); + + assertThatThrownBy(() -> FetchScanTasksRequestParser.fromJson((JsonNode) null)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid fetchScanTasks request: null"); + } + + @Test + public void roundTripSerdeWithPlanTask() { + FetchScanTasksRequest request = new FetchScanTasksRequest("somePlanTask"); + String expectedJson = "{\"plan-task\":\"somePlanTask\"}"; + String json = FetchScanTasksRequestParser.toJson(request, false); + assertThat(json).isEqualTo(expectedJson); + assertThat( + FetchScanTasksRequestParser.toJson(FetchScanTasksRequestParser.fromJson(json), false)) + .isEqualTo(expectedJson); + } +} diff --git a/core/src/test/java/org/apache/iceberg/rest/requests/TestPlanTableScanRequest.java b/core/src/test/java/org/apache/iceberg/rest/requests/TestPlanTableScanRequest.java new file mode 100644 index 000000000000..f18928a1a349 --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/rest/requests/TestPlanTableScanRequest.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.rest.requests; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import com.fasterxml.jackson.databind.JsonNode; +import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.junit.jupiter.api.Test; + +public class TestPlanTableScanRequest { + + @Test + public void nullAndEmptyCheck() { + assertThatThrownBy(() -> PlanTableScanRequestParser.toJson(null)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid planTableScanRequest: null"); + + assertThatThrownBy(() -> PlanTableScanRequestParser.fromJson((JsonNode) null)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid planTableScanRequest: null"); + } + + @Test + public void roundTripSerdeWithSelectField() { + PlanTableScanRequest request = + new PlanTableScanRequest.Builder() + .withSnapshotId(1L) + .withSelect(Lists.newArrayList("col1", "col2")) + .build(); + + String expectedJson = + "{\"snapshot-id\":1," + + "\"select\":[\"col1\",\"col2\"]," + + "\"case-sensitive\":true," + + "\"use-snapshot-schema\":false}"; + + String json = PlanTableScanRequestParser.toJson(request, false); + assertThat(json).isEqualTo(expectedJson); + assertThat(PlanTableScanRequestParser.toJson(PlanTableScanRequestParser.fromJson(json), false)) + .isEqualTo(expectedJson); + } + + @Test + public void roundTripSerdeWithFilterField() { + PlanTableScanRequest request = + new PlanTableScanRequest.Builder() + .withSnapshotId(1L) + .withFilter(Expressions.alwaysFalse()) + .build(); + + String expectedJson = + "{\"snapshot-id\":1," + + "\"filter\":\"false\"," + + "\"case-sensitive\":true," + + "\"use-snapshot-schema\":false}"; + + String json = PlanTableScanRequestParser.toJson(request, false); + assertThat(json).isEqualTo(expectedJson); + assertThat(PlanTableScanRequestParser.toJson(PlanTableScanRequestParser.fromJson(json), false)) + .isEqualTo(expectedJson); + } + + @Test + public void planTableScanRequestWithAllFieldsInvalidRequest() { + assertThatThrownBy( + () -> + new PlanTableScanRequest.Builder() + .withSnapshotId(1L) + .withSelect(Lists.newArrayList("col1", "col2")) + .withFilter(Expressions.alwaysTrue()) + .withStartSnapshotId(1L) + .withEndSnapshotId(2L) + .withCaseSensitive(false) + .withUseSnapshotSchema(true) + .withStatsFields(Lists.newArrayList("col1", "col2")) + .build()) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage( + "Either snapshotId must be provided or both startSnapshotId and endSnapshotId must be provided"); + } + + @Test + public void roundTripSerdeWithAllFieldsExceptSnapShotId() { + PlanTableScanRequest request = + new PlanTableScanRequest.Builder() + .withSelect(Lists.newArrayList("col1", "col2")) + .withFilter(Expressions.alwaysTrue()) + .withStartSnapshotId(1L) + .withEndSnapshotId(2L) + .withCaseSensitive(false) + .withUseSnapshotSchema(true) + .withStatsFields(Lists.newArrayList("col1", "col2")) + .build(); + + String expectedJson = + "{\"start-snapshot-id\":1," + + "\"end-snapshot-id\":2," + + "\"select\":[\"col1\",\"col2\"]," + + "\"filter\":\"true\"," + + "\"case-sensitive\":false," + + "\"use-snapshot-schema\":true," + + "\"stats-fields\":[\"col1\",\"col2\"]}"; + + String json = PlanTableScanRequestParser.toJson(request, false); + assertThat(json).isEqualTo(expectedJson); + assertThat(PlanTableScanRequestParser.toJson(PlanTableScanRequestParser.fromJson(json), false)) + .isEqualTo(expectedJson); + } + + @Test + public void testToStringContainsAllFields() { + PlanTableScanRequest request = + new PlanTableScanRequest.Builder() + .withSnapshotId(123L) + .withSelect(Lists.newArrayList("colA", "colB")) + .withFilter(Expressions.alwaysTrue()) + .withCaseSensitive(false) + .withUseSnapshotSchema(true) + .withStatsFields(Lists.newArrayList("stat1")) + .build(); + + String str = request.toString(); + assertThat(str).contains("snapshotId=123"); + assertThat(str).contains("select=[colA, colB]"); + assertThat(str).contains("filter=true"); + assertThat(str).contains("caseSensitive=false"); + assertThat(str).contains("useSnapshotSchema=true"); + assertThat(str).contains("statsFields=[stat1]"); + } +} diff --git a/core/src/test/java/org/apache/iceberg/rest/responses/TestFetchPlanningResultResponseParser.java b/core/src/test/java/org/apache/iceberg/rest/responses/TestFetchPlanningResultResponseParser.java new file mode 100644 index 000000000000..bac182b77ea2 --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/rest/responses/TestFetchPlanningResultResponseParser.java @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.rest.responses; + +import static org.apache.iceberg.TestBase.FILE_A; +import static org.apache.iceberg.TestBase.FILE_A_DELETES; +import static org.apache.iceberg.TestBase.PARTITION_SPECS_BY_ID; +import static org.apache.iceberg.TestBase.SCHEMA; +import static org.apache.iceberg.TestBase.SPEC; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonFactoryBuilder; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.InjectableValues; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.ObjectReader; +import java.util.List; +import org.apache.iceberg.BaseFileScanTask; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.PartitionSpecParser; +import org.apache.iceberg.SchemaParser; +import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.expressions.ResidualEvaluator; +import org.apache.iceberg.rest.PlanStatus; +import org.apache.iceberg.rest.RESTSerializers; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +public class TestFetchPlanningResultResponseParser { + + private static final JsonFactory FACTORY = + new JsonFactoryBuilder() + .configure(JsonFactory.Feature.INTERN_FIELD_NAMES, false) + .configure(JsonFactory.Feature.FAIL_ON_SYMBOL_HASH_OVERFLOW, false) + .build(); + private static final ObjectMapper MAPPER = new ObjectMapper(FACTORY); + + @BeforeEach + public void before() { + RESTSerializers.registerAll(MAPPER); + } + + @Test + public void nullAndEmptyCheck() { + assertThatThrownBy(() -> FetchPlanningResultResponseParser.toJson(null)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid fetchPlanningResult response: null"); + + assertThatThrownBy( + () -> + FetchPlanningResultResponseParser.fromJson( + (String) null, PARTITION_SPECS_BY_ID, false)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid fetchPlanningResult response: null or empty"); + } + + @Test + public void roundTripSerdeWithEmptyObject() { + assertThatThrownBy( + () -> + FetchPlanningResultResponseParser.toJson( + FetchPlanningResultResponse.builder().build())) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid status: null"); + + String emptyJson = "{ }"; + assertThatThrownBy( + () -> + FetchPlanningResultResponseParser.fromJson(emptyJson, PARTITION_SPECS_BY_ID, false)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid fetchPlanningResult response: null or empty"); + } + + @Test + public void roundTripSerdeWithInvalidPlanStatus() { + String invalidStatusJson = "{\"plan-status\": \"someStatus\"}"; + assertThatThrownBy( + () -> + FetchPlanningResultResponseParser.fromJson( + invalidStatusJson, PARTITION_SPECS_BY_ID, false)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid status name: someStatus"); + } + + @Test + public void roundTripSerdeWithValidSubmittedStatus() { + PlanStatus planStatus = PlanStatus.fromName("submitted"); + FetchPlanningResultResponse response = + FetchPlanningResultResponse.builder().withPlanStatus(planStatus).build(); + + String expectedJson = "{\"plan-status\":\"submitted\"}"; + String json = FetchPlanningResultResponseParser.toJson(response); + assertThat(json).isEqualTo(expectedJson); + + FetchPlanningResultResponse fromResponse = + FetchPlanningResultResponseParser.fromJson(json, PARTITION_SPECS_BY_ID, false); + assertThat(FetchPlanningResultResponseParser.toJson(fromResponse)).isEqualTo(expectedJson); + } + + @Test + public void roundTripSerdeWithInvalidPlanStatusSubmittedWithTasksPresent() { + PlanStatus planStatus = PlanStatus.fromName("submitted"); + assertThatThrownBy( + () -> + FetchPlanningResultResponse.builder() + .withPlanStatus(planStatus) + .withPlanTasks(List.of("task1", "task2")) + .build()) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid response: tasks can only be returned in a 'completed' status"); + + String invalidJson = + "{\"plan-status\":\"submitted\"," + "\"plan-tasks\":[\"task1\",\"task2\"]}"; + + assertThatThrownBy( + () -> + FetchPlanningResultResponseParser.fromJson( + invalidJson, PARTITION_SPECS_BY_ID, false)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid response: tasks can only be returned in a 'completed' status"); + } + + @Test + public void roundTripSerdeWithInvalidPlanStatusSubmittedWithDeleteFilesNoFileScanTasksPresent() { + + PlanStatus planStatus = PlanStatus.fromName("submitted"); + assertThatThrownBy( + () -> { + FetchPlanningResultResponse.builder() + .withPlanStatus(planStatus) + .withDeleteFiles(List.of(FILE_A_DELETES)) + .build(); + }) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage( + "Invalid response: deleteFiles should only be returned with fileScanTasks that reference them"); + + String invalidJson = + "{\"plan-status\":\"submitted\"," + + "\"delete-files\":[{\"spec-id\":0,\"content\":\"POSITION_DELETES\"," + + "\"file-path\":\"/path/to/data-a-deletes.parquet\",\"file-format\":\"PARQUET\"," + + "\"partition\":{\"1000\":0},\"file-size-in-bytes\":10,\"record-count\":1}]" + + "}"; + + assertThatThrownBy( + () -> + FetchPlanningResultResponseParser.fromJson( + invalidJson, PARTITION_SPECS_BY_ID, false)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage( + "Invalid response: deleteFiles should only be returned with fileScanTasks that reference them"); + } + + @Test + public void roundTripSerdeWithValidStatusAndFileScanTasks() throws JsonProcessingException { + ResidualEvaluator residualEvaluator = + ResidualEvaluator.of(SPEC, Expressions.equal("id", 1), true); + FileScanTask fileScanTask = + new BaseFileScanTask( + FILE_A, + new DeleteFile[] {FILE_A_DELETES}, + SchemaParser.toJson(SCHEMA), + PartitionSpecParser.toJson(SPEC), + residualEvaluator); + + PlanStatus planStatus = PlanStatus.fromName("completed"); + FetchPlanningResultResponse response = + FetchPlanningResultResponse.builder() + .withPlanStatus(planStatus) + .withFileScanTasks(List.of(fileScanTask)) + .withDeleteFiles(List.of(FILE_A_DELETES)) + // assume this has been set + .withSpecsById(PARTITION_SPECS_BY_ID) + .build(); + + String expectedToJson = + "{\"plan-status\":\"completed\"," + + "\"delete-files\":[{\"spec-id\":0,\"content\":\"POSITION_DELETES\"," + + "\"file-path\":\"/path/to/data-a-deletes.parquet\",\"file-format\":\"PARQUET\"," + + "\"partition\":{\"1000\":0},\"file-size-in-bytes\":10,\"record-count\":1}]," + + "\"file-scan-tasks\":[" + + "{\"data-file\":{\"spec-id\":0,\"content\":\"DATA\",\"file-path\":\"/path/to/data-a.parquet\"," + + "\"file-format\":\"PARQUET\",\"partition\":{\"1000\":0}," + + "\"file-size-in-bytes\":10,\"record-count\":1,\"sort-order-id\":0}," + + "\"delete-file-references\":[0]," + + "\"residual-filter\":{\"type\":\"eq\",\"term\":\"id\",\"value\":1}}]" + + "}"; + + String json = FetchPlanningResultResponseParser.toJson(response, false); + assertThat(json).isEqualTo(expectedToJson); + + // use RESTObjectMapper to read this + InjectableValues.Std injectableValues = new InjectableValues.Std(); + injectableValues.addValue("specsById", PARTITION_SPECS_BY_ID); + injectableValues.addValue("caseSensitive", false); + ObjectReader objectReader = + MAPPER.readerFor(FetchPlanningResultResponse.class).with(injectableValues); + FetchPlanningResultResponse fromResponse = objectReader.readValue(json); + // Need to make a new response with partitionSpec set + FetchPlanningResultResponse copyResponse = + FetchPlanningResultResponse.builder() + .withPlanStatus(fromResponse.planStatus()) + .withPlanTasks(fromResponse.planTasks()) + .withDeleteFiles(fromResponse.deleteFiles()) + .withFileScanTasks(fromResponse.fileScanTasks()) + .withSpecsById(PARTITION_SPECS_BY_ID) + .build(); + + assertThat(FetchPlanningResultResponseParser.toJson(copyResponse, false)) + .isEqualTo(expectedToJson); + } +} diff --git a/core/src/test/java/org/apache/iceberg/rest/responses/TestFetchScanTasksResponseParser.java b/core/src/test/java/org/apache/iceberg/rest/responses/TestFetchScanTasksResponseParser.java new file mode 100644 index 000000000000..01ca8288fb40 --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/rest/responses/TestFetchScanTasksResponseParser.java @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.rest.responses; + +import static org.apache.iceberg.TestBase.FILE_A; +import static org.apache.iceberg.TestBase.FILE_A_DELETES; +import static org.apache.iceberg.TestBase.PARTITION_SPECS_BY_ID; +import static org.apache.iceberg.TestBase.SCHEMA; +import static org.apache.iceberg.TestBase.SPEC; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import com.fasterxml.jackson.databind.JsonNode; +import java.util.List; +import org.apache.iceberg.BaseFileScanTask; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.PartitionSpecParser; +import org.apache.iceberg.SchemaParser; +import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.expressions.ResidualEvaluator; +import org.junit.jupiter.api.Test; + +public class TestFetchScanTasksResponseParser { + + @Test + public void nullAndEmptyCheck() { + assertThatThrownBy(() -> FetchScanTasksResponseParser.toJson(null)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid response: fetchScanTasksResponse null"); + + assertThatThrownBy( + () -> + FetchScanTasksResponseParser.fromJson( + (JsonNode) null, PARTITION_SPECS_BY_ID, false)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid response: fetchScanTasksResponse null"); + } + + @Test + public void roundTripSerdeWithEmptyObject() { + assertThatThrownBy( + () -> FetchScanTasksResponseParser.toJson(FetchScanTasksResponse.builder().build())) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid response: planTasks and fileScanTask cannot both be null"); + + String emptyJson = "{ }"; + assertThatThrownBy( + () -> FetchScanTasksResponseParser.fromJson(emptyJson, PARTITION_SPECS_BY_ID, false)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid response: fetchScanTasksResponse null"); + } + + @Test + public void roundTripSerdeWithPlanTasks() { + String expectedJson = "{\"plan-tasks\":[\"task1\",\"task2\"]}"; + String json = + FetchScanTasksResponseParser.toJson( + FetchScanTasksResponse.builder().withPlanTasks(List.of("task1", "task2")).build()); + assertThat(json).isEqualTo(expectedJson); + + FetchScanTasksResponse fromResponse = + FetchScanTasksResponseParser.fromJson(json, PARTITION_SPECS_BY_ID, false); + + assertThat(FetchScanTasksResponseParser.toJson(fromResponse, false)).isEqualTo(expectedJson); + } + + @Test + public void roundTripSerdeWithDeleteFilesNoFileScanTasksPresent() { + assertThatThrownBy( + () -> + FetchScanTasksResponse.builder() + .withPlanTasks(List.of("task1", "task2")) + .withDeleteFiles(List.of(FILE_A_DELETES)) + .build()) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage( + "Invalid response: deleteFiles should only be returned with fileScanTasks that reference them"); + + String invalidJson = + "{\"plan-tasks\":[\"task1\",\"task2\"]," + + "\"delete-files\":[{\"spec-id\":0,\"content\":\"POSITION_DELETES\"," + + "\"file-path\":\"/path/to/data-a-deletes.parquet\",\"file-format\":\"PARQUET\"," + + "\"partition\":{\"1000\":0},\"file-size-in-bytes\":10,\"record-count\":1}]" + + "}"; + + assertThatThrownBy( + () -> FetchScanTasksResponseParser.fromJson(invalidJson, PARTITION_SPECS_BY_ID, false)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage( + "Invalid response: deleteFiles should only be returned with fileScanTasks that reference them"); + } + + @Test + public void roundTripSerdeWithFileScanTasks() { + ResidualEvaluator residualEvaluator = + ResidualEvaluator.of(SPEC, Expressions.equal("id", 1), true); + FileScanTask fileScanTask = + new BaseFileScanTask( + FILE_A, + new DeleteFile[] {FILE_A_DELETES}, + SchemaParser.toJson(SCHEMA), + PartitionSpecParser.toJson(SPEC), + residualEvaluator); + + FetchScanTasksResponse response = + FetchScanTasksResponse.builder() + .withFileScanTasks(List.of(fileScanTask)) + .withDeleteFiles(List.of(FILE_A_DELETES)) + // assume you have set this already + .withSpecsById(PARTITION_SPECS_BY_ID) + .build(); + + String expectedToJson = + "{" + + "\"delete-files\":[{\"spec-id\":0,\"content\":\"POSITION_DELETES\"," + + "\"file-path\":\"/path/to/data-a-deletes.parquet\",\"file-format\":\"PARQUET\"," + + "\"partition\":{\"1000\":0},\"file-size-in-bytes\":10,\"record-count\":1}]," + + "\"file-scan-tasks\":[" + + "{\"data-file\":{\"spec-id\":0,\"content\":\"DATA\",\"file-path\":\"/path/to/data-a.parquet\"," + + "\"file-format\":\"PARQUET\",\"partition\":{\"1000\":0}," + + "\"file-size-in-bytes\":10,\"record-count\":1,\"sort-order-id\":0}," + + "\"delete-file-references\":[0]," + + "\"residual-filter\":{\"type\":\"eq\",\"term\":\"id\",\"value\":1}}]" + + "}"; + + String json = FetchScanTasksResponseParser.toJson(response, false); + assertThat(json).isEqualTo(expectedToJson); + + FetchScanTasksResponse fromResponse = + FetchScanTasksResponseParser.fromJson(json, PARTITION_SPECS_BY_ID, false); + // Need to make a new response with partitionSpec set + FetchScanTasksResponse copyResponse = + FetchScanTasksResponse.builder() + .withPlanTasks(fromResponse.planTasks()) + .withDeleteFiles(fromResponse.deleteFiles()) + .withFileScanTasks(fromResponse.fileScanTasks()) + .withSpecsById(PARTITION_SPECS_BY_ID) + .build(); + + assertThat(FetchScanTasksResponseParser.toJson(copyResponse, false)).isEqualTo(expectedToJson); + } +} diff --git a/core/src/test/java/org/apache/iceberg/rest/responses/TestPlanTableScanResponseParser.java b/core/src/test/java/org/apache/iceberg/rest/responses/TestPlanTableScanResponseParser.java new file mode 100644 index 000000000000..49c0ad1fa0af --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/rest/responses/TestPlanTableScanResponseParser.java @@ -0,0 +1,232 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.rest.responses; + +import static org.apache.iceberg.TestBase.FILE_A; +import static org.apache.iceberg.TestBase.FILE_A_DELETES; +import static org.apache.iceberg.TestBase.PARTITION_SPECS_BY_ID; +import static org.apache.iceberg.TestBase.SCHEMA; +import static org.apache.iceberg.TestBase.SPEC; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import java.util.List; +import org.apache.iceberg.BaseFileScanTask; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.PartitionSpecParser; +import org.apache.iceberg.SchemaParser; +import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.expressions.ResidualEvaluator; +import org.apache.iceberg.rest.PlanStatus; +import org.junit.jupiter.api.Test; + +public class TestPlanTableScanResponseParser { + + @Test + public void nullAndEmptyCheck() { + assertThatThrownBy(() -> PlanTableScanResponseParser.toJson(null)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid response: planTableScanResponse null"); + + assertThatThrownBy( + () -> PlanTableScanResponseParser.fromJson((String) null, PARTITION_SPECS_BY_ID, false)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Cannot parse planTableScan response from empty or null object"); + } + + @Test + public void roundTripSerdeWithEmptyObject() { + + assertThatThrownBy(() -> PlanTableScanResponse.builder().build()) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid response: plan status must be defined"); + + String emptyJson = "{ }"; + assertThatThrownBy( + () -> PlanTableScanResponseParser.fromJson(emptyJson, PARTITION_SPECS_BY_ID, false)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Cannot parse planTableScan response from empty or null object"); + } + + @Test + public void roundTripSerdeWithInvalidPlanStatus() { + String invalidStatusJson = "{\"plan-status\": \"someStatus\"}"; + assertThatThrownBy( + () -> + PlanTableScanResponseParser.fromJson( + invalidStatusJson, PARTITION_SPECS_BY_ID, false)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid status name: someStatus"); + } + + @Test + public void roundTripSerdeWithInvalidPlanStatusSubmittedWithoutPlanId() { + PlanStatus planStatus = PlanStatus.fromName("submitted"); + + assertThatThrownBy(() -> PlanTableScanResponse.builder().withPlanStatus(planStatus).build()) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid response: plan id should be defined when status is 'submitted'"); + + String invalidJson = "{\"plan-status\":\"submitted\"}"; + assertThatThrownBy( + () -> PlanTableScanResponseParser.fromJson(invalidJson, PARTITION_SPECS_BY_ID, false)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid response: plan id should be defined when status is 'submitted'"); + } + + @Test + public void roundTripSerdeWithInvalidPlanStatusCancelled() { + PlanStatus planStatus = PlanStatus.fromName("cancelled"); + assertThatThrownBy(() -> PlanTableScanResponse.builder().withPlanStatus(planStatus).build()) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid response: 'cancelled' is not a valid status for planTableScan"); + + String invalidJson = "{\"plan-status\":\"cancelled\"}"; + assertThatThrownBy( + () -> PlanTableScanResponseParser.fromJson(invalidJson, PARTITION_SPECS_BY_ID, false)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid response: 'cancelled' is not a valid status for planTableScan"); + } + + @Test + public void roundTripSerdeWithInvalidPlanStatusSubmittedWithTasksPresent() { + PlanStatus planStatus = PlanStatus.fromName("submitted"); + assertThatThrownBy( + () -> + PlanTableScanResponse.builder() + .withPlanStatus(planStatus) + .withPlanId("somePlanId") + .withPlanTasks(List.of("task1", "task2")) + .build()) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid response: tasks can only be defined when status is 'completed'"); + + String invalidJson = + "{\"plan-status\":\"submitted\"," + + "\"plan-id\":\"somePlanId\"," + + "\"plan-tasks\":[\"task1\",\"task2\"]}"; + + assertThatThrownBy( + () -> PlanTableScanResponseParser.fromJson(invalidJson, PARTITION_SPECS_BY_ID, false)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid response: tasks can only be defined when status is 'completed'"); + } + + @Test + public void roundTripSerdeWithInvalidPlanIdWithIncorrectStatus() { + PlanStatus planStatus = PlanStatus.fromName("failed"); + assertThatThrownBy( + () -> + PlanTableScanResponse.builder() + .withPlanStatus(planStatus) + .withPlanId("somePlanId") + .build()) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid response: plan id can only be defined when status is 'submitted'"); + + String invalidJson = "{\"plan-status\":\"failed\"," + "\"plan-id\":\"somePlanId\"}"; + + assertThatThrownBy( + () -> PlanTableScanResponseParser.fromJson(invalidJson, PARTITION_SPECS_BY_ID, false)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid response: plan id can only be defined when status is 'submitted'"); + } + + @Test + public void roundTripSerdeWithInvalidPlanStatusSubmittedWithDeleteFilesNoFileScanTasksPresent() { + PlanStatus planStatus = PlanStatus.fromName("submitted"); + assertThatThrownBy( + () -> + PlanTableScanResponse.builder() + .withPlanStatus(planStatus) + .withPlanId("somePlanId") + .withDeleteFiles(List.of(FILE_A_DELETES)) + .build()) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage( + "Invalid response: deleteFiles should only be returned with fileScanTasks that reference them"); + + String invalidJson = + "{\"plan-status\":\"submitted\"," + + "\"plan-id\":\"somePlanId\"," + + "\"delete-files\":[{\"spec-id\":0,\"content\":\"POSITION_DELETES\"," + + "\"file-path\":\"/path/to/data-a-deletes.parquet\",\"file-format\":\"PARQUET\"," + + "\"partition\":{\"1000\":0},\"file-size-in-bytes\":10,\"record-count\":1}]" + + "}"; + + assertThatThrownBy( + () -> PlanTableScanResponseParser.fromJson(invalidJson, PARTITION_SPECS_BY_ID, false)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage( + "Invalid response: deleteFiles should only be returned with fileScanTasks that reference them"); + } + + @Test + public void roundTripSerdeWithValidStatusAndFileScanTasks() { + ResidualEvaluator residualEvaluator = + ResidualEvaluator.of(SPEC, Expressions.equal("id", 1), true); + FileScanTask fileScanTask = + new BaseFileScanTask( + FILE_A, + new DeleteFile[] {FILE_A_DELETES}, + SchemaParser.toJson(SCHEMA), + PartitionSpecParser.toJson(SPEC), + residualEvaluator); + + PlanStatus planStatus = PlanStatus.fromName("completed"); + PlanTableScanResponse response = + PlanTableScanResponse.builder() + .withPlanStatus(planStatus) + .withFileScanTasks(List.of(fileScanTask)) + .withDeleteFiles(List.of(FILE_A_DELETES)) + .withSpecsById(PARTITION_SPECS_BY_ID) + .build(); + + String expectedToJson = + "{\"plan-status\":\"completed\"," + + "\"delete-files\":[{\"spec-id\":0,\"content\":\"POSITION_DELETES\"," + + "\"file-path\":\"/path/to/data-a-deletes.parquet\",\"file-format\":\"PARQUET\"," + + "\"partition\":{\"1000\":0},\"file-size-in-bytes\":10,\"record-count\":1}]," + + "\"file-scan-tasks\":[" + + "{\"data-file\":{\"spec-id\":0,\"content\":\"DATA\",\"file-path\":\"/path/to/data-a.parquet\"," + + "\"file-format\":\"PARQUET\",\"partition\":{\"1000\":0}," + + "\"file-size-in-bytes\":10,\"record-count\":1,\"sort-order-id\":0}," + + "\"delete-file-references\":[0]," + + "\"residual-filter\":{\"type\":\"eq\",\"term\":\"id\",\"value\":1}}]" + + "}"; + + String json = PlanTableScanResponseParser.toJson(response); + assertThat(json).isEqualTo(expectedToJson); + + PlanTableScanResponse fromResponse = + PlanTableScanResponseParser.fromJson(json, PARTITION_SPECS_BY_ID, false); + PlanTableScanResponse copyResponse = + PlanTableScanResponse.builder() + .withPlanStatus(fromResponse.planStatus()) + .withPlanId(fromResponse.planId()) + .withPlanTasks(fromResponse.planTasks()) + .withDeleteFiles(fromResponse.deleteFiles()) + .withFileScanTasks(fromResponse.fileScanTasks()) + .withSpecsById(PARTITION_SPECS_BY_ID) + .build(); + + assertThat(PlanTableScanResponseParser.toJson(copyResponse)).isEqualTo(expectedToJson); + } +}