From 486bab49aba96366032122133c13f4025fd287e3 Mon Sep 17 00:00:00 2001 From: zhuangchong Date: Fri, 27 Sep 2024 18:27:17 +0800 Subject: [PATCH 1/2] partial-update merge engine support batch deletion using flink sql --- .../java/org/apache/paimon/CoreOptions.java | 4 +++ ...pportsRowLevelOperationFlinkTableSink.java | 15 ++++++++--- .../paimon/flink/PartialUpdateITCase.java | 25 +++++++++++++++++++ 3 files changed, 40 insertions(+), 4 deletions(-) diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java index 569816115530..bc5af04d6de6 100644 --- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java @@ -1942,6 +1942,10 @@ public List sequenceField() { .orElse(Collections.emptyList()); } + public boolean partialUpdateRemoveRecordOnDelete() { + return options.get(PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE); + } + public Optional rowkindField() { return options.getOptional(ROWKIND_FIELD); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/SupportsRowLevelOperationFlinkTableSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/SupportsRowLevelOperationFlinkTableSink.java index 583c1c9d161c..22a17cb33a2f 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/SupportsRowLevelOperationFlinkTableSink.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/SupportsRowLevelOperationFlinkTableSink.java @@ -185,11 +185,18 @@ private void validateDeletable() { table.getClass().getName())); } - MergeEngine mergeEngine = CoreOptions.fromMap(table.options()).mergeEngine(); - if (mergeEngine != DEDUPLICATE) { - throw new UnsupportedOperationException( - String.format("Merge engine %s can not support batch delete.", mergeEngine)); + CoreOptions coreOptions = CoreOptions.fromMap(table.options()); + if (coreOptions.mergeEngine() == DEDUPLICATE + || coreOptions.ignoreDelete() + || (coreOptions.mergeEngine() == PARTIAL_UPDATE + && coreOptions.partialUpdateRemoveRecordOnDelete())) { + return; } + + throw new UnsupportedOperationException( + String.format( + "Merge engine %s can not support batch delete.", + coreOptions.mergeEngine())); } private boolean canPushDownDeleteFilter() { diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java index 4505487fae1b..68f109f0427a 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java @@ -621,4 +621,29 @@ public void testIgnoreDelete(boolean localMerge) throws Exception { Row.ofKind(RowKind.UPDATE_AFTER, 1, "A", "apple")); iterator.close(); } + + @Test + public void testRemoveRecordOnDelete() { + sql( + "CREATE TABLE remove_record_on_delete (pk INT PRIMARY KEY NOT ENFORCED, a STRING, b STRING) WITH (" + + " 'merge-engine' = 'partial-update'," + + " 'partial-update.remove-record-on-delete' = 'true'" + + ")"); + + sql("INSERT INTO remove_record_on_delete VALUES (1, CAST (NULL AS STRING), 'apple')"); + + // delete record + sql("DELETE FROM remove_record_on_delete WHERE pk = 1"); + + // batch read + assertThat(sql("SELECT * FROM remove_record_on_delete")).isEmpty(); + + // insert records + sql("INSERT INTO remove_record_on_delete VALUES (1, CAST (NULL AS STRING), 'apache')"); + sql("INSERT INTO remove_record_on_delete VALUES (1, 'A', CAST (NULL AS STRING))"); + + // batch read + assertThat(sql("SELECT * FROM remove_record_on_delete")) + .containsExactlyInAnyOrder(Row.of(1, "A", "apache")); + } } From a9b689f0c1775979c68fb97ab1ea2ee6f4f9157c Mon Sep 17 00:00:00 2001 From: zhuangchong Date: Mon, 30 Sep 2024 09:34:02 +0800 Subject: [PATCH 2/2] remove ignoreDelete --- .../flink/sink/SupportsRowLevelOperationFlinkTableSink.java | 1 - 1 file changed, 1 deletion(-) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/SupportsRowLevelOperationFlinkTableSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/SupportsRowLevelOperationFlinkTableSink.java index 22a17cb33a2f..4e4c2ff2c67f 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/SupportsRowLevelOperationFlinkTableSink.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/SupportsRowLevelOperationFlinkTableSink.java @@ -187,7 +187,6 @@ private void validateDeletable() { CoreOptions coreOptions = CoreOptions.fromMap(table.options()); if (coreOptions.mergeEngine() == DEDUPLICATE - || coreOptions.ignoreDelete() || (coreOptions.mergeEngine() == PARTIAL_UPDATE && coreOptions.partialUpdateRemoveRecordOnDelete())) { return;