diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java index bdec6802004c05..79bebbf2125a1e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java @@ -455,6 +455,11 @@ public void createOrReplaceBranchImpl(ExternalTable dorisTable, CreateOrReplaceB + ", error message is: {} " + ExceptionUtils.getRootCauseMessage(e), e); } String branchName = branchInfo.getBranchName(); + + if (branchName == null || branchName.trim().isEmpty()) { + throw new UserException("Branch name cannot be empty"); + } + boolean refExists = null != icebergTable.refs().get(branchName); boolean create = branchInfo.getCreate(); boolean replace = branchInfo.getReplace(); @@ -535,6 +540,11 @@ public void createOrReplaceTagImpl(ExternalTable dorisTable, CreateOrReplaceTagI } String tagName = tagInfo.getTagName(); + + if (tagName == null || tagName.trim().isEmpty()) { + throw new UserException("Tag name cannot be empty"); + } + boolean create = tagInfo.getCreate(); boolean replace = tagInfo.getReplace(); boolean ifNotExists = tagInfo.getIfNotExists(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java index 93a7af2d8e095a..c7d1d638ddebac 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java @@ -781,16 +781,19 @@ public void addPlannerHook(PlannerHook plannerHook) { } /** - * Load snapshot information of mvcc + * Load snapshot information of mvcc for a specific table. + * + * @param specificTable specific table to load snapshot for. 
+ * @param tableSnapshot table snapshot info + * @param scanParams table scan params (e.g., branch/tag for Iceberg tables) */ - public void loadSnapshots(Optional<TableSnapshot> tableSnapshot, Optional<TableScanParams> scanParams) { - for (TableIf tableIf : tables.values()) { - if (tableIf instanceof MvccTable) { - MvccTableInfo mvccTableInfo = new MvccTableInfo(tableIf); - // may be set by MTMV, we can not load again - if (!snapshots.containsKey(mvccTableInfo)) { - snapshots.put(mvccTableInfo, ((MvccTable) tableIf).loadSnapshot(tableSnapshot, scanParams)); - } + public void loadSnapshots(TableIf specificTable, Optional<TableSnapshot> tableSnapshot, + Optional<TableScanParams> scanParams) { + if (specificTable instanceof MvccTable) { + MvccTableInfo mvccTableInfo = new MvccTableInfo(specificTable); + if (!snapshots.containsKey(mvccTableInfo)) { + snapshots.put(mvccTableInfo, + ((MvccTable) specificTable).loadSnapshot(tableSnapshot, scanParams)); } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java index 35d64c4455e2e8..59c2ad3d995d53 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java @@ -9446,7 +9446,12 @@ public BranchOptions visitBranchOptions(DorisParser.BranchOptionsContext ctx) { if (ctx.retainTime() != null) { DorisParser.TimeValueWithUnitContext time = ctx.retainTime().timeValueWithUnit(); if (time != null) { - retainTime = Optional.of(visitTimeValueWithUnit(time)); + long retainTimeMs = visitTimeValueWithUnit(time); + if (retainTimeMs <= 0) { + throw new IllegalArgumentException( + "RETAIN time value must be greater than 0, got: " + retainTimeMs + " ms"); + } + retainTime = Optional.of(retainTimeMs); } } @@ -9455,11 +9460,21 @@ public BranchOptions visitBranchOptions(DorisParser.BranchOptionsContext ctx) { if (ctx.retentionSnapshot() != null) { DorisParser.RetentionSnapshotContext retentionSnapshotContext = ctx.retentionSnapshot(); if (retentionSnapshotContext.minSnapshotsToKeep() != null) { - numSnapshots = Optional.of( - Integer.parseInt(retentionSnapshotContext.minSnapshotsToKeep().value.getText())); + int snapshotsCount = Integer.parseInt( + retentionSnapshotContext.minSnapshotsToKeep().value.getText()); + if (snapshotsCount <= 0) { + throw new IllegalArgumentException( + "Snapshot count (SNAPSHOTS) value must be greater than 0, got: " + snapshotsCount); + } + numSnapshots = Optional.of(snapshotsCount); } if (retentionSnapshotContext.timeValueWithUnit() != null) { - retention = Optional.of(visitTimeValueWithUnit(retentionSnapshotContext.timeValueWithUnit())); + long retentionMs = visitTimeValueWithUnit(retentionSnapshotContext.timeValueWithUnit()); + if (retentionMs <= 0) { + throw new IllegalArgumentException( + "Retention time value must be greater than 0, got: " + retentionMs + " ms"); + } + retention = Optional.of(retentionMs); } } return new BranchOptions(snapshotId, retainTime, numSnapshots, retention); @@ -9497,7 +9512,12 @@ public TagOptions visitTagOptions(DorisParser.TagOptionsContext ctx) { if (ctx.retainTime() != null) { DorisParser.TimeValueWithUnitContext time = ctx.retainTime().timeValueWithUnit(); if (time != null) { - retainTime = Optional.of(visitTimeValueWithUnit(time)); + long retainTimeMs = visitTimeValueWithUnit(time); + if (retainTimeMs <= 0) { + throw new IllegalArgumentException( + "RETAIN time value must be greater than 0, got: " + retainTimeMs + " ms"); + } + 
retainTime = Optional.of(retainTimeMs); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java index 44dfbc563a71db..811bd923f67cb4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java @@ -417,6 +417,7 @@ private LogicalPlan getLogicalPlan(TableIf table, UnboundRelation unboundRelatio List qualifierWithoutTableName = qualifiedTableName.subList(0, qualifiedTableName.size() - 1); cascadesContext.getStatementContext().loadSnapshots( + table, unboundRelation.getTableSnapshot(), Optional.ofNullable(unboundRelation.getScanParams())); boolean isView = false; diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/info/CreateOrReplaceBranchOrTagInfoTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/info/CreateOrReplaceBranchOrTagInfoTest.java index e3d9ad9b23bdfc..41941bab6fe457 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/info/CreateOrReplaceBranchOrTagInfoTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/info/CreateOrReplaceBranchOrTagInfoTest.java @@ -60,15 +60,6 @@ public void testCreateBranchIfNotExistsToSql() { Assertions.assertEquals(expected, branchInfo.toSql()); } - @Test - public void testCreateOrReplaceBranchIfNotExistsToSql() { - // Test CREATE OR REPLACE BRANCH IF NOT EXISTS - CreateOrReplaceBranchInfo branchInfo = new CreateOrReplaceBranchInfo( - "test_branch", true, true, true, null); - String expected = "CREATE OR REPLACE BRANCH IF NOT EXISTS test_branch"; - Assertions.assertEquals(expected, branchInfo.toSql()); - } - @Test public void testCreateBranchWithSnapshotIdToSql() { // Test CREATE BRANCH with snapshot ID diff --git a/regression-test/data/external_table_p0/iceberg/branch_tag/iceberg_branch_complex_queries.out b/regression-test/data/external_table_p0/iceberg/branch_tag/iceberg_branch_complex_queries.out new file mode 100644 index 00000000000000..4ebb8c695407ce --- /dev/null +++ b/regression-test/data/external_table_p0/iceberg/branch_tag/iceberg_branch_complex_queries.out @@ -0,0 +1,125 @@ +-- This file is automatically generated. 
You should know what you did if you want to edit this +-- !agg_count -- +8 + +-- !agg_sum -- +360 + +-- !agg_avg -- +45 + +-- !agg_max -- +80 + +-- !agg_min -- +10 + +-- !agg_group_by -- +cat1 3 100 +cat2 3 150 +cat3 2 110 + +-- !window_row_number -- +1 a 10 1 +2 b 20 2 +3 c 30 3 +4 d 40 4 +5 e 50 5 +6 f 60 6 +7 g 70 7 +8 h 80 8 + +-- !window_rank -- +1 a 10 1 +2 b 20 2 +3 c 30 1 +4 d 40 2 +5 e 50 1 +6 f 60 2 +7 g 70 3 +8 h 80 3 + +-- !window_dense_rank -- +1 a 10 1 +2 b 20 2 +3 c 30 3 +4 d 40 4 +5 e 50 5 +6 f 60 6 +7 g 70 7 +8 h 80 8 + +-- !join_branch_branch -- +1 a info1 +2 b info2 +3 c info3 +6 f info6 +7 g info7 +8 h info8 + +-- !join_branch_main -- +1 a info1 +2 b info2 +3 c info3 +6 f info6 +7 g info7 + +-- !join_main_branch -- +1 a info1 +2 b info2 +3 c info3 + +-- !subquery_in -- + +-- !subquery_exists -- + +-- !subquery_scalar -- +1 a 80 +2 b 80 +3 c 80 +4 d 80 +5 e 80 + +-- !cte_simple -- +6 f 60 cat3 +7 g 70 cat1 +8 h 80 cat2 + +-- !cte_multiple -- +1 a 10 cat1 +2 b 20 cat1 +3 c 30 cat2 +4 d 40 cat2 +7 g 70 cat1 +8 h 80 cat2 + +-- !cte_join -- +1 a 10 +2 b 20 +3 c 30 +4 d 40 +5 e 50 +6 f \N +7 g \N +8 h \N + +-- !complex_filter -- +cat1 3 33.33333333333334 +cat2 3 50 +cat3 1 60 + +-- !order_limit -- +8 h 80 cat2 +7 g 70 cat1 +6 f 60 cat3 + +-- !order_by_multiple -- +7 g 70 cat1 +2 b 20 cat1 +1 a 10 cat1 +8 h 80 cat2 +4 d 40 cat2 +3 c 30 cat2 +6 f 60 cat3 +5 e 50 cat3 + diff --git a/regression-test/data/external_table_p0/iceberg/branch_tag/iceberg_branch_cross_operations.out b/regression-test/data/external_table_p0/iceberg/branch_tag/iceberg_branch_cross_operations.out new file mode 100644 index 00000000000000..0447dee6891073 --- /dev/null +++ b/regression-test/data/external_table_p0/iceberg/branch_tag/iceberg_branch_cross_operations.out @@ -0,0 +1,104 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !b1_initial -- +1 a +2 b +3 c +4 d +5 e + +-- !b2_initial -- +1 a +2 b +3 c + +-- !b2_after_copy -- +1 a +1 a +2 b +2 b +3 c +3 c +4 d +5 e + +-- !b1_unchanged -- +1 a +2 b +3 c +4 d +5 e + +-- !b3_has_main_data -- +1 a +1 a +2 b +2 b +3 c +3 c +6 f +6 f +7 g +7 g + +-- !main_unchanged_after_copy -- +1 a +2 b +3 c +6 f +7 g + +-- !main_overwritten -- +1 a +2 b +3 c +6 f +7 g +8 h +9 i +10 j + +-- !b4_unchanged -- +1 a +2 b +3 c +6 f +7 g +8 h +9 i +10 j + +-- !b4_has_filtered_data -- +1 a +2 b +3 c +6 f +7 g +8 h +9 i +10 j +11 k +12 l +13 m + +-- !b6_overwritten -- +1 a +2 b +3 c +6 f +7 g +8 h +9 i +10 j +11 k +12 l +13 m + +-- !b5_chain -- +31 + +-- !b6_chain -- +20 + +-- !b7_chain -- +9 + diff --git a/regression-test/data/external_table_p0/iceberg/branch_tag/iceberg_branch_partition_operations.out b/regression-test/data/external_table_p0/iceberg/branch_tag/iceberg_branch_partition_operations.out new file mode 100644 index 00000000000000..d96a863ffc1988 --- /dev/null +++ b/regression-test/data/external_table_p0/iceberg/branch_tag/iceberg_branch_partition_operations.out @@ -0,0 +1,61 @@ +-- This file is automatically generated. 
You should know what you did if you want to edit this +-- !b1_all_partitions -- +1 a +2 a +3 b +4 b +5 c +6 c + +-- !b1_new_partition -- +7 d +8 d + +-- !main_no_new_partition -- + +-- !main_before_overwrite -- +1 a +2 a +9 a +10 a + +-- !b1_overwrite_partition -- +11 a +12 a + +-- !main_unchanged -- +1 a +2 a +9 a +10 a + +-- !b1_multiple_partitions -- +a 2 +b 2 +c 2 +d 2 +e 2 +f 2 + +-- !main_partitions -- +a 4 +b 2 +c 2 + +-- !b2_partitions -- +1 a +2 a +3 b +4 b +5 c +6 c +9 a +10 a +17 g +18 g +19 h +20 h + +-- !b1_still_has_original -- +12 + diff --git a/regression-test/data/external_table_p0/iceberg/branch_tag/iceberg_branch_retention_and_snapshot.out b/regression-test/data/external_table_p0/iceberg/branch_tag/iceberg_branch_retention_and_snapshot.out new file mode 100644 index 00000000000000..ed60f80f9b29b3 --- /dev/null +++ b/regression-test/data/external_table_p0/iceberg/branch_tag/iceberg_branch_retention_and_snapshot.out @@ -0,0 +1,24 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !expire_branch_still_accessible -- +8 + +-- !expire_tag_still_accessible -- +1 a +2 b + +-- !retain_count_branch_accessible -- +6 + +-- !retain_count_branch_data -- +6 + +-- !unref_tag_before_expire -- +1 old +2 old2 +3 new + +-- !unref_tag_accessible -- +1 old +2 old2 +3 new + diff --git a/regression-test/data/external_table_p0/iceberg/branch_tag/iceberg_branch_tag_auth.out b/regression-test/data/external_table_p0/iceberg/branch_tag/iceberg_branch_tag_auth.out new file mode 100644 index 00000000000000..621350f0b90e2b --- /dev/null +++ b/regression-test/data/external_table_p0/iceberg/branch_tag/iceberg_branch_tag_auth.out @@ -0,0 +1,19 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !select_branch_0 -- +1 name_1 + +-- !select_tag_0 -- +1 name_1 + +-- !select_branch_0_auth -- +1 name_1 + +-- !select_tag_0_auth -- +1 name_1 + +-- !select_branch_0_auth_2 -- +1 name_1 + +-- !select_tag_0_auth_2 -- +1 name_1 + diff --git a/regression-test/data/external_table_p0/iceberg/branch_tag/iceberg_branch_tag_edge_cases.out b/regression-test/data/external_table_p0/iceberg/branch_tag/iceberg_branch_tag_edge_cases.out new file mode 100644 index 00000000000000..c03c8313f97d6b --- /dev/null +++ b/regression-test/data/external_table_p0/iceberg/branch_tag/iceberg_branch_tag_edge_cases.out @@ -0,0 +1,35 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !branch_hyphen -- +1 a +2 b + +-- !branch_underscore -- +1 a +2 b + +-- !tag_hyphen -- +1 a +2 b + +-- !tag_underscore -- +1 a +2 b + +-- !case_branch -- +2 + +-- !case_tag -- +2 + +-- !numeric_branch -- +2 + +-- !numeric_tag -- +2 + +-- !mixed_branch -- +2 + +-- !mixed_tag -- +2 + diff --git a/regression-test/data/external_table_p0/iceberg/branch_tag/iceberg_branch_tag_parallel_op.out b/regression-test/data/external_table_p0/iceberg/branch_tag/iceberg_branch_tag_parallel_op.out new file mode 100644 index 00000000000000..1b4519ec7a8fc2 --- /dev/null +++ b/regression-test/data/external_table_p0/iceberg/branch_tag/iceberg_branch_tag_parallel_op.out @@ -0,0 +1,69 @@ +-- This file is automatically generated. 
You should know what you did if you want to edit this +-- !select_branch_3 -- +1 name_1 + +-- !select_branch_4 -- +1 name_1 + +-- !select_branch_8 -- +1 name_1 + +-- !select_branch_5 -- +1 name_1 + +-- !select_branch_6 -- +1 name_1 + +-- !select_branch_0 -- +1 name_1 + +-- !select_branch_7 -- +1 name_1 + +-- !select_branch_2 -- +1 name_1 + +-- !select_branch_after_insert_0 -- +1 name_1 +2 name_2 + +-- !select_branch_after_insert_2 -- +1 name_1 +4 name_4 + +-- !select_tag_8 -- +1 name_1 + +-- !select_tag_5 -- +1 name_1 + +-- !select_tag_6 -- +1 name_1 + +-- !select_tag_7 -- +1 name_1 + +-- !select_tag_3 -- +1 name_1 + +-- !select_tag_1 -- +1 name_1 + +-- !select_tag_0 -- +1 name_1 + +-- !select_tag_4 -- +1 name_1 + +-- !select_tag_2 -- +1 name_1 + +-- !select_tag_9 -- +1 name_1 + +-- !select_branch_same_name -- +1 name_1 + +-- !select_tag_same_name -- +1 name_1 + diff --git a/regression-test/data/external_table_p0/iceberg/branch_tag/iceberg_branch_tag_schema_change_extended.out b/regression-test/data/external_table_p0/iceberg/branch_tag/iceberg_branch_tag_schema_change_extended.out new file mode 100644 index 00000000000000..87a878776933de --- /dev/null +++ b/regression-test/data/external_table_p0/iceberg/branch_tag/iceberg_branch_tag_schema_change_extended.out @@ -0,0 +1,36 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !b1_has_new_col -- +1 a \N +2 b \N + +-- !b2_no_dropped_col -- +1 \N +2 \N + +-- !b3_new_type -- +1 10 + +-- !b3_with_new_col -- +3 30 test + +-- !t1_no_new_col -- +1 a +2 b + +-- !t2_has_old_col -- +1 a + +-- !t3_old_type -- +1 + +-- !main_schema -- +id int Yes true \N +col2 text Yes true \N +col3 int Yes true \N + +-- !tag_schema -- +1 a + +-- !b4_new_schema -- +1 \N + diff --git a/regression-test/data/external_table_p0/iceberg/branch_tag/iceberg_branch_tag_system_tables.out b/regression-test/data/external_table_p0/iceberg/branch_tag/iceberg_branch_tag_system_tables.out new file mode 100644 index 00000000000000..df8ba0cafcd3bf --- /dev/null +++ b/regression-test/data/external_table_p0/iceberg/branch_tag/iceberg_branch_tag_system_tables.out @@ -0,0 +1,59 @@ +-- This file is automatically generated. 
You should know what you did if you want to edit this +-- !refs_all -- +b1_system BRANCH \N \N \N +b2_system BRANCH \N \N \N +main BRANCH \N \N \N +t1_system TAG \N \N \N +t2_system TAG \N \N \N + +-- !refs_branches -- +b1_system BRANCH \N \N \N +b2_system BRANCH \N \N \N +main BRANCH \N \N \N + +-- !refs_tags -- +t1_system TAG \N \N \N +t2_system TAG \N \N \N + +-- !refs_b1 -- +b1_system BRANCH \N \N \N + +-- !refs_sorted -- +b1_system BRANCH \N \N \N +b2_system BRANCH \N \N \N +main BRANCH \N \N \N +t1_system TAG \N \N \N +t2_system TAG \N \N \N + +-- !refs_snapshots_join -- +b1_system BRANCH +b2_system BRANCH +main BRANCH +t1_system TAG +t2_system TAG + +-- !refs_count -- +5 + +-- !refs_branch_count -- +3 + +-- !refs_tag_count -- +2 + +-- !refs_with_details -- +b1_system BRANCH append +b2_system BRANCH append +main BRANCH append +t1_system TAG append +t2_system TAG append + +-- !refs_after_more -- +7 + +-- !refs_branches_after -- +4 + +-- !refs_tags_after -- +3 + diff --git a/regression-test/data/external_table_p0/iceberg/branch_tag/iceberg_tag_retention_and_consistency.out b/regression-test/data/external_table_p0/iceberg/branch_tag/iceberg_tag_retention_and_consistency.out new file mode 100644 index 00000000000000..528b8f22b6bed8 --- /dev/null +++ b/regression-test/data/external_table_p0/iceberg/branch_tag/iceberg_tag_retention_and_consistency.out @@ -0,0 +1,114 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !expire_tag_early_accessible -- +1 a snapshot1 +2 b snapshot2 + +-- !expire_tag_middle_accessible -- +1 a snapshot1 +2 b snapshot2 +3 c snapshot3 +4 d snapshot4 + +-- !multi_tag_a -- +1 first +2 second + +-- !multi_tag_b -- +1 first +2 second + +-- !multi_tag_c -- +1 first +2 second + +-- !imm_first_query -- +1 version1 1000 +2 version2 2000 + +-- !imm_after_changes -- +1 version1 1000 +2 version2 2000 + +-- !imm_main_has_new -- +1 version1 1000 +2 version2 2000 +3 version3 3000 +4 version4 4000 + +-- !imm_unchanged -- +1 version1 1000 +2 version2 2000 + +-- !replace_initial -- +1 pending + +-- !replace_after -- +1 pending +2 approved + +-- !tt_v1 -- +1 v1.0 2024-01-01 + +-- !tt_v2 -- +1 v1.0 2024-01-01 +2 v1.0 2024-01-01 + +-- !tt_v3 -- +1 v1.0 2024-01-01 +2 v1.0 2024-01-01 +3 v1.0 2024-01-01 + +-- !tt_v1_unchanged -- +1 v1.0 2024-01-01 + +-- !tt_v3_unchanged -- +1 v1.0 2024-01-01 +2 v1.0 2024-01-01 +3 v1.0 2024-01-01 + +-- !tt_main_all -- +1 v1.0 2024-01-01 +2 v1.0 2024-01-01 +3 v1.0 2024-01-01 +4 v2.0 2024-02-01 +5 v2.0 2024-02-01 + +-- !agg_count -- +4 + +-- !agg_sum -- +750 + +-- !agg_avg -- +187.5 + +-- !agg_group_by -- +A 250 +B 200 +C 300 + +-- !agg_count_unchanged -- +4 + +-- !agg_sum_unchanged -- +750 + +-- !inter_tag_baseline -- +1 item1 main +2 item2 main + +-- !inter_branch_data -- +1 item1 main +2 item2 main +3 item3 branch +4 item4 branch + +-- !inter_join -- +1 item1 1 item1 +2 item2 2 item2 +\N \N 3 item3 +\N \N 4 item4 + +-- !write_fail_tag_unchanged -- +1 data1 + diff --git a/regression-test/data/external_table_p0/iceberg/iceberg_branch_tag_operate.out b/regression-test/data/external_table_p0/iceberg/iceberg_branch_tag_operate.out index a7108fdcc02837..9af28fc98f617d 100644 --- a/regression-test/data/external_table_p0/iceberg/iceberg_branch_tag_operate.out +++ b/regression-test/data/external_table_p0/iceberg/iceberg_branch_tag_operate.out @@ -76,6 +76,29 @@ 4 5 +-- !q17 -- +1 +2 +3 +4 +5 +6 +7 + +-- !q19 -- +1 +2 +3 +4 +5 + +-- !q19_1 -- +1 +2 +3 +4 +5 + -- !q20 -- -- !q21 -- diff --git 
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy index da6ee9fe4426a1..dfb0b239bea104 100644 --- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy +++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy @@ -1598,6 +1598,96 @@ class Suite implements GroovyInterceptable { return result } + /** + * Get the spark-iceberg container name by querying docker. + * Uses 'docker ps --filter name=spark-iceberg' to find the container. + */ + private String getSparkIcebergContainerName() { + try { + // Use docker ps with filter to find containers with 'spark-iceberg' in the name + String command = "docker ps --filter name=spark-iceberg --format {{.Names}}" + def process = command.execute() + process.waitFor() + String output = process.in.text.trim() + + if (output) { + // Get the first matching container + String containerName = output.split('\n')[0].trim() + if (containerName) { + logger.info("Found spark-iceberg container: ${containerName}".toString()) + return containerName + } + } + + logger.warn("No spark-iceberg container found via docker ps") + return null + } catch (Exception e) { + logger.warn("Failed to get spark-iceberg container via docker ps: ${e.message}".toString()) + return null + } + } + + /** + * Execute Spark SQL on the spark-iceberg container via docker exec. + * + * Usage in test suite: + * spark_iceberg "CREATE TABLE demo.test_db.t1 (id INT) USING iceberg" + * spark_iceberg "INSERT INTO demo.test_db.t1 VALUES (1)" + * def result = spark_iceberg "SELECT * FROM demo.test_db.t1" + * + * The container name is found by querying 'docker ps --filter name=spark-iceberg' + */ + String spark_iceberg(String sqlStr, int timeoutSeconds = 120) { + String containerName = getSparkIcebergContainerName() + if (containerName == null) { + throw new RuntimeException("spark-iceberg container not found. Please ensure the container is running.") + } + String masterUrl = "spark://${containerName}:7077" + + // Escape double quotes in SQL string for shell command + String escapedSql = sqlStr.replaceAll('"', '\\\\"') + + // Build docker exec command + String command = """docker exec ${containerName} spark-sql --master ${masterUrl} --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions -e "${escapedSql}" """ + + logger.info("Executing Spark Iceberg SQL: ${sqlStr}".toString()) + logger.info("Container: ${containerName}".toString()) + + try { + String result = cmd(command, timeoutSeconds) + logger.info("Spark Iceberg SQL result: ${result}".toString()) + return result + } catch (Exception e) { + logger.error("Spark Iceberg SQL failed: ${e.message}".toString()) + throw e + } + } + + /** + * Execute multiple Spark SQL statements on the spark-iceberg container. + * Statements are separated by semicolons. 
+ * + * Usage: + * spark_iceberg_multi ''' + * CREATE DATABASE IF NOT EXISTS demo.test_db; + * CREATE TABLE demo.test_db.t1 (id INT) USING iceberg; + * INSERT INTO demo.test_db.t1 VALUES (1); + * ''' + */ + List spark_iceberg_multi(String sqlStatements, int timeoutSeconds = 300) { + // Split by semicolon and execute each statement + def statements = sqlStatements.split(';').collect { it.trim() }.findAll { it } + def results = [] + + for (stmt in statements) { + if (stmt) { + results << spark_iceberg(stmt, timeoutSeconds) + } + } + + return results + } + List<List<Object>> db2_docker(String sqlStr, boolean isOrder = false) { String cleanedSqlStr = sqlStr.replaceAll("\\s*;\\s*\$", "") def (result, meta) = JdbcUtils.executeToList(context.getDB2DockerConnection(), cleanedSqlStr) diff --git a/regression-test/suites/external_table_p0/iceberg/branch_tag/iceberg_branch_complex_queries.groovy b/regression-test/suites/external_table_p0/iceberg/branch_tag/iceberg_branch_complex_queries.groovy new file mode 100644 index 00000000000000..deec1c1093109b --- /dev/null +++ b/regression-test/suites/external_table_p0/iceberg/branch_tag/iceberg_branch_complex_queries.groovy @@ -0,0 +1,111 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +suite("iceberg_branch_complex_queries", "p0,external,doris,external_docker,external_docker_doris,branch_tag") { + String enabled = context.config.otherConfigs.get("enableIcebergTest") + if (enabled == null || !enabled.equalsIgnoreCase("true")) { + logger.info("disable iceberg test.") + return + } + + String rest_port = context.config.otherConfigs.get("iceberg_rest_uri_port") + String minio_port = context.config.otherConfigs.get("iceberg_minio_port") + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + String catalog_name = "iceberg_branch_complex" + + sql """drop catalog if exists ${catalog_name}""" + sql """ + CREATE CATALOG ${catalog_name} PROPERTIES ( + 'type'='iceberg', + 'iceberg.catalog.type'='rest', + 'uri' = 'http://${externalEnvIp}:${rest_port}', + "s3.access_key" = "admin", + "s3.secret_key" = "password", + "s3.endpoint" = "http://${externalEnvIp}:${minio_port}", + "s3.region" = "us-east-1" + );""" + + sql """drop database if exists ${catalog_name}.test_db_complex force""" + sql """create database ${catalog_name}.test_db_complex""" + sql """ use ${catalog_name}.test_db_complex """ + + String table_name = "test_complex_branch" + + sql """ drop table if exists ${table_name} """ + sql """ create table ${table_name} (id int, name string, value int, category string) """ + + sql """ insert into ${table_name} values (1, 'a', 10, 'cat1'), (2, 'b', 20, 'cat1'), (3, 'c', 30, 'cat2'), (4, 'd', 40, 'cat2'), (5, 'e', 50, 'cat3') """ + sql """ alter table ${table_name} create branch b1_complex """ + sql """ insert into ${table_name}@branch(b1_complex) values (6, 'f', 60, 'cat3'), (7, 'g', 70, 'cat1'), (8, 'h', 80, 'cat2') """ + + // Test 1.4.1: Aggregate queries + qt_agg_count """ select count(*) from ${table_name}@branch(b1_complex) """ + qt_agg_sum """ select sum(value) from ${table_name}@branch(b1_complex) """ + qt_agg_avg """ select avg(value) from ${table_name}@branch(b1_complex) """ + qt_agg_max """ select max(value) from ${table_name}@branch(b1_complex) """ + qt_agg_min """ select min(value) from ${table_name}@branch(b1_complex) """ + qt_agg_group_by """ select category, count(*), sum(value) from ${table_name}@branch(b1_complex) group by category order by category """ + + // Test 1.4.2: Window function queries + qt_window_row_number """ select id, name, value, row_number() over (order by value) as rn from ${table_name}@branch(b1_complex) order by id """ + qt_window_rank """ select id, name, value, rank() over (partition by category order by value) as rk from ${table_name}@branch(b1_complex) order by id """ + qt_window_dense_rank """ select id, name, value, dense_rank() over (order by value) as dr from ${table_name}@branch(b1_complex) order by id """ + + // Test 1.4.3: Multi-table join queries + String table_name2 = "test_complex_branch2" + sql """ drop table if exists ${table_name2} """ + sql """ create table ${table_name2} (id int, info string) """ + sql """ insert into ${table_name2} values (1, 'info1'), (2, 'info2'), (3, 'info3'), (6, 'info6'), (7, 'info7') """ + sql """ alter table ${table_name2} create branch b2_complex """ + sql """ insert into ${table_name2}@branch(b2_complex) values (8, 'info8'), (9, 'info9') """ + + qt_join_branch_branch """ select t1.id, t1.name, t2.info from ${table_name}@branch(b1_complex) t1 join ${table_name2}@branch(b2_complex) t2 on t1.id = t2.id order by t1.id """ + qt_join_branch_main """ select t1.id, t1.name, t2.info from ${table_name}@branch(b1_complex) t1 join ${table_name2} t2 on t1.id = t2.id order by t1.id """ + 
qt_join_main_branch """ select t1.id, t1.name, t2.info from ${table_name} t1 join ${table_name2}@branch(b2_complex) t2 on t1.id = t2.id order by t1.id """ + + // Test 1.4.4: Subquery with branch + qt_subquery_in """ select * from ${table_name} where id in (select id from ${table_name}@branch(b1_complex) where value > 50) order by id """ + qt_subquery_exists """ select * from ${table_name} t1 where exists (select 1 from ${table_name}@branch(b1_complex) t2 where t2.id = t1.id and t2.value > 50) order by id """ + qt_subquery_scalar """ select id, name, (select max(value) from ${table_name}@branch(b1_complex)) as max_val from ${table_name} order by id limit 5 """ + + // Test 1.4.5: CTE with branch + qt_cte_simple """ with cte as (select * from ${table_name}@branch(b1_complex) where value > 50) select * from cte order by id """ + qt_cte_multiple """ + with cte1 as (select * from ${table_name}@branch(b1_complex) where category = 'cat1'), + cte2 as (select * from ${table_name}@branch(b1_complex) where category = 'cat2') + select * from cte1 union all select * from cte2 order by id + """ + qt_cte_join """ + with branch_data as (select * from ${table_name}@branch(b1_complex)), + main_data as (select * from ${table_name}) + select b.id, b.name, m.value from branch_data b left join main_data m on b.id = m.id order by b.id + """ + + // Test complex filter with branch + qt_complex_filter """ + select category, count(*), avg(value) + from ${table_name}@branch(b1_complex) + where value > 30 and name like '%f%' or category in ('cat1', 'cat2') + group by category + order by category + """ + + // Test order by and limit with branch + qt_order_limit """ select * from ${table_name}@branch(b1_complex) order by value desc limit 3 """ + qt_order_by_multiple """ select * from ${table_name}@branch(b1_complex) order by category, value desc """ +} + diff --git a/regression-test/suites/external_table_p0/iceberg/branch_tag/iceberg_branch_cross_operations.groovy b/regression-test/suites/external_table_p0/iceberg/branch_tag/iceberg_branch_cross_operations.groovy new file mode 100644 index 00000000000000..7e79713f5ec1b1 --- /dev/null +++ b/regression-test/suites/external_table_p0/iceberg/branch_tag/iceberg_branch_cross_operations.groovy @@ -0,0 +1,99 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +suite("iceberg_branch_cross_operations", "p0,external,doris,external_docker,external_docker_doris,branch_tag") { + String enabled = context.config.otherConfigs.get("enableIcebergTest") + if (enabled == null || !enabled.equalsIgnoreCase("true")) { + logger.info("disable iceberg test.") + return + } + + String rest_port = context.config.otherConfigs.get("iceberg_rest_uri_port") + String minio_port = context.config.otherConfigs.get("iceberg_minio_port") + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + String catalog_name = "iceberg_branch_cross" + + sql """drop catalog if exists ${catalog_name}""" + sql """ + CREATE CATALOG ${catalog_name} PROPERTIES ( + 'type'='iceberg', + 'iceberg.catalog.type'='rest', + 'uri' = 'http://${externalEnvIp}:${rest_port}', + "s3.access_key" = "admin", + "s3.secret_key" = "password", + "s3.endpoint" = "http://${externalEnvIp}:${minio_port}", + "s3.region" = "us-east-1" + );""" + + sql """drop database if exists ${catalog_name}.test_db_cross force""" + sql """create database ${catalog_name}.test_db_cross""" + sql """ use ${catalog_name}.test_db_cross """ + + String table_name = "test_cross_branch" + + sql """ drop table if exists ${table_name} """ + sql """ create table ${table_name} (id int, name string) """ + + sql """ insert into ${table_name} values (1, 'a'), (2, 'b'), (3, 'c') """ + sql """ alter table ${table_name} create branch b1_cross """ + sql """ alter table ${table_name} create branch b2_cross """ + sql """ insert into ${table_name}@branch(b1_cross) values (4, 'd'), (5, 'e') """ + + // Test 1.5.1: Read from branch and write to another branch + qt_b1_initial """ select * from ${table_name}@branch(b1_cross) order by id """ // Should have 1,2,3,4,5 + qt_b2_initial """ select * from ${table_name}@branch(b2_cross) order by id """ // Should have 1,2,3 + + sql """ insert into ${table_name}@branch(b2_cross) select * from ${table_name}@branch(b1_cross) """ + qt_b2_after_copy """ select * from ${table_name}@branch(b2_cross) order by id """ // Should have all data from b1 + qt_b1_unchanged """ select * from ${table_name}@branch(b1_cross) order by id """ // b1 should be unchanged + + // Test 1.5.2: Read from main branch and write to branch + sql """ insert into ${table_name} values (6, 'f'), (7, 'g') """ + sql """ alter table ${table_name} create branch b3_cross """ + sql """ insert into ${table_name}@branch(b3_cross) select * from ${table_name} """ + qt_b3_has_main_data """ select * from ${table_name}@branch(b3_cross) order by id """ // Should have main branch data + qt_main_unchanged_after_copy """ select * from ${table_name} order by id """ // Main should be unchanged + + // Test 1.5.3: Read from branch and overwrite main branch + sql """ alter table ${table_name} create branch b4_cross """ + sql """ insert into ${table_name}@branch(b4_cross) values (8, 'h'), (9, 'i'), (10, 'j') """ + sql """ insert overwrite table ${table_name} select * from ${table_name}@branch(b4_cross) """ + qt_main_overwritten """ select * from ${table_name} order by id """ // Main should have b4 data + qt_b4_unchanged """ select * from ${table_name}@branch(b4_cross) order by id """ // b4 should be unchanged + + // Test cross-branch operations with filters + sql """ alter table ${table_name} create branch b5_cross """ + sql """ insert into ${table_name}@branch(b5_cross) values (11, 'k'), (12, 'l'), (13, 'm') """ + sql """ insert into ${table_name}@branch(b4_cross) select * from ${table_name}@branch(b5_cross) where id > 10 """ + qt_b4_has_filtered_data """ 
select * from ${table_name}@branch(b4_cross) order by id """ // Should have original + filtered data + + // Test overwrite from branch to branch + sql """ alter table ${table_name} create branch b6_cross """ + sql """ insert into ${table_name}@branch(b6_cross) values (14, 'n'), (15, 'o') """ + sql """ insert overwrite table ${table_name}@branch(b6_cross) select * from ${table_name}@branch(b5_cross) """ + qt_b6_overwritten """ select * from ${table_name}@branch(b6_cross) order by id """ // Should have b5 data + + // Test multiple branch chain operations + sql """ alter table ${table_name} create branch b7_cross """ + sql """ insert into ${table_name}@branch(b7_cross) values (16, 'p') """ + sql """ insert into ${table_name}@branch(b6_cross) select * from ${table_name}@branch(b7_cross) """ + sql """ insert into ${table_name}@branch(b5_cross) select * from ${table_name}@branch(b6_cross) """ + qt_b5_chain """ select count(*) from ${table_name}@branch(b5_cross) """ + qt_b6_chain """ select count(*) from ${table_name}@branch(b6_cross) """ + qt_b7_chain """ select count(*) from ${table_name}@branch(b7_cross) """ +} + diff --git a/regression-test/suites/external_table_p0/iceberg/branch_tag/iceberg_branch_partition_operations.groovy b/regression-test/suites/external_table_p0/iceberg/branch_tag/iceberg_branch_partition_operations.groovy new file mode 100644 index 00000000000000..6ed62280c86b8e --- /dev/null +++ b/regression-test/suites/external_table_p0/iceberg/branch_tag/iceberg_branch_partition_operations.groovy @@ -0,0 +1,86 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +suite("iceberg_branch_partition_operations", "p0,external,doris,external_docker,external_docker_doris,branch_tag") { + String enabled = context.config.otherConfigs.get("enableIcebergTest") + if (enabled == null || !enabled.equalsIgnoreCase("true")) { + logger.info("disable iceberg test.") + return + } + + String rest_port = context.config.otherConfigs.get("iceberg_rest_uri_port") + String minio_port = context.config.otherConfigs.get("iceberg_minio_port") + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + String catalog_name = "iceberg_branch_partition" + + sql """drop catalog if exists ${catalog_name}""" + sql """ + CREATE CATALOG ${catalog_name} PROPERTIES ( + 'type'='iceberg', + 'iceberg.catalog.type'='rest', + 'uri' = 'http://${externalEnvIp}:${rest_port}', + "s3.access_key" = "admin", + "s3.secret_key" = "password", + "s3.endpoint" = "http://${externalEnvIp}:${minio_port}", + "s3.region" = "us-east-1" + );""" + + sql """drop database if exists ${catalog_name}.test_db_partition force""" + sql """create database ${catalog_name}.test_db_partition""" + sql """ use ${catalog_name}.test_db_partition """ + + String table_name = "test_partition_branch" + + // Test 1.3.1: Partition table branch creation + sql """ drop table if exists ${table_name} """ + sql """ create table ${table_name} (id int, par string) PARTITION BY LIST(par)() """ + + sql """ insert into ${table_name} values (1, 'a'), (2, 'a'), (3, 'b'), (4, 'b') """ + sql """ insert into ${table_name} values (5, 'c'), (6, 'c') """ + + sql """ alter table ${table_name} create branch b1_partition """ + qt_b1_all_partitions """ select * from ${table_name}@branch(b1_partition) order by id """ // Should have all partitions + + // Test 1.3.2: Partition table branch write + sql """ insert into ${table_name}@branch(b1_partition) values (7, 'd'), (8, 'd') """ + qt_b1_new_partition """ select * from ${table_name}@branch(b1_partition) where par = 'd' order by id """ // Should have new partition + qt_main_no_new_partition """ select * from ${table_name} where par = 'd' order by id """ // Main branch should not have new partition + + // Test 1.3.3: Partition table branch overwrite + sql """ insert into ${table_name} values (9, 'a'), (10, 'a') """ + qt_main_before_overwrite """ select * from ${table_name} where par = 'a' order by id """ + + sql """ insert overwrite table ${table_name}@branch(b1_partition) partition(par='a') select 11 union all select 12 """ + qt_b1_overwrite_partition """ select * from ${table_name}@branch(b1_partition) where par = 'a' order by id """ // Should only have 11, 12 + qt_main_unchanged """ select * from ${table_name} where par = 'a' order by id """ // Main branch unchanged + + // Test multiple partitions in branch + sql """ insert into ${table_name}@branch(b1_partition) values (13, 'e'), (14, 'e') """ + sql """ insert into ${table_name}@branch(b1_partition) values (15, 'f'), (16, 'f') """ + + qt_b1_multiple_partitions """ select par, count(*) from ${table_name}@branch(b1_partition) group by par order by par """ + qt_main_partitions """ select par, count(*) from ${table_name} group by par order by par """ + + // Test branch with different partition values + sql """ alter table ${table_name} create branch b2_partition """ + sql """ insert into ${table_name}@branch(b2_partition) values (17, 'g'), (18, 'g') """ + sql """ insert into ${table_name}@branch(b2_partition) values (19, 'h'), (20, 'h') """ + + qt_b2_partitions """ select * from ${table_name}@branch(b2_partition) order by id """ + 
qt_b1_still_has_original """ select count(*) from ${table_name}@branch(b1_partition) """ // b1 should still have its data +} + diff --git a/regression-test/suites/external_table_p0/iceberg/branch_tag/iceberg_branch_retention_and_snapshot.groovy b/regression-test/suites/external_table_p0/iceberg/branch_tag/iceberg_branch_retention_and_snapshot.groovy new file mode 100644 index 00000000000000..489715d1ff191b --- /dev/null +++ b/regression-test/suites/external_table_p0/iceberg/branch_tag/iceberg_branch_retention_and_snapshot.groovy @@ -0,0 +1,183 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("iceberg_branch_retention_and_snapshot", "p0,external,doris,external_docker,external_docker_doris,branch_tag") { + String enabled = context.config.otherConfigs.get("enableIcebergTest") + if (enabled == null || !enabled.equalsIgnoreCase("true")) { + logger.info("disable iceberg test.") + return + } + + String rest_port = context.config.otherConfigs.get("iceberg_rest_uri_port") + String minio_port = context.config.otherConfigs.get("iceberg_minio_port") + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + String catalog_name = "iceberg_branch_retention_snapshot" + + sql """drop catalog if exists ${catalog_name}""" + sql """ + CREATE CATALOG ${catalog_name} PROPERTIES ( + 'type'='iceberg', + 'iceberg.catalog.type'='rest', + 'uri' = 'http://${externalEnvIp}:${rest_port}', + "s3.access_key" = "admin", + "s3.secret_key" = "password", + "s3.endpoint" = "http://${externalEnvIp}:${minio_port}", + "s3.region" = "us-east-1" + );""" + + sql """drop database if exists ${catalog_name}.test_db_retention force""" + sql """create database ${catalog_name}.test_db_retention""" + sql """ use ${catalog_name}.test_db_retention """ + + // Test 1: expire_snapshots verification - snapshots referenced by branches/tags should be protected + def table_name_expire = "test_expire_snapshots_branch" + sql """ drop table if exists ${table_name_expire} """ + sql """ create table ${table_name_expire} (id int, name string) """ + + // Create multiple snapshots on main branch + sql """ insert into ${table_name_expire} values (1, 'a') """ + sql """ insert into ${table_name_expire} values (2, 'b') """ + sql """ insert into ${table_name_expire} values (3, 'c') """ + sql """ insert into ${table_name_expire} values (4, 'd') """ + sql """ insert into ${table_name_expire} values (5, 'e') """ + + List> snapshots_expire = sql """ select snapshot_id from iceberg_meta("table" = "${catalog_name}.test_db_retention.${table_name_expire}", "query_type" = "snapshots") order by committed_at; """ + String s_expire_0 = snapshots_expire.get(0)[0] + String s_expire_1 = snapshots_expire.get(1)[0] + String s_expire_2 = snapshots_expire.get(2)[0] + String s_expire_3 = 
snapshots_expire.get(3)[0] + String s_expire_4 = snapshots_expire.get(4)[0] + + // Create a branch with snapshot retention policy + sql """ alter table ${table_name_expire} create branch b_expire_test AS OF VERSION ${s_expire_2} RETAIN 30 DAYS WITH SNAPSHOT RETENTION 3 SNAPSHOTS """ + + // Write multiple snapshots to the branch + sql """ insert into ${table_name_expire}@branch(b_expire_test) values (6, 'f') """ + sql """ insert into ${table_name_expire}@branch(b_expire_test) values (7, 'g') """ + sql """ insert into ${table_name_expire}@branch(b_expire_test) values (8, 'h') """ + sql """ insert into ${table_name_expire}@branch(b_expire_test) values (9, 'i') """ + sql """ insert into ${table_name_expire}@branch(b_expire_test) values (10, 'j') """ + + // Get the current snapshot count before expire + def snapshot_count_before_expire = sql """ select count(*) from iceberg_meta("table" = "${catalog_name}.test_db_retention.${table_name_expire}", "query_type" = "snapshots") """ + logger.info("Snapshot count before expire: ${snapshot_count_before_expire[0][0]}") + + // Get branch ref snapshot + def branch_ref_snapshot = sql """ select snapshot_id from ${table_name_expire}\$refs where name = 'b_expire_test' """ + + // Create tags to protect additional snapshots + sql """ alter table ${table_name_expire} create tag t_expire_protect AS OF VERSION ${s_expire_1} """ + + // Call expire_snapshots via Spark - should not delete snapshots referenced by branch/tag + // Using a timestamp that would expire old snapshots but not those referenced by branch/tag + spark_iceberg """CALL demo.system.expire_snapshots(table => 'test_db_retention.${table_name_expire}', older_than => TIMESTAMP '2020-01-01 00:00:00')""" + + // Verify snapshots are still accessible after expire_snapshots + qt_expire_branch_still_accessible """ select count(*) from ${table_name_expire}@branch(b_expire_test) """ // Should still have data + qt_expire_tag_still_accessible """ select * from ${table_name_expire}@tag(t_expire_protect) order by id """ // Should have 2 records + + // Verify the branch ref is still in refs table + def branch_ref_after_expire = sql """ select count(*) from ${table_name_expire}\$refs where name = 'b_expire_test' """ + assertEquals(branch_ref_after_expire[0][0], 1) + + // Verify the tag ref is still in refs table + def tag_ref_after_expire = sql """ select count(*) from ${table_name_expire}\$refs where name = 't_expire_protect' """ + assertEquals(tag_ref_after_expire[0][0], 1) + + // Test 3: expire_snapshots with snapshot retention count policy + def table_name_retain_count = "test_expire_retain_count" + sql """ drop table if exists ${table_name_retain_count} """ + sql """ create table ${table_name_retain_count} (id int, name string) """ + + sql """ insert into ${table_name_retain_count} values (1, 'a') """ + List> snapshots_retain = sql """ select snapshot_id from iceberg_meta("table" = "${catalog_name}.test_db_retention.${table_name_retain_count}", "query_type" = "snapshots") order by committed_at; """ + def s_retain_0 = snapshots_retain[0][0].toString() + + // Create branch with snapshot retention count of 3 + sql """ alter table ${table_name_retain_count} create branch b_retain_count AS OF VERSION ${s_retain_0} RETAIN 30 DAYS WITH SNAPSHOT RETENTION 3 SNAPSHOTS """ + + // Write 5 snapshots to the branch (exceeding retention count of 3) + sql """ insert into ${table_name_retain_count}@branch(b_retain_count) values (2, 'b') """ + sql """ insert into ${table_name_retain_count}@branch(b_retain_count) values (3, 'c') 
""" + sql """ insert into ${table_name_retain_count}@branch(b_retain_count) values (4, 'd') """ + sql """ insert into ${table_name_retain_count}@branch(b_retain_count) values (5, 'e') """ + sql """ insert into ${table_name_retain_count}@branch(b_retain_count) values (6, 'f') """ + + // Get current snapshot id on branch + def branch_snapshot_id = sql """ select snapshot_id from ${table_name_retain_count}\$refs where name = 'b_retain_count' """ + + // Count snapshots before expire + def snapshot_count_retain_before = sql """ select count(*) from iceberg_meta("table" = "${catalog_name}.test_db_retention.${table_name_retain_count}", "query_type" = "snapshots") """ + + // Call expire_snapshots - older snapshots beyond retention count may be expired, but branch snapshot should be protected + spark_iceberg """CALL demo.system.expire_snapshots(table => 'test_db_retention.${table_name_retain_count}', older_than => TIMESTAMP '2020-01-01 00:00:00')""" + + // Verify branch is still accessible and has data + qt_retain_count_branch_accessible """ select count(*) from ${table_name_retain_count}@branch(b_retain_count) """ // Should have data + qt_retain_count_branch_data """ select count(*) from ${table_name_retain_count}@branch(b_retain_count) where id > 0 """ // Should have multiple records + + // Verify branch ref still exists + def branch_ref_retain_after = sql """ select count(*) from ${table_name_retain_count}\$refs where name = 'b_retain_count' """ + assertEquals(branch_ref_retain_after[0][0], 1) + + // Test 4: Verify that old unreferenced snapshots can be expired + def table_name_unref = "test_expire_unreferenced" + sql """ drop table if exists ${table_name_unref} """ + sql """ create table ${table_name_unref} (id int, name string) """ + + // Create snapshots + sql """ insert into ${table_name_unref} values (1, 'old') """ + sql """ insert into ${table_name_unref} values (2, 'old2') """ + sql """ insert into ${table_name_unref} values (3, 'new') """ + + List> snapshots_unref = sql """ select snapshot_id, committed_at from iceberg_meta("table" = "${catalog_name}.test_db_retention.${table_name_unref}", "query_type" = "snapshots") order by committed_at; """ + def old_snapshot_id = snapshots_unref[0][0] + + // Create a tag pointing to the newest snapshot (not the old ones) + List> latest_snapshot = sql """ select snapshot_id from iceberg_meta("table" = "${catalog_name}.test_db_retention.${table_name_unref}", "query_type" = "snapshots") order by committed_at desc limit 1; """ + def latest_snap_id = latest_snapshot[0][0] + sql """ alter table ${table_name_unref} create tag t_latest AS OF VERSION ${latest_snap_id} """ + + // Verify tag has all data before expiration + qt_unref_tag_before_expire """ select * from ${table_name_unref}@tag(t_latest) order by id """ // Should have 1, old2, 3 rows + + // Count snapshots before expire + def snapshot_count_unref_before = sql """ select count(*) from iceberg_meta("table" = "${catalog_name}.test_db_retention.${table_name_unref}", "query_type" = "snapshots") """ + logger.info("Snapshot count before expire: ${snapshot_count_unref_before[0][0]}") + + // Call expire_snapshots - old unreferenced snapshots should be expired + spark_iceberg """CALL demo.system.expire_snapshots(table => 'test_db_retention.${table_name_unref}', retain_last => 1)""" + + // Count snapshots after expire + def snapshot_count_unref_after = sql """ select count(*) from iceberg_meta("table" = "${catalog_name}.test_db_retention.${table_name_unref}", "query_type" = "snapshots") """ + + // Refresh 
catalog to ensure we see the latest state after expire_snapshots + sql """refresh catalog ${catalog_name}""" + + // Verify that at least the latest snapshot (referenced by tag) still exists + qt_unref_tag_accessible """ select * from ${table_name_unref}@tag(t_latest) order by id """ // Should have data + + // Verify old snapshot is no longer accessible if it was expired + def old_snapshot_exists = sql """ select count(*) from iceberg_meta("table" = "${catalog_name}.test_db_retention.${table_name_unref}", "query_type" = "snapshots") where snapshot_id = '${old_snapshot_id}' """ + logger.info("Old snapshot exists after expire: ${old_snapshot_exists[0][0]}") + + // The tag-protected snapshot should still be in refs + def tag_ref_unref_after = sql """ select count(*) from ${table_name_unref}\$refs where name = 't_latest' """ + assertEquals(tag_ref_unref_after[0][0], 1) + +} + diff --git a/regression-test/suites/external_table_p0/iceberg/branch_tag/iceberg_branch_tag_auth.groovy b/regression-test/suites/external_table_p0/iceberg/branch_tag/iceberg_branch_tag_auth.groovy new file mode 100644 index 00000000000000..4a2c5b578bd01d --- /dev/null +++ b/regression-test/suites/external_table_p0/iceberg/branch_tag/iceberg_branch_tag_auth.groovy @@ -0,0 +1,162 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +suite("iceberg_branch_tag_auth", "p0,external_docker,external,branch_tag") { + String enabled = context.config.otherConfigs.get("enableIcebergTest") + if (enabled == null || !enabled.equalsIgnoreCase("true")) { + logger.info("disable iceberg test.") + return + } + + String rest_port = context.config.otherConfigs.get("iceberg_rest_uri_port") + String minio_port = context.config.otherConfigs.get("iceberg_minio_port") + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + String catalog_name = "iceberg_branch_tag_auth" + String db_name = "test_parallel_auth" + String table_name = "test_branch_tag_auth" + + sql """drop catalog if exists ${catalog_name}""" + sql """ + CREATE CATALOG ${catalog_name} PROPERTIES ( + 'type'='iceberg', + 'iceberg.catalog.type'='rest', + 'uri' = 'http://${externalEnvIp}:${rest_port}', + "s3.access_key" = "admin", + "s3.secret_key" = "password", + "s3.endpoint" = "http://${externalEnvIp}:${minio_port}", + "s3.region" = "us-east-1" + );""" + + sql """drop database if exists ${catalog_name}.${db_name} force""" + sql """create database ${catalog_name}.${db_name}""" + sql """ use ${catalog_name}.${db_name} """ + sql """drop table if exists ${table_name}""" + sql """create table ${table_name} (id int, name string)""" + sql """insert into ${table_name} values (1, 'name_1')""" + sql """alter table ${table_name} create branch branch_0 """ + sql """alter table ${table_name} create tag tag_0 """ + qt_select_branch_0 """select * from ${table_name}@branch(branch_0)""" + qt_select_tag_0 """select * from ${table_name}@tag(tag_0)""" + + // Create test user without table permission + String user = "test_iceberg_branch_tag_auth_user" + String pwd = 'C123_567p' + try_sql("DROP USER '${user}'@'%'") + sql """CREATE USER '${user}'@'%' IDENTIFIED BY '${pwd}'""" + if (isCloudMode()) { + def clusters = sql " SHOW CLUSTERS; " + assertTrue(!clusters.isEmpty()) + def validCluster = clusters[0][0] + sql """GRANT USAGE_PRIV ON CLUSTER `${validCluster}` TO '${user}'@'%'"""; + } + sql """create database if not exists internal.regression_test""" + sql """GRANT SELECT_PRIV ON internal.regression_test.* TO '${user}'@'%'""" + connect(user, "${pwd}", context.config.jdbcUrl) { + test { + sql """ + alter table ${catalog_name}.${db_name}.${table_name} create branch branch_1; + """ + exception "denied" + } + test { + sql """ + select * from ${catalog_name}.${db_name}.${table_name}@branch(branch_0); + """ + exception "denied" + } + test { + sql """ + alter table ${catalog_name}.${db_name}.${table_name} drop branch branch_0; + """ + exception "denied" + } + test { + sql """ + alter table ${catalog_name}.${db_name}.${table_name} create tag tag_1; + """ + exception "denied" + } + test { + sql """ + select * from ${catalog_name}.${db_name}.${table_name}@tag(tag_0); + """ + exception "denied" + } + test { + sql """ + alter table ${catalog_name}.${db_name}.${table_name} drop tag tag_0; + """ + exception "denied" + } + } + // Grant permission and verify user can query branch and tag + sql """GRANT SELECT_PRIV ON ${catalog_name}.${db_name}.${table_name} TO '${user}'@'%'""" + + connect(user, "${pwd}", context.config.jdbcUrl) { + qt_select_branch_0_auth """select * from ${catalog_name}.${db_name}.${table_name}@branch(branch_0)""" + qt_select_tag_0_auth """select * from ${catalog_name}.${db_name}.${table_name}@tag(tag_0)""" + + test { + sql """ + alter table ${catalog_name}.${db_name}.${table_name} create branch branch_1; + """ + exception "denied" + } + test { + sql """ + alter table 
${catalog_name}.${db_name}.${table_name} drop branch branch_0; + """ + exception "denied" + } + test { + sql """ + alter table ${catalog_name}.${db_name}.${table_name} create tag tag_1; + """ + exception "denied" + } + test { + sql """ + alter table ${catalog_name}.${db_name}.${table_name} drop tag tag_0; + """ + exception "denied" + } + } + + // Grant permission and verify user can create/drop branch and tag + sql """GRANT ALTER_PRIV ON ${catalog_name}.${db_name}.${table_name} TO '${user}'@'%'""" + connect(user, "${pwd}", context.config.jdbcUrl) { + sql """ + alter table ${catalog_name}.${db_name}.${table_name} create branch branch_1; + """ + qt_select_branch_0_auth_2 """ + select * from ${catalog_name}.${db_name}.${table_name}@branch(branch_0); + """ + sql """ + alter table ${catalog_name}.${db_name}.${table_name} drop branch branch_0; + """ + sql """ + alter table ${catalog_name}.${db_name}.${table_name} create tag tag_1; + """ + qt_select_tag_0_auth_2 """ + select * from ${catalog_name}.${db_name}.${table_name}@tag(tag_0); + """ + sql """ + alter table ${catalog_name}.${db_name}.${table_name} drop tag tag_0; + """ + } +} + diff --git a/regression-test/suites/external_table_p0/iceberg/branch_tag/iceberg_branch_tag_edge_cases.groovy b/regression-test/suites/external_table_p0/iceberg/branch_tag/iceberg_branch_tag_edge_cases.groovy new file mode 100644 index 00000000000000..b111aa0aed3959 --- /dev/null +++ b/regression-test/suites/external_table_p0/iceberg/branch_tag/iceberg_branch_tag_edge_cases.groovy @@ -0,0 +1,234 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
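Editor's note: the authorization suite above repeats one pattern for every branch/tag statement: connect as the restricted user and expect an access-denied error. A minimal sketch of that pattern, assuming the framework's connect, test and exception helpers behave as used above (expectDenied is a hypothetical helper name, not defined by the patch):

    // Sketch only: run a statement as a given user and expect an access-denied error.
    def expectDenied = { String userName, String password, String stmt ->
        connect(userName, password, context.config.jdbcUrl) {
            test {
                sql stmt
                exception "denied"
            }
        }
    }
    // e.g. expectDenied(user, pwd,
    //     "alter table ${catalog_name}.${db_name}.${table_name} create branch branch_x")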
+ +suite("iceberg_branch_tag_edge_cases", "p0,external,doris,external_docker,external_docker_doris,branch_tag") { + String enabled = context.config.otherConfigs.get("enableIcebergTest") + if (enabled == null || !enabled.equalsIgnoreCase("true")) { + logger.info("disable iceberg test.") + return + } + + String rest_port = context.config.otherConfigs.get("iceberg_rest_uri_port") + String minio_port = context.config.otherConfigs.get("iceberg_minio_port") + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + String catalog_name = "iceberg_edge_cases" + + sql """drop catalog if exists ${catalog_name}""" + sql """ + CREATE CATALOG ${catalog_name} PROPERTIES ( + 'type'='iceberg', + 'iceberg.catalog.type'='rest', + 'uri' = 'http://${externalEnvIp}:${rest_port}', + "s3.access_key" = "admin", + "s3.secret_key" = "password", + "s3.endpoint" = "http://${externalEnvIp}:${minio_port}", + "s3.region" = "us-east-1" + );""" + + sql """drop database if exists ${catalog_name}.test_db_edge force""" + sql """create database ${catalog_name}.test_db_edge""" + sql """ use ${catalog_name}.test_db_edge """ + + String table_name = "test_edge_cases" + + sql """ drop table if exists ${table_name} """ + sql """ create table ${table_name} (id int, name string) """ + sql """ insert into ${table_name} values (1, 'a'), (2, 'b') """ + def snapshots = sql """ select snapshot_id from ${table_name}\$snapshots order by committed_at desc limit 1 """ + def snapshot_id = snapshots[0][0] + + + // Test 1.6.1: Special characters in branch/tag names + sql """ alter table ${table_name} create branch `branch-1` """ + sql """ alter table ${table_name} create branch `branch_1` """ + sql """ alter table ${table_name} create tag `tag-1` """ + sql """ alter table ${table_name} create tag `tag_1` """ + + qt_branch_hyphen """ select * from ${table_name}@branch(`branch-1`) order by id """ + qt_branch_underscore """ select * from ${table_name}@branch(`branch_1`) order by id """ + qt_tag_hyphen """ select * from ${table_name}@tag(`tag-1`) order by id """ + qt_tag_underscore """ select * from ${table_name}@tag(`tag_1`) order by id """ + + // Test 1.6.3: Same name for branch and tag (should fail) + sql """ alter table ${table_name} create branch b1_conflict """ + test { + sql """ alter table ${table_name} create tag b1_conflict """ + exception "Ref b1_conflict already exists" + } + + sql """ alter table ${table_name} create tag t1_conflict """ + test { + sql """ alter table ${table_name} create branch t1_conflict """ + exception "Ref t1_conflict already exists" + } + + // Test 1.6.5: Create branch based on non-existent snapshot + test { + sql """ alter table ${table_name} create branch b1_invalid AS OF VERSION 999999 """ + exception "Cannot set b1_invalid to unknown snapshot: 99999" + } + + // Test 2.4.1: Tag name conflict + sql """ alter table ${table_name} create tag t2_conflict """ + test { + sql """ alter table ${table_name} create branch t2_conflict """ + exception "Ref t2_conflict already exists" + } + + // Test 2.4.2: Tag write operations should fail + sql """ alter table ${table_name} create tag t2_write_test """ + test { + sql """ insert into ${table_name}@tag(t2_write_test) values (3, 'c') """ + exception "mismatched input 'tag' expecting 'BRANCH'" + } + + test { + sql """ insert overwrite table ${table_name}@tag(t2_write_test) values (3, 'c') """ + exception "mismatched input 'tag' expecting 'BRANCH'" + } + + // Test 2.4.3: Create tag based on non-existent snapshot + test { + sql """ alter table ${table_name} create 
tag t3_invalid AS OF VERSION 999999 """ + exception "Cannot set t3_invalid to unknown snapshot: 999999" + } + + // Test case sensitivity + sql """ alter table ${table_name} create branch `CaseSensitive` """ + sql """ alter table ${table_name} create tag `CaseSensitiveTag` """ + + qt_case_branch """ select count(*) from ${table_name}@branch(`CaseSensitive`) """ + qt_case_tag """ select count(*) from ${table_name}@tag(`CaseSensitiveTag`) """ + + // Test main branch cannot be deleted + test { + sql """ alter table ${table_name} drop branch main """ + exception "Cannot remove main branc" + } + + // Test query non-existent branch + test { + sql """ select * from ${table_name}@branch(not_exist_branch) """ + exception "does not have branch named not_exist_branc" + } + + // Test query non-existent tag + test { + sql """ select * from ${table_name}@tag(not_exist_tag) """ + exception "does not have tag named not_exist_tag" + } + + // Test invalid syntax for branch/tag + test { + sql """ select * from ${table_name}@branch('name'='invalid_key') """ + exception "does not have branch named invalid_key" + } + + test { + sql """ select * from ${table_name}@tag('name'='invalid_key') """ + exception "does not have tag named invalid_key" + } + + // Test empty branch/tag name + test { + sql """ alter table ${table_name} create branch `` """ + exception "Branch name cannot be empty" + } + + test { + sql """ alter table ${table_name} create tag `` """ + exception "Tag name cannot be empty" + } + + // Test branch/tag with numbers + sql """ alter table ${table_name} create branch `branch123` """ + sql """ alter table ${table_name} create tag `tag456` """ + + qt_numeric_branch """ select count(*) from ${table_name}@branch(`branch123`) """ + qt_numeric_tag """ select count(*) from ${table_name}@tag(`tag456`) """ + + // Test mixed case and special characters + sql """ alter table ${table_name} create branch `Branch_123-Test` """ + sql """ alter table ${table_name} create tag `Tag_456-Test` """ + + qt_mixed_branch """ select count(*) from ${table_name}@branch(`Branch_123-Test`) """ + qt_mixed_tag """ select count(*) from ${table_name}@tag(`Tag_456-Test`) """ + + // Test branch/tag creation with retention and snapshot retention edge cases + test { + sql """ alter table ${table_name} create branch b1_invalid_unit as of version ${snapshot_id} retain 2 SECONDS """ + exception "mismatched input 'SECONDS' expecting {'DAYS', 'HOURS', 'MINUTES'" + } + test { + sql """ alter table ${table_name} create branch b2_invalid_unit as of version ${snapshot_id} with snapshot retention 3 SNAPSHOTS 1 SECONDS """ + exception "mismatched input 'SECONDS' expecting {'DAYS', 'HOURS', 'MINUTES'" + } + + test { + sql """ alter table ${table_name} create branch b3_invalid_num as of version ${snapshot_id} retain 0 MINUTES """ + exception "RETAIN time value must be greater than 0" + } + test { + sql """ alter table ${table_name} create branch b4_invalid_num as of version ${snapshot_id} retain -1 MINUTES """ + exception "extraneous input '-' expecting INTEGER_VALUE" + } + test { + sql """ alter table ${table_name} create branch b5_invalid_num as of version ${snapshot_id} retain xxx MINUTES """ + exception "expecting INTEGER_VALUE" + } + test { + sql """ alter table ${table_name} create branch b6_invalid_num as of version ${snapshot_id} with snapshot retention 0 SNAPSHOTS 1 MINUTES """ + exception "Snapshot count (SNAPSHOTS) value must be greater than 0" + } + test { + sql """ alter table ${table_name} create branch b7_invalid_num as of version 
${snapshot_id} with snapshot retention -1 SNAPSHOTS 1 MINUTES """ + exception "no viable alternative at input 'with snapshot retention -'" + } + test { + sql """ alter table ${table_name} create branch b8_invalid_num as of version ${snapshot_id} with snapshot retention xxx SNAPSHOTS 1 MINUTES """ + exception "no viable alternative at input 'with snapshot retention xxx'" + } + test { + sql """ alter table ${table_name} create branch b9_invalid_num as of version ${snapshot_id} with snapshot retention 3 SNAPSHOTS 0 MINUTES """ + exception "Retention time value must be greater than 0" + } + test { + sql """ alter table ${table_name} create branch b10_invalid_num as of version ${snapshot_id} with snapshot retention 3 SNAPSHOTS -1 MINUTES """ + exception "mismatched input '-' expecting" + } + test { + sql """ alter table ${table_name} create branch b11_invalid_num as of version ${snapshot_id} with snapshot retention 3 SNAPSHOTS xxx MINUTES """ + exception "mismatched input 'xxx' expecting" + } + + test { + sql """ alter table ${table_name} create tag t1_invalid_unit as of version ${snapshot_id} retain 1 SECONDS """ + exception "mismatched input 'SECONDS' expecting {'DAYS', 'HOURS', 'MINUTES'" + } + test { + sql """ alter table ${table_name} create tag t3_invalid_num as of version ${snapshot_id} retain 0 MINUTES """ + exception "RETAIN time value must be greater than 0" + } + test { + sql """ alter table ${table_name} create tag t4_invalid_num as of version ${snapshot_id} retain -1 MINUTES """ + exception "extraneous input '-' expecting INTEGER_VALUE" + } + test { + sql """ alter table ${table_name} create tag t5_invalid_num as of version ${snapshot_id} retain xxx MINUTES """ + exception "mismatched input 'xxx' expecting INTEGER_VALUE" + } +} diff --git a/regression-test/suites/external_table_p0/iceberg/branch_tag/iceberg_branch_tag_parallel_op.groovy b/regression-test/suites/external_table_p0/iceberg/branch_tag/iceberg_branch_tag_parallel_op.groovy new file mode 100644 index 00000000000000..e0c9ec642b8ee0 --- /dev/null +++ b/regression-test/suites/external_table_p0/iceberg/branch_tag/iceberg_branch_tag_parallel_op.groovy @@ -0,0 +1,307 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
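Editor's note: the parallel-operation suite that follows repeats a single concurrency pattern: schedule N statements with thread {}, await them with combineFutures(...).get(), and record per-thread success or failure under synchronized access. A sketch of that pattern under the same assumptions (thread and combineFutures behave as used below; runConcurrently is a hypothetical wrapper, not part of the patch):

    // Sketch only: run an action N times concurrently and collect per-index results.
    def runConcurrently = { int n, Closure action ->
        def results = []
        def futures = []
        for (int i = 0; i < n; i++) {
            int idx = i
            futures.add(thread {
                try {
                    action(idx)
                    synchronized (results) { results.add([idx: idx, success: true]) }
                } catch (Exception e) {
                    synchronized (results) { results.add([idx: idx, success: false, error: e.getMessage()]) }
                }
            })
        }
        combineFutures(futures).get()
        return results
    }
    // e.g. def created = runConcurrently(5, { i ->
    //     sql """alter table ${table_name} create branch branch_${i}"""
    // })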
+ +suite("iceberg_branch_tag_parallel_op", "p2,external,branch_tag") { + String enabled = context.config.otherConfigs.get("enableIcebergTest") + if (enabled == null || !enabled.equalsIgnoreCase("true")) { + logger.info("disable iceberg test.") + return + } + + String rest_port = context.config.otherConfigs.get("iceberg_rest_uri_port") + String minio_port = context.config.otherConfigs.get("iceberg_minio_port") + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + String catalog_name = "iceberg_branch_tag_parallel" + String db_name = "test_parallel_op" + String table_name = "test_branch_tag_operate" + + sql """drop catalog if exists ${catalog_name}""" + sql """ + CREATE CATALOG ${catalog_name} PROPERTIES ( + 'type'='iceberg', + 'iceberg.catalog.type'='rest', + 'uri' = 'http://${externalEnvIp}:${rest_port}', + "s3.access_key" = "admin", + "s3.secret_key" = "password", + "s3.endpoint" = "http://${externalEnvIp}:${minio_port}", + "s3.region" = "us-east-1" + );""" + + sql """drop database if exists ${catalog_name}.${db_name} force""" + sql """create database ${catalog_name}.${db_name}""" + sql """ use ${catalog_name}.${db_name} """ + sql """drop table if exists ${table_name}""" + sql """create table ${table_name} (id int, name string)""" + sql """insert into ${table_name} values (1, 'name_1')""" + + // Test: Concurrent creation of different named branches + // Note: Some operations may fail due to metadata refresh issues, which is expected + def createBranchResults = [] + def futures = [] + for (int i = 0; i < 5; i++) { + int idx = i + futures.add(thread { + sql """use ${catalog_name}.${db_name}""" + try { + sql """alter table ${table_name} create branch branch_${idx} """ + synchronized (this) { + createBranchResults.add([idx: idx, success: true]) + } + logger.info("Branch ${idx} creation succeeded") + } catch (Exception e) { + synchronized (this) { + createBranchResults.add([idx: idx, success: false, error: e.getMessage()]) + } + logger.info("Branch ${idx} creation failed: " + e.getMessage()) + } + }) + } + def combineFuture = combineFutures(futures) + combineFuture.get() + + // Verify that at least some branches were created successfully + def successfulBranches = createBranchResults.findAll { it.success } + def failedBranches = createBranchResults.findAll { !it.success } + logger.info("Branch creation results - Success: ${successfulBranches.size()}, Failure: ${failedBranches.size()}") + assertTrue(successfulBranches.size() > 0, "At least one branch should be created successfully") + + // Query only successfully created branches + successfulBranches.each { result -> + sql """use ${catalog_name}.${db_name}""" + def res = sql """select * from ${table_name}@branch(branch_${result.idx}) """ + logger.info("Query branch_${result.idx}: ${res.size()} rows") + assertTrue(res.size() > 0, "Branch ${result.idx} should have data") + } + + // Test: Concurrent writes to different branches + def writeBranchResults = [] + def futures1 = [] + for (int i = 0; i < 5; i++) { + int idx = i + futures1.add(thread { + sql """use ${catalog_name}.${db_name}""" + try { + sql """insert into ${table_name}@branch(branch_${idx}) values (${idx + 2}, 'name_${idx + 2}') """ + synchronized (this) { + writeBranchResults.add([idx: idx, success: true]) + } + logger.info("Write to branch ${idx} succeeded") + } catch (Exception e) { + synchronized (this) { + writeBranchResults.add([idx: idx, success: false, error: e.getMessage()]) + } + logger.info("Write to branch ${idx} failed: " + e.getMessage()) + } + }) + } 
+ def combineFuture1 = combineFutures(futures1) + combineFuture1.get() + + // Verify some writes succeeded + def successfulWrites = writeBranchResults.findAll { it.success } + logger.info("Write results - Success: ${successfulWrites.size()}, Failure: ${writeBranchResults.size() - successfulWrites.size()}") + + // Query only successfully written branches + successfulBranches.each { result -> + def writeSuccess = successfulWrites.find { it.idx == result.idx } + if (writeSuccess != null) { + sql """use ${catalog_name}.${db_name}""" + def res = sql """select * from ${table_name}@branch(branch_${result.idx}) order by id""" + logger.info("Query branch_${result.idx} after write: ${res.size()} rows") + assertTrue(res.size() > 1, "Branch ${result.idx} should have more data after write") + } + } + + // Clean up: Drop successfully created branches + def dropBranchResults = [] + def futures2 = [] + successfulBranches.each { result -> + futures2.add(thread { + sql """use ${catalog_name}.${db_name}""" + try { + sql """alter table ${table_name} drop branch branch_${result.idx} """ + synchronized (this) { + dropBranchResults.add([idx: result.idx, success: true]) + } + } catch (Exception e) { + synchronized (this) { + dropBranchResults.add([idx: result.idx, success: false, error: e.getMessage()]) + } + logger.info("Drop branch ${result.idx} failed: " + e.getMessage()) + } + }) + } + if (futures2.size() > 0) { + def combineFuture2 = combineFutures(futures2) + combineFuture2.get() + } + + // Test: Concurrent creation of different named tags + def createTagResults = [] + def futures3 = [] + for (int i = 0; i < 5; i++) { + int idx = i + futures3.add(thread { + sql """use ${catalog_name}.${db_name}""" + try { + sql """alter table ${table_name} create tag tag_${idx} """ + synchronized (this) { + createTagResults.add([idx: idx, success: true]) + } + logger.info("Tag ${idx} creation succeeded") + } catch (Exception e) { + synchronized (this) { + createTagResults.add([idx: idx, success: false, error: e.getMessage()]) + } + logger.info("Tag ${idx} creation failed: " + e.getMessage()) + } + }) + } + def combineFuture3 = combineFutures(futures3) + combineFuture3.get() + + // Verify some tags were created + def successfulTags = createTagResults.findAll { it.success } + logger.info("Tag creation results - Success: ${successfulTags.size()}, Failure: ${createTagResults.size() - successfulTags.size()}") + assertTrue(successfulTags.size() > 0, "At least one tag should be created successfully") + + // Query only successfully created tags + successfulTags.each { result -> + sql """use ${catalog_name}.${db_name}""" + def res = sql """select * from ${table_name}@tag(tag_${result.idx}) """ + logger.info("Query tag_${result.idx}: ${res.size()} rows") + assertTrue(res.size() > 0, "Tag ${result.idx} should have data") + } + + // Clean up: Drop successfully created tags + def dropTagResults = [] + def futures4 = [] + successfulTags.each { result -> + futures4.add(thread { + sql """use ${catalog_name}.${db_name}""" + try { + sql """alter table ${table_name} drop tag tag_${result.idx} """ + synchronized (this) { + dropTagResults.add([idx: result.idx, success: true]) + } + } catch (Exception e) { + synchronized (this) { + dropTagResults.add([idx: result.idx, success: false, error: e.getMessage()]) + } + logger.info("Drop tag ${result.idx} failed: " + e.getMessage()) + } + }) + } + if (futures4.size() > 0) { + def combineFuture4 = combineFutures(futures4) + combineFuture4.get() + } + + // Test concurrent branch creation with same name - 
only one should succeed + def branchSuccessCount = 0 + def branchFailureCount = 0 + def futures5 = [] + for (int i = 0; i < 5; i++) { + futures5.add(thread { + sql """use ${catalog_name}.${db_name}""" + try { + sql """alter table ${table_name} create branch branch_same_name """ + synchronized (this) { + branchSuccessCount++ + } + logger.info("Branch creation succeeded") + } catch (Exception e) { + synchronized (this) { + branchFailureCount++ + } + logger.info("Caught exception: " + e.getMessage()) + } + }) + } + def combineFuture5 = combineFutures(futures5) + combineFuture5.get() + + logger.info("Branch creation - Success: ${branchSuccessCount}, Failure: ${branchFailureCount}") + assertTrue(branchSuccessCount >= 1, "At least one branch creation should succeed") + + sql """use ${catalog_name}.${db_name}""" + def branchSameNameRes = sql """select * from ${table_name}@branch(branch_same_name) """ + logger.info("Query branch_same_name: ${branchSameNameRes.size()} rows") + assertTrue(branchSameNameRes.size() > 0, "branch_same_name should have data") + + // Test concurrent writes to the same branch - only one should succeed + def writeSuccessCount = 0 + def writeFailureCount = 0 + def futures6 = [] + for (int i = 0; i < 5; i++) { + int idx = i + futures6.add(thread { + sql """use ${catalog_name}.${db_name}""" + try { + sql """insert into ${table_name}@branch(branch_same_name) values (${idx + 100}, 'name_write_${idx}') """ + synchronized (this) { + writeSuccessCount++ + } + logger.info("Write ${idx} succeeded") + } catch (Exception e) { + synchronized (this) { + writeFailureCount++ + } + logger.info("Write ${idx} caught exception: " + e.getMessage()) + } + }) + } + def combineFuture6 = combineFutures(futures6) + combineFuture6.get() + + logger.info("Write to branch - Success: ${writeSuccessCount}, Failure: ${writeFailureCount}") + assertTrue(writeSuccessCount >= 1, "At least one write should succeed") + + // Test concurrent tag creation with same name - only one should succeed + def tagSuccessCount = 0 + def tagFailureCount = 0 + def futures7 = [] + for (int i = 0; i < 5; i++) { + futures7.add(thread { + sql """use ${catalog_name}.${db_name}""" + try { + sql """alter table ${table_name} create tag tag_same_name """ + synchronized (this) { + tagSuccessCount++ + } + logger.info("Tag creation succeeded") + } catch (Exception e) { + synchronized (this) { + tagFailureCount++ + } + logger.info("Caught exception: " + e.getMessage()) + } + }) + } + def combineFuture7 = combineFutures(futures7) + combineFuture7.get() + + logger.info("Tag creation - Success: ${tagSuccessCount}, Failure: ${tagFailureCount}") + assertTrue(tagSuccessCount >= 1, "At least one tag creation should succeed") + + sql """use ${catalog_name}.${db_name}""" + def tagSameNameRes = sql """select * from ${table_name}@tag(tag_same_name) """ + logger.info("Query tag_same_name: ${tagSameNameRes.size()} rows") + assertTrue(tagSameNameRes.size() > 0, "tag_same_name should have data") + +} + diff --git a/regression-test/suites/external_table_p0/iceberg/branch_tag/iceberg_branch_tag_schema_change_extended.groovy b/regression-test/suites/external_table_p0/iceberg/branch_tag/iceberg_branch_tag_schema_change_extended.groovy new file mode 100644 index 00000000000000..48de78fd282842 --- /dev/null +++ b/regression-test/suites/external_table_p0/iceberg/branch_tag/iceberg_branch_tag_schema_change_extended.groovy @@ -0,0 +1,137 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("iceberg_branch_tag_schema_change_extended", "p0,external,doris,external_docker,external_docker_doris,branch_tag") { + String enabled = context.config.otherConfigs.get("enableIcebergTest") + if (enabled == null || !enabled.equalsIgnoreCase("true")) { + logger.info("disable iceberg test.") + return + } + + String rest_port = context.config.otherConfigs.get("iceberg_rest_uri_port") + String minio_port = context.config.otherConfigs.get("iceberg_minio_port") + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + String catalog_name = "iceberg_schema_extended" + + sql """drop catalog if exists ${catalog_name}""" + sql """ + CREATE CATALOG ${catalog_name} PROPERTIES ( + 'type'='iceberg', + 'iceberg.catalog.type'='rest', + 'uri' = 'http://${externalEnvIp}:${rest_port}', + "s3.access_key" = "admin", + "s3.secret_key" = "password", + "s3.endpoint" = "http://${externalEnvIp}:${minio_port}", + "s3.region" = "us-east-1" + );""" + + sql """drop database if exists ${catalog_name}.test_db_schema_ext force""" + sql """create database ${catalog_name}.test_db_schema_ext""" + sql """ use ${catalog_name}.test_db_schema_ext """ + + String table_name = "test_schema_extended" + + sql """ drop table if exists ${table_name} """ + sql """ create table ${table_name} (id int, name string) """ + sql """ insert into ${table_name} values (1, 'a'), (2, 'b') """ + + // Test 3.1.1: Add column after branch query + sql """ alter table ${table_name} create branch b1_schema """ + sql """ alter table ${table_name} add column new_col string """ + qt_b1_has_new_col """ select * from ${table_name}@branch(b1_schema) order by id """ // Should have new_col with NULL + + // Test 3.1.2: Drop column after branch query + sql """ alter table ${table_name} create branch b2_schema """ + sql """ alter table ${table_name} drop column name """ + qt_b2_no_dropped_col """select * from ${table_name}@branch(b2_schema) order by id """ // Should not have 'name' column + + // Recreate table for next tests + sql """ drop table if exists ${table_name} """ + sql """ create table ${table_name} (id int, value int) """ + sql """ insert into ${table_name} values (1, 10), (2, 20) """ + sql """ alter table ${table_name} create branch b3_schema """ + + // Test 3.1.3: Modify column type after branch query + sql """ alter table ${table_name} modify column id bigint """ + qt_b3_new_type """ select * from ${table_name}@branch(b3_schema) where id = 1 """ // Should use new type + + // Test 3.1.4: Branch write with new schema + sql """ alter table ${table_name} add column new_col string """ + sql """ insert into ${table_name}@branch(b3_schema)(id, value, new_col) values (3, 30, 'test') """ + qt_b3_with_new_col """ select * from ${table_name}@branch(b3_schema) where id = 3 """ + + // Test 3.2.1: Add column after tag query + sql """ drop table if 
exists ${table_name} """ + sql """ create table ${table_name} (id int, name string) """ + sql """ insert into ${table_name} values (1, 'a'), (2, 'b') """ + sql """ alter table ${table_name} create tag t1_schema """ + sql """ alter table ${table_name} add column new_col string """ + qt_t1_no_new_col """ select * from ${table_name}@tag(t1_schema) order by id """ // Should not have new_col + + // Test 3.2.2: Drop column after tag query + sql """ drop table if exists ${table_name} """ + sql """ create table ${table_name} (id int, old_col string) """ + sql """ insert into ${table_name} values (1, 'a') """ + sql """ alter table ${table_name} create tag t2_schema """ + sql """ alter table ${table_name} drop column old_col """ + qt_t2_has_old_col """ select * from ${table_name}@tag(t2_schema) order by id """ // Should still have old_col + + // Test 3.2.3: Modify column type after tag query + sql """ drop table if exists ${table_name} """ + sql """ create table ${table_name} (id int) """ + sql """ insert into ${table_name} values (1), (2) """ + sql """ alter table ${table_name} create tag t3_schema """ + sql """ alter table ${table_name} modify column id bigint """ + qt_t3_old_type """ select * from ${table_name}@tag(t3_schema) where id = 1 """ // Should still use INT + + // Test 3.2.4: Tag schema vs main schema comparison + sql """ drop table if exists ${table_name} """ + sql """ create table ${table_name} (id int, col1 string) """ + sql """ insert into ${table_name} values (1, 'a') """ + sql """ alter table ${table_name} create tag t4_schema """ + sql """ alter table ${table_name} add column col2 string """ + sql """ alter table ${table_name} drop column col1 """ + sql """ alter table ${table_name} add column col3 int """ + + qt_main_schema """ desc ${table_name} """ // Should have id, col2, col3 + qt_tag_schema """ select * from ${table_name}@tag(t4_schema) where id = 1 """ // Should have id, col1 + + // Test 3.3.1: Schema change in branch should fail + // Note: Schema changes should be done on main table, not on branch/tag + // This test verifies that attempting schema change on branch/tag is not supported + test { + sql """ alter table ${table_name}@branch(b1_schema) add column test_col string """ + exception "no viable alternative at input" + } + + // Test 3.3.2: Schema change in tag should fail + test { + sql """ alter table ${table_name}@tag(t1_schema) add column test_col string """ + exception "no viable alternative at input" + } + + // Test 3.3.3: Schema change with multiple branches + sql """ drop table if exists ${table_name} """ + sql """ create table ${table_name} (id int) """ + sql """ insert into ${table_name} values (1), (2) """ + sql """ alter table ${table_name} create branch b4_schema """ + + sql """ alter table ${table_name} add column new_col string """ + + qt_b4_new_schema """ select * from ${table_name}@branch(b4_schema) where id = 1 """ // Should have new_col +} + diff --git a/regression-test/suites/external_table_p0/iceberg/branch_tag/iceberg_branch_tag_system_tables.groovy b/regression-test/suites/external_table_p0/iceberg/branch_tag/iceberg_branch_tag_system_tables.groovy new file mode 100644 index 00000000000000..c9994d770bb977 --- /dev/null +++ b/regression-test/suites/external_table_p0/iceberg/branch_tag/iceberg_branch_tag_system_tables.groovy @@ -0,0 +1,119 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("iceberg_branch_tag_system_tables", "p0,external,doris,external_docker,external_docker_doris,branch_tag") { + String enabled = context.config.otherConfigs.get("enableIcebergTest") + if (enabled == null || !enabled.equalsIgnoreCase("true")) { + logger.info("disable iceberg test.") + return + } + + String rest_port = context.config.otherConfigs.get("iceberg_rest_uri_port") + String minio_port = context.config.otherConfigs.get("iceberg_minio_port") + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + String catalog_name = "iceberg_system_tables" + + sql """drop catalog if exists ${catalog_name}""" + sql """ + CREATE CATALOG ${catalog_name} PROPERTIES ( + 'type'='iceberg', + 'iceberg.catalog.type'='rest', + 'uri' = 'http://${externalEnvIp}:${rest_port}', + "s3.access_key" = "admin", + "s3.secret_key" = "password", + "s3.endpoint" = "http://${externalEnvIp}:${minio_port}", + "s3.region" = "us-east-1" + );""" + + sql """drop database if exists ${catalog_name}.test_db_system force""" + sql """create database ${catalog_name}.test_db_system""" + sql """ use ${catalog_name}.test_db_system """ + + String table_name = "test_system_tables" + + sql """ drop table if exists ${table_name} """ + sql """ create table ${table_name} (id int, name string) """ + + sql """ insert into ${table_name} values (1, 'a'), (2, 'b'), (3, 'c') """ + def snapshot_id1_snapshots = sql """ select snapshot_id from ${table_name}\$snapshots order by committed_at desc limit 1 """ + + sql """ alter table ${table_name} create branch b1_system """ + sql """ alter table ${table_name} create branch b2_system """ + def snapshot_id1_refs_b1 = sql """ select snapshot_id from ${table_name}\$refs where name = 'b1_system' """ + def snapshot_id1_refs_b2 = sql """ select snapshot_id from ${table_name}\$refs where name = 'b2_system' """ + assertEquals(snapshot_id1_snapshots[0][0], snapshot_id1_refs_b1[0][0]) + assertEquals(snapshot_id1_snapshots[0][0], snapshot_id1_refs_b2[0][0]) + + sql """ alter table ${table_name} create tag t1_system """ + sql """ alter table ${table_name} create tag t2_system """ + def snapshot_id1_refs_t1 = sql """ select snapshot_id from ${table_name}\$refs where name = 't1_system' """ + def snapshot_id1_refs_t2 = sql """ select snapshot_id from ${table_name}\$refs where name = 't2_system' """ + assertEquals(snapshot_id1_snapshots[0][0], snapshot_id1_refs_t1[0][0]) + assertEquals(snapshot_id1_snapshots[0][0], snapshot_id1_refs_t2[0][0]) + + sql """ insert into ${table_name}@branch(b1_system) values (4, 'd') """ + + // Test 4.1.1: Query all refs information + qt_refs_all """ select name, type, max_reference_age_in_ms, min_snapshots_to_keep, max_snapshot_age_in_ms from ${table_name}\$refs order by name """ + + // Test 4.1.2: Filter branch information + qt_refs_branches """ select name, type, 
max_reference_age_in_ms, min_snapshots_to_keep, max_snapshot_age_in_ms from ${table_name}\$refs where type = 'BRANCH' order by name """ + + // Test 4.1.3: Filter tag information + qt_refs_tags """ select name, type, max_reference_age_in_ms, min_snapshots_to_keep, max_snapshot_age_in_ms from ${table_name}\$refs where type = 'TAG' order by name """ + + // Test 4.1.4: Query specific ref information + order_qt_refs_b1 """ select name, type, max_reference_age_in_ms, min_snapshots_to_keep, max_snapshot_age_in_ms from ${table_name}\$refs where name = 'b1_system' """ + + // Test 4.1.5: Refs information sorting + qt_refs_sorted """ select name, type, max_reference_age_in_ms, min_snapshots_to_keep, max_snapshot_age_in_ms from ${table_name}\$refs order by name """ + + // Test 4.1.6: Refs and snapshots join query + qt_refs_snapshots_join """ + select r.name, r.type + from ${table_name}\$refs r + join ${table_name}\$snapshots s on r.snapshot_id = s.snapshot_id + order by r.name + """ + + // Test refs count verification + qt_refs_count """ select count(*) from ${table_name}\$refs """ + qt_refs_branch_count """ select count(*) from ${table_name}\$refs where type = 'BRANCH' """ + qt_refs_tag_count """ select count(*) from ${table_name}\$refs where type = 'TAG' """ + + // Test refs with snapshot details + qt_refs_with_details """ + select + r.name, + r.type, + s.operation + from ${table_name}\$refs r + left join ${table_name}\$snapshots s on r.snapshot_id = s.snapshot_id + order by r.name + """ + + // Test query refs after creating more branches and tags + sql """ alter table ${table_name} create branch b3_system """ + sql """ alter table ${table_name} create tag t3_system """ + sql """ insert into ${table_name}@branch(b3_system) values (5, 'e') """ + + qt_refs_after_more """ select count(*) from ${table_name}\$refs """ + qt_refs_branches_after """ select count(*) from ${table_name}\$refs where type = 'BRANCH' """ + qt_refs_tags_after """ select count(*) from ${table_name}\$refs where type = 'TAG' """ + +} + diff --git a/regression-test/suites/external_table_p0/iceberg/branch_tag/iceberg_tag_retention_and_consistency.groovy b/regression-test/suites/external_table_p0/iceberg/branch_tag/iceberg_tag_retention_and_consistency.groovy new file mode 100644 index 00000000000000..3a8244d18ca656 --- /dev/null +++ b/regression-test/suites/external_table_p0/iceberg/branch_tag/iceberg_tag_retention_and_consistency.groovy @@ -0,0 +1,299 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
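Editor's note: the retention and consistency suite below pins tags to specific versions by repeatedly selecting snapshot ids from iceberg_meta(...) ordered by committed_at. A sketch of that lookup under the same assumption about the iceberg_meta table function (snapshotIdsInCommitOrder is a hypothetical helper, not part of the patch):

    // Sketch only: fetch a table's snapshot ids in commit order so that
    // tags/branches can be created AS OF VERSION a-specific-snapshot.
    def snapshotIdsInCommitOrder = { String fqTableName ->
        def rows = sql """ select snapshot_id from iceberg_meta(
                "table" = "${fqTableName}",
                "query_type" = "snapshots") order by committed_at """
        return rows.collect { it[0] }
    }
    // e.g. def ids = snapshotIdsInCommitOrder("${catalog_name}.test_db_tag_retention.${table_name_expire}")
    //      sql """ alter table ${table_name_expire} create tag t_early AS OF VERSION ${ids[1]} RETAIN 30 DAYS """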
+ +suite("iceberg_tag_retention_and_consistency", "p0,external,doris,external_docker,external_docker_doris,branch_tag") { + String enabled = context.config.otherConfigs.get("enableIcebergTest") + if (enabled == null || !enabled.equalsIgnoreCase("true")) { + logger.info("disable iceberg test.") + return + } + + String rest_port = context.config.otherConfigs.get("iceberg_rest_uri_port") + String minio_port = context.config.otherConfigs.get("iceberg_minio_port") + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + String catalog_name = "iceberg_tag_retention_consistency" + + sql """drop catalog if exists ${catalog_name}""" + sql """ + CREATE CATALOG ${catalog_name} PROPERTIES ( + 'type'='iceberg', + 'iceberg.catalog.type'='rest', + 'uri' = 'http://${externalEnvIp}:${rest_port}', + "s3.access_key" = "admin", + "s3.secret_key" = "password", + "s3.endpoint" = "http://${externalEnvIp}:${minio_port}", + "s3.region" = "us-east-1" + );""" + + sql """drop database if exists ${catalog_name}.test_db_tag_retention force""" + sql """create database ${catalog_name}.test_db_tag_retention""" + sql """ use ${catalog_name}.test_db_tag_retention """ + + // Test 1: Tag protection from expire_snapshots - tags should protect their referenced snapshots + def table_name_expire = "test_tag_expire_snapshots" + sql """ drop table if exists ${table_name_expire} """ + sql """ create table ${table_name_expire} (id int, name string, data string) """ + + // Create multiple snapshots on main branch + sql """ insert into ${table_name_expire} values (1, 'a', 'snapshot1') """ + sql """ insert into ${table_name_expire} values (2, 'b', 'snapshot2') """ + sql """ insert into ${table_name_expire} values (3, 'c', 'snapshot3') """ + sql """ insert into ${table_name_expire} values (4, 'd', 'snapshot4') """ + sql """ insert into ${table_name_expire} values (5, 'e', 'snapshot5') """ + + List> snapshots_expire = sql """ select snapshot_id from iceberg_meta("table" = "${catalog_name}.test_db_tag_retention.${table_name_expire}", "query_type" = "snapshots") order by committed_at; """ + String s_expire_0 = snapshots_expire.get(0)[0] + String s_expire_1 = snapshots_expire.get(1)[0] + String s_expire_2 = snapshots_expire.get(2)[0] + String s_expire_3 = snapshots_expire.get(3)[0] + String s_expire_4 = snapshots_expire.get(4)[0] + + // Create tags at different snapshot points with retention policies + sql """ alter table ${table_name_expire} create tag t_early AS OF VERSION ${s_expire_1} RETAIN 30 DAYS """ + sql """ alter table ${table_name_expire} create tag t_middle AS OF VERSION ${s_expire_3} RETAIN 30 DAYS """ + + logger.info("Created tags t_early (snapshot ${s_expire_1}) and t_middle (snapshot ${s_expire_3})") + + // Get snapshot count before expire + def snapshot_count_before_expire = sql """ select count(*) from iceberg_meta("table" = "${catalog_name}.test_db_tag_retention.${table_name_expire}", "query_type" = "snapshots") """ + logger.info("Snapshot count before expire: ${snapshot_count_before_expire[0][0]}") + + // Call expire_snapshots via Spark - should not delete snapshots referenced by tags + spark_iceberg """CALL demo.system.expire_snapshots(table => 'test_db_tag_retention.${table_name_expire}', older_than => TIMESTAMP '2020-01-01 00:00:00')""" + + // Verify tags are still accessible after expire_snapshots + qt_expire_tag_early_accessible """ select * from ${table_name_expire}@tag(t_early) order by id """ // Should have 2 records + qt_expire_tag_middle_accessible """ select * from 
${table_name_expire}@tag(t_middle) order by id """ // Should have 4 records + + // Verify tag refs are still in refs table + def tag_early_ref_after_expire = sql """ select count(*) from ${table_name_expire}\$refs where name = 't_early' """ + assertEquals(tag_early_ref_after_expire[0][0], 1) + + def tag_middle_ref_after_expire = sql """ select count(*) from ${table_name_expire}\$refs where name = 't_middle' """ + assertEquals(tag_middle_ref_after_expire[0][0], 1) + + // Test 2: Multiple tags pointing to the same snapshot + def table_name_multi_tag = "test_multiple_tags_same_snapshot" + sql """ drop table if exists ${table_name_multi_tag} """ + sql """ create table ${table_name_multi_tag} (id int, value string) """ + + sql """ insert into ${table_name_multi_tag} values (1, 'first') """ + sql """ insert into ${table_name_multi_tag} values (2, 'second') """ + sql """ insert into ${table_name_multi_tag} values (3, 'third') """ + + List> snapshots_multi = sql """ select snapshot_id from iceberg_meta("table" = "${catalog_name}.test_db_tag_retention.${table_name_multi_tag}", "query_type" = "snapshots") order by committed_at; """ + String s_multi_1 = snapshots_multi.get(1)[0] + + // Create multiple tags pointing to the same snapshot + sql """ alter table ${table_name_multi_tag} create tag tag_a AS OF VERSION ${s_multi_1} RETAIN 7 DAYS """ + sql """ alter table ${table_name_multi_tag} create tag tag_b AS OF VERSION ${s_multi_1} RETAIN 15 DAYS """ + sql """ alter table ${table_name_multi_tag} create tag tag_c AS OF VERSION ${s_multi_1} """ + + // Verify all tags return the same data + qt_multi_tag_a """ select * from ${table_name_multi_tag}@tag(tag_a) order by id """ // Should have 1,2 + qt_multi_tag_b """ select * from ${table_name_multi_tag}@tag(tag_b) order by id """ // Should have 1,2 + qt_multi_tag_c """ select * from ${table_name_multi_tag}@tag(tag_c) order by id """ // Should have 1,2 + + // Verify all tags are in refs table + def tag_a_ref = sql """ select count(*) from ${table_name_multi_tag}\$refs where name = 'tag_a' """ + assertEquals(tag_a_ref[0][0], 1) + + def tag_b_ref = sql """ select count(*) from ${table_name_multi_tag}\$refs where name = 'tag_b' """ + assertEquals(tag_b_ref[0][0], 1) + + def tag_c_ref = sql """ select count(*) from ${table_name_multi_tag}\$refs where name = 'tag_c' """ + assertEquals(tag_c_ref[0][0], 1) + + // Test 3: Tag data immutability - tags should not change when main branch is updated + def table_name_immutable = "test_tag_immutability" + sql """ drop table if exists ${table_name_immutable} """ + sql """ create table ${table_name_immutable} (id int, description string, timestamp bigint) """ + + sql """ insert into ${table_name_immutable} values (1, 'version1', 1000) """ + sql """ insert into ${table_name_immutable} values (2, 'version2', 2000) """ + + List> snapshots_imm = sql """ select snapshot_id from iceberg_meta("table" = "${catalog_name}.test_db_tag_retention.${table_name_immutable}", "query_type" = "snapshots") order by committed_at; """ + String s_imm_v1 = snapshots_imm.get(1)[0] + + // Create a tag at this point + sql """ alter table ${table_name_immutable} create tag v1_baseline AS OF VERSION ${s_imm_v1} """ + + // Query the tag immediately + qt_imm_first_query """ select * from ${table_name_immutable}@tag(v1_baseline) order by id """ + + // Make changes to main branch + sql """ insert into ${table_name_immutable} values (3, 'version3', 3000) """ + sql """ insert into ${table_name_immutable} values (4, 'version4', 4000) """ + + // Tag should still 
return the same data + qt_imm_after_changes """ select * from ${table_name_immutable}@tag(v1_baseline) order by id """ // Should still have 1,2 + + // Main branch should have new data + qt_imm_main_has_new """ select * from ${table_name_immutable} order by id """ // Should have 1,2,3,4 + + // Update main branch with more data + sql """ insert into ${table_name_immutable} values (5, 'version5', 5000) """ + + // Tag should still be unchanged + qt_imm_unchanged """ select * from ${table_name_immutable}@tag(v1_baseline) order by id """ // Should still have 1,2 + + // Test 4: Tag replacement - create or replace tag behavior + def table_name_replace = "test_tag_replacement" + sql """ drop table if exists ${table_name_replace} """ + sql """ create table ${table_name_replace} (id int, status string) """ + + sql """ insert into ${table_name_replace} values (1, 'pending') """ + List> snapshots_replace = sql """ select snapshot_id from iceberg_meta("table" = "${catalog_name}.test_db_tag_retention.${table_name_replace}", "query_type" = "snapshots") order by committed_at; """ + String s_rep_v1 = snapshots_replace.get(0)[0] + + sql """ insert into ${table_name_replace} values (2, 'approved') """ + List> snapshots_replace_2 = sql """ select snapshot_id from iceberg_meta("table" = "${catalog_name}.test_db_tag_retention.${table_name_replace}", "query_type" = "snapshots") order by committed_at; """ + String s_rep_v2 = snapshots_replace_2.get(1)[0] + + // Create initial tag + sql """ alter table ${table_name_replace} create tag status_baseline AS OF VERSION ${s_rep_v1} RETAIN 7 DAYS """ + qt_replace_initial """ select * from ${table_name_replace}@tag(status_baseline) order by id """ // Should have 1 + + // Replace the tag with a different snapshot + sql """ alter table ${table_name_replace} create or replace tag status_baseline AS OF VERSION ${s_rep_v2} RETAIN 30 DAYS """ + qt_replace_after """ select * from ${table_name_replace}@tag(status_baseline) order by id """ // Should have 1,2 + + // Verify the tag ref still exists (replaced, not deleted) + def tag_ref_after_replace = sql """ select count(*) from ${table_name_replace}\$refs where name = 'status_baseline' """ + assertEquals(tag_ref_after_replace[0][0], 1) + + // Test 5: Tag with time travel queries and snapshots at different points + def table_name_time_travel = "test_tag_time_travel" + sql """ drop table if exists ${table_name_time_travel} """ + sql """ create table ${table_name_time_travel} (id int, version string, created_at string) """ + + sql """ insert into ${table_name_time_travel} values (1, 'v1.0', '2024-01-01') """ + sql """ insert into ${table_name_time_travel} values (2, 'v1.0', '2024-01-01') """ + sql """ insert into ${table_name_time_travel} values (3, 'v1.0', '2024-01-01') """ + + List> snapshots_tt = sql """ select snapshot_id from iceberg_meta("table" = "${catalog_name}.test_db_tag_retention.${table_name_time_travel}", "query_type" = "snapshots") order by committed_at; """ + String s_tt_1 = snapshots_tt.get(0)[0] + String s_tt_2 = snapshots_tt.get(1)[0] + String s_tt_3 = snapshots_tt.get(2)[0] + + sql """ alter table ${table_name_time_travel} create tag release_v1 AS OF VERSION ${s_tt_1} """ + sql """ alter table ${table_name_time_travel} create tag release_v2 AS OF VERSION ${s_tt_2} """ + sql """ alter table ${table_name_time_travel} create tag release_v3 AS OF VERSION ${s_tt_3} """ + + // Verify each tag captures the state at that point + qt_tt_v1 """ select * from ${table_name_time_travel}@tag(release_v1) order by id """ // Should 
have 1 + qt_tt_v2 """ select * from ${table_name_time_travel}@tag(release_v2) order by id """ // Should have 1,2 + qt_tt_v3 """ select * from ${table_name_time_travel}@tag(release_v3) order by id """ // Should have 1,2,3 + + // Add more data to main + sql """ insert into ${table_name_time_travel} values (4, 'v2.0', '2024-02-01'), (5, 'v2.0', '2024-02-01') """ + + // Tags should remain unchanged + qt_tt_v1_unchanged """ select * from ${table_name_time_travel}@tag(release_v1) order by id """ // Should still have 1 + qt_tt_v3_unchanged """ select * from ${table_name_time_travel}@tag(release_v3) order by id """ // Should still have 1,2,3 + + // Main should have all data + qt_tt_main_all """ select * from ${table_name_time_travel} order by id """ // Should have 1,2,3,4,5 + + // Test 6: Tag aggregate queries and consistency + def table_name_agg = "test_tag_aggregates" + sql """ drop table if exists ${table_name_agg} """ + sql """ create table ${table_name_agg} (id int, category string, amount int) """ + + sql """ insert into ${table_name_agg} values (1, 'A', 100) """ + sql """ insert into ${table_name_agg} values (2, 'B', 200) """ + sql """ insert into ${table_name_agg} values (3, 'A', 150) """ + sql """ insert into ${table_name_agg} values (4, 'C', 300) """ + + List> snapshots_agg = sql """ select snapshot_id from iceberg_meta("table" = "${catalog_name}.test_db_tag_retention.${table_name_agg}", "query_type" = "snapshots") order by committed_at; """ + String s_agg_all = snapshots_agg.get(3)[0] + + // Create tag for aggregate queries + sql """ alter table ${table_name_agg} create tag agg_baseline AS OF VERSION ${s_agg_all} """ + + qt_agg_count """ select count(*) from ${table_name_agg}@tag(agg_baseline) """ // Should have 4 + qt_agg_sum """ select sum(amount) from ${table_name_agg}@tag(agg_baseline) """ // Should be 750 + qt_agg_avg """ select avg(amount) from ${table_name_agg}@tag(agg_baseline) """ // Should be 187.5 + qt_agg_group_by """ select category, sum(amount) as total from ${table_name_agg}@tag(agg_baseline) group by category order by category """ // A:250, B:200, C:300 + + // Add more data + sql """ insert into ${table_name_agg} values (5, 'A', 50), (6, 'B', 100) """ + + // Tag aggregates should remain unchanged + qt_agg_count_unchanged """ select count(*) from ${table_name_agg}@tag(agg_baseline) """ // Should still be 4 + qt_agg_sum_unchanged """ select sum(amount) from ${table_name_agg}@tag(agg_baseline) """ // Should still be 750 + + // Test 7: Tag and branch interaction + def table_name_interaction = "test_tag_branch_interaction" + sql """ drop table if exists ${table_name_interaction} """ + sql """ create table ${table_name_interaction} (id int, name string, source string) """ + + sql """ insert into ${table_name_interaction} values (1, 'item1', 'main') """ + sql """ insert into ${table_name_interaction} values (2, 'item2', 'main') """ + + List> snapshots_inter = sql """ select snapshot_id from iceberg_meta("table" = "${catalog_name}.test_db_tag_retention.${table_name_interaction}", "query_type" = "snapshots") order by committed_at; """ + String s_inter_baseline = snapshots_inter.get(1)[0] + + // Create a tag for baseline + sql """ alter table ${table_name_interaction} create tag baseline AS OF VERSION ${s_inter_baseline} """ + + // Create a branch and add data + sql """ alter table ${table_name_interaction} create branch feature_branch """ + sql """ insert into ${table_name_interaction}@branch(feature_branch) values (3, 'item3', 'branch') """ + sql """ insert into 
${table_name_interaction}@branch(feature_branch) values (4, 'item4', 'branch') """ + + // Tag should still show baseline data + qt_inter_tag_baseline """ select * from ${table_name_interaction}@tag(baseline) order by id """ // Should have 1,2 + + // Branch should have its own data + qt_inter_branch_data """ select * from ${table_name_interaction}@branch(feature_branch) order by id """ // Should have 1,2,3,4 + + // Join between tag and branch + qt_inter_join """ + select t.id as tag_id, t.name as tag_name, b.id as branch_id, b.name as branch_name + from ${table_name_interaction}@tag(baseline) t + full outer join ${table_name_interaction}@branch(feature_branch) b on t.id = b.id + order by coalesce(t.id, b.id) + """ + + // Test 8: Tag write operations should fail (runtime validation) + def table_name_write_fail = "test_tag_write_fails" + sql """ drop table if exists ${table_name_write_fail} """ + sql """ create table ${table_name_write_fail} (id int, value string) """ + + sql """ insert into ${table_name_write_fail} values (1, 'data1') """ + List> snapshots_fail = sql """ select snapshot_id from iceberg_meta("table" = "${catalog_name}.test_db_tag_retention.${table_name_write_fail}", "query_type" = "snapshots") order by committed_at; """ + String s_fail = snapshots_fail.get(0)[0] + + // Create a tag + sql """ alter table ${table_name_write_fail} create tag protected AS OF VERSION ${s_fail} """ + + // Attempting to insert into a tag should fail + test { + sql """ insert into ${table_name_write_fail}@branch(protected) values (2, 'data2') """ + exception "protected is a tag, not a branch" + } + + test { + sql """ insert overwrite table ${table_name_write_fail}@branch(protected) values (2, 'data2') """ + exception "protected is a tag, not a branch" + } + + // Verify tag is still accessible and unchanged + qt_write_fail_tag_unchanged """ select * from ${table_name_write_fail}@tag(protected) order by id """ // Should still have 1 + +} diff --git a/regression-test/suites/external_table_p0/iceberg/iceberg_branch_insert_data.groovy b/regression-test/suites/external_table_p0/iceberg/iceberg_branch_insert_data.groovy index 08a72138ee512c..68146796d0ade3 100644 --- a/regression-test/suites/external_table_p0/iceberg/iceberg_branch_insert_data.groovy +++ b/regression-test/suites/external_table_p0/iceberg/iceberg_branch_insert_data.groovy @@ -45,7 +45,6 @@ suite("iceberg_branch_insert_data", "p0,external,doris,external_docker,external_ String tmp_tb1 = catalog_name + "_tmp_tb1" String tmp_tb2 = catalog_name + "_tmp_tb2" - // create an unpartitioned table sql """ drop table if exists ${tmp_tb1} """ sql """ create table ${tmp_tb1} (id int, par string) """ diff --git a/regression-test/suites/external_table_p0/iceberg/iceberg_branch_tag_operate.groovy b/regression-test/suites/external_table_p0/iceberg/iceberg_branch_tag_operate.groovy index b8838eadbaa269..b021227d01f54c 100644 --- a/regression-test/suites/external_table_p0/iceberg/iceberg_branch_tag_operate.groovy +++ b/regression-test/suites/external_table_p0/iceberg/iceberg_branch_tag_operate.groovy @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. 
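Editor's note: the changes to iceberg_branch_tag_operate below add checks that a freshly created ref (with no AS OF VERSION clause) points at the table's latest snapshot by comparing $snapshots and $refs. A sketch of that comparison as a helper (assertRefAtLatestSnapshot is a hypothetical name; sql and assertEquals come from the suite DSL):

    // Sketch only: a new branch/tag created without AS OF VERSION should reference
    // the table's most recent snapshot.
    def assertRefAtLatestSnapshot = { String tbl, String refName ->
        def latest = sql """ select snapshot_id from ${tbl}\$snapshots order by committed_at desc limit 1 """
        def refSnap = sql """ select snapshot_id from ${tbl}\$refs where name = '${refName}' """
        assertEquals(latest[0][0], refSnap[0][0])
    }
    // e.g. assertRefAtLatestSnapshot(table_name, 'b1')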
diff --git a/regression-test/suites/external_table_p0/iceberg/iceberg_branch_insert_data.groovy b/regression-test/suites/external_table_p0/iceberg/iceberg_branch_insert_data.groovy
index 08a72138ee512c..68146796d0ade3 100644
--- a/regression-test/suites/external_table_p0/iceberg/iceberg_branch_insert_data.groovy
+++ b/regression-test/suites/external_table_p0/iceberg/iceberg_branch_insert_data.groovy
@@ -45,7 +45,6 @@ suite("iceberg_branch_insert_data", "p0,external,doris,external_docker,external_
     String tmp_tb1 = catalog_name + "_tmp_tb1"
     String tmp_tb2 = catalog_name + "_tmp_tb2"
-    // create an unpartitioned table
     sql """ drop table if exists ${tmp_tb1} """
     sql """ create table ${tmp_tb1} (id int, par string) """
diff --git a/regression-test/suites/external_table_p0/iceberg/iceberg_branch_tag_operate.groovy b/regression-test/suites/external_table_p0/iceberg/iceberg_branch_tag_operate.groovy
index b8838eadbaa269..b021227d01f54c 100644
--- a/regression-test/suites/external_table_p0/iceberg/iceberg_branch_tag_operate.groovy
+++ b/regression-test/suites/external_table_p0/iceberg/iceberg_branch_tag_operate.groovy
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
-suite("iceberg_branch_tag_operate", "p0,external,doris,external_docker,external_docker_doris") {
+suite("iceberg_branch_tag_operate", "p0,external,doris,external_docker,external_docker_doris,branch_tag") {
     String enabled = context.config.otherConfigs.get("enableIcebergTest")
     if (enabled == null || !enabled.equalsIgnoreCase("true")) {
         logger.info("disable iceberg test.")
@@ -40,48 +40,55 @@ suite("iceberg_branch_tag_operate", "p0,external,doris,external_docker,external_
     );"""
     sql """ use ${catalog_name}.test_db """
+    def table_name = "test_branch_tag_operate"
-    sql """ drop table if exists test_branch_tag_operate """
-    sql """ create table test_branch_tag_operate (id int) """
+    sql """ drop table if exists ${table_name} """
+    sql """ create table ${table_name} (id int) """
     // with empty table
     test {
-        sql """ alter table test_branch_tag_operate create tag b1 """
+        sql """ alter table ${table_name} create tag b1 """
         exception "main has no snapshot"
     }
-    sql """ alter table test_branch_tag_operate create branch b1 """
-    sql """ alter table test_branch_tag_operate create branch if not exists b1 """
-    def result = sql """select UPDATE_TIME from information_schema.tables where TABLE_SCHEMA="test_db" and TABLE_NAME='test_branch_tag_operate'"""
+    sql """ alter table ${table_name} create branch if not exists b1 """
+    def snapshot_id1_snapshots = sql """ select snapshot_id from ${table_name}\$snapshots order by committed_at desc limit 1 """
+    def snapshot_id1_refs_b1 = sql """ select snapshot_id from ${table_name}\$refs where name = 'b1' """
+    assertEquals(snapshot_id1_snapshots[0][0], snapshot_id1_refs_b1[0][0])
+
+
+    sql """ alter table ${table_name} create branch if not exists b1 """
+
+    def result = sql """select UPDATE_TIME from information_schema.tables where TABLE_SCHEMA="test_db" and TABLE_NAME='${table_name}'"""
     def update_time1 = result[0][0];
     sleep(1000)
     test {
-        sql """ alter table test_branch_tag_operate create or replace branch b1 """
+        sql """ alter table ${table_name} create or replace branch b1 """
         exception "main has no snapshot"
     }
     test {
-        sql """ alter table test_branch_tag_operate create branch b1 """
+        sql """ alter table ${table_name} create branch b1 """
         exception "Ref b1 already exists"
     }
-    qt_q1 """ select * from test_branch_tag_operate@branch(b1) """ // empty table
+    qt_q1 """ select * from ${table_name}@branch(b1) """ // empty table
     // with some data
-    sql """ insert into test_branch_tag_operate values (1) """
-    sql """ insert into test_branch_tag_operate values (2) """
-    sql """ insert into test_branch_tag_operate values (3) """
-    sql """ insert into test_branch_tag_operate values (4) """
-    sql """ insert into test_branch_tag_operate values (5) """
-    result = sql """select UPDATE_TIME from information_schema.tables where TABLE_SCHEMA="test_db" and TABLE_NAME='test_branch_tag_operate'"""
+    sql """ insert into ${table_name} values (1) """
+    sql """ insert into ${table_name} values (2) """
+    sql """ insert into ${table_name} values (3) """
+    sql """ insert into ${table_name} values (4) """
+    sql """ insert into ${table_name} values (5) """
+    result = sql """select UPDATE_TIME from information_schema.tables where TABLE_SCHEMA="test_db" and TABLE_NAME='${table_name}'"""
     def update_time2 = result[0][0];
     logger.info("get update times " + update_time1 + " vs. " + update_time2)
     assertTrue(update_time2 > update_time1);
     sleep(1000)
-
-    List<List<Object>> snapshots = sql """ select snapshot_id from iceberg_meta("table" = "${catalog_name}.test_db.test_branch_tag_operate", "query_type" = "snapshots") order by committed_at; """
+
+    List<List<Object>> snapshots = sql """ select snapshot_id from iceberg_meta("table" = "${catalog_name}.test_db.${table_name}", "query_type" = "snapshots") order by committed_at; """
     String s0 = snapshots.get(0)[0]
     String s1 = snapshots.get(1)[0]
     String s2 = snapshots.get(2)[0]
@@ -89,101 +96,123 @@ suite("iceberg_branch_tag_operate", "p0,external,doris,external_docker,external_
     String s4 = snapshots.get(4)[0]
     // branch
-    sql """ alter table test_branch_tag_operate create branch b2 as of version ${s0} """
-    qt_q2 """ select * from test_branch_tag_operate@branch(b2) order by id """ // 0 records
+    sql """ alter table ${table_name} create branch b2 as of version ${s0} """
+
+    qt_q2 """ select * from ${table_name}@branch(b2) order by id """ // 0 records
+
+    sql """ alter table ${table_name} create or replace branch b2 AS OF VERSION ${s1} RETAIN 2 days """
+    qt_q3 """ select * from ${table_name}@branch(b2) order by id """ // 1 records
+
+    sql """ alter table ${table_name} create or replace branch b2 AS OF VERSION ${s2} RETAIN 2 hours WITH SNAPSHOT RETENTION 3 SNAPSHOTS"""
+    qt_q4 """ select * from ${table_name}@branch(b2) order by id """ // 2 records
-    sql """ alter table test_branch_tag_operate create or replace branch b2 AS OF VERSION ${s1} RETAIN 2 days """
-    qt_q3 """ select * from test_branch_tag_operate@branch(b2) order by id """ // 1 records
+    sql """ alter table ${table_name} replace branch b2 AS OF VERSION ${s3} RETAIN 2 hours WITH SNAPSHOT RETENTION 4 DAYS """
+    qt_q5 """ select * from ${table_name}@branch(b2) order by id """ // 3 records
-    sql """ alter table test_branch_tag_operate create or replace branch b2 AS OF VERSION ${s2} RETAIN 2 hours WITH SNAPSHOT RETENTION 3 SNAPSHOTS"""
-    qt_q4 """ select * from test_branch_tag_operate@branch(b2) order by id """ // 2 records
+    sql """ alter table ${table_name} create or replace branch b2 RETAIN 2 hours WITH SNAPSHOT RETENTION 3 SNAPSHOTS 4 DAYS """
+    qt_q6 """ select * from ${table_name}@branch(b2) order by id """ // 5 records
-    sql """ alter table test_branch_tag_operate replace branch b2 AS OF VERSION ${s3} RETAIN 2 hours WITH SNAPSHOT RETENTION 4 DAYS """
-    qt_q5 """ select * from test_branch_tag_operate@branch(b2) order by id """ // 3 records
+    sql """ alter table ${table_name} create or replace branch b3 AS OF VERSION ${s1} RETAIN 2 days """
+    qt_q7 """ select * from ${table_name}@branch(b3) order by id """ // 1 records
-    sql """ alter table test_branch_tag_operate create or replace branch b2 RETAIN 2 hours WITH SNAPSHOT RETENTION 3 SNAPSHOTS 4 DAYS """
-    qt_q6 """ select * from test_branch_tag_operate@branch(b2) order by id """ // 5 records
+    sql """ alter table ${table_name} create branch if not exists b3 AS OF VERSION ${s2} RETAIN 2 days """
+    qt_q8 """ select * from ${table_name}@branch(b3) order by id """ // still 1 records
-    sql """ alter table test_branch_tag_operate create or replace branch b3 AS OF VERSION ${s1} RETAIN 2 days """
-    qt_q7 """ select * from test_branch_tag_operate@branch(b3) order by id """ // 1 records
+    sql """ alter table ${table_name} create branch if not exists b4 AS OF VERSION ${s2} RETAIN 2 MINUTES WITH SNAPSHOT RETENTION 3 SNAPSHOTS """
+    qt_q9 """ select * from ${table_name}@branch(b4) order by id """ // 2 records
-    sql """ alter table test_branch_tag_operate create branch if not exists b3 AS OF VERSION ${s2} RETAIN 2 days """
-    qt_q8 """ select * from test_branch_tag_operate@branch(b3) order by id """ // still 1 records
+    sql """ alter table ${table_name} create branch if not exists b5 """
+    qt_q10 """ select * from ${table_name}@branch(b5) order by id """ // 5 records
-    sql """ alter table test_branch_tag_operate create branch if not exists b4 AS OF VERSION ${s2} RETAIN 2 MINUTES WITH SNAPSHOT RETENTION 3 SNAPSHOTS """
-    qt_q9 """ select * from test_branch_tag_operate@branch(b4) order by id """ // 2 records
+    sql """ alter table ${table_name} create branch if not exists b6 AS OF VERSION ${s2} """
+    qt_q11 """ select * from ${table_name}@branch(b6) order by id """ // 2 records
-    sql """ alter table test_branch_tag_operate create branch if not exists b5 """
-    qt_q10 """ select * from test_branch_tag_operate@branch(b5) order by id """ // 5 records
+    sql """ alter table ${table_name} create or replace branch b6 AS OF VERSION ${s3} """
+    qt_q12 """ select * from ${table_name}@branch(b6) order by id """ // 3 records
-    sql """ alter table test_branch_tag_operate create branch if not exists b6 AS OF VERSION ${s2} """
-    qt_q11 """ select * from test_branch_tag_operate@branch(b6) order by id """ // 2 records
+    sql """ alter table ${table_name} create or replace branch b6 """
+    qt_q13 """ select * from ${table_name}@branch(b6) order by id """ // 5 records
-    sql """ alter table test_branch_tag_operate create or replace branch b6 AS OF VERSION ${s3} """
-    qt_q12 """ select * from test_branch_tag_operate@branch(b6) order by id """ // 3 records
+    sql """ alter table ${table_name} create or replace branch b6 """
+    qt_q14 """ select * from ${table_name}@branch(b6) order by id """ // still 5 records
-    sql """ alter table test_branch_tag_operate create or replace branch b6 """
-    qt_q13 """ select * from test_branch_tag_operate@branch(b6) order by id """ // 5 records
+    sql """ alter table ${table_name} create or replace branch b6 RETAIN 2 DAYS """
+    qt_q15 """ select * from ${table_name}@branch(b6) order by id """ // still 5 records
-    sql """ alter table test_branch_tag_operate create or replace branch b6 """
-    qt_q14 """ select * from test_branch_tag_operate@branch(b6) order by id """ // still 5 records
+    sql """ alter table ${table_name} create branch b7 """
+    qt_q16 """ select * from ${table_name}@branch(b7) order by id """ // 5 records
-    sql """ alter table test_branch_tag_operate create or replace branch b6 RETAIN 2 DAYS """
-    qt_q15 """ select * from test_branch_tag_operate@branch(b6) order by id """ // still 5 records
+    sql """ insert into ${table_name}@branch(b7) values (6), (7) """
+    qt_q17 """ select * from ${table_name}@branch(b7) order by id """ // 7 records
+
+    // Test CREATE OR REPLACE BRANCH IF NOT EXISTS - this should fail
+    test {
+        sql """ alter table ${table_name} create or replace branch if not exists b7 """
+        exception "mismatched input"
+    }
+
+    sql """ alter table ${table_name} create or replace branch b7 """
+    qt_q19 """ select * from ${table_name}@branch(b7) order by id """ // back to 5 records
+
+    sql """alter table ${table_name} create branch if not exists b8"""
+    qt_q19_1 """ select * from ${table_name}@branch(b8) order by id """ // 5 records
+
+    def snapshot_id2_refs_b8 = sql """ select snapshot_id from ${table_name}\$refs where name = 'b8' """
+    // Verify that b8's snapshot_id exists in snapshots table (avoid race condition)
+    def snapshot_id2_snapshots = sql """ select snapshot_id from ${table_name}\$snapshots where snapshot_id = ${snapshot_id2_refs_b8[0][0]} """
+    assertEquals(snapshot_id2_refs_b8[0][0], snapshot_id2_snapshots[0][0])
-    sql """ alter table test_branch_tag_operate create branch b7 """
-    qt_q16 """ select * from test_branch_tag_operate@branch(b7) order by id """ // 5 records
     test {
-        sql """ alter table test_branch_tag_operate create branch b7 as of version ${s3} """
+        sql """ alter table ${table_name} create branch b7 as of version ${s3} """
         exception "Ref b7 already exists"
     }
     test {
-        sql """ alter table test_branch_tag_operate create branch b8 as of version 11223344 """
-        exception "Cannot set b8 to unknown snapshot: 11223344"
+        sql """ alter table ${table_name} create branch b9 as of version 11223344 """
+        exception "Cannot set b9 to unknown snapshot: 11223344"
     }
-    result = sql """select UPDATE_TIME from information_schema.tables where TABLE_SCHEMA="test_db" and TABLE_NAME='test_branch_tag_operate'"""
+    result = sql """select UPDATE_TIME from information_schema.tables where TABLE_SCHEMA="test_db" and TABLE_NAME='${table_name}'"""
     def update_time3 = result[0][0];
     logger.info("get update times " + update_time2 + " vs. " + update_time3)
     assertTrue(update_time3 > update_time2);
     // tag
-    sql """ alter table test_branch_tag_operate create tag t2 as of version ${s0} """
-    qt_q20 """ select * from test_branch_tag_operate@tag(t2) order by id """ // 0 records
+    sql """ alter table ${table_name} create tag t2 as of version ${s0} """
+    qt_q20 """ select * from ${table_name}@tag(t2) order by id """ // 0 records
-    sql """ alter table test_branch_tag_operate create or replace tag t2 as of version ${s1} """
-    qt_q21 """ select * from test_branch_tag_operate@tag(t2) order by id """ // 1 records
+    sql """ alter table ${table_name} create or replace tag t2 as of version ${s1} """
+    qt_q21 """ select * from ${table_name}@tag(t2) order by id """ // 1 records
-    sql """ alter table test_branch_tag_operate create or replace tag t2 as of version ${s2} RETAIN 10 MINUTES """
-    qt_q22 """ select * from test_branch_tag_operate@tag(t2) order by id """ // 2 records
+    sql """ alter table ${table_name} create or replace tag t2 as of version ${s2} RETAIN 10 MINUTES """
+    qt_q22 """ select * from ${table_name}@tag(t2) order by id """ // 2 records
-    sql """ alter table test_branch_tag_operate create or replace tag t2 RETAIN 10 MINUTES """
-    qt_q23 """ select * from test_branch_tag_operate@tag(t2) order by id """ // 5 records
+    sql """ alter table ${table_name} create or replace tag t2 RETAIN 10 MINUTES """
+    qt_q23 """ select * from ${table_name}@tag(t2) order by id """ // 5 records
-    sql """ alter table test_branch_tag_operate create tag if not exists t3 as of version ${s1} """
-    qt_q24 """ select * from test_branch_tag_operate@tag(t3) order by id """ // 1 records
+    sql """ alter table ${table_name} create tag if not exists t3 as of version ${s1} """
+    qt_q24 """ select * from ${table_name}@tag(t3) order by id """ // 1 records
-    sql """ alter table test_branch_tag_operate create tag if not exists t3 as of version ${s2} """ // still 1 records
-    qt_q25 """ select * from test_branch_tag_operate@tag(t3) order by id """
+    sql """ alter table ${table_name} create tag if not exists t3 as of version ${s2} """ // still 1 records
+    qt_q25 """ select * from ${table_name}@tag(t3) order by id """
-    sql """ alter table test_branch_tag_operate create tag t4 as of version ${s2} """
-    qt_q26 """ select * from test_branch_tag_operate@tag(t4) order by id """ // 2 records
+    sql """ alter table ${table_name} create tag t4 as of version ${s2} """
+    qt_q26 """ select * from ${table_name}@tag(t4) order by id """ // 2 records
-    sql """ alter table test_branch_tag_operate create or replace tag t5 as of version ${s3} """
-    qt_q27 """ select * from test_branch_tag_operate@tag(t5) order by id """ // 3 records
+    sql """ alter table ${table_name} create or replace tag t5 as of version ${s3} """
+    qt_q27 """ select * from ${table_name}@tag(t5) order by id """ // 3 records
-    sql """ alter table test_branch_tag_operate create tag t6 """
-    qt_q28 """ select * from test_branch_tag_operate@tag(t6) order by id """ // 5 records
+    sql """ alter table ${table_name} create tag t6 """
+    qt_q28 """ select * from ${table_name}@tag(t6) order by id """ // 5 records
     test {
-        sql """ alter table test_branch_tag_operate create tag t6 as of version ${s3} """
+        sql """ alter table ${table_name} create tag t6 as of version ${s3} """
         exception "Ref t6 already exists"
     }
     test {
-        sql """ alter table test_branch_tag_operate create branch t7 as of version 11223344 """
+        sql """ alter table ${table_name} create branch t7 as of version 11223344 """
         exception "Cannot set t7 to unknown snapshot: 11223344"
     }
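Illustrative aside, not part of this patch: the checks on b1 and b8 earlier in this suite both amount to "resolve the snapshot a named ref points at" through the `$refs` metadata table. If more suites end up needing that assertion, a sketch along these lines would cover it; the helper name snapshotOfRef is hypothetical, and it only reuses the `$refs` table and the assertEquals call already used in this file.

// Sketch only; assumes it runs inside a suite where `sql` is available.
def snapshotOfRef = { String tableName, String refName ->
    // `$refs` lists branches and tags together with the snapshot they point at.
    def rows = sql """ select snapshot_id from ${tableName}\$refs where name = '${refName}' """
    assert rows.size() == 1 : "expected exactly one ref named ${refName}"
    return rows[0][0]
}
// Usage mirroring the b8 check above:
// assertEquals(snapshotOfRef(table_name, "b8"), snapshot_id2_snapshots[0][0])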
@@ -195,7 +224,7 @@ suite("iceberg_branch_tag_operate", "p0,external,doris,external_docker,external_
     List<List<Object>> refs = sql """select * from tmp_schema_change_branch\$refs order by name"""
     String s_main = refs.get(0)[2]
     String s_test_branch = refs.get(1)[2]
-
+
     /// select by version will use branch schema
     qt_sc04 """SELECT * FROM tmp_schema_change_branch for VERSION AS OF ${s_test_branch} order by id;"""
     qt_sc05 """SELECT * FROM tmp_schema_change_branch for VERSION AS OF ${s_main} order by id;"""
@@ -208,58 +237,58 @@ suite("iceberg_branch_tag_operate", "p0,external,doris,external_docker,external_
     // ----------------------------------------------------------------------------------------
     test {
-        sql """ alter table test_branch_tag_operate drop branch if exists t2 """
+        sql """ alter table ${table_name} drop branch if exists t2 """
         exception "Ref t2 is a tag not a branch"
     }
     test {
-        sql """ alter table test_branch_tag_operate drop branch t2 """
+        sql """ alter table ${table_name} drop branch t2 """
         exception "Ref t2 is a tag not a branch"
     }
     test {
-        sql """ alter table test_branch_tag_operate drop tag if exists b2 """
+        sql """ alter table ${table_name} drop tag if exists b2 """
        exception "Ref b2 is a branch not a tag"
     }
     test {
-        sql """ alter table test_branch_tag_operate drop tag b2 """
+        sql """ alter table ${table_name} drop tag b2 """
        exception "Ref b2 is a branch not a tag"
     }
-    sql """ alter table test_branch_tag_operate drop branch if exists not_exists_branch """
+    sql """ alter table ${table_name} drop branch if exists not_exists_branch """
     test {
-        sql """ alter table test_branch_tag_operate drop branch not_exists_branch """
+        sql """ alter table ${table_name} drop branch not_exists_branch """
        exception "Branch does not exist: not_exists_branch"
     }
-    sql """ alter table test_branch_tag_operate drop tag if exists not_exists_tag """
+    sql """ alter table ${table_name} drop tag if exists not_exists_tag """
     test {
-        sql """ alter table test_branch_tag_operate drop tag not_exists_tag """
+        sql """ alter table ${table_name} drop tag not_exists_tag """
        exception "Tag does not exist: not_exists_tag"
     }
     // drop tag success, then read
-    sql """ alter table test_branch_tag_operate drop tag t2 """
-    sql """ alter table test_branch_tag_operate drop tag if exists t3 """
+    sql """ alter table ${table_name} drop tag t2 """
+    sql """ alter table ${table_name} drop tag if exists t3 """
     test {
-        sql """ select * from test_branch_tag_operate@tag(t2) """
+        sql """ select * from ${table_name}@tag(t2) """
        exception "does not have tag named t2"
     }
     test {
-        sql """ select * from test_branch_tag_operate@tag(t3) """
+        sql """ select * from ${table_name}@tag(t3) """
        exception "does not have tag named t3"
     }
     // drop branch success, then read
-    sql """ alter table test_branch_tag_operate drop branch b2 """
-    sql """ alter table test_branch_tag_operate drop branch if exists b3 """
+    sql """ alter table ${table_name} drop branch b2 """
+    sql """ alter table ${table_name} drop branch if exists b3 """
     test {
-        sql """ select * from test_branch_tag_operate@branch(b2) """
+        sql """ select * from ${table_name}@branch(b2) """
        exception "does not have branch named b2"
     }
     test {
-        sql """ select * from test_branch_tag_operate@branch(b3) """
+        sql """ select * from ${table_name}@branch(b3) """
        exception "does not have branch named b3"
     }
 }
diff --git a/regression-test/suites/external_table_p0/iceberg/iceberg_query_tag_branch.groovy b/regression-test/suites/external_table_p0/iceberg/iceberg_query_tag_branch.groovy
index 1fb21cb8c27f55..8e70003f1c7629 100644
--- a/regression-test/suites/external_table_p0/iceberg/iceberg_query_tag_branch.groovy
+++ b/regression-test/suites/external_table_p0/iceberg/iceberg_query_tag_branch.groovy
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
-suite("iceberg_query_tag_branch", "p0,external,doris,external_docker,external_docker_doris") {
+suite("iceberg_query_tag_branch", "p0,external,doris,external_docker,external_docker_doris,branch_tag") {
     String enabled = context.config.otherConfigs.get("enableIcebergTest")
     if (enabled == null || !enabled.equalsIgnoreCase("true")) {