diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java index 05de6977371267..d6888cf6caadeb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java @@ -864,12 +864,30 @@ public PlSqlOperation getPlSqlOperation() { return plSqlOperation; } + /** + * This method is idempotent. + */ protected void closeChannel() { if (mysqlChannel != null) { mysqlChannel.close(); } } + /** + * kill connection by other thread + */ + protected void killConnection() { + isKilled = true; + // Close channel to break connection with client + closeChannel(); + returnRows = 0; + deleteTempTable(); + Env.getCurrentEnv().unregisterSessionInfo(this.sessionId); + } + + /** + * kill connection by self + */ public void cleanup() { closeChannel(); threadLocalInfo.remove(); @@ -1004,9 +1022,7 @@ public void kill(boolean killConnection) { killConnection); if (killConnection) { - isKilled = true; - // Close channel to break connection with client - closeChannel(); + killConnection(); } // Now, cancel running query. cancelQuery(new Status(TStatusCode.CANCELLED, "cancel query by user")); @@ -1018,9 +1034,7 @@ private void killByTimeout(boolean killConnection) { LOG.warn("kill wait timeout connection, connection type: {}, connectionId: {}, remote: {}, " + "wait timeout: {}", getConnectType(), connectionId, getRemoteHostPortString(), sessionVariable.getWaitTimeoutS()); - isKilled = true; - // Close channel to break connection with client - closeChannel(); + killConnection(); } // Now, cancel running query. // cancelQuery by time out diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/sessions/FlightSqlConnectContext.java b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/sessions/FlightSqlConnectContext.java index 35293273f1e0aa..111a79745e1f03 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/sessions/FlightSqlConnectContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/sessions/FlightSqlConnectContext.java @@ -74,9 +74,7 @@ public void kill(boolean killConnection) { LOG.warn("kill query from {}, kill flight sql connection: {}", getRemoteHostPortString(), killConnection); if (killConnection) { - isKilled = true; - // Close channel and break connection with client. - closeChannel(); + killConnection(); } // Now, cancel running query. cancelQuery(new Status(TStatusCode.CANCELLED, "arrow flight query killed by user")); diff --git a/regression-test/suites/temp_table_p0/test_temp_table_with_conn_timeout.groovy b/regression-test/suites/temp_table_p0/test_temp_table_with_conn_timeout.groovy new file mode 100644 index 00000000000000..d78774160a03f0 --- /dev/null +++ b/regression-test/suites/temp_table_p0/test_temp_table_with_conn_timeout.groovy @@ -0,0 +1,45 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_temp_table_with_conn_timeout", "p0") { + String db = context.config.getDbNameByFile(context.file) + def tableName = "t_test_temp_table_with_conn_timeout" + String tempTableFullName + sql "select 1" // ensure db is created + connect(context.config.jdbcUser, context.config.jdbcPassword, context.config.jdbcUrl) { + sql"use ${db}" + sql """create temporary table ${tableName}(id int) properties("replication_num" = "1") """ + def show_result = sql_return_maparray("show data") + + show_result.each { row -> + if (row.TableName.contains(tableName)) { + tempTableFullName = row.TableName + } + } + assert tempTableFullName != null + + // set session variable for a short connection timeout + sql "set interactive_timeout=5" + sql "set wait_timeout=5" + + sleep(10*1000) + } + + // temp table should not exist after session exit + def tables = sql_return_maparray("show data") + assert tables.find { it.TableName == tempTableFullName } == null +}