diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/action/IcebergExecuteActionFactory.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/action/IcebergExecuteActionFactory.java index 7c208cb7db685b..0d09a9ef35ced8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/action/IcebergExecuteActionFactory.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/action/IcebergExecuteActionFactory.java @@ -40,6 +40,7 @@ public class IcebergExecuteActionFactory { public static final String EXPIRE_SNAPSHOTS = "expire_snapshots"; public static final String REWRITE_DATA_FILES = "rewrite_data_files"; public static final String PUBLISH_CHANGES = "publish_changes"; + public static final String REWRITE_MANIFESTS = "rewrite_manifests"; /** * Create an Iceberg-specific ExecuteAction instance. @@ -84,6 +85,9 @@ public static ExecuteAction createAction(String actionType, Map case PUBLISH_CHANGES: return new IcebergPublishChangesAction(properties, partitionNamesInfo, whereCondition); + case REWRITE_MANIFESTS: + return new IcebergRewriteManifestsAction(properties, partitionNamesInfo, + whereCondition); default: throw new DdlException("Unsupported Iceberg procedure: " + actionType + ". Supported procedures: " + String.join(", ", getSupportedActions())); @@ -104,7 +108,8 @@ public static String[] getSupportedActions() { FAST_FORWARD, EXPIRE_SNAPSHOTS, REWRITE_DATA_FILES, - PUBLISH_CHANGES + PUBLISH_CHANGES, + REWRITE_MANIFESTS }; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/action/IcebergRewriteManifestsAction.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/action/IcebergRewriteManifestsAction.java new file mode 100644 index 00000000000000..430e9fe9d5e22d --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/action/IcebergRewriteManifestsAction.java @@ -0,0 +1,109 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.iceberg.action; + +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.TableIf; +import org.apache.doris.catalog.Type; +import org.apache.doris.common.ArgumentParsers; +import org.apache.doris.common.UserException; +import org.apache.doris.datasource.ExternalTable; +import org.apache.doris.datasource.iceberg.IcebergExternalTable; +import org.apache.doris.datasource.iceberg.rewrite.RewriteManifestExecutor; +import org.apache.doris.info.PartitionNamesInfo; +import org.apache.doris.nereids.trees.expressions.Expression; + +import com.google.common.collect.Lists; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.Table; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.List; +import java.util.Map; +import java.util.Optional; + +/** + * Action for rewriting Iceberg manifest files to optimize metadata layout + */ +public class IcebergRewriteManifestsAction extends BaseIcebergAction { + private static final Logger LOG = LogManager.getLogger(IcebergRewriteManifestsAction.class); + public static final String SPEC_ID = "spec_id"; + + public IcebergRewriteManifestsAction(Map properties, + Optional partitionNamesInfo, + Optional whereCondition) { + super("rewrite_manifests", properties, partitionNamesInfo, whereCondition); + } + + @Override + protected void registerIcebergArguments() { + namedArguments.registerOptionalArgument(SPEC_ID, + "Spec id of the manifests to rewrite (defaults to current spec id)", + null, + ArgumentParsers.intRange(SPEC_ID, 0, Integer.MAX_VALUE)); + } + + @Override + protected void validateIcebergAction() throws UserException { + validateNoPartitions(); + validateNoWhereCondition(); + } + + @Override + protected List executeAction(TableIf table) throws UserException { + try { + Table icebergTable = ((IcebergExternalTable) table).getIcebergTable(); + Snapshot current = icebergTable.currentSnapshot(); + if (current == null) { + // No current snapshot means the table is empty, no manifests to rewrite + return Lists.newArrayList("0", "0"); + } + + // Get optional spec_id parameter + Integer specId = namedArguments.getInt(SPEC_ID); + + // Execute rewrite operation + RewriteManifestExecutor executor = new RewriteManifestExecutor(); + RewriteManifestExecutor.Result result = executor.execute( + icebergTable, + (ExternalTable) table, + specId); + + return result.toStringList(); + } catch (Exception e) { + LOG.warn("Failed to rewrite manifests for table: {}", table.getName(), e); + throw new UserException("Rewrite manifests failed: " + e.getMessage(), e); + } + } + + @Override + protected List getResultSchema() { + return Lists.newArrayList( + new Column("rewritten_manifests_count", Type.INT, false, + "Number of manifests which were re-written by this command"), + new Column("added_manifests_count", Type.INT, false, + "Number of new manifest files which were written by this command") + ); + } + + @Override + public String getDescription() { + return "Rewrite Iceberg manifest files to optimize metadata layout"; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/rewrite/RewriteManifestExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/rewrite/RewriteManifestExecutor.java new file mode 100644 index 00000000000000..f2e5ab77adbfde --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/rewrite/RewriteManifestExecutor.java @@ -0,0 +1,125 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.iceberg.rewrite; + +import org.apache.doris.catalog.Env; +import org.apache.doris.common.UserException; +import org.apache.doris.datasource.ExternalTable; + +import org.apache.iceberg.ManifestFile; +import org.apache.iceberg.RewriteManifests; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.Table; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.List; + +/** + * Executor for manifest rewrite operations + */ +public class RewriteManifestExecutor { + private static final Logger LOG = LogManager.getLogger(RewriteManifestExecutor.class); + + public static class Result { + private final int rewrittenCount; + private final int addedCount; + + public Result(int rewrittenCount, int addedCount) { + this.rewrittenCount = rewrittenCount; + this.addedCount = addedCount; + } + + public java.util.List toStringList() { + return java.util.Arrays.asList(String.valueOf(rewrittenCount), + String.valueOf(addedCount)); + } + } + + /** + * Execute manifest rewrite using Iceberg RewriteManifests API + */ + public Result execute(Table table, ExternalTable extTable, Integer specId) throws UserException { + try { + // Get current snapshot and return early if table is empty + Snapshot currentSnapshot = table.currentSnapshot(); + if (currentSnapshot == null) { + return new Result(0, 0); + } + + // Collect manifests before rewrite and filter by specId if provided + List manifestsBefore = currentSnapshot.dataManifests(table.io()); + List manifestsBeforeTargeted = filterBySpecId(manifestsBefore, specId); + + int rewrittenCount = manifestsBeforeTargeted.size(); + + if (rewrittenCount == 0) { + return new Result(0, 0); + } + + // Configure rewrite operation, optionally restricting manifests by specId + RewriteManifests rm = table.rewriteManifests(); + + if (specId != null) { + final int targetSpecId = specId; + rm.rewriteIf(manifest -> manifest.partitionSpecId() == targetSpecId); + } + + // Commit manifest rewrite + rm.commit(); + + // Refresh snapshot after rewrite + Snapshot snapshotAfter = table.currentSnapshot(); + if (snapshotAfter == null) { + return new Result(rewrittenCount, 0); + } + + // Collect manifests after rewrite and filter by specId + List manifestsAfter = snapshotAfter.dataManifests(table.io()); + List manifestsAfterTargeted = filterBySpecId(manifestsAfter, specId); + + // Compute addedCount as newly produced manifests (path not in before set) + java.util.Set beforePaths = manifestsBeforeTargeted.stream() + .map(ManifestFile::path) + .collect(java.util.stream.Collectors.toSet()); + + int addedCount = (int) manifestsAfterTargeted.stream() + .map(ManifestFile::path) + .filter(path -> !beforePaths.contains(path)) + .count(); + + // Invalidate table cache to ensure metadata is refreshed + Env.getCurrentEnv().getExtMetaCacheMgr().invalidateTableCache(extTable); + + return new Result(rewrittenCount, addedCount); + } catch (Exception e) { + LOG.warn("Failed to execute manifest rewrite for table: {}", extTable.getName(), e); + throw new UserException("Failed to rewrite manifests: " + e.getMessage(), e); + } + } + + private List filterBySpecId(List manifests, Integer specId) { + if (specId == null) { + return manifests; + } + final int targetSpecId = specId; + return manifests.stream() + .filter(manifest -> manifest.partitionSpecId() == targetSpecId) + .collect(java.util.stream.Collectors.toList()); + } +} diff --git a/regression-test/data/external_table_p0/iceberg/action/test_iceberg_rewrite_manifests.out b/regression-test/data/external_table_p0/iceberg/action/test_iceberg_rewrite_manifests.out new file mode 100644 index 00000000000000..0d68ae09dc03a2 --- /dev/null +++ b/regression-test/data/external_table_p0/iceberg/action/test_iceberg_rewrite_manifests.out @@ -0,0 +1,44 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !before_basic_rewrite -- +1 item1 electronics 100 2024-01-01 +2 item2 electronics 200 2024-01-02 +3 item3 books 300 2024-01-03 +4 item4 books 400 2024-01-04 +5 item5 clothing 500 2024-01-05 +6 item6 clothing 600 2024-01-06 +7 item7 electronics 700 2024-01-07 +8 item8 electronics 800 2024-01-08 + +-- !after_basic_rewrite -- +1 item1 electronics 100 2024-01-01 +2 item2 electronics 200 2024-01-02 +3 item3 books 300 2024-01-03 +4 item4 books 400 2024-01-04 +5 item5 clothing 500 2024-01-05 +6 item6 clothing 600 2024-01-06 +7 item7 electronics 700 2024-01-07 +8 item8 electronics 800 2024-01-08 + +-- !before_partitioned_rewrite -- +1 item1 100 2024 1 +2 item2 200 2024 1 +3 item3 300 2024 2 +4 item4 400 2024 2 +5 item5 500 2024 3 +6 item6 600 2024 3 + +-- !after_partitioned_rewrite -- +1 item1 100 2024 1 +2 item2 200 2024 1 +3 item3 300 2024 2 +4 item4 400 2024 2 +5 item5 500 2024 3 +6 item6 600 2024 3 + +-- !after_spec_id_rewrite -- +1 item1 100 2024 1 15 +2 item2 200 2024 1 16 +3 item3 300 2024 2 17 +4 item4 400 2024 3 18 +5 item5 500 2024 3 19 + diff --git a/regression-test/suites/external_table_p0/iceberg/action/test_iceberg_rewrite_manifests.groovy b/regression-test/suites/external_table_p0/iceberg/action/test_iceberg_rewrite_manifests.groovy new file mode 100644 index 00000000000000..de33deaeeeffbd --- /dev/null +++ b/regression-test/suites/external_table_p0/iceberg/action/test_iceberg_rewrite_manifests.groovy @@ -0,0 +1,364 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_iceberg_rewrite_manifests", "p0,external,doris,external_docker,external_docker_doris") { + String enabled = context.config.otherConfigs.get("enableIcebergTest") + if (enabled == null || !enabled.equalsIgnoreCase("true")) { + logger.info("disable iceberg test.") + return + } + + String catalog_name = "test_iceberg_rewrite_manifests" + String db_name = "test_db" + String rest_port = context.config.otherConfigs.get("iceberg_rest_uri_port") + String minio_port = context.config.otherConfigs.get("iceberg_minio_port") + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + + sql """drop catalog if exists ${catalog_name}""" + sql """ + CREATE CATALOG ${catalog_name} PROPERTIES ( + 'type'='iceberg', + 'iceberg.catalog.type'='rest', + 'uri' = 'http://${externalEnvIp}:${rest_port}', + "s3.access_key" = "admin", + "s3.secret_key" = "password", + "s3.endpoint" = "http://${externalEnvIp}:${minio_port}", + "s3.region" = "us-east-1" + );""" + + sql """switch ${catalog_name}""" + sql """CREATE DATABASE IF NOT EXISTS ${db_name} """ + sql """use ${db_name}""" + + // ===================================================================================== + // Test Case 1: Basic rewrite_manifests operation + // Tests the ability to rewrite multiple manifest files into fewer, optimized files + // ===================================================================================== + logger.info("Starting basic rewrite_manifests test case") + + def table_name = "test_rewrite_manifests_basic" + + // Clean up if table exists + sql """DROP TABLE IF EXISTS ${db_name}.${table_name}""" + + // Create a test table + sql """ + CREATE TABLE ${db_name}.${table_name} ( + id BIGINT, + name STRING, + category STRING, + value INT, + created_date DATE + ) ENGINE=iceberg + """ + logger.info("Created test table: ${table_name}") + + // Insert data in multiple single-row operations to create multiple manifest files + // Each INSERT operation typically creates a new manifest file + sql """INSERT INTO ${db_name}.${table_name} VALUES (1, 'item1', 'electronics', 100, '2024-01-01')""" + sql """INSERT INTO ${db_name}.${table_name} VALUES (2, 'item2', 'electronics', 200, '2024-01-02')""" + sql """INSERT INTO ${db_name}.${table_name} VALUES (3, 'item3', 'books', 300, '2024-01-03')""" + sql """INSERT INTO ${db_name}.${table_name} VALUES (4, 'item4', 'books', 400, '2024-01-04')""" + sql """INSERT INTO ${db_name}.${table_name} VALUES (5, 'item5', 'clothing', 500, '2024-01-05')""" + sql """INSERT INTO ${db_name}.${table_name} VALUES (6, 'item6', 'clothing', 600, '2024-01-06')""" + sql """INSERT INTO ${db_name}.${table_name} VALUES (7, 'item7', 'electronics', 700, '2024-01-07')""" + sql """INSERT INTO ${db_name}.${table_name} VALUES (8, 'item8', 'electronics', 800, '2024-01-08')""" + + // Verify data before rewrite + qt_before_basic_rewrite """SELECT * FROM ${table_name} ORDER BY id""" + + // Check manifest count before rewrite + List> manifestsBefore = sql """ + SELECT COUNT(*) as manifest_count FROM ${table_name}\$manifests + """ + logger.info("Manifest count before rewrite: ${manifestsBefore}") + + // Execute basic rewrite_manifests operation (no parameters - rewrite all manifests) + List> rewriteResult = sql """ + ALTER TABLE ${catalog_name}.${db_name}.${table_name} + EXECUTE rewrite_manifests() + """ + logger.info("Basic rewrite_manifests result: ${rewriteResult}") + + // Verify the result structure + assertTrue(rewriteResult.size() == 1, "Expected exactly 1 result row") + assertTrue(rewriteResult[0].size() == 2, "Expected 2 columns in result") + + // Extract rewritten and added manifest counts + int rewrittenCount = rewriteResult[0][0] as int + int addedCount = rewriteResult[0][1] as int + + logger.info("Rewritten manifests: ${rewrittenCount}, Added manifests: ${addedCount}") + assertTrue(rewrittenCount > 0, "Should have rewritten at least 1 manifest") + assertTrue(addedCount >= 0, "Added count should be non-negative") + // Note: addedCount can be 0 if Iceberg determines manifests are already optimal + // or if it reuses existing manifest files + if (addedCount > 0) { + assertTrue(addedCount <= rewrittenCount, "Added count should be <= rewritten count (consolidation)") + } + + // Verify data integrity after rewrite + qt_after_basic_rewrite """SELECT * FROM ${table_name} ORDER BY id""" + + // Check manifest count after rewrite (should be fewer or equal) + List> manifestsAfter = sql """ + SELECT COUNT(*) as manifest_count FROM ${table_name}\$manifests + """ + logger.info("Manifest count after rewrite: ${manifestsAfter}") + assertTrue(manifestsAfter[0][0] as int <= manifestsBefore[0][0] as int, + "Manifest count after rewrite should be <= count before") + + // ===================================================================================== + // Test Case 2: rewrite_manifests on partitioned table + // Tests manifest rewriting on a table with partition specifications + // ===================================================================================== + logger.info("Starting rewrite_manifests on partitioned table test case") + + def partitioned_table = "test_rewrite_manifests_partitioned" + + // Clean up if table exists + sql """DROP TABLE IF EXISTS ${db_name}.${partitioned_table}""" + + // Create a partitioned table + sql """ + CREATE TABLE ${db_name}.${partitioned_table} ( + id BIGINT, + name STRING, + value INT, + year INT, + month INT + ) ENGINE=iceberg + PARTITION BY (year, month)() + """ + logger.info("Created partitioned test table: ${partitioned_table}") + + // Insert data into different partitions to create multiple manifest files + sql """INSERT INTO ${db_name}.${partitioned_table} VALUES (1, 'item1', 100, 2024, 1)""" + sql """INSERT INTO ${db_name}.${partitioned_table} VALUES (2, 'item2', 200, 2024, 1)""" + sql """INSERT INTO ${db_name}.${partitioned_table} VALUES (3, 'item3', 300, 2024, 2)""" + sql """INSERT INTO ${db_name}.${partitioned_table} VALUES (4, 'item4', 400, 2024, 2)""" + sql """INSERT INTO ${db_name}.${partitioned_table} VALUES (5, 'item5', 500, 2024, 3)""" + sql """INSERT INTO ${db_name}.${partitioned_table} VALUES (6, 'item6', 600, 2024, 3)""" + + // Verify data before rewrite + qt_before_partitioned_rewrite """SELECT * FROM ${partitioned_table} ORDER BY id""" + + // Check manifest count before rewrite + List> partitionedManifestsBefore = sql """ + SELECT COUNT(*) as manifest_count FROM ${partitioned_table}\$manifests + """ + logger.info("Partitioned table manifest count before rewrite: ${partitionedManifestsBefore}") + + // Execute rewrite_manifests on partitioned table + List> partitionedRewriteResult = sql """ + ALTER TABLE ${catalog_name}.${db_name}.${partitioned_table} + EXECUTE rewrite_manifests() + """ + logger.info("Partitioned table rewrite_manifests result: ${partitionedRewriteResult}") + + // Verify result structure + assertTrue(partitionedRewriteResult.size() == 1, "Expected exactly 1 result row for partitioned table") + assertTrue(partitionedRewriteResult[0].size() == 2, "Expected 2 columns in result for partitioned table") + + int partitionedRewrittenCount = partitionedRewriteResult[0][0] as int + int partitionedAddedCount = partitionedRewriteResult[0][1] as int + + logger.info("Partitioned table - Rewritten manifests: ${partitionedRewrittenCount}, Added manifests: ${partitionedAddedCount}") + assertTrue(partitionedRewrittenCount > 0, "Partitioned table should have rewritten at least 1 manifest") + assertTrue(partitionedAddedCount >= 0, "Partitioned table added count should be non-negative") + + // Verify data integrity after rewrite + qt_after_partitioned_rewrite """SELECT * FROM ${partitioned_table} ORDER BY id""" + + // Check manifest count after rewrite + List> partitionedManifestsAfter = sql """ + SELECT COUNT(*) as manifest_count FROM ${partitioned_table}\$manifests + """ + logger.info("Partitioned table manifest count after rewrite: ${partitionedManifestsAfter}") + assertTrue(partitionedManifestsAfter[0][0] as int <= partitionedManifestsBefore[0][0] as int, + "Partitioned table manifest count after rewrite should be <= count before") + + // ===================================================================================== + // Test Case 3: rewrite_manifests with spec_id parameter + // Tests manifest rewriting for a specific partition spec + // ===================================================================================== + logger.info("Starting rewrite_manifests with spec_id test case") + + def spec_id_table = "test_rewrite_manifests_spec_id" + + // Clean up if table exists + sql """DROP TABLE IF EXISTS ${db_name}.${spec_id_table}""" + + // Create a partitioned table (this will have spec_id = 0) + sql """ + CREATE TABLE ${db_name}.${spec_id_table} ( + id BIGINT, + name STRING, + value INT, + year INT, + month INT, + day INT + ) ENGINE=iceberg + PARTITION BY (year, month)() + """ + logger.info("Created spec_id test table: ${spec_id_table}") + + // Insert data to create manifests with spec_id 0 + sql """INSERT INTO ${db_name}.${spec_id_table} VALUES (1, 'item1', 100, 2024, 1, 15)""" + sql """INSERT INTO ${db_name}.${spec_id_table} VALUES (2, 'item2', 200, 2024, 1, 16)""" + sql """INSERT INTO ${db_name}.${spec_id_table} VALUES (3, 'item3', 300, 2024, 2, 17)""" + + // Check initial spec_id and manifest count + List> initialSpecs = sql """ + SELECT partition_spec_id, COUNT(*) as manifest_count + FROM ${spec_id_table}\$manifests + GROUP BY partition_spec_id + ORDER BY partition_spec_id + """ + logger.info("Initial spec IDs and manifest counts: ${initialSpecs}") + + // Add day as a new partition field to create spec_id = 1 + sql """ALTER TABLE ${catalog_name}.${db_name}.${spec_id_table} ADD PARTITION KEY day""" + + // Insert more data to create manifests with spec_id 1 + sql """INSERT INTO ${db_name}.${spec_id_table} VALUES (4, 'item4', 400, 2024, 3, 18)""" + sql """INSERT INTO ${db_name}.${spec_id_table} VALUES (5, 'item5', 500, 2024, 3, 19)""" + + // Check spec_ids after adding new partition field + List> allSpecs = sql """ + SELECT partition_spec_id, COUNT(*) as manifest_count + FROM ${spec_id_table}\$manifests + GROUP BY partition_spec_id + ORDER BY partition_spec_id + """ + logger.info("All spec IDs and manifest counts: ${allSpecs}") + + if (allSpecs.size() > 0) { + int targetSpec = allSpecs[0][0] as int + int targetCount = allSpecs[0][1] as int + List> specResult = sql """ + ALTER TABLE ${catalog_name}.${db_name}.${spec_id_table} + EXECUTE rewrite_manifests('spec_id' = '${targetSpec}') + """ + int specRewritten = specResult[0][0] as int + assertTrue(specRewritten == targetCount, + "Should rewrite exactly ${targetCount} manifests for spec_id=${targetSpec}, got ${specRewritten}") + qt_after_spec_id_rewrite """SELECT * FROM ${spec_id_table} ORDER BY id""" + logger.info("spec_id filtering test completed successfully") + } else { + logger.warn("Could not create spec_id, skipping spec_id filtering test") + } + + // ===================================================================================== + // Test Case 4: rewrite_manifests on empty table (no current snapshot) + // Tests that rewrite_manifests handles tables with no current snapshot gracefully + // ===================================================================================== + logger.info("Starting rewrite_manifests on empty table test case") + + def empty_table = "test_empty_table" + sql """DROP TABLE IF EXISTS ${db_name}.${empty_table}""" + sql """ + CREATE TABLE ${db_name}.${empty_table} ( + id BIGINT, + name STRING + ) ENGINE=iceberg + """ + logger.info("Created empty test table: ${empty_table}") + + // Execute rewrite_manifests on empty table (no current snapshot) + List> emptyTableResult = sql """ + ALTER TABLE ${catalog_name}.${db_name}.${empty_table} + EXECUTE rewrite_manifests() + """ + logger.info("Empty table rewrite_manifests result: ${emptyTableResult}") + + // Verify result structure + assertTrue(emptyTableResult.size() == 1, "Expected exactly 1 result row for empty table") + assertTrue(emptyTableResult[0].size() == 2, "Expected 2 columns in result for empty table") + + // Should return 0 rewritten manifests and 0 added manifests for empty table + int emptyRewrittenCount = emptyTableResult[0][0] as int + int emptyAddedCount = emptyTableResult[0][1] as int + + assertTrue(emptyRewrittenCount == 0, "Empty table should have 0 rewritten manifests, got: ${emptyRewrittenCount}") + assertTrue(emptyAddedCount == 0, "Empty table should have 0 added manifests, got: ${emptyAddedCount}") + + logger.info("Empty table test completed: rewritten=${emptyRewrittenCount}, added=${emptyAddedCount}") + + // ===================================================================================== + // Negative Test Cases: Parameter validation and error handling + // ===================================================================================== + logger.info("Starting negative test cases for rewrite_manifests") + + // Test with invalid spec_id format + + test { + sql """ + ALTER TABLE ${catalog_name}.${db_name}.${table_name} + EXECUTE rewrite_manifests('spec_id' = 'not-a-number') + """ + exception "Invalid" + } + + // Test with non-existent spec_id (on spec_id table) + + // Test with non-existent spec_id (very large number unlikely to exist) on spec_id table + List> nonExistentSpecOnSpecTable = sql """ + ALTER TABLE ${catalog_name}.${db_name}.${spec_id_table} + EXECUTE rewrite_manifests('spec_id' = '99999') + """ + assertTrue(nonExistentSpecOnSpecTable[0][0] as int == 0, "Non-existent spec_id on spec_id_table should return 0 rewritten") + assertTrue(nonExistentSpecOnSpecTable[0][1] as int == 0, "Non-existent spec_id on spec_id_table should return 0 added") + + // Test with unknown parameter + test { + sql """ + ALTER TABLE ${catalog_name}.${db_name}.${table_name} + EXECUTE rewrite_manifests('unknown-parameter' = 'value') + """ + exception "Unknown argument: unknown-parameter" + } + + // Test rewrite_manifests with partition specification (should fail) + test { + sql """ + ALTER TABLE ${catalog_name}.${db_name}.${table_name} + EXECUTE rewrite_manifests() PARTITIONS (part1) + """ + exception "Action 'rewrite_manifests' does not support partition specification" + } + + // Test rewrite_manifests with WHERE condition (should fail) + test { + sql """ + ALTER TABLE ${catalog_name}.${db_name}.${table_name} + EXECUTE rewrite_manifests() WHERE id > 0 + """ + exception "Action 'rewrite_manifests' does not support WHERE condition" + } + + // Test on non-existent table + test { + sql """ + ALTER TABLE ${catalog_name}.${db_name}.non_existent_table + EXECUTE rewrite_manifests() + """ + exception "Table non_existent_table does not exist" + } + + logger.info("All rewrite_manifests test cases completed successfully") +} \ No newline at end of file