Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: fix tiflash store information in cluster_info table (#15871) #16024

Merged
merged 10 commits into from
Apr 15, 2020
12 changes: 9 additions & 3 deletions executor/infoschema_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -960,15 +960,21 @@ func (e *memtableRetriever) dataForTiDBClusterInfo(ctx sessionctx.Context) error
}
rows := make([][]types.Datum, 0, len(servers))
for _, server := range servers {
startTime := time.Unix(server.StartTimestamp, 0)
startTimeStr := ""
upTimeStr := ""
if server.StartTimestamp > 0 {
startTime := time.Unix(server.StartTimestamp, 0)
startTimeStr = startTime.Format(time.RFC3339)
upTimeStr = time.Since(startTime).String()
}
row := types.MakeDatums(
server.ServerType,
server.Address,
server.StatusAddr,
server.Version,
server.GitHash,
startTime.Format(time.RFC3339),
time.Since(startTime).String(),
startTimeStr,
upTimeStr,
)
rows = append(rows, row)
}
Expand Down
9 changes: 3 additions & 6 deletions executor/infoschema_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -689,28 +689,25 @@ func (s *mockStore) TLSConfig() *tls.Config { panic("not implemented") }
func (s *mockStore) StartGCWorker() error { panic("not implemented") }

func (s *testInfoschemaClusterTableSuite) TestTiDBClusterInfo(c *C) {
tk := testkit.NewTestKit(c, s.store)
err := tk.QueryToErr("select * from information_schema.cluster_info")
c.Assert(err, NotNil)
mockAddr := s.mockAddr
store := &mockStore{
s.store.(tikv.Storage),
mockAddr,
}

// information_schema.cluster_info
tk = testkit.NewTestKit(c, store)
tk := testkit.NewTestKit(c, store)
tidbStatusAddr := fmt.Sprintf(":%d", config.GetGlobalConfig().Status.StatusPort)
row := func(cols ...string) string { return strings.Join(cols, " ") }
tk.MustQuery("select type, instance, status_address, version, git_hash from information_schema.cluster_info").Check(testkit.Rows(
row("tidb", ":4000", tidbStatusAddr, "5.7.25-TiDB-None", "None"),
row("pd", mockAddr, mockAddr, "4.0.0-alpha", "mock-pd-githash"),
row("tikv", "127.0.0.1:20160", mockAddr, "4.0.0-alpha", "mock-tikv-githash"),
row("tikv", "store1", "", "", ""),
))
startTime := s.startTime.Format(time.RFC3339)
tk.MustQuery("select type, instance, start_time from information_schema.cluster_info where type != 'tidb'").Check(testkit.Rows(
row("pd", mockAddr, startTime),
row("tikv", "127.0.0.1:20160", startTime),
row("tikv", "store1", ""),
))

// information_schema.cluster_config
Expand Down
24 changes: 12 additions & 12 deletions infoschema/tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -1287,24 +1287,24 @@ func GetTiKVServerInfo(ctx sessionctx.Context) ([]ServerInfo, error) {
if !ok {
return nil, errors.Errorf("%T is not an TiKV store instance", store)
}
tikvHelper := &helper.Helper{
Store: tikvStore,
RegionCache: tikvStore.GetRegionCache(),
pdClient := tikvStore.GetRegionCache().PDClient()
if pdClient == nil {
return nil, errors.New("pd unavailable")
}

storesStat, err := tikvHelper.GetStoresStat()
stores, err := pdClient.GetAllStores(context.Background())
if err != nil {
return nil, errors.Trace(err)
}
var servers []ServerInfo
for _, storeStat := range storesStat.Stores {
for _, store := range stores {
tp := tikv.GetStoreTypeByMeta(store).Name()
servers = append(servers, ServerInfo{
ServerType: "tikv",
Address: storeStat.Store.Address,
StatusAddr: storeStat.Store.StatusAddress,
Version: storeStat.Store.Version,
GitHash: storeStat.Store.GitHash,
StartTimestamp: storeStat.Store.StartTimestamp,
ServerType: tp,
Address: store.Address,
StatusAddr: store.StatusAddress,
Version: store.Version,
GitHash: store.GitHash,
StartTimestamp: store.StartTimestamp,
})
}
return servers, nil
Expand Down
34 changes: 16 additions & 18 deletions store/tikv/region_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -1332,15 +1332,7 @@ func (s *Store) initResolve(bo *Backoffer, c *RegionCache) (addr string, err err
}
addr = store.GetAddress()
s.addr = addr
s.storeType = kv.TiKV
for _, label := range store.Labels {
if label.Key == "engine" {
if label.Value == kv.TiFlash.Name() {
s.storeType = kv.TiFlash
}
break
}
}
s.storeType = GetStoreTypeByMeta(store)
retry:
state = s.getResolveState()
if state != unresolved {
Expand All @@ -1354,6 +1346,20 @@ func (s *Store) initResolve(bo *Backoffer, c *RegionCache) (addr string, err err
}
}

// GetStoreTypeByMeta gets store type by store meta pb.
func GetStoreTypeByMeta(store *metapb.Store) kv.StoreType {
tp := kv.TiKV
for _, label := range store.Labels {
if label.Key == "engine" {
if label.Value == kv.TiFlash.Name() {
tp = kv.TiFlash
}
break
}
}
return tp
}

// reResolve try to resolve addr for store that need check.
func (s *Store) reResolve(c *RegionCache) {
var addr string
Expand All @@ -1377,15 +1383,7 @@ func (s *Store) reResolve(c *RegionCache) {
return
}

storeType := kv.TiKV
for _, label := range store.Labels {
if label.Key == "engine" {
if label.Value == kv.TiFlash.Name() {
storeType = kv.TiFlash
}
break
}
}
storeType := GetStoreTypeByMeta(store)
addr = store.GetAddress()
if s.addr != addr {
state := resolved
Expand Down