diff --git a/executor/simple.go b/executor/simple.go index e54ef77c9a6bf..cf91d77687c72 100644 --- a/executor/simple.go +++ b/executor/simple.go @@ -305,7 +305,13 @@ func (e *SimpleExec) executeSetPwd(s *ast.SetPwdStmt) error { } func (e *SimpleExec) executeKillStmt(s *ast.KillStmt) error { - // TODO: Implement it. + if s.TiDBExtension { + sm := e.ctx.GetSessionManager() + if sm == nil { + return nil + } + sm.Kill(s.ConnectionID, s.Query) + } return nil } diff --git a/parser/parser_test.go b/parser/parser_test.go index 97938e100f28b..2f92ab7d618c3 100644 --- a/parser/parser_test.go +++ b/parser/parser_test.go @@ -1472,6 +1472,7 @@ func (s *testParserSuite) TestSessionManage(c *C) { {"kill tidb 23123", true}, {"kill tidb connection 23123", true}, {"kill tidb query 23123", true}, + {"show processlist", true}, } s.RunTest(c, table) } diff --git a/server/conn.go b/server/conn.go index 09f8346b47c28..8de8411deffd9 100644 --- a/server/conn.go +++ b/server/conn.go @@ -77,6 +77,7 @@ type clientConn struct { lastCmd string // latest sql query string, currently used for logging error. ctx QueryCtx // an interface to execute sql statements. attrs map[string]string // attributes parsed from client handshake response, not used for now. + killed bool } func (cc *clientConn) String() string { @@ -336,7 +337,7 @@ func (cc *clientConn) Run() { cc.Close() }() - for { + for !cc.killed { cc.alloc.Reset() data, err := cc.readPacket() if err != nil { diff --git a/server/driver.go b/server/driver.go index 3e4fca667afba..f42b670b193c1 100644 --- a/server/driver.go +++ b/server/driver.go @@ -80,6 +80,9 @@ type QueryCtx interface { ShowProcess() util.ProcessInfo SetSessionManager(util.SessionManager) + + // Cancel the execution of current transaction. + Cancel() } // PreparedStatement is the interface to use a prepared statement. diff --git a/server/driver_tidb.go b/server/driver_tidb.go index e368f9d3b83fb..2fe7671487a12 100644 --- a/server/driver_tidb.go +++ b/server/driver_tidb.go @@ -280,6 +280,11 @@ func (tc *TiDBContext) ShowProcess() util.ProcessInfo { return tc.session.ShowProcess() } +// Cancel implements QueryCtx Cancel method. +func (tc *TiDBContext) Cancel() { + tc.session.Cancel() +} + type tidbResultSet struct { recordSet ast.RecordSet } diff --git a/server/server.go b/server/server.go index 557a0c0f83d41..b92d12707b6d3 100644 --- a/server/server.go +++ b/server/server.go @@ -223,6 +223,18 @@ func (s *Server) ShowProcessList() []util.ProcessInfo { // Kill implements the SessionManager interface. func (s *Server) Kill(connectionID uint64, query bool) { + s.rwlock.Lock() + defer s.rwlock.Unlock() + + conn, ok := s.clients[uint32(connectionID)] + if !ok { + return + } + + conn.ctx.Cancel() + if !query { + conn.killed = true + } } var once sync.Once