From 2f13bb930c7894e6561c1ccb289ddd3e23a33617 Mon Sep 17 00:00:00 2001 From: Calvin Kirs Date: Thu, 26 Sep 2024 18:21:04 +0800 Subject: [PATCH] [feat](job)Implementing Job using antlr4 --- .../org/apache/doris/nereids/DorisParser.g4 | 25 +- .../apache/doris/analysis/CreateJobStmt.java | 1 + .../doris/cloud/rpc/MetaServiceClient.java | 2 +- .../job/extensions/insert/InsertJob.java | 4 +- .../apache/doris/job/task/AbstractTask.java | 1 - .../nereids/parser/LogicalPlanBuilder.java | 22 ++ .../doris/nereids/trees/plans/PlanType.java | 1 + .../plans/commands/CreateJobCommand.java | 73 +++++ .../plans/commands/info/CreateJobInfo.java | 263 ++++++++++++++++++ .../insert/InsertIntoTableCommand.java | 14 +- .../trees/plans/visitor/CommandVisitor.java | 5 + .../suites/job_p0/test_base_insert_job.groovy | 59 ++-- 12 files changed, 437 insertions(+), 33 deletions(-) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateJobCommand.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateJobInfo.java diff --git a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 index 8eb6720048186b..c5f97ab771c6d6 100644 --- a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 +++ b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 @@ -50,6 +50,7 @@ statementBase | supportedCreateStatement #supportedCreateStatementAlias | supportedAlterStatement #supportedAlterStatementAlias | materializedViewStatement #materializedViewStatementAlias + | jobScheduleStatement #jobScheduleStatementAlias | constraintStatement #constraintStatementAlias | supportedDropStatement #supportedDropStatementAlias | unsupportedStatement #unsupported @@ -102,7 +103,17 @@ materializedViewStatement | CANCEL MATERIALIZED VIEW TASK taskId=INTEGER_VALUE ON mvName=multipartIdentifier #cancelMTMVTask | SHOW CREATE MATERIALIZED VIEW mvName=multipartIdentifier #showCreateMTMV ; - +jobScheduleStatement + : CREATE JOB label=multipartIdentifier ON SCHEDULE + ( + (EVERY timeInterval=INTEGER_VALUE timeUnit=identifier + (STARTS (startTime=STRING_LITERAL | CURRENT_TIMESTAMP))? + (ENDS endsTime=STRING_LITERAL)?) + | + (AT (atTime=STRING_LITERAL | CURRENT_TIMESTAMP))) + commentSpec? + DO supportedDmlStatement #createScheduledJob + ; constraintStatement : ALTER TABLE table=multipartIdentifier ADD CONSTRAINT constraintName=errorCapturingIdentifier @@ -413,16 +424,8 @@ unsupportedCleanStatement ; unsupportedJobStatement - : CREATE JOB label=multipartIdentifier ON SCHEDULE - ( - (EVERY timeInterval=INTEGER_VALUE timeUnit=identifier - (STARTS (startTime=STRING_LITERAL | CURRENT_TIMESTAMP))? - (ENDS endsTime=STRING_LITERAL)?) - | - (AT (atTime=STRING_LITERAL | CURRENT_TIMESTAMP))) - commentSpec? - DO statement #createJob - | PAUSE JOB wildWhere? #pauseJob + + : PAUSE JOB wildWhere? #pauseJob | DROP JOB (IF EXISTS)? wildWhere? #dropJob | RESUME JOB wildWhere? #resumeJob | CANCEL TASK wildWhere? #cancelJobTask diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateJobStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateJobStmt.java index 0fff1e097497ea..8babb665299a71 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateJobStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateJobStmt.java @@ -60,6 +60,7 @@ * quantity { DAY | HOUR | MINUTE | * WEEK | SECOND } */ +@Deprecated @Slf4j public class CreateJobStmt extends DdlStmt implements NotFallbackInParser { diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java index 323b880a3a7a79..c4d28fb3bc256c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java @@ -79,7 +79,7 @@ private long connectionAgeExpiredAt() { if (!isMetaServiceEndpointList && connectionAgeBase > 1) { long base = TimeUnit.MINUTES.toMillis(connectionAgeBase); long now = System.currentTimeMillis(); - long rand = random.nextLong(base); + long rand = random.nextLong() % base; return now + base + rand; } return Long.MAX_VALUE; diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java index 43f43ba86997cf..487591efc04745 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java @@ -299,7 +299,9 @@ public void cancelTaskById(long taskId) throws JobException { @Override public void cancelAllTasks() throws JobException { try { - checkAuth("CANCEL LOAD"); + if (getJobConfig().getExecuteType().equals(JobExecuteType.INSTANT)) { + checkAuth("CANCEL LOAD"); + } super.cancelAllTasks(); this.failMsg = new FailMsg(FailMsg.CancelType.USER_CANCEL, "user cancel"); } catch (DdlException e) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java index f78446aaf85cbf..a29878a97a3c1b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java @@ -141,7 +141,6 @@ public void cancel() throws JobException { executeCancelLogic(); } catch (Exception e) { log.warn("cancel task failed, job id is {}, task id is {}", jobId, taskId, e); - throw new JobException(e); } finally { closeOrReleaseResources(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java index d5a48f8e1129c6..094488acec2d3c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java @@ -376,6 +376,7 @@ import org.apache.doris.nereids.trees.plans.commands.CancelMTMVTaskCommand; import org.apache.doris.nereids.trees.plans.commands.Command; import org.apache.doris.nereids.trees.plans.commands.Constraint; +import org.apache.doris.nereids.trees.plans.commands.CreateJobCommand; import org.apache.doris.nereids.trees.plans.commands.CreateMTMVCommand; import org.apache.doris.nereids.trees.plans.commands.CreatePolicyCommand; import org.apache.doris.nereids.trees.plans.commands.CreateProcedureCommand; @@ -414,6 +415,7 @@ import org.apache.doris.nereids.trees.plans.commands.info.BulkStorageDesc; import org.apache.doris.nereids.trees.plans.commands.info.CancelMTMVTaskInfo; import org.apache.doris.nereids.trees.plans.commands.info.ColumnDefinition; +import org.apache.doris.nereids.trees.plans.commands.info.CreateJobInfo; import org.apache.doris.nereids.trees.plans.commands.info.CreateMTMVInfo; import org.apache.doris.nereids.trees.plans.commands.info.CreateTableInfo; import org.apache.doris.nereids.trees.plans.commands.info.CreateTableLikeInfo; @@ -565,6 +567,26 @@ public LogicalPlan visitStatementDefault(StatementDefaultContext ctx) { return withExplain(plan, ctx.explain()); } + @Override + public LogicalPlan visitCreateScheduledJob(DorisParser.CreateScheduledJobContext ctx) { + Optional label = ctx.label == null ? Optional.empty() : Optional.of(ctx.label.getText()); + Optional atTime = ctx.atTime == null ? Optional.empty() : Optional.of(ctx.atTime.getText()); + Optional immediateStartOptional = ctx.CURRENT_TIMESTAMP() == null ? Optional.of(false) : + Optional.of(true); + Optional startTime = ctx.startTime == null ? Optional.empty() : Optional.of(ctx.startTime.getText()); + Optional endsTime = ctx.endsTime == null ? Optional.empty() : Optional.of(ctx.endsTime.getText()); + Optional interval = ctx.timeInterval == null ? Optional.empty() : + Optional.of(Long.valueOf(ctx.timeInterval.getText())); + Optional intervalUnit = ctx.timeUnit == null ? Optional.empty() : Optional.of(ctx.timeUnit.getText()); + String commentSpec = ctx.commentSpec() == null ? "''" : ctx.commentSpec().STRING_LITERAL().getText(); + String comment = + LogicalPlanBuilderAssistant.escapeBackSlash(commentSpec.substring(1, commentSpec.length() - 1)); + String executeSql = getOriginSql(ctx.supportedDmlStatement()); + CreateJobInfo createJobInfo = new CreateJobInfo(label, atTime, interval, intervalUnit, startTime, + endsTime, immediateStartOptional, comment, executeSql); + return new CreateJobCommand(createJobInfo); + } + @Override public LogicalPlan visitInsertTable(InsertTableContext ctx) { boolean isOverwrite = ctx.INTO() == null; diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java index 42cdc0b7d9d267..73ea61fcface45 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java @@ -143,6 +143,7 @@ public enum PlanType { SELECT_INTO_OUTFILE_COMMAND, UPDATE_COMMAND, CREATE_MTMV_COMMAND, + CREATE_JOB_COMMAND, ALTER_MTMV_COMMAND, ADD_CONSTRAINT_COMMAND, DROP_CONSTRAINT_COMMAND, diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateJobCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateJobCommand.java new file mode 100644 index 00000000000000..fecd457ada56eb --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateJobCommand.java @@ -0,0 +1,73 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.plans.commands; + +import org.apache.doris.analysis.StmtType; +import org.apache.doris.catalog.Env; +import org.apache.doris.job.base.AbstractJob; +import org.apache.doris.nereids.trees.plans.PlanType; +import org.apache.doris.nereids.trees.plans.commands.info.CreateJobInfo; +import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.StmtExecutor; + +/** + * syntax: + * CREATE + * [DEFINER = user] + * JOB + * event_name + * ON SCHEDULE schedule + * [COMMENT 'string'] + * DO event_body; + * schedule: { + * [STREAMING] AT timestamp + * | EVERY interval + * [STARTS timestamp ] + * [ENDS timestamp ] + * } + * interval: + * quantity { DAY | HOUR | MINUTE | + * WEEK | SECOND } + */ +public class CreateJobCommand extends Command implements ForwardWithSync { + + private CreateJobInfo createJobInfo; + + public CreateJobCommand(CreateJobInfo jobInfo) { + super(PlanType.CREATE_JOB_COMMAND); + this.createJobInfo = jobInfo; + } + + @Override + public void run(ConnectContext ctx, StmtExecutor executor) throws Exception { + AbstractJob job = createJobInfo.analyzeAndBuildJobInfo(ctx); + Env.getCurrentEnv().getJobManager().registerJob(job); + } + + @Override + public R accept(PlanVisitor visitor, C context) { + return visitor.visitCreateJobCommand(this, context); + } + + @Override + public StmtType stmtType() { + return StmtType.CREATE; + } + +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateJobInfo.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateJobInfo.java new file mode 100644 index 00000000000000..6cef7ee89ec960 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateJobInfo.java @@ -0,0 +1,263 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.plans.commands.info; + +import org.apache.doris.catalog.Env; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.Config; +import org.apache.doris.common.ErrorCode; +import org.apache.doris.common.ErrorReport; +import org.apache.doris.common.UserException; +import org.apache.doris.common.util.TimeUtils; +import org.apache.doris.job.base.AbstractJob; +import org.apache.doris.job.base.JobExecuteType; +import org.apache.doris.job.base.JobExecutionConfiguration; +import org.apache.doris.job.base.TimerDefinition; +import org.apache.doris.job.common.IntervalUnit; +import org.apache.doris.job.common.JobStatus; +import org.apache.doris.job.extensions.insert.InsertJob; +import org.apache.doris.mysql.privilege.PrivPredicate; +import org.apache.doris.nereids.parser.NereidsParser; +import org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand; +import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; +import org.apache.doris.qe.ConnectContext; + +import com.google.common.base.Strings; + +import java.util.Optional; + +/** + * Build job info and analyze the SQL statement to create a job. + */ +public class CreateJobInfo { + + // exclude job name prefix, which is used by inner job + private static final String excludeJobNamePrefix = "inner_"; + + private final Optional labelNameOptional; + + private final Optional onceJobStartTimestampOptional; + + private final Optional intervalOptional; + + private final Optional intervalTimeUnitOptional; + + private final Optional startsTimeStampOptional; + + private final Optional endsTimeStampOptional; + + private final Optional immediateStartOptional; + + private final String comment; + + private final String executeSql; + + /** + * Constructor for CreateJobInfo. + * + * @param labelNameOptional Job name. + * @param onceJobStartTimestampOptional Start time for a one-time job. + * @param intervalOptional Interval for a recurring job. + * @param intervalTimeUnitOptional Interval time unit for a recurring job. + * @param startsTimeStampOptional Start time for a recurring job. + * @param endsTimeStampOptional End time for a recurring job. + * @param immediateStartOptional Immediate start for a job. + * @param comment Comment for the job. + * @param executeSql Original SQL statement. + */ + public CreateJobInfo(Optional labelNameOptional, Optional onceJobStartTimestampOptional, + Optional intervalOptional, Optional intervalTimeUnitOptional, + Optional startsTimeStampOptional, Optional endsTimeStampOptional, + Optional immediateStartOptional, String comment, String executeSql) { + this.labelNameOptional = labelNameOptional; + this.onceJobStartTimestampOptional = onceJobStartTimestampOptional; + this.intervalOptional = intervalOptional; + this.intervalTimeUnitOptional = intervalTimeUnitOptional; + this.startsTimeStampOptional = startsTimeStampOptional; + this.endsTimeStampOptional = endsTimeStampOptional; + this.immediateStartOptional = immediateStartOptional; + this.comment = comment; + this.executeSql = executeSql; + + } + + /** + * Analyzes the provided SQL statement and builds the job information. + * + * @param ctx Connect context. + * @return AbstractJob instance. + * @throws UserException If there is an error during SQL analysis or job creation. + */ + public AbstractJob analyzeAndBuildJobInfo(ConnectContext ctx) throws UserException { + checkAuth(); + if (labelNameOptional.orElseThrow(() -> new AnalysisException("labelName is null")).isEmpty()) { + throw new AnalysisException("Job name can not be empty"); + } + + String jobName = labelNameOptional.get(); + checkJobName(jobName); + String dbName = ctx.getDatabase(); + + Env.getCurrentInternalCatalog().getDbOrAnalysisException(dbName); + // check its insert stmt,currently only support insert stmt + JobExecutionConfiguration jobExecutionConfiguration = new JobExecutionConfiguration(); + JobExecuteType executeType = intervalOptional.isPresent() ? JobExecuteType.RECURRING : JobExecuteType.ONE_TIME; + jobExecutionConfiguration.setExecuteType(executeType); + TimerDefinition timerDefinition = new TimerDefinition(); + + if (executeType.equals(JobExecuteType.ONE_TIME)) { + buildOnceJob(timerDefinition, jobExecutionConfiguration); + } else { + buildRecurringJob(timerDefinition, jobExecutionConfiguration); + } + jobExecutionConfiguration.setTimerDefinition(timerDefinition); + return analyzeAndCreateJob(executeSql, dbName, jobExecutionConfiguration); + } + + /** + * Builds a TimerDefinition for a once-job. + * + * @param timerDefinition Timer definition to be built. + * @param jobExecutionConfiguration Job execution configuration. + * @throws AnalysisException If the job is not configured correctly. + */ + private void buildOnceJob(TimerDefinition timerDefinition, + JobExecutionConfiguration jobExecutionConfiguration) throws AnalysisException { + if (immediateStartOptional.isPresent() && Boolean.TRUE.equals(immediateStartOptional.get())) { + jobExecutionConfiguration.setImmediate(true); + timerDefinition.setStartTimeMs(System.currentTimeMillis()); + return; + } + + // Ensure start time is provided for once jobs. + String startTime = onceJobStartTimestampOptional.orElseThrow(() + -> new AnalysisException("Once time job must set start time")); + timerDefinition.setStartTimeMs(stripQuotesAndParseTimestamp(startTime)); + } + + /** + * Builds a TimerDefinition for a recurring job. + * + * @param timerDefinition Timer definition to be built. + * @param jobExecutionConfiguration Job execution configuration. + * @throws AnalysisException If the job is not configured correctly. + */ + private void buildRecurringJob(TimerDefinition timerDefinition, + JobExecutionConfiguration jobExecutionConfiguration) throws AnalysisException { + // Ensure interval is provided for recurring jobs. + long interval = intervalOptional.orElseThrow(() + -> new AnalysisException("Interval must be set for recurring job")); + timerDefinition.setInterval(interval); + + // Ensure interval time unit is provided for recurring jobs. + String intervalTimeUnit = intervalTimeUnitOptional.orElseThrow(() + -> new AnalysisException("Interval time unit must be set for recurring job")); + IntervalUnit intervalUnit = IntervalUnit.fromString(intervalTimeUnit.toUpperCase()); + if (intervalUnit == null) { + throw new AnalysisException("Invalid interval time unit: " + intervalTimeUnit); + } + + // Check if interval unit is second and disable if not in test mode. + if (intervalUnit.equals(IntervalUnit.SECOND) && !Config.enable_job_schedule_second_for_test) { + throw new AnalysisException("Interval time unit can not be second in production mode"); + } + + timerDefinition.setIntervalUnit(intervalUnit); + + // Set end time if provided. + endsTimeStampOptional.ifPresent(s -> timerDefinition.setEndTimeMs(stripQuotesAndParseTimestamp(s))); + + // Set immediate start if configured. + if (immediateStartOptional.isPresent() && Boolean.TRUE.equals(immediateStartOptional.get())) { + jobExecutionConfiguration.setImmediate(true); + // Avoid immediate re-scheduling by setting start time slightly in the past. + timerDefinition.setStartTimeMs(System.currentTimeMillis() - 100); + return; + } + // Set start time if provided. + startsTimeStampOptional.ifPresent(s -> timerDefinition.setStartTimeMs(stripQuotesAndParseTimestamp(s))); + } + + protected static void checkAuth() throws AnalysisException { + if (!Env.getCurrentEnv().getAccessManager().checkGlobalPriv(ConnectContext.get(), PrivPredicate.ADMIN)) { + ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "ADMIN"); + } + } + + /** + * Analyzes the provided SQL statement and creates an appropriate job based on the parsed logical plan. + * Currently, only "InsertIntoTableCommand" is supported for job creation. + * + * @param sql the SQL statement to be analyzed + * @param currentDbName the current database name where the SQL statement will be executed + * @param jobExecutionConfiguration the configuration for job execution + * @return an instance of AbstractJob corresponding to the SQL statement + * @throws UserException if there is an error during SQL analysis or job creation + */ + private AbstractJob analyzeAndCreateJob(String sql, String currentDbName, + JobExecutionConfiguration jobExecutionConfiguration) throws UserException { + NereidsParser parser = new NereidsParser(); + LogicalPlan logicalPlan = parser.parseSingle(sql); + if (logicalPlan instanceof InsertIntoTableCommand) { + InsertIntoTableCommand insertIntoTableCommand = (InsertIntoTableCommand) logicalPlan; + try { + insertIntoTableCommand.initPlan(ConnectContext.get(), ConnectContext.get().getExecutor(), false); + return new InsertJob(labelNameOptional.get(), + JobStatus.RUNNING, + currentDbName, + comment, + ConnectContext.get().getCurrentUserIdentity(), + jobExecutionConfiguration, + System.currentTimeMillis(), + sql); + } catch (Exception e) { + throw new AnalysisException(e.getMessage()); + } + } else { + throw new AnalysisException("Not support this sql : " + sql + " Command class is " + + logicalPlan.getClass().getName() + "."); + } + } + + private void checkJobName(String jobName) throws AnalysisException { + if (Strings.isNullOrEmpty(jobName)) { + throw new AnalysisException("job name can not be null"); + } + if (jobName.startsWith(excludeJobNamePrefix)) { + throw new AnalysisException("job name can not start with " + excludeJobNamePrefix); + } + } + + /** + * Strips quotes from the input string and parses it to a timestamp. + * + * @param str The input string potentially enclosed in single or double quotes. + * @return The parsed timestamp as a long value, or -1L if the input is null or empty. + */ + public static Long stripQuotesAndParseTimestamp(String str) { + if (str == null || str.isEmpty()) { + return -1L; + } + if (str.startsWith("'") && str.endsWith("'")) { + str = str.substring(1, str.length() - 1); + } else if (str.startsWith("\"") && str.endsWith("\"")) { + str = str.substring(1, str.length() - 1); + } + return TimeUtils.timeStringToLong(str.trim()); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java index 74f75d2d7d5dd9..68718de0f86a5b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java @@ -123,12 +123,20 @@ public void runWithUpdateInfo(ConnectContext ctx, StmtExecutor executor, runInternal(ctx, executor); } + public AbstractInsertExecutor initPlan(ConnectContext ctx, StmtExecutor executor) throws Exception { + return initPlan(ctx, executor, true); + } + /** * This function is used to generate the plan for Nereids. * There are some load functions that only need to the plan, such as stream_load. * Therefore, this section will be presented separately. + * @param needBeginTransaction whether to start a transaction. + * For external uses such as creating a job, only basic analysis is needed without starting a transaction, + * in which case this can be set to false. */ - public AbstractInsertExecutor initPlan(ConnectContext ctx, StmtExecutor executor) throws Exception { + public AbstractInsertExecutor initPlan(ConnectContext ctx, StmtExecutor executor, + boolean needBeginTransaction) throws Exception { TableIf targetTableIf = InsertUtils.getTargetTable(logicalQuery, ctx); // check auth if (!Env.getCurrentEnv().getAccessManager() @@ -220,6 +228,10 @@ public AbstractInsertExecutor initPlan(ConnectContext ctx, StmtExecutor executor // TODO: support other table types throw new AnalysisException("insert into command only support [olap, hive, iceberg, jdbc] table"); } + if (!needBeginTransaction) { + targetTableIf.readUnlock(); + return insertExecutor; + } if (!insertExecutor.isEmptyInsert()) { insertExecutor.beginTransaction(); insertExecutor.finalizeSink(planner.getFragments().get(0), sink, physicalSink); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/CommandVisitor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/CommandVisitor.java index 0763e8fcbfd704..f35e6f8a6400b3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/CommandVisitor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/CommandVisitor.java @@ -23,6 +23,7 @@ import org.apache.doris.nereids.trees.plans.commands.CallCommand; import org.apache.doris.nereids.trees.plans.commands.CancelMTMVTaskCommand; import org.apache.doris.nereids.trees.plans.commands.Command; +import org.apache.doris.nereids.trees.plans.commands.CreateJobCommand; import org.apache.doris.nereids.trees.plans.commands.CreateMTMVCommand; import org.apache.doris.nereids.trees.plans.commands.CreatePolicyCommand; import org.apache.doris.nereids.trees.plans.commands.CreateProcedureCommand; @@ -113,6 +114,10 @@ default R visitCreateMTMVCommand(CreateMTMVCommand createMTMVCommand, C context) return visitCommand(createMTMVCommand, context); } + default R visitCreateJobCommand(CreateJobCommand createJobCommand, C context) { + return visitCommand(createJobCommand, context); + } + default R visitAlterMTMVCommand(AlterMTMVCommand alterMTMVCommand, C context) { return visitCommand(alterMTMVCommand, context); } diff --git a/regression-test/suites/job_p0/test_base_insert_job.groovy b/regression-test/suites/job_p0/test_base_insert_job.groovy index be744427d88d24..19f4422d64fb01 100644 --- a/regression-test/suites/job_p0/test_base_insert_job.groovy +++ b/regression-test/suites/job_p0/test_base_insert_job.groovy @@ -26,6 +26,9 @@ suite("test_base_insert_job") { def tableName = "t_test_BASE_inSert_job" def jobName = "insert_recovery_test_base_insert_job" def jobMixedName = "Insert_recovery_Test_base_insert_job" + sql """ + SET enable_fallback_to_original_planner=false; + """ sql """drop table if exists `${tableName}` force""" sql """ DROP JOB IF EXISTS where jobname = '${jobName}' @@ -70,27 +73,47 @@ suite("test_base_insert_job") { ); """ sql """ - CREATE JOB ${jobName} ON SCHEDULE every 1 second comment 'test' DO insert into ${tableName} (timestamp, type, user_id) values ('2023-03-18','1','12213'); + insert into ${tableName} values + ('2023-03-18', 1, 1) + """ + sql """ + CREATE JOB ${jobName} ON SCHEDULE every 1 second comment 'test' DO INSERT INTO ${tableName} (`timestamp`, `type`, `user_id`) + WITH + tbl_timestamp AS ( + SELECT `timestamp` FROM ${tableName} WHERE user_id = 1 + ), + tbl_type AS ( + SELECT `type` FROM ${tableName} WHERE user_id = 1 + ), + tbl_user_id AS ( + SELECT `user_id` FROM ${tableName} WHERE user_id = 1 + ) + SELECT + tbl_timestamp.`timestamp`, + tbl_type.`type`, + tbl_user_id.`user_id` + FROM + tbl_timestamp, tbl_type, tbl_user_id; """ Awaitility.await().atMost(30, SECONDS).until( { def onceJob = sql """ select SucceedTaskCount from jobs("type"="insert") where Name like '%${jobName}%' and ExecuteType='RECURRING' """ println(onceJob) - onceJob .size() == 1 && '1' <= onceJob.get(0).get(0) - + onceJob.size() == 1 && '1' <= onceJob.get(0).get(0) + } - ) + ) sql """ PAUSE JOB where jobname = '${jobName}' """ def tblDatas = sql """select * from ${tableName}""" println tblDatas - assert 3 >= tblDatas.size() >= (2 as Boolean) //at least 2 records, some times 3 records + assert tblDatas.size() >= 2 //at least 2 records def pauseJobId = sql """select id from jobs("type"="insert") where Name='${jobName}'""" def taskStatus = sql """select status from tasks("type"="insert") where jobid= '${pauseJobId.get(0).get(0)}'""" println taskStatus for (int i = 0; i < taskStatus.size(); i++) { - assert taskStatus.get(i).get(0) != "FAILED"||taskStatus.get(i).get(0) != "STOPPED"||taskStatus.get(i).get(0) != "STOPPED" + assert taskStatus.get(i).get(0) != "FAILED" || taskStatus.get(i).get(0) != "STOPPED" || taskStatus.get(i).get(0) != "STOPPED" } sql """ CREATE JOB ${jobMixedName} ON SCHEDULE every 1 second DO insert into ${tableName} (timestamp, type, user_id) values ('2023-03-18','1','12213'); @@ -126,11 +149,11 @@ suite("test_base_insert_job") { CREATE JOB ${jobName} ON SCHEDULE at current_timestamp comment 'test for test&68686781jbjbhj//ncsa' DO insert into ${tableName} values ('2023-07-19', 2, 1001); """ - Awaitility.await("create-one-time-job-test").atMost(30,SECONDS).until( - { - def onceJob = sql """ select SucceedTaskCount from jobs("type"="insert") where Name like '%${jobName}%' and ExecuteType='ONE_TIME' """ - onceJob.size() == 1 && '1' == onceJob.get(0).get(0) - } + Awaitility.await("create-one-time-job-test").atMost(30, SECONDS).until( + { + def onceJob = sql """ select SucceedTaskCount from jobs("type"="insert") where Name like '%${jobName}%' and ExecuteType='ONE_TIME' """ + onceJob.size() == 1 && '1' == onceJob.get(0).get(0) + } ) def onceJob = sql """ select SucceedTaskCount from jobs("type"="insert") where Name like '%${jobName}%' and ExecuteType='ONE_TIME' """ assert onceJob.size() == 1 @@ -141,7 +164,7 @@ suite("test_base_insert_job") { assert datas.size() == 1 assert datas.get(0).get(0) == "FINISHED" // check table data - def dataCount1 = sql """select count(1) from ${tableName}""" + def dataCount1 = sql """select count(1) from ${tableName} where user_id=1001""" assert dataCount1.get(0).get(0) == 1 // check job status def oncejob = sql """select status,comment from jobs("type"="insert") where Name='${jobName}' """ @@ -198,10 +221,10 @@ suite("test_base_insert_job") { println(tasks.size()) Awaitility.await("resume-job-test").atMost(60, SECONDS).until({ def afterResumeTasks = sql """ select status from tasks("type"="insert") where JobName= '${jobName}' """ - println "resume tasks :"+afterResumeTasks - afterResumeTasks.size() >tasks.size() + println "resume tasks :" + afterResumeTasks + afterResumeTasks.size() > tasks.size() }) - + // assert same job name try { sql """ @@ -216,7 +239,7 @@ suite("test_base_insert_job") { CREATE JOB ${jobName} ON SCHEDULE at current_timestamp comment 'test' DO update ${tableName} set type=2 where type=1; """ } catch (Exception e) { - assert e.getMessage().contains("Not support this sql") + assert e.getMessage().contains("Not support this sql :") } // assert start time greater than current time try { @@ -245,7 +268,7 @@ suite("test_base_insert_job") { // assert end time less than start time try { sql """ - CREATE JOB test_error_starts ON SCHEDULE every 1 second ends '2023-11-13 14:18:07' comment 'test' DO insert into ${tableName} (timestamp, type, user_id) values ('2023-03-18','1','12213'); + CREATE JOB test_error_starts ON SCHEDULE every 1 second starts current_timestamp ends '2023-11-13 14:18:07' comment 'test' DO insert into ${tableName} (timestamp, type, user_id) values ('2023-03-18','1','12213'); """ } catch (Exception e) { assert e.getMessage().contains("endTimeMs must be greater than the start time") @@ -256,7 +279,7 @@ suite("test_base_insert_job") { CREATE JOB test_error_starts ON SCHEDULE every 1 years ends '2023-11-13 14:18:07' comment 'test' DO insert into ${tableName} (timestamp, type, user_id) values ('2023-03-18','1','12213'); """ } catch (Exception e) { - assert e.getMessage().contains("interval time unit can not be years") + assert e.getMessage().contains("Invalid interval time unit: years") } // test keyword as job name