From 669f39817586eacfa0a76de6248dab73d8839f1a Mon Sep 17 00:00:00 2001 From: qiuyesuifeng Date: Thu, 17 Oct 2019 22:37:39 +0800 Subject: [PATCH 1/2] go.mod: tiny clean up --- go.mod | 1 + 1 file changed, 1 insertion(+) diff --git a/go.mod b/go.mod index 4a55312d4f837..7ba25318edf94 100644 --- a/go.mod +++ b/go.mod @@ -81,6 +81,7 @@ require ( ) replace github.com/pingcap/parser => github.com/qiuyesuifeng/parser v0.0.0-20191013064818-dec0e5f63128 + replace github.com/google/pprof => github.com/lonng/pprof v0.0.0-20191012154247-04dfd648ce8d go 1.13 From 3fb5f652ce1db1d53f1c93acb7064e5d84e4b52f Mon Sep 17 00:00:00 2001 From: crazycs Date: Mon, 14 Oct 2019 10:58:19 +0800 Subject: [PATCH 2/2] infoschema: add tidb_servers_info memory table (#12580) --- domain/domain.go | 11 ++++---- domain/domain_test.go | 11 ++++---- domain/{util => infosync}/info.go | 40 ++++++++++++++++++++------- executor/executor.go | 4 +-- executor/executor_test.go | 4 +-- infoschema/tables.go | 37 +++++++++++++++++++++++++ infoschema/tables_test.go | 22 +++++++++++++++ server/http_handler.go | 22 +++++++-------- store/tikv/gcworker/gc_worker.go | 4 +-- store/tikv/gcworker/gc_worker_test.go | 14 +++++----- 10 files changed, 124 insertions(+), 45 deletions(-) rename domain/{util => infosync}/info.go (88%) diff --git a/domain/domain.go b/domain/domain.go index ff18d57353cd2..dea7ebc957c11 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -32,7 +32,7 @@ import ( "github.com/pingcap/tidb/bindinfo" "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/ddl" - domainutil "github.com/pingcap/tidb/domain/util" + "github.com/pingcap/tidb/domain/infosync" "github.com/pingcap/tidb/infoschema" "github.com/pingcap/tidb/infoschema/perfschema" "github.com/pingcap/tidb/kv" @@ -63,7 +63,7 @@ type Domain struct { statsHandle unsafe.Pointer statsLease time.Duration ddl ddl.DDL - info *domainutil.InfoSyncer + info *infosync.InfoSyncer m sync.Mutex SchemaValidator SchemaValidator sysSessionPool *sessionPool @@ -291,7 +291,7 @@ func (do *Domain) DDL() ddl.DDL { } // InfoSyncer gets infoSyncer from domain. -func (do *Domain) InfoSyncer() *domainutil.InfoSyncer { +func (do *Domain) InfoSyncer() *infosync.InfoSyncer { return do.info } @@ -421,7 +421,7 @@ func (do *Domain) topNSlowQueryLoop() { func (do *Domain) infoSyncerKeeper() { defer do.wg.Done() defer recoverInDomain("infoSyncerKeeper", false) - ticker := time.NewTicker(time.Second * time.Duration(domainutil.InfoSessionTTL) / 2) + ticker := time.NewTicker(time.Second * time.Duration(infosync.InfoSessionTTL) / 2) defer ticker.Stop() for { select { @@ -661,8 +661,7 @@ func (do *Domain) Init(ddlLease time.Duration, sysFactory func(*Domain) (pools.R if err != nil { return err } - do.info = domainutil.NewInfoSyncer(do.ddl.GetID(), do.etcdClient) - err = do.info.Init(ctx) + do.info, err = infosync.GlobalInfoSyncerInit(ctx, do.ddl.GetID(), do.etcdClient) if err != nil { return err } diff --git a/domain/domain_test.go b/domain/domain_test.go index b0280702fff95..9097ae76f3756 100644 --- a/domain/domain_test.go +++ b/domain/domain_test.go @@ -29,6 +29,7 @@ import ( "github.com/pingcap/parser/model" "github.com/pingcap/parser/mysql" "github.com/pingcap/tidb/ddl" + "github.com/pingcap/tidb/domain/infosync" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/meta" "github.com/pingcap/tidb/metrics" @@ -116,21 +117,21 @@ func TestInfo(t *testing.T) { // Test for GetServerInfo and GetServerInfoByID. ddlID := dom.ddl.GetID() - serverInfo := dom.InfoSyncer().GetServerInfo() - info, err := dom.info.GetServerInfoByID(goCtx, ddlID) + serverInfo := infosync.GetServerInfo() + info, err := infosync.GetServerInfoByID(goCtx, ddlID) if err != nil { t.Fatal(err) } if serverInfo.ID != info.ID { t.Fatalf("server self info %v, info %v", serverInfo, info) } - _, err = dom.info.GetServerInfoByID(goCtx, "not_exist_id") + _, err = infosync.GetServerInfoByID(goCtx, "not_exist_id") if err == nil || (err != nil && err.Error() != "[info-syncer] get /tidb/server/info/not_exist_id failed") { t.Fatal(err) } // Test for GetAllServerInfo. - infos, err := dom.info.GetAllServerInfo(goCtx) + infos, err := infosync.GetAllServerInfo(goCtx) if err != nil { t.Fatal(err) } @@ -180,7 +181,7 @@ func TestInfo(t *testing.T) { // Test for RemoveServerInfo. dom.info.RemoveServerInfo() - infos, err = dom.info.GetAllServerInfo(goCtx) + infos, err = infosync.GetAllServerInfo(goCtx) if err != nil || len(infos) != 0 { t.Fatalf("err %v, infos %v", err, infos) } diff --git a/domain/util/info.go b/domain/infosync/info.go similarity index 88% rename from domain/util/info.go rename to domain/infosync/info.go index 20208a7a92d15..244592f08dc91 100644 --- a/domain/util/info.go +++ b/domain/infosync/info.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package util +package infosync import ( "context" @@ -79,18 +79,21 @@ type ServerVersionInfo struct { GitHash string `json:"git_hash"` } -// NewInfoSyncer return new InfoSyncer. It is exported for testing. -func NewInfoSyncer(id string, etcdCli *clientv3.Client) *InfoSyncer { - return &InfoSyncer{ +var globalInfoSyncer *InfoSyncer + +// GlobalInfoSyncerInit return a new InfoSyncer. It is exported for testing. +func GlobalInfoSyncerInit(ctx context.Context, id string, etcdCli *clientv3.Client) (*InfoSyncer, error) { + globalInfoSyncer = &InfoSyncer{ etcdCli: etcdCli, info: getServerInfo(id), serverInfoPath: fmt.Sprintf("%s/%s", ServerInformationPath, id), minStartTSPath: fmt.Sprintf("%s/%s", ServerMinStartTSPath, id), } + return globalInfoSyncer, globalInfoSyncer.init(ctx) } // Init creates a new etcd session and stores server info to etcd. -func (is *InfoSyncer) Init(ctx context.Context) error { +func (is *InfoSyncer) init(ctx context.Context) error { return is.newSessionAndStoreServerInfo(ctx, owner.NewSessionDefaultRetryCnt) } @@ -100,12 +103,22 @@ func (is *InfoSyncer) SetSessionManager(manager util2.SessionManager) { } // GetServerInfo gets self server static information. -func (is *InfoSyncer) GetServerInfo() *ServerInfo { - return is.info +func GetServerInfo() *ServerInfo { + if globalInfoSyncer == nil { + return nil + } + return globalInfoSyncer.info } -// GetServerInfoByID gets server static information from etcd. -func (is *InfoSyncer) GetServerInfoByID(ctx context.Context, id string) (*ServerInfo, error) { +// GetServerInfoByID gets specified server static information from etcd. +func GetServerInfoByID(ctx context.Context, id string) (*ServerInfo, error) { + if globalInfoSyncer == nil { + return nil, errors.New("infoSyncer is not initialized") + } + return globalInfoSyncer.getServerInfoByID(ctx, id) +} + +func (is *InfoSyncer) getServerInfoByID(ctx context.Context, id string) (*ServerInfo, error) { if is.etcdCli == nil || id == is.info.ID { return is.info, nil } @@ -122,7 +135,14 @@ func (is *InfoSyncer) GetServerInfoByID(ctx context.Context, id string) (*Server } // GetAllServerInfo gets all servers static information from etcd. -func (is *InfoSyncer) GetAllServerInfo(ctx context.Context) (map[string]*ServerInfo, error) { +func GetAllServerInfo(ctx context.Context) (map[string]*ServerInfo, error) { + if globalInfoSyncer == nil { + return nil, errors.New("infoSyncer is not initialized") + } + return globalInfoSyncer.getAllServerInfo(ctx) +} + +func (is *InfoSyncer) getAllServerInfo(ctx context.Context) (map[string]*ServerInfo, error) { allInfo := make(map[string]*ServerInfo) if is.etcdCli == nil { allInfo[is.info.ID] = is.info diff --git a/executor/executor.go b/executor/executor.go index 7483d449c547c..3e5e511fe63b9 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -31,6 +31,7 @@ import ( "github.com/pingcap/parser/terror" "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/domain" + "github.com/pingcap/tidb/domain/infosync" "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/infoschema" "github.com/pingcap/tidb/kv" @@ -297,8 +298,7 @@ func (e *ShowDDLExec) Next(ctx context.Context, req *chunk.Chunk) error { } } - do := domain.GetDomain(e.ctx) - serverInfo, err := do.InfoSyncer().GetServerInfoByID(ctx, e.ddlOwnerID) + serverInfo, err := infosync.GetServerInfoByID(ctx, e.ddlOwnerID) if err != nil { return err } diff --git a/executor/executor_test.go b/executor/executor_test.go index 7b3df5060e59b..917b982dbfd70 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -39,6 +39,7 @@ import ( "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/ddl" "github.com/pingcap/tidb/domain" + "github.com/pingcap/tidb/domain/infosync" "github.com/pingcap/tidb/executor" "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/infoschema" @@ -299,8 +300,7 @@ func (s *testSuiteP1) TestAdmin(c *C) { // rowOwnerInfos := strings.Split(row.Data[1].GetString(), ",") // ownerInfos := strings.Split(ddlInfo.Owner.String(), ",") // c.Assert(rowOwnerInfos[0], Equals, ownerInfos[0]) - do := domain.GetDomain(tk.Se.(sessionctx.Context)) - serverInfo, err := do.InfoSyncer().GetServerInfoByID(ctx, row.GetString(1)) + serverInfo, err := infosync.GetServerInfoByID(ctx, row.GetString(1)) c.Assert(err, IsNil) c.Assert(row.GetString(2), Equals, serverInfo.IP+":"+ strconv.FormatUint(uint64(serverInfo.Port), 10)) diff --git a/infoschema/tables.go b/infoschema/tables.go index 6f2fc8e098ee5..42b439980acb5 100644 --- a/infoschema/tables.go +++ b/infoschema/tables.go @@ -14,6 +14,7 @@ package infoschema import ( + "context" "encoding/json" "fmt" "sort" @@ -24,6 +25,7 @@ import ( "github.com/pingcap/parser/charset" "github.com/pingcap/parser/model" "github.com/pingcap/parser/mysql" + "github.com/pingcap/tidb/domain/infosync" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/meta/autoid" "github.com/pingcap/tidb/privilege" @@ -81,6 +83,7 @@ const ( tableAnalyzeStatus = "ANALYZE_STATUS" tableTiKVRegionStatus = "TIKV_REGION_STATUS" tableTiKVRegionPeers = "TIKV_REGION_PEERS" + tableTiDBServersInfo = "TIDB_SERVERS_INFO" ) type columnInfo struct { @@ -646,6 +649,16 @@ var tableTiKVRegionPeersCols = []columnInfo{ {"DOWN_SECONDS", mysql.TypeLonglong, 21, 0, 0, nil}, } +var tableTiDBServersInfoCols = []columnInfo{ + {"DDL_ID", mysql.TypeVarchar, 64, 0, nil, nil}, + {"IP", mysql.TypeVarchar, 64, 0, nil, nil}, + {"PORT", mysql.TypeLonglong, 21, 0, nil, nil}, + {"STATUS_PORT", mysql.TypeLonglong, 21, 0, nil, nil}, + {"LEASE", mysql.TypeVarchar, 64, 0, nil, nil}, + {"VERSION", mysql.TypeVarchar, 64, 0, nil, nil}, + {"GIT_HASH", mysql.TypeVarchar, 64, 0, nil, nil}, +} + func dataForTiKVRegionStatus(ctx sessionctx.Context) (records [][]types.Datum, err error) { tikvStore, ok := ctx.GetStore().(tikv.Storage) if !ok { @@ -1794,6 +1807,27 @@ func DataForAnalyzeStatus() (rows [][]types.Datum) { return } +func dataForServersInfo() ([][]types.Datum, error) { + serversInfo, err := infosync.GetAllServerInfo(context.Background()) + if err != nil { + return nil, err + } + rows := make([][]types.Datum, 0, len(serversInfo)) + for _, info := range serversInfo { + row := types.MakeDatums( + info.ID, // DDL_ID + info.IP, // IP + int(info.Port), // PORT + int(info.StatusPort), // STATUS_PORT + info.Lease, // LEASE + info.Version, // VERSION + info.GitHash, // GIT_HASH + ) + rows = append(rows, row) + } + return rows, nil +} + var tableNameToColumns = map[string][]columnInfo{ tableSchemata: schemataCols, tableTables: tablesCols, @@ -1834,6 +1868,7 @@ var tableNameToColumns = map[string][]columnInfo{ tableAnalyzeStatus: tableAnalyzeStatusCols, tableTiKVRegionStatus: tableTiKVRegionStatusCols, tableTiKVRegionPeers: tableTiKVRegionPeersCols, + tableTiDBServersInfo: tableTiDBServersInfoCols, } func createInfoSchemaTable(handle *Handle, meta *model.TableInfo) *infoschemaTable { @@ -1937,6 +1972,8 @@ func (it *infoschemaTable) getRows(ctx sessionctx.Context, cols []*table.Column) fullRows, err = dataForTiKVRegionStatus(ctx) case tableTiKVRegionPeers: fullRows, err = dataForTikVRegionPeers(ctx) + case tableTiDBServersInfo: + fullRows, err = dataForServersInfo() } if err != nil { return nil, err diff --git a/infoschema/tables_test.go b/infoschema/tables_test.go index 77f6c2a32b311..734b4ad56bd3c 100644 --- a/infoschema/tables_test.go +++ b/infoschema/tables_test.go @@ -14,6 +14,7 @@ package infoschema_test import ( + "context" "fmt" "os" "strconv" @@ -26,6 +27,7 @@ import ( "github.com/pingcap/parser/mysql" "github.com/pingcap/parser/terror" "github.com/pingcap/tidb/domain" + "github.com/pingcap/tidb/domain/infosync" "github.com/pingcap/tidb/infoschema" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/session" @@ -538,6 +540,26 @@ func (s *testTableSuite) TestForAnalyzeStatus(c *C) { c.Assert(result.Rows()[1][6], Equals, "finished") } +func (s *testTableSuite) TestForServersInfo(c *C) { + tk := testkit.NewTestKit(c, s.store) + result := tk.MustQuery("select * from information_schema.TIDB_SERVERS_INFO") + c.Assert(len(result.Rows()), Equals, 1) + + serversInfo, err := infosync.GetAllServerInfo(context.Background()) + c.Assert(err, IsNil) + c.Assert(len(serversInfo), Equals, 1) + + for _, info := range serversInfo { + c.Assert(result.Rows()[0][0], Equals, info.ID) + c.Assert(result.Rows()[0][1], Equals, info.IP) + c.Assert(result.Rows()[0][2], Equals, strconv.FormatInt(int64(info.Port), 10)) + c.Assert(result.Rows()[0][3], Equals, strconv.FormatInt(int64(info.StatusPort), 10)) + c.Assert(result.Rows()[0][4], Equals, info.Lease) + c.Assert(result.Rows()[0][5], Equals, info.Version) + c.Assert(result.Rows()[0][6], Equals, info.GitHash) + } +} + func (s *testTableSuite) TestColumnStatistics(c *C) { tk := testkit.NewTestKit(c, s.store) tk.MustQuery("select * from information_schema.column_statistics").Check(testkit.Rows()) diff --git a/server/http_handler.go b/server/http_handler.go index d2cbbf0a5e586..a010f0a89554c 100644 --- a/server/http_handler.go +++ b/server/http_handler.go @@ -37,7 +37,7 @@ import ( "github.com/pingcap/parser/terror" "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/domain" - "github.com/pingcap/tidb/domain/util" + "github.com/pingcap/tidb/domain/infosync" "github.com/pingcap/tidb/infoschema" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/meta" @@ -1448,7 +1448,7 @@ func (h *mvccTxnHandler) handleMvccGetByTxn(params map[string]string) (interface // serverInfo is used to report the servers info when do http request. type serverInfo struct { IsOwner bool `json:"is_owner"` - *util.ServerInfo + *infosync.ServerInfo } // ServeHTTP handles request of ddl server info. @@ -1460,18 +1460,18 @@ func (h serverInfoHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { return } info := serverInfo{} - info.ServerInfo = do.InfoSyncer().GetServerInfo() + info.ServerInfo = infosync.GetServerInfo() info.IsOwner = do.DDL().OwnerManager().IsOwner() writeData(w, info) } // clusterServerInfo is used to report cluster servers info when do http request. type clusterServerInfo struct { - ServersNum int `json:"servers_num,omitempty"` - OwnerID string `json:"owner_id"` - IsAllServerVersionConsistent bool `json:"is_all_server_version_consistent,omitempty"` - AllServersDiffVersions []util.ServerVersionInfo `json:"all_servers_diff_versions,omitempty"` - AllServersInfo map[string]*util.ServerInfo `json:"all_servers_info,omitempty"` + ServersNum int `json:"servers_num,omitempty"` + OwnerID string `json:"owner_id"` + IsAllServerVersionConsistent bool `json:"is_all_server_version_consistent,omitempty"` + AllServersDiffVersions []infosync.ServerVersionInfo `json:"all_servers_diff_versions,omitempty"` + AllServersInfo map[string]*infosync.ServerInfo `json:"all_servers_info,omitempty"` } // ServeHTTP handles request of all ddl servers info. @@ -1483,7 +1483,7 @@ func (h allServerInfoHandler) ServeHTTP(w http.ResponseWriter, req *http.Request return } ctx := context.Background() - allServersInfo, err := do.InfoSyncer().GetAllServerInfo(ctx) + allServersInfo, err := infosync.GetAllServerInfo(ctx) if err != nil { writeError(w, errors.New("ddl server information not found")) log.Error(err) @@ -1497,8 +1497,8 @@ func (h allServerInfoHandler) ServeHTTP(w http.ResponseWriter, req *http.Request log.Error(err) return } - allVersionsMap := map[util.ServerVersionInfo]struct{}{} - allVersions := make([]util.ServerVersionInfo, 0, len(allServersInfo)) + allVersionsMap := map[infosync.ServerVersionInfo]struct{}{} + allVersions := make([]infosync.ServerVersionInfo, 0, len(allServersInfo)) for _, v := range allServersInfo { if _, ok := allVersionsMap[v.ServerVersionInfo]; ok { continue diff --git a/store/tikv/gcworker/gc_worker.go b/store/tikv/gcworker/gc_worker.go index 1d4761472b6ba..3689a4e941782 100644 --- a/store/tikv/gcworker/gc_worker.go +++ b/store/tikv/gcworker/gc_worker.go @@ -30,7 +30,7 @@ import ( "github.com/pingcap/parser/terror" pd "github.com/pingcap/pd/client" "github.com/pingcap/tidb/ddl/util" - domainutil "github.com/pingcap/tidb/domain/util" + "github.com/pingcap/tidb/domain/infosync" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/metrics" "github.com/pingcap/tidb/privilege" @@ -323,7 +323,7 @@ func (w *GCWorker) checkPrepare(ctx context.Context) (bool, uint64, error) { // calculateNewSafePoint uses the current global transaction min start timestamp to calculate the new safe point. func (w *GCWorker) calSafePointByMinStartTS(safePoint time.Time) time.Time { - kvs, err := w.store.GetSafePointKV().GetWithPrefix(domainutil.ServerMinStartTSPath) + kvs, err := w.store.GetSafePointKV().GetWithPrefix(infosync.ServerMinStartTSPath) if err != nil { logutil.BgLogger().Warn("get all minStartTS failed", zap.Error(err)) return safePoint diff --git a/store/tikv/gcworker/gc_worker_test.go b/store/tikv/gcworker/gc_worker_test.go index 55c630b6ddf9d..ba299517b39c0 100644 --- a/store/tikv/gcworker/gc_worker_test.go +++ b/store/tikv/gcworker/gc_worker_test.go @@ -32,7 +32,7 @@ import ( pd "github.com/pingcap/pd/client" "github.com/pingcap/tidb/ddl/util" "github.com/pingcap/tidb/domain" - domainutil "github.com/pingcap/tidb/domain/util" + "github.com/pingcap/tidb/domain/infosync" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/session" "github.com/pingcap/tidb/sessionctx/variable" @@ -192,28 +192,28 @@ func (s *testGCWorkerSuite) TestGetOracleTime(c *C) { func (s *testGCWorkerSuite) TestMinStartTS(c *C) { spkv := s.store.GetSafePointKV() - err := spkv.Put(fmt.Sprintf("%s/%s", domainutil.ServerMinStartTSPath, "a"), strconv.FormatUint(math.MaxUint64, 10)) + err := spkv.Put(fmt.Sprintf("%s/%s", infosync.ServerMinStartTSPath, "a"), strconv.FormatUint(math.MaxUint64, 10)) c.Assert(err, IsNil) now := time.Now() sp := s.gcWorker.calSafePointByMinStartTS(now) c.Assert(sp.Second(), Equals, now.Second()) - err = spkv.Put(fmt.Sprintf("%s/%s", domainutil.ServerMinStartTSPath, "a"), "0") + err = spkv.Put(fmt.Sprintf("%s/%s", infosync.ServerMinStartTSPath, "a"), "0") c.Assert(err, IsNil) sp = s.gcWorker.calSafePointByMinStartTS(now) zeroTime := time.Unix(0, oracle.ExtractPhysical(0)*1e6) c.Assert(sp, Equals, zeroTime) - err = spkv.Put(fmt.Sprintf("%s/%s", domainutil.ServerMinStartTSPath, "a"), "0") + err = spkv.Put(fmt.Sprintf("%s/%s", infosync.ServerMinStartTSPath, "a"), "0") c.Assert(err, IsNil) - err = spkv.Put(fmt.Sprintf("%s/%s", domainutil.ServerMinStartTSPath, "b"), "1") + err = spkv.Put(fmt.Sprintf("%s/%s", infosync.ServerMinStartTSPath, "b"), "1") c.Assert(err, IsNil) sp = s.gcWorker.calSafePointByMinStartTS(now) c.Assert(sp, Equals, zeroTime) - err = spkv.Put(fmt.Sprintf("%s/%s", domainutil.ServerMinStartTSPath, "a"), + err = spkv.Put(fmt.Sprintf("%s/%s", infosync.ServerMinStartTSPath, "a"), strconv.FormatUint(variable.GoTimeToTS(now), 10)) c.Assert(err, IsNil) - err = spkv.Put(fmt.Sprintf("%s/%s", domainutil.ServerMinStartTSPath, "b"), + err = spkv.Put(fmt.Sprintf("%s/%s", infosync.ServerMinStartTSPath, "b"), strconv.FormatUint(variable.GoTimeToTS(now.Add(-20*time.Second)), 10)) c.Assert(err, IsNil) sp = s.gcWorker.calSafePointByMinStartTS(now.Add(-10 * time.Second))