Skip to content

Commit

Permalink
*: add a column describing memory usage for table information_schema…
Browse files Browse the repository at this point in the history
….processlist (pingcap#10837)

     *: add a column describing memory usage for table information_schema.processlist

     Closes pingcap#10199

      Conflicts:
             executor/show.go
             infoschema/tables.go
             infoschema/tables_test.go
             util/misc_test.go
             util/processinfo.go
  • Loading branch information
SunRunAway committed Jun 20, 2019
1 parent e0b0527 commit a5e6cb0
Show file tree
Hide file tree
Showing 13 changed files with 235 additions and 64 deletions.
5 changes: 0 additions & 5 deletions executor/aggregate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,11 +237,6 @@ func (s *testSuite) TestAggregation(c *C) {
result = tk.MustQuery("SELECT COALESCE ( + 1, cor0.col0 ) + - CAST( NULL AS DECIMAL ) FROM t2, t1 AS cor0, t2 AS cor1 GROUP BY cor0.col1")
result.Check(testkit.Rows("<nil>", "<nil>"))

result = tk.MustQuery("select count(*) from information_schema.columns")
// When adding new memory columns in information_schema, please update this variable.
columnCountOfAllInformationSchemaTables := "793"
result.Check(testkit.Rows(columnCountOfAllInformationSchemaTables))

tk.MustExec("drop table if exists t1")
tk.MustExec("drop table if exists t2")
tk.MustExec("create table t1 (c1 int)")
Expand Down
16 changes: 8 additions & 8 deletions executor/executor_pkg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,12 @@ type testExecSuite struct {

// mockSessionManager is a mocked session manager which is used for test.
type mockSessionManager struct {
PS []util.ProcessInfo
PS []*util.ProcessInfo
}

// ShowProcessList implements the SessionManager.ShowProcessList interface.
func (msm *mockSessionManager) ShowProcessList() map[uint64]util.ProcessInfo {
ret := make(map[uint64]util.ProcessInfo)
func (msm *mockSessionManager) ShowProcessList() map[uint64]*util.ProcessInfo {
ret := make(map[uint64]*util.ProcessInfo)
for _, item := range msm.PS {
ret[item.ID] = item
}
Expand All @@ -56,19 +56,19 @@ func (msm *mockSessionManager) Kill(cid uint64, query bool) {

func (s *testExecSuite) TestShowProcessList(c *C) {
// Compose schema.
names := []string{"Id", "User", "Host", "db", "Command", "Time", "State", "Info", "Mem"}
names := []string{"Id", "User", "Host", "db", "Command", "Time", "State", "Info"}
ftypes := []byte{mysql.TypeLonglong, mysql.TypeVarchar, mysql.TypeVarchar,
mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeLong, mysql.TypeVarchar, mysql.TypeString, mysql.TypeLonglong}
mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeLong, mysql.TypeVarchar, mysql.TypeString}
schema := buildSchema(names, ftypes)

// Compose a mocked session manager.
ps := make([]util.ProcessInfo, 0, 1)
pi := util.ProcessInfo{
ps := make([]*util.ProcessInfo, 0, 1)
pi := &util.ProcessInfo{
ID: 0,
User: "test",
Host: "127.0.0.1",
DB: "test",
Command: "select * from t",
Command: 't',
State: 1,
Info: "",
}
Expand Down
20 changes: 2 additions & 18 deletions executor/show.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,24 +203,8 @@ func (e *ShowExec) fetchShowProcessList() error {

pl := sm.ShowProcessList()
for _, pi := range pl {
var info string
if e.Full {
info = pi.Info
} else {
info = fmt.Sprintf("%.100v", pi.Info)
}

e.appendRow([]interface{}{
pi.ID,
pi.User,
pi.Host,
pi.DB,
pi.Command,
uint64(time.Since(pi.Time) / time.Second),
fmt.Sprintf("%d", pi.State),
info,
pi.Mem,
})
row := pi.ToRowForShow(e.Full)
e.appendRow(row)
}
return nil
}
Expand Down
19 changes: 4 additions & 15 deletions infoschema/tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -530,7 +530,8 @@ var tableProcesslistCols = []columnInfo{
{"COMMAND", mysql.TypeVarchar, 16, mysql.NotNullFlag, "", nil},
{"TIME", mysql.TypeLong, 7, mysql.NotNullFlag, 0, nil},
{"STATE", mysql.TypeVarchar, 7, 0, nil, nil},
{"Info", mysql.TypeString, 512, 0, nil, nil},
{"INFO", mysql.TypeString, 512, 0, nil, nil},
{"MEM", mysql.TypeLonglong, 21, 0, nil, nil},
}

var tableTiDBIndexesCols = []columnInfo{
Expand Down Expand Up @@ -626,20 +627,8 @@ func dataForProcesslist(ctx sessionctx.Context) [][]types.Datum {
var records [][]types.Datum
pl := sm.ShowProcessList()
for _, pi := range pl {
var t uint64
if len(pi.Info) != 0 {
t = uint64(time.Since(pi.Time) / time.Second)
}
record := types.MakeDatums(
pi.ID,
pi.User,
pi.Host,
pi.DB,
pi.Command,
t,
fmt.Sprintf("%d", pi.State),
pi.Info,
)
rows := pi.ToRow()
record := types.MakeDatums(rows...)
records = append(records, record)
}
return records
Expand Down
109 changes: 109 additions & 0 deletions infoschema/tables_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,18 @@ import (
"fmt"
"os"
"strconv"
"strings"

. "github.com/pingcap/check"
"github.com/pingcap/parser/auth"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/statistics"
"github.com/pingcap/tidb/store/mockstore"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/testkit"
"github.com/pingcap/tidb/util/testleak"
"github.com/pingcap/tidb/util/testutil"
Expand Down Expand Up @@ -87,6 +92,18 @@ func (s *testSuite) TestInfoschemaFieldValue(c *C) {
}, nil, nil), IsTrue)

tk1.MustQuery("select distinct(table_schema) from information_schema.tables").Check(testkit.Rows("INFORMATION_SCHEMA"))

// Fix issue 9836
sm := &mockSessionManager{make(map[uint64]*util.ProcessInfo, 1)}
sm.processInfoMap[1] = &util.ProcessInfo{
ID: 1,
User: "root",
Host: "127.0.0.1",
Command: mysql.ComQuery,
StmtCtx: tk.Se.GetSessionVars().StmtCtx,
}
tk.Se.SetSessionManager(sm)
tk.MustQuery("SELECT user,host,command FROM information_schema.processlist;").Check(testkit.Rows("root 127.0.0.1 Query"))
}

func (s *testSuite) TestDataForTableStatsField(c *C) {
Expand Down Expand Up @@ -227,6 +244,98 @@ func (s *testSuite) TestCharacterSetCollations(c *C) {
tk.MustExec("DROP DATABASE charset_collate_test")
}

var _ = Suite(&testTableSuite{})

type testTableSuite struct {
store kv.Storage
dom *domain.Domain
}

func (s *testTableSuite) SetUpSuite(c *C) {
testleak.BeforeTest()

var err error
s.store, err = mockstore.NewMockTikvStore()
c.Assert(err, IsNil)
session.SetStatsLease(0)
s.dom, err = session.BootstrapSession(s.store)
c.Assert(err, IsNil)
}

func (s *testTableSuite) TearDownSuite(c *C) {
defer testleak.AfterTest(c)()
s.dom.Close()
s.store.Close()
}

type mockSessionManager struct {
processInfoMap map[uint64]*util.ProcessInfo
}

func (sm *mockSessionManager) ShowProcessList() map[uint64]*util.ProcessInfo { return sm.processInfoMap }

func (sm *mockSessionManager) GetProcessInfo(id uint64) (*util.ProcessInfo, bool) {
rs, ok := sm.processInfoMap[id]
return rs, ok
}

func (sm *mockSessionManager) Kill(connectionID uint64, query bool) {}

func (s *testTableSuite) TestSomeTables(c *C) {
tk := testkit.NewTestKit(c, s.store)

tk.MustQuery("select * from information_schema.COLLATION_CHARACTER_SET_APPLICABILITY where COLLATION_NAME='utf8mb4_bin';").Check(
testkit.Rows("utf8mb4_bin utf8mb4"))
tk.MustQuery("select * from information_schema.SESSION_VARIABLES where VARIABLE_NAME='tidb_retry_limit';").Check(testkit.Rows("tidb_retry_limit 10"))
// cherry-pick https://github.com/pingcap/tidb/pull/7831
//tk.MustQuery("select * from information_schema.ENGINES;").Check(testkit.Rows("InnoDB DEFAULT Supports transactions, row-level locking, and foreign keys YES YES YES"))
tk.MustQuery("select * from information_schema.TABLE_CONSTRAINTS where TABLE_NAME='gc_delete_range';").Check(testkit.Rows("def mysql delete_range_index mysql gc_delete_range UNIQUE"))
tk.MustQuery("select * from information_schema.KEY_COLUMN_USAGE where TABLE_NAME='stats_meta' and COLUMN_NAME='table_id';").Check(
testkit.Rows("def mysql tbl def mysql stats_meta table_id 1 <nil> <nil> <nil> <nil>"))
// https://github.com/pingcap/tidb/pull/9898
//tk.MustQuery("select * from information_schema.STATISTICS where TABLE_NAME='columns_priv' and COLUMN_NAME='Host';").Check(
// testkit.Rows("def mysql columns_priv 0 mysql PRIMARY 1 Host A <nil> <nil> <nil> BTREE "))
tk.MustQuery("select * from information_schema.USER_PRIVILEGES where PRIVILEGE_TYPE='Select';").Check(testkit.Rows("'root'@'%' def Select YES"))

sm := &mockSessionManager{make(map[uint64]*util.ProcessInfo, 2)}
sm.processInfoMap[1] = &util.ProcessInfo{
ID: 1,
User: "user-1",
Host: "localhost",
DB: "information_schema",
Command: byte(1),
State: 1,
Info: "do something",
StmtCtx: tk.Se.GetSessionVars().StmtCtx,
}
sm.processInfoMap[2] = &util.ProcessInfo{
ID: 2,
User: "user-2",
Host: "localhost",
DB: "test",
Command: byte(2),
State: 2,
Info: strings.Repeat("x", 101),
StmtCtx: tk.Se.GetSessionVars().StmtCtx,
}
tk.Se.SetSessionManager(sm)
tk.MustQuery("select * from information_schema.PROCESSLIST order by ID;").Sort().Check(
testkit.Rows(
fmt.Sprintf("1 user-1 localhost information_schema Quit 9223372036 1 %s 0", "do something"),
fmt.Sprintf("2 user-2 localhost test Init DB 9223372036 2 %s 0", strings.Repeat("x", 101)),
))
tk.MustQuery("SHOW PROCESSLIST;").Sort().Check(
testkit.Rows(
fmt.Sprintf("1 user-1 localhost information_schema Quit 9223372036 1 %s", "do something"),
fmt.Sprintf("2 user-2 localhost test Init DB 9223372036 2 %s", strings.Repeat("x", 100)),
))
tk.MustQuery("SHOW FULL PROCESSLIST;").Sort().Check(
testkit.Rows(
fmt.Sprintf("1 user-1 localhost information_schema Quit 9223372036 1 %s", "do something"),
fmt.Sprintf("2 user-2 localhost test Init DB 9223372036 2 %s", strings.Repeat("x", 101)),
))
}

func (s *testSuite) TestSchemataCharacterSet(c *C) {
testleak.BeforeTest()
defer testleak.AfterTest(c)()
Expand Down
2 changes: 1 addition & 1 deletion planner/core/logical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ type LogicalUnionScan struct {
conditions []expression.Expression
}

// DataSource represents a tablescan without condition push down.
// DataSource represents a tableScan without condition push down.
type DataSource struct {
logicalSchemaProducer

Expand Down
4 changes: 2 additions & 2 deletions planner/core/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1799,9 +1799,9 @@ func buildShowSchema(s *ast.ShowStmt) (schema *expression.Schema) {
mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeVarchar,
}
case ast.ShowProcessList:
names = []string{"Id", "User", "Host", "db", "Command", "Time", "State", "Info", "Mem"}
names = []string{"Id", "User", "Host", "db", "Command", "Time", "State", "Info"}
ftypes = []byte{mysql.TypeLonglong, mysql.TypeVarchar, mysql.TypeVarchar,
mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeLong, mysql.TypeVarchar, mysql.TypeString, mysql.TypeLonglong}
mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeLong, mysql.TypeVarchar, mysql.TypeString}
case ast.ShowPumpStatus:
names = []string{"NodeID", "Address", "State", "Max_Commit_Ts", "Update_Time"}
ftypes = []byte{mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeLonglong, mysql.TypeVarchar}
Expand Down
2 changes: 1 addition & 1 deletion server/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ type QueryCtx interface {
Auth(user *auth.UserIdentity, auth []byte, salt []byte) bool

// ShowProcess shows the information about the session.
ShowProcess() util.ProcessInfo
ShowProcess() *util.ProcessInfo

// GetSessionVars return SessionVars.
GetSessionVars() *variable.SessionVars
Expand Down
2 changes: 1 addition & 1 deletion server/driver_tidb.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ func (tc *TiDBContext) Prepare(sql string) (statement PreparedStatement, columns
}

// ShowProcess implements QueryCtx ShowProcess method.
func (tc *TiDBContext) ShowProcess() util.ProcessInfo {
func (tc *TiDBContext) ShowProcess() *util.ProcessInfo {
return tc.session.ShowProcess()
}

Expand Down
4 changes: 2 additions & 2 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -429,9 +429,9 @@ func (cc *clientConn) connectInfo() *variable.ConnectionInfo {
}

// ShowProcessList implements the SessionManager interface.
func (s *Server) ShowProcessList() map[uint64]util.ProcessInfo {
func (s *Server) ShowProcessList() map[uint64]*util.ProcessInfo {
s.rwlock.RLock()
rs := make(map[uint64]util.ProcessInfo, len(s.clients))
rs := make(map[uint64]*util.ProcessInfo, len(s.clients))
for _, client := range s.clients {
if atomic.LoadInt32(&client.status) == connStatusWaitShutdown {
continue
Expand Down
16 changes: 8 additions & 8 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ import (
"golang.org/x/net/context"
)

// Session context
// Session context, it is consistent with the lifecycle of a client connection.
type Session interface {
sessionctx.Context
Status() uint16 // Flag of current status, such as autocommit.
Expand All @@ -89,7 +89,7 @@ type Session interface {
SetSessionManager(util.SessionManager)
Close()
Auth(user *auth.UserIdentity, auth []byte, salt []byte) bool
ShowProcess() util.ProcessInfo
ShowProcess() *util.ProcessInfo
// PrePareTxnCtx is exported for test.
PrepareTxnCtx(context.Context)
// FieldList returns fields list of a table.
Expand Down Expand Up @@ -787,16 +787,17 @@ func (s *session) SetProcessInfo(sql string, t time.Time, command byte) {
pi := util.ProcessInfo{
ID: s.sessionVars.ConnectionID,
DB: s.sessionVars.CurrentDB,
Command: mysql.Command2Str[command],
Command: command,
Time: t,
State: s.Status(),
Info: sql,
StmtCtx: s.sessionVars.StmtCtx,
}
if s.sessionVars.User != nil {
pi.User = s.sessionVars.User.Username
pi.Host = s.sessionVars.User.Hostname
}
s.processInfo.Store(pi)
s.processInfo.Store(&pi)
}

func (s *session) executeStatement(ctx context.Context, connID uint64, stmtNode ast.StmtNode, stmt sqlexec.Statement, recordSets []sqlexec.RecordSet) ([]sqlexec.RecordSet, error) {
Expand Down Expand Up @@ -1528,12 +1529,11 @@ func (s *session) GetStore() kv.Storage {
return s.store
}

func (s *session) ShowProcess() util.ProcessInfo {
var pi util.ProcessInfo
func (s *session) ShowProcess() *util.ProcessInfo {
var pi *util.ProcessInfo
tmp := s.processInfo.Load()
if tmp != nil {
pi = tmp.(util.ProcessInfo)
pi.Mem = s.GetSessionVars().StmtCtx.MemTracker.BytesConsumed()
pi = tmp.(*util.ProcessInfo)
}
return pi
}
Expand Down
Loading

0 comments on commit a5e6cb0

Please sign in to comment.