diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java index b6a4177bc497bf..b16ef30876048b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java @@ -35,6 +35,7 @@ import org.apache.doris.load.routineload.RoutineLoadDataSourcePropertyFactory; import org.apache.doris.load.routineload.RoutineLoadJob; import org.apache.doris.qe.ConnectContext; +import org.apache.doris.thrift.TPartialUpdateNewRowPolicy; import com.google.common.base.Strings; import com.google.common.collect.ImmutableSet; @@ -111,6 +112,7 @@ public class CreateRoutineLoadStmt extends DdlStmt implements NotFallbackInParse public static final String FUZZY_PARSE = "fuzzy_parse"; public static final String PARTIAL_COLUMNS = "partial_columns"; + public static final String PARTIAL_UPDATE_NEW_KEY_POLICY = "partial_update_new_key_behavior"; public static final String WORKLOAD_GROUP = "workload_group"; @@ -141,6 +143,7 @@ public class CreateRoutineLoadStmt extends DdlStmt implements NotFallbackInParse .add(SEND_BATCH_PARALLELISM) .add(LOAD_TO_SINGLE_TABLET) .add(PARTIAL_COLUMNS) + .add(PARTIAL_UPDATE_NEW_KEY_POLICY) .add(WORKLOAD_GROUP) .add(LoadStmt.KEY_ENCLOSE) .add(LoadStmt.KEY_ESCAPE) @@ -178,6 +181,8 @@ public class CreateRoutineLoadStmt extends DdlStmt implements NotFallbackInParse */ @Getter private boolean isPartialUpdate = false; + @Getter + private TPartialUpdateNewRowPolicy partialUpdateNewKeyPolicy = TPartialUpdateNewRowPolicy.APPEND; private String comment = ""; @@ -211,6 +216,15 @@ public CreateRoutineLoadStmt(LabelName labelName, String tableName, List jobProperties, modifyCommonJobProperties(copiedJobProperties); this.jobProperties.putAll(copiedJobProperties); if (jobProperties.containsKey(CreateRoutineLoadStmt.PARTIAL_COLUMNS)) { - this.isPartialUpdate = BooleanUtils.toBoolean(jobProperties.get(CreateRoutineLoadStmt.PARTIAL_COLUMNS)); + boolean isPartialUpdate = + BooleanUtils.toBoolean(jobProperties.get(CreateRoutineLoadStmt.PARTIAL_COLUMNS)); + this.uniquekeyUpdateMode = + isPartialUpdate ? 
TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS : TUniqueKeyUpdateMode.UPSERT; + } + if (jobProperties.containsKey(CreateRoutineLoadStmt.PARTIAL_UPDATE_NEW_KEY_POLICY)) { + String policy = jobProperties.get(CreateRoutineLoadStmt.PARTIAL_UPDATE_NEW_KEY_POLICY); + if ("ERROR".equalsIgnoreCase(policy)) { + this.partialUpdateNewKeyPolicy = TPartialUpdateNewRowPolicy.ERROR; + } else { + this.partialUpdateNewKeyPolicy = TPartialUpdateNewRowPolicy.APPEND; + } } } LOG.info("modify the properties of kafka routine load job: {}, jobProperties: {}, datasource properties: {}", diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java index e050c6204df498..2ba80e0fcdc0c1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java @@ -67,8 +67,10 @@ import org.apache.doris.task.LoadTaskInfo; import org.apache.doris.thrift.TFileFormatType; import org.apache.doris.thrift.TFileType; +import org.apache.doris.thrift.TPartialUpdateNewRowPolicy; import org.apache.doris.thrift.TPipelineFragmentParams; import org.apache.doris.thrift.TUniqueId; +import org.apache.doris.thrift.TUniqueKeyUpdateMode; import org.apache.doris.transaction.AbstractTxnStateChangeCallback; import org.apache.doris.transaction.TransactionException; import org.apache.doris.transaction.TransactionState; @@ -222,7 +224,8 @@ public boolean isFinalState() { @SerializedName("mbsb") protected long maxBatchSizeBytes = DEFAULT_MAX_BATCH_SIZE; - protected boolean isPartialUpdate = false; + protected TUniqueKeyUpdateMode uniquekeyUpdateMode = TUniqueKeyUpdateMode.UPSERT; + protected TPartialUpdateNewRowPolicy partialUpdateNewKeyPolicy = TPartialUpdateNewRowPolicy.APPEND; protected String sequenceCol; @@ -393,7 +396,10 @@ protected void setOptional(CreateRoutineLoadStmt stmt) throws UserException { jobProperties.put(LoadStmt.LOAD_TO_SINGLE_TABLET, String.valueOf(this.loadToSingleTablet)); jobProperties.put(CreateRoutineLoadStmt.PARTIAL_COLUMNS, stmt.isPartialUpdate() ? "true" : "false"); if (stmt.isPartialUpdate()) { - this.isPartialUpdate = true; + this.uniquekeyUpdateMode = TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS; + this.partialUpdateNewKeyPolicy = stmt.getPartialUpdateNewKeyPolicy(); + jobProperties.put(CreateRoutineLoadStmt.PARTIAL_UPDATE_NEW_KEY_POLICY, + this.partialUpdateNewKeyPolicy == TPartialUpdateNewRowPolicy.ERROR ? 
"ERROR" : "APPEND"); } jobProperties.put(CreateRoutineLoadStmt.MAX_FILTER_RATIO_PROPERTY, String.valueOf(maxFilterRatio)); @@ -726,7 +732,22 @@ public List getHiddenColumns() { @Override public boolean isFixedPartialUpdate() { - return isPartialUpdate; + return uniquekeyUpdateMode == TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS; + } + + @Override + public TUniqueKeyUpdateMode getUniqueKeyUpdateMode() { + return uniquekeyUpdateMode; + } + + @Override + public boolean isFlexiblePartialUpdate() { + return uniquekeyUpdateMode == TUniqueKeyUpdateMode.UPDATE_FLEXIBLE_COLUMNS; + } + + @Override + public TPartialUpdateNewRowPolicy getPartialUpdateNewRowPolicy() { + return partialUpdateNewKeyPolicy; } @Override @@ -1015,7 +1036,7 @@ public TPipelineFragmentParams plan(StreamLoadPlanner planner, TUniqueId loadId, throw new UserException("txn does not exist: " + txnId); } txnState.addTableIndexes(planner.getDestTable()); - if (isPartialUpdate) { + if (isFixedPartialUpdate()) { txnState.setSchemaForPartialUpdate((OlapTable) table); } @@ -1733,8 +1754,8 @@ public String getShowCreateInfo() { appendProperties(sb, CreateRoutineLoadStmt.MAX_BATCH_ROWS_PROPERTY, maxBatchRows, false); appendProperties(sb, CreateRoutineLoadStmt.MAX_BATCH_SIZE_PROPERTY, maxBatchSizeBytes, false); appendProperties(sb, FileFormatProperties.PROP_FORMAT, getFormat(), false); - if (isPartialUpdate) { - appendProperties(sb, CreateRoutineLoadStmt.PARTIAL_COLUMNS, isPartialUpdate, false); + if (isFixedPartialUpdate()) { + appendProperties(sb, CreateRoutineLoadStmt.PARTIAL_COLUMNS, isFixedPartialUpdate(), false); } appendProperties(sb, JsonFileFormatProperties.PROP_JSON_PATHS, getJsonPaths(), false); appendProperties(sb, JsonFileFormatProperties.PROP_STRIP_OUTER_ARRAY, isStripOuterArray(), false); @@ -1829,7 +1850,11 @@ public String jobPropertiesToJsonString() { sequenceCol == null ? STAR_STRING : sequenceCol); // job properties defined in CreateRoutineLoadStmt - jobProperties.put(CreateRoutineLoadStmt.PARTIAL_COLUMNS, String.valueOf(isPartialUpdate)); + jobProperties.put(CreateRoutineLoadStmt.PARTIAL_COLUMNS, String.valueOf(isFixedPartialUpdate())); + if (isFixedPartialUpdate()) { + jobProperties.put(CreateRoutineLoadStmt.PARTIAL_UPDATE_NEW_KEY_POLICY, + partialUpdateNewKeyPolicy == TPartialUpdateNewRowPolicy.ERROR ? 
"ERROR" : "APPEND"); + } jobProperties.put(CreateRoutineLoadStmt.MAX_ERROR_NUMBER_PROPERTY, String.valueOf(maxErrorNum)); jobProperties.put(CreateRoutineLoadStmt.MAX_BATCH_INTERVAL_SEC_PROPERTY, String.valueOf(maxBatchIntervalS)); jobProperties.put(CreateRoutineLoadStmt.MAX_BATCH_ROWS_PROPERTY, String.valueOf(maxBatchRows)); @@ -1895,7 +1920,16 @@ public void gsonPostProcess() throws IOException { } jobProperties.forEach((k, v) -> { if (k.equals(CreateRoutineLoadStmt.PARTIAL_COLUMNS)) { - isPartialUpdate = Boolean.parseBoolean(v); + boolean isPartialUpdate = Boolean.parseBoolean(v); + if (isPartialUpdate) { + this.uniquekeyUpdateMode = TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS; + } + } else if (k.equals(CreateRoutineLoadStmt.PARTIAL_UPDATE_NEW_KEY_POLICY)) { + if ("ERROR".equalsIgnoreCase(v)) { + partialUpdateNewKeyPolicy = TPartialUpdateNewRowPolicy.ERROR; + } else { + partialUpdateNewKeyPolicy = TPartialUpdateNewRowPolicy.APPEND; + } } }); SqlParser parser = new SqlParser(new SqlScanner(new StringReader(origStmt.originStmt), @@ -1972,7 +2006,10 @@ protected void readFields(DataInput in) throws IOException { String value = Text.readString(in); jobProperties.put(key, value); if (key.equals(CreateRoutineLoadStmt.PARTIAL_COLUMNS)) { - isPartialUpdate = Boolean.parseBoolean(value); + boolean isPartialUpdate = Boolean.parseBoolean(value); + if (isPartialUpdate) { + this.uniquekeyUpdateMode = TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS; + } } } diff --git a/regression-test/data/load_p0/routine_load/test_routine_load_partial_update.out b/regression-test/data/load_p0/routine_load/test_routine_load_partial_update.out new file mode 100644 index 00000000000000..a8eed2b2a057fb --- /dev/null +++ b/regression-test/data/load_p0/routine_load/test_routine_load_partial_update.out @@ -0,0 +1,12 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !select_initial -- +1 alice 100 20 +2 bob 90 21 +3 charlie 80 22 + +-- !select_after_partial_update -- +1 alice 150 20 +2 bob 95 21 +3 charlie 80 22 +100 \N 100 \N + diff --git a/regression-test/data/load_p0/routine_load/test_routine_load_partial_update_new_key_behavior.out b/regression-test/data/load_p0/routine_load/test_routine_load_partial_update_new_key_behavior.out new file mode 100644 index 00000000000000..45b434b8cad23f --- /dev/null +++ b/regression-test/data/load_p0/routine_load/test_routine_load_partial_update_new_key_behavior.out @@ -0,0 +1,27 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !select_initial -- +1 1 1 1 +2 2 2 2 +3 3 3 3 + +-- !select_after_append -- +1 10 1 1 +2 20 2 2 +3 3 3 3 +4 40 \N \N +5 50 \N \N + +-- !select_after_error_mode -- +1 1 100 1 +2 2 200 2 +3 3 3 3 +4 4 40 4 +5 5 50 5 + +-- !select_after_error_rejected -- +1 1 100 1 +2 2 200 2 +3 3 3 3 +4 4 40 4 +5 5 50 5 + diff --git a/regression-test/suites/load_p0/routine_load/test_routine_load_partial_update.groovy b/regression-test/suites/load_p0/routine_load/test_routine_load_partial_update.groovy new file mode 100644 index 00000000000000..85888b9e439b9b --- /dev/null +++ b/regression-test/suites/load_p0/routine_load/test_routine_load_partial_update.groovy @@ -0,0 +1,108 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.apache.doris.regression.util.RoutineLoadTestUtils +import org.apache.kafka.clients.producer.KafkaProducer +import org.apache.kafka.clients.producer.ProducerRecord + +suite("test_routine_load_partial_update", "nonConcurrent") { + def kafkaCsvTopic = "test_routine_load_partial_update" + + if (RoutineLoadTestUtils.isKafkaTestEnabled(context)) { + def runSql = { String q -> sql q } + def kafka_broker = RoutineLoadTestUtils.getKafkaBroker(context) + def producer = RoutineLoadTestUtils.createKafkaProducer(kafka_broker) + + def tableName = "test_routine_load_partial_update" + def job = "test_partial_update_job" + + sql """ DROP TABLE IF EXISTS ${tableName} force;""" + sql """ + CREATE TABLE IF NOT EXISTS ${tableName} ( + `id` int NULL, + `name` varchar(65533) NULL, + `score` int NULL, + `age` int NULL + ) ENGINE=OLAP + UNIQUE KEY(`id`) + COMMENT 'test partial update' + DISTRIBUTED BY HASH(`id`) BUCKETS 3 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "enable_unique_key_merge_on_write" = "true" + ); + """ + + // insert initial data + sql """ + INSERT INTO ${tableName} VALUES + (1, 'alice', 100, 20), + (2, 'bob', 90, 21), + (3, 'charlie', 80, 22) + """ + + qt_select_initial "SELECT * FROM ${tableName} ORDER BY id" + + try { + // create routine load with partial_columns=true + // only update id and score columns + sql """ + CREATE ROUTINE LOAD ${job} ON ${tableName} + COLUMNS TERMINATED BY ",", + COLUMNS (id, score) + PROPERTIES + ( + "max_batch_interval" = "10", + "partial_columns" = "true" + ) + FROM KAFKA + ( + "kafka_broker_list" = "${kafka_broker}", + "kafka_topic" = "${kafkaCsvTopic}", + "property.kafka_default_offsets" = "OFFSET_BEGINNING" + ); + """ + + // send partial update data to kafka + // update score for id=1 from 100 to 150 + // update score for id=2 from 90 to 95 + def data = [ + "1,150", + "2,95", + "100,100" + ] + + data.each { line -> + logger.info("Sending to Kafka: ${line}") + def record = new ProducerRecord<>(kafkaCsvTopic, null, line) + producer.send(record).get() + } + producer.flush() + + // wait for routine load task to finish + RoutineLoadTestUtils.waitForTaskFinish(runSql, job, tableName, 3) + + // verify partial update: score should be updated, name and age should remain unchanged + qt_select_after_partial_update "SELECT * FROM ${tableName} ORDER BY id" + } catch (Exception e) { + logger.error("Error during test: " + e.getMessage()) + throw e + } finally { + sql "STOP ROUTINE LOAD FOR ${job}" + } + } +} diff --git a/regression-test/suites/load_p0/routine_load/test_routine_load_partial_update_new_key_behavior.groovy b/regression-test/suites/load_p0/routine_load/test_routine_load_partial_update_new_key_behavior.groovy new file mode 100644 index 00000000000000..a6a97253e986d0 --- /dev/null +++ b/regression-test/suites/load_p0/routine_load/test_routine_load_partial_update_new_key_behavior.groovy @@ -0,0 +1,282 @@ +// Licensed to the Apache Software 
Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.apache.doris.regression.util.RoutineLoadTestUtils +import org.apache.kafka.clients.producer.KafkaProducer +import org.apache.kafka.clients.producer.ProducerRecord + +suite("test_routine_load_partial_update_new_key_behavior", "nonConcurrent") { + def kafkaCsvTopic = "test_routine_load_partial_update_new_key_behavior" + + if (RoutineLoadTestUtils.isKafkaTestEnabled(context)) { + def runSql = { String q -> sql q } + def kafka_broker = RoutineLoadTestUtils.getKafkaBroker(context) + def producer = RoutineLoadTestUtils.createKafkaProducer(kafka_broker) + + // Test 1: partial_update_new_key_behavior=APPEND (default) + def tableName1 = "test_routine_load_pu_new_key_append" + def job1 = "test_new_key_behavior_append" + + sql """ DROP TABLE IF EXISTS ${tableName1} force;""" + sql """ + CREATE TABLE IF NOT EXISTS ${tableName1} ( + `k` int NOT NULL, + `c1` int, + `c2` int, + `c3` int + ) ENGINE=OLAP + UNIQUE KEY(`k`) + DISTRIBUTED BY HASH(`k`) BUCKETS 3 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "enable_unique_key_merge_on_write" = "true" + ); + """ + + sql """ + INSERT INTO ${tableName1} VALUES + (1, 1, 1, 1), + (2, 2, 2, 2), + (3, 3, 3, 3) + """ + + qt_select_initial "SELECT * FROM ${tableName1} ORDER BY k" + try { + sql """ + CREATE ROUTINE LOAD ${job1} ON ${tableName1} + COLUMNS TERMINATED BY ",", + COLUMNS (k, c1) + PROPERTIES + ( + "max_batch_interval" = "10", + "partial_columns" = "true", + "partial_update_new_key_behavior" = "append" + ) + FROM KAFKA + ( + "kafka_broker_list" = "${kafka_broker}", + "kafka_topic" = "${kafkaCsvTopic}", + "property.kafka_default_offsets" = "OFFSET_BEGINNING" + ); + """ + + // send data with existing keys and new keys + def data1 = [ + "1,10", // update existing key + "2,20", // update existing key + "4,40", // new key - should be appended with default values for c2 and c3 + "5,50" // new key - should be appended with default values for c2 and c3 + ] + + data1.each { line -> + logger.info("Sending to Kafka: ${line}") + def record = new ProducerRecord<>(kafkaCsvTopic, null, line) + producer.send(record).get() + } + producer.flush() + + // wait for routine load task to finish + sql "set skip_delete_bitmap=true;" + sql "sync;" + RoutineLoadTestUtils.waitForTaskFinish(runSql, job1, tableName1, 6) + sql "set skip_delete_bitmap=false;" + sql "sync;" + + // verify: new keys should be appended + qt_select_after_append "SELECT * FROM ${tableName1} ORDER BY k" + + } catch (Exception e) { + logger.info("Caught expected exception: ${e.getMessage()}") + throw e + } finally { + sql "STOP ROUTINE LOAD FOR ${job1}" + } + + // Test 2: partial_update_new_key_behavior=ERROR + def tableName2 = "test_routine_load_pu_new_key_error" + def job2 = "test_new_key_behavior_error" + + sql 
""" DROP TABLE IF EXISTS ${tableName2} force;""" + sql """ + CREATE TABLE IF NOT EXISTS ${tableName2} ( + `k` int NOT NULL, + `c1` int, + `c2` int, + `c3` int + ) ENGINE=OLAP + UNIQUE KEY(`k`) + DISTRIBUTED BY HASH(`k`) BUCKETS 3 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "enable_unique_key_merge_on_write" = "true" + ); + """ + + sql """ + INSERT INTO ${tableName2} VALUES + (1, 1, 1, 1), + (2, 2, 2, 2), + (3, 3, 3, 3), + (4, 4, 4, 4), + (5, 5, 5, 5) + """ + try { + sql """ + CREATE ROUTINE LOAD ${job2} ON ${tableName2} + COLUMNS TERMINATED BY ",", + COLUMNS (k, c2) + PROPERTIES + ( + "max_batch_interval" = "10", + "partial_columns" = "true", + "partial_update_new_key_behavior" = "error" + ) + FROM KAFKA + ( + "kafka_broker_list" = "${kafka_broker}", + "kafka_topic" = "${kafkaCsvTopic}", + "property.kafka_default_offsets" = "OFFSET_BEGINNING" + ); + """ + + // send data with only existing keys (should succeed) + def data2 = [ + "1,100", + "2,200" + ] + + data2.each { line -> + logger.info("Sending to Kafka: ${line}") + def record = new ProducerRecord<>(kafkaCsvTopic, null, line) + producer.send(record).get() + } + producer.flush() + + // wait for routine load task to finish + sql "set skip_delete_bitmap=true;" + sql "sync;" + RoutineLoadTestUtils.waitForTaskFinish(runSql, job2, tableName2, 6) + sql "set skip_delete_bitmap=false;" + sql "sync;" + + // verify: existing keys should be updated + qt_select_after_error_mode "SELECT * FROM ${tableName2} ORDER BY k" + + // Now send data with new keys - this should fail the task + def data3 = [ + "10,1000", // new key - should cause error + "11,1100" // new key - should cause error + ] + + data3.each { line -> + logger.info("Sending to Kafka with new keys: ${line}") + def record = new ProducerRecord<>(kafkaCsvTopic, null, line) + producer.send(record).get() + } + producer.flush() + + RoutineLoadTestUtils.waitForTaskAbort(runSql, job2) + def state = sql "SHOW ROUTINE LOAD FOR ${job2}" + logger.info("routine load state after new keys: ${state[0][8].toString()}") + logger.info("routine load error rows: ${state[0][15].toString()}") + + // the data should not be loaded due to error + qt_select_after_error_rejected "SELECT * FROM ${tableName2} ORDER BY k" + + } catch (Exception e) { + logger.info("Caught expected exception: ${e.getMessage()}") + throw e + } finally { + sql "STOP ROUTINE LOAD FOR ${job2}" + } + + // Test 3: Test invalid property value + def tableName3 = "test_routine_load_pu_invalid_prop" + sql """ DROP TABLE IF EXISTS ${tableName3} force;""" + sql """ + CREATE TABLE IF NOT EXISTS ${tableName3} ( + `k` int NOT NULL, + `c1` int, + `c2` int, + `c3` int + ) ENGINE=OLAP + UNIQUE KEY(`k`) + DISTRIBUTED BY HASH(`k`) BUCKETS 3 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "enable_unique_key_merge_on_write" = "true" + ); + """ + + test { + sql """ + CREATE ROUTINE LOAD test_invalid_property ON ${tableName3} + COLUMNS TERMINATED BY ",", + COLUMNS (k, c3) + PROPERTIES + ( + "partial_columns" = "true", + "partial_update_new_key_behavior" = "invalid" + ) + FROM KAFKA + ( + "kafka_broker_list" = "${kafka_broker}", + "kafka_topic" = "${kafkaCsvTopic}", + "property.kafka_default_offsets" = "OFFSET_BEGINNING" + ); + """ + exception "partial_update_new_key_behavior should be one of {'APPEND', 'ERROR'}" + } + + // Test 4: Test setting property without partial_columns + def tableName4 = "test_routine_load_pu_without_partial" + sql """ DROP TABLE IF EXISTS ${tableName4} force;""" + sql """ + CREATE 
TABLE IF NOT EXISTS ${tableName4} ( + `k` int NOT NULL, + `c1` int, + `c2` int, + `c3` int + ) ENGINE=OLAP + UNIQUE KEY(`k`) + DISTRIBUTED BY HASH(`k`) BUCKETS 3 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "enable_unique_key_merge_on_write" = "true" + ); + """ + + test { + sql """ + CREATE ROUTINE LOAD test_without_partial_columns ON ${tableName4} + COLUMNS TERMINATED BY ",", + COLUMNS (k, c3) + PROPERTIES + ( + "partial_update_new_key_behavior" = "append" + ) + FROM KAFKA + ( + "kafka_broker_list" = "${kafka_broker}", + "kafka_topic" = "${kafkaCsvTopic}", + "property.kafka_default_offsets" = "OFFSET_BEGINNING" + ); + """ + exception "partial_update_new_key_behavior can only be set when partial_columns is true" + } + } +}
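
A minimal usage sketch of the new job property, mirroring the syntax exercised by the regression tests above; the job, table, broker, and topic names are placeholders, and the target table is assumed to be a merge-on-write unique-key table:

    CREATE ROUTINE LOAD example_job ON example_mow_table
    COLUMNS TERMINATED BY ",",
    COLUMNS (k, c1)
    PROPERTIES
    (
        "max_batch_interval" = "10",
        "partial_columns" = "true",
        "partial_update_new_key_behavior" = "ERROR"
    )
    FROM KAFKA
    (
        "kafka_broker_list" = "broker_host:9092",
        "kafka_topic" = "example_topic",
        "property.kafka_default_offsets" = "OFFSET_BEGINNING"
    );

Per the tests above: with "ERROR", rows whose keys do not already exist in the table cause the load task to be aborted and the data is not loaded; with the default "APPEND", such rows are inserted and the columns not listed in COLUMNS are filled with their defaults (NULL in the test tables). The value is matched case-insensitively, must be one of APPEND or ERROR, and is accepted only when "partial_columns" is "true".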