diff --git a/docs/.vuepress/sidebar/en.js b/docs/.vuepress/sidebar/en.js
index 3fb8930200b2ff..32dc115255a850 100644
--- a/docs/.vuepress/sidebar/en.js
+++ b/docs/.vuepress/sidebar/en.js
@@ -405,6 +405,7 @@ module.exports = [
             "LOAD",
             "MINI LOAD",
             "MULTI LOAD",
+            "alter-routine-load",
             "PAUSE ROUTINE LOAD",
             "RESTORE TABLET",
             "RESUME ROUTINE LOAD",
diff --git a/docs/.vuepress/sidebar/zh-CN.js b/docs/.vuepress/sidebar/zh-CN.js
index da69db3ec919d1..bd2693e5176a1d 100644
--- a/docs/.vuepress/sidebar/zh-CN.js
+++ b/docs/.vuepress/sidebar/zh-CN.js
@@ -418,6 +418,7 @@ module.exports = [
             "LOAD",
             "MINI LOAD",
             "MULTI LOAD",
+            "alter-routine-load",
             "PAUSE ROUTINE LOAD",
             "RESUME ROUTINE LOAD",
             "ROUTINE LOAD",
diff --git a/docs/en/administrator-guide/load-data/routine-load-manual.md b/docs/en/administrator-guide/load-data/routine-load-manual.md
index 791ffa4b7352e8..6566e176d0ff15 100644
--- a/docs/en/administrator-guide/load-data/routine-load-manual.md
+++ b/docs/en/administrator-guide/load-data/routine-load-manual.md
@@ -237,6 +237,10 @@ Specific commands and examples for viewing the **Task** status can be viewed wit
 
 You can only view tasks that are currently running, and tasks that have ended and are not started cannot be viewed.
 
+### Alter job
+
+Users can modify jobs that have been created. Specific instructions can be viewed through the `HELP ALTER ROUTINE LOAD;` command. Or refer to [ALTER ROUTINE LOAD](../../sql-reference/sql-statements/Data Manipulation/alter-routine-load.md).
+
 ### Job Control
 
 The user can control the stop, pause and restart of the job by the three commands `STOP/PAUSE/RESUME`. You can view help and examples with the three commands `HELP STOP ROUTINE LOAD;`, `HELP PAUSE ROUTINE LOAD;` and `HELP RESUME ROUTINE LOAD;`.
diff --git a/docs/en/sql-reference/sql-statements/Data Manipulation/alter-routine-load.md b/docs/en/sql-reference/sql-statements/Data Manipulation/alter-routine-load.md
new file mode 100644
index 00000000000000..e2cf4374c8f639
--- /dev/null
+++ b/docs/en/sql-reference/sql-statements/Data Manipulation/alter-routine-load.md
@@ -0,0 +1,111 @@
+---
+{
+    "title": "ALTER ROUTINE LOAD",
+    "language": "en"
+}
+---
+
+
+# ALTER ROUTINE LOAD
+## description
+
+This syntax is used to modify a routine load job that has been created.
+
+Only jobs in the PAUSED state can be modified.
+
+Syntax:
+
+    ALTER ROUTINE LOAD FOR [db.]job_name
+    [job_properties]
+    FROM data_source
+    [data_source_properties]
+
+1. `[db.]job_name`
+
+    Specify the name of the job to be modified.
+
+2. `job_properties`
+
+    Specify the job parameters that need to be modified. Currently, only the following parameters can be modified:
+
+    1. `desired_concurrent_number`
+    2. `max_error_number`
+    3. `max_batch_interval`
+    4. `max_batch_rows`
+    5. `max_batch_size`
+    6. `jsonpaths`
+    7. `json_root`
+    8. `strip_outer_array`
+    9. `strict_mode`
+    10. `timezone`
+
+
+3. `data_source`
+
+    The type of data source. Currently supported:
+
+    KAFKA
+
+4. `data_source_properties`
+
+    The relevant properties of the data source. Currently only the following are supported:
+
+    1. `kafka_partitions`
+    2. `kafka_offsets`
+    3. Custom properties, such as `property.group.id`
+
+    Notice:
+
+    1. `kafka_partitions` and `kafka_offsets` are used to modify the offsets of the kafka partitions to be consumed. Only partitions that are already being consumed can be modified; new partitions cannot be added.
+
+## example
+
+1. Modify `desired_concurrent_number` to 1
+
+    ```
+    ALTER ROUTINE LOAD FOR db1.label1
+    PROPERTIES
+    (
+        "desired_concurrent_number" = "1"
+    );
+    ```
+
+2. Modify `desired_concurrent_number` to 10, modify the partition offsets, and modify the group id.
+
+    ```
+    ALTER ROUTINE LOAD FOR db1.label1
+    PROPERTIES
+    (
+        "desired_concurrent_number" = "10"
+    )
+    FROM kafka
+    (
+        "kafka_partitions" = "0, 1, 2",
+        "kafka_offsets" = "100, 200, 100",
+        "property.group.id" = "new_group"
+    );
+    ```
+
+
+## keyword
+
+    ALTER,ROUTINE,LOAD
+
diff --git a/docs/zh-CN/administrator-guide/load-data/routine-load-manual.md b/docs/zh-CN/administrator-guide/load-data/routine-load-manual.md
index da4b1fa443ed0a..47925bb19d2992 100644
--- a/docs/zh-CN/administrator-guide/load-data/routine-load-manual.md
+++ b/docs/zh-CN/administrator-guide/load-data/routine-load-manual.md
@@ -238,6 +238,10 @@ FE 中的 JobScheduler 根据汇报结果，继续生成后续新的 Task，或
 
 只能查看当前正在运行中的任务，已结束和未开始的任务无法查看。
 
+### 修改作业属性
+
+用户可以修改已经创建的作业。具体说明可以通过 `HELP ALTER ROUTINE LOAD;` 命令查看。或参阅 [ALTER ROUTINE LOAD](../../sql-reference/sql-statements/Data Manipulation/alter-routine-load.md)。
+
 ### 作业控制
 
 用户可以通过 `STOP/PAUSE/RESUME` 三个命令来控制作业的停止，暂停和重启。可以通过 `HELP STOP ROUTINE LOAD;`, `HELP PAUSE ROUTINE LOAD;` 以及 `HELP RESUME ROUTINE LOAD;` 三个命令查看帮助和示例。
diff --git a/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/alter-routine-load.md b/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/alter-routine-load.md
new file mode 100644
index 00000000000000..c814fd72c7e00f
--- /dev/null
+++ b/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/alter-routine-load.md
@@ -0,0 +1,115 @@
+---
+{
+    "title": "ALTER ROUTINE LOAD",
+    "language": "zh-CN"
+}
+---
+
+
+# ALTER ROUTINE LOAD
+## description
+
+该语法用于修改已经创建的例行导入作业。
+
+只能修改处于 PAUSED 状态的作业。
+
+语法:
+
+    ALTER ROUTINE LOAD FOR [db.]job_name
+    [job_properties]
+    FROM data_source
+    [data_source_properties]
+
+1. `[db.]job_name`
+
+    指定要修改的作业名称。
+
+2. `job_properties`
+
+    指定需要修改的作业参数。目前仅支持如下参数的修改:
+
+    1. `desired_concurrent_number`
+    2. `max_error_number`
+    3. `max_batch_interval`
+    4. `max_batch_rows`
+    5. `max_batch_size`
+    6. `jsonpaths`
+    7. `json_root`
+    8. `strip_outer_array`
+    9. `strict_mode`
+    10. `timezone`
+
+
+3. `data_source`
+
+    数据源的类型。当前支持:
+
+    KAFKA
+
+4. `data_source_properties`
+
+    数据源的相关属性。目前仅支持:
+
+    1. `kafka_partitions`
+    2. `kafka_offsets`
+    3. 自定义 property，如 `property.group.id`
+
+    注:
+
+    1. `kafka_partitions` 和 `kafka_offsets` 用于修改待消费的 kafka partition 的 offset，仅能修改当前已经消费的 partition。不能新增 partition。
+
+## example
+
+1. 将 `desired_concurrent_number` 修改为 1
+
+    ```
+    ALTER ROUTINE LOAD FOR db1.label1
+    PROPERTIES
+    (
+        "desired_concurrent_number" = "1"
+    );
+    ```
+
+2. 
将 `desired_concurrent_number` 修改为 10,修改 partition 的offset,修改 group id。 + + ``` + ALTER ROUTINE LOAD FOR db1.label1 + PROPERTIES + ( + "desired_concurrent_number" = "10" + ) + FROM kafka + ( + "kafka_partitions" = "0, 1, 2", + "kafka_offsets" = "100, 200, 100", + "property.group.id" = "new_group" + ); + ``` + + +## keyword + + ALTER,ROUTINE,LOAD + diff --git a/fe/fe-core/src/main/cup/sql_parser.cup b/fe/fe-core/src/main/cup/sql_parser.cup index d6b55bd89b8628..4e4e028e41fcad 100644 --- a/fe/fe-core/src/main/cup/sql_parser.cup +++ b/fe/fe-core/src/main/cup/sql_parser.cup @@ -473,11 +473,13 @@ nonterminal Boolean opt_verbose; nonterminal Boolean opt_tmp; nonterminal OutFileClause opt_outfile; +nonterminal RoutineLoadDataSourceProperties opt_datasource_properties; precedence nonassoc COMMA; precedence nonassoc STRING_LITERAL; precedence nonassoc KW_COLUMNS; precedence nonassoc KW_WITH; + precedence left KW_FULL, KW_MERGE; precedence left DOT; precedence left SET_VAR; @@ -728,11 +730,26 @@ alter_stmt ::= {: RESULT = new AlterDatabaseQuotaStmt(dbName, QuotaType.REPLICA, String.valueOf(number)); :} - | KW_ALTER KW_DATABASE ident:dbName KW_RENAME ident:newDbName {: RESULT = new AlterDatabaseRename(dbName, newDbName); :} + | KW_ALTER KW_ROUTINE KW_LOAD KW_FOR job_label:jobLabel opt_properties:jobProperties + opt_datasource_properties:datasourceProperties + {: + RESULT = new AlterRoutineLoadStmt(jobLabel, jobProperties, datasourceProperties); + :} + ; + +opt_datasource_properties ::= + // empty + {: + RESULT = new RoutineLoadDataSourceProperties(); + :} + | KW_FROM ident:type LPAREN key_value_map:customProperties RPAREN + {: + RESULT = new RoutineLoadDataSourceProperties(type, customProperties); + :} ; quantity ::= diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterRoutineLoadStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterRoutineLoadStmt.java new file mode 100644 index 00000000000000..47bfc659efa6a9 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterRoutineLoadStmt.java @@ -0,0 +1,189 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.analysis; + +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.FeNameFormat; +import org.apache.doris.common.UserException; +import org.apache.doris.common.util.TimeUtils; +import org.apache.doris.common.util.Util; + +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Maps; + +import java.util.Map; +import java.util.Optional; + +/** + * ALTER ROUTINE LOAD db.label + * PROPERTIES( + * ... + * ) + * FROM kafka ( + * ... 
+ * ) + */ +public class AlterRoutineLoadStmt extends DdlStmt { + + private static final String NAME_TYPE = "ROUTINE LOAD NAME"; + + private static final ImmutableSet CONFIGURABLE_PROPERTIES_SET = new ImmutableSet.Builder() + .add(CreateRoutineLoadStmt.DESIRED_CONCURRENT_NUMBER_PROPERTY) + .add(CreateRoutineLoadStmt.MAX_ERROR_NUMBER_PROPERTY) + .add(CreateRoutineLoadStmt.MAX_BATCH_INTERVAL_SEC_PROPERTY) + .add(CreateRoutineLoadStmt.MAX_BATCH_ROWS_PROPERTY) + .add(CreateRoutineLoadStmt.MAX_BATCH_SIZE_PROPERTY) + .add(CreateRoutineLoadStmt.JSONPATHS) + .add(CreateRoutineLoadStmt.JSONROOT) + .add(CreateRoutineLoadStmt.STRIP_OUTER_ARRAY) + .add(LoadStmt.STRICT_MODE) + .add(LoadStmt.TIMEZONE) + .build(); + + private final LabelName labelName; + private final Map jobProperties; + private final RoutineLoadDataSourceProperties dataSourceProperties; + + // save analyzed job properties. + // analyzed data source properties are saved in dataSourceProperties. + private Map analyzedJobProperties = Maps.newHashMap(); + + public AlterRoutineLoadStmt(LabelName labelName, Map jobProperties, + RoutineLoadDataSourceProperties dataSourceProperties) { + this.labelName = labelName; + this.jobProperties = jobProperties != null ? jobProperties : Maps.newHashMap(); + this.dataSourceProperties = dataSourceProperties; + } + + public String getDbName() { + return labelName.getDbName(); + } + + public String getLabel() { + return labelName.getLabelName(); + } + + public Map getAnalyzedJobProperties() { + return analyzedJobProperties; + } + + public boolean hasDataSourceProperty() { + return dataSourceProperties.hasAnalyzedProperties(); + } + + public RoutineLoadDataSourceProperties getDataSourceProperties() { + return dataSourceProperties; + } + + @Override + public void analyze(Analyzer analyzer) throws UserException { + super.analyze(analyzer); + + labelName.analyze(analyzer); + FeNameFormat.checkCommonName(NAME_TYPE, labelName.getLabelName()); + // check routine load job properties include desired concurrent number etc. 
+ checkJobProperties(); + // check data source properties + checkDataSourceProperties(); + + if (analyzedJobProperties.isEmpty() && !dataSourceProperties.hasAnalyzedProperties()) { + throw new AnalysisException("No properties are specified"); + } + } + + private void checkJobProperties() throws UserException { + Optional optional = jobProperties.keySet().stream().filter( + entity -> !CONFIGURABLE_PROPERTIES_SET.contains(entity)).findFirst(); + if (optional.isPresent()) { + throw new AnalysisException(optional.get() + " is invalid property"); + } + + if (jobProperties.containsKey(CreateRoutineLoadStmt.DESIRED_CONCURRENT_NUMBER_PROPERTY)) { + long desiredConcurrentNum = ((Long) Util.getLongPropertyOrDefault( + jobProperties.get(CreateRoutineLoadStmt.DESIRED_CONCURRENT_NUMBER_PROPERTY), + -1, CreateRoutineLoadStmt.DESIRED_CONCURRENT_NUMBER_PRED, + CreateRoutineLoadStmt.DESIRED_CONCURRENT_NUMBER_PROPERTY + " should > 0")).intValue(); + analyzedJobProperties.put(CreateRoutineLoadStmt.DESIRED_CONCURRENT_NUMBER_PROPERTY, + String.valueOf(desiredConcurrentNum)); + } + + if (jobProperties.containsKey(CreateRoutineLoadStmt.MAX_ERROR_NUMBER_PROPERTY)) { + long maxErrorNum = Util.getLongPropertyOrDefault( + jobProperties.get(CreateRoutineLoadStmt.MAX_ERROR_NUMBER_PROPERTY), + -1, CreateRoutineLoadStmt.MAX_ERROR_NUMBER_PRED, + CreateRoutineLoadStmt.MAX_ERROR_NUMBER_PROPERTY + " should >= 0"); + analyzedJobProperties.put(CreateRoutineLoadStmt.MAX_ERROR_NUMBER_PROPERTY, + String.valueOf(maxErrorNum)); + } + + if (jobProperties.containsKey(CreateRoutineLoadStmt.MAX_BATCH_INTERVAL_SEC_PROPERTY)) { + long maxBatchIntervalS = Util.getLongPropertyOrDefault( + jobProperties.get(CreateRoutineLoadStmt.MAX_BATCH_INTERVAL_SEC_PROPERTY), + -1, CreateRoutineLoadStmt.MAX_BATCH_INTERVAL_PRED, + CreateRoutineLoadStmt.MAX_BATCH_INTERVAL_SEC_PROPERTY + " should between 5 and 60"); + analyzedJobProperties.put(CreateRoutineLoadStmt.MAX_BATCH_INTERVAL_SEC_PROPERTY, + String.valueOf(maxBatchIntervalS)); + } + + if (jobProperties.containsKey(CreateRoutineLoadStmt.MAX_BATCH_ROWS_PROPERTY)) { + long maxBatchRows = Util.getLongPropertyOrDefault( + jobProperties.get(CreateRoutineLoadStmt.MAX_BATCH_ROWS_PROPERTY), + -1, CreateRoutineLoadStmt.MAX_BATCH_ROWS_PRED, + CreateRoutineLoadStmt.MAX_BATCH_ROWS_PROPERTY + " should > 200000"); + analyzedJobProperties.put(CreateRoutineLoadStmt.MAX_BATCH_ROWS_PROPERTY, + String.valueOf(maxBatchRows)); + } + + if (jobProperties.containsKey(CreateRoutineLoadStmt.MAX_BATCH_SIZE_PROPERTY)) { + long maxBatchSizeBytes = Util.getLongPropertyOrDefault( + jobProperties.get(CreateRoutineLoadStmt.MAX_BATCH_SIZE_PROPERTY), + -1, CreateRoutineLoadStmt.MAX_BATCH_SIZE_PRED, + CreateRoutineLoadStmt.MAX_BATCH_SIZE_PROPERTY + " should between 100MB and 1GB"); + analyzedJobProperties.put(CreateRoutineLoadStmt.MAX_BATCH_SIZE_PROPERTY, + String.valueOf(maxBatchSizeBytes)); + } + + if (jobProperties.containsKey(LoadStmt.STRICT_MODE)) { + boolean strictMode = Boolean.valueOf(jobProperties.get(LoadStmt.STRICT_MODE)); + analyzedJobProperties.put(LoadStmt.STRICT_MODE, String.valueOf(strictMode)); + } + + if (jobProperties.containsKey(LoadStmt.TIMEZONE)) { + String timezone = TimeUtils.checkTimeZoneValidAndStandardize(jobProperties.get(LoadStmt.TIMEZONE)); + analyzedJobProperties.put(LoadStmt.TIMEZONE, timezone); + } + + if (jobProperties.containsKey(CreateRoutineLoadStmt.JSONPATHS)) { + analyzedJobProperties.put(CreateRoutineLoadStmt.JSONPATHS, jobProperties.get(CreateRoutineLoadStmt.JSONPATHS)); + } + + if 
(jobProperties.containsKey(CreateRoutineLoadStmt.JSONROOT)) { + analyzedJobProperties.put(CreateRoutineLoadStmt.JSONROOT, jobProperties.get(CreateRoutineLoadStmt.JSONROOT)); + } + + if (jobProperties.containsKey(CreateRoutineLoadStmt.STRIP_OUTER_ARRAY)) { + boolean stripOuterArray = Boolean.valueOf(jobProperties.get(CreateRoutineLoadStmt.STRIP_OUTER_ARRAY)); + analyzedJobProperties.put(jobProperties.get(CreateRoutineLoadStmt.STRIP_OUTER_ARRAY), + String.valueOf(stripOuterArray)); + } + } + + private void checkDataSourceProperties() throws AnalysisException { + dataSourceProperties.analyze(); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java index 6861cafdc16c36..9c9434b937f993 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java @@ -17,10 +17,6 @@ package org.apache.doris.analysis; -import com.google.common.base.Strings; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; import org.apache.doris.common.FeNameFormat; @@ -34,6 +30,11 @@ import org.apache.doris.load.routineload.RoutineLoadJob; import org.apache.doris.qe.ConnectContext; +import com.google.common.base.Strings; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + import java.util.List; import java.util.Map; import java.util.Optional; @@ -162,14 +163,14 @@ public class CreateRoutineLoadStmt extends DdlStmt { // pair private List> kafkaPartitionOffsets = Lists.newArrayList(); - //custom kafka property map + // custom kafka property map private Map customKafkaProperties = Maps.newHashMap(); - private static final Predicate DESIRED_CONCURRENT_NUMBER_PRED = (v) -> { return v > 0L; }; - private static final Predicate MAX_ERROR_NUMBER_PRED = (v) -> { return v >= 0L; }; - private static final Predicate MAX_BATCH_INTERVAL_PRED = (v) -> { return v >= 5 && v <= 60; }; - private static final Predicate MAX_BATCH_ROWS_PRED = (v) -> { return v >= 200000; }; - private static final Predicate MAX_BATCH_SIZE_PRED = (v) -> { return v >= 100 * 1024 * 1024 && v <= 1024 * 1024 * 1024; }; + public static final Predicate DESIRED_CONCURRENT_NUMBER_PRED = (v) -> { return v > 0L; }; + public static final Predicate MAX_ERROR_NUMBER_PRED = (v) -> { return v >= 0L; }; + public static final Predicate MAX_BATCH_INTERVAL_PRED = (v) -> { return v >= 5 && v <= 60; }; + public static final Predicate MAX_BATCH_ROWS_PRED = (v) -> { return v >= 200000; }; + public static final Predicate MAX_BATCH_SIZE_PRED = (v) -> { return v >= 100 * 1024 * 1024 && v <= 1024 * 1024 * 1024; }; public CreateRoutineLoadStmt(LabelName labelName, String tableName, List loadPropertyList, Map jobProperties, @@ -366,12 +367,12 @@ private void checkJobProperties() throws UserException { format = jobProperties.get(FORMAT); if (format != null) { if (format.equalsIgnoreCase("csv")) { - format = "";// if it's not json, then it's mean csv and set empty + format = ""; // if it's not json, then it's mean csv and set empty } else if (format.equalsIgnoreCase("json")) { format = "json"; jsonPaths = jobProperties.get(JSONPATHS); jsonRoot = jobProperties.get(JSONROOT); - stripOuterArray = 
Boolean.valueOf(jobProperties.get(STRIP_OUTER_ARRAY)); + stripOuterArray = Boolean.valueOf(jobProperties.getOrDefault(STRIP_OUTER_ARRAY, "false")); } else { throw new UserException("Format type is invalid. format=`" + format + "`"); } @@ -424,58 +425,74 @@ private void checkKafkaProperties() throws AnalysisException { } // check partitions - final String kafkaPartitionsString = dataSourceProperties.get(KAFKA_PARTITIONS_PROPERTY); + String kafkaPartitionsString = dataSourceProperties.get(KAFKA_PARTITIONS_PROPERTY); if (kafkaPartitionsString != null) { - kafkaPartitionsString.replaceAll(" ", ""); - if (kafkaPartitionsString.isEmpty()) { - throw new AnalysisException(KAFKA_PARTITIONS_PROPERTY + " could not be a empty string"); - } - String[] kafkaPartionsStringList = kafkaPartitionsString.split(","); - for (String s : kafkaPartionsStringList) { - try { - kafkaPartitionOffsets.add(Pair.create(getIntegerValueFromString(s, KAFKA_PARTITIONS_PROPERTY), - KafkaProgress.OFFSET_END_VAL)); - } catch (AnalysisException e) { - throw new AnalysisException(KAFKA_PARTITIONS_PROPERTY - + " must be a number string with comma-separated"); - } - } + analyzeKafkaPartitionProperty(kafkaPartitionsString, this.kafkaPartitionOffsets); } // check offset - final String kafkaOffsetsString = dataSourceProperties.get(KAFKA_OFFSETS_PROPERTY); + String kafkaOffsetsString = dataSourceProperties.get(KAFKA_OFFSETS_PROPERTY); if (kafkaOffsetsString != null) { - kafkaOffsetsString.replaceAll(" ", ""); - if (kafkaOffsetsString.isEmpty()) { - throw new AnalysisException(KAFKA_OFFSETS_PROPERTY + " could not be a empty string"); - } - String[] kafkaOffsetsStringList = kafkaOffsetsString.split(","); - if (kafkaOffsetsStringList.length != kafkaPartitionOffsets.size()) { - throw new AnalysisException("Partitions number should be equals to offsets number"); + analyzeKafkaOffsetProperty(kafkaOffsetsString, this.kafkaPartitionOffsets); + } + + // check custom kafka property + analyzeCustomProperties(this.dataSourceProperties, this.customKafkaProperties); + } + + public static void analyzeKafkaPartitionProperty(String kafkaPartitionsString, + List> kafkaPartitionOffsets) throws AnalysisException { + kafkaPartitionsString = kafkaPartitionsString.replaceAll(" ", ""); + if (kafkaPartitionsString.isEmpty()) { + throw new AnalysisException(KAFKA_PARTITIONS_PROPERTY + " could not be a empty string"); + } + String[] kafkaPartionsStringList = kafkaPartitionsString.split(","); + for (String s : kafkaPartionsStringList) { + try { + kafkaPartitionOffsets.add(Pair.create(getIntegerValueFromString(s, KAFKA_PARTITIONS_PROPERTY), + KafkaProgress.OFFSET_END_VAL)); + } catch (AnalysisException e) { + throw new AnalysisException(KAFKA_PARTITIONS_PROPERTY + + " must be a number string with comma-separated"); } + } + } + + public static void analyzeKafkaOffsetProperty(String kafkaOffsetsString, + List> kafkaPartitionOffsets) throws AnalysisException { + kafkaOffsetsString = kafkaOffsetsString.replaceAll(" ", ""); + if (kafkaOffsetsString.isEmpty()) { + throw new AnalysisException(KAFKA_OFFSETS_PROPERTY + " could not be a empty string"); + } + String[] kafkaOffsetsStringList = kafkaOffsetsString.split(","); + if (kafkaOffsetsStringList.length != kafkaPartitionOffsets.size()) { + throw new AnalysisException("Partitions number should be equals to offsets number"); + } - for (int i = 0; i < kafkaOffsetsStringList.length; i++) { - // defined in librdkafka/rdkafkacpp.h - // OFFSET_BEGINNING: -2 - // OFFSET_END: -1 - try { - kafkaPartitionOffsets.get(i).second = 
getLongValueFromString(kafkaOffsetsStringList[i], - KAFKA_OFFSETS_PROPERTY); - if (kafkaPartitionOffsets.get(i).second < 0) { - throw new AnalysisException("Cannot specify offset smaller than 0"); - } - } catch (AnalysisException e) { - if (kafkaOffsetsStringList[i].equalsIgnoreCase(KafkaProgress.OFFSET_BEGINNING)) { - kafkaPartitionOffsets.get(i).second = KafkaProgress.OFFSET_BEGINNING_VAL; - } else if (kafkaOffsetsStringList[i].equalsIgnoreCase(KafkaProgress.OFFSET_END)) { - kafkaPartitionOffsets.get(i).second = KafkaProgress.OFFSET_END_VAL; - } else { - throw e; - } + for (int i = 0; i < kafkaOffsetsStringList.length; i++) { + // defined in librdkafka/rdkafkacpp.h + // OFFSET_BEGINNING: -2 + // OFFSET_END: -1 + try { + kafkaPartitionOffsets.get(i).second = getLongValueFromString(kafkaOffsetsStringList[i], + KAFKA_OFFSETS_PROPERTY); + if (kafkaPartitionOffsets.get(i).second < 0) { + throw new AnalysisException("Can not specify offset smaller than 0"); + } + } catch (AnalysisException e) { + if (kafkaOffsetsStringList[i].equalsIgnoreCase(KafkaProgress.OFFSET_BEGINNING)) { + kafkaPartitionOffsets.get(i).second = KafkaProgress.OFFSET_BEGINNING_VAL; + } else if (kafkaOffsetsStringList[i].equalsIgnoreCase(KafkaProgress.OFFSET_END)) { + kafkaPartitionOffsets.get(i).second = KafkaProgress.OFFSET_END_VAL; + } else { + throw e; } } } - // check custom kafka property + } + + public static void analyzeCustomProperties(Map dataSourceProperties, + Map customKafkaProperties) throws AnalysisException { for (Map.Entry dataSourceProperty : dataSourceProperties.entrySet()) { if (dataSourceProperty.getKey().startsWith("property.")) { String propertyKey = dataSourceProperty.getKey(); @@ -486,11 +503,11 @@ private void checkKafkaProperties() throws AnalysisException { } customKafkaProperties.put(propertyKey.substring(propertyKey.indexOf(".") + 1), propertyValue); } - //can be extended in the future which other prefix + // can be extended in the future which other prefix } } - private int getIntegerValueFromString(String valueString, String propertyName) throws AnalysisException { + private static int getIntegerValueFromString(String valueString, String propertyName) throws AnalysisException { if (valueString.isEmpty()) { throw new AnalysisException(propertyName + " could not be a empty string"); } @@ -503,7 +520,7 @@ private int getIntegerValueFromString(String valueString, String propertyName) t return value; } - private long getLongValueFromString(String valueString, String propertyName) throws AnalysisException { + private static long getLongValueFromString(String valueString, String propertyName) throws AnalysisException { if (valueString.isEmpty()) { throw new AnalysisException(propertyName + " could not be a empty string"); } @@ -511,7 +528,7 @@ private long getLongValueFromString(String valueString, String propertyName) thr try { value = Long.valueOf(valueString); } catch (NumberFormatException e) { - throw new AnalysisException(propertyName + " must be a integer"); + throw new AnalysisException(propertyName + " must be a integer: " + valueString); } return value; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/RoutineLoadDataSourceProperties.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/RoutineLoadDataSourceProperties.java new file mode 100644 index 00000000000000..1743b2be32f111 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/RoutineLoadDataSourceProperties.java @@ -0,0 +1,138 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// 
or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.analysis; + +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.Pair; +import org.apache.doris.load.routineload.LoadDataSourceType; + +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.gson.annotations.SerializedName; + +import java.util.List; +import java.util.Map; +import java.util.Optional; + +public class RoutineLoadDataSourceProperties { + private static final ImmutableSet CONFIGURABLE_PROPERTIES_SET = new ImmutableSet.Builder() + .add(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY) + .add(CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY) + .build(); + + @SerializedName(value = "type") + private String type = "KAFKA"; + // origin properties, no need to persist + private Map properties = Maps.newHashMap(); + @SerializedName(value = "kafkaPartitionOffsets") + private List> kafkaPartitionOffsets = Lists.newArrayList(); + @SerializedName(value = "customKafkaProperties") + private Map customKafkaProperties = Maps.newHashMap(); + + public RoutineLoadDataSourceProperties() { + // empty + } + + public RoutineLoadDataSourceProperties(String type, Map properties) { + this.type = type.toUpperCase(); + this.properties = properties; + } + + public void analyze() throws AnalysisException { + checkDataSourceProperties(); + } + + public boolean hasAnalyzedProperties() { + return !kafkaPartitionOffsets.isEmpty() || !customKafkaProperties.isEmpty(); + } + + public String getType() { + return type; + } + + public List> getKafkaPartitionOffsets() { + return kafkaPartitionOffsets; + } + + public Map getCustomKafkaProperties() { + return customKafkaProperties; + } + + private void checkDataSourceProperties() throws AnalysisException { + LoadDataSourceType sourceType; + try { + sourceType = LoadDataSourceType.valueOf(type); + } catch (IllegalArgumentException e) { + throw new AnalysisException("routine load job does not support this type " + type); + } + switch (sourceType) { + case KAFKA: + checkKafkaProperties(); + break; + default: + break; + } + } + + private void checkKafkaProperties() throws AnalysisException { + Optional optional = properties.keySet().stream().filter( + entity -> !CONFIGURABLE_PROPERTIES_SET.contains(entity)).filter( + entity -> !entity.startsWith("property.")).findFirst(); + if (optional.isPresent()) { + throw new AnalysisException(optional.get() + " is invalid kafka custom property"); + } + + // check partitions + final String kafkaPartitionsString = properties.get(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY); + if (kafkaPartitionsString != null) { + + if (!properties.containsKey(CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY)) { + throw new AnalysisException("Partition and offset must be specified 
at the same time"); + } + + CreateRoutineLoadStmt.analyzeKafkaPartitionProperty(kafkaPartitionsString, kafkaPartitionOffsets); + } else { + if (properties.containsKey(CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY)) { + throw new AnalysisException("Missing kafka partition info"); + } + } + + // check offset + String kafkaOffsetsString = properties.get(CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY); + if (kafkaOffsetsString != null) { + CreateRoutineLoadStmt.analyzeKafkaOffsetProperty(kafkaOffsetsString, kafkaPartitionOffsets); + } + + // check custom properties + CreateRoutineLoadStmt.analyzeCustomProperties(properties, customKafkaProperties); + } + + @Override + public String toString() { + if (!hasAnalyzedProperties()) { + return "empty"; + } + + StringBuilder sb = new StringBuilder(); + sb.append("type: ").append(type); + sb.append(", kafka partition offsets: ").append(kafkaPartitionOffsets); + sb.append(", custome properties: ").append(customKafkaProperties); + return sb.toString(); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java index ccfb541df885c7..a635ac8e0b2c53 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java +++ b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java @@ -26,9 +26,9 @@ import org.apache.doris.backup.RestoreJob; import org.apache.doris.catalog.BrokerMgr; import org.apache.doris.catalog.Database; -import org.apache.doris.catalog.Resource; import org.apache.doris.catalog.Function; import org.apache.doris.catalog.FunctionSearchDesc; +import org.apache.doris.catalog.Resource; import org.apache.doris.cluster.BaseParam; import org.apache.doris.cluster.Cluster; import org.apache.doris.common.io.Text; @@ -41,11 +41,12 @@ import org.apache.doris.load.ExportJob; import org.apache.doris.load.LoadErrorHub; import org.apache.doris.load.LoadJob; -import org.apache.doris.load.loadv2.LoadJobFinalOperation; import org.apache.doris.load.loadv2.LoadJob.LoadJobStateUpdateInfo; +import org.apache.doris.load.loadv2.LoadJobFinalOperation; import org.apache.doris.load.routineload.RoutineLoadJob; import org.apache.doris.master.Checkpoint; import org.apache.doris.mysql.privilege.UserPropertyInfo; +import org.apache.doris.persist.AlterRoutineLoadJobOperationLog; import org.apache.doris.persist.AlterViewInfo; import org.apache.doris.persist.BackendIdsUpdateInfo; import org.apache.doris.persist.BackendTabletsInfo; @@ -575,6 +576,11 @@ public void readFields(DataInput in) throws IOException { isRead = true; break; } + case OperationType.OP_ALTER_ROUTINE_LOAD_JOB: { + data = AlterRoutineLoadJobOperationLog.read(in); + isRead = true; + break; + } default: { IOException e = new IOException(); LOG.error("UNKNOWN Operation Type {}", opCode, e); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java index 8ef5f4656292e3..1903807dc452d2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java @@ -17,6 +17,7 @@ package org.apache.doris.load.routineload; +import org.apache.doris.common.DdlException; import org.apache.doris.common.Pair; import org.apache.doris.thrift.TKafkaRLTaskProgress; @@ -105,6 +106,21 @@ private void getReadableProgress(Map showPartitionIdToOffset) { } } + + // modify the partition offset of this 
progess. + // throw exception is the specified partition does not exist in progress. + public void modifyOffset(List> kafkaPartitionOffsets) throws DdlException { + for (Pair pair : kafkaPartitionOffsets) { + if (!partitionIdToOffset.containsKey(pair.first)) { + throw new DdlException("The specified partition " + pair.first + " is not in the consumed partitions"); + } + } + + for (Pair pair : kafkaPartitionOffsets) { + partitionIdToOffset.put(pair.first, pair.second); + } + } + @Override public String toString() { Map showPartitionIdToOffset = Maps.newHashMap(); @@ -147,4 +163,5 @@ public void readFields(DataInput in) throws IOException { partitionIdToOffset.put(in.readInt(), in.readLong()); } } + } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java index 4a95e5743e69b2..44def4cac03ff2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java @@ -17,7 +17,9 @@ package org.apache.doris.load.routineload; +import org.apache.doris.analysis.AlterRoutineLoadStmt; import org.apache.doris.analysis.CreateRoutineLoadStmt; +import org.apache.doris.analysis.RoutineLoadDataSourceProperties; import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.Table; @@ -38,6 +40,7 @@ import org.apache.doris.common.util.LogKey; import org.apache.doris.common.util.SmallFileMgr; import org.apache.doris.common.util.SmallFileMgr.SmallFile; +import org.apache.doris.persist.AlterRoutineLoadJobOperationLog; import org.apache.doris.system.SystemInfoService; import org.apache.doris.transaction.TransactionStatus; @@ -86,7 +89,7 @@ public KafkaRoutineLoadJob() { } public KafkaRoutineLoadJob(Long id, String name, String clusterName, - long dbId, long tableId, String brokerList, String topic) { + long dbId, long tableId, String brokerList, String topic) { super(id, name, clusterName, dbId, tableId, LoadDataSourceType.KAFKA); this.brokerList = brokerList; this.topic = topic; @@ -105,23 +108,27 @@ public Map getConvertedCustomProperties() { return convertedCustomProperties; } - public void resetConvertedCustomProperties() { - convertedCustomProperties.clear(); - } - @Override public void prepare() throws UserException { super.prepare(); // should reset converted properties each time the job being prepared. // because the file info can be changed anytime. 
- resetConvertedCustomProperties(); - convertCustomProperties(); + convertCustomProperties(true); } - private void convertCustomProperties() throws DdlException { - if (!convertedCustomProperties.isEmpty() || customProperties.isEmpty()) { + private void convertCustomProperties(boolean rebuild) throws DdlException { + if (customProperties.isEmpty()) { + return; + } + + if (!rebuild && !convertedCustomProperties.isEmpty()) { return; } + + if (rebuild) { + convertedCustomProperties.clear(); + } + SmallFileMgr smallFileMgr = Catalog.getCurrentCatalog().getSmallFileMgr(); for (Map.Entry entry : customProperties.entrySet()) { if (entry.getValue().startsWith("FILE:")) { @@ -184,10 +191,10 @@ public int calculateCurrentConcurrentTaskNum() throws MetaNotFoundException { } LOG.debug("current concurrent task number is min" - + "(partition num: {}, desire task concurrent num: {}, alive be num: {}, config: {})", + + "(partition num: {}, desire task concurrent num: {}, alive be num: {}, config: {})", partitionNum, desireTaskConcurrentNum, aliveBeNum, Config.max_routine_load_task_concurrent_num); currentTaskConcurrentNum = Math.min(Math.min(partitionNum, Math.min(desireTaskConcurrentNum, aliveBeNum)), - Config.max_routine_load_task_concurrent_num); + Config.max_routine_load_task_concurrent_num); return currentTaskConcurrentNum; } @@ -202,20 +209,20 @@ public int calculateCurrentConcurrentTaskNum() throws MetaNotFoundException { // otherwise currentErrorNum and currentTotalNum is updated when progress is not updated @Override protected boolean checkCommitInfo(RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment, - TransactionStatus txnStatus) { + TransactionStatus txnStatus) { if (rlTaskTxnCommitAttachment.getLoadedRows() > 0 && txnStatus == TransactionStatus.ABORTED) { // case 1 return false; } - + if (rlTaskTxnCommitAttachment.getLoadedRows() > 0 && (!((KafkaProgress) rlTaskTxnCommitAttachment.getProgress()).hasPartition())) { // case 2 LOG.warn(new LogBuilder(LogKey.ROUTINE_LOAD_TASK, DebugUtil.printId(rlTaskTxnCommitAttachment.getTaskId())) - .add("job_id", id) - .add("loaded_rows", rlTaskTxnCommitAttachment.getLoadedRows()) - .add("progress_partition_offset_size", 0) - .add("msg", "commit attachment info is incorrect")); + .add("job_id", id) + .add("loaded_rows", rlTaskTxnCommitAttachment.getLoadedRows()) + .add("progress_partition_offset_size", 0) + .add("msg", "commit attachment info is incorrect")); return false; } return true; @@ -269,12 +276,12 @@ protected boolean unprotectNeedReschedule() throws UserException { newCurrentKafkaPartition = getAllKafkaPartitions(); } catch (Exception e) { LOG.warn(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id) - .add("error_msg", "Job failed to fetch all current partition with error " + e.getMessage()) - .build(), e); + .add("error_msg", "Job failed to fetch all current partition with error " + e.getMessage()) + .build(), e); if (this.state == JobState.NEED_SCHEDULE) { unprotectUpdateState(JobState.PAUSED, new ErrorReason(InternalErrorCode.PARTITIONS_ERR, - "Job failed to fetch all current partition with error " + e.getMessage()), + "Job failed to fetch all current partition with error " + e.getMessage()), false /* not replay */); } return false; @@ -284,9 +291,9 @@ protected boolean unprotectNeedReschedule() throws UserException { currentKafkaPartitions = newCurrentKafkaPartition; if (LOG.isDebugEnabled()) { LOG.debug(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id) - .add("current_kafka_partitions", Joiner.on(",").join(currentKafkaPartitions)) - .add("msg", "current 
kafka partitions has been change") - .build()); + .add("current_kafka_partitions", Joiner.on(",").join(currentKafkaPartitions)) + .add("msg", "current kafka partitions has been change") + .build()); } return true; } else { @@ -296,9 +303,9 @@ protected boolean unprotectNeedReschedule() throws UserException { currentKafkaPartitions = newCurrentKafkaPartition; if (LOG.isDebugEnabled()) { LOG.debug(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id) - .add("current_kafka_partitions", Joiner.on(",").join(currentKafkaPartitions)) - .add("msg", "current kafka partitions has been change") - .build()); + .add("current_kafka_partitions", Joiner.on(",").join(currentKafkaPartitions)) + .add("msg", "current kafka partitions has been change") + .build()); } return true; } @@ -328,7 +335,7 @@ protected String getStatistic() { } private List getAllKafkaPartitions() throws UserException { - convertCustomProperties(); + convertCustomProperties(false); return KafkaUtil.getAllKafkaPartitions(brokerList, topic, convertedCustomProperties); } @@ -406,9 +413,9 @@ private void updateNewPartitionProgress() { ((KafkaProgress) progress).addPartitionOffset(Pair.create(kafkaPartition, beginOffSet)); if (LOG.isDebugEnabled()) { LOG.debug(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id) - .add("kafka_partition_id", kafkaPartition) - .add("begin_offset", beginOffSet) - .add("msg", "The new partition has been added in job")); + .add("kafka_partition_id", kafkaPartition) + .add("begin_offset", beginOffSet) + .add("msg", "The new partition has been added in job")); } } } @@ -437,6 +444,7 @@ private void setCustomKafkaPartitions(List> kafkaPartitionOf private void setCustomKafkaProperties(Map kafkaProperties) { this.customProperties = kafkaProperties; } + @Override protected String dataSourcePropertiesJsonToString() { Map dataSourceProperties = Maps.newHashMap(); @@ -484,7 +492,7 @@ public void readFields(DataInput in) throws IOException { if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_51) { int count = in.readInt(); - for (int i = 0 ;i < count ;i ++) { + for (int i = 0; i < count; i++) { String propertyKey = Text.readString(in); String propertyValue = Text.readString(in); if (propertyKey.startsWith("property.")) { @@ -493,4 +501,68 @@ public void readFields(DataInput in) throws IOException { } } } + + @Override + public void modifyProperties(AlterRoutineLoadStmt stmt) throws DdlException { + Map jobProperties = stmt.getAnalyzedJobProperties(); + RoutineLoadDataSourceProperties dataSourceProperties = stmt.getDataSourceProperties(); + + writeLock(); + try { + if (getState() != JobState.PAUSED) { + throw new DdlException("Only supports modification of PAUSED jobs"); + } + + modifyPropertiesInternal(jobProperties, dataSourceProperties); + + AlterRoutineLoadJobOperationLog log = new AlterRoutineLoadJobOperationLog(this.id, + jobProperties, dataSourceProperties); + Catalog.getCurrentCatalog().getEditLog().logAlterRoutineLoadJob(log); + } finally { + writeUnlock(); + } + } + + private void modifyPropertiesInternal(Map jobProperties, + RoutineLoadDataSourceProperties dataSourceProperties) + throws DdlException { + + List> kafkaPartitionOffsets = Lists.newArrayList(); + Map customKafkaProperties = Maps.newHashMap(); + + if (dataSourceProperties.hasAnalyzedProperties()) { + kafkaPartitionOffsets = dataSourceProperties.getKafkaPartitionOffsets(); + customKafkaProperties = dataSourceProperties.getCustomKafkaProperties(); + } + + // modify partition offset first + if (!kafkaPartitionOffsets.isEmpty()) { + // we can only 
modify the partition that is being consumed + ((KafkaProgress) progress).modifyOffset(kafkaPartitionOffsets); + } + + if (!jobProperties.isEmpty()) { + Map copiedJobProperties = Maps.newHashMap(jobProperties); + modifyCommonJobProperties(copiedJobProperties); + this.jobProperties.putAll(copiedJobProperties); + } + + if (!customKafkaProperties.isEmpty()) { + this.customProperties.putAll(customKafkaProperties); + convertCustomProperties(true); + } + + LOG.info("modify the properties of kafka routine load job: {}, jobProperties: {}, datasource properties: {}", + this.id, jobProperties, dataSourceProperties); + } + + @Override + public void replayModifyProperties(AlterRoutineLoadJobOperationLog log) { + try { + modifyPropertiesInternal(log.getJobProperties(), log.getDataSourceProperties()); + } catch (DdlException e) { + // should not happen + LOG.error("failed to replay modify kafka routine load job: {}", id, e); + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java index feaf50f171fcdc..fc322731d9b3dc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java @@ -17,6 +17,7 @@ package org.apache.doris.load.routineload; +import org.apache.doris.analysis.AlterRoutineLoadStmt; import org.apache.doris.analysis.ColumnSeparator; import org.apache.doris.analysis.CreateRoutineLoadStmt; import org.apache.doris.analysis.Expr; @@ -48,6 +49,7 @@ import org.apache.doris.common.util.TimeUtils; import org.apache.doris.load.RoutineLoadDesc; import org.apache.doris.metric.MetricRepo; +import org.apache.doris.persist.AlterRoutineLoadJobOperationLog; import org.apache.doris.persist.RoutineLoadOperation; import org.apache.doris.planner.StreamLoadPlanner; import org.apache.doris.qe.ConnectContext; @@ -1242,6 +1244,8 @@ private String jobPropertiesToJsonString() { jobProperties.put("maxBatchRows", String.valueOf(maxBatchRows)); jobProperties.put("maxBatchSizeBytes", String.valueOf(maxBatchSizeBytes)); jobProperties.put("currentTaskConcurrentNum", String.valueOf(currentTaskConcurrentNum)); + jobProperties.put("desireTaskConcurrentNum", String.valueOf(desireTaskConcurrentNum)); + jobProperties.putAll(this.jobProperties); Gson gson = new GsonBuilder().disableHtmlEscaping().create(); return gson.toJson(jobProperties); } @@ -1410,4 +1414,36 @@ public void readFields(DataInput in) throws IOException { throw new IOException("error happens when parsing create routine load stmt: " + origStmt, e); } } + + abstract public void modifyProperties(AlterRoutineLoadStmt stmt) throws DdlException; + + abstract public void replayModifyProperties(AlterRoutineLoadJobOperationLog log); + + // for ALTER ROUTINE LOAD + protected void modifyCommonJobProperties(Map jobProperties) { + if (jobProperties.containsKey(CreateRoutineLoadStmt.DESIRED_CONCURRENT_NUMBER_PROPERTY)) { + this.desireTaskConcurrentNum = Integer.valueOf( + jobProperties.remove(CreateRoutineLoadStmt.DESIRED_CONCURRENT_NUMBER_PROPERTY)); + } + + if (jobProperties.containsKey(CreateRoutineLoadStmt.MAX_ERROR_NUMBER_PROPERTY)) { + this.maxErrorNum = Long.valueOf( + jobProperties.remove(CreateRoutineLoadStmt.MAX_ERROR_NUMBER_PROPERTY)); + } + + if (jobProperties.containsKey(CreateRoutineLoadStmt.MAX_BATCH_INTERVAL_SEC_PROPERTY)) { + this.maxBatchIntervalS = Long.valueOf( + 
jobProperties.remove(CreateRoutineLoadStmt.MAX_BATCH_INTERVAL_SEC_PROPERTY)); + } + + if (jobProperties.containsKey(CreateRoutineLoadStmt.MAX_BATCH_ROWS_PROPERTY)) { + this.maxBatchRows = Long.valueOf( + jobProperties.remove(CreateRoutineLoadStmt.MAX_BATCH_ROWS_PROPERTY)); + } + + if (jobProperties.containsKey(CreateRoutineLoadStmt.MAX_BATCH_SIZE_PROPERTY)) { + this.maxBatchSizeBytes = Long.valueOf( + jobProperties.remove(CreateRoutineLoadStmt.MAX_BATCH_SIZE_PROPERTY)); + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java index 925e31ef576cb6..f96f7c02b0ae83 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java @@ -17,12 +17,14 @@ package org.apache.doris.load.routineload; +import org.apache.doris.analysis.AlterRoutineLoadStmt; import org.apache.doris.analysis.CreateRoutineLoadStmt; import org.apache.doris.analysis.PauseRoutineLoadStmt; import org.apache.doris.analysis.ResumeRoutineLoadStmt; import org.apache.doris.analysis.StopRoutineLoadStmt; import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.Database; +import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; import org.apache.doris.common.ErrorCode; @@ -35,9 +37,11 @@ import org.apache.doris.common.util.LogBuilder; import org.apache.doris.common.util.LogKey; import org.apache.doris.mysql.privilege.PrivPredicate; +import org.apache.doris.persist.AlterRoutineLoadJobOperationLog; import org.apache.doris.persist.RoutineLoadOperation; import org.apache.doris.qe.ConnectContext; +import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; @@ -201,11 +205,11 @@ private boolean isNameUsed(Long dbId, String name) { return false; } - public void pauseRoutineLoadJob(PauseRoutineLoadStmt pauseRoutineLoadStmt) - throws UserException { - RoutineLoadJob routineLoadJob = getJob(pauseRoutineLoadStmt.getDbFullName(), pauseRoutineLoadStmt.getName()); + private RoutineLoadJob checkPrivAndGetJob(String dbName, String jobName) + throws MetaNotFoundException, DdlException, AnalysisException { + RoutineLoadJob routineLoadJob = getJob(dbName, jobName); if (routineLoadJob == null) { - throw new DdlException("There is not operable routine load job with name " + pauseRoutineLoadStmt.getName()); + throw new DdlException("There is not operable routine load job with name " + jobName); } // check auth String dbFullName; @@ -225,41 +229,27 @@ public void pauseRoutineLoadJob(PauseRoutineLoadStmt pauseRoutineLoadStmt) ConnectContext.get().getRemoteIP(), tableName); } + return routineLoadJob; + } + + public void pauseRoutineLoadJob(PauseRoutineLoadStmt pauseRoutineLoadStmt) + throws UserException { + RoutineLoadJob routineLoadJob = checkPrivAndGetJob(pauseRoutineLoadStmt.getDbFullName(), + pauseRoutineLoadStmt.getName()); routineLoadJob.updateState(RoutineLoadJob.JobState.PAUSED, - new ErrorReason(InternalErrorCode.MANUAL_PAUSE_ERR, - "User " + ConnectContext.get().getQualifiedUser() + " pauses routine load job"), - false /* not replay */); - LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, routineLoadJob.getId()) - .add("current_state", routineLoadJob.getState()) - .add("user", 
ConnectContext.get().getQualifiedUser()) - .add("msg", "routine load job has been paused by user") - .build()); + new ErrorReason(InternalErrorCode.MANUAL_PAUSE_ERR, + "User " + ConnectContext.get().getQualifiedUser() + " pauses routine load job"), + false /* not replay */); + LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, routineLoadJob.getId()).add("current_state", + routineLoadJob.getState()).add("user", ConnectContext.get().getQualifiedUser()).add("msg", + "routine load job has been paused by user").build()); } public void resumeRoutineLoadJob(ResumeRoutineLoadStmt resumeRoutineLoadStmt) throws UserException { - RoutineLoadJob routineLoadJob = getJob(resumeRoutineLoadStmt.getDbFullName(), resumeRoutineLoadStmt.getName()); - if (routineLoadJob == null) { - throw new DdlException("There is not operable routine load job with name " + resumeRoutineLoadStmt.getName() + "."); - } - // check auth - String dbFullName; - String tableName; - try { - dbFullName = routineLoadJob.getDbFullName(); - tableName = routineLoadJob.getTableName(); - } catch (MetaNotFoundException e) { - throw new DdlException("The metadata of job has been changed. The job will be cancelled automatically", e); - } - if (!Catalog.getCurrentCatalog().getAuth().checkTblPriv(ConnectContext.get(), - dbFullName, - tableName, - PrivPredicate.LOAD)) { - ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "LOAD", - ConnectContext.get().getQualifiedUser(), - ConnectContext.get().getRemoteIP(), - tableName); - } + RoutineLoadJob routineLoadJob = checkPrivAndGetJob(resumeRoutineLoadStmt.getDbFullName(), + resumeRoutineLoadStmt.getName()); + routineLoadJob.autoResumeCount = 0; routineLoadJob.firstResumeTimestamp = 0; routineLoadJob.autoResumeLock = false; @@ -273,31 +263,12 @@ public void resumeRoutineLoadJob(ResumeRoutineLoadStmt resumeRoutineLoadStmt) th public void stopRoutineLoadJob(StopRoutineLoadStmt stopRoutineLoadStmt) throws UserException { - RoutineLoadJob routineLoadJob = getJob(stopRoutineLoadStmt.getDbFullName(), stopRoutineLoadStmt.getName()); - if (routineLoadJob == null) { - throw new DdlException("There is not operable routine load job with name " + stopRoutineLoadStmt.getName()); - } - // check auth - String dbFullName; - String tableName; - try { - dbFullName = routineLoadJob.getDbFullName(); - tableName = routineLoadJob.getTableName(); - } catch (MetaNotFoundException e) { - throw new DdlException("The metadata of job has been changed. 
The job will be cancelled automatically", e); - } - if (!Catalog.getCurrentCatalog().getAuth().checkTblPriv(ConnectContext.get(), - dbFullName, - tableName, - PrivPredicate.LOAD)) { - ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "LOAD", - ConnectContext.get().getQualifiedUser(), - ConnectContext.get().getRemoteIP(), - tableName); - } + RoutineLoadJob routineLoadJob = checkPrivAndGetJob(stopRoutineLoadStmt.getDbFullName(), + stopRoutineLoadStmt.getName()); routineLoadJob.updateState(RoutineLoadJob.JobState.STOPPED, - new ErrorReason(InternalErrorCode.MANUAL_STOP_ERR, "User " + ConnectContext.get().getQualifiedUser() + " stop routine load job"), - false /* not replay */); + new ErrorReason(InternalErrorCode.MANUAL_STOP_ERR, + "User " + ConnectContext.get().getQualifiedUser() + " stop routine load job"), + false /* not replay */); LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, routineLoadJob.getId()) .add("current_state", routineLoadJob.getState()) .add("user", ConnectContext.get().getQualifiedUser()) @@ -602,6 +573,24 @@ public void replayChangeRoutineLoadJob(RoutineLoadOperation operation) { .add("msg", "replay change routine load job") .build()); } + + /** + * Enter of altering a routine load job + */ + public void alterRoutineLoadJob(AlterRoutineLoadStmt stmt) throws UserException { + RoutineLoadJob job = checkPrivAndGetJob(stmt.getDbName(), stmt.getLabel()); + if (stmt.hasDataSourceProperty() + && !stmt.getDataSourceProperties().getType().equalsIgnoreCase(job.dataSourceType.name())) { + throw new DdlException("The specified job type is not: " + stmt.getDataSourceProperties().getType()); + } + job.modifyProperties(stmt); + } + + public void replayAlterRoutineLoadJob(AlterRoutineLoadJobOperationLog log) { + RoutineLoadJob job = getJob(log.getJobId()); + Preconditions.checkNotNull(job, log.getJobId()); + job.replayModifyProperties(log); + } @Override public void write(DataOutput out) throws IOException { diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/AlterRoutineLoadJobOperationLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/AlterRoutineLoadJobOperationLog.java new file mode 100644 index 00000000000000..78b1d5f1c982a5 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/AlterRoutineLoadJobOperationLog.java @@ -0,0 +1,70 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.persist; + +import org.apache.doris.analysis.RoutineLoadDataSourceProperties; +import org.apache.doris.common.io.Text; +import org.apache.doris.common.io.Writable; +import org.apache.doris.persist.gson.GsonUtils; + +import com.google.gson.annotations.SerializedName; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.Map; + +public class AlterRoutineLoadJobOperationLog implements Writable { + + @SerializedName(value = "jobId") + private long jobId; + @SerializedName(value = "jobProperties") + private Map jobProperties; + @SerializedName(value = "dataSourceProperties") + private RoutineLoadDataSourceProperties dataSourceProperties; + + public AlterRoutineLoadJobOperationLog(long jobId, Map jobProperties, + RoutineLoadDataSourceProperties dataSourceProperties) { + this.jobId = jobId; + this.jobProperties = jobProperties; + this.dataSourceProperties = dataSourceProperties; + } + + public long getJobId() { + return jobId; + } + + public Map getJobProperties() { + return jobProperties; + } + + public RoutineLoadDataSourceProperties getDataSourceProperties() { + return dataSourceProperties; + } + + public static AlterRoutineLoadJobOperationLog read(DataInput in) throws IOException { + String json = Text.readString(in); + return GsonUtils.GSON.fromJson(json, AlterRoutineLoadJobOperationLog.class); + } + + @Override + public void write(DataOutput out) throws IOException { + String json = GsonUtils.GSON.toJson(this); + Text.writeString(out, json); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java index d7a8ecfb38b09f..c7830c8b74e76d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java @@ -784,6 +784,11 @@ public static void loadJournal(Catalog catalog, JournalEntity journal) { } break; } + case OperationType.OP_ALTER_ROUTINE_LOAD_JOB: { + AlterRoutineLoadJobOperationLog log = (AlterRoutineLoadJobOperationLog) journal.getData(); + catalog.getRoutineLoadManager().replayAlterRoutineLoadJob(log); + break; + } default: { IOException e = new IOException(); LOG.error("UNKNOWN Operation Type {}", opCode, e); @@ -1346,4 +1351,8 @@ public void logSetReplicaStatus(SetReplicaStatusOperationLog log) { public void logRemoveExpiredAlterJobV2(RemoveAlterJobV2OperationLog log) { logEdit(OperationType.OP_REMOVE_ALTER_JOB_V2, log); } + + public void logAlterRoutineLoadJob(AlterRoutineLoadJobOperationLog log) { + logEdit(OperationType.OP_ALTER_ROUTINE_LOAD_JOB, log); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java index 969160c02168cc..ce5f6327c452f6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java @@ -142,6 +142,7 @@ public class OperationType { // routine load 110~120 public static final short OP_ROUTINE_LOAD_JOB = 110; + public static final short OP_ALTER_ROUTINE_LOAD_JOB = 111; // UDF 130-140 public static final short OP_ADD_FUNCTION = 130; diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java index 8e9cbe2db83173..e455448cd40953 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java +++ 
b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java @@ -21,9 +21,9 @@ import org.apache.doris.alter.RollupJobV2; import org.apache.doris.alter.SchemaChangeJobV2; import org.apache.doris.catalog.DistributionInfo; -import org.apache.doris.catalog.Resource; import org.apache.doris.catalog.HashDistributionInfo; import org.apache.doris.catalog.RandomDistributionInfo; +import org.apache.doris.catalog.Resource; import org.apache.doris.catalog.ScalarType; import org.apache.doris.catalog.SparkResource; import org.apache.doris.load.loadv2.LoadJob.LoadJobStateUpdateInfo; @@ -349,4 +349,4 @@ public T read(JsonReader reader) throws IOException { } } -} +} \ No newline at end of file diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java index 824e9f6a7a39b2..dfa0b715c03bb0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java @@ -25,6 +25,7 @@ import org.apache.doris.analysis.AlterClusterStmt; import org.apache.doris.analysis.AlterDatabaseQuotaStmt; import org.apache.doris.analysis.AlterDatabaseRename; +import org.apache.doris.analysis.AlterRoutineLoadStmt; import org.apache.doris.analysis.AlterSystemStmt; import org.apache.doris.analysis.AlterTableStmt; import org.apache.doris.analysis.AlterViewStmt; @@ -73,7 +74,6 @@ import org.apache.doris.analysis.StopRoutineLoadStmt; import org.apache.doris.analysis.SyncStmt; import org.apache.doris.analysis.TruncateTableStmt; -import org.apache.doris.analysis.AlterViewStmt; import org.apache.doris.analysis.UninstallPluginStmt; import org.apache.doris.catalog.Catalog; import org.apache.doris.common.Config; @@ -146,7 +146,9 @@ public static void execute(Catalog catalog, DdlStmt ddlStmt) throws Exception { catalog.getRoutineLoadManager().resumeRoutineLoadJob((ResumeRoutineLoadStmt) ddlStmt); } else if (ddlStmt instanceof StopRoutineLoadStmt) { catalog.getRoutineLoadManager().stopRoutineLoadJob((StopRoutineLoadStmt) ddlStmt); - } else if (ddlStmt instanceof DeleteStmt) { + } else if (ddlStmt instanceof AlterRoutineLoadStmt) { + catalog.getRoutineLoadManager().alterRoutineLoadJob((AlterRoutineLoadStmt) ddlStmt); + } else if (ddlStmt instanceof DeleteStmt) { catalog.getDeleteHandler().process((DeleteStmt) ddlStmt); } else if (ddlStmt instanceof CreateUserStmt) { CreateUserStmt stmt = (CreateUserStmt) ddlStmt; diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/AlterRoutineLoadStmtTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/AlterRoutineLoadStmtTest.java new file mode 100644 index 00000000000000..a1d71544838c76 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/AlterRoutineLoadStmtTest.java @@ -0,0 +1,234 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.analysis; + +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.UserException; +import org.apache.doris.mysql.privilege.PaloAuth; +import org.apache.doris.mysql.privilege.PrivPredicate; +import org.apache.doris.qe.ConnectContext; + +import com.google.common.collect.Maps; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.Map; + +import mockit.Expectations; +import mockit.Mocked; + +/* + * Author: Chenmingyu + * Date: Jul 20, 2020 + */ + +public class AlterRoutineLoadStmtTest { + + private Analyzer analyzer; + + @Mocked + private PaloAuth auth; + + @Before + public void setUp() { + analyzer = AccessTestUtil.fetchAdminAnalyzer(false); + + new Expectations() { + { + auth.checkGlobalPriv((ConnectContext) any, (PrivPredicate) any); + minTimes = 0; + result = true; + + auth.checkDbPriv((ConnectContext) any, anyString, (PrivPredicate) any); + minTimes = 0; + result = true; + + auth.checkTblPriv((ConnectContext) any, anyString, anyString, (PrivPredicate) any); + minTimes = 0; + result = true; + } + }; + } + + @Test + public void testNormal() { + { + Map<String, String> jobProperties = Maps.newHashMap(); + jobProperties.put(CreateRoutineLoadStmt.MAX_ERROR_NUMBER_PROPERTY, "100"); + jobProperties.put(CreateRoutineLoadStmt.MAX_BATCH_ROWS_PROPERTY, "200000"); + String typeName = "kafka"; + Map<String, String> dataSourceProperties = Maps.newHashMap(); + dataSourceProperties.put("property.client.id", "101"); + dataSourceProperties.put("property.group.id", "mygroup"); + dataSourceProperties.put(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY, "1,2,3"); + dataSourceProperties.put(CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY, "10000, 20000, 30000"); + RoutineLoadDataSourceProperties routineLoadDataSourceProperties = new RoutineLoadDataSourceProperties( + typeName, dataSourceProperties); + AlterRoutineLoadStmt stmt = new AlterRoutineLoadStmt(new LabelName("db1", "label1"), + jobProperties, routineLoadDataSourceProperties); + try { + stmt.analyze(analyzer); + } catch (UserException e) { + Assert.fail(); + } + + Assert.assertEquals(2, stmt.getAnalyzedJobProperties().size()); + Assert.assertTrue(stmt.getAnalyzedJobProperties().containsKey(CreateRoutineLoadStmt.MAX_ERROR_NUMBER_PROPERTY)); + Assert.assertTrue(stmt.getAnalyzedJobProperties().containsKey(CreateRoutineLoadStmt.MAX_BATCH_ROWS_PROPERTY)); + Assert.assertTrue(stmt.hasDataSourceProperty()); + Assert.assertEquals(2, stmt.getDataSourceProperties().getCustomKafkaProperties().size()); + Assert.assertTrue(stmt.getDataSourceProperties().getCustomKafkaProperties().containsKey("group.id")); + Assert.assertTrue(stmt.getDataSourceProperties().getCustomKafkaProperties().containsKey("client.id")); + Assert.assertEquals(3, stmt.getDataSourceProperties().getKafkaPartitionOffsets().size()); + } + } + + @Test(expected = AnalysisException.class) + public void testNoProperties() throws AnalysisException, UserException { + AlterRoutineLoadStmt stmt = new AlterRoutineLoadStmt(new LabelName("db1", "label1"), + Maps.newHashMap(), new RoutineLoadDataSourceProperties()); + stmt.analyze(analyzer); + } + + @Test + public void testUnsupportedProperties() { + { + Map<String, String> jobProperties = Maps.newHashMap(); + jobProperties.put(CreateRoutineLoadStmt.FORMAT, "csv"); + AlterRoutineLoadStmt stmt = new AlterRoutineLoadStmt(new LabelName("db1", "label1"), + jobProperties, new RoutineLoadDataSourceProperties());
+ try { + stmt.analyze(analyzer); + Assert.fail(); + } catch (AnalysisException e) { + Assert.assertTrue(e.getMessage().contains("format is invalid property")); + } catch (UserException e) { + Assert.fail(); + } + } + + { + Map<String, String> jobProperties = Maps.newHashMap(); + jobProperties.put(CreateRoutineLoadStmt.MAX_ERROR_NUMBER_PROPERTY, "100"); + String typeName = "kafka"; + Map<String, String> dataSourceProperties = Maps.newHashMap(); + dataSourceProperties.put(CreateRoutineLoadStmt.KAFKA_TOPIC_PROPERTY, "new_topic"); + RoutineLoadDataSourceProperties routineLoadDataSourceProperties = new RoutineLoadDataSourceProperties( + typeName, dataSourceProperties); + AlterRoutineLoadStmt stmt = new AlterRoutineLoadStmt(new LabelName("db1", "label1"), + jobProperties, routineLoadDataSourceProperties); + + try { + stmt.analyze(analyzer); + Assert.fail(); + } catch (AnalysisException e) { + Assert.assertTrue(e.getMessage().contains("kafka_topic is invalid kafka custom property")); + } catch (UserException e) { + Assert.fail(); + } + } + + { + Map<String, String> jobProperties = Maps.newHashMap(); + jobProperties.put(CreateRoutineLoadStmt.MAX_ERROR_NUMBER_PROPERTY, "100"); + String typeName = "kafka"; + Map<String, String> dataSourceProperties = Maps.newHashMap(); + dataSourceProperties.put(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY, "1,2,3"); + RoutineLoadDataSourceProperties routineLoadDataSourceProperties = new RoutineLoadDataSourceProperties( + typeName, dataSourceProperties); + AlterRoutineLoadStmt stmt = new AlterRoutineLoadStmt(new LabelName("db1", "label1"), + jobProperties, routineLoadDataSourceProperties); + try { + stmt.analyze(analyzer); + Assert.fail(); + } catch (AnalysisException e) { + Assert.assertTrue(e.getMessage().contains("Partition and offset must be specified at the same time")); + } catch (UserException e) { + Assert.fail(); + } + } + + { + Map<String, String> jobProperties = Maps.newHashMap(); + jobProperties.put(CreateRoutineLoadStmt.MAX_ERROR_NUMBER_PROPERTY, "100"); + String typeName = "kafka"; + Map<String, String> dataSourceProperties = Maps.newHashMap(); + dataSourceProperties.put(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY, "1,2,3"); + dataSourceProperties.put(CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY, "1000, 2000"); + RoutineLoadDataSourceProperties routineLoadDataSourceProperties = new RoutineLoadDataSourceProperties( + typeName, dataSourceProperties); + AlterRoutineLoadStmt stmt = new AlterRoutineLoadStmt(new LabelName("db1", "label1"), + jobProperties, routineLoadDataSourceProperties); + try { + stmt.analyze(analyzer); + Assert.fail(); + } catch (AnalysisException e) { + Assert.assertTrue(e.getMessage().contains("Partitions number should be equals to offsets number")); + } catch (UserException e) { + Assert.fail(); + } + } + + { + Map<String, String> jobProperties = Maps.newHashMap(); + jobProperties.put(CreateRoutineLoadStmt.MAX_ERROR_NUMBER_PROPERTY, "100"); + String typeName = "kafka"; + Map<String, String> dataSourceProperties = Maps.newHashMap(); + dataSourceProperties.put(CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY, "1000, 2000, 3000"); + RoutineLoadDataSourceProperties routineLoadDataSourceProperties = new RoutineLoadDataSourceProperties( + typeName, dataSourceProperties); + AlterRoutineLoadStmt stmt = new AlterRoutineLoadStmt(new LabelName("db1", "label1"), + jobProperties, routineLoadDataSourceProperties); + try { + stmt.analyze(analyzer); + Assert.fail(); + } catch (AnalysisException e) { + Assert.assertTrue(e.getMessage().contains("Missing kafka partition info")); + } catch (UserException e) { + Assert.fail(); + } + } + + { + Map<String, String> jobProperties =
Maps.newHashMap(); + jobProperties.put(CreateRoutineLoadStmt.MAX_ERROR_NUMBER_PROPERTY, "100"); + jobProperties.put(CreateRoutineLoadStmt.MAX_BATCH_SIZE_PROPERTY, "200000"); + String typeName = "kafka"; + Map<String, String> dataSourceProperties = Maps.newHashMap(); + dataSourceProperties.put("property.client.id", "101"); + dataSourceProperties.put("property.group.id", "mygroup"); + dataSourceProperties.put(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY, "1,2,3"); + dataSourceProperties.put(CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY, "10000, 20000, 30000"); + RoutineLoadDataSourceProperties routineLoadDataSourceProperties = new RoutineLoadDataSourceProperties( + typeName, dataSourceProperties); + AlterRoutineLoadStmt stmt = new AlterRoutineLoadStmt(new LabelName("db1", "label1"), + jobProperties, routineLoadDataSourceProperties); + try { + stmt.analyze(analyzer); + Assert.fail(); + } catch (AnalysisException e) { + Assert.assertTrue(e.getMessage().contains("max_batch_size should between 100MB and 1GB")); + } catch (UserException e) { + Assert.fail(); + } + } + } + +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java index 8469aca809eb2a..2515ba85ea9df5 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java @@ -867,4 +867,47 @@ public void testReplayChangeRoutineLoadJob(@Injectable RoutineLoadOperation oper Assert.assertEquals(RoutineLoadJob.JobState.PAUSED, routineLoadJob.getState()); } + @Test + public void testAlterRoutineLoadJob(@Injectable StopRoutineLoadStmt stopRoutineLoadStmt, + @Mocked Catalog catalog, + @Mocked Database database, + @Mocked PaloAuth paloAuth, + @Mocked ConnectContext connectContext) throws UserException { + RoutineLoadManager routineLoadManager = new RoutineLoadManager(); + Map<Long, Map<String, List<RoutineLoadJob>>> dbToNameToRoutineLoadJob = Maps.newHashMap(); + Map<String, List<RoutineLoadJob>> nameToRoutineLoadJob = Maps.newHashMap(); + List<RoutineLoadJob> routineLoadJobList = Lists.newArrayList(); + RoutineLoadJob routineLoadJob = new KafkaRoutineLoadJob(); + routineLoadJobList.add(routineLoadJob); + nameToRoutineLoadJob.put("", routineLoadJobList); + dbToNameToRoutineLoadJob.put(1L, nameToRoutineLoadJob); + Deencapsulation.setField(routineLoadManager, "dbToNameToRoutineLoadJob", dbToNameToRoutineLoadJob); + + new Expectations() { + { + stopRoutineLoadStmt.getDbFullName(); + minTimes = 0; + result = ""; + stopRoutineLoadStmt.getName(); + minTimes = 0; + result = ""; + catalog.getDb(""); + minTimes = 0; + result = database; + database.getId(); + minTimes = 0; + result = 1L; + catalog.getAuth(); + minTimes = 0; + result = paloAuth; + paloAuth.checkTblPriv((ConnectContext) any, anyString, anyString, (PrivPredicate) any); + minTimes = 0; + result = true; + } + }; + + routineLoadManager.stopRoutineLoadJob(stopRoutineLoadStmt); + + Assert.assertEquals(RoutineLoadJob.JobState.STOPPED, routineLoadJob.getState()); + } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/persist/AlterRoutineLoadOperationLogTest.java b/fe/fe-core/src/test/java/org/apache/doris/persist/AlterRoutineLoadOperationLogTest.java new file mode 100644 index 00000000000000..3501ea8a823d60 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/persist/AlterRoutineLoadOperationLogTest.java @@ -0,0 +1,83 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license
agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.persist; + +import org.apache.doris.analysis.CreateRoutineLoadStmt; +import org.apache.doris.analysis.RoutineLoadDataSourceProperties; +import org.apache.doris.common.AnalysisException; + +import com.google.common.collect.Maps; + +import org.junit.Assert; +import org.junit.Test; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.util.Map; + +public class AlterRoutineLoadOperationLogTest { + private static String fileName = "./AlterRoutineLoadOperationLogTest"; + + @Test + public void testSerializeAlterRoutineLoadOperationLog() throws IOException, AnalysisException { + // 1. Write objects to file + File file = new File(fileName); + file.createNewFile(); + DataOutputStream out = new DataOutputStream(new FileOutputStream(file)); + + long jobId = 1000; + Map<String, String> jobProperties = Maps.newHashMap(); + jobProperties.put(CreateRoutineLoadStmt.DESIRED_CONCURRENT_NUMBER_PROPERTY, "5"); + + String typeName = "kafka"; + Map<String, String> dataSourceProperties = Maps.newHashMap(); + dataSourceProperties.put(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY, "0, 1"); + dataSourceProperties.put(CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY, "10000, 20000"); + dataSourceProperties.put("property.group.id", "mygroup"); + RoutineLoadDataSourceProperties routineLoadDataSourceProperties = new RoutineLoadDataSourceProperties(typeName, + dataSourceProperties); + routineLoadDataSourceProperties.analyze(); + + AlterRoutineLoadJobOperationLog log = new AlterRoutineLoadJobOperationLog(jobId, + jobProperties, routineLoadDataSourceProperties); + log.write(out); + out.flush(); + out.close(); + + // 2.
Read objects from file + DataInputStream in = new DataInputStream(new FileInputStream(file)); + + AlterRoutineLoadJobOperationLog log2 = AlterRoutineLoadJobOperationLog.read(in); + Assert.assertEquals(1, log2.getJobProperties().size()); + Assert.assertEquals("5", log2.getJobProperties().get(CreateRoutineLoadStmt.DESIRED_CONCURRENT_NUMBER_PROPERTY)); + Assert.assertEquals(1, log2.getDataSourceProperties().getCustomKafkaProperties().size()); + Assert.assertEquals("mygroup", log2.getDataSourceProperties().getCustomKafkaProperties().get("group.id")); + Assert.assertEquals(routineLoadDataSourceProperties.getKafkaPartitionOffsets().get(0), + log2.getDataSourceProperties().getKafkaPartitionOffsets().get(0)); + Assert.assertEquals(routineLoadDataSourceProperties.getKafkaPartitionOffsets().get(1), + log2.getDataSourceProperties().getKafkaPartitionOffsets().get(1)); + + in.close(); + } + + +} diff --git a/run-fe-ut.sh b/run-fe-ut.sh index 4aacbe327437e5..c93023b96535fb 100755 --- a/run-fe-ut.sh +++ b/run-fe-ut.sh @@ -97,9 +97,9 @@ else # eg: # sh run-fe-ut.sh --run org.apache.doris.utframe.Demo # sh run-fe-ut.sh --run org.apache.doris.utframe.Demo#testCreateDbAndTable+test2 - ${MVN_CMD} test -D test=$1 + ${MVN_CMD} test -DfailIfNoTests=false -D test=$1 else echo "Run Frontend UT" - ${MVN_CMD} test + ${MVN_CMD} test -DfailIfNoTests=false fi fi
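
For reviewers who want to exercise the new persistence path outside the FE test harness, below is a minimal round-trip sketch of the `AlterRoutineLoadJobOperationLog` added in this patch. It assumes the Doris FE classes from this change are on the classpath; the class name `AlterRoutineLoadLogRoundTrip`, the job id, and the property values are illustrative only, and the in-memory streams stand in for the edit-log journal.

```java
import org.apache.doris.analysis.CreateRoutineLoadStmt;
import org.apache.doris.analysis.RoutineLoadDataSourceProperties;
import org.apache.doris.persist.AlterRoutineLoadJobOperationLog;

import com.google.common.collect.Maps;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.util.Map;

public class AlterRoutineLoadLogRoundTrip {
    public static void main(String[] args) throws Exception {
        // Job properties to persist; desired_concurrent_number is one of the alterable properties.
        Map<String, String> jobProperties = Maps.newHashMap();
        jobProperties.put(CreateRoutineLoadStmt.DESIRED_CONCURRENT_NUMBER_PROPERTY, "3");

        // Kafka-specific properties; analyze() validates the partition/offset pairing.
        Map<String, String> dataSourceProperties = Maps.newHashMap();
        dataSourceProperties.put(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY, "0, 1");
        dataSourceProperties.put(CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY, "100, 200");
        RoutineLoadDataSourceProperties dsProps =
                new RoutineLoadDataSourceProperties("kafka", dataSourceProperties);
        dsProps.analyze();

        // Write the log entry as JSON (via GsonUtils inside write()) to a DataOutput.
        AlterRoutineLoadJobOperationLog log =
                new AlterRoutineLoadJobOperationLog(1001L, jobProperties, dsProps);
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            log.write(out);
        }

        // Read it back, as replay would, and check one field survived the round trip.
        AlterRoutineLoadJobOperationLog replayed =
                AlterRoutineLoadJobOperationLog.read(new DataInputStream(
                        new ByteArrayInputStream(buffer.toByteArray())));
        System.out.println(replayed.getJobProperties()
                .get(CreateRoutineLoadStmt.DESIRED_CONCURRENT_NUMBER_PROPERTY)); // expected: 3
    }
}
```

This is roughly the same write/read pair the journal relies on when an `OP_ALTER_ROUTINE_LOAD_JOB` record is persisted by `logAlterRoutineLoadJob` and later replayed through `replayAlterRoutineLoadJob`.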