diff --git a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 index 41f517b5427bf8..359e4312147973 100644 --- a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 +++ b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 @@ -65,6 +65,7 @@ statementBase | supportedAdminStatement #supportedAdminStatementAlias | supportedUseStatement #supportedUseStatementAlias | supportedOtherStatement #supportedOtherStatementAlias + | supportedKillStatement #supportedKillStatementAlias | supportedStatsStatement #supportedStatsStatementAlias | unsupportedStatement #unsupported ; @@ -72,7 +73,6 @@ statementBase unsupportedStatement : unsupportedUseStatement | unsupportedDmlStatement - | unsupportedKillStatement | unsupportedCreateStatement | unsupportedDropStatement | unsupportedStatsStatement @@ -359,7 +359,12 @@ supportedOtherStatement | UNLOCK TABLES #unlockTables ; -unsupportedOtherStatement +supportedKillStatement + : KILL (CONNECTION)? INTEGER_VALUE #killConnection + | KILL QUERY (INTEGER_VALUE | STRING_LITERAL) #killQuery + ; + +unsupportedOtherStatement : INSTALL PLUGIN FROM source=identifierOrText properties=propertyClause? #installPlugin | UNINSTALL PLUGIN name=identifierOrText #uninstallPlugin | LOCK TABLES (lockTable (COMMA lockTable)*)? #lockTables @@ -899,11 +904,6 @@ stageAndPattern (LEFT_PAREN pattern=STRING_LITERAL RIGHT_PAREN)? ; -unsupportedKillStatement - : KILL (CONNECTION)? INTEGER_VALUE #killConnection - | KILL QUERY (INTEGER_VALUE | STRING_LITERAL) #killQuery - ; - supportedDescribeStatement : explainCommand FUNCTION tvfName=identifier LEFT_PAREN (properties=propertyItemList)? RIGHT_PAREN tableAlias #describeTableValuedFunction diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java index 39a369db13bae2..1177f5f1ed2103 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java @@ -207,6 +207,7 @@ import org.apache.doris.nereids.DorisParser.IsnullContext; import org.apache.doris.nereids.DorisParser.JoinCriteriaContext; import org.apache.doris.nereids.DorisParser.JoinRelationContext; +import org.apache.doris.nereids.DorisParser.KillQueryContext; import org.apache.doris.nereids.DorisParser.LambdaExpressionContext; import org.apache.doris.nereids.DorisParser.LateralViewContext; import org.apache.doris.nereids.DorisParser.LessThanPartitionDefContext; @@ -598,6 +599,8 @@ import org.apache.doris.nereids.trees.plans.commands.ExportCommand; import org.apache.doris.nereids.trees.plans.commands.HelpCommand; import org.apache.doris.nereids.trees.plans.commands.KillAnalyzeJobCommand; +import org.apache.doris.nereids.trees.plans.commands.KillConnectionCommand; +import org.apache.doris.nereids.trees.plans.commands.KillQueryCommand; import org.apache.doris.nereids.trees.plans.commands.LoadCommand; import org.apache.doris.nereids.trees.plans.commands.PauseJobCommand; import org.apache.doris.nereids.trees.plans.commands.PauseMTMVCommand; @@ -5883,6 +5886,24 @@ public LogicalPlan visitShowConvertLsc(ShowConvertLscContext ctx) { return new ShowConvertLSCCommand(databaseName); } + @Override + public LogicalPlan visitKillQuery(KillQueryContext ctx) { + String queryId; + TerminalNode integerValue = ctx.INTEGER_VALUE(); + if (integerValue != null) { + queryId = integerValue.getText(); + } else { + queryId = stripQuotes(ctx.STRING_LITERAL().getText()); + } + return new KillQueryCommand(queryId); + } + + @Override + public LogicalPlan visitKillConnection(DorisParser.KillConnectionContext ctx) { + int connectionId = Integer.parseInt(ctx.INTEGER_VALUE().getText()); + return new KillConnectionCommand(connectionId); + } + @Override public Object visitAlterDatabaseSetQuota(AlterDatabaseSetQuotaContext ctx) { String databaseName = Optional.ofNullable(ctx.name) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java index a005ffe4a450db..c850dfa26ef1ab 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java @@ -305,6 +305,8 @@ public enum PlanType { SWITCH_COMMAND, HELP_COMMAND, USE_COMMAND, + KILL_QUERY_COMMAND, + KILL_CONNECTION_COMMAND, DESCRIBE, DROP_TABLE_COMMAND, ANALYZE_DATABASE, diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/KillConnectionCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/KillConnectionCommand.java new file mode 100644 index 00000000000000..48bb81a0d516ef --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/KillConnectionCommand.java @@ -0,0 +1,77 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.plans.commands; + +import org.apache.doris.analysis.RedirectStatus; +import org.apache.doris.catalog.Env; +import org.apache.doris.common.ErrorCode; +import org.apache.doris.common.ErrorReport; +import org.apache.doris.mysql.privilege.PrivPredicate; +import org.apache.doris.nereids.trees.plans.PlanType; +import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.StmtExecutor; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +/** + * kill connection command + */ + +public class KillConnectionCommand extends KillCommand { + private static final Logger LOG = LogManager.getLogger(KillQueryCommand.class); + private final Integer connectionId; + + public KillConnectionCommand(Integer connectionId) { + super(PlanType.KILL_CONNECTION_COMMAND); + this.connectionId = connectionId; + } + + @Override + public void doRun(ConnectContext ctx, StmtExecutor executor) throws Exception { + ConnectContext killCtx = ctx.getConnectScheduler().getContext(connectionId); + if (killCtx == null) { + ErrorReport.reportDdlException(ErrorCode.ERR_NO_SUCH_THREAD, id); + } + + if (ctx == killCtx) { + // Suicide + ctx.setKilled(); + } else { + // Check auth + // Only user itself and user with admin priv can kill connection + if (!killCtx.getQualifiedUser().equals(ConnectContext.get().getQualifiedUser()) + && !Env.getCurrentEnv().getAccessManager().checkGlobalPriv(ConnectContext.get(), + PrivPredicate.ADMIN)) { + ErrorReport.reportDdlException(ErrorCode.ERR_KILL_DENIED_ERROR, id); + } + killCtx.kill(true); + } + ctx.getState().setOk(); + } + + @Override + public R accept(PlanVisitor visitor, C context) { + return visitor.visitKillConnectionCommand(this, context); + } + + @Override + public RedirectStatus toRedirectStatus() { + return RedirectStatus.NO_FORWARD; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/KillQueryCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/KillQueryCommand.java new file mode 100644 index 00000000000000..f21a865623c532 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/KillQueryCommand.java @@ -0,0 +1,107 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.plans.commands; + +import org.apache.doris.analysis.RedirectStatus; +import org.apache.doris.catalog.Env; +import org.apache.doris.common.ErrorCode; +import org.apache.doris.common.ErrorReport; +import org.apache.doris.common.Status; +import org.apache.doris.common.UserException; +import org.apache.doris.common.util.DebugUtil; +import org.apache.doris.mysql.privilege.PrivPredicate; +import org.apache.doris.nereids.trees.plans.PlanType; +import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.StmtExecutor; +import org.apache.doris.rpc.BackendServiceProxy; +import org.apache.doris.system.Backend; +import org.apache.doris.thrift.TStatusCode; +import org.apache.doris.thrift.TUniqueId; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.Collection; +/** + * kill query command + */ + +public class KillQueryCommand extends KillCommand { + private static final Logger LOG = LogManager.getLogger(KillQueryCommand.class); + private final String queryId; + + public KillQueryCommand(String queryId) { + super(PlanType.KILL_QUERY_COMMAND); + this.queryId = queryId; + } + + @Override + public void doRun(ConnectContext ctx, StmtExecutor executor) throws Exception { + ConnectContext killCtx = ctx.getConnectScheduler().getContextWithQueryId(queryId); + // when killCtx == null, this means the query not in FE, + // then we just send kill signal to BE + if (killCtx == null) { + TUniqueId tQueryId = null; + try { + tQueryId = DebugUtil.parseTUniqueIdFromString(queryId); + } catch (NumberFormatException e) { + throw new UserException(e.getMessage()); + } + + LOG.info("kill query {}", queryId); + Collection nodesToPublish = Env.getCurrentSystemInfo() + .getAllBackendsByAllCluster().values(); + for (Backend be : nodesToPublish) { + if (be.isAlive()) { + try { + Status cancelReason = new Status(TStatusCode.CANCELLED, "user kill query"); + BackendServiceProxy.getInstance() + .cancelPipelineXPlanFragmentAsync(be.getBrpcAddress(), tQueryId, + cancelReason); + } catch (Throwable t) { + LOG.info("send kill query {} rpc to be {} failed", queryId, be); + } + } + } + } else if (ctx == killCtx) { + // Suicide + ctx.setKilled(); + } else { + // Check auth + // Only user itself and user with admin priv can kill connection + if (!killCtx.getQualifiedUser().equals(ConnectContext.get().getQualifiedUser()) + && !Env.getCurrentEnv().getAccessManager().checkGlobalPriv(ConnectContext.get(), + PrivPredicate.ADMIN)) { + ErrorReport.reportDdlException(ErrorCode.ERR_KILL_DENIED_ERROR, id); + } + killCtx.kill(false); + } + ctx.getState().setOk(); + } + + @Override + public R accept(PlanVisitor visitor, C context) { + return visitor.visitKillQueryCommand(this, context); + } + + @Override + public RedirectStatus toRedirectStatus() { + return RedirectStatus.NO_FORWARD; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/CommandVisitor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/CommandVisitor.java index 0d8de1802be894..c7c038a4f8ce1a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/CommandVisitor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/CommandVisitor.java @@ -91,6 +91,8 @@ import org.apache.doris.nereids.trees.plans.commands.ExportCommand; import org.apache.doris.nereids.trees.plans.commands.HelpCommand; import org.apache.doris.nereids.trees.plans.commands.KillAnalyzeJobCommand; +import org.apache.doris.nereids.trees.plans.commands.KillConnectionCommand; +import org.apache.doris.nereids.trees.plans.commands.KillQueryCommand; import org.apache.doris.nereids.trees.plans.commands.LoadCommand; import org.apache.doris.nereids.trees.plans.commands.PauseJobCommand; import org.apache.doris.nereids.trees.plans.commands.PauseMTMVCommand; @@ -836,6 +838,14 @@ default R visitAlterDatabaseRenameCommand(AlterDatabaseRenameCommand alterDataba return visitCommand(alterDatabaseRenameCommand, context); } + default R visitKillQueryCommand(KillQueryCommand killQueryCommand, C context) { + return visitCommand(killQueryCommand, context); + } + + default R visitKillConnectionCommand(KillConnectionCommand killConnectionCommand, C context) { + return visitCommand(killConnectionCommand, context); + } + default R visitAlterDatabaseSetQuotaCommand(AlterDatabaseSetQuotaCommand alterDatabaseSetQuotaCommand, C context) { return visitCommand(alterDatabaseSetQuotaCommand, context); } diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/KillConnectionCommandTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/KillConnectionCommandTest.java new file mode 100644 index 00000000000000..6145e46bd5b5ab --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/KillConnectionCommandTest.java @@ -0,0 +1,71 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.plans.commands; + +import org.apache.doris.catalog.Env; +import org.apache.doris.mysql.privilege.AccessControllerManager; +import org.apache.doris.mysql.privilege.PrivPredicate; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.ConnectScheduler; +import org.apache.doris.qe.QueryState; +import org.apache.doris.qe.StmtExecutor; +import org.apache.doris.utframe.TestWithFeService; + +import com.google.common.collect.Lists; +import mockit.Expectations; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.io.IOException; + +public class KillConnectionCommandTest extends TestWithFeService { + private ConnectContext connectContext; + private Env env; + private AccessControllerManager accessControllerManager; + + private void runBefore() throws IOException { + connectContext = createDefaultCtx(); + env = Env.getCurrentEnv(); + accessControllerManager = env.getAccessManager(); + ConnectScheduler scheduler = new ConnectScheduler(10); + connectContext.setQualifiedUser("root"); + new Expectations() { + { + accessControllerManager.checkGlobalPriv(connectContext, PrivPredicate.ADMIN); + minTimes = 0; + result = true; + + scheduler.listConnection("root", anyBoolean); + minTimes = 0; + result = Lists.newArrayList(connectContext.toThreadInfo(false)); + } + }; + connectContext.setConnectScheduler(scheduler); + scheduler.registerConnection(connectContext); + } + + @Test + public void testKillConnection() throws Exception { + runBefore(); + StmtExecutor stmtExecutor = new StmtExecutor(connectContext, "select 1"); + stmtExecutor.execute(); + KillConnectionCommand command = new KillConnectionCommand(stmtExecutor.getContext().getConnectionId()); + Assertions.assertDoesNotThrow(() -> command.doRun(connectContext, stmtExecutor)); + Assertions.assertEquals(connectContext.getState().getStateType(), QueryState.MysqlStateType.OK); + } +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/KillQueryCommandTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/KillQueryCommandTest.java new file mode 100644 index 00000000000000..c2a95e2ed21ac4 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/KillQueryCommandTest.java @@ -0,0 +1,72 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.plans.commands; + +import org.apache.doris.catalog.Env; +import org.apache.doris.common.util.DebugUtil; +import org.apache.doris.mysql.privilege.AccessControllerManager; +import org.apache.doris.mysql.privilege.PrivPredicate; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.ConnectScheduler; +import org.apache.doris.qe.QueryState; +import org.apache.doris.qe.StmtExecutor; +import org.apache.doris.utframe.TestWithFeService; + +import com.google.common.collect.Lists; +import mockit.Expectations; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.io.IOException; + +public class KillQueryCommandTest extends TestWithFeService { + private ConnectContext connectContext; + private Env env; + private AccessControllerManager accessControllerManager; + + private void runBefore() throws IOException { + connectContext = createDefaultCtx(); + env = Env.getCurrentEnv(); + accessControllerManager = env.getAccessManager(); + ConnectScheduler scheduler = new ConnectScheduler(10); + connectContext.setQualifiedUser("root"); + new Expectations() { + { + accessControllerManager.checkGlobalPriv(connectContext, PrivPredicate.ADMIN); + minTimes = 0; + result = true; + + scheduler.listConnection("root", anyBoolean); + minTimes = 0; + result = Lists.newArrayList(connectContext.toThreadInfo(false)); + } + }; + connectContext.setConnectScheduler(scheduler); + } + + @Test + public void testKillQuery() throws Exception { + runBefore(); + StmtExecutor stmtExecutor = new StmtExecutor(connectContext, "select 1"); + stmtExecutor.execute(); + String queryId = DebugUtil.printId(stmtExecutor.getContext().queryId()); + KillQueryCommand command = new KillQueryCommand(queryId); + Assertions.assertDoesNotThrow(() -> command.doRun(connectContext, stmtExecutor)); + Assertions.assertEquals(connectContext.getState().getStateType(), QueryState.MysqlStateType.OK); + } +}