Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions be/src/runtime/runtime_query_statistics_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -275,8 +275,8 @@ void RuntimeQueryStatisticsMgr::register_fragment_profile(
_load_channel_profile_map[std::make_pair(query_id, fragment_id)] = load_channel_profile_x;
}

LOG_INFO("register x profile done {}, fragment {}, profiles {}", print_id(query_id),
fragment_id, p_profiles.size());
LOG_INFO("register profile done {}, fragment {}, profiles {}", print_id(query_id), fragment_id,
p_profiles.size());
}

void RuntimeQueryStatisticsMgr::_report_query_profiles_function() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,12 +142,12 @@ public static String printId(final TUniqueId id) {
// id is a String generated by DebugUtil.printId(TUniqueId)
public static TUniqueId parseTUniqueIdFromString(String id) {
if (Strings.isNullOrEmpty(id)) {
throw new NumberFormatException("invalid query id");
throw new NumberFormatException("query id null or empty");
}

String[] parts = id.split("-");
if (parts.length != 2) {
throw new NumberFormatException("invalid query id");
throw new NumberFormatException("invalid query id: " + id);
}

TUniqueId uniqueId = new TUniqueId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,31 +40,40 @@
import java.util.Collection;
/**
* kill query command
* follow https://dev.mysql.com/doc/refman/8.4/en/kill.html
*/

public class KillQueryCommand extends KillCommand {
private static final Logger LOG = LogManager.getLogger(KillQueryCommand.class);
private final String queryId;
private final String processId;

public KillQueryCommand(String queryId) {
public KillQueryCommand(String processId) {
super(PlanType.KILL_QUERY_COMMAND);
this.queryId = queryId;
this.processId = processId;
}

@Override
public void doRun(ConnectContext ctx, StmtExecutor executor) throws Exception {
ConnectContext killCtx = ctx.getConnectScheduler().getContextWithQueryId(queryId);
ConnectContext killCtx = ctx.getConnectScheduler().getContextWithQueryId(processId);
if (killCtx == null) {
try {
Integer connectionId = Integer.valueOf(processId);
killCtx = ctx.getConnectScheduler().getContext(connectionId);
} catch (NumberFormatException e) {
// processId is query id
}
}
// when killCtx == null, this means the query not in FE,
// then we just send kill signal to BE
if (killCtx == null) {
TUniqueId tQueryId = null;
try {
tQueryId = DebugUtil.parseTUniqueIdFromString(queryId);
tQueryId = DebugUtil.parseTUniqueIdFromString(processId);
} catch (NumberFormatException e) {
throw new UserException(e.getMessage());
}

LOG.info("kill query {}", queryId);
LOG.info("kill query {}", processId);
Collection<Backend> nodesToPublish = Env.getCurrentSystemInfo()
.getAllBackendsByAllCluster().values();
for (Backend be : nodesToPublish) {
Expand All @@ -75,7 +84,7 @@ public void doRun(ConnectContext ctx, StmtExecutor executor) throws Exception {
.cancelPipelineXPlanFragmentAsync(be.getBrpcAddress(), tQueryId,
cancelReason);
} catch (Throwable t) {
LOG.info("send kill query {} rpc to be {} failed", queryId, be);
LOG.info("send kill query {} rpc to be {} failed", processId, be);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,15 +108,15 @@ public void testParseIdFromString() {
try {
nullTUniqueId = DebugUtil.parseTUniqueIdFromString(null);
} catch (NumberFormatException e) {
Assert.assertTrue("invalid query id".equals(e.getMessage()));
Assert.assertEquals("query id null or empty", e.getMessage());
}
Assert.assertTrue(nullTUniqueId == null);


try {
nullTUniqueId = DebugUtil.parseTUniqueIdFromString("");
} catch (NumberFormatException e) {
Assert.assertTrue("invalid query id".equals(e.getMessage()));
Assert.assertEquals("query id null or empty", e.getMessage());
}
Assert.assertTrue(nullTUniqueId == null);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ private void runBefore() throws IOException {
accessControllerManager = env.getAccessManager();
ConnectScheduler scheduler = new ConnectScheduler(10);
connectContext.setQualifiedUser("root");
scheduler.registerConnection(connectContext);
new Expectations() {
{
accessControllerManager.checkGlobalPriv(connectContext, PrivPredicate.ADMIN);
Expand All @@ -69,4 +70,15 @@ public void testKillQuery() throws Exception {
Assertions.assertDoesNotThrow(() -> command.doRun(connectContext, stmtExecutor));
Assertions.assertEquals(connectContext.getState().getStateType(), QueryState.MysqlStateType.OK);
}

@Test
public void testKillQueryByConnection() throws Exception {
runBefore();
StmtExecutor stmtExecutor = new StmtExecutor(connectContext, "select 1");
stmtExecutor.execute();
String connectionId = String.valueOf(stmtExecutor.getContext().getConnectionId());
KillQueryCommand command = new KillQueryCommand(connectionId);
Assertions.assertDoesNotThrow(() -> command.doRun(connectContext, stmtExecutor));
Assertions.assertEquals(connectContext.getState().getStateType(), QueryState.MysqlStateType.OK);
}
}
Loading