Skip to content

Commit

Permalink
[feat](job)Implementing Job using antlr4
Browse files Browse the repository at this point in the history
  • Loading branch information
CalvinKirs committed Oct 6, 2024
1 parent c95eff5 commit 2f13bb9
Show file tree
Hide file tree
Showing 12 changed files with 437 additions and 33 deletions.
25 changes: 14 additions & 11 deletions fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ statementBase
| supportedCreateStatement #supportedCreateStatementAlias
| supportedAlterStatement #supportedAlterStatementAlias
| materializedViewStatement #materializedViewStatementAlias
| jobScheduleStatement #jobScheduleStatementAlias
| constraintStatement #constraintStatementAlias
| supportedDropStatement #supportedDropStatementAlias
| unsupportedStatement #unsupported
Expand Down Expand Up @@ -102,7 +103,17 @@ materializedViewStatement
| CANCEL MATERIALIZED VIEW TASK taskId=INTEGER_VALUE ON mvName=multipartIdentifier #cancelMTMVTask
| SHOW CREATE MATERIALIZED VIEW mvName=multipartIdentifier #showCreateMTMV
;

jobScheduleStatement
: CREATE JOB label=multipartIdentifier ON SCHEDULE
(
(EVERY timeInterval=INTEGER_VALUE timeUnit=identifier
(STARTS (startTime=STRING_LITERAL | CURRENT_TIMESTAMP))?
(ENDS endsTime=STRING_LITERAL)?)
|
(AT (atTime=STRING_LITERAL | CURRENT_TIMESTAMP)))
commentSpec?
DO supportedDmlStatement #createScheduledJob
;
constraintStatement
: ALTER TABLE table=multipartIdentifier
ADD CONSTRAINT constraintName=errorCapturingIdentifier
Expand Down Expand Up @@ -413,16 +424,8 @@ unsupportedCleanStatement
;

unsupportedJobStatement
: CREATE JOB label=multipartIdentifier ON SCHEDULE
(
(EVERY timeInterval=INTEGER_VALUE timeUnit=identifier
(STARTS (startTime=STRING_LITERAL | CURRENT_TIMESTAMP))?
(ENDS endsTime=STRING_LITERAL)?)
|
(AT (atTime=STRING_LITERAL | CURRENT_TIMESTAMP)))
commentSpec?
DO statement #createJob
| PAUSE JOB wildWhere? #pauseJob

