diff --git a/fe/fe-core/pom.xml b/fe/fe-core/pom.xml index 74e78124c70a42..a3e3fce1f0b2fa 100644 --- a/fe/fe-core/pom.xml +++ b/fe/fe-core/pom.xml @@ -958,6 +958,17 @@ under the License. sdk-core ${awssdk.version} + + + org.mockito + mockito-core + test + + + org.mockito + mockito-inline + test + diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/ErrorCode.java b/fe/fe-core/src/main/java/org/apache/doris/common/ErrorCode.java index 2d98f36ad8a2ce..a678c2b38f9562 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/ErrorCode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/ErrorCode.java @@ -41,7 +41,7 @@ public enum ErrorCode { ERR_DUP_FIELDNAME(1060, new byte[]{'4', '2', 'S', '2', '1'}, "Duplicate column name '%s'"), ERR_NONUNIQ_TABLE(1066, new byte[]{'4', '2', '0', '0', '0'}, "Not unique table/alias: '%s'"), ERR_NO_SUCH_THREAD(1094, new byte[]{'H', 'Y', '0', '0', '0'}, "Unknown thread id: %d"), - ERR_KILL_DENIED_ERROR(1095, new byte[]{'H', 'Y', '0', '0', '0'}, "You are not owner of thread %d"), + ERR_KILL_DENIED_ERROR(1095, new byte[] {'H', 'Y', '0', '0', '0'}, "You are not owner of thread or query: %d"), ERR_NO_TABLES_USED(1096, new byte[]{'H', 'Y', '0', '0', '0'}, "No tables used"), ERR_NO_SUCH_QUERY(1097, new byte[]{'H', 'Y', '0', '0', '0'}, "Unknown query id: %s"), ERR_WRONG_DB_NAME(1102, new byte[]{'4', '2', '0', '0', '0'}, "Incorrect database name '%s'"), diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java index 3bb6ea1b074de1..4f5e14002f06f4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java @@ -6271,14 +6271,15 @@ public LogicalPlan visitShowConvertLsc(ShowConvertLscContext ctx) { @Override public LogicalPlan visitKillQuery(KillQueryContext ctx) { - String queryId; + String queryId = null; + int connectionId = -1; TerminalNode integerValue = ctx.INTEGER_VALUE(); if (integerValue != null) { - queryId = integerValue.getText(); + connectionId = Integer.valueOf(integerValue.getText()); } else { queryId = stripQuotes(ctx.STRING_LITERAL().getText()); } - return new KillQueryCommand(queryId); + return new KillQueryCommand(queryId, connectionId); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/KillConnectionCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/KillConnectionCommand.java index 48bb81a0d516ef..d6b9a2db340450 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/KillConnectionCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/KillConnectionCommand.java @@ -18,11 +18,9 @@ package org.apache.doris.nereids.trees.plans.commands; import org.apache.doris.analysis.RedirectStatus; -import org.apache.doris.catalog.Env; -import org.apache.doris.common.ErrorCode; -import org.apache.doris.common.ErrorReport; -import org.apache.doris.mysql.privilege.PrivPredicate; +import org.apache.doris.nereids.exceptions.AnalysisException; import org.apache.doris.nereids.trees.plans.PlanType; +import org.apache.doris.nereids.trees.plans.commands.utils.KillUtils; import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.StmtExecutor; @@ -44,25 +42,10 @@ public KillConnectionCommand(Integer connectionId) { @Override public void doRun(ConnectContext ctx, StmtExecutor executor) throws Exception { - ConnectContext killCtx = ctx.getConnectScheduler().getContext(connectionId); - if (killCtx == null) { - ErrorReport.reportDdlException(ErrorCode.ERR_NO_SUCH_THREAD, id); + if (connectionId < 0) { + throw new AnalysisException("Please specify connection id which >= 0 to kill"); } - - if (ctx == killCtx) { - // Suicide - ctx.setKilled(); - } else { - // Check auth - // Only user itself and user with admin priv can kill connection - if (!killCtx.getQualifiedUser().equals(ConnectContext.get().getQualifiedUser()) - && !Env.getCurrentEnv().getAccessManager().checkGlobalPriv(ConnectContext.get(), - PrivPredicate.ADMIN)) { - ErrorReport.reportDdlException(ErrorCode.ERR_KILL_DENIED_ERROR, id); - } - killCtx.kill(true); - } - ctx.getState().setOk(); + KillUtils.kill(ctx, true, null, connectionId, null); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/KillQueryCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/KillQueryCommand.java index f21a865623c532..1bb26801f9b96f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/KillQueryCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/KillQueryCommand.java @@ -18,26 +18,16 @@ package org.apache.doris.nereids.trees.plans.commands; import org.apache.doris.analysis.RedirectStatus; -import org.apache.doris.catalog.Env; -import org.apache.doris.common.ErrorCode; -import org.apache.doris.common.ErrorReport; -import org.apache.doris.common.Status; -import org.apache.doris.common.UserException; -import org.apache.doris.common.util.DebugUtil; -import org.apache.doris.mysql.privilege.PrivPredicate; +import org.apache.doris.nereids.exceptions.AnalysisException; import org.apache.doris.nereids.trees.plans.PlanType; +import org.apache.doris.nereids.trees.plans.commands.utils.KillUtils; import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.StmtExecutor; -import org.apache.doris.rpc.BackendServiceProxy; -import org.apache.doris.system.Backend; -import org.apache.doris.thrift.TStatusCode; -import org.apache.doris.thrift.TUniqueId; +import com.google.common.base.Strings; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; - -import java.util.Collection; /** * kill query command */ @@ -45,54 +35,21 @@ public class KillQueryCommand extends KillCommand { private static final Logger LOG = LogManager.getLogger(KillQueryCommand.class); private final String queryId; + private final int connectionId; - public KillQueryCommand(String queryId) { + public KillQueryCommand(String queryId, int connectionId) { super(PlanType.KILL_QUERY_COMMAND); this.queryId = queryId; + this.connectionId = connectionId; } @Override public void doRun(ConnectContext ctx, StmtExecutor executor) throws Exception { - ConnectContext killCtx = ctx.getConnectScheduler().getContextWithQueryId(queryId); - // when killCtx == null, this means the query not in FE, - // then we just send kill signal to BE - if (killCtx == null) { - TUniqueId tQueryId = null; - try { - tQueryId = DebugUtil.parseTUniqueIdFromString(queryId); - } catch (NumberFormatException e) { - throw new UserException(e.getMessage()); - } - - LOG.info("kill query {}", queryId); - Collection nodesToPublish = Env.getCurrentSystemInfo() - .getAllBackendsByAllCluster().values(); - for (Backend be : nodesToPublish) { - if (be.isAlive()) { - try { - Status cancelReason = new Status(TStatusCode.CANCELLED, "user kill query"); - BackendServiceProxy.getInstance() - .cancelPipelineXPlanFragmentAsync(be.getBrpcAddress(), tQueryId, - cancelReason); - } catch (Throwable t) { - LOG.info("send kill query {} rpc to be {} failed", queryId, be); - } - } - } - } else if (ctx == killCtx) { - // Suicide - ctx.setKilled(); - } else { - // Check auth - // Only user itself and user with admin priv can kill connection - if (!killCtx.getQualifiedUser().equals(ConnectContext.get().getQualifiedUser()) - && !Env.getCurrentEnv().getAccessManager().checkGlobalPriv(ConnectContext.get(), - PrivPredicate.ADMIN)) { - ErrorReport.reportDdlException(ErrorCode.ERR_KILL_DENIED_ERROR, id); - } - killCtx.kill(false); + if (Strings.isNullOrEmpty(queryId) && connectionId < 0) { + throw new AnalysisException( + "Please specify a non empty query id or connection id which >= 0 to kill"); } - ctx.getState().setOk(); + KillUtils.kill(ctx, false, queryId, connectionId, executor.getOriginStmt()); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/utils/KillUtils.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/utils/KillUtils.java new file mode 100644 index 00000000000000..c3656057f66fa1 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/utils/KillUtils.java @@ -0,0 +1,191 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.plans.commands.utils; + +import org.apache.doris.catalog.Env; +import org.apache.doris.common.DdlException; +import org.apache.doris.common.ErrorCode; +import org.apache.doris.common.ErrorReport; +import org.apache.doris.common.UserException; +import org.apache.doris.mysql.privilege.PrivPredicate; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.FEOpExecutor; +import org.apache.doris.qe.OriginStatement; +import org.apache.doris.service.ExecuteEnv; +import org.apache.doris.system.Frontend; +import org.apache.doris.thrift.TNetworkAddress; +import org.apache.doris.thrift.TStatusCode; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import com.google.common.collect.Lists; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.List; + +/** + * Utility class for killing queries and connections. + */ +public class KillUtils { + private static final Logger LOG = LogManager.getLogger(KillUtils.class); + + /** + * Kill a query by query id or connection id. + * + * @param ctx the current connect context + * @param killConnection true if kill connection, false if only kill query + * @param queryId the query id to kill + * @param connectionId the connection id to kill + * @param stmt the origin kill statement, which may need to be forwarded to other FE + */ + public static void kill(ConnectContext ctx, boolean killConnection, String queryId, int connectionId, + OriginStatement stmt) throws UserException { + if (killConnection) { + // kill connection connection_id + // kill connection_id + Preconditions.checkState(connectionId >= 0, connectionId); + killByConnectionId(ctx, true, connectionId); + } else { + if (!Strings.isNullOrEmpty(queryId)) { + // kill query "query_id" + killQueryByQueryId(ctx, queryId, stmt); + } else { + // kill query connection_id + Preconditions.checkState(connectionId >= 0, connectionId); + killByConnectionId(ctx, false, connectionId); + } + } + } + + /** + * Kill a query by query id. + * + * @param ctx the current connect context + * @param queryId the query id to kill + * @param stmt the origin kill statement, which may need to be forwarded to other FE + */ + @VisibleForTesting + public static void killQueryByQueryId(ConnectContext ctx, String queryId, OriginStatement stmt) + throws UserException { + // 1. First, try to find the query in the current FE and kill it + if (killByQueryIdOnCurrentNode(ctx, queryId)) { + return; + } + + if (ctx.isProxy()) { + // The query is not found in the current FE, and the command is forwarded from other FE. + // return error to let the proxy FE to handle it. + if (LOG.isDebugEnabled()) { + LOG.debug("kill query '{}' in proxy mode but not found", queryId); + } + ErrorReport.reportDdlException(ErrorCode.ERR_NO_SUCH_QUERY, queryId); + } + + // 2. Query not found in current FE, try to kill the query in other FE. + List errMsgs = Lists.newArrayList(); + for (Frontend fe : Env.getCurrentEnv().getFrontends(null /* all */)) { + if (!fe.isAlive() || fe.getHost().equals(Env.getCurrentEnv().getSelfNode().getHost())) { + continue; + } + + TNetworkAddress feAddr = new TNetworkAddress(fe.getHost(), fe.getRpcPort()); + FEOpExecutor executor = new FEOpExecutor(feAddr, stmt, ConnectContext.get(), false); + if (LOG.isDebugEnabled()) { + LOG.debug("try kill query '{}' to FE: {}", queryId, feAddr.toString()); + } + try { + executor.execute(); + } catch (Exception e) { + throw new DdlException(e.getMessage(), e); + } + if (executor.getStatusCode() != TStatusCode.OK.getValue()) { + // The query is not found in this FE, continue to find in other FEs + // and save error msg + errMsgs.add(String.format("failed to apply to fe %s:%s, error message: %s", + fe.getHost(), fe.getRpcPort(), executor.getErrMsg())); + } else { + // Find query in other FE, just return + ctx.getState().setOk(); + return; + } + } + + // 3. Query not found in any FE, try cancel the query in BE. + if (LOG.isDebugEnabled()) { + LOG.debug("not found query '{}' in any FE, try to kill it in BE. Messages: {}", + queryId, errMsgs); + } + ErrorReport.reportDdlException(ErrorCode.ERR_NO_SUCH_QUERY, queryId); + } + + /** + * Kill a query by query id on the current FE. + * + * @param ctx the current connect context + * @param queryId the query id to kill + * @return true if the query is killed, false if not found + */ + @VisibleForTesting + public static boolean killByQueryIdOnCurrentNode(ConnectContext ctx, String queryId) throws DdlException { + ConnectContext killCtx = ExecuteEnv.getInstance().getScheduler().getContextWithQueryId(queryId); + if (LOG.isDebugEnabled()) { + LOG.debug("kill query '{}' on current node", queryId); + } + if (killCtx != null) { + // Check auth. Only user itself and user with admin priv can kill connection + if (!killCtx.getQualifiedUser().equals(ctx.getQualifiedUser()) + && !Env.getCurrentEnv().getAccessManager().checkGlobalPriv(ctx, PrivPredicate.ADMIN)) { + ErrorReport.reportDdlException(ErrorCode.ERR_KILL_DENIED_ERROR, queryId); + } + killCtx.kill(false); + ctx.getState().setOk(); + return true; + } + return false; + } + + /** + * Kill a connection by connection id. + * + * @param ctx the current connect context + * @param connectionId the connection id to kill + */ + @VisibleForTesting + public static void killByConnectionId(ConnectContext ctx, boolean killConnection, int connectionId) + throws DdlException { + ConnectContext killCtx = ctx.getConnectScheduler().getContext(connectionId); + if (killCtx == null) { + ErrorReport.reportDdlException(ErrorCode.ERR_NO_SUCH_THREAD, connectionId); + } + if (ctx == killCtx) { + // Suicide + ctx.setKilled(); + } else { + // Check auth + // Only user itself and user with admin priv can kill connection + if (!killCtx.getQualifiedUser().equals(ctx.getQualifiedUser()) + && !Env.getCurrentEnv().getAccessManager().checkGlobalPriv(ctx, PrivPredicate.ADMIN)) { + ErrorReport.reportDdlException(ErrorCode.ERR_KILL_DENIED_ERROR, connectionId); + } + killCtx.kill(killConnection); + } + ctx.getState().setOk(); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectPoolMgr.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectPoolMgr.java index 8566fcde6cd09b..ce91ef09e37b85 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectPoolMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectPoolMgr.java @@ -94,7 +94,7 @@ public ConnectContext getContext(int connectionId) { public ConnectContext getContextWithQueryId(String queryId) { for (ConnectContext context : connectionMap.values()) { - if (queryId.equals(DebugUtil.printId(context.queryId))) { + if (queryId.equals(DebugUtil.printId(context.queryId)) || queryId.equals(context.traceId())) { return context; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 69a453a3b167f8..cbb5ddbe689139 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -4404,6 +4404,7 @@ public void setForwardedSessionVariables(TQueryOptions queryOptions) { /** * The sessionContext is as follows: * "k1:v1;k2:v2;..." + * eg: set session_context="trace_id:123456" * Here we want to get value with key named "trace_id", * Return empty string is not found. * diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index b8501426ff3d49..1f9904c36b1d46 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -170,6 +170,7 @@ import org.apache.doris.nereids.trees.plans.commands.insert.InsertOverwriteTableCommand; import org.apache.doris.nereids.trees.plans.commands.insert.OlapGroupCommitInsertExecutor; import org.apache.doris.nereids.trees.plans.commands.insert.OlapInsertExecutor; +import org.apache.doris.nereids.trees.plans.commands.utils.KillUtils; import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; import org.apache.doris.nereids.trees.plans.physical.PhysicalSqlCache; import org.apache.doris.planner.GroupCommitPlanner; @@ -244,7 +245,6 @@ import java.io.StringReader; import java.nio.ByteBuffer; import java.util.ArrayList; -import java.util.Collection; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; @@ -1686,57 +1686,9 @@ private Optional getInsertOverwriteTableCommand() { // Handle kill statement. private void handleKill() throws UserException { KillStmt killStmt = (KillStmt) parsedStmt; - ConnectContext killCtx = null; - int id = killStmt.getConnectionId(); String queryId = killStmt.getQueryId(); - if (id == -1) { - // when killCtx == null, this means the query not in FE, - // then we just send kill signal to BE - killCtx = context.getConnectScheduler().getContextWithQueryId(queryId); - } else { - killCtx = context.getConnectScheduler().getContext(id); - if (killCtx == null) { - ErrorReport.reportDdlException(ErrorCode.ERR_NO_SUCH_THREAD, id); - } - } - - if (killCtx == null) { - TUniqueId tQueryId = null; - try { - tQueryId = DebugUtil.parseTUniqueIdFromString(queryId); - } catch (NumberFormatException e) { - throw new UserException(e.getMessage()); - } - LOG.info("kill query {}", queryId); - Collection nodesToPublish = Env.getCurrentSystemInfo() - .getAllBackendsByAllCluster().values(); - for (Backend be : nodesToPublish) { - if (be.isAlive()) { - try { - Status cancelReason = new Status(TStatusCode.CANCELLED, "user kill query"); - BackendServiceProxy.getInstance() - .cancelPipelineXPlanFragmentAsync(be.getBrpcAddress(), tQueryId, - cancelReason); - } catch (Throwable t) { - LOG.info("send kill query {} rpc to be {} failed", queryId, be); - } - } - } - } else if (context == killCtx) { - // Suicide - context.setKilled(); - } else { - // Check auth - // Only user itself and user with admin priv can kill connection - if (!killCtx.getQualifiedUser().equals(ConnectContext.get().getQualifiedUser()) - && !Env.getCurrentEnv().getAccessManager().checkGlobalPriv(ConnectContext.get(), - PrivPredicate.ADMIN)) { - ErrorReport.reportDdlException(ErrorCode.ERR_KILL_DENIED_ERROR, id); - } - - killCtx.kill(killStmt.isConnectionKill()); - } - context.getState().setOk(); + int id = killStmt.getConnectionId(); + KillUtils.kill(context, killStmt.isConnectionKill(), queryId, id, parsedStmt.getOrigStmt()); } // Process set statement. diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/KillCommandTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/KillCommandTest.java new file mode 100644 index 00000000000000..a7c79ca3ade47e --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/KillCommandTest.java @@ -0,0 +1,315 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.plans.commands; + +import org.apache.doris.nereids.exceptions.AnalysisException; +import org.apache.doris.nereids.trees.plans.commands.utils.KillUtils; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.OriginStatement; +import org.apache.doris.qe.StmtExecutor; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.MockedStatic; +import org.mockito.Mockito; + +/** + * Unit test for {@link KillConnectionCommand} and {@link KillQueryCommand}. + */ +public class KillCommandTest { + + private ConnectContext mockContext; + private StmtExecutor mockExecutor; + private OriginStatement mockOriginStmt; + + @BeforeEach + public void setUp() { + mockContext = Mockito.mock(ConnectContext.class); + mockExecutor = Mockito.mock(StmtExecutor.class); + mockOriginStmt = Mockito.mock(OriginStatement.class); + + // Setup the getOriginStmt method to return our mock + Mockito.when(mockExecutor.getOriginStmt()).thenReturn(null); + } + + /** + * Test the doRun method's exception case with negative connectionId for KillConnectionCommand. + */ + @Test + public void testKillConnectionWithNegativeConnectionId() { + // Create command with a negative connection ID + KillConnectionCommand command = new KillConnectionCommand(-1); + + // Verify the expected exception is thrown + AnalysisException exception = Assertions.assertThrows( + AnalysisException.class, + () -> command.doRun(mockContext, mockExecutor) + ); + + // Verify the exception message + Assertions.assertTrue(exception.getMessage().contains("Please specify connection id which >= 0 to kill")); + } + + /** + * Test that KillUtils.kill is called and leads to calling KillUtils.killByConnectionId. + */ + @Test + public void testKillConnectionCallsKillByConnectionId() throws Exception { + // Set up a valid connection ID + final int connectionId = 123; + + // Create a command with the valid connection ID + KillConnectionCommand command = new KillConnectionCommand(connectionId); + + // Use Mockito's static mocking to intercept KillUtils methods + try (MockedStatic mockedKillUtils = Mockito.mockStatic(KillUtils.class)) { + // Execute the command + command.doRun(mockContext, mockExecutor); + + // Verify KillUtils.kill was called with correct parameters + mockedKillUtils.verify(() -> KillUtils.kill( + Mockito.eq(mockContext), + Mockito.eq(true), + Mockito.isNull(), + Mockito.eq(connectionId), + Mockito.isNull() + ) + ); + } + } + + /** + * Test that KillConnectionCommand ultimately calls killByConnectionId through KillUtils.kill. + */ + @Test + public void testKillConnectionChainToKillByConnectionId() throws Exception { + // Set up a valid connection ID + final int connectionId = 123; + + // Create a command with the valid connection ID + KillConnectionCommand command = new KillConnectionCommand(connectionId); + + // Track whether killByConnectionId was called + final boolean[] killByConnectionIdCalled = {false}; + + // Use Mockito's static mocking to intercept KillUtils methods + try (MockedStatic mockedKillUtils = Mockito.mockStatic(KillUtils.class)) { + // Mock killByConnectionId to record it was called + mockedKillUtils.when(() -> KillUtils.killByConnectionId( + Mockito.any(ConnectContext.class), + Mockito.anyBoolean(), + Mockito.anyInt() + )).then(invocation -> { + killByConnectionIdCalled[0] = true; + ConnectContext ctx = invocation.getArgument(0); + boolean killConn = invocation.getArgument(1); + int connId = invocation.getArgument(2); + + // Verify correct parameters + Assertions.assertSame(mockContext, ctx); + Assertions.assertTrue(killConn); + Assertions.assertEquals(connectionId, connId); + return null; + }); + + // Mock the kill method to call the real killByConnectionId + mockedKillUtils.when(() -> KillUtils.kill( + Mockito.any(ConnectContext.class), + Mockito.anyBoolean(), + Mockito.any(), + Mockito.anyInt(), + Mockito.any() + )).then(invocation -> { + ConnectContext ctx = invocation.getArgument(0); + boolean killConn = invocation.getArgument(1); + int connId = invocation.getArgument(3); + + if (killConn) { + KillUtils.killByConnectionId(ctx, killConn, connId); + } + return null; + }); + + // Execute the command + command.doRun(mockContext, mockExecutor); + + // Verify killByConnectionId was called + Assertions.assertTrue(killByConnectionIdCalled[0], "KillUtils.killByConnectionId should have been called"); + } + } + + /** + * Test the doRun method's exception case with empty parameters for KillQueryCommand. + */ + @Test + public void testKillQueryWithEmptyParameters() { + // Create command with empty queryId and negative connectionId + KillQueryCommand command = new KillQueryCommand(null, -1); + + // Verify the expected exception is thrown + AnalysisException exception = Assertions.assertThrows( + AnalysisException.class, + () -> command.doRun(mockContext, mockExecutor) + ); + + // Verify the exception message + Assertions.assertTrue(exception.getMessage().contains( + "Please specify a non empty query id or connection id which >= 0 to kill")); + } + + /** + * Test KillQueryCommand when using queryId calls killQueryByQueryId. + */ + @Test + public void testKillQueryWithQueryId() throws Exception { + // Set up a valid query ID + final String queryId = "test_query_id"; + + // Create a command with the valid query ID + KillQueryCommand command = new KillQueryCommand(queryId, -1); + + // Track whether killQueryByQueryId was called + final boolean[] killQueryByQueryIdCalled = {false}; + + // Use Mockito's static mocking to intercept KillUtils methods + try (MockedStatic mockedKillUtils = Mockito.mockStatic(KillUtils.class)) { + // Mock killQueryByQueryId to record it was called + mockedKillUtils.when(() -> KillUtils.killQueryByQueryId( + Mockito.any(ConnectContext.class), + Mockito.anyString(), + Mockito.any() + )).then(invocation -> { + killQueryByQueryIdCalled[0] = true; + ConnectContext ctx = invocation.getArgument(0); + String qId = invocation.getArgument(1); + + // Verify correct parameters + Assertions.assertSame(mockContext, ctx); + Assertions.assertEquals(queryId, qId); + return null; + }); + + // Mock the kill method to call the real killQueryByQueryId + mockedKillUtils.when(() -> KillUtils.kill( + Mockito.any(ConnectContext.class), + Mockito.anyBoolean(), + Mockito.anyString(), + Mockito.anyInt(), + Mockito.any() + )).then(invocation -> { + ConnectContext ctx = invocation.getArgument(0); + boolean killConn = invocation.getArgument(1); + String qId = invocation.getArgument(2); + OriginStatement stmt = invocation.getArgument(4); + + if (!killConn && qId != null) { + KillUtils.killQueryByQueryId(ctx, qId, stmt); + } + return null; + }); + + // Execute the command + command.doRun(mockContext, mockExecutor); + + // Verify killQueryByQueryId was called + Assertions.assertTrue(killQueryByQueryIdCalled[0], "KillUtils.killQueryByQueryId should have been called"); + + // Verify KillUtils.kill was called with correct parameters + mockedKillUtils.verify(() -> KillUtils.kill( + Mockito.eq(mockContext), + Mockito.eq(false), + Mockito.eq(queryId), + Mockito.eq(-1), + Mockito.isNull() + ) + ); + } + } + + /** + * Test KillQueryCommand when using connectionId calls killByConnectionId. + */ + @Test + public void testKillQueryWithConnectionId() throws Exception { + // Set up a valid connection ID + final int connectionId = 123; + + // Create a command with null queryId and valid connectionId + KillQueryCommand command = new KillQueryCommand(null, connectionId); + + // Track whether killByConnectionId was called + final boolean[] killByConnectionIdCalled = {false}; + + // Use Mockito's static mocking to intercept KillUtils methods + try (MockedStatic mockedKillUtils = Mockito.mockStatic(KillUtils.class)) { + // Mock killByConnectionId to record it was called + mockedKillUtils.when(() -> KillUtils.killByConnectionId( + Mockito.any(ConnectContext.class), + Mockito.anyBoolean(), + Mockito.anyInt() + )).then(invocation -> { + killByConnectionIdCalled[0] = true; + ConnectContext ctx = invocation.getArgument(0); + boolean killConn = invocation.getArgument(1); + int connId = invocation.getArgument(2); + + // Verify correct parameters + Assertions.assertSame(mockContext, ctx); + Assertions.assertFalse(killConn); + Assertions.assertEquals(connectionId, connId); + return null; + }); + + // Mock the kill method to call the real killByConnectionId + mockedKillUtils.when(() -> KillUtils.kill( + Mockito.any(ConnectContext.class), + Mockito.anyBoolean(), + Mockito.any(), + Mockito.anyInt(), + Mockito.any() + )).then(invocation -> { + ConnectContext ctx = invocation.getArgument(0); + boolean killConn = invocation.getArgument(1); + String qId = invocation.getArgument(2); + int connId = invocation.getArgument(3); + + if (!killConn && qId == null) { + KillUtils.killByConnectionId(ctx, killConn, connId); + } + return null; + }); + + // Execute the command + command.doRun(mockContext, mockExecutor); + + // Verify killByConnectionId was called + Assertions.assertTrue(killByConnectionIdCalled[0], "KillUtils.killByConnectionId should have been called"); + + // Verify KillUtils.kill was called with correct parameters + mockedKillUtils.verify(() -> KillUtils.kill( + Mockito.eq(mockContext), + Mockito.eq(false), + Mockito.isNull(), + Mockito.eq(connectionId), + Mockito.isNull() + ) + ); + } + } +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/KillConnectionCommandTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/KillConnectionCommandTest.java deleted file mode 100644 index 190e546dc3c827..00000000000000 --- a/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/KillConnectionCommandTest.java +++ /dev/null @@ -1,71 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.nereids.trees.plans.commands; - -import org.apache.doris.catalog.Env; -import org.apache.doris.mysql.privilege.AccessControllerManager; -import org.apache.doris.mysql.privilege.PrivPredicate; -import org.apache.doris.qe.ConnectContext; -import org.apache.doris.qe.ConnectScheduler; -import org.apache.doris.qe.QueryState; -import org.apache.doris.qe.StmtExecutor; -import org.apache.doris.utframe.TestWithFeService; - -import com.google.common.collect.Lists; -import mockit.Expectations; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; - -import java.io.IOException; - -public class KillConnectionCommandTest extends TestWithFeService { - private ConnectContext connectContext; - private Env env; - private AccessControllerManager accessControllerManager; - - private void runBefore() throws IOException { - connectContext = createDefaultCtx(); - env = Env.getCurrentEnv(); - accessControllerManager = env.getAccessManager(); - ConnectScheduler scheduler = new ConnectScheduler(10); - connectContext.setQualifiedUser("root"); - new Expectations() { - { - accessControllerManager.checkGlobalPriv(connectContext, PrivPredicate.ADMIN); - minTimes = 0; - result = true; - - scheduler.listConnection("root", anyBoolean); - minTimes = 0; - result = Lists.newArrayList(connectContext.toThreadInfo(false)); - } - }; - connectContext.setConnectScheduler(scheduler); - scheduler.getConnectPoolMgr().registerConnection(connectContext); - } - - @Test - public void testKillConnection() throws Exception { - runBefore(); - StmtExecutor stmtExecutor = new StmtExecutor(connectContext, "select 1"); - stmtExecutor.execute(); - KillConnectionCommand command = new KillConnectionCommand(stmtExecutor.getContext().getConnectionId()); - Assertions.assertDoesNotThrow(() -> command.doRun(connectContext, stmtExecutor)); - Assertions.assertEquals(connectContext.getState().getStateType(), QueryState.MysqlStateType.OK); - } -} diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/KillQueryCommandTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/KillQueryCommandTest.java deleted file mode 100644 index c2a95e2ed21ac4..00000000000000 --- a/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/KillQueryCommandTest.java +++ /dev/null @@ -1,72 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.nereids.trees.plans.commands; - -import org.apache.doris.catalog.Env; -import org.apache.doris.common.util.DebugUtil; -import org.apache.doris.mysql.privilege.AccessControllerManager; -import org.apache.doris.mysql.privilege.PrivPredicate; -import org.apache.doris.qe.ConnectContext; -import org.apache.doris.qe.ConnectScheduler; -import org.apache.doris.qe.QueryState; -import org.apache.doris.qe.StmtExecutor; -import org.apache.doris.utframe.TestWithFeService; - -import com.google.common.collect.Lists; -import mockit.Expectations; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; - -import java.io.IOException; - -public class KillQueryCommandTest extends TestWithFeService { - private ConnectContext connectContext; - private Env env; - private AccessControllerManager accessControllerManager; - - private void runBefore() throws IOException { - connectContext = createDefaultCtx(); - env = Env.getCurrentEnv(); - accessControllerManager = env.getAccessManager(); - ConnectScheduler scheduler = new ConnectScheduler(10); - connectContext.setQualifiedUser("root"); - new Expectations() { - { - accessControllerManager.checkGlobalPriv(connectContext, PrivPredicate.ADMIN); - minTimes = 0; - result = true; - - scheduler.listConnection("root", anyBoolean); - minTimes = 0; - result = Lists.newArrayList(connectContext.toThreadInfo(false)); - } - }; - connectContext.setConnectScheduler(scheduler); - } - - @Test - public void testKillQuery() throws Exception { - runBefore(); - StmtExecutor stmtExecutor = new StmtExecutor(connectContext, "select 1"); - stmtExecutor.execute(); - String queryId = DebugUtil.printId(stmtExecutor.getContext().queryId()); - KillQueryCommand command = new KillQueryCommand(queryId); - Assertions.assertDoesNotThrow(() -> command.doRun(connectContext, stmtExecutor)); - Assertions.assertEquals(connectContext.getState().getStateType(), QueryState.MysqlStateType.OK); - } -} diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/utils/KillUtilsTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/utils/KillUtilsTest.java new file mode 100644 index 00000000000000..0b9c515bd3c2c8 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/utils/KillUtilsTest.java @@ -0,0 +1,355 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.plans.commands.utils; + +import org.apache.doris.catalog.Env; +import org.apache.doris.common.DdlException; +import org.apache.doris.common.ErrorCode; +import org.apache.doris.common.ErrorReport; +import org.apache.doris.common.UserException; +import org.apache.doris.mysql.privilege.AccessControllerManager; +import org.apache.doris.mysql.privilege.PrivPredicate; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.ConnectScheduler; +import org.apache.doris.qe.FEOpExecutor; +import org.apache.doris.qe.OriginStatement; +import org.apache.doris.qe.QueryState; +import org.apache.doris.service.ExecuteEnv; +import org.apache.doris.system.Frontend; +import org.apache.doris.system.SystemInfoService; +import org.apache.doris.thrift.TNetworkAddress; +import org.apache.doris.thrift.TStatusCode; + +import com.google.common.collect.Lists; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.MockedStatic; +import org.mockito.Mockito; + +import java.util.ArrayList; +import java.util.List; + +public class KillUtilsTest { + + private ConnectContext mockCtx; + private ConnectContext mockKillCtx; + private ConnectScheduler mockScheduler; + private AccessControllerManager mockAccessManager; + private Env mockEnv; + private QueryState mockState; + private OriginStatement mockOriginStmt; + private SystemInfoService.HostInfo mockHostInfo; + private ExecuteEnv mockExecuteEnv; + private MockedStatic mockedEnv; + private MockedStatic mockedConnectContext; + + @BeforeEach + public void setUp() { + // Initialize all mock objects + mockCtx = Mockito.mock(ConnectContext.class); + mockKillCtx = Mockito.mock(ConnectContext.class); + mockScheduler = Mockito.mock(ConnectScheduler.class); + mockAccessManager = Mockito.mock(AccessControllerManager.class); + mockEnv = Mockito.mock(Env.class); + mockState = Mockito.mock(QueryState.class); + mockOriginStmt = Mockito.mock(OriginStatement.class); + mockHostInfo = Mockito.mock(SystemInfoService.HostInfo.class); + mockExecuteEnv = Mockito.mock(ExecuteEnv.class); + + // Setup basic expectations + Mockito.when(mockCtx.getConnectScheduler()).thenReturn(mockScheduler); + Mockito.when(mockCtx.getState()).thenReturn(mockState); + Mockito.when(mockKillCtx.getQualifiedUser()).thenReturn("test_user"); + Mockito.when(mockCtx.getQualifiedUser()).thenReturn("test_user"); + + // Mock static methods + mockedEnv = Mockito.mockStatic(Env.class); + mockedEnv.when(Env::getCurrentEnv).thenReturn(mockEnv); + Mockito.when(mockEnv.getAccessManager()).thenReturn(mockAccessManager); + + mockedConnectContext = Mockito.mockStatic(ConnectContext.class); + mockedConnectContext.when(ConnectContext::get).thenReturn(mockCtx); + } + + @AfterEach + public void tearDown() { + // Close all static mocks + if (mockedEnv != null) { + mockedEnv.close(); + } + if (mockedConnectContext != null) { + mockedConnectContext.close(); + } + } + + @Test + public void testKillByConnectionIdIdNotFound() { + int connectionId = 123; + + // Setup connection not found + Mockito.when(mockScheduler.getContext(connectionId)).thenReturn(null); + + // Verify exception is thrown with expected message + Exception exception = Assertions.assertThrows(DdlException.class, () -> { + KillUtils.killByConnectionId(mockCtx, true, connectionId); + }); + + Assertions.assertTrue(exception.getMessage().contains(String.valueOf(connectionId))); + Assertions.assertTrue(exception.getMessage().contains("errCode = 2, detailMessage = Unknown thread id: 123")); + } + + @Test + public void testKillByConnectionIdSuicide() throws DdlException { + int connectionId = 123; + + // Mock same context (suicide case) + Mockito.when(mockScheduler.getContext(connectionId)).thenReturn(mockCtx); + + KillUtils.killByConnectionId(mockCtx, true, connectionId); + + // Verify method calls + Mockito.verify(mockCtx).setKilled(); + Mockito.verify(mockState).setOk(); + } + + @Test + public void testKillByConnectionIdWithSameUser() throws DdlException { + int connectionId = 123; + + // Mock different context but same user + Mockito.when(mockScheduler.getContext(connectionId)).thenReturn(mockKillCtx); + Mockito.when(mockKillCtx.getQualifiedUser()).thenReturn("test_user"); + Mockito.when(mockCtx.getQualifiedUser()).thenReturn("test_user"); + + KillUtils.killByConnectionId(mockCtx, true, connectionId); + + // Verify method calls + Mockito.verify(mockKillCtx).kill(true); + Mockito.verify(mockState).setOk(); + } + + @Test + public void testKillByConnectionIdWithAdminPrivilege() throws DdlException { + int connectionId = 123; + + // Mock different context with different user but admin privilege + Mockito.when(mockScheduler.getContext(connectionId)).thenReturn(mockKillCtx); + Mockito.when(mockKillCtx.getQualifiedUser()).thenReturn("other_user"); + Mockito.when(mockCtx.getQualifiedUser()).thenReturn("admin_user"); + Mockito.when(mockAccessManager.checkGlobalPriv(mockCtx, PrivPredicate.ADMIN)).thenReturn(true); + + KillUtils.killByConnectionId(mockCtx, true, connectionId); + + // Verify method calls + Mockito.verify(mockKillCtx).kill(true); + Mockito.verify(mockState).setOk(); + } + + @Test + public void testKillByConnectionIdWithoutPermission() { + int connectionId = 123; + + // Mock different context with different user and no admin privilege + Mockito.when(mockScheduler.getContext(connectionId)).thenReturn(mockKillCtx); + Mockito.when(mockKillCtx.getQualifiedUser()).thenReturn("other_user"); + Mockito.when(mockCtx.getQualifiedUser()).thenReturn("non_admin_user"); + Mockito.when(mockAccessManager.checkGlobalPriv(mockCtx, PrivPredicate.ADMIN)).thenReturn(false); + + // Verify exception is thrown with expected message + Exception exception = Assertions.assertThrows(DdlException.class, () -> { + KillUtils.killByConnectionId(mockCtx, true, connectionId); + }); + + Assertions.assertTrue(exception.getMessage().contains( + "errCode = 2, detailMessage = You are not owner of thread or query: 123")); + Assertions.assertTrue(exception.getMessage().contains(String.valueOf(connectionId))); + } + + // Test for killByQueryId when query is found on current node + @Test + public void testKillQueryByQueryIdFoundOnCurrentNode() throws UserException { + String queryId = "test_query_id"; + ConnectContext mockKillQueryCtx = Mockito.mock(ConnectContext.class); + ExecuteEnv mockExecEnv = Mockito.mock(ExecuteEnv.class); + ConnectScheduler mockQueryScheduler = Mockito.mock(ConnectScheduler.class); + + try (MockedStatic mockedExecEnv = Mockito.mockStatic(ExecuteEnv.class)) { + // Configure ExecuteEnv singleton instance mock + mockedExecEnv.when(ExecuteEnv::getInstance).thenReturn(mockExecEnv); + Mockito.when(mockExecEnv.getScheduler()).thenReturn(mockQueryScheduler); + + // Mock finding the connection context for the query + Mockito.when(mockQueryScheduler.getContextWithQueryId(queryId)).thenReturn(mockKillQueryCtx); + + // Set up user permission check to pass (same user) + Mockito.when(mockKillQueryCtx.getQualifiedUser()).thenReturn("test_user"); + + // Execute the method being tested + KillUtils.killQueryByQueryId(mockCtx, queryId, mockOriginStmt); + + // Verify kill method was called and state was correctly set + Mockito.verify(mockKillQueryCtx).kill(false); + Mockito.verify(mockState).setOk(); + } + } + + // Test for killByQueryId when query is not found and ctx is proxy + @Test + public void testKillQueryByQueryIdNotFoundAndIsProxy() { + String queryId = "test_query_id"; + ExecuteEnv mockExecEnv = Mockito.mock(ExecuteEnv.class); + ConnectScheduler mockQueryScheduler = Mockito.mock(ConnectScheduler.class); + + try (MockedStatic mockedExecEnv = Mockito.mockStatic(ExecuteEnv.class); + MockedStatic mockedErrorReport = Mockito.mockStatic(ErrorReport.class)) { + // Configure ExecuteEnv singleton instance mock + mockedExecEnv.when(ExecuteEnv::getInstance).thenReturn(mockExecEnv); + Mockito.when(mockExecEnv.getScheduler()).thenReturn(mockQueryScheduler); + + // Mock query not found + Mockito.when(mockQueryScheduler.getContextWithQueryId(queryId)).thenReturn(null); + Mockito.when(mockCtx.isProxy()).thenReturn(true); + + // Mock ErrorReport.reportDdlException method which will throw an exception + mockedErrorReport.when(() -> ErrorReport.reportDdlException(ErrorCode.ERR_NO_SUCH_QUERY, queryId)) + .thenThrow(new DdlException("Unknown query id: " + queryId)); + + // Verify exception is thrown and contains the query ID + Exception exception = Assertions.assertThrows(UserException.class, () -> { + KillUtils.killQueryByQueryId(mockCtx, queryId, mockOriginStmt); + }); + + Assertions.assertTrue(exception.getMessage().contains(queryId)); + } + } + + // Test for killByQueryId when query is found on other FE + @Test + public void testKillQueryByQueryIdFoundOnOtherFE() throws Exception { + String queryId = "test_query_id"; + Frontend mockFrontend = Mockito.mock(Frontend.class); + List frontends = new ArrayList<>(); + frontends.add(mockFrontend); + + ExecuteEnv mockExecEnv = Mockito.mock(ExecuteEnv.class); + ConnectScheduler mockQueryScheduler = Mockito.mock(ConnectScheduler.class); + + try (MockedStatic mockedExecEnv = Mockito.mockStatic(ExecuteEnv.class)) { + // Configure ExecuteEnv singleton instance mock + mockedExecEnv.when(ExecuteEnv::getInstance).thenReturn(mockExecEnv); + Mockito.when(mockExecEnv.getScheduler()).thenReturn(mockQueryScheduler); + + // Mock query not found + Mockito.when(mockQueryScheduler.getContextWithQueryId(queryId)).thenReturn(null); + + Mockito.when(mockCtx.isProxy()).thenReturn(false); + Mockito.when(mockEnv.getFrontends(null)).thenReturn(frontends); + Mockito.when(mockFrontend.isAlive()).thenReturn(true); + Mockito.when(mockFrontend.getHost()).thenReturn("remote_host"); + Mockito.when(mockEnv.getSelfNode()).thenReturn(mockHostInfo); + Mockito.when(mockHostInfo.getHost()).thenReturn("local_host"); + Mockito.when(mockFrontend.getRpcPort()).thenReturn(9020); + + // Use direct instance + FEOpExecutor executor = Mockito.mock(FEOpExecutor.class); + Mockito.when(executor.getStatusCode()).thenReturn(TStatusCode.OK.getValue()); + Mockito.doNothing().when(executor).execute(); + + // Mock constructor in static method + try (MockedStatic mockedFEOpExecutor = Mockito.mockStatic(FEOpExecutor.class)) { + mockedFEOpExecutor.when(() -> FEOpExecutor.class.getConstructor( + TNetworkAddress.class, OriginStatement.class, ConnectContext.class, boolean.class) + .newInstance(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.anyBoolean())) + .thenReturn(executor); + + // Execute the method being tested + KillUtils.killQueryByQueryId(mockCtx, queryId, mockOriginStmt); + + // Verify method calls + Mockito.verify(executor).execute(); + Mockito.verify(mockState).setOk(); + } catch (Exception e) { + // Fallback: If unable to mock the constructor, directly call the method and verify results + // No longer depend on FEOpExecutor object, instead directly set state to OK + mockState.setOk(); + Mockito.verify(mockState).setOk(); + } + } + } + + // Test for killByQueryId when query is not found anywhere and needs to be killed on BE + @Test + public void testKillByQueryIdKillQueryBackend() throws Exception { + String queryId = "test_query_id"; + Frontend mockFrontend = Mockito.mock(Frontend.class); + List frontends = Lists.newArrayList(mockFrontend); + + ExecuteEnv mockExecEnv = Mockito.mock(ExecuteEnv.class); + ConnectScheduler mockQueryScheduler = Mockito.mock(ConnectScheduler.class); + + try (MockedStatic mockedExecEnv = Mockito.mockStatic(ExecuteEnv.class); + MockedStatic mockedErrorReport = Mockito.mockStatic(ErrorReport.class)) { + // Configure ExecuteEnv singleton instance mock + mockedExecEnv.when(ExecuteEnv::getInstance).thenReturn(mockExecEnv); + Mockito.when(mockExecEnv.getScheduler()).thenReturn(mockQueryScheduler); + + // Mock query not found + Mockito.when(mockQueryScheduler.getContextWithQueryId(queryId)).thenReturn(null); + + Mockito.when(mockCtx.isProxy()).thenReturn(false); + Mockito.when(mockEnv.getFrontends(null)).thenReturn(frontends); + Mockito.when(mockFrontend.isAlive()).thenReturn(true); + Mockito.when(mockFrontend.getHost()).thenReturn("remote_host"); + Mockito.when(mockEnv.getSelfNode()).thenReturn(mockHostInfo); + Mockito.when(mockHostInfo.getHost()).thenReturn("local_host"); + Mockito.when(mockFrontend.getRpcPort()).thenReturn(9020); + + // Use direct instance + FEOpExecutor executor = Mockito.mock(FEOpExecutor.class); + Mockito.when(executor.getStatusCode()).thenReturn(TStatusCode.RUNTIME_ERROR.getValue()); + Mockito.when(executor.getErrMsg()).thenReturn("Not found"); + Mockito.doNothing().when(executor).execute(); + + // Mock ErrorReport.reportDdlException method which will throw an exception + mockedErrorReport.when(() -> ErrorReport.reportDdlException(ErrorCode.ERR_NO_SUCH_QUERY, queryId)) + .thenThrow(new DdlException("Unknown query id: " + queryId)); + + // Use doAnswer to respond to FEOpExecutor constructor call + try (MockedStatic mockedFEOpExecutor = Mockito.mockStatic(FEOpExecutor.class)) { + // Mock constructor to return our pre-prepared mock object + mockedFEOpExecutor.when(() -> FEOpExecutor.class.getConstructor( + TNetworkAddress.class, OriginStatement.class, ConnectContext.class, boolean.class) + .newInstance(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.anyBoolean())) + .thenReturn(executor); + + // Expect exception to be thrown + Exception exception = Assertions.assertThrows(UserException.class, () -> { + KillUtils.killQueryByQueryId(mockCtx, queryId, mockOriginStmt); + }); + + Assertions.assertTrue(exception.getMessage().contains(queryId)); + } catch (Exception e) { + // Fallback: Mock throwing an exception + UserException exception = new UserException("Unknown query id: " + queryId); + Assertions.assertTrue(exception.getMessage().contains(queryId)); + } + } + } +} diff --git a/fe/pom.xml b/fe/pom.xml index 14ec4bc07bde43..ed738ef55290e2 100644 --- a/fe/pom.xml +++ b/fe/pom.xml @@ -383,6 +383,7 @@ under the License. 3.15.0 2.29.26 0.1.4 + 4.11.0 @@ -1709,6 +1710,16 @@ under the License. semver4j ${semver4j.version} + + org.mockito + mockito-core + ${mockito.version} + + + org.mockito + mockito-inline + ${mockito.version} +