Skip to content

Commit

Permalink
Merge remote-tracking branch 'cui/qiuyesuifeng/hackathon' into tbssql2
Browse files Browse the repository at this point in the history
  • Loading branch information
spongedu committed Oct 17, 2019
2 parents cfa4156 + 3fb5f65 commit 527ff47
Show file tree
Hide file tree
Showing 11 changed files with 124 additions and 46 deletions.
11 changes: 5 additions & 6 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import (
"github.com/pingcap/tidb/bindinfo"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/ddl"
domainutil "github.com/pingcap/tidb/domain/util"
"github.com/pingcap/tidb/domain/infosync"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/infoschema/perfschema"
"github.com/pingcap/tidb/kv"
Expand Down Expand Up @@ -63,7 +63,7 @@ type Domain struct {
statsHandle unsafe.Pointer
statsLease time.Duration
ddl ddl.DDL
info *domainutil.InfoSyncer
info *infosync.InfoSyncer
m sync.Mutex
SchemaValidator SchemaValidator
sysSessionPool *sessionPool
Expand Down Expand Up @@ -291,7 +291,7 @@ func (do *Domain) DDL() ddl.DDL {
}

// InfoSyncer gets infoSyncer from domain.
func (do *Domain) InfoSyncer() *domainutil.InfoSyncer {
func (do *Domain) InfoSyncer() *infosync.InfoSyncer {
return do.info
}

Expand Down Expand Up @@ -421,7 +421,7 @@ func (do *Domain) topNSlowQueryLoop() {
func (do *Domain) infoSyncerKeeper() {
defer do.wg.Done()
defer recoverInDomain("infoSyncerKeeper", false)
ticker := time.NewTicker(time.Second * time.Duration(domainutil.InfoSessionTTL) / 2)
ticker := time.NewTicker(time.Second * time.Duration(infosync.InfoSessionTTL) / 2)
defer ticker.Stop()
for {
select {
Expand Down Expand Up @@ -661,8 +661,7 @@ func (do *Domain) Init(ddlLease time.Duration, sysFactory func(*Domain) (pools.R
if err != nil {
return err
}
do.info = domainutil.NewInfoSyncer(do.ddl.GetID(), do.etcdClient)
err = do.info.Init(ctx)
do.info, err = infosync.GlobalInfoSyncerInit(ctx, do.ddl.GetID(), do.etcdClient)
if err != nil {
return err
}
Expand Down
11 changes: 6 additions & 5 deletions domain/domain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/domain/infosync"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/metrics"
Expand Down Expand Up @@ -116,21 +117,21 @@ func TestInfo(t *testing.T) {

// Test for GetServerInfo and GetServerInfoByID.
ddlID := dom.ddl.GetID()
serverInfo := dom.InfoSyncer().GetServerInfo()
info, err := dom.info.GetServerInfoByID(goCtx, ddlID)
serverInfo := infosync.GetServerInfo()
info, err := infosync.GetServerInfoByID(goCtx, ddlID)
if err != nil {
t.Fatal(err)
}
if serverInfo.ID != info.ID {
t.Fatalf("server self info %v, info %v", serverInfo, info)
}
_, err = dom.info.GetServerInfoByID(goCtx, "not_exist_id")
_, err = infosync.GetServerInfoByID(goCtx, "not_exist_id")
if err == nil || (err != nil && err.Error() != "[info-syncer] get /tidb/server/info/not_exist_id failed") {
t.Fatal(err)
}

// Test for GetAllServerInfo.
infos, err := dom.info.GetAllServerInfo(goCtx)
infos, err := infosync.GetAllServerInfo(goCtx)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -180,7 +181,7 @@ func TestInfo(t *testing.T) {

// Test for RemoveServerInfo.
dom.info.RemoveServerInfo()
infos, err = dom.info.GetAllServerInfo(goCtx)
infos, err = infosync.GetAllServerInfo(goCtx)
if err != nil || len(infos) != 0 {
t.Fatalf("err %v, infos %v", err, infos)
}
Expand Down
40 changes: 30 additions & 10 deletions domain/util/info.go → domain/infosync/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package util
package infosync

import (
"context"
Expand Down Expand Up @@ -79,18 +79,21 @@ type ServerVersionInfo struct {
GitHash string `json:"git_hash"`
}

// NewInfoSyncer return new InfoSyncer. It is exported for testing.
func NewInfoSyncer(id string, etcdCli *clientv3.Client) *InfoSyncer {
return &InfoSyncer{
var globalInfoSyncer *InfoSyncer

// GlobalInfoSyncerInit return a new InfoSyncer. It is exported for testing.
func GlobalInfoSyncerInit(ctx context.Context, id string, etcdCli *clientv3.Client) (*InfoSyncer, error) {
globalInfoSyncer = &InfoSyncer{
etcdCli: etcdCli,
info: getServerInfo(id),
serverInfoPath: fmt.Sprintf("%s/%s", ServerInformationPath, id),
minStartTSPath: fmt.Sprintf("%s/%s", ServerMinStartTSPath, id),
}
return globalInfoSyncer, globalInfoSyncer.init(ctx)
}

// Init creates a new etcd session and stores server info to etcd.
func (is *InfoSyncer) Init(ctx context.Context) error {
func (is *InfoSyncer) init(ctx context.Context) error {
return is.newSessionAndStoreServerInfo(ctx, owner.NewSessionDefaultRetryCnt)
}

Expand All @@ -100,12 +103,22 @@ func (is *InfoSyncer) SetSessionManager(manager util2.SessionManager) {
}

// GetServerInfo gets self server static information.
func (is *InfoSyncer) GetServerInfo() *ServerInfo {
return is.info
func GetServerInfo() *ServerInfo {
if globalInfoSyncer == nil {
return nil
}
return globalInfoSyncer.info
}

// GetServerInfoByID gets server static information from etcd.
func (is *InfoSyncer) GetServerInfoByID(ctx context.Context, id string) (*ServerInfo, error) {
// GetServerInfoByID gets specified server static information from etcd.
func GetServerInfoByID(ctx context.Context, id string) (*ServerInfo, error) {
if globalInfoSyncer == nil {
return nil, errors.New("infoSyncer is not initialized")
}
return globalInfoSyncer.getServerInfoByID(ctx, id)
}

func (is *InfoSyncer) getServerInfoByID(ctx context.Context, id string) (*ServerInfo, error) {
if is.etcdCli == nil || id == is.info.ID {
return is.info, nil
}
Expand All @@ -122,7 +135,14 @@ func (is *InfoSyncer) GetServerInfoByID(ctx context.Context, id string) (*Server
}

// GetAllServerInfo gets all servers static information from etcd.
func (is *InfoSyncer) GetAllServerInfo(ctx context.Context) (map[string]*ServerInfo, error) {
func GetAllServerInfo(ctx context.Context) (map[string]*ServerInfo, error) {
if globalInfoSyncer == nil {
return nil, errors.New("infoSyncer is not initialized")
}
return globalInfoSyncer.getAllServerInfo(ctx)
}

func (is *InfoSyncer) getAllServerInfo(ctx context.Context) (map[string]*ServerInfo, error) {
allInfo := make(map[string]*ServerInfo)
if is.etcdCli == nil {
allInfo[is.info.ID] = is.info
Expand Down
4 changes: 2 additions & 2 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/domain/infosync"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
Expand Down Expand Up @@ -297,8 +298,7 @@ func (e *ShowDDLExec) Next(ctx context.Context, req *chunk.Chunk) error {
}
}

do := domain.GetDomain(e.ctx)
serverInfo, err := do.InfoSyncer().GetServerInfoByID(ctx, e.ddlOwnerID)
serverInfo, err := infosync.GetServerInfoByID(ctx, e.ddlOwnerID)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/domain/infosync"
"github.com/pingcap/tidb/executor"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/infoschema"
Expand Down Expand Up @@ -299,8 +300,7 @@ func (s *testSuiteP1) TestAdmin(c *C) {
// rowOwnerInfos := strings.Split(row.Data[1].GetString(), ",")
// ownerInfos := strings.Split(ddlInfo.Owner.String(), ",")
// c.Assert(rowOwnerInfos[0], Equals, ownerInfos[0])
do := domain.GetDomain(tk.Se.(sessionctx.Context))
serverInfo, err := do.InfoSyncer().GetServerInfoByID(ctx, row.GetString(1))
serverInfo, err := infosync.GetServerInfoByID(ctx, row.GetString(1))
c.Assert(err, IsNil)
c.Assert(row.GetString(2), Equals, serverInfo.IP+":"+
strconv.FormatUint(uint64(serverInfo.Port), 10))
Expand Down
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ require (
golang.org/x/text v0.3.2
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4 // indirect
golang.org/x/tools v0.0.0-20190911022129-16c5e0f7d110
google.golang.org/appengine v1.4.0 // indirect
google.golang.org/genproto v0.0.0-20190905072037-92dd089d5514 // indirect
google.golang.org/grpc v1.23.0
gopkg.in/alecthomas/kingpin.v2 v2.2.6 // indirect
Expand Down
37 changes: 37 additions & 0 deletions infoschema/tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package infoschema

import (
"context"
"encoding/json"
"fmt"
"sort"
Expand All @@ -24,6 +25,7 @@ import (
"github.com/pingcap/parser/charset"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/domain/infosync"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/privilege"
Expand Down Expand Up @@ -81,6 +83,7 @@ const (
tableAnalyzeStatus = "ANALYZE_STATUS"
tableTiKVRegionStatus = "TIKV_REGION_STATUS"
tableTiKVRegionPeers = "TIKV_REGION_PEERS"
tableTiDBServersInfo = "TIDB_SERVERS_INFO"
)

type columnInfo struct {
Expand Down Expand Up @@ -646,6 +649,16 @@ var tableTiKVRegionPeersCols = []columnInfo{
{"DOWN_SECONDS", mysql.TypeLonglong, 21, 0, 0, nil},
}

var tableTiDBServersInfoCols = []columnInfo{
{"DDL_ID", mysql.TypeVarchar, 64, 0, nil, nil},
{"IP", mysql.TypeVarchar, 64, 0, nil, nil},
{"PORT", mysql.TypeLonglong, 21, 0, nil, nil},
{"STATUS_PORT", mysql.TypeLonglong, 21, 0, nil, nil},
{"LEASE", mysql.TypeVarchar, 64, 0, nil, nil},
{"VERSION", mysql.TypeVarchar, 64, 0, nil, nil},
{"GIT_HASH", mysql.TypeVarchar, 64, 0, nil, nil},
}

func dataForTiKVRegionStatus(ctx sessionctx.Context) (records [][]types.Datum, err error) {
tikvStore, ok := ctx.GetStore().(tikv.Storage)
if !ok {
Expand Down Expand Up @@ -1794,6 +1807,27 @@ func DataForAnalyzeStatus() (rows [][]types.Datum) {
return
}

func dataForServersInfo() ([][]types.Datum, error) {
serversInfo, err := infosync.GetAllServerInfo(context.Background())
if err != nil {
return nil, err
}
rows := make([][]types.Datum, 0, len(serversInfo))
for _, info := range serversInfo {
row := types.MakeDatums(
info.ID, // DDL_ID
info.IP, // IP
int(info.Port), // PORT
int(info.StatusPort), // STATUS_PORT
info.Lease, // LEASE
info.Version, // VERSION
info.GitHash, // GIT_HASH
)
rows = append(rows, row)
}
return rows, nil
}

var tableNameToColumns = map[string][]columnInfo{
tableSchemata: schemataCols,
tableTables: tablesCols,
Expand Down Expand Up @@ -1834,6 +1868,7 @@ var tableNameToColumns = map[string][]columnInfo{
tableAnalyzeStatus: tableAnalyzeStatusCols,
tableTiKVRegionStatus: tableTiKVRegionStatusCols,
tableTiKVRegionPeers: tableTiKVRegionPeersCols,
tableTiDBServersInfo: tableTiDBServersInfoCols,
}

func createInfoSchemaTable(handle *Handle, meta *model.TableInfo) *infoschemaTable {
Expand Down Expand Up @@ -1937,6 +1972,8 @@ func (it *infoschemaTable) getRows(ctx sessionctx.Context, cols []*table.Column)
fullRows, err = dataForTiKVRegionStatus(ctx)
case tableTiKVRegionPeers:
fullRows, err = dataForTikVRegionPeers(ctx)
case tableTiDBServersInfo:
fullRows, err = dataForServersInfo()
}
if err != nil {
return nil, err
Expand Down
22 changes: 22 additions & 0 deletions infoschema/tables_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package infoschema_test

import (
"context"
"fmt"
"os"
"strconv"
Expand All @@ -26,6 +27,7 @@ import (
"github.com/pingcap/parser/mysql"
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/domain/infosync"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/session"
Expand Down Expand Up @@ -538,6 +540,26 @@ func (s *testTableSuite) TestForAnalyzeStatus(c *C) {
c.Assert(result.Rows()[1][6], Equals, "finished")
}

func (s *testTableSuite) TestForServersInfo(c *C) {
tk := testkit.NewTestKit(c, s.store)
result := tk.MustQuery("select * from information_schema.TIDB_SERVERS_INFO")
c.Assert(len(result.Rows()), Equals, 1)

serversInfo, err := infosync.GetAllServerInfo(context.Background())
c.Assert(err, IsNil)
c.Assert(len(serversInfo), Equals, 1)

for _, info := range serversInfo {
c.Assert(result.Rows()[0][0], Equals, info.ID)
c.Assert(result.Rows()[0][1], Equals, info.IP)
c.Assert(result.Rows()[0][2], Equals, strconv.FormatInt(int64(info.Port), 10))
c.Assert(result.Rows()[0][3], Equals, strconv.FormatInt(int64(info.StatusPort), 10))
c.Assert(result.Rows()[0][4], Equals, info.Lease)
c.Assert(result.Rows()[0][5], Equals, info.Version)
c.Assert(result.Rows()[0][6], Equals, info.GitHash)
}
}

func (s *testTableSuite) TestColumnStatistics(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustQuery("select * from information_schema.column_statistics").Check(testkit.Rows())
Expand Down
Loading

0 comments on commit 527ff47

Please sign in to comment.