: PAUSE JOB wildWhere? #pauseJob
| DROP JOB (IF EXISTS)? wildWhere? #dropJob
| RESUME JOB wildWhere? #resumeJob
| CANCEL TASK wildWhere? #cancelJobTask
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
* quantity { DAY | HOUR | MINUTE |
* WEEK | SECOND }
*/
@Deprecated
@Slf4j
public class CreateJobStmt extends DdlStmt implements NotFallbackInParser {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ private long connectionAgeExpiredAt() {
if (!isMetaServiceEndpointList && connectionAgeBase > 1) {
long base = TimeUnit.MINUTES.toMillis(connectionAgeBase);
long now = System.currentTimeMillis();
long rand = random.nextLong(base);
long rand = random.nextLong() % base;
return now + base + rand;
}
return Long.MAX_VALUE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,9 @@ public void cancelTaskById(long taskId) throws JobException {
@Override
public void cancelAllTasks() throws JobException {
try {
checkAuth("CANCEL LOAD");
if (getJobConfig().getExecuteType().equals(JobExecuteType.INSTANT)) {
checkAuth("CANCEL LOAD");
}
super.cancelAllTasks();
this.failMsg = new FailMsg(FailMsg.CancelType.USER_CANCEL, "user cancel");
} catch (DdlException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,6 @@ public void cancel() throws JobException {
executeCancelLogic();
} catch (Exception e) {
log.warn("cancel task failed, job id is {}, task id is {}", jobId, taskId, e);
throw new JobException(e);
} finally {
closeOrReleaseResources();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,7 @@
import org.apache.doris.nereids.trees.plans.commands.CancelMTMVTaskCommand;
import org.apache.doris.nereids.trees.plans.commands.Command;
import org.apache.doris.nereids.trees.plans.commands.Constraint;
import org.apache.doris.nereids.trees.plans.commands.CreateJobCommand;
import org.apache.doris.nereids.trees.plans.commands.CreateMTMVCommand;
import org.apache.doris.nereids.trees.plans.commands.CreatePolicyCommand;
import org.apache.doris.nereids.trees.plans.commands.CreateProcedureCommand;
Expand Down Expand Up @@ -414,6 +415,7 @@
import org.apache.doris.nereids.trees.plans.commands.info.BulkStorageDesc;
import org.apache.doris.nereids.trees.plans.commands.info.CancelMTMVTaskInfo;
import org.apache.doris.nereids.trees.plans.commands.info.ColumnDefinition;
import org.apache.doris.nereids.trees.plans.commands.info.CreateJobInfo;
import org.apache.doris.nereids.trees.plans.commands.info.CreateMTMVInfo;
import org.apache.doris.nereids.trees.plans.commands.info.CreateTableInfo;
import org.apache.doris.nereids.trees.plans.commands.info.CreateTableLikeInfo;
Expand Down Expand Up @@ -565,6 +567,26 @@ public LogicalPlan visitStatementDefault(StatementDefaultContext ctx) {
return withExplain(plan, ctx.explain());
}

@Override
public LogicalPlan visitCreateScheduledJob(DorisParser.CreateScheduledJobContext ctx) {
Optional<String> label = ctx.label == null ? Optional.empty() : Optional.of(ctx.label.getText());
Optional<String> atTime = ctx.atTime == null ? Optional.empty() : Optional.of(ctx.atTime.getText());
Optional<Boolean> immediateStartOptional = ctx.CURRENT_TIMESTAMP() == null ? Optional.of(false) :
Optional.of(true);
Optional<String> startTime = ctx.startTime == null ? Optional.empty() : Optional.of(ctx.startTime.getText());
Optional<String> endsTime = ctx.endsTime == null ? Optional.empty() : Optional.of(ctx.endsTime.getText());
Optional<Long> interval = ctx.timeInterval == null ? Optional.empty() :
Optional.of(Long.valueOf(ctx.timeInterval.getText()));
Optional<String> intervalUnit = ctx.timeUnit == null ? Optional.empty() : Optional.of(ctx.timeUnit.getText());
String commentSpec = ctx.commentSpec() == null ? "''" : ctx.commentSpec().STRING_LITERAL().getText();
String comment =
LogicalPlanBuilderAssistant.escapeBackSlash(commentSpec.substring(1, commentSpec.length() - 1));
String executeSql = getOriginSql(ctx.supportedDmlStatement());
CreateJobInfo createJobInfo = new CreateJobInfo(label, atTime, interval, intervalUnit, startTime,
endsTime, immediateStartOptional, comment, executeSql);
return new CreateJobCommand(createJobInfo);
}

@Override
public LogicalPlan visitInsertTable(InsertTableContext ctx) {
boolean isOverwrite = ctx.INTO() == null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ public enum PlanType {
SELECT_INTO_OUTFILE_COMMAND,
UPDATE_COMMAND,
CREATE_MTMV_COMMAND,
CREATE_JOB_COMMAND,
ALTER_MTMV_COMMAND,
ADD_CONSTRAINT_COMMAND,
DROP_CONSTRAINT_COMMAND,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.nereids.trees.plans.commands;

import org.apache.doris.analysis.StmtType;
import org.apache.doris.catalog.Env;
import org.apache.doris.job.base.AbstractJob;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.commands.info.CreateJobInfo;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.StmtExecutor;

/**
* syntax:
* CREATE
* [DEFINER = user]
* JOB
* event_name
* ON SCHEDULE schedule
* [COMMENT 'string']
* DO event_body;
* schedule: {
* [STREAMING] AT timestamp
* | EVERY interval
* [STARTS timestamp ]
* [ENDS timestamp ]
* }
* interval:
* quantity { DAY | HOUR | MINUTE |
* WEEK | SECOND }
*/
public class CreateJobCommand extends Command implements ForwardWithSync {

private CreateJobInfo createJobInfo;

public CreateJobCommand(CreateJobInfo jobInfo) {
super(PlanType.CREATE_JOB_COMMAND);
this.createJobInfo = jobInfo;
}

@Override
public void run(ConnectContext ctx, StmtExecutor executor) throws Exception {
AbstractJob job = createJobInfo.analyzeAndBuildJobInfo(ctx);
Env.getCurrentEnv().getJobManager().registerJob(job);
}

@Override
public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
return visitor.visitCreateJobCommand(this, context);
}

@Override
public StmtType stmtType() {
return StmtType.CREATE;
}

}
Loading

0 comments on commit 2f13bb9

Please sign in to comment.