Skip to content

Commit

Permalink
planner, executor: support SQL show pump/drainer status (#9456)
Browse files Browse the repository at this point in the history
  • Loading branch information
caohe authored and WangXiangUSTC committed Mar 4, 2019
1 parent f16081b commit e7ff050
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 0 deletions.
47 changes: 47 additions & 0 deletions executor/show.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ import (
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb-tools/pkg/etcd"
"github.com/pingcap/tidb-tools/pkg/utils"
"github.com/pingcap/tidb-tools/tidb-binlog/node"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/infoschema"
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/plugin"
Expand All @@ -44,6 +48,8 @@ import (
"github.com/pingcap/tidb/util/sqlexec"
)

var etcdDialTimeout = 5 * time.Second

// ShowExec represents a show executor.
type ShowExec struct {
baseExecutor
Expand Down Expand Up @@ -115,6 +121,8 @@ func (e *ShowExec) fetchAll() error {
return e.fetchShowCreateDatabase()
case ast.ShowDatabases:
return e.fetchShowDatabases()
case ast.ShowDrainerStatus:
return e.fetchShowPumpOrDrainerStatus(node.DrainerNode)
case ast.ShowEngines:
return e.fetchShowEngines()
case ast.ShowGrants:
Expand All @@ -123,6 +131,8 @@ func (e *ShowExec) fetchAll() error {
return e.fetchShowIndex()
case ast.ShowProcedureStatus:
return e.fetchShowProcedureStatus()
case ast.ShowPumpStatus:
return e.fetchShowPumpOrDrainerStatus(node.PumpNode)
case ast.ShowStatus:
return e.fetchShowStatus()
case ast.ShowTables:
Expand Down Expand Up @@ -996,6 +1006,43 @@ func (e *ShowExec) fetchShowWarnings(errOnly bool) error {
return nil
}

// fetchShowPumpOrDrainerStatus gets status of all pumps or drainers and fill them into e.rows.
func (e *ShowExec) fetchShowPumpOrDrainerStatus(kind string) error {
registry, err := createRegistry(config.GetGlobalConfig().Path)
if err != nil {
return errors.Trace(err)
}

nodes, _, err := registry.Nodes(context.Background(), node.NodePrefix[kind])
if err != nil {
return errors.Trace(err)
}
err = registry.Close()
if err != nil {
return errors.Trace(err)
}

for _, n := range nodes {
e.appendRow([]interface{}{n.NodeID, n.Addr, n.State, n.MaxCommitTS, utils.TSOToRoughTime(n.UpdateTS).Format(types.TimeFormat)})
}

return nil
}

// createRegistry returns an ectd registry
func createRegistry(urls string) (*node.EtcdRegistry, error) {
ectdEndpoints, err := utils.ParseHostPortAddr(urls)
if err != nil {
return nil, errors.Trace(err)
}
cli, err := etcd.NewClientFromCfg(ectdEndpoints, etcdDialTimeout, node.DefaultRootPath, nil)
if err != nil {
return nil, errors.Trace(err)
}

return node.NewEtcdRegistry(cli, etcdDialTimeout), nil
}

func (e *ShowExec) getTable() (table.Table, error) {
if e.Table == nil {
return nil, errors.New("table not found")
Expand Down
6 changes: 6 additions & 0 deletions planner/core/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1750,6 +1750,9 @@ func buildShowSchema(s *ast.ShowStmt, isView bool) (schema *expression.Schema) {
names = []string{"View", "Create View", "character_set_client", "collation_connection"}
case ast.ShowCreateDatabase:
names = []string{"Database", "Create Database"}
case ast.ShowDrainerStatus:
names = []string{"NodeID", "Address", "State", "Max_Commit_Ts", "Update_Time"}
ftypes = []byte{mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeLonglong, mysql.TypeVarchar}
case ast.ShowGrants:
if s.User != nil {
names = []string{fmt.Sprintf("Grants for %s", s.User)}
Expand All @@ -1773,6 +1776,9 @@ func buildShowSchema(s *ast.ShowStmt, isView bool) (schema *expression.Schema) {
names = []string{"Id", "User", "Host", "db", "Command", "Time", "State", "Info"}
ftypes = []byte{mysql.TypeLonglong, mysql.TypeVarchar, mysql.TypeVarchar,
mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeLong, mysql.TypeVarchar, mysql.TypeString}
case ast.ShowPumpStatus:
names = []string{"NodeID", "Address", "State", "Max_Commit_Ts", "Update_Time"}
ftypes = []byte{mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeLonglong, mysql.TypeVarchar}
case ast.ShowStatsMeta:
names = []string{"Db_name", "Table_name", "Partition_name", "Update_time", "Modify_count", "Row_count"}
ftypes = []byte{mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeDatetime, mysql.TypeLonglong, mysql.TypeLonglong}
Expand Down

0 comments on commit e7ff050

Please sign in to comment.