diff --git a/api/src/main/java/org/apache/iceberg/exceptions/EntityNotFoundException.java b/api/src/main/java/org/apache/iceberg/exceptions/EntityNotFoundException.java new file mode 100644 index 000000000000..1d06a5d2bc26 --- /dev/null +++ b/api/src/main/java/org/apache/iceberg/exceptions/EntityNotFoundException.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.exceptions; + +import com.google.errorprone.annotations.FormatMethod; + +/** Exception raised when an entity is not found. */ +public class EntityNotFoundException extends RESTException implements CleanableFailure { + @FormatMethod + public EntityNotFoundException(String message, Object... args) { + super(message, args); + } + + @FormatMethod + public EntityNotFoundException(Throwable cause, String message, Object... 
args) { + super(cause, message, args); + } +} diff --git a/core/src/main/java/org/apache/iceberg/BaseFile.java b/core/src/main/java/org/apache/iceberg/BaseFile.java index 8f84eb5737b9..7536f56d6388 100644 --- a/core/src/main/java/org/apache/iceberg/BaseFile.java +++ b/core/src/main/java/org/apache/iceberg/BaseFile.java @@ -434,6 +434,11 @@ public StructLike partition() { return partitionData; } + public void setPartitionData(PartitionData partitionData) { + // TODO for binding in REST scan + this.partitionData = partitionData; + } + @Override public long recordCount() { return recordCount; diff --git a/core/src/main/java/org/apache/iceberg/ContentFileParser.java b/core/src/main/java/org/apache/iceberg/ContentFileParser.java index dd08c5c69e7d..dabc49016dcb 100644 --- a/core/src/main/java/org/apache/iceberg/ContentFileParser.java +++ b/core/src/main/java/org/apache/iceberg/ContentFileParser.java @@ -27,7 +27,7 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.util.JsonUtil; -class ContentFileParser { +public class ContentFileParser { private static final String SPEC_ID = "spec-id"; private static final String CONTENT = "content"; private static final String FILE_PATH = "file-path"; @@ -48,6 +48,97 @@ class ContentFileParser { private ContentFileParser() {} + public static void unboundContentFileToJson( + ContentFile contentFile, PartitionSpec spec, JsonGenerator generator) throws IOException { + Preconditions.checkArgument(contentFile != null, "Invalid content file: null"); + Preconditions.checkArgument(spec != null, "Invalid partition spec: null"); + Preconditions.checkArgument(generator != null, "Invalid JSON generator: null"); + Preconditions.checkArgument( + contentFile.specId() == spec.specId(), + "Invalid partition spec id from content file: expected = %s, actual = %s", + spec.specId(), + contentFile.specId()); + + generator.writeStartObject(); + // ignore the ordinal position (ContentFile#pos) of the file 
in a manifest, + // as it isn't used and BaseFile constructor doesn't support it. + + generator.writeNumberField(SPEC_ID, contentFile.specId()); + generator.writeStringField(CONTENT, contentFile.content().name()); + generator.writeStringField(FILE_PATH, contentFile.path().toString()); + generator.writeStringField(FILE_FORMAT, contentFile.format().name()); + + if (contentFile.partition() != null) { + generator.writeFieldName(PARTITION); + SingleValueParser.toJson(spec.partitionType(), contentFile.partition(), generator); + } + + generator.writeNumberField(FILE_SIZE, contentFile.fileSizeInBytes()); + + metricsToJson(contentFile, generator); + + if (contentFile.keyMetadata() != null) { + generator.writeFieldName(KEY_METADATA); + SingleValueParser.toJson(DataFile.KEY_METADATA.type(), contentFile.keyMetadata(), generator); + } + + if (contentFile.splitOffsets() != null) { + JsonUtil.writeLongArray(SPLIT_OFFSETS, contentFile.splitOffsets(), generator); + } + + if (contentFile.equalityFieldIds() != null) { + JsonUtil.writeIntegerArray(EQUALITY_IDS, contentFile.equalityFieldIds(), generator); + } + + if (contentFile.sortOrderId() != null) { + generator.writeNumberField(SORT_ORDER_ID, contentFile.sortOrderId()); + } + + generator.writeEndObject(); + } + + public static ContentFile unboundContentFileFromJson(JsonNode jsonNode) { + Preconditions.checkArgument(jsonNode != null, "Invalid JSON node for content file: null"); + + int specId = JsonUtil.getInt(SPEC_ID, jsonNode); + FileContent fileContent = FileContent.valueOf(JsonUtil.getString(CONTENT, jsonNode)); + String filePath = JsonUtil.getString(FILE_PATH, jsonNode); + FileFormat fileFormat = FileFormat.fromString(JsonUtil.getString(FILE_FORMAT, jsonNode)); + + long fileSizeInBytes = JsonUtil.getLong(FILE_SIZE, jsonNode); + Metrics metrics = metricsFromJson(jsonNode); + ByteBuffer keyMetadata = JsonUtil.getByteBufferOrNull(KEY_METADATA, jsonNode); + List splitOffsets = JsonUtil.getLongListOrNull(SPLIT_OFFSETS, jsonNode); + 
int[] equalityFieldIds = JsonUtil.getIntArrayOrNull(EQUALITY_IDS, jsonNode); + Integer sortOrderId = JsonUtil.getIntOrNull(SORT_ORDER_ID, jsonNode); + + if (fileContent == FileContent.DATA) { + return new UnboundGenericDataFile( + specId, + filePath, + fileFormat, + jsonNode.get(PARTITION), + fileSizeInBytes, + metrics, + keyMetadata, + splitOffsets, + sortOrderId); + } else { + return new UnboundGenericDeleteFile( + specId, + fileContent, + filePath, + fileFormat, + jsonNode.get(PARTITION), + fileSizeInBytes, + metrics, + equalityFieldIds, + sortOrderId, + splitOffsets, + keyMetadata); + } + } + private static boolean hasPartitionData(StructLike partitionData) { return partitionData != null && partitionData.size() > 0; } @@ -125,18 +216,7 @@ static ContentFile fromJson(JsonNode jsonNode, PartitionSpec spec) { PartitionData partitionData = null; if (jsonNode.has(PARTITION)) { - partitionData = new PartitionData(spec.partitionType()); - StructLike structLike = - (StructLike) SingleValueParser.fromJson(spec.partitionType(), jsonNode.get(PARTITION)); - Preconditions.checkState( - partitionData.size() == structLike.size(), - "Invalid partition data size: expected = %s, actual = %s", - partitionData.size(), - structLike.size()); - for (int pos = 0; pos < partitionData.size(); ++pos) { - Class javaClass = spec.partitionType().fields().get(pos).type().typeId().javaClass(); - partitionData.set(pos, structLike.get(pos, javaClass)); - } + partitionData = partitionDataFromRawValue(jsonNode.get(PARTITION), spec); } long fileSizeInBytes = JsonUtil.getLong(FILE_SIZE, jsonNode); @@ -173,6 +253,27 @@ static ContentFile fromJson(JsonNode jsonNode, PartitionSpec spec) { } } + static PartitionData partitionDataFromRawValue(JsonNode rawPartitionValue, PartitionSpec spec) { + if (rawPartitionValue == null) { + return null; + } + + PartitionData partitionData = new PartitionData(spec.partitionType()); + StructLike structLike = + (StructLike) 
SingleValueParser.fromJson(spec.partitionType(), rawPartitionValue); + Preconditions.checkState( + partitionData.size() == structLike.size(), + "Invalid partition data size: expected = %s, actual = %s", + partitionData.size(), + structLike.size()); + for (int pos = 0; pos < partitionData.size(); ++pos) { + Class javaClass = spec.partitionType().fields().get(pos).type().typeId().javaClass(); + partitionData.set(pos, structLike.get(pos, javaClass)); + } + + return partitionData; + } + private static void metricsToJson(ContentFile contentFile, JsonGenerator generator) throws IOException { generator.writeNumberField(RECORD_COUNT, contentFile.recordCount()); diff --git a/core/src/main/java/org/apache/iceberg/RESTFileScanTaskParser.java b/core/src/main/java/org/apache/iceberg/RESTFileScanTaskParser.java new file mode 100644 index 000000000000..15e3392999f3 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/RESTFileScanTaskParser.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonNode; +import java.io.IOException; +import java.util.List; +import java.util.Set; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.expressions.ExpressionParser; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.util.JsonUtil; + +public class RESTFileScanTaskParser { + private static final String DATA_FILE = "data-file"; + private static final String DELETE_FILE_REFERENCES = "delete-file-references"; + private static final String RESIDUAL = "residual-filter"; + + private RESTFileScanTaskParser() {} + + public static void toJson( + FileScanTask fileScanTask, + Set deleteFileReferences, + PartitionSpec partitionSpec, + JsonGenerator generator) + throws IOException { + Preconditions.checkArgument(fileScanTask != null, "Invalid file scan task: null"); + Preconditions.checkArgument(generator != null, "Invalid JSON generator: null"); + + generator.writeStartObject(); + generator.writeFieldName(DATA_FILE); + ContentFileParser.unboundContentFileToJson(fileScanTask.file(), partitionSpec, generator); + if (deleteFileReferences != null) { + JsonUtil.writeIntegerArray(DELETE_FILE_REFERENCES, deleteFileReferences, generator); + } + + if (fileScanTask.residual() != null) { + generator.writeFieldName(RESIDUAL); + ExpressionParser.toJson(fileScanTask.residual(), generator); + } + generator.writeEndObject(); + } + + public static FileScanTask fromJson(JsonNode jsonNode, List allDeleteFiles) { + Preconditions.checkArgument(jsonNode != null, "Invalid JSON node for file scan task: null"); + Preconditions.checkArgument( + jsonNode.isObject(), "Invalid JSON node for file scan task: non-object (%s)", jsonNode); + + UnboundGenericDataFile 
dataFile = + (UnboundGenericDataFile) + ContentFileParser.unboundContentFileFromJson(JsonUtil.get(DATA_FILE, jsonNode)); + + UnboundGenericDeleteFile[] deleteFiles = null; + Set deleteFileReferences = Sets.newHashSet(); + if (jsonNode.has(DELETE_FILE_REFERENCES)) { + deleteFileReferences.addAll(JsonUtil.getIntegerList(DELETE_FILE_REFERENCES, jsonNode)); + ImmutableList.Builder builder = ImmutableList.builder(); + deleteFileReferences.forEach( + delIdx -> builder.add((UnboundGenericDeleteFile) allDeleteFiles.get(delIdx))); + deleteFiles = builder.build().toArray(new UnboundGenericDeleteFile[0]); + } + + Expression filter = null; + if (jsonNode.has(RESIDUAL)) { + filter = ExpressionParser.fromJson(jsonNode.get(RESIDUAL)); + } + + return new UnboundBaseFileScanTask(dataFile, deleteFiles, filter); + } +} diff --git a/core/src/main/java/org/apache/iceberg/RESTPlanningMode.java b/core/src/main/java/org/apache/iceberg/RESTPlanningMode.java new file mode 100644 index 000000000000..185276ecbff7 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/RESTPlanningMode.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg; + +import java.util.Locale; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; + +public enum RESTPlanningMode { + REQUIRED("required"), + SUPPORTED("supported"), + UNSUPPORTED("unsupported"); + private final String planningMode; + + RESTPlanningMode(String planningMode) { + this.planningMode = planningMode; + } + + public String mode() { + return planningMode; + } + + public static RESTPlanningMode fromName(String planningMode) { + Preconditions.checkArgument(planningMode != null, "planningMode is null"); + try { + return RESTPlanningMode.valueOf(planningMode.toUpperCase(Locale.ENGLISH)); + } catch (IllegalArgumentException e) { + throw new IllegalArgumentException( + String.format("Invalid planningMode name: %s", planningMode), e); + } + } +} diff --git a/core/src/main/java/org/apache/iceberg/RESTTable.java b/core/src/main/java/org/apache/iceberg/RESTTable.java new file mode 100644 index 000000000000..c45c9feb581c --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/RESTTable.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg; + +import java.util.Map; +import java.util.function.Supplier; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.metrics.MetricsReporter; +import org.apache.iceberg.rest.RESTClient; +import org.apache.iceberg.rest.ResourcePaths; + +public class RESTTable extends BaseTable { + private final RESTClient client; + private final String path; + private final Supplier> headers; + private final MetricsReporter reporter; + private final ResourcePaths resourcePaths; + private final TableIdentifier tableIdentifier; + + public RESTTable( + TableOperations ops, + String name, + MetricsReporter reporter, + RESTClient client, + String path, + Supplier> headers, + TableIdentifier tableIdentifier, + ResourcePaths resourcePaths) { + super(ops, name, reporter); + this.reporter = reporter; + this.client = client; + this.headers = headers; + this.path = path; + this.tableIdentifier = tableIdentifier; + this.resourcePaths = resourcePaths; + } + + @Override + public TableScan newScan() { + // TODO when looking at ImmutableTableScanContext how do we ensure + // correct snapshotId to use for point in time cases. When looking at spark + // it seems it follows similar approach, see class SparkDistributedDataScan + + return new RESTTableScan( + this, + schema(), + ImmutableTableScanContext.builder().metricsReporter(reporter).build(), + client, + path, + headers, + operations(), + tableIdentifier, + resourcePaths); + } +} diff --git a/core/src/main/java/org/apache/iceberg/RESTTableScan.java b/core/src/main/java/org/apache/iceberg/RESTTableScan.java new file mode 100644 index 000000000000..76a1a9d69548 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/RESTTableScan.java @@ -0,0 +1,240 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import java.util.List; +import java.util.Map; +import java.util.function.Supplier; +import java.util.stream.Collectors; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.rest.ErrorHandlers; +import org.apache.iceberg.rest.PlanStatus; +import org.apache.iceberg.rest.RESTClient; +import org.apache.iceberg.rest.ResourcePaths; +import org.apache.iceberg.rest.requests.PlanTableScanRequest; +import org.apache.iceberg.rest.responses.FetchPlanningResultResponse; +import org.apache.iceberg.rest.responses.PlanTableScanResponse; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.ParallelIterable; + +public class RESTTableScan extends DataTableScan { + private final RESTClient client; + private final String path; + private final Supplier> headers; + private final TableOperations operations; + private final Table table; + private final ResourcePaths resourcePaths; + private final TableIdentifier tableIdentifier; + + // TODO revisit if this property should be configurable + private static final int FETCH_PLANNING_SLEEP_DURATION_MS = 1000; + + public RESTTableScan( + Table table, + Schema schema, + TableScanContext context, + RESTClient client, + String path, + Supplier> headers, + TableOperations 
operations, + TableIdentifier tableIdentifier, + ResourcePaths resourcePaths) { + super(table, schema, context); + this.table = table; + this.client = client; + this.headers = headers; + this.path = path; + this.operations = operations; + this.tableIdentifier = tableIdentifier; + this.resourcePaths = resourcePaths; + } + + @Override + protected TableScan newRefinedScan( + Table refinedTable, Schema refinedSchema, TableScanContext refinedContext) { + return new RESTTableScan( + refinedTable, + refinedSchema, + refinedContext, + client, + path, + headers, + operations, + tableIdentifier, + resourcePaths); + } + + @Override + public CloseableIterable planFiles() { + List selectedColumns = + schema().columns().stream().map(Types.NestedField::name).collect(Collectors.toList()); + + List statsFields = null; + if (columnsToKeepStats() != null) { + statsFields = + columnsToKeepStats().stream() + .map(columnId -> schema().findColumnName(columnId)) + .collect(Collectors.toList()); + } + + Long startSnapshotId = context().fromSnapshotId(); + Long endSnapshotId = context().toSnapshotId(); + Long snapshotId = snapshotId(); + + PlanTableScanRequest.Builder planTableScanRequestBuilder = + new PlanTableScanRequest.Builder() + .withSelect(selectedColumns) + .withFilter(filter()) + .withCaseSensitive(isCaseSensitive()) + .withStatsFields(statsFields); + + if (startSnapshotId != null && endSnapshotId != null) { + planTableScanRequestBuilder + .withStartSnapshotId(startSnapshotId) + .withEndSnapshotId(endSnapshotId) + .withUseSnapshotSchema(true); + + } else if (snapshotId != null) { + boolean useSnapShotSchema = snapshotId != table.currentSnapshot().snapshotId(); + planTableScanRequestBuilder + .withSnapshotId(snapshotId) + .withUseSnapshotSchema(useSnapShotSchema); + + } else { + planTableScanRequestBuilder.withSnapshotId(table().currentSnapshot().snapshotId()); + } + + return planTableScan(planTableScanRequestBuilder.build()); + } + + private CloseableIterable 
planTableScan(PlanTableScanRequest planTableScanRequest) { + PlanTableScanResponse response = + client.post( + resourcePaths.planTableScan(tableIdentifier), + planTableScanRequest, + PlanTableScanResponse.class, + headers, + ErrorHandlers.defaultErrorHandler()); + + PlanStatus planStatus = response.planStatus(); + switch (planStatus) { + case COMPLETED: + // List fileScanTasks = bindFileScanTasksWithSpec(response.fileScanTasks()); + return getScanTasksIterable(response.planTasks(), response.fileScanTasks()); + case SUBMITTED: + return fetchPlanningResult(response.planId()); + case FAILED: + throw new RuntimeException( + "Received \"failed\" status from service when planning a table scan"); + default: + throw new RuntimeException( + String.format("Invalid planStatus during planTableScan: %s", planStatus)); + } + } + + private CloseableIterable fetchPlanningResult(String planId) { + + // TODO need to introduce a max wait time for this loop potentially + boolean planningFinished = false; + while (!planningFinished) { + FetchPlanningResultResponse response = + client.get( + resourcePaths.fetchPlanningResult(tableIdentifier, planId), + FetchPlanningResultResponse.class, + headers, + ErrorHandlers.defaultErrorHandler()); + + PlanStatus planStatus = response.planStatus(); + switch (planStatus) { + case COMPLETED: + // List fileScanTasks = bindFileScanTasksWithSpec(response.fileScanTasks()); + return getScanTasksIterable(response.planTasks(), response.fileScanTasks()); + case SUBMITTED: + try { + Thread.sleep(FETCH_PLANNING_SLEEP_DURATION_MS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted while fetching plan status", e); + } + break; + case FAILED: + throw new RuntimeException( + "Received \"failed\" status from service when fetching a table scan"); + case CANCELLED: + throw new RuntimeException( + String.format( + "Received \"cancelled\" status from service when fetching a table scan, planId: %s is 
invalid", + planId)); + default: + throw new RuntimeException( + String.format("Invalid planStatus during fetchPlanningResult: %s", planStatus)); + } + } + return null; + } + + public CloseableIterable getScanTasksIterable( + List planTasks, List fileScanTasks) { + List iterableOfScanTaskIterables = Lists.newArrayList(); + if (fileScanTasks != null) { + // add this to the list for below if planTasks will also be present + ScanTasksIterable scanTasksIterable = + new ScanTasksIterable( + fileScanTasks, + client, + resourcePaths, + tableIdentifier, + headers, + planExecutor(), + table.specs(), + isCaseSensitive()); + iterableOfScanTaskIterables.add(scanTasksIterable); + } + if (planTasks != null) { + // Use parallel iterable since planTasks are present + for (String planTask : planTasks) { + ScanTasksIterable iterable = + new ScanTasksIterable( + planTask, + client, + resourcePaths, + tableIdentifier, + headers, + planExecutor(), + table.specs(), + isCaseSensitive()); + iterableOfScanTaskIterables.add(iterable); + } + return new ParallelIterable<>(iterableOfScanTaskIterables, planExecutor()); + // another idea is to keep concating to the original parallel iterable??? + } + // use a single scanTasks iterable since no need to parallelize since no planTasks + return new ScanTasksIterable( + fileScanTasks, + client, + resourcePaths, + tableIdentifier, + headers, + planExecutor(), + table.specs(), + isCaseSensitive()); + } +} diff --git a/core/src/main/java/org/apache/iceberg/ScanTasksIterable.java b/core/src/main/java/org/apache/iceberg/ScanTasksIterable.java new file mode 100644 index 000000000000..df5dc3f9149b --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/ScanTasksIterable.java @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.function.Supplier; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.CloseableIterator; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.rest.ErrorHandlers; +import org.apache.iceberg.rest.RESTClient; +import org.apache.iceberg.rest.ResourcePaths; +import org.apache.iceberg.rest.requests.FetchScanTasksRequest; +import org.apache.iceberg.rest.responses.FetchScanTasksResponse; +import org.apache.iceberg.util.ParallelIterable; + +public class ScanTasksIterable implements CloseableIterable { + private final RESTClient client; + private final ResourcePaths resourcePaths; + private final TableIdentifier tableIdentifier; + private final Supplier> headers; + private final String + planTask; // parallelizing on this where a planTask produces a list of file scan tasks, as + // well more planTasks + private final List fileScanTasks; + private ExecutorService executorService; + private Map specsById; + private boolean caseSensitive; + + public ScanTasksIterable( + String planTask, + RESTClient client, + ResourcePaths resourcePaths, + TableIdentifier tableIdentifier, + Supplier> headers, + 
ExecutorService executorService, + Map specsById, + boolean caseSensitive) { + this.planTask = planTask; + this.fileScanTasks = null; + this.client = client; + this.resourcePaths = resourcePaths; + this.tableIdentifier = tableIdentifier; + this.headers = headers; + this.executorService = executorService; + this.specsById = specsById; + this.caseSensitive = caseSensitive; + } + + public ScanTasksIterable( + List fileScanTasks, + RESTClient client, + ResourcePaths resourcePaths, + TableIdentifier tableIdentifier, + Supplier> headers, + ExecutorService executorService, + Map specsById, + boolean caseSensitive) { + this.planTask = null; + this.fileScanTasks = fileScanTasks; + this.client = client; + this.resourcePaths = resourcePaths; + this.tableIdentifier = tableIdentifier; + this.headers = headers; + this.executorService = executorService; + this.specsById = specsById; + this.caseSensitive = caseSensitive; + } + + @Override + public CloseableIterator iterator() { + return new ScanTasksIterator( + planTask, + fileScanTasks, + client, + resourcePaths, + tableIdentifier, + headers, + executorService, + specsById, + caseSensitive); + } + + @Override + public void close() throws IOException {} + + private static class ScanTasksIterator implements CloseableIterator { + private final RESTClient client; + private final ResourcePaths resourcePaths; + private final TableIdentifier tableIdentifier; + private final Supplier> headers; + private String planTask; + private List fileScanTasks; + private ExecutorService executorService; + private Map specsById; + private boolean caseSensitive; + + ScanTasksIterator( + String planTask, + List fileScanTasks, + RESTClient client, + ResourcePaths resourcePaths, + TableIdentifier tableIdentifier, + Supplier> headers, + ExecutorService executorService, + Map specsById, + boolean caseSensitive) { + this.client = client; + this.resourcePaths = resourcePaths; + this.tableIdentifier = tableIdentifier; + this.headers = headers; + this.planTask 
= planTask; + this.fileScanTasks = fileScanTasks != null ? fileScanTasks : Lists.newArrayList(); + this.executorService = executorService; + this.specsById = specsById; + this.caseSensitive = caseSensitive; + } + + @Override + public boolean hasNext() { + if (!fileScanTasks.isEmpty()) { + // Have file scan tasks so continue to consume + return true; + } + // Out of file scan tasks, so need to now fetch more from each planTask + // Service can send back more planTasks which acts as pagination + if (planTask != null) { + fetchScanTasks(planTask); + planTask = null; + // Make another hasNext() call, as more fileScanTasks have been fetched + return hasNext(); + } + // we have no file scan tasks left to consume + // so means we are finished + return false; + } + + @Override + public FileScanTask next() { + FileScanTask task = fileScanTasks.remove(0); + if (task instanceof UnboundBaseFileScanTask) { + // bind partition spec data to task + UnboundBaseFileScanTask unboundBaseFileScanTask = (UnboundBaseFileScanTask) task; + Integer specId = task.file().specId(); + return unboundBaseFileScanTask.bind(specsById.get(specId), caseSensitive); + } else { + return task; + } + } + + private void fetchScanTasks(String withPlanTask) { + FetchScanTasksRequest fetchScanTasksRequest = new FetchScanTasksRequest(withPlanTask); + FetchScanTasksResponse response = + client.post( + resourcePaths.fetchScanTasks(tableIdentifier), + fetchScanTasksRequest, + FetchScanTasksResponse.class, + headers, + ErrorHandlers.defaultErrorHandler()); + if (response.fileScanTasks() != null) { + fileScanTasks.addAll(response.fileScanTasks()); + } + + if (response.planTasks() != null) { + // this is the case where a plan task returned an additional plan task, so ensure that this + // result is added to top level fileScanTasks list. 
+ // confirmed working with catalog test + // #testPlanTableScanAndFetchScanTasksWithCompletedStatusAndNestedPlanTasks + Iterable fileScanTasksFromPlanTasks = + getScanTasksIterable(response.planTasks()); + fileScanTasksFromPlanTasks.forEach(task -> fileScanTasks.add(task)); + } + } + + public CloseableIterable getScanTasksIterable(List planTasks) { + List iterableOfScanTaskIterables = Lists.newArrayList(); + for (String withPlanTask : planTasks) { + ScanTasksIterable iterable = + new ScanTasksIterable( + withPlanTask, + client, + resourcePaths, + tableIdentifier, + headers, + executorService, + specsById, + caseSensitive); + iterableOfScanTaskIterables.add(iterable); + } + return new ParallelIterable<>(iterableOfScanTaskIterables, executorService); + } + + @Override + public void close() throws IOException {} + } +} diff --git a/core/src/main/java/org/apache/iceberg/UnboundBaseFileScanTask.java b/core/src/main/java/org/apache/iceberg/UnboundBaseFileScanTask.java new file mode 100644 index 000000000000..9905e5be4c21 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/UnboundBaseFileScanTask.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+package org.apache.iceberg;
+
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.ResidualEvaluator;
+
+/**
+ * A file scan task built from unbound data and delete files, i.e. files that were created without
+ * partition data and without schema or spec strings.
+ *
+ * <p>{@link #schema()} and {@link #spec()} throw until the task has been converted with {@link
+ * #bind(PartitionSpec, boolean)}.
+ */
+class UnboundBaseFileScanTask extends BaseFileScanTask {
+  private final UnboundGenericDataFile unboundDataFile;
+  private final UnboundGenericDeleteFile[] unboundDeleteFiles;
+  private final Expression filter;
+
+  /**
+   * @param unboundDataFile data file to scan, not yet bound to a partition spec
+   * @param unboundDeleteFiles delete files attached to the data file, not yet bound
+   * @param filter residual filter; wrapped in an unpartitioned evaluator until the task is bound
+   */
+  UnboundBaseFileScanTask(
+      UnboundGenericDataFile unboundDataFile,
+      UnboundGenericDeleteFile[] unboundDeleteFiles,
+      Expression filter) {
+    // schema and spec strings are passed as null here; bind() supplies them later
+    super(unboundDataFile, unboundDeleteFiles, null, null, ResidualEvaluator.unpartitioned(filter));
+    this.unboundDataFile = unboundDataFile;
+    this.unboundDeleteFiles = unboundDeleteFiles;
+    this.filter = filter;
+  }
+
+  @Override
+  public Schema schema() {
+    throw new UnsupportedOperationException("schema() is not supported in UnboundBaseFileScanTask");
+  }
+
+  @Override
+  public PartitionSpec spec() {
+    throw new UnsupportedOperationException("spec() is not supported in UnboundBaseFileScanTask");
+  }
+
+  /**
+   * Creates a bound copy of this task for the given partition spec.
+   *
+   * @param spec partition spec used to bind the data file, the delete files and the residual
+   * @param caseSensitive whether the residual evaluator binds the filter case sensitively
+   * @return a new {@link BaseFileScanTask} carrying bound files plus schema and spec JSON
+   * @throws IllegalArgumentException if {@code spec} is null
+   */
+  public FileScanTask bind(PartitionSpec spec, boolean caseSensitive) {
+    // fail here with a clear message instead of somewhere inside the binding helpers
+    if (spec == null) {
+      throw new IllegalArgumentException("Invalid partition spec: null");
+    }
+
+    GenericDataFile boundDataFile = unboundDataFile.bindToSpec(spec);
+
+    // a task without delete files may have been created with a null array
+    int deleteCount = unboundDeleteFiles != null ? unboundDeleteFiles.length : 0;
+    DeleteFile[] boundDeleteFiles = new DeleteFile[deleteCount];
+    for (int i = 0; i < deleteCount; i++) {
+      // NOTE(review): every delete file is bound with the spec passed for the data file; confirm
+      // that delete files can never carry a different spec id than the data file they apply to
+      boundDeleteFiles[i] = unboundDeleteFiles[i].bindToSpec(spec);
+    }
+
+    String schemaString = SchemaParser.toJson(spec.schema());
+    String specString = PartitionSpecParser.toJson(spec);
+    ResidualEvaluator boundResidual = ResidualEvaluator.of(spec, filter, caseSensitive);
+
+    return new BaseFileScanTask(
+        boundDataFile, boundDeleteFiles, schemaString, specString, boundResidual);
+  }
+}
diff --git a/core/src/main/java/org/apache/iceberg/UnboundGenericDataFile.java b/core/src/main/java/org/apache/iceberg/UnboundGenericDataFile.java
new file mode 100644
index 000000000000..fa3b59fe0f9e
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/UnboundGenericDataFile.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache
Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+/**
+ * An UnboundGenericDataFile is a GenericDataFile which keeps track of the raw partition value
+ * represented as JSON.
+ *
+ * <p>The partition handed to the parent constructor is null; call {@link
+ * #bindToSpec(PartitionSpec)} to obtain a copy whose partition has been parsed for a spec.
+ */
+class UnboundGenericDataFile extends GenericDataFile {
+  private final JsonNode rawPartitionValue;
+
+  /**
+   * @param rawPartitionValue partition value as JSON, kept as-is until the file is bound
+   * @param splitOffsets split offsets of the file, forwarded unchanged to the parent
+   */
+  UnboundGenericDataFile(
+      int specId,
+      String filePath,
+      FileFormat format,
+      JsonNode rawPartitionValue,
+      long fileSizeInBytes,
+      Metrics metrics,
+      ByteBuffer keyMetadata,
+      List<Long> splitOffsets,
+      Integer sortOrderId) {
+    super(
+        specId,
+        filePath,
+        format,
+        null, // no partition data yet, see bindToSpec
+        fileSizeInBytes,
+        metrics,
+        keyMetadata,
+        splitOffsets,
+        sortOrderId);
+    this.rawPartitionValue = rawPartitionValue;
+  }
+
+  /**
+   * Returns a new data file with the same attributes as this one and partition data produced from
+   * the raw JSON partition value for the given spec.
+   *
+   * @param spec partition spec used to interpret the raw partition value
+   * @return a bound copy of this file
+   */
+  GenericDataFile bindToSpec(PartitionSpec spec) {
+    PartitionData boundPartition =
+        ContentFileParser.partitionDataFromRawValue(rawPartitionValue, spec);
+
+    // rebuild the metrics from the accessors of this file
+    Metrics copiedMetrics =
+        new Metrics(
+            recordCount(),
+            columnSizes(),
+            valueCounts(),
+            nullValueCounts(),
+            nanValueCounts(),
+            lowerBounds(),
+            upperBounds());
+
+    return new GenericDataFile(
+        specId(),
+        path().toString(),
+        format(),
+        boundPartition,
+        fileSizeInBytes(),
+        copiedMetrics,
+        keyMetadata(),
+        splitOffsets(),
+        sortOrderId());
+  }
+}
diff --git
a/core/src/main/java/org/apache/iceberg/UnboundGenericDeleteFile.java b/core/src/main/java/org/apache/iceberg/UnboundGenericDeleteFile.java new file mode 100644 index 000000000000..bff9c576f589 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/UnboundGenericDeleteFile.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+package org.apache.iceberg;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+/**
+ * An UnboundGenericDeleteFile is a GenericDeleteFile which keeps track of the raw partition value
+ * represented as JSON.
+ *
+ * <p>The partition handed to the parent constructor is null; call {@link
+ * #bindToSpec(PartitionSpec)} to obtain a copy whose partition has been parsed for a spec.
+ */
+class UnboundGenericDeleteFile extends GenericDeleteFile {
+  private final JsonNode rawPartitionValue;
+
+  /**
+   * @param rawPartitionValue partition value as JSON, kept as-is until the file is bound
+   * @param equalityFieldIds equality field ids, forwarded unchanged to the parent
+   * @param splitOffsets split offsets of the file, forwarded unchanged to the parent
+   */
+  UnboundGenericDeleteFile(
+      int specId,
+      FileContent content,
+      String filePath,
+      FileFormat format,
+      JsonNode rawPartitionValue,
+      long fileSizeInBytes,
+      Metrics metrics,
+      int[] equalityFieldIds,
+      Integer sortOrderId,
+      List<Long> splitOffsets,
+      ByteBuffer keyMetadata) {
+    super(
+        specId,
+        content,
+        filePath,
+        format,
+        null, // no partition data yet, see bindToSpec
+        fileSizeInBytes,
+        metrics,
+        equalityFieldIds,
+        sortOrderId,
+        splitOffsets,
+        keyMetadata);
+    this.rawPartitionValue = rawPartitionValue;
+  }
+
+  /**
+   * Returns a new delete file with the same attributes as this one and partition data produced
+   * from the raw JSON partition value for the given spec.
+   *
+   * @param spec partition spec used to interpret the raw partition value
+   * @return a bound copy of this file
+   */
+  GenericDeleteFile bindToSpec(PartitionSpec spec) {
+    // NOTE(review): equalityFieldIds() is presumably null for delete files without equality ids
+    // (e.g. position deletes) - confirm; the guard keeps binding from failing with an NPE here
+    List<Integer> equalityIds = equalityFieldIds();
+    int[] equalityIdArray =
+        equalityIds != null ? equalityIds.stream().mapToInt(Integer::intValue).toArray() : null;
+
+    return new GenericDeleteFile(
+        specId(),
+        content(),
+        path().toString(),
+        format(),
+        ContentFileParser.partitionDataFromRawValue(rawPartitionValue, spec),
+        fileSizeInBytes(),
+        new Metrics(
+            recordCount(),
+            columnSizes(),
+            valueCounts(),
+            nullValueCounts(),
+            nanValueCounts(),
+            lowerBounds(),
+            upperBounds()),
+        equalityIdArray,
+        sortOrderId(),
+        splitOffsets(),
+        keyMetadata());
+  }
+}
diff --git a/core/src/main/java/org/apache/iceberg/rest/PlanStatus.java b/core/src/main/java/org/apache/iceberg/rest/PlanStatus.java
new file mode 100644
index 000000000000..5603d51e9aa2
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/rest/PlanStatus.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.rest;
+
+import java.util.Locale;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+/** Status of a scan plan; each constant carries its lower-case string form. */
+public enum PlanStatus {
+  COMPLETED("completed"),
+  SUBMITTED("submitted"),
+  CANCELLED("cancelled"),
+  FAILED("failed");
+
+  private final String status;
+
+  PlanStatus(String status) {
+    this.status = status;
+  }
+
+  /** Returns the string form given to this constant. */
+  public String status() {
+    return status;
+  }
+
+  /**
+   * Resolves a status by constant name, ignoring case.
+   *
+   * @param status name of the status, e.g. {@code "completed"}
+   * @return the matching constant
+   * @throws IllegalArgumentException if {@code status} is null or matches no constant
+   */
+  public static PlanStatus fromName(String status) {
+    Preconditions.checkArgument(status != null, "Status is null");
+    String constantName = status.toUpperCase(Locale.ENGLISH);
+    try {
+      return valueOf(constantName);
+    } catch (IllegalArgumentException unknownName) {
+      String message = String.format("Invalid status name: %s", status);
+      throw new IllegalArgumentException(message, unknownName);
+    }
+  }
+}
diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTSerializers.java b/core/src/main/java/org/apache/iceberg/rest/RESTSerializers.java
index 7f39d0bc1f5e..02bd59657570 100644
--- a/core/src/main/java/org/apache/iceberg/rest/RESTSerializers.java
+++ b/core/src/main/java/org/apache/iceberg/rest/RESTSerializers.java
@@ -46,9 +46,13 @@
 import org.apache.iceberg.rest.requests.CommitTransactionRequestParser;
 import org.apache.iceberg.rest.requests.CreateViewRequest;
 import org.apache.iceberg.rest.requests.CreateViewRequestParser;
+import org.apache.iceberg.rest.requests.FetchScanTasksRequest;
+import
org.apache.iceberg.rest.requests.FetchScanTasksRequestParser; import org.apache.iceberg.rest.requests.ImmutableCreateViewRequest; import org.apache.iceberg.rest.requests.ImmutableRegisterTableRequest; import org.apache.iceberg.rest.requests.ImmutableReportMetricsRequest; +import org.apache.iceberg.rest.requests.PlanTableScanRequest; +import org.apache.iceberg.rest.requests.PlanTableScanRequestParser; import org.apache.iceberg.rest.requests.RegisterTableRequest; import org.apache.iceberg.rest.requests.RegisterTableRequestParser; import org.apache.iceberg.rest.requests.ReportMetricsRequest; @@ -59,12 +63,18 @@ import org.apache.iceberg.rest.responses.ConfigResponseParser; import org.apache.iceberg.rest.responses.ErrorResponse; import org.apache.iceberg.rest.responses.ErrorResponseParser; +import org.apache.iceberg.rest.responses.FetchPlanningResultResponse; +import org.apache.iceberg.rest.responses.FetchPlanningResultResponseParser; +import org.apache.iceberg.rest.responses.FetchScanTasksResponse; +import org.apache.iceberg.rest.responses.FetchScanTasksResponseParser; import org.apache.iceberg.rest.responses.ImmutableLoadViewResponse; import org.apache.iceberg.rest.responses.LoadTableResponse; import org.apache.iceberg.rest.responses.LoadTableResponseParser; import org.apache.iceberg.rest.responses.LoadViewResponse; import org.apache.iceberg.rest.responses.LoadViewResponseParser; import org.apache.iceberg.rest.responses.OAuthTokenResponse; +import org.apache.iceberg.rest.responses.PlanTableScanResponse; +import org.apache.iceberg.rest.responses.PlanTableScanResponseParser; import org.apache.iceberg.util.JsonUtil; public class RESTSerializers { @@ -119,8 +129,19 @@ public static void registerAll(ObjectMapper mapper) { .addSerializer(ConfigResponse.class, new ConfigResponseSerializer<>()) .addDeserializer(ConfigResponse.class, new ConfigResponseDeserializer<>()) .addSerializer(LoadTableResponse.class, new LoadTableResponseSerializer<>()) - 
.addDeserializer(LoadTableResponse.class, new LoadTableResponseDeserializer<>()); - + .addDeserializer(LoadTableResponse.class, new LoadTableResponseDeserializer<>()) + .addSerializer(PlanTableScanRequest.class, new PlanTableScanRequestSerializer<>()) + .addDeserializer(PlanTableScanRequest.class, new PlanTableScanRequestDeserializer<>()) + .addSerializer(FetchScanTasksRequest.class, new FetchScanTasksRequestSerializer<>()) + .addDeserializer(FetchScanTasksRequest.class, new FetchScanTasksRequestDeserializer<>()) + .addSerializer(PlanTableScanResponse.class, new PlanTableScanResponseSerializer<>()) + .addDeserializer(PlanTableScanResponse.class, new PlanTableScanResponseDeserializer<>()) + .addSerializer( + FetchPlanningResultResponse.class, new FetchPlanningResultResponseSerializer<>()) + .addDeserializer( + FetchPlanningResultResponse.class, new FetchPlanningResultResponseDeserializer<>()) + .addSerializer(FetchScanTasksResponse.class, new FetchScanTaskResponseSerializer<>()) + .addDeserializer(FetchScanTasksResponse.class, new FetchScanTaskResponseDeserializer<>()); mapper.registerModule(module); } @@ -443,4 +464,94 @@ public T deserialize(JsonParser p, DeserializationContext context) throws IOExce return (T) LoadTableResponseParser.fromJson(jsonNode); } } + + static class PlanTableScanRequestSerializer + extends JsonSerializer { + @Override + public void serialize(T request, JsonGenerator gen, SerializerProvider serializers) + throws IOException { + PlanTableScanRequestParser.toJson(request, gen); + } + } + + static class PlanTableScanRequestDeserializer + extends JsonDeserializer { + @Override + public T deserialize(JsonParser p, DeserializationContext context) throws IOException { + JsonNode jsonNode = p.getCodec().readTree(p); + return (T) PlanTableScanRequestParser.fromJson(jsonNode); + } + } + + static class FetchScanTasksRequestSerializer + extends JsonSerializer { + @Override + public void serialize(T request, JsonGenerator gen, SerializerProvider 
serializers) + throws IOException { + FetchScanTasksRequestParser.toJson(request, gen); + } + } + + static class FetchScanTasksRequestDeserializer + extends JsonDeserializer { + @Override + public T deserialize(JsonParser p, DeserializationContext context) throws IOException { + JsonNode jsonNode = p.getCodec().readTree(p); + return (T) FetchScanTasksRequestParser.fromJson(jsonNode); + } + } + + static class PlanTableScanResponseSerializer + extends JsonSerializer { + @Override + public void serialize(T response, JsonGenerator gen, SerializerProvider serializers) + throws IOException { + PlanTableScanResponseParser.toJson(response, gen); + } + } + + static class PlanTableScanResponseDeserializer + extends JsonDeserializer { + @Override + public T deserialize(JsonParser p, DeserializationContext context) throws IOException { + JsonNode jsonNode = p.getCodec().readTree(p); + return (T) PlanTableScanResponseParser.fromJson(jsonNode); + } + } + + static class FetchPlanningResultResponseSerializer + extends JsonSerializer { + @Override + public void serialize(T response, JsonGenerator gen, SerializerProvider serializers) + throws IOException { + FetchPlanningResultResponseParser.toJson(response, gen); + } + } + + static class FetchPlanningResultResponseDeserializer + extends JsonDeserializer { + @Override + public T deserialize(JsonParser p, DeserializationContext context) throws IOException { + JsonNode jsonNode = p.getCodec().readTree(p); + return (T) FetchPlanningResultResponseParser.fromJson(jsonNode); + } + } + + static class FetchScanTaskResponseSerializer + extends JsonSerializer { + @Override + public void serialize(T response, JsonGenerator gen, SerializerProvider serializers) + throws IOException { + FetchScanTasksResponseParser.toJson(response, gen); + } + } + + static class FetchScanTaskResponseDeserializer + extends JsonDeserializer { + @Override + public T deserialize(JsonParser p, DeserializationContext context) throws IOException { + JsonNode jsonNode = 
p.getCodec().readTree(p); + return (T) FetchScanTasksResponseParser.fromJson(jsonNode); + } + } } diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java index cc42604f700d..a5699ed9d6f5 100644 --- a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java +++ b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java @@ -43,10 +43,12 @@ import org.apache.iceberg.MetadataTableUtils; import org.apache.iceberg.MetadataUpdate; import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.RESTTable; import org.apache.iceberg.Schema; import org.apache.iceberg.SortOrder; import org.apache.iceberg.Table; import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.TableOperations; import org.apache.iceberg.Transaction; import org.apache.iceberg.Transactions; import org.apache.iceberg.catalog.BaseViewSessionCatalog; @@ -117,6 +119,9 @@ public class RESTSessionCatalog extends BaseViewSessionCatalog private static final String REST_SNAPSHOT_LOADING_MODE = "snapshot-loading-mode"; // for backwards compatibility with older REST servers where it can be assumed that a particular // server supports view endpoints but doesn't send the "endpoints" field in the ConfigResponse + public static final String REST_SERVER_PLANNING_ENABLED = "rest-server-planning-enabled"; + private static final String REST_TABLE_SCAN_PLANNING_PROPERTY = "table.rest-scan-planning"; + static final String VIEW_ENDPOINTS_SUPPORTED = "view-endpoints-supported"; public static final String REST_PAGE_SIZE = "rest-page-size"; private static final List TOKEN_PREFERENCE_ORDER = @@ -175,6 +180,7 @@ public class RESTSessionCatalog extends BaseViewSessionCatalog private FileIO io = null; private MetricsReporter reporter = null; private boolean reportingViaRestEnabled; + private boolean restServerPlanningEnabled; private Integer pageSize = null; private CloseableGroup closeables = null; private 
Set endpoints; @@ -328,6 +334,9 @@ public void initialize(String name, Map unresolved) { this.reportingViaRestEnabled = PropertyUtil.propertyAsBoolean(mergedProps, REST_METRICS_REPORTING_ENABLED, true); + + this.restServerPlanningEnabled = + PropertyUtil.propertyAsBoolean(mergedProps, REST_SERVER_PLANNING_ENABLED, false); super.initialize(name, mergedProps); } @@ -512,6 +521,11 @@ public Table loadTable(SessionContext context, TableIdentifier identifier) { trackFileIO(ops); + RESTTable restTable = tableSupportsRemoteScanPlanning(ops, finalIdentifier, session); + if (restTable != null) { + return restTable; + } + BaseTable table = new BaseTable( ops, @@ -524,6 +538,26 @@ public Table loadTable(SessionContext context, TableIdentifier identifier) { return table; } + private RESTTable tableSupportsRemoteScanPlanning( + TableOperations ops, TableIdentifier finalIdentifier, AuthSession session) { + if (ops.current().properties().containsKey(REST_TABLE_SCAN_PLANNING_PROPERTY)) { + boolean tableSupportsRemotePlanning = + ops.current().propertyAsBoolean(REST_TABLE_SCAN_PLANNING_PROPERTY, false); + if (tableSupportsRemotePlanning && restServerPlanningEnabled) { + return new RESTTable( + ops, + fullTableName(finalIdentifier), + metricsReporter(paths.metrics(finalIdentifier), session::headers), + this.client, + paths.table(finalIdentifier), + session::headers, + finalIdentifier, + paths); + } + } + return null; + } + private void trackFileIO(RESTTableOperations ops) { if (io != ops.io()) { fileIOTracker.track(ops); @@ -587,6 +621,11 @@ public Table registerTable( trackFileIO(ops); + RESTTable restTable = tableSupportsRemoteScanPlanning(ops, ident, session); + if (restTable != null) { + return restTable; + } + return new BaseTable( ops, fullTableName(ident), metricsReporter(paths.metrics(ident), session::headers)); } @@ -820,6 +859,11 @@ public Table create() { trackFileIO(ops); + RESTTable restTable = tableSupportsRemoteScanPlanning(ops, ident, session); + if (restTable != 
null) { + return restTable; + } + return new BaseTable( ops, fullTableName(ident), metricsReporter(paths.metrics(ident), session::headers)); } diff --git a/core/src/main/java/org/apache/iceberg/rest/ResourcePaths.java b/core/src/main/java/org/apache/iceberg/rest/ResourcePaths.java index 5ba7eae28262..1b5797f23f01 100644 --- a/core/src/main/java/org/apache/iceberg/rest/ResourcePaths.java +++ b/core/src/main/java/org/apache/iceberg/rest/ResourcePaths.java @@ -125,4 +125,38 @@ public String view(TableIdentifier ident) { public String renameView() { return SLASH.join("v1", prefix, "views", "rename"); } + + public String planTableScan(TableIdentifier ident) { + return SLASH.join( + "v1", + prefix, + "namespaces", + RESTUtil.encodeNamespace(ident.namespace()), + "tables", + RESTUtil.encodeString(ident.name()), + "plan"); + } + + public String fetchPlanningResult(TableIdentifier ident, String planId) { + return SLASH.join( + "v1", + prefix, + "namespaces", + RESTUtil.encodeNamespace(ident.namespace()), + "tables", + RESTUtil.encodeString(ident.name()), + "plan", + planId); + } + + public String fetchScanTasks(TableIdentifier ident) { + return SLASH.join( + "v1", + prefix, + "namespaces", + RESTUtil.encodeNamespace(ident.namespace()), + "tables", + RESTUtil.encodeString(ident.name()), + "tasks"); + } } diff --git a/core/src/main/java/org/apache/iceberg/rest/requests/FetchScanTasksRequest.java b/core/src/main/java/org/apache/iceberg/rest/requests/FetchScanTasksRequest.java new file mode 100644 index 000000000000..2293baac999e --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/rest/requests/FetchScanTasksRequest.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.rest.requests;
+
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.rest.RESTRequest;
+
+/** REST request carrying the single plan task whose scan tasks should be fetched. */
+public class FetchScanTasksRequest implements RESTRequest {
+
+  private final String planTask;
+
+  /**
+   * @param planTask plan task to fetch scan tasks for
+   * @throws IllegalArgumentException if {@code planTask} is null
+   */
+  public FetchScanTasksRequest(String planTask) {
+    this.planTask = planTask;
+    validate();
+  }
+
+  /** Returns the plan task this request was created with. */
+  public String planTask() {
+    return planTask;
+  }
+
+  /** Rejects a request without a plan task. */
+  @Override
+  public void validate() {
+    Preconditions.checkArgument(null != planTask, "Invalid planTask: null");
+  }
+
+  @Override
+  public String toString() {
+    MoreObjects.ToStringHelper helper = MoreObjects.toStringHelper(this);
+    helper.add("planTask", planTask);
+    return helper.toString();
+  }
+}
diff --git a/core/src/main/java/org/apache/iceberg/rest/requests/FetchScanTasksRequestParser.java b/core/src/main/java/org/apache/iceberg/rest/requests/FetchScanTasksRequestParser.java
new file mode 100644
index 000000000000..fa9af3da0c90
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/rest/requests/FetchScanTasksRequestParser.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.rest.requests;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonNode;
+import java.io.IOException;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.util.JsonUtil;
+
+/** JSON conversion for {@link FetchScanTasksRequest}: one object with a "plan-task" field. */
+public class FetchScanTasksRequestParser {
+  private static final String PLAN_TASK = "plan-task";
+
+  private FetchScanTasksRequestParser() {}
+
+  /** Serializes the request without pretty printing. */
+  public static String toJson(FetchScanTasksRequest request) {
+    return toJson(request, false);
+  }
+
+  /** Serializes the request, pretty printed when {@code pretty} is true. */
+  public static String toJson(FetchScanTasksRequest request, boolean pretty) {
+    return JsonUtil.generate(generator -> toJson(request, generator), pretty);
+  }
+
+  /**
+   * Writes the request as a JSON object to the given generator.
+   *
+   * @throws IllegalArgumentException if {@code request} is null
+   */
+  public static void toJson(FetchScanTasksRequest request, JsonGenerator gen) throws IOException {
+    Preconditions.checkArgument(request != null, "Invalid request: fetchScanTasks request null");
+
+    String planTask = request.planTask();
+    gen.writeStartObject();
+    gen.writeStringField(PLAN_TASK, planTask);
+    gen.writeEndObject();
+  }
+
+  /** Parses a request from a JSON string. */
+  public static FetchScanTasksRequest fromJson(String json) {
+    return JsonUtil.parse(json, FetchScanTasksRequestParser::fromJson);
+  }
+
+  /**
+   * Builds a request from the "plan-task" field of the given node.
+   *
+   * @throws IllegalArgumentException if {@code json} is null
+   */
+  public static FetchScanTasksRequest fromJson(JsonNode json) {
+    Preconditions.checkArgument(json != null, "Invalid request: fetchScanTasks null");
+    return new FetchScanTasksRequest(JsonUtil.getString(PLAN_TASK, json));
+  }
+}
diff
--git a/core/src/main/java/org/apache/iceberg/rest/requests/PlanTableScanRequest.java b/core/src/main/java/org/apache/iceberg/rest/requests/PlanTableScanRequest.java new file mode 100644 index 000000000000..d85ee324b0dd --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/rest/requests/PlanTableScanRequest.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+package org.apache.iceberg.rest.requests;
+
+import java.util.List;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.rest.RESTRequest;
+
+/**
+ * REST request describing a table scan to plan.
+ *
+ * <p>A request names either a single snapshot ({@code snapshotId}) or a snapshot range ({@code
+ * startSnapshotId} together with {@code endSnapshotId}), or none of the three. Instances are
+ * created through {@link Builder}; the constructor does not validate, call {@link #validate()}.
+ */
+public class PlanTableScanRequest implements RESTRequest {
+  private final Long snapshotId;
+  private final List<String> select;
+  private final Expression filter;
+  private final boolean caseSensitive;
+  private final boolean useSnapshotSchema;
+  private final Long startSnapshotId;
+  private final Long endSnapshotId;
+  private final List<String> statsFields;
+
+  public Long snapshotId() {
+    return snapshotId;
+  }
+
+  public List<String> select() {
+    return select;
+  }
+
+  public Expression filter() {
+    return filter;
+  }
+
+  public boolean caseSensitive() {
+    return caseSensitive;
+  }
+
+  public boolean useSnapshotSchema() {
+    return useSnapshotSchema;
+  }
+
+  public Long startSnapshotId() {
+    return startSnapshotId;
+  }
+
+  public Long endSnapshotId() {
+    return endSnapshotId;
+  }
+
+  public List<String> statsFields() {
+    return statsFields;
+  }
+
+  private PlanTableScanRequest(
+      Long snapshotId,
+      List<String> select,
+      Expression filter,
+      boolean caseSensitive,
+      boolean useSnapshotSchema,
+      Long startSnapshotId,
+      Long endSnapshotId,
+      List<String> statsFields) {
+    this.snapshotId = snapshotId;
+    this.select = select;
+    this.filter = filter;
+    this.caseSensitive = caseSensitive;
+    this.useSnapshotSchema = useSnapshotSchema;
+    this.startSnapshotId = startSnapshotId;
+    this.endSnapshotId = endSnapshotId;
+    this.statsFields = statsFields;
+  }
+
+  /**
+   * Checks the snapshot selection of this request.
+   *
+   * <p>Nothing is checked when no snapshot id is set at all. Otherwise the request must carry
+   * either {@code snapshotId} alone or both range bounds without {@code snapshotId}.
+   *
+   * @throws IllegalArgumentException if the snapshot ids are combined in any other way
+   */
+  @Override
+  public void validate() {
+    boolean hasSnapshot = snapshotId != null;
+    boolean hasAnyRangeBound = startSnapshotId != null || endSnapshotId != null;
+    boolean hasFullRange = startSnapshotId != null && endSnapshotId != null;
+
+    if (hasSnapshot || hasAnyRangeBound) {
+      // a snapshot id excludes any range bound (a plain XOR against the full range let
+      // snapshotId plus a single bound through); without it both bounds are required
+      Preconditions.checkArgument(
+          hasSnapshot ? !hasAnyRangeBound : hasFullRange,
+          "Either snapshotId must be provided or both startSnapshotId and endSnapshotId must be provided");
+    }
+  }
+
+  @Override
+  public String toString() {
+    return MoreObjects.toStringHelper(this)
+        .add("snapshotId", snapshotId)
+        .add("select", select)
+        .add("filter", filter)
+        .add("caseSensitive", caseSensitive)
+        .add("useSnapshotSchema", useSnapshotSchema)
+        .add("startSnapshotId", startSnapshotId)
+        .add("endSnapshotId", endSnapshotId)
+        .add("statsFields", statsFields)
+        .toString();
+  }
+
+  /**
+   * Builder for {@link PlanTableScanRequest}. Unset values stay null; {@code caseSensitive}
+   * defaults to true and {@code useSnapshotSchema} to false.
+   */
+  public static class Builder {
+    private Long snapshotId;
+    private List<String> select;
+    private Expression filter;
+    private boolean caseSensitive = true;
+    private boolean useSnapshotSchema = false;
+    private Long startSnapshotId;
+    private Long endSnapshotId;
+    private List<String> statsFields;
+
+    public Builder() {}
+
+    public Builder withSnapshotId(Long withSnapshotId) {
+      this.snapshotId = withSnapshotId;
+      return this;
+    }
+
+    public Builder withSelect(List<String> projection) {
+      this.select = projection;
+      return this;
+    }
+
+    public Builder withFilter(Expression expression) {
+      this.filter = expression;
+      return this;
+    }
+
+    public Builder withCaseSensitive(boolean value) {
+      this.caseSensitive = value;
+      return this;
+    }
+
+    public Builder withUseSnapshotSchema(boolean snapshotSchema) {
+      this.useSnapshotSchema = snapshotSchema;
+      return this;
+    }
+
+    public Builder withStartSnapshotId(Long startingSnapshotId) {
+      this.startSnapshotId = startingSnapshotId;
+      return this;
+    }
+
+    public Builder withEndSnapshotId(Long endingSnapshotId) {
+      this.endSnapshotId = endingSnapshotId;
+      return this;
+    }
+
+    public Builder withStatsFields(List<String> fields) {
+      this.statsFields = fields;
+      return this;
+    }
+
+    /** Creates the request from the values set so far; the result is not validated here. */
+    public PlanTableScanRequest build() {
+      return new PlanTableScanRequest(
+          snapshotId,
+          select,
+          filter,
+          caseSensitive,
+          useSnapshotSchema,
+          startSnapshotId,
+          endSnapshotId,
+          statsFields);
+    }
+  }
+}
diff --git a/core/src/main/java/org/apache/iceberg/rest/requests/PlanTableScanRequestParser.java b/core/src/main/java/org/apache/iceberg/rest/requests/PlanTableScanRequestParser.java
new file mode 100644
index 000000000000..e840841fcfcf
--- /dev/null
+++
b/core/src/main/java/org/apache/iceberg/rest/requests/PlanTableScanRequestParser.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.rest.requests;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonNode;
+import java.io.IOException;
+import java.util.List;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.ExpressionParser;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.util.JsonUtil;
+
+/** JSON conversion for {@link PlanTableScanRequest}. */
+public class PlanTableScanRequestParser {
+  private static final String SNAPSHOT_ID = "snapshot-id";
+  private static final String SELECT = "select";
+  private static final String FILTER = "filter";
+  private static final String CASE_SENSITIVE = "case-sensitive";
+  private static final String USE_SNAPSHOT_SCHEMA = "use-snapshot-schema";
+  private static final String START_SNAPSHOT_ID = "start-snapshot-id";
+  private static final String END_SNAPSHOT_ID = "end-snapshot-id";
+  private static final String STATS_FIELDS = "stats-fields";
+
+  private PlanTableScanRequestParser() {}
+
+  /** Serializes the request without pretty printing. */
+  public static String toJson(PlanTableScanRequest request) {
+    return toJson(request, false);
+  }
+
+  /** Serializes the request, pretty printed when {@code pretty} is true. */
+  public static String toJson(PlanTableScanRequest request, boolean pretty) {
+    return JsonUtil.generate(gen -> toJson(request, gen), pretty);
+  }
+
+  /**
+   * Writes the request as a JSON object to the given generator.
+   *
+   * <p>Snapshot ids, {@code select}, {@code filter} and {@code stats-fields} are written only
+   * when set (lists also only when non-empty); the two boolean flags are always written.
+   *
+   * @throws IllegalArgumentException if {@code request} is null or fails its own validation
+   */
+  @SuppressWarnings("checkstyle:CyclomaticComplexity")
+  public static void toJson(PlanTableScanRequest request, JsonGenerator gen) throws IOException {
+    Preconditions.checkArgument(null != request, "Invalid request: planTableScanRequest null");
+
+    // the snapshot-id rule lives in the request; do not keep a second copy of it here
+    request.validate();
+
+    gen.writeStartObject();
+    if (request.snapshotId() != null) {
+      gen.writeNumberField(SNAPSHOT_ID, request.snapshotId());
+    }
+
+    if (request.startSnapshotId() != null) {
+      gen.writeNumberField(START_SNAPSHOT_ID, request.startSnapshotId());
+    }
+
+    if (request.endSnapshotId() != null) {
+      gen.writeNumberField(END_SNAPSHOT_ID, request.endSnapshotId());
+    }
+
+    if (request.select() != null && !request.select().isEmpty()) {
+      JsonUtil.writeStringArray(SELECT, request.select(), gen);
+    }
+
+    if (request.filter() != null) {
+      // NOTE(review): the filter is written as a string holding expression JSON, and fromJson
+      // reads it back with textValue(); confirm this matches the format the server expects
+      gen.writeStringField(FILTER, ExpressionParser.toJson(request.filter()));
+    }
+
+    gen.writeBooleanField(CASE_SENSITIVE, request.caseSensitive());
+    gen.writeBooleanField(USE_SNAPSHOT_SCHEMA, request.useSnapshotSchema());
+
+    if (request.statsFields() != null && !request.statsFields().isEmpty()) {
+      JsonUtil.writeStringArray(STATS_FIELDS, request.statsFields(), gen);
+    }
+
+    gen.writeEndObject();
+  }
+
+  /** Parses a request from a JSON string. */
+  public static PlanTableScanRequest fromJson(String json) {
+    return JsonUtil.parse(json, PlanTableScanRequestParser::fromJson);
+  }
+
+  /**
+   * Builds a request from a JSON node.
+   *
+   * <p>Missing fields fall back to null, except {@code case-sensitive} (true) and {@code
+   * use-snapshot-schema} (false).
+   *
+   * @throws IllegalArgumentException if {@code json} is null
+   */
+  public static PlanTableScanRequest fromJson(JsonNode json) {
+    Preconditions.checkArgument(null != json, "Invalid request: planTableScanRequest null");
+
+    Long snapshotId = JsonUtil.getLongOrNull(SNAPSHOT_ID, json);
+    Long startSnapshotId = JsonUtil.getLongOrNull(START_SNAPSHOT_ID, json);
+    Long endSnapshotId = JsonUtil.getLongOrNull(END_SNAPSHOT_ID, json);
+
+    List<String> select = JsonUtil.getStringListOrNull(SELECT, json);
+
+    Expression filter = null;
+    if (json.has(FILTER)) {
+      filter = ExpressionParser.fromJson(json.get(FILTER).textValue());
+    }
+
+    boolean caseSensitive = true;
+    if (json.has(CASE_SENSITIVE)) {
+      caseSensitive = JsonUtil.getBool(CASE_SENSITIVE, json);
+    }
+
+    boolean useSnapshotSchema = false;
+    if (json.has(USE_SNAPSHOT_SCHEMA)) {
+      useSnapshotSchema = JsonUtil.getBool(USE_SNAPSHOT_SCHEMA, json);
+    }
+
+    List<String> statsFields = JsonUtil.getStringListOrNull(STATS_FIELDS, json);
+
+    return new PlanTableScanRequest.Builder()
+        .withSnapshotId(snapshotId)
+        .withSelect(select)
+        .withFilter(filter)
+        .withCaseSensitive(caseSensitive)
+        .withUseSnapshotSchema(useSnapshotSchema)
+        .withStartSnapshotId(startSnapshotId)
+        .withEndSnapshotId(endSnapshotId)
+        .withStatsFields(statsFields)
+        .build();
+  }
+}
diff --git a/core/src/main/java/org/apache/iceberg/rest/responses/FetchPlanningResultResponse.java b/core/src/main/java/org/apache/iceberg/rest/responses/FetchPlanningResultResponse.java
new file mode 100644
index 000000000000..29af32a03d38
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/rest/responses/FetchPlanningResultResponse.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.rest.responses; + +import java.util.List; +import java.util.Map; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.rest.PlanStatus; + +public class FetchPlanningResultResponse implements TableScanResponse { + private final PlanStatus planStatus; + private final List planTasks; + private final List fileScanTasks; + private final List deleteFiles; + private final Map specsById; + + private FetchPlanningResultResponse( + PlanStatus planStatus, + List planTasks, + List fileScanTasks, + List deleteFiles, + Map specsById) { + this.planStatus = planStatus; + this.planTasks = planTasks; + this.fileScanTasks = fileScanTasks; + this.deleteFiles = deleteFiles; + this.specsById = specsById; + validate(); + } + + public PlanStatus planStatus() { + return planStatus; + } + + public List planTasks() { + return planTasks; + } + + public List fileScanTasks() { + return fileScanTasks; + } + + public List deleteFiles() { + return deleteFiles; + } + + public Map specsById() { + return specsById; + } + + public static Builder builder() { + return new Builder(); + } + + @Override + public void validate() { + Preconditions.checkArgument(planStatus() != null, "Invalid status: null"); + Preconditions.checkArgument( + planStatus() == PlanStatus.COMPLETED || (planTasks() == null && fileScanTasks() == null), + "Invalid response: tasks can only be returned in a 'completed' 
status"); + if (fileScanTasks() == null || fileScanTasks.isEmpty()) { + Preconditions.checkArgument( + (deleteFiles() == null || deleteFiles().isEmpty()), + "Invalid response: deleteFiles should only be returned with fileScanTasks that reference them"); + } + } + + public static class Builder { + private Builder() {} + + private PlanStatus planStatus; + private List planTasks; + private List fileScanTasks; + private List deleteFiles; + private Map specsById; + + public Builder withPlanStatus(PlanStatus status) { + this.planStatus = status; + return this; + } + + public Builder withPlanTasks(List tasks) { + this.planTasks = tasks; + return this; + } + + public Builder withFileScanTasks(List tasks) { + this.fileScanTasks = tasks; + return this; + } + + public Builder withDeleteFiles(List deletes) { + this.deleteFiles = deletes; + return this; + } + + public Builder withSpecsById(Map specs) { + this.specsById = specs; + return this; + } + + public FetchPlanningResultResponse build() { + return new FetchPlanningResultResponse( + planStatus, planTasks, fileScanTasks, deleteFiles, specsById); + } + } +} diff --git a/core/src/main/java/org/apache/iceberg/rest/responses/FetchPlanningResultResponseParser.java b/core/src/main/java/org/apache/iceberg/rest/responses/FetchPlanningResultResponseParser.java new file mode 100644 index 000000000000..a5ac0cdcc212 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/rest/responses/FetchPlanningResultResponseParser.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.rest.responses; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonNode; +import java.io.IOException; +import java.util.List; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.rest.PlanStatus; +import org.apache.iceberg.util.JsonUtil; + +public class FetchPlanningResultResponseParser { + private static final String PLAN_STATUS = "plan-status"; + private static final String PLAN_TASKS = "plan-tasks"; + + private FetchPlanningResultResponseParser() {} + + public static String toJson(FetchPlanningResultResponse response) { + return toJson(response, false); + } + + public static String toJson(FetchPlanningResultResponse response, boolean pretty) { + return JsonUtil.generate(gen -> toJson(response, gen), pretty); + } + + public static void toJson(FetchPlanningResultResponse response, JsonGenerator gen) + throws IOException { + Preconditions.checkArgument( + null != response, "Invalid response: fetchPanningResultResponse null"); + Preconditions.checkArgument( + response.specsById() != null + || (response.fileScanTasks() == null || response.fileScanTasks().isEmpty()), + "Cannot serialize fileScanTasks in fetchingPlanningResultResponse without specsById"); + gen.writeStartObject(); + gen.writeStringField(PLAN_STATUS, response.planStatus().status()); + if (response.planTasks() != null) { + JsonUtil.writeStringArray(PLAN_TASKS, response.planTasks(), 
gen); + } + + TableScanResponseParser.serializeScanTasks( + response.fileScanTasks(), response.deleteFiles(), response.specsById(), gen); + gen.writeEndObject(); + } + + public static FetchPlanningResultResponse fromJson(String json) { + Preconditions.checkArgument(json != null, "Invalid response: fetchPanningResultResponse null"); + return JsonUtil.parse(json, FetchPlanningResultResponseParser::fromJson); + } + + public static FetchPlanningResultResponse fromJson(JsonNode json) { + Preconditions.checkArgument( + json != null && !json.isEmpty(), + "Invalid response: fetchPanningResultResponse null or empty"); + + PlanStatus planStatus = PlanStatus.fromName(JsonUtil.getString(PLAN_STATUS, json)); + List planTasks = JsonUtil.getStringListOrNull(PLAN_TASKS, json); + List deleteFiles = TableScanResponseParser.parseDeleteFiles(json); + List fileScanTasks = + TableScanResponseParser.parseFileScanTasks(json, deleteFiles); + return FetchPlanningResultResponse.builder() + .withPlanStatus(planStatus) + .withPlanTasks(planTasks) + .withFileScanTasks(fileScanTasks) + .withDeleteFiles(deleteFiles) + .build(); + } +} diff --git a/core/src/main/java/org/apache/iceberg/rest/responses/FetchScanTasksResponse.java b/core/src/main/java/org/apache/iceberg/rest/responses/FetchScanTasksResponse.java new file mode 100644 index 000000000000..fd512ab3b499 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/rest/responses/FetchScanTasksResponse.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.rest.responses; + +import java.util.List; +import java.util.Map; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; + +public class FetchScanTasksResponse implements TableScanResponse { + private final List planTasks; + private final List fileScanTasks; + private final List deleteFiles; + private final Map specsById; + + private FetchScanTasksResponse( + List planTasks, + List fileScanTasks, + List deleteFiles, + Map specsById) { + this.planTasks = planTasks; + this.fileScanTasks = fileScanTasks; + this.deleteFiles = deleteFiles; + this.specsById = specsById; + validate(); + } + + public List planTasks() { + return planTasks; + } + + public List fileScanTasks() { + return fileScanTasks; + } + + public List deleteFiles() { + return deleteFiles; + } + + public Map specsById() { + return specsById; + } + + public static Builder builder() { + return new Builder(); + } + + @Override + public void validate() { + if (fileScanTasks() == null || fileScanTasks.isEmpty()) { + Preconditions.checkArgument( + (deleteFiles() == null || deleteFiles().isEmpty()), + "Invalid response: deleteFiles should only be returned with fileScanTasks that reference them"); + } + + Preconditions.checkArgument( + planTasks() != null || fileScanTasks() != null, + "Invalid response: planTasks and fileScanTask cannot both be null"); + } + + public static class Builder { + private Builder() {} + + private 
List planTasks; + private List fileScanTasks; + private List deleteFiles; + private Map specsById; + + public Builder withPlanTasks(List tasks) { + this.planTasks = tasks; + return this; + } + + public Builder withFileScanTasks(List tasks) { + this.fileScanTasks = tasks; + return this; + } + + public Builder withDeleteFiles(List deletes) { + this.deleteFiles = deletes; + return this; + } + + public Builder withSpecsById(Map specs) { + this.specsById = specs; + return this; + } + + public FetchScanTasksResponse build() { + return new FetchScanTasksResponse(planTasks, fileScanTasks, deleteFiles, specsById); + } + } +} diff --git a/core/src/main/java/org/apache/iceberg/rest/responses/FetchScanTasksResponseParser.java b/core/src/main/java/org/apache/iceberg/rest/responses/FetchScanTasksResponseParser.java new file mode 100644 index 000000000000..eefd165c4960 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/rest/responses/FetchScanTasksResponseParser.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.rest.responses; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonNode; +import java.io.IOException; +import java.util.List; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.util.JsonUtil; + +public class FetchScanTasksResponseParser { + private static final String PLAN_TASKS = "plan-tasks"; + + private FetchScanTasksResponseParser() {} + + public static String toJson(FetchScanTasksResponse response) { + return toJson(response, false); + } + + public static String toJson(FetchScanTasksResponse response, boolean pretty) { + return JsonUtil.generate(gen -> toJson(response, gen), pretty); + } + + public static void toJson(FetchScanTasksResponse response, JsonGenerator gen) throws IOException { + Preconditions.checkArgument(response != null, "Invalid response: fetchScanTasksResponse null"); + Preconditions.checkArgument( + response.specsById() != null + || (response.fileScanTasks() == null || response.fileScanTasks().isEmpty()), + "Cannot serialize fileScanTasks in fetchScanTasksResponse without specsById"); + gen.writeStartObject(); + if (response.planTasks() != null) { + JsonUtil.writeStringArray(PLAN_TASKS, response.planTasks(), gen); + } + + TableScanResponseParser.serializeScanTasks( + response.fileScanTasks(), response.deleteFiles(), response.specsById(), gen); + gen.writeEndObject(); + } + + public static FetchScanTasksResponse fromJson(String json) { + Preconditions.checkArgument(json != null, "Cannot parse fetchScanTasks response from null"); + return JsonUtil.parse(json, FetchScanTasksResponseParser::fromJson); + } + + public static FetchScanTasksResponse fromJson(JsonNode json) { + Preconditions.checkArgument( + json != null && !json.isEmpty(), "Invalid response: fetchScanTasksResponse null"); + List planTasks = 
JsonUtil.getStringListOrNull(PLAN_TASKS, json); + List deleteFiles = TableScanResponseParser.parseDeleteFiles(json); + List fileScanTasks = + TableScanResponseParser.parseFileScanTasks(json, deleteFiles); + return FetchScanTasksResponse.builder() + .withPlanTasks(planTasks) + .withFileScanTasks(fileScanTasks) + .withDeleteFiles(deleteFiles) + .build(); + } +} diff --git a/core/src/main/java/org/apache/iceberg/rest/responses/PlanTableScanResponse.java b/core/src/main/java/org/apache/iceberg/rest/responses/PlanTableScanResponse.java new file mode 100644 index 000000000000..dbe0efaf1de8 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/rest/responses/PlanTableScanResponse.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.rest.responses; + +import java.util.List; +import java.util.Map; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.rest.PlanStatus; + +public class PlanTableScanResponse implements TableScanResponse { + private final PlanStatus planStatus; + private final String planId; + private final List planTasks; + private final List fileScanTasks; + private final List deleteFiles; + private final Map specsById; + + private PlanTableScanResponse( + PlanStatus planStatus, + String planId, + List planTasks, + List fileScanTasks, + List deleteFiles, + Map specsById) { + this.planStatus = planStatus; + this.planId = planId; + this.planTasks = planTasks; + this.fileScanTasks = fileScanTasks; + this.deleteFiles = deleteFiles; + this.specsById = specsById; + validate(); + } + + public PlanStatus planStatus() { + return planStatus; + } + + public String planId() { + return planId; + } + + public List planTasks() { + return planTasks; + } + + public List fileScanTasks() { + return fileScanTasks; + } + + public List deleteFiles() { + return deleteFiles; + } + + public Map specsById() { + return specsById; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("planStatus", planStatus) + .add("planId", planId) + .toString(); + } + + @Override + public void validate() { + Preconditions.checkArgument( + planStatus() != null, "Invalid response: plan status must be defined"); + Preconditions.checkArgument( + planStatus() != PlanStatus.SUBMITTED || planId() != null, + "Invalid response: plan id should be defined when status is 'submitted'"); + Preconditions.checkArgument( + planStatus() != PlanStatus.CANCELLED, + "Invalid response: 'cancelled' is not a valid status for 
planTableScan"); + Preconditions.checkArgument( + planStatus() == PlanStatus.COMPLETED || (planTasks() == null && fileScanTasks() == null), + "Invalid response: tasks can only be returned in a 'completed' status"); + Preconditions.checkArgument( + planStatus() == PlanStatus.SUBMITTED || planId() == null, + "Invalid response: plan id can only be returned in a 'submitted' status"); + if (fileScanTasks() == null || fileScanTasks.isEmpty()) { + Preconditions.checkArgument( + (deleteFiles() == null || deleteFiles().isEmpty()), + "Invalid response: deleteFiles should only be returned with fileScanTasks that reference them"); + } + } + + public static Builder builder() { + return new Builder(); + } + + public static class Builder { + private Builder() {} + + private PlanStatus planStatus; + private String planId; + private List planTasks; + private List fileScanTasks; + private List deleteFiles; + private Map specsById; + + public Builder withPlanStatus(PlanStatus status) { + this.planStatus = status; + return this; + } + + public Builder withPlanId(String id) { + this.planId = id; + return this; + } + + public Builder withPlanTasks(List tasks) { + this.planTasks = tasks; + return this; + } + + public Builder withFileScanTasks(List tasks) { + this.fileScanTasks = tasks; + return this; + } + + public Builder withDeleteFiles(List deletes) { + this.deleteFiles = deletes; + return this; + } + + public Builder withSpecsById(Map specs) { + this.specsById = specs; + return this; + } + + public PlanTableScanResponse build() { + return new PlanTableScanResponse( + planStatus, planId, planTasks, fileScanTasks, deleteFiles, specsById); + } + } +} diff --git a/core/src/main/java/org/apache/iceberg/rest/responses/PlanTableScanResponseParser.java b/core/src/main/java/org/apache/iceberg/rest/responses/PlanTableScanResponseParser.java new file mode 100644 index 000000000000..25d8f11d9ac4 --- /dev/null +++ 
b/core/src/main/java/org/apache/iceberg/rest/responses/PlanTableScanResponseParser.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.rest.responses; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonNode; +import java.io.IOException; +import java.util.List; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.rest.PlanStatus; +import org.apache.iceberg.util.JsonUtil; + +public class PlanTableScanResponseParser { + private static final String PLAN_STATUS = "plan-status"; + private static final String PLAN_ID = "plan-id"; + private static final String PLAN_TASKS = "plan-tasks"; + + private PlanTableScanResponseParser() {} + + public static String toJson(PlanTableScanResponse response) { + return toJson(response, false); + } + + public static String toJson(PlanTableScanResponse response, boolean pretty) { + return JsonUtil.generate(gen -> toJson(response, gen), pretty); + } + + public static void toJson(PlanTableScanResponse response, JsonGenerator gen) throws IOException { + 
Preconditions.checkArgument(null != response, "Invalid response: planTableScanResponse null"); + Preconditions.checkArgument( + response.planStatus() != null, "Invalid response: status can not be null"); + Preconditions.checkArgument( + response.specsById() != null, "Cannot serialize planTableScanResponse without specsById"); + + gen.writeStartObject(); + gen.writeStringField(PLAN_STATUS, response.planStatus().status()); + + if (response.planId() != null) { + gen.writeStringField(PLAN_ID, response.planId()); + } + if (response.planTasks() != null) { + JsonUtil.writeStringArray(PLAN_TASKS, response.planTasks(), gen); + } + + TableScanResponseParser.serializeScanTasks( + response.fileScanTasks(), response.deleteFiles(), response.specsById(), gen); + + gen.writeEndObject(); + } + + public static PlanTableScanResponse fromJson(String json) { + Preconditions.checkArgument( + json != null, "Cannot parse planTableScan response from empty or null object"); + return JsonUtil.parse(json, PlanTableScanResponseParser::fromJson); + } + + public static PlanTableScanResponse fromJson(JsonNode json) { + Preconditions.checkArgument( + json != null && !json.isEmpty(), + "Cannot parse planTableScan response from empty or null object"); + + PlanStatus planStatus = PlanStatus.fromName(JsonUtil.getString(PLAN_STATUS, json)); + String planId = JsonUtil.getStringOrNull(PLAN_ID, json); + List planTasks = JsonUtil.getStringListOrNull(PLAN_TASKS, json); + List deleteFiles = TableScanResponseParser.parseDeleteFiles(json); + List fileScanTasks = + TableScanResponseParser.parseFileScanTasks(json, deleteFiles); + + return PlanTableScanResponse.builder() + .withPlanId(planId) + .withPlanStatus(planStatus) + .withPlanTasks(planTasks) + .withFileScanTasks(fileScanTasks) + .withDeleteFiles(deleteFiles) + .build(); + } +} diff --git a/core/src/main/java/org/apache/iceberg/rest/responses/TableScanResponse.java b/core/src/main/java/org/apache/iceberg/rest/responses/TableScanResponse.java new file mode 
100644 index 000000000000..4213b50881b9 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/rest/responses/TableScanResponse.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.rest.responses; + +import org.apache.iceberg.rest.RESTResponse; + +public interface TableScanResponse extends RESTResponse {} diff --git a/core/src/main/java/org/apache/iceberg/rest/responses/TableScanResponseParser.java b/core/src/main/java/org/apache/iceberg/rest/responses/TableScanResponseParser.java new file mode 100644 index 000000000000..0e111755eaa9 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/rest/responses/TableScanResponseParser.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.rest.responses; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonNode; +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.iceberg.ContentFileParser; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.RESTFileScanTaskParser; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.util.JsonUtil; + +class TableScanResponseParser { + + private TableScanResponseParser() {} + + static final String FILE_SCAN_TASKS = "file-scan-tasks"; + static final String DELETE_FILES = "delete-files"; + + public static List parseDeleteFiles(JsonNode node) { + if (node.has(DELETE_FILES)) { + JsonNode deleteFiles = JsonUtil.get(DELETE_FILES, node); + Preconditions.checkArgument( + deleteFiles.isArray(), "Cannot parse delete files from non-array: %s", deleteFiles); + ImmutableList.Builder deleteFilesBuilder = ImmutableList.builder(); + for (JsonNode deleteFileNode : deleteFiles) { + DeleteFile deleteFile = + (DeleteFile) ContentFileParser.unboundContentFileFromJson(deleteFileNode); + 
deleteFilesBuilder.add(deleteFile); + } + return deleteFilesBuilder.build(); + } + + return null; + } + + public static List parseFileScanTasks(JsonNode node, List deleteFiles) { + if (node.has(FILE_SCAN_TASKS)) { + JsonNode scanTasks = JsonUtil.get(FILE_SCAN_TASKS, node); + Preconditions.checkArgument( + scanTasks.isArray(), "Cannot parse file scan tasks from non-array: %s", scanTasks); + List fileScanTaskList = Lists.newArrayList(); + for (JsonNode fileScanTaskNode : scanTasks) { + FileScanTask fileScanTask = RESTFileScanTaskParser.fromJson(fileScanTaskNode, deleteFiles); + fileScanTaskList.add(fileScanTask); + } + + return fileScanTaskList; + } + + return null; + } + + public static void serializeScanTasks( + List fileScanTasks, + List deleteFiles, + Map specsById, + JsonGenerator gen) + throws IOException { + Map deleteFilePathToIndex = Maps.newHashMap(); + if (deleteFiles != null) { + Preconditions.checkArgument( + specsById != null, "Cannot serialize response without specs by ID defined"); + gen.writeArrayFieldStart(DELETE_FILES); + for (int i = 0; i < deleteFiles.size(); i++) { + DeleteFile deleteFile = deleteFiles.get(i); + deleteFilePathToIndex.put(String.valueOf(deleteFile.path()), i); + ContentFileParser.unboundContentFileToJson( + deleteFiles.get(i), specsById.get(deleteFile.specId()), gen); + } + gen.writeEndArray(); + } + + if (fileScanTasks != null) { + gen.writeArrayFieldStart(FILE_SCAN_TASKS); + Set deleteFileReferences = Sets.newHashSet(); + for (FileScanTask fileScanTask : fileScanTasks) { + if (deleteFiles != null) { + for (DeleteFile taskDelete : fileScanTask.deletes()) { + deleteFileReferences.add(deleteFilePathToIndex.get(taskDelete.path().toString())); + } + } + + PartitionSpec spec = specsById.get(fileScanTask.file().specId()); + Preconditions.checkArgument( + spec != null, + "Cannot serialize scan task with unknown spec %s", + fileScanTask.file().specId()); + RESTFileScanTaskParser.toJson(fileScanTask, deleteFileReferences, spec, gen); + } 
+ gen.writeEndArray(); + } + } +} diff --git a/core/src/test/java/org/apache/iceberg/TestBase.java b/core/src/test/java/org/apache/iceberg/TestBase.java index a0b52b346bf3..78680801433e 100644 --- a/core/src/test/java/org/apache/iceberg/TestBase.java +++ b/core/src/test/java/org/apache/iceberg/TestBase.java @@ -30,6 +30,7 @@ import java.util.Collection; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.UUID; import org.apache.iceberg.avro.AvroSchemaUtil; @@ -63,7 +64,9 @@ public class TestBase { public static final PartitionSpec SPEC = PartitionSpec.builderFor(SCHEMA).bucket("data", BUCKETS_NUMBER).build(); - static final DataFile FILE_A = + public static final Map PARTITION_SPECS_BY_ID = Map.of(0, SPEC); + + public static final DataFile FILE_A = DataFiles.builder(SPEC) .withPath("/path/to/data-a.parquet") .withFileSizeInBytes(10) @@ -77,7 +80,7 @@ public class TestBase { .withPartitionPath("data_bucket=0") // easy way to set partition data for now .withRecordCount(1) .build(); - static final DeleteFile FILE_A_DELETES = + public static final DeleteFile FILE_A_DELETES = FileMetadata.deleteFileBuilder(SPEC) .ofPositionDeletes() .withPath("/path/to/data-a-deletes.parquet") @@ -86,7 +89,7 @@ public class TestBase { .withRecordCount(1) .build(); // Equality delete files. 
- static final DeleteFile FILE_A2_DELETES = + public static final DeleteFile FILE_A2_DELETES = FileMetadata.deleteFileBuilder(SPEC) .ofEqualityDeletes(1) .withPath("/path/to/data-a2-deletes.parquet") diff --git a/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java b/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java index 5402a13d7d4b..6fd0f81b53ee 100644 --- a/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java +++ b/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java @@ -38,6 +38,7 @@ import org.apache.iceberg.FileScanTask; import org.apache.iceberg.FilesTable; import org.apache.iceberg.HasTableOperations; +import org.apache.iceberg.PartitionData; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.ReachableFileUtil; import org.apache.iceberg.ReplaceSortOrder; @@ -142,6 +143,16 @@ public abstract class CatalogTests { .withRecordCount(2) // needs at least one record or else metrics will filter it out .build(); + protected static final Namespace REST_DB = Namespace.of("restDB"); + public static final TableIdentifier TABLE_COMPLETED_WITH_FILE_SCAN_TASK = + TableIdentifier.of(REST_DB, "table_completed_with_file_scan_task"); + public static final TableIdentifier TABLE_SUBMITTED_WITH_FILE_SCAN_TASK = + TableIdentifier.of(REST_DB, "table_submitted_with_file_scan_task"); + public static final TableIdentifier TABLE_COMPLETED_WITH_PLAN_TASK = + TableIdentifier.of(REST_DB, "table_completed_with_plan_task"); + public static final TableIdentifier TABLE_COMPLETED_WITH_NESTED_PLAN_TASK = + TableIdentifier.of(REST_DB, "table_completed_with_nested_plan_task"); + protected abstract C catalog(); protected boolean supportsNamespaceProperties() { @@ -2765,6 +2776,42 @@ public void assertFilesPartitionSpec(Table table) { } } + /** + * Plans the table's files and asserts that every task's spec equals {@code partitionSpec} and + * that its data file and delete files report a {@code PartitionData} equal to one newly built + * from that spec's partition type. + */ + public void assertBoundFileScanTasks(Table table, PartitionSpec partitionSpec) { + PartitionData partitionData = new PartitionData(partitionSpec.partitionType()); + try (CloseableIterable<FileScanTask> tasks = 
table.newScan().planFiles()) { + Streams.stream(tasks) + .forEach( + task -> { + // assert file scan task spec being bound + assertThat(task.spec()).isEqualTo(partitionSpec); + // assert data file spec being bound + assertThat(task.file().partition()).isEqualTo(partitionData); + // assert all delete files in task are bound + task.deletes() + .forEach( + deleteFile -> assertThat(deleteFile.partition()).isEqualTo(partitionData)); + }); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + /** Asserts that every planned data file has the same partition as {@code dataFile}. */ + public void assertBoundFiles(Table table, DataFile dataFile) { + try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) { + Streams.stream(tasks) + .map(FileScanTask::file) + .forEach(file -> assertThat(file.partition()).isEqualTo(dataFile.partition())); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + private List<Namespace> concat(List<Namespace> starting, Namespace... additional) { List<Namespace> namespaces = Lists.newArrayList(); namespaces.addAll(starting); diff --git a/core/src/test/java/org/apache/iceberg/rest/RESTCatalogAdapter.java b/core/src/test/java/org/apache/iceberg/rest/RESTCatalogAdapter.java index 6477dfcd00eb..511a62b4b8da 100644 --- a/core/src/test/java/org/apache/iceberg/rest/RESTCatalogAdapter.java +++ b/core/src/test/java/org/apache/iceberg/rest/RESTCatalogAdapter.java @@ -18,15 +18,23 @@ */ package org.apache.iceberg.rest; +import static org.apache.iceberg.catalog.CatalogTests.TABLE_COMPLETED_WITH_FILE_SCAN_TASK; +import static org.apache.iceberg.catalog.CatalogTests.TABLE_COMPLETED_WITH_NESTED_PLAN_TASK; +import static org.apache.iceberg.catalog.CatalogTests.TABLE_COMPLETED_WITH_PLAN_TASK; +import static org.apache.iceberg.catalog.CatalogTests.TABLE_SUBMITTED_WITH_FILE_SCAN_TASK; + import java.io.IOException; import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.UUID; import java.util.function.Consumer; import java.util.stream.Collectors; import org.apache.iceberg.BaseTable; import 
org.apache.iceberg.BaseTransaction; +import org.apache.iceberg.FileScanTask; import org.apache.iceberg.Table; +import org.apache.iceberg.TableScan; import org.apache.iceberg.Transaction; import org.apache.iceberg.Transactions; import org.apache.iceberg.catalog.Catalog; @@ -47,13 +55,17 @@ import org.apache.iceberg.exceptions.RESTException; import org.apache.iceberg.exceptions.UnprocessableEntityException; import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.relocated.com.google.common.base.Splitter; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.rest.requests.CommitTransactionRequest; import org.apache.iceberg.rest.requests.CreateNamespaceRequest; import org.apache.iceberg.rest.requests.CreateTableRequest; import org.apache.iceberg.rest.requests.CreateViewRequest; +import org.apache.iceberg.rest.requests.FetchScanTasksRequest; +import org.apache.iceberg.rest.requests.PlanTableScanRequest; import org.apache.iceberg.rest.requests.RegisterTableRequest; import org.apache.iceberg.rest.requests.RenameTableRequest; import org.apache.iceberg.rest.requests.ReportMetricsRequest; @@ -62,12 +74,15 @@ import org.apache.iceberg.rest.responses.ConfigResponse; import org.apache.iceberg.rest.responses.CreateNamespaceResponse; import org.apache.iceberg.rest.responses.ErrorResponse; +import org.apache.iceberg.rest.responses.FetchPlanningResultResponse; +import org.apache.iceberg.rest.responses.FetchScanTasksResponse; import org.apache.iceberg.rest.responses.GetNamespaceResponse; import org.apache.iceberg.rest.responses.ListNamespacesResponse; import org.apache.iceberg.rest.responses.ListTablesResponse; import org.apache.iceberg.rest.responses.LoadTableResponse; import 
org.apache.iceberg.rest.responses.LoadViewResponse; import org.apache.iceberg.rest.responses.OAuthTokenResponse; +import org.apache.iceberg.rest.responses.PlanTableScanResponse; import org.apache.iceberg.rest.responses.UpdateNamespacePropertiesResponse; import org.apache.iceberg.util.Pair; import org.apache.iceberg.util.PropertyUtil; @@ -97,12 +112,18 @@ public class RESTCatalogAdapter implements RESTClient { private final Catalog catalog; private final SupportsNamespaces asNamespaceCatalog; private final ViewCatalog asViewCatalog; + // file scan tasks keyed by the plan id or plan task that hands them out + private Map<String, List<FileScanTask>> planToFileScanTasks; + // outer plan task -> inner plan task, used for the nested plan task table + private Map<String, String> planToPlanTasks; public RESTCatalogAdapter(Catalog catalog) { this.catalog = catalog; this.asNamespaceCatalog = catalog instanceof SupportsNamespaces ? (SupportsNamespaces) catalog : null; this.asViewCatalog = catalog instanceof ViewCatalog ? (ViewCatalog) catalog : null; + this.planToFileScanTasks = Maps.newHashMap(); + this.planToPlanTasks = Maps.newHashMap(); } enum HTTPMethod { @@ -138,6 +159,21 @@ enum Route { CreateTableRequest.class, LoadTableResponse.class), LOAD_TABLE(HTTPMethod.GET, ResourcePaths.V1_TABLE, null, LoadTableResponse.class), + PLAN_TABLE_SCAN( + HTTPMethod.POST, + "/v1/{prefix}/namespaces/{namespace}/tables/{table}/plan", + PlanTableScanRequest.class, + PlanTableScanResponse.class), + FETCH_PLANNING_RESULT( + HTTPMethod.GET, + "/v1/{prefix}/namespaces/{namespace}/tables/{table}/plan/{plan-id}", + null, + FetchPlanningResultResponse.class), + FETCH_SCAN_TASKS( + HTTPMethod.POST, + "/v1/{prefix}/namespaces/{namespace}/tables/{table}/tasks", + FetchScanTasksRequest.class, + FetchScanTasksResponse.class), REGISTER_TABLE( HTTPMethod.POST, ResourcePaths.V1_TABLE_REGISTER, @@ -498,6 +534,159 @@ public T handleRequest( break; } + case PLAN_TABLE_SCAN: + { + TableIdentifier ident = tableIdentFromPathVars(vars); + PlanTableScanRequest request = castRequest(PlanTableScanRequest.class, body); + Table table = catalog.loadTable(ident); + TableScan tableScan = table.newScan(); + 
// each refinement returns a new TableScan, so the result has to be kept + if (request.snapshotId() != null) { + tableScan = tableScan.useSnapshot(request.snapshotId()); + } + if (request.select() != null) { + tableScan = tableScan.select(request.select()); + } + if (request.filter() != null) { + tableScan = tableScan.filter(request.filter()); + } + if (request.statsFields() != null) { + tableScan = tableScan.includeColumnStats(request.statsFields()); + } + tableScan = tableScan.caseSensitive(request.caseSensitive()); + + List<FileScanTask> fileScanTasks = Lists.newArrayList(); + // copy the planned tasks out and release the planning resources + try (CloseableIterable<FileScanTask> returnedTasks = tableScan.planFiles()) { + returnedTasks.forEach(fileScanTasks::add); + } catch (IOException e) { + throw new java.io.UncheckedIOException(e); + } + + if (ident.equals(TABLE_COMPLETED_WITH_FILE_SCAN_TASK)) { + return castResponse( + responseType, + PlanTableScanResponse.builder() + .withPlanStatus(PlanStatus.COMPLETED) + .withFileScanTasks(fileScanTasks) + .withSpecsById(table.specs()) + .build()); + } + + if (ident.equals(TABLE_SUBMITTED_WITH_FILE_SCAN_TASK)) { + // this is the case where we return a plan-id, then call fetchPlanningResult to get the + // tasks at a later point + String planId = "plan-id-" + UUID.randomUUID(); + planToFileScanTasks.put(planId, fileScanTasks); + return castResponse( + responseType, + PlanTableScanResponse.builder() + .withPlanId(planId) + .withPlanStatus(PlanStatus.SUBMITTED) + .withSpecsById(table.specs()) + .build()); + } + + if (ident.equals(TABLE_COMPLETED_WITH_PLAN_TASK)) { + // this is the case where we return a list of plan-task, and then call fetchScanTasks + // for each + List<String> planTasks = + List.of("plan-task-" + UUID.randomUUID(), "plan-task-" + UUID.randomUUID()); + planTasks.forEach(task -> planToFileScanTasks.put(task, fileScanTasks)); + return castResponse( + responseType, + PlanTableScanResponse.builder() + .withPlanStatus(PlanStatus.COMPLETED) + .withPlanTasks(planTasks) + .withSpecsById(table.specs()) + .build()); + } + + if (ident.equals(TABLE_COMPLETED_WITH_NESTED_PLAN_TASK)) { + // this is the case where our plan tasks, can return additional plan tasks, and those + // can return file scan tasks. 
+ List<String> outerPlanTasks = + List.of( + "outer-plan-task-" + UUID.randomUUID(), "outer-plan-task-" + UUID.randomUUID()); + List<String> innerPlanTasks = + List.of( + "inner-plan-task-" + UUID.randomUUID(), "inner-plan-task-" + UUID.randomUUID()); + + for (int i = 0; i < outerPlanTasks.size(); i++) { + planToPlanTasks.put(outerPlanTasks.get(i), innerPlanTasks.get(i)); + planToFileScanTasks.put(innerPlanTasks.get(i), fileScanTasks); + } + + return castResponse( + responseType, + PlanTableScanResponse.builder() + .withPlanStatus(PlanStatus.COMPLETED) + .withPlanTasks(outerPlanTasks) + .withSpecsById(table.specs()) + .build()); + } + break; + } + + case FETCH_PLANNING_RESULT: + { + TableIdentifier ident = tableIdentFromPathVars(vars); + Table table = catalog.loadTable(ident); + if (ident.equals(TABLE_SUBMITTED_WITH_FILE_SCAN_TASK)) { + String planId = planIDFromPathVars(vars); + return castResponse( + responseType, + FetchPlanningResultResponse.builder() + .withPlanStatus(PlanStatus.COMPLETED) + .withFileScanTasks(planToFileScanTasks.get(planId)) + .withSpecsById(table.specs()) + .build()); + } + break; + } + + case FETCH_SCAN_TASKS: + { + TableIdentifier ident = tableIdentFromPathVars(vars); + Table table = catalog.loadTable(ident); + FetchScanTasksRequest request = castRequest(FetchScanTasksRequest.class, body); + if (ident.equals(TABLE_COMPLETED_WITH_PLAN_TASK)) { + return castResponse( + responseType, + FetchScanTasksResponse.builder() + .withFileScanTasks(planToFileScanTasks.get(request.planTask())) + .withSpecsById(table.specs()) + .build()); + } + + if (ident.equals(TABLE_COMPLETED_WITH_NESTED_PLAN_TASK)) { + // this is the case where we return another round of nested plan tasks + if (planToPlanTasks.containsKey(request.planTask())) { + String innerPlanTask = planToPlanTasks.remove(request.planTask()); + return castResponse( + responseType, + FetchScanTasksResponse.builder() + .withPlanTasks(List.of(innerPlanTask)) + .withSpecsById(table.specs()) + .build()); + 
} + + if (planToFileScanTasks.containsKey(request.planTask())) { + // this is the case where we get from nested plan tasks the file scan tasks + List<FileScanTask> fileScanTasksFromPlanTask = + planToFileScanTasks.remove(request.planTask()); + return castResponse( + responseType, + FetchScanTasksResponse.builder() + .withFileScanTasks(fileScanTasksFromPlanTask) + .withSpecsById(table.specs()) + .build()); + } + } + break; + } + + default: if (responseType == OAuthTokenResponse.class) { return castResponse(responseType, handleOAuthRequest(body)); @@ -687,4 +876,8 @@ private static TableIdentifier viewIdentFromPathVars(Map pathVar return TableIdentifier.of( namespaceFromPathVars(pathVars), RESTUtil.decodeString(pathVars.get("view"))); } + + private static String planIDFromPathVars(Map<String, String> pathVars) { + return RESTUtil.decodeString(pathVars.get("plan-id")); + } } diff --git a/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java b/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java index 1c15cfab43a3..53804c1f590e 100644 --- a/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java +++ b/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java @@ -83,6 +83,7 @@ import org.apache.iceberg.rest.responses.ListTablesResponse; import org.apache.iceberg.rest.responses.LoadTableResponse; import org.apache.iceberg.rest.responses.OAuthTokenResponse; +import org.apache.iceberg.rest.responses.TableScanResponse; import org.apache.iceberg.types.Types; import org.assertj.core.api.InstanceOfAssertFactories; import org.awaitility.Awaitility; @@ -150,6 +151,13 @@ public T execute( T response = super.execute( method, path, queryParams, request, responseType, headers, errorHandler); + + if (response instanceof TableScanResponse) { + // This is for the case where the response does not roundTrip + // the plan table related responses follow this case + return response; + } + T responseAfterSerialization = roundTripSerialize(response, "response"); return 
responseAfterSerialization; } @@ -2683,6 +2691,71 @@ public void testNoCleanupForNonCleanableReplaceTransaction() { .isTrue(); } + @Test + public void testPlanTableScanWithCompletedStatusAndFileScanTask() throws IOException { + Table table = createRESTTableAndInsertData(TABLE_COMPLETED_WITH_FILE_SCAN_TASK); + assertBoundFileScanTasks(table, SPEC); + } + + @Test + public void testPlanTableScanAndFetchPlanningResultWithSubmittedStatusAndFileScanTask() + throws IOException { + Table table = createRESTTableAndInsertData(TABLE_SUBMITTED_WITH_FILE_SCAN_TASK); + assertBoundFileScanTasks(table, SPEC); + } + + @Test + public void testPlanTableScanAndFetchScanTasksWithCompletedStatusAndPlanTask() + throws IOException { + Table table = createRESTTableAndInsertData(TABLE_COMPLETED_WITH_PLAN_TASK); + assertBoundFileScanTasks(table, SPEC); + } + + @Test + public void testPlanTableScanAndFetchScanTasksWithCompletedStatusAndNestedPlanTasks() + throws IOException { + Table table = createRESTTableAndInsertData(TABLE_COMPLETED_WITH_NESTED_PLAN_TASK); + assertBoundFileScanTasks(table, SPEC); + } + + public Table createRESTTableAndInsertData(TableIdentifier tableIdentifier) throws IOException { + SessionCatalog.SessionContext context = + new SessionCatalog.SessionContext( + UUID.randomUUID().toString(), + "user", + ImmutableMap.of("credential", "user:12345"), + ImmutableMap.of()); + RESTCatalog catalog = + new RESTCatalog( + context, + (config) -> HTTPClient.builder(config).uri(config.get(CatalogProperties.URI)).build()); + catalog.initialize( + "test", + ImmutableMap.of( + RESTSessionCatalog.REST_SERVER_PLANNING_ENABLED, + "true", + CatalogProperties.URI, + httpServer.getURI().toString(), + CatalogProperties.FILE_IO_IMPL, + "org.apache.iceberg.inmemory.InMemoryFileIO", + "credential", + "catalog:secret")); + + if (requiresNamespaceCreate()) { + catalog.createNamespace(tableIdentifier.namespace()); + } + + Table table = + catalog + .buildTable(tableIdentifier, SCHEMA) + 
.withProperty("table.rest-scan-planning", "true") + .withPartitionSpec(SPEC) + .create(); + + table.newAppend().appendFile(FILE_A).commit(); + return table; + } + private RESTCatalog catalog(RESTCatalogAdapter adapter) { RESTCatalog catalog = new RESTCatalog(SessionCatalog.SessionContext.createEmpty(), (config) -> adapter); diff --git a/core/src/test/java/org/apache/iceberg/rest/requests/TestFetchScanTasksRequest.java b/core/src/test/java/org/apache/iceberg/rest/requests/TestFetchScanTasksRequest.java new file mode 100644 index 000000000000..a68f0d3d2b29 --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/rest/requests/TestFetchScanTasksRequest.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.rest.requests; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import com.fasterxml.jackson.databind.JsonNode; +import org.junit.jupiter.api.Test; + +public class TestFetchScanTasksRequest { + + @Test + public void nullAndEmptyCheck() { + assertThatThrownBy(() -> FetchScanTasksRequestParser.toJson(null)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid request: fetchScanTasks request null"); + + assertThatThrownBy(() -> FetchScanTasksRequestParser.fromJson((JsonNode) null)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid request: fetchScanTasks null"); + } + + @Test + public void roundTripSerdeWithPlanTask() { + FetchScanTasksRequest request = new FetchScanTasksRequest("somePlanTask"); + String expectedJson = "{\"plan-task\":\"somePlanTask\"}"; + String json = FetchScanTasksRequestParser.toJson(request, false); + assertThat(json).isEqualTo(expectedJson); + + // can't do an equality comparison on FetchScanTasksRequest because we don't implement + // equals/hashcode + assertThat( + FetchScanTasksRequestParser.toJson(FetchScanTasksRequestParser.fromJson(json), false)) + .isEqualTo(expectedJson); + } +} diff --git a/core/src/test/java/org/apache/iceberg/rest/requests/TestPlanTableScanRequest.java b/core/src/test/java/org/apache/iceberg/rest/requests/TestPlanTableScanRequest.java new file mode 100644 index 000000000000..43cf0d8b3aa4 --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/rest/requests/TestPlanTableScanRequest.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.rest.requests; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import com.fasterxml.jackson.databind.JsonNode; +import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.junit.jupiter.api.Test; + +public class TestPlanTableScanRequest { + + @Test + public void nullAndEmptyCheck() { + assertThatThrownBy(() -> PlanTableScanRequestParser.toJson(null)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid request: planTableScanRequest null"); + + assertThatThrownBy(() -> PlanTableScanRequestParser.fromJson((JsonNode) null)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid request: planTableScanRequest null"); + } + + @Test + public void roundTripSerdeWithEmptyRequestAndDefaultsPresent() { + PlanTableScanRequest request = new PlanTableScanRequest.Builder().build(); + + String expectedJson = "{" + "\"case-sensitive\":true," + "\"use-snapshot-schema\":false}"; + + String json = PlanTableScanRequestParser.toJson(request, false); + assertThat(json).isEqualTo(expectedJson); + + // can't do an equality comparison on PlanTableScanRequest because we don't implement + // equals/hashcode + 
assertThat(PlanTableScanRequestParser.toJson(PlanTableScanRequestParser.fromJson(json), false)) + .isEqualTo(expectedJson); + } + + @Test + public void roundTripSerdeWithSelectField() { + PlanTableScanRequest request = + new PlanTableScanRequest.Builder() + .withSnapshotId(1L) + .withSelect(Lists.newArrayList("col1", "col2")) + .build(); + + String expectedJson = + "{\"snapshot-id\":1," + + "\"select\":[\"col1\",\"col2\"]," + + "\"case-sensitive\":true," + + "\"use-snapshot-schema\":false}"; + + String json = PlanTableScanRequestParser.toJson(request, false); + assertThat(json).isEqualTo(expectedJson); + + // can't do an equality comparison on PlanTableScanRequest because we don't implement + // equals/hashcode + assertThat(PlanTableScanRequestParser.toJson(PlanTableScanRequestParser.fromJson(json), false)) + .isEqualTo(expectedJson); + } + + @Test + public void roundTripSerdeWithFilterField() { + PlanTableScanRequest request = + new PlanTableScanRequest.Builder() + .withSnapshotId(1L) + .withFilter(Expressions.alwaysFalse()) + .build(); + + String expectedJson = + "{\"snapshot-id\":1," + + "\"filter\":\"false\"," + + "\"case-sensitive\":true," + + "\"use-snapshot-schema\":false}"; + + String json = PlanTableScanRequestParser.toJson(request, false); + assertThat(json).isEqualTo(expectedJson); + + // can't do an equality comparison on PlanTableScanRequest because we don't implement + // equals/hashcode + assertThat(PlanTableScanRequestParser.toJson(PlanTableScanRequestParser.fromJson(json), false)) + .isEqualTo(expectedJson); + } + + @Test + public void roundTripSerdeWithAllFieldsInvalidRequest() { + PlanTableScanRequest request = + new PlanTableScanRequest.Builder() + .withSnapshotId(1L) + .withSelect(Lists.newArrayList("col1", "col2")) + .withFilter(Expressions.alwaysTrue()) + .withStartSnapshotId(1L) + .withEndSnapshotId(2L) + .withCaseSensitive(false) + .withUseSnapshotSchema(true) + .withStatsFields(Lists.newArrayList("col1", "col2")) + .build(); + + 
assertThatThrownBy(() -> PlanTableScanRequestParser.toJson(request)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage( + "Either snapshotId must be provided or both startSnapshotId and endSnapshotId must be provided"); + } + + @Test + public void roundTripSerdeWithAllFieldsExceptSnapShotId() { + PlanTableScanRequest request = + new PlanTableScanRequest.Builder() + .withSelect(Lists.newArrayList("col1", "col2")) + .withFilter(Expressions.alwaysTrue()) + .withStartSnapshotId(1L) + .withEndSnapshotId(2L) + .withCaseSensitive(false) + .withUseSnapshotSchema(true) + .withStatsFields(Lists.newArrayList("col1", "col2")) + .build(); + + String expectedJson = + "{\"start-snapshot-id\":1," + + "\"end-snapshot-id\":2," + + "\"select\":[\"col1\",\"col2\"]," + + "\"filter\":\"true\"," + + "\"case-sensitive\":false," + + "\"use-snapshot-schema\":true," + + "\"stats-fields\":[\"col1\",\"col2\"]}"; + + String json = PlanTableScanRequestParser.toJson(request, false); + assertThat(json).isEqualTo(expectedJson); + + // can't do an equality comparison on PlanTableScanRequest because we don't implement + // equals/hashcode + assertThat(PlanTableScanRequestParser.toJson(PlanTableScanRequestParser.fromJson(json), false)) + .isEqualTo(expectedJson); + } +} diff --git a/core/src/test/java/org/apache/iceberg/rest/responses/TestFetchPlanningResultResponseParser.java b/core/src/test/java/org/apache/iceberg/rest/responses/TestFetchPlanningResultResponseParser.java new file mode 100644 index 000000000000..215ea1481da0 --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/rest/responses/TestFetchPlanningResultResponseParser.java @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.rest.responses; + +import static org.apache.iceberg.TestBase.FILE_A; +import static org.apache.iceberg.TestBase.FILE_A_DELETES; +import static org.apache.iceberg.TestBase.PARTITION_SPECS_BY_ID; +import static org.apache.iceberg.TestBase.SCHEMA; +import static org.apache.iceberg.TestBase.SPEC; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import com.fasterxml.jackson.databind.JsonNode; +import java.util.List; +import org.apache.iceberg.BaseFileScanTask; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.PartitionSpecParser; +import org.apache.iceberg.SchemaParser; +import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.expressions.ResidualEvaluator; +import org.apache.iceberg.rest.PlanStatus; +import org.junit.jupiter.api.Test; + +public class TestFetchPlanningResultResponseParser { + + @Test + public void nullAndEmptyCheck() { + assertThatThrownBy(() -> FetchPlanningResultResponseParser.toJson(null)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid response: fetchPanningResultResponse null"); + + assertThatThrownBy(() -> FetchPlanningResultResponseParser.fromJson((JsonNode) null)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid response: 
fetchPanningResultResponse null or empty"); + } + + @Test + public void roundTripSerdeWithEmptyObject() { + assertThatThrownBy( + () -> + FetchPlanningResultResponseParser.toJson( + FetchPlanningResultResponse.builder().build())) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid status: null"); + + String emptyJson = "{ }"; + assertThatThrownBy(() -> FetchPlanningResultResponseParser.fromJson(emptyJson)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid response: fetchPanningResultResponse null or empty"); + } + + @Test + public void roundTripSerdeWithInvalidPlanStatus() { + String invalidStatusJson = "{\"plan-status\": \"someStatus\"}"; + assertThatThrownBy(() -> FetchPlanningResultResponseParser.fromJson(invalidStatusJson)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid status name: someStatus"); + } + + @Test + public void roundTripSerdeWithValidSubmittedStatus() { + PlanStatus planStatus = PlanStatus.fromName("submitted"); + FetchPlanningResultResponse response = + FetchPlanningResultResponse.builder().withPlanStatus(planStatus).build(); + + String expectedJson = "{\"plan-status\":\"submitted\"}"; + String json = FetchPlanningResultResponseParser.toJson(response); + assertThat(json).isEqualTo(expectedJson); + + FetchPlanningResultResponse fromResponse = FetchPlanningResultResponseParser.fromJson(json); + assertThat(FetchPlanningResultResponseParser.toJson(fromResponse)).isEqualTo(expectedJson); + } + + @Test + public void roundTripSerdeWithInvalidPlanStatusSubmittedWithTasksPresent() { + PlanStatus planStatus = PlanStatus.fromName("submitted"); + assertThatThrownBy( + () -> + FetchPlanningResultResponse.builder() + .withPlanStatus(planStatus) + .withPlanTasks(List.of("task1", "task2")) + .build()) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid response: tasks can only be returned in a 'completed' status"); + + String invalidJson = + "{\"plan-status\":\"submitted\"," + 
"\"plan-tasks\":[\"task1\",\"task2\"]}"; + + assertThatThrownBy(() -> FetchPlanningResultResponseParser.fromJson(invalidJson)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid response: tasks can only be returned in a 'completed' status"); + } + + @Test + public void roundTripSerdeWithInvalidPlanStatusSubmittedWithDeleteFilesNoFileScanTasksPresent() { + PlanStatus planStatus = PlanStatus.fromName("submitted"); + assertThatThrownBy( + () -> + FetchPlanningResultResponse.builder() + .withPlanStatus(planStatus) + .withDeleteFiles(List.of(FILE_A_DELETES)) + .build()) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage( + "Invalid response: deleteFiles should only be returned with fileScanTasks that reference them"); + + String invalidJson = + "{\"plan-status\":\"submitted\"," + + "\"delete-files\":[{\"spec-id\":0,\"content\":\"POSITION_DELETES\"," + + "\"file-path\":\"/path/to/data-a-deletes.parquet\",\"file-format\":\"PARQUET\"," + + "\"partition\":{\"1000\":0},\"file-size-in-bytes\":10,\"record-count\":1}]" + + "}"; + + assertThatThrownBy(() -> FetchPlanningResultResponseParser.fromJson(invalidJson)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage( + "Invalid response: deleteFiles should only be returned with fileScanTasks that reference them"); + } + + @Test + public void roundTripSerdeWithValidStatusAndFileScanTasks() { + ResidualEvaluator residualEvaluator = + ResidualEvaluator.of(SPEC, Expressions.equal("id", 1), true); + FileScanTask fileScanTask = + new BaseFileScanTask( + FILE_A, + new DeleteFile[] {FILE_A_DELETES}, + SchemaParser.toJson(SCHEMA), + PartitionSpecParser.toJson(SPEC), + residualEvaluator); + + PlanStatus planStatus = PlanStatus.fromName("completed"); + FetchPlanningResultResponse response = + FetchPlanningResultResponse.builder() + .withPlanStatus(planStatus) + .withFileScanTasks(List.of(fileScanTask)) + .withDeleteFiles(List.of(FILE_A_DELETES)) + // assume this has been set + 
.withSpecsById(PARTITION_SPECS_BY_ID) + .build(); + + String expectedToJson = + "{\"plan-status\":\"completed\"," + + "\"delete-files\":[{\"spec-id\":0,\"content\":\"POSITION_DELETES\"," + + "\"file-path\":\"/path/to/data-a-deletes.parquet\",\"file-format\":\"PARQUET\"," + + "\"partition\":{\"1000\":0},\"file-size-in-bytes\":10,\"record-count\":1}]," + + "\"file-scan-tasks\":[" + + "{\"data-file\":{\"spec-id\":0,\"content\":\"DATA\",\"file-path\":\"/path/to/data-a.parquet\"," + + "\"file-format\":\"PARQUET\",\"partition\":{\"1000\":0}," + + "\"file-size-in-bytes\":10,\"record-count\":1,\"sort-order-id\":0}," + + "\"delete-file-references\":[0]," + + "\"residual-filter\":{\"type\":\"eq\",\"term\":\"id\",\"value\":1}}]" + + "}"; + + String json = FetchPlanningResultResponseParser.toJson(response, false); + assertThat(json).isEqualTo(expectedToJson); + + // make an unbound json where you expect to not have partitions for the data file, + // delete files as service does not send partition spec + String expectedFromJson = + "{\"plan-status\":\"completed\"," + + "\"delete-files\":[{\"spec-id\":0,\"content\":\"POSITION_DELETES\"," + + "\"file-path\":\"/path/to/data-a-deletes.parquet\",\"file-format\":\"PARQUET\"," + + "\"partition\":{},\"file-size-in-bytes\":10,\"record-count\":1}]," + + "\"file-scan-tasks\":[" + + "{\"data-file\":{\"spec-id\":0,\"content\":\"DATA\",\"file-path\":\"/path/to/data-a.parquet\"," + + "\"file-format\":\"PARQUET\",\"partition\":{}," + + "\"file-size-in-bytes\":10,\"record-count\":1,\"sort-order-id\":0}," + + "\"delete-file-references\":[0]," + + "\"residual-filter\":{\"type\":\"eq\",\"term\":\"id\",\"value\":1}}]" + + "}"; + + FetchPlanningResultResponse fromResponse = FetchPlanningResultResponseParser.fromJson(json); + // Need to make a new response with partitionSpec set + FetchPlanningResultResponse copyResponse = + FetchPlanningResultResponse.builder() + .withPlanStatus(fromResponse.planStatus()) + .withPlanTasks(fromResponse.planTasks()) + 
.withDeleteFiles(fromResponse.deleteFiles()) + .withFileScanTasks(fromResponse.fileScanTasks()) + .withSpecsById(PARTITION_SPECS_BY_ID) + .build(); + + // can't do an equality comparison on FetchPlanningResultResponse because we don't implement + // equals/hashcode + assertThat(FetchPlanningResultResponseParser.toJson(copyResponse, false)) + .isEqualTo(expectedFromJson); + } +} diff --git a/core/src/test/java/org/apache/iceberg/rest/responses/TestFetchScanTasksResponseParser.java b/core/src/test/java/org/apache/iceberg/rest/responses/TestFetchScanTasksResponseParser.java new file mode 100644 index 000000000000..4ddfbd1fcced --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/rest/responses/TestFetchScanTasksResponseParser.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.rest.responses; + +import static org.apache.iceberg.TestBase.FILE_A; +import static org.apache.iceberg.TestBase.FILE_A_DELETES; +import static org.apache.iceberg.TestBase.PARTITION_SPECS_BY_ID; +import static org.apache.iceberg.TestBase.SCHEMA; +import static org.apache.iceberg.TestBase.SPEC; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import com.fasterxml.jackson.databind.JsonNode; +import java.util.List; +import org.apache.iceberg.BaseFileScanTask; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.PartitionSpecParser; +import org.apache.iceberg.SchemaParser; +import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.expressions.ResidualEvaluator; +import org.junit.jupiter.api.Test; + +public class TestFetchScanTasksResponseParser { + + @Test + public void nullAndEmptyCheck() { + assertThatThrownBy(() -> FetchScanTasksResponseParser.toJson(null)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid response: fetchScanTasksResponse null"); + + assertThatThrownBy(() -> FetchScanTasksResponseParser.fromJson((JsonNode) null)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid response: fetchScanTasksResponse null"); + } + + @Test + public void roundTripSerdeWithEmptyObject() { + assertThatThrownBy( + () -> FetchScanTasksResponseParser.toJson(FetchScanTasksResponse.builder().build())) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid response: planTasks and fileScanTask cannot both be null"); + + String emptyJson = "{ }"; + assertThatThrownBy(() -> FetchScanTasksResponseParser.fromJson(emptyJson)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid response: fetchScanTasksResponse null"); + } + + @Test + public void roundTripSerdeWithPlanTasks() { + String expectedJson = 
"{\"plan-tasks\":[\"task1\",\"task2\"]}"; + String json = + FetchScanTasksResponseParser.toJson( + FetchScanTasksResponse.builder().withPlanTasks(List.of("task1", "task2")).build()); + assertThat(json).isEqualTo(expectedJson); + + FetchScanTasksResponse fromResponse = FetchScanTasksResponseParser.fromJson(json); + + // can't do an equality comparison on FetchScanTasksResponse because we don't implement + // equals/hashCode + assertThat(FetchScanTasksResponseParser.toJson(fromResponse, false)).isEqualTo(expectedJson); + } + + @Test + public void roundTripSerdeWithDeleteFilesNoFileScanTasksPresent() { + assertThatThrownBy( + () -> + FetchScanTasksResponse.builder() + .withPlanTasks(List.of("task1", "task2")) + .withDeleteFiles(List.of(FILE_A_DELETES)) + .build()) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage( + "Invalid response: deleteFiles should only be returned with fileScanTasks that reference them"); + + String invalidJson = + "{\"plan-tasks\":[\"task1\",\"task2\"]," + + "\"delete-files\":[{\"spec-id\":0,\"content\":\"POSITION_DELETES\"," + + "\"file-path\":\"/path/to/data-a-deletes.parquet\",\"file-format\":\"PARQUET\"," + + "\"partition\":{\"1000\":0},\"file-size-in-bytes\":10,\"record-count\":1}]" + + "}"; + + assertThatThrownBy(() -> FetchScanTasksResponseParser.fromJson(invalidJson)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage( + "Invalid response: deleteFiles should only be returned with fileScanTasks that reference them"); + } + + @Test + public void roundTripSerdeWithFileScanTasks() { + ResidualEvaluator residualEvaluator = + ResidualEvaluator.of(SPEC, Expressions.equal("id", 1), true); + FileScanTask fileScanTask = + new BaseFileScanTask( + FILE_A, + new DeleteFile[] {FILE_A_DELETES}, + SchemaParser.toJson(SCHEMA), + PartitionSpecParser.toJson(SPEC), + residualEvaluator); + + FetchScanTasksResponse response = + FetchScanTasksResponse.builder() + .withFileScanTasks(List.of(fileScanTask)) + 
.withDeleteFiles(List.of(FILE_A_DELETES)) + // assume you have set this already + .withSpecsById(PARTITION_SPECS_BY_ID) + .build(); + + String expectedToJson = + "{" + + "\"delete-files\":[{\"spec-id\":0,\"content\":\"POSITION_DELETES\"," + + "\"file-path\":\"/path/to/data-a-deletes.parquet\",\"file-format\":\"PARQUET\"," + + "\"partition\":{\"1000\":0},\"file-size-in-bytes\":10,\"record-count\":1}]," + + "\"file-scan-tasks\":[" + + "{\"data-file\":{\"spec-id\":0,\"content\":\"DATA\",\"file-path\":\"/path/to/data-a.parquet\"," + + "\"file-format\":\"PARQUET\",\"partition\":{\"1000\":0}," + + "\"file-size-in-bytes\":10,\"record-count\":1,\"sort-order-id\":0}," + + "\"delete-file-references\":[0]," + + "\"residual-filter\":{\"type\":\"eq\",\"term\":\"id\",\"value\":1}}]" + + "}"; + + String json = FetchScanTasksResponseParser.toJson(response, false); + assertThat(json).isEqualTo(expectedToJson); + + // make an unbound json where you expect to not have partitions for the data file, + // delete files as service does not send partition spec + String expectedFromJson = + "{" + + "\"delete-files\":[{\"spec-id\":0,\"content\":\"POSITION_DELETES\"," + + "\"file-path\":\"/path/to/data-a-deletes.parquet\",\"file-format\":\"PARQUET\"," + + "\"partition\":{},\"file-size-in-bytes\":10,\"record-count\":1}]," + + "\"file-scan-tasks\":[" + + "{\"data-file\":{\"spec-id\":0,\"content\":\"DATA\",\"file-path\":\"/path/to/data-a.parquet\"," + + "\"file-format\":\"PARQUET\",\"partition\":{}," + + "\"file-size-in-bytes\":10,\"record-count\":1,\"sort-order-id\":0}," + + "\"delete-file-references\":[0]," + + "\"residual-filter\":{\"type\":\"eq\",\"term\":\"id\",\"value\":1}}]" + + "}"; + + FetchScanTasksResponse fromResponse = FetchScanTasksResponseParser.fromJson(json); + // Need to make a new response with partitionSpec set + FetchScanTasksResponse copyResponse = + FetchScanTasksResponse.builder() + .withPlanTasks(fromResponse.planTasks()) + .withDeleteFiles(fromResponse.deleteFiles()) + 
.withFileScanTasks(fromResponse.fileScanTasks()) + .withSpecsById(PARTITION_SPECS_BY_ID) + .build(); + + // can't do an equality comparison on FetchScanTasksResponse because we don't implement + // equals/hashCode + assertThat(FetchScanTasksResponseParser.toJson(copyResponse, false)) + .isEqualTo(expectedFromJson); + } +} diff --git a/core/src/test/java/org/apache/iceberg/rest/responses/TestPlanTableScanResponseParser.java b/core/src/test/java/org/apache/iceberg/rest/responses/TestPlanTableScanResponseParser.java new file mode 100644 index 000000000000..ef39e47bdc31 --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/rest/responses/TestPlanTableScanResponseParser.java @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.rest.responses; + +import static org.apache.iceberg.TestBase.FILE_A; +import static org.apache.iceberg.TestBase.FILE_A_DELETES; +import static org.apache.iceberg.TestBase.PARTITION_SPECS_BY_ID; +import static org.apache.iceberg.TestBase.SCHEMA; +import static org.apache.iceberg.TestBase.SPEC; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import com.fasterxml.jackson.databind.JsonNode; +import java.util.List; +import org.apache.iceberg.BaseFileScanTask; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.PartitionSpecParser; +import org.apache.iceberg.SchemaParser; +import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.expressions.ResidualEvaluator; +import org.apache.iceberg.rest.PlanStatus; +import org.junit.jupiter.api.Test; + +public class TestPlanTableScanResponseParser { + @Test + public void nullAndEmptyCheck() { + assertThatThrownBy(() -> PlanTableScanResponseParser.toJson(null)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid response: planTableScanResponse null"); + + assertThatThrownBy(() -> PlanTableScanResponseParser.fromJson((JsonNode) null)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Cannot parse planTableScan response from empty or null object"); + } + + @Test + public void roundTripSerdeWithEmptyObject() { + + assertThatThrownBy(() -> PlanTableScanResponse.builder().build()) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid response: plan status must be defined"); + + String emptyJson = "{ }"; + assertThatThrownBy(() -> PlanTableScanResponseParser.fromJson(emptyJson)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Cannot parse planTableScan response from empty or null object"); + } + + @Test + public void roundTripSerdeWithInvalidPlanStatus() { + String invalidStatusJson = 
"{\"plan-status\": \"someStatus\"}"; + assertThatThrownBy(() -> PlanTableScanResponseParser.fromJson(invalidStatusJson)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid status name: someStatus"); + } + + @Test + public void roundTripSerdeWithInvalidPlanStatusSubmittedWithoutPlanId() { + PlanStatus planStatus = PlanStatus.fromName("submitted"); + + assertThatThrownBy(() -> PlanTableScanResponse.builder().withPlanStatus(planStatus).build()) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid response: plan id should be defined when status is 'submitted'"); + + String invalidJson = "{\"plan-status\":\"submitted\"}"; + assertThatThrownBy(() -> PlanTableScanResponseParser.fromJson(invalidJson)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid response: plan id should be defined when status is 'submitted'"); + } + + @Test + public void roundTripSerdeWithInvalidPlanStatusCancelled() { + PlanStatus planStatus = PlanStatus.fromName("cancelled"); + assertThatThrownBy(() -> PlanTableScanResponse.builder().withPlanStatus(planStatus).build()) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid response: 'cancelled' is not a valid status for planTableScan"); + + String invalidJson = "{\"plan-status\":\"cancelled\"}"; + assertThatThrownBy(() -> PlanTableScanResponseParser.fromJson(invalidJson)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid response: 'cancelled' is not a valid status for planTableScan"); + } + + @Test + public void roundTripSerdeWithInvalidPlanStatusSubmittedWithTasksPresent() { + PlanStatus planStatus = PlanStatus.fromName("submitted"); + assertThatThrownBy( + () -> + PlanTableScanResponse.builder() + .withPlanStatus(planStatus) + .withPlanId("somePlanId") + .withPlanTasks(List.of("task1", "task2")) + .build()) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid response: tasks can only be returned in a 'completed' status"); + + String 
invalidJson = + "{\"plan-status\":\"submitted\"," + + "\"plan-id\":\"somePlanId\"," + + "\"plan-tasks\":[\"task1\",\"task2\"]}"; + + assertThatThrownBy(() -> PlanTableScanResponseParser.fromJson(invalidJson)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid response: tasks can only be returned in a 'completed' status"); + } + + @Test + public void roundTripSerdeWithInvalidPlanIdWithIncorrectStatus() { + PlanStatus planStatus = PlanStatus.fromName("failed"); + assertThatThrownBy( + () -> + PlanTableScanResponse.builder() + .withPlanStatus(planStatus) + .withPlanId("somePlanId") + .build()) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid response: plan id can only be returned in a 'submitted' status"); + + String invalidJson = "{\"plan-status\":\"failed\"," + "\"plan-id\":\"somePlanId\"}"; + + assertThatThrownBy(() -> PlanTableScanResponseParser.fromJson(invalidJson)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid response: plan id can only be returned in a 'submitted' status"); + } + + @Test + public void roundTripSerdeWithInvalidPlanStatusSubmittedWithDeleteFilesNoFileScanTasksPresent() { + PlanStatus planStatus = PlanStatus.fromName("submitted"); + assertThatThrownBy( + () -> + PlanTableScanResponse.builder() + .withPlanStatus(planStatus) + .withPlanId("somePlanId") + .withDeleteFiles(List.of(FILE_A_DELETES)) + .build()) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage( + "Invalid response: deleteFiles should only be returned with fileScanTasks that reference them"); + + String invalidJson = + "{\"plan-status\":\"submitted\"," + + "\"plan-id\":\"somePlanId\"," + + "\"delete-files\":[{\"spec-id\":0,\"content\":\"POSITION_DELETES\"," + + "\"file-path\":\"/path/to/data-a-deletes.parquet\",\"file-format\":\"PARQUET\"," + + "\"partition\":{\"1000\":0},\"file-size-in-bytes\":10,\"record-count\":1}]" + + "}"; + + assertThatThrownBy(() -> PlanTableScanResponseParser.fromJson(invalidJson)) + 
.isInstanceOf(IllegalArgumentException.class) + .hasMessage( + "Invalid response: deleteFiles should only be returned with fileScanTasks that reference them"); + } + + @Test + public void roundTripSerdeWithValidStatusAndFileScanTasks() { + ResidualEvaluator residualEvaluator = + ResidualEvaluator.of(SPEC, Expressions.equal("id", 1), true); + FileScanTask fileScanTask = + new BaseFileScanTask( + FILE_A, + new DeleteFile[] {FILE_A_DELETES}, + SchemaParser.toJson(SCHEMA), + PartitionSpecParser.toJson(SPEC), + residualEvaluator); + + PlanStatus planStatus = PlanStatus.fromName("completed"); + PlanTableScanResponse response = + PlanTableScanResponse.builder() + .withPlanStatus(planStatus) + .withFileScanTasks(List.of(fileScanTask)) + .withDeleteFiles(List.of(FILE_A_DELETES)) + .withSpecsById(PARTITION_SPECS_BY_ID) + .build(); + + String expectedToJson = + "{\"plan-status\":\"completed\"," + + "\"delete-files\":[{\"spec-id\":0,\"content\":\"POSITION_DELETES\"," + + "\"file-path\":\"/path/to/data-a-deletes.parquet\",\"file-format\":\"PARQUET\"," + + "\"partition\":{\"1000\":0},\"file-size-in-bytes\":10,\"record-count\":1}]," + + "\"file-scan-tasks\":[" + + "{\"data-file\":{\"spec-id\":0,\"content\":\"DATA\",\"file-path\":\"/path/to/data-a.parquet\"," + + "\"file-format\":\"PARQUET\",\"partition\":{\"1000\":0}," + + "\"file-size-in-bytes\":10,\"record-count\":1,\"sort-order-id\":0}," + + "\"delete-file-references\":[0]," + + "\"residual-filter\":{\"type\":\"eq\",\"term\":\"id\",\"value\":1}}]" + + "}"; + + String json = PlanTableScanResponseParser.toJson(response); + assertThat(json).isEqualTo(expectedToJson); + + String expectedFromJson = + "{\"plan-status\":\"completed\"," + + "\"delete-files\":[{\"spec-id\":0,\"content\":\"POSITION_DELETES\"," + + "\"file-path\":\"/path/to/data-a-deletes.parquet\",\"file-format\":\"PARQUET\"," + + "\"partition\":{},\"file-size-in-bytes\":10,\"record-count\":1}]," + + "\"file-scan-tasks\":[" + + 
"{\"data-file\":{\"spec-id\":0,\"content\":\"DATA\",\"file-path\":\"/path/to/data-a.parquet\"," + + "\"file-format\":\"PARQUET\",\"partition\":{}," + + "\"file-size-in-bytes\":10,\"record-count\":1,\"sort-order-id\":0}," + + "\"delete-file-references\":[0]," + + "\"residual-filter\":{\"type\":\"eq\",\"term\":\"id\",\"value\":1}}]" + + "}"; + + PlanTableScanResponse fromResponse = PlanTableScanResponseParser.fromJson(json); + PlanTableScanResponse copyResponse = + PlanTableScanResponse.builder() + .withPlanStatus(fromResponse.planStatus()) + .withPlanId(fromResponse.planId()) + .withPlanTasks(fromResponse.planTasks()) + .withDeleteFiles(fromResponse.deleteFiles()) + .withFileScanTasks(fromResponse.fileScanTasks()) + .withSpecsById(PARTITION_SPECS_BY_ID) + .build(); + + // can't do an equality comparison on PlanTableScanResponse because we don't implement + // equals/hashCode + assertThat(PlanTableScanResponseParser.toJson(copyResponse)).isEqualTo(expectedFromJson); + } +}