Skip to content

Commit

Permalink
*: fix Command and Time in show processlist (pingcap#7844)
Browse files Browse the repository at this point in the history
 Conflicts:
       executor/adapter.go
       executor/show_test.go
       server/conn.go
       server/driver_tidb.go
       sessionctx/variable/session.go
  • Loading branch information
Lingyu Song authored and SunRunAway committed Jun 20, 2019
1 parent 3765b91 commit e0b0527
Show file tree
Hide file tree
Showing 11 changed files with 118 additions and 75 deletions.
37 changes: 16 additions & 21 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,17 +46,16 @@ import (

// processinfoSetter is the interface use to set current running process info.
type processinfoSetter interface {
SetProcessInfo(string)
SetProcessInfo(string, time.Time, byte)
}

// recordSet wraps an executor, implements sqlexec.RecordSet interface
type recordSet struct {
fields []*ast.ResultField
executor Executor
stmt *ExecStmt
processinfo processinfoSetter
lastErr error
txnStartTS uint64
fields []*ast.ResultField
executor Executor
stmt *ExecStmt
lastErr error
txnStartTS uint64
}

func (a *recordSet) Fields() []*ast.ResultField {
Expand Down Expand Up @@ -124,9 +123,6 @@ func (a *recordSet) NewChunk() *chunk.Chunk {
func (a *recordSet) Close() error {
err := a.executor.Close()
a.stmt.LogSlowQuery(a.txnStartTS, a.lastErr == nil)
if a.processinfo != nil {
a.processinfo.SetProcessInfo("")
}
a.stmt.logAudit()
return errors.Trace(err)
}
Expand Down Expand Up @@ -226,6 +222,8 @@ func (a *ExecStmt) Exec(ctx context.Context) (sqlexec.RecordSet, error) {
return nil, errors.Trace(err)
}

cmd32 := atomic.LoadUint32(&sctx.GetSessionVars().CommandValue)
cmd := byte(cmd32)
var pi processinfoSetter
if raw, ok := sctx.(processinfoSetter); ok {
pi = raw
Expand All @@ -237,17 +235,18 @@ func (a *ExecStmt) Exec(ctx context.Context) (sqlexec.RecordSet, error) {
}
}
// Update processinfo, ShowProcess() will use it.
pi.SetProcessInfo(sql)
pi.SetProcessInfo(sql, time.Now(), cmd)
a.Ctx.GetSessionVars().StmtCtx.StmtType = GetStmtLabel(a.StmtNode)
}

// If the executor doesn't return any result to the client, we execute it without delay.
if e.Schema().Len() == 0 {
return a.handleNoDelayExecutor(ctx, sctx, e, pi)
return a.handleNoDelayExecutor(ctx, sctx, e)
} else if proj, ok := e.(*ProjectionExec); ok && proj.calculateNoDelay {
// Currently this is only for the "DO" statement. Take "DO 1, @a=2;" as an example:
// the Projection has two expressions and two columns in the schema, but we should
// not return the result of the two expressions.
return a.handleNoDelayExecutor(ctx, sctx, e, pi)
return a.handleNoDelayExecutor(ctx, sctx, e)
}

var txnStartTS uint64
Expand All @@ -259,14 +258,13 @@ func (a *ExecStmt) Exec(ctx context.Context) (sqlexec.RecordSet, error) {
txnStartTS = txn.StartTS()
}
return &recordSet{
executor: e,
stmt: a,
processinfo: pi,
txnStartTS: txnStartTS,
executor: e,
stmt: a,
txnStartTS: txnStartTS,
}, nil
}

func (a *ExecStmt) handleNoDelayExecutor(ctx context.Context, sctx sessionctx.Context, e Executor, pi processinfoSetter) (sqlexec.RecordSet, error) {
func (a *ExecStmt) handleNoDelayExecutor(ctx context.Context, sctx sessionctx.Context, e Executor) (sqlexec.RecordSet, error) {
// Check if "tidb_snapshot" is set for the write executors.
// In history read mode, we can not do write operations.
switch e.(type) {
Expand All @@ -279,9 +277,6 @@ func (a *ExecStmt) handleNoDelayExecutor(ctx context.Context, sctx sessionctx.Co

var err error
defer func() {
if pi != nil {
pi.SetProcessInfo("")
}
terror.Log(errors.Trace(e.Close()))
a.logAudit()
}()
Expand Down
24 changes: 0 additions & 24 deletions executor/show_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/testkit"
"github.com/pingcap/tidb/util/testutil"
"golang.org/x/net/context"
Expand Down Expand Up @@ -470,33 +469,10 @@ type mockSessionManager struct {
session.Session
}

// ShowProcessList implements the SessionManager.ShowProcessList interface.
func (msm *mockSessionManager) ShowProcessList() map[uint64]util.ProcessInfo {
ps := msm.ShowProcess()
return map[uint64]util.ProcessInfo{ps.ID: ps}
}

// Kill implements the SessionManager.Kill interface.
func (msm *mockSessionManager) Kill(cid uint64, query bool) {
}

func (s *testSuite) TestShowFullProcessList(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("select 1") // for tk.Se init

se := tk.Se
se.SetSessionManager(&mockSessionManager{se})

fullSQL := "show full processlist"
simpSQL := "show processlist"

cols := []int{4, 5, 6, 7} // columns to check: Command, Time, State, Info
tk.MustQuery(fullSQL).CheckAt(cols, testutil.RowsWithSep("|", "Query|0|2|"+fullSQL))
tk.MustQuery(simpSQL).CheckAt(cols, testutil.RowsWithSep("|", "Query|0|2|"+simpSQL[:100]))

se.SetSessionManager(nil) // reset sm so other tests won't use this
}

type stats struct {
}

Expand Down
37 changes: 37 additions & 0 deletions mysql/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ const (
ComDaemon
ComBinlogDumpGtid
ComResetConnection
ComEnd
)

// Client information.
Expand Down Expand Up @@ -278,6 +279,42 @@ var Priv2UserCol = map[PrivilegeType]string{
IndexPriv: "Index_priv",
}

// Command2Str is the command information to command name.
var Command2Str = map[byte]string{
ComSleep: "Sleep",
ComQuit: "Quit",
ComInitDB: "Init DB",
ComQuery: "Query",
ComFieldList: "Field List",
ComCreateDB: "Create DB",
ComDropDB: "Drop DB",
ComRefresh: "Refresh",
ComShutdown: "Shutdown",
ComStatistics: "Statistics",
ComProcessInfo: "Processlist",
ComConnect: "Connect",
ComProcessKill: "Kill",
ComDebug: "Debug",
ComPing: "Ping",
ComTime: "Time",
ComDelayedInsert: "Delayed Insert",
ComChangeUser: "Change User",
ComBinlogDump: "Binlog Dump",
ComTableDump: "Table Dump",
ComConnectOut: "Connect out",
ComRegisterSlave: "Register Slave",
ComStmtPrepare: "Prepare",
ComStmtExecute: "Execute",
ComStmtSendLongData: "Long Data",
ComStmtClose: "Close stmt",
ComStmtReset: "Reset stmt",
ComSetOption: "Set option",
ComStmtFetch: "Fetch",
ComDaemon: "Daemon",
ComBinlogDumpGtid: "Binlog Dump",
ComResetConnection: "Reset connect",
}

// Col2PrivType is the privilege tables column name to privilege type.
var Col2PrivType = map[string]PrivilegeType{
"Create_priv": CreatePriv,
Expand Down
10 changes: 10 additions & 0 deletions server/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -626,11 +626,13 @@ func (cc *clientConn) dispatch(ctx context.Context, data []byte) error {
cc.mu.cancelFunc = cancelFunc
cc.mu.Unlock()

t := time.Now()
cmd := data[0]
data = data[1:]
cc.lastCmd = hack.String(data)
token := cc.server.getToken()
defer func() {
cc.ctx.SetProcessInfo("", t, mysql.ComSleep)
cc.server.releaseToken(token)
span.Finish()
}()
Expand All @@ -639,6 +641,14 @@ func (cc *clientConn) dispatch(ctx context.Context, data []byte) error {
cc.ctx.SetCommandValue(cmd)
}

switch cmd {
case mysql.ComPing, mysql.ComStmtClose, mysql.ComStmtSendLongData, mysql.ComStmtReset,
mysql.ComSetOption, mysql.ComChangeUser:
cc.ctx.SetProcessInfo("", t, cmd)
case mysql.ComInitDB:
cc.ctx.SetProcessInfo("use "+hack.String(data), t, cmd)
}

switch cmd {
case mysql.ComSleep:
// TODO: According to mysql document, this command is supposed to be used only internally.
Expand Down
6 changes: 6 additions & 0 deletions server/conn_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"fmt"
"math"
"strconv"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/parser/mysql"
Expand Down Expand Up @@ -218,6 +219,11 @@ func (cc *clientConn) handleStmtFetch(ctx context.Context, data []byte) (err err
return mysql.NewErr(mysql.ErrUnknownStmtHandler,
strconv.FormatUint(uint64(stmtID), 10), "stmt_fetch")
}
sql := ""
if prepared, ok := cc.ctx.GetStatement(int(stmtID)).(*TiDBStatement); ok {
sql = prepared.sql
}
cc.ctx.SetProcessInfo(sql, time.Now(), mysql.ComStmtExecute)
rs := stmt.GetResultSet()
if rs == nil {
return mysql.NewErr(mysql.ErrUnknownStmtHandler,
Expand Down
3 changes: 3 additions & 0 deletions server/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package server
import (
"crypto/tls"
"fmt"
"time"

"github.com/pingcap/parser/auth"
"github.com/pingcap/tidb/sessionctx/variable"
Expand Down Expand Up @@ -47,6 +48,8 @@ type QueryCtx interface {
// SetValue saves a value associated with this context for key.
SetValue(key fmt.Stringer, value interface{})

SetProcessInfo(sql string, t time.Time, command byte)

// CommitTxn commits the transaction operations.
CommitTxn(ctx context.Context) error

Expand Down
18 changes: 13 additions & 5 deletions server/driver_tidb.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"crypto/tls"
"fmt"
"sync/atomic"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/parser/ast"
Expand Down Expand Up @@ -62,6 +63,7 @@ type TiDBStatement struct {
paramsType []byte
ctx *TiDBContext
rs ResultSet
sql string
}

// ID implements PreparedStatement ID method.
Expand Down Expand Up @@ -210,6 +212,11 @@ func (tc *TiDBContext) CommitTxn(ctx context.Context) error {
return tc.session.CommitTxn(ctx)
}

// SetProcessInfo implements QueryCtx SetProcessInfo method.
func (tc *TiDBContext) SetProcessInfo(sql string, t time.Time, command byte) {
tc.session.SetProcessInfo(sql, t, command)
}

// RollbackTxn implements QueryCtx RollbackTxn method.
func (tc *TiDBContext) RollbackTxn() error {
return tc.session.RollbackTxn(context.TODO())
Expand Down Expand Up @@ -303,6 +310,7 @@ func (tc *TiDBContext) Prepare(sql string) (statement PreparedStatement, columns
return
}
stmt := &TiDBStatement{
sql: sql,
id: stmtID,
numParams: paramCount,
boundParams: make([][]byte, paramCount),
Expand All @@ -328,16 +336,16 @@ func (tc *TiDBContext) ShowProcess() util.ProcessInfo {
return tc.session.ShowProcess()
}

// GetSessionVars return SessionVars.
func (tc *TiDBContext) GetSessionVars() *variable.SessionVars {
return tc.session.GetSessionVars()
}

// SetCommandValue implements QueryCtx SetCommandValue method.
func (tc *TiDBContext) SetCommandValue(command byte) {
tc.session.SetCommandValue(command)
}

// GetSessionVars return SessionVars.
func (tc *TiDBContext) GetSessionVars() *variable.SessionVars {
return tc.session.GetSessionVars()
}

type tidbResultSet struct {
recordSet sqlexec.RecordSet
columns []*ColumnInfo
Expand Down
23 changes: 23 additions & 0 deletions server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -735,6 +735,29 @@ func checkErrorCode(c *C, e error, codes ...uint16) {
c.Assert(isMatchCode, IsTrue, Commentf("got err %v, expected err codes %v", me, codes))
}

func runTestShowProcessList(c *C) {
runTests(c, nil, func(dbt *DBTest) {
fullSQL := "show full processlist"
simpSQL := "show processlist"
rows := dbt.mustQuery(fullSQL)
c.Assert(rows.Next(), IsTrue)
var outA, outB, outC, outD, outE, outF, outG, outH, outI string
err := rows.Scan(&outA, &outB, &outC, &outD, &outE, &outF, &outG, &outH, &outI)
c.Assert(err, IsNil)
c.Assert(outE, Equals, "Query")
c.Assert(outF, Equals, "0")
c.Assert(outG, Equals, "2")
c.Assert(outH, Equals, fullSQL)
rows = dbt.mustQuery(simpSQL)
err = rows.Scan(&outA, &outB, &outC, &outD, &outE, &outF, &outG, &outH, &outI)
c.Assert(err, IsNil)
c.Assert(outE, Equals, "Query")
c.Assert(outF, Equals, "0")
c.Assert(outG, Equals, "2")
c.Assert(outH, Equals, simpSQL[:100])
})
}

func runTestAuth(c *C) {
runTests(c, nil, func(dbt *DBTest) {
dbt.mustExec(`CREATE USER 'authtest'@'%' IDENTIFIED BY '123';`)
Expand Down
14 changes: 0 additions & 14 deletions server/tidb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -526,17 +526,3 @@ func (ts *TidbTestSuite) TestSumAvg(c *C) {
c.Parallel()
runTestSumAvg(c)
}

func (ts *TidbTestSuite) TestShowProcess(c *C) {
qctx, err := ts.tidbdrv.OpenCtx(uint64(0), 0, uint8(tmysql.DefaultCollationID), "test", nil)
c.Assert(err, IsNil)
ctx := context.Background()
results, err := qctx.Execute(ctx, "select 1")
c.Assert(err, IsNil)
pi := qctx.ShowProcess()
c.Assert(pi.Command, Equals, "Query")
results[0].Close()
pi = qctx.ShowProcess()
c.Assert(pi.Command, Equals, "Sleep")
qctx.Close()
}
Loading

0 comments on commit e0b0527

Please sign in to comment.