diff --git a/executor/adapter.go b/executor/adapter.go index d906ed35b1a1c..57b9b04f2295a 100644 --- a/executor/adapter.go +++ b/executor/adapter.go @@ -46,17 +46,16 @@ import ( // processinfoSetter is the interface use to set current running process info. type processinfoSetter interface { - SetProcessInfo(string) + SetProcessInfo(string, time.Time, byte) } // recordSet wraps an executor, implements sqlexec.RecordSet interface type recordSet struct { - fields []*ast.ResultField - executor Executor - stmt *ExecStmt - processinfo processinfoSetter - lastErr error - txnStartTS uint64 + fields []*ast.ResultField + executor Executor + stmt *ExecStmt + lastErr error + txnStartTS uint64 } func (a *recordSet) Fields() []*ast.ResultField { @@ -124,9 +123,6 @@ func (a *recordSet) NewChunk() *chunk.Chunk { func (a *recordSet) Close() error { err := a.executor.Close() a.stmt.LogSlowQuery(a.txnStartTS, a.lastErr == nil) - if a.processinfo != nil { - a.processinfo.SetProcessInfo("") - } a.stmt.logAudit() return errors.Trace(err) } @@ -226,6 +222,8 @@ func (a *ExecStmt) Exec(ctx context.Context) (sqlexec.RecordSet, error) { return nil, errors.Trace(err) } + cmd32 := atomic.LoadUint32(&sctx.GetSessionVars().CommandValue) + cmd := byte(cmd32) var pi processinfoSetter if raw, ok := sctx.(processinfoSetter); ok { pi = raw @@ -237,17 +235,18 @@ func (a *ExecStmt) Exec(ctx context.Context) (sqlexec.RecordSet, error) { } } // Update processinfo, ShowProcess() will use it. - pi.SetProcessInfo(sql) + pi.SetProcessInfo(sql, time.Now(), cmd) a.Ctx.GetSessionVars().StmtCtx.StmtType = GetStmtLabel(a.StmtNode) } + // If the executor doesn't return any result to the client, we execute it without delay. if e.Schema().Len() == 0 { - return a.handleNoDelayExecutor(ctx, sctx, e, pi) + return a.handleNoDelayExecutor(ctx, sctx, e) } else if proj, ok := e.(*ProjectionExec); ok && proj.calculateNoDelay { // Currently this is only for the "DO" statement. Take "DO 1, @a=2;" as an example: // the Projection has two expressions and two columns in the schema, but we should // not return the result of the two expressions. - return a.handleNoDelayExecutor(ctx, sctx, e, pi) + return a.handleNoDelayExecutor(ctx, sctx, e) } var txnStartTS uint64 @@ -259,14 +258,13 @@ func (a *ExecStmt) Exec(ctx context.Context) (sqlexec.RecordSet, error) { txnStartTS = txn.StartTS() } return &recordSet{ - executor: e, - stmt: a, - processinfo: pi, - txnStartTS: txnStartTS, + executor: e, + stmt: a, + txnStartTS: txnStartTS, }, nil } -func (a *ExecStmt) handleNoDelayExecutor(ctx context.Context, sctx sessionctx.Context, e Executor, pi processinfoSetter) (sqlexec.RecordSet, error) { +func (a *ExecStmt) handleNoDelayExecutor(ctx context.Context, sctx sessionctx.Context, e Executor) (sqlexec.RecordSet, error) { // Check if "tidb_snapshot" is set for the write executors. // In history read mode, we can not do write operations. switch e.(type) { @@ -279,9 +277,6 @@ func (a *ExecStmt) handleNoDelayExecutor(ctx context.Context, sctx sessionctx.Co var err error defer func() { - if pi != nil { - pi.SetProcessInfo("") - } terror.Log(errors.Trace(e.Close())) a.logAudit() }() diff --git a/executor/show_test.go b/executor/show_test.go index e487d834194da..9a811ef3fa22a 100644 --- a/executor/show_test.go +++ b/executor/show_test.go @@ -29,7 +29,6 @@ import ( "github.com/pingcap/tidb/session" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/variable" - "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/testkit" "github.com/pingcap/tidb/util/testutil" "golang.org/x/net/context" @@ -470,33 +469,10 @@ type mockSessionManager struct { session.Session } -// ShowProcessList implements the SessionManager.ShowProcessList interface. -func (msm *mockSessionManager) ShowProcessList() map[uint64]util.ProcessInfo { - ps := msm.ShowProcess() - return map[uint64]util.ProcessInfo{ps.ID: ps} -} - // Kill implements the SessionManager.Kill interface. func (msm *mockSessionManager) Kill(cid uint64, query bool) { } -func (s *testSuite) TestShowFullProcessList(c *C) { - tk := testkit.NewTestKit(c, s.store) - tk.MustExec("select 1") // for tk.Se init - - se := tk.Se - se.SetSessionManager(&mockSessionManager{se}) - - fullSQL := "show full processlist" - simpSQL := "show processlist" - - cols := []int{4, 5, 6, 7} // columns to check: Command, Time, State, Info - tk.MustQuery(fullSQL).CheckAt(cols, testutil.RowsWithSep("|", "Query|0|2|"+fullSQL)) - tk.MustQuery(simpSQL).CheckAt(cols, testutil.RowsWithSep("|", "Query|0|2|"+simpSQL[:100])) - - se.SetSessionManager(nil) // reset sm so other tests won't use this -} - type stats struct { } diff --git a/mysql/const.go b/mysql/const.go index e7d632682883e..47623f1808e0a 100644 --- a/mysql/const.go +++ b/mysql/const.go @@ -122,6 +122,7 @@ const ( ComDaemon ComBinlogDumpGtid ComResetConnection + ComEnd ) // Client information. @@ -278,6 +279,42 @@ var Priv2UserCol = map[PrivilegeType]string{ IndexPriv: "Index_priv", } +// Command2Str is the command information to command name. +var Command2Str = map[byte]string{ + ComSleep: "Sleep", + ComQuit: "Quit", + ComInitDB: "Init DB", + ComQuery: "Query", + ComFieldList: "Field List", + ComCreateDB: "Create DB", + ComDropDB: "Drop DB", + ComRefresh: "Refresh", + ComShutdown: "Shutdown", + ComStatistics: "Statistics", + ComProcessInfo: "Processlist", + ComConnect: "Connect", + ComProcessKill: "Kill", + ComDebug: "Debug", + ComPing: "Ping", + ComTime: "Time", + ComDelayedInsert: "Delayed Insert", + ComChangeUser: "Change User", + ComBinlogDump: "Binlog Dump", + ComTableDump: "Table Dump", + ComConnectOut: "Connect out", + ComRegisterSlave: "Register Slave", + ComStmtPrepare: "Prepare", + ComStmtExecute: "Execute", + ComStmtSendLongData: "Long Data", + ComStmtClose: "Close stmt", + ComStmtReset: "Reset stmt", + ComSetOption: "Set option", + ComStmtFetch: "Fetch", + ComDaemon: "Daemon", + ComBinlogDumpGtid: "Binlog Dump", + ComResetConnection: "Reset connect", +} + // Col2PrivType is the privilege tables column name to privilege type. var Col2PrivType = map[string]PrivilegeType{ "Create_priv": CreatePriv, diff --git a/server/conn.go b/server/conn.go index e957ef877caa4..302a8333eef11 100644 --- a/server/conn.go +++ b/server/conn.go @@ -626,11 +626,13 @@ func (cc *clientConn) dispatch(ctx context.Context, data []byte) error { cc.mu.cancelFunc = cancelFunc cc.mu.Unlock() + t := time.Now() cmd := data[0] data = data[1:] cc.lastCmd = hack.String(data) token := cc.server.getToken() defer func() { + cc.ctx.SetProcessInfo("", t, mysql.ComSleep) cc.server.releaseToken(token) span.Finish() }() @@ -639,6 +641,14 @@ func (cc *clientConn) dispatch(ctx context.Context, data []byte) error { cc.ctx.SetCommandValue(cmd) } + switch cmd { + case mysql.ComPing, mysql.ComStmtClose, mysql.ComStmtSendLongData, mysql.ComStmtReset, + mysql.ComSetOption, mysql.ComChangeUser: + cc.ctx.SetProcessInfo("", t, cmd) + case mysql.ComInitDB: + cc.ctx.SetProcessInfo("use "+hack.String(data), t, cmd) + } + switch cmd { case mysql.ComSleep: // TODO: According to mysql document, this command is supposed to be used only internally. diff --git a/server/conn_stmt.go b/server/conn_stmt.go index 3a6ce7583fd57..6803f17e550e9 100644 --- a/server/conn_stmt.go +++ b/server/conn_stmt.go @@ -39,6 +39,7 @@ import ( "fmt" "math" "strconv" + "time" "github.com/pingcap/errors" "github.com/pingcap/parser/mysql" @@ -218,6 +219,11 @@ func (cc *clientConn) handleStmtFetch(ctx context.Context, data []byte) (err err return mysql.NewErr(mysql.ErrUnknownStmtHandler, strconv.FormatUint(uint64(stmtID), 10), "stmt_fetch") } + sql := "" + if prepared, ok := cc.ctx.GetStatement(int(stmtID)).(*TiDBStatement); ok { + sql = prepared.sql + } + cc.ctx.SetProcessInfo(sql, time.Now(), mysql.ComStmtExecute) rs := stmt.GetResultSet() if rs == nil { return mysql.NewErr(mysql.ErrUnknownStmtHandler, diff --git a/server/driver.go b/server/driver.go index bc19a1eece7b0..734956b5117e1 100644 --- a/server/driver.go +++ b/server/driver.go @@ -16,6 +16,7 @@ package server import ( "crypto/tls" "fmt" + "time" "github.com/pingcap/parser/auth" "github.com/pingcap/tidb/sessionctx/variable" @@ -47,6 +48,8 @@ type QueryCtx interface { // SetValue saves a value associated with this context for key. SetValue(key fmt.Stringer, value interface{}) + SetProcessInfo(sql string, t time.Time, command byte) + // CommitTxn commits the transaction operations. CommitTxn(ctx context.Context) error diff --git a/server/driver_tidb.go b/server/driver_tidb.go index c05c250a12ebb..dd3dfc6f00428 100644 --- a/server/driver_tidb.go +++ b/server/driver_tidb.go @@ -17,6 +17,7 @@ import ( "crypto/tls" "fmt" "sync/atomic" + "time" "github.com/pingcap/errors" "github.com/pingcap/parser/ast" @@ -62,6 +63,7 @@ type TiDBStatement struct { paramsType []byte ctx *TiDBContext rs ResultSet + sql string } // ID implements PreparedStatement ID method. @@ -210,6 +212,11 @@ func (tc *TiDBContext) CommitTxn(ctx context.Context) error { return tc.session.CommitTxn(ctx) } +// SetProcessInfo implements QueryCtx SetProcessInfo method. +func (tc *TiDBContext) SetProcessInfo(sql string, t time.Time, command byte) { + tc.session.SetProcessInfo(sql, t, command) +} + // RollbackTxn implements QueryCtx RollbackTxn method. func (tc *TiDBContext) RollbackTxn() error { return tc.session.RollbackTxn(context.TODO()) @@ -303,6 +310,7 @@ func (tc *TiDBContext) Prepare(sql string) (statement PreparedStatement, columns return } stmt := &TiDBStatement{ + sql: sql, id: stmtID, numParams: paramCount, boundParams: make([][]byte, paramCount), @@ -328,16 +336,16 @@ func (tc *TiDBContext) ShowProcess() util.ProcessInfo { return tc.session.ShowProcess() } -// GetSessionVars return SessionVars. -func (tc *TiDBContext) GetSessionVars() *variable.SessionVars { - return tc.session.GetSessionVars() -} - // SetCommandValue implements QueryCtx SetCommandValue method. func (tc *TiDBContext) SetCommandValue(command byte) { tc.session.SetCommandValue(command) } +// GetSessionVars return SessionVars. +func (tc *TiDBContext) GetSessionVars() *variable.SessionVars { + return tc.session.GetSessionVars() +} + type tidbResultSet struct { recordSet sqlexec.RecordSet columns []*ColumnInfo diff --git a/server/server_test.go b/server/server_test.go index 512a4cffe19ed..a1f6a3e7c291a 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -735,6 +735,29 @@ func checkErrorCode(c *C, e error, codes ...uint16) { c.Assert(isMatchCode, IsTrue, Commentf("got err %v, expected err codes %v", me, codes)) } +func runTestShowProcessList(c *C) { + runTests(c, nil, func(dbt *DBTest) { + fullSQL := "show full processlist" + simpSQL := "show processlist" + rows := dbt.mustQuery(fullSQL) + c.Assert(rows.Next(), IsTrue) + var outA, outB, outC, outD, outE, outF, outG, outH, outI string + err := rows.Scan(&outA, &outB, &outC, &outD, &outE, &outF, &outG, &outH, &outI) + c.Assert(err, IsNil) + c.Assert(outE, Equals, "Query") + c.Assert(outF, Equals, "0") + c.Assert(outG, Equals, "2") + c.Assert(outH, Equals, fullSQL) + rows = dbt.mustQuery(simpSQL) + err = rows.Scan(&outA, &outB, &outC, &outD, &outE, &outF, &outG, &outH, &outI) + c.Assert(err, IsNil) + c.Assert(outE, Equals, "Query") + c.Assert(outF, Equals, "0") + c.Assert(outG, Equals, "2") + c.Assert(outH, Equals, simpSQL[:100]) + }) +} + func runTestAuth(c *C) { runTests(c, nil, func(dbt *DBTest) { dbt.mustExec(`CREATE USER 'authtest'@'%' IDENTIFIED BY '123';`) diff --git a/server/tidb_test.go b/server/tidb_test.go index e4a9cae8bbfda..cf3b02fe84be2 100644 --- a/server/tidb_test.go +++ b/server/tidb_test.go @@ -526,17 +526,3 @@ func (ts *TidbTestSuite) TestSumAvg(c *C) { c.Parallel() runTestSumAvg(c) } - -func (ts *TidbTestSuite) TestShowProcess(c *C) { - qctx, err := ts.tidbdrv.OpenCtx(uint64(0), 0, uint8(tmysql.DefaultCollationID), "test", nil) - c.Assert(err, IsNil) - ctx := context.Background() - results, err := qctx.Execute(ctx, "select 1") - c.Assert(err, IsNil) - pi := qctx.ShowProcess() - c.Assert(pi.Command, Equals, "Query") - results[0].Close() - pi = qctx.ShowProcess() - c.Assert(pi.Command, Equals, "Sleep") - qctx.Close() -} diff --git a/session/session.go b/session/session.go index 5ef2d4d56efa2..b34dda6646034 100644 --- a/session/session.go +++ b/session/session.go @@ -82,6 +82,8 @@ type Session interface { DropPreparedStmt(stmtID uint32) error SetClientCapability(uint32) // Set client capability flags. SetConnectionID(uint64) + SetCommandValue(byte) + SetProcessInfo(string, time.Time, byte) SetTLSState(*tls.ConnectionState) SetCollation(coID int) error SetSessionManager(util.SessionManager) @@ -92,7 +94,6 @@ type Session interface { PrepareTxnCtx(context.Context) // FieldList returns fields list of a table. FieldList(tableName string) (fields []*ast.ResultField, err error) - SetCommandValue(byte) } var ( @@ -204,6 +205,10 @@ func (s *session) SetTLSState(tlsState *tls.ConnectionState) { } } +func (s *session) SetCommandValue(command byte) { + atomic.StoreUint32(&s.sessionVars.CommandValue, uint32(command)) +} + func (s *session) GetTLSState() *tls.ConnectionState { return s.sessionVars.TLSConnectionState } @@ -275,10 +280,6 @@ func (s *session) FieldList(tableName string) ([]*ast.ResultField, error) { return fields, nil } -func (s *session) SetCommandValue(command byte) { - atomic.StoreUint32(&s.sessionVars.CommandValue, uint32(command)) -} - func (s *session) doCommit(ctx context.Context) error { if !s.txn.Valid() { return nil @@ -782,18 +783,15 @@ func (s *session) ParseSQL(ctx context.Context, sql, charset, collation string) return s.parser.Parse(sql, charset, collation) } -func (s *session) SetProcessInfo(sql string) { +func (s *session) SetProcessInfo(sql string, t time.Time, command byte) { pi := util.ProcessInfo{ ID: s.sessionVars.ConnectionID, DB: s.sessionVars.CurrentDB, - Command: "Query", - Time: time.Now(), + Command: mysql.Command2Str[command], + Time: t, State: s.Status(), Info: sql, } - if sql == "" { - pi.Command = "Sleep" - } if s.sessionVars.User != nil { pi.User = s.sessionVars.User.Username pi.Host = s.sessionVars.User.Hostname diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index 19f341fd97c71..0072bc39fee93 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -364,6 +364,7 @@ func NewSessionVars() *SessionVars { DisableTxnAutoRetry: DefTiDBDisableTxnAutoRetry, DDLReorgPriority: kv.PriorityLow, SlowQueryFile: config.GetGlobalConfig().Log.SlowQueryFile, + CommandValue: uint32(mysql.ComSleep), } vars.Concurrency = Concurrency{ IndexLookupConcurrency: DefIndexLookupConcurrency,