Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

systable: Adapt with TiFlash system table update #38191

Merged
merged 12 commits into from
Sep 30, 2022
Merged
15 changes: 15 additions & 0 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -1872,6 +1872,13 @@ def go_deps():
sum = "h1:0Vihzu20St42/UDsvZGdNE6jak7oi/UOeMzwMPHkgFY=",
version = "v3.2.0+incompatible",
)
go_repository(
name = "com_github_jarcoal_httpmock",
build_file_proto_mode = "disable",
importpath = "github.com/jarcoal/httpmock",
sum = "h1:gSvTxxFR/MEMfsGrvRbdfpRUMBStovlSRLw0Ep1bwwc=",
version = "v1.2.0",
)

go_repository(
name = "com_github_jcmturner_aescts_v2",
Expand Down Expand Up @@ -2326,6 +2333,14 @@ def go_deps():
sum = "h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU=",
version = "v1.0.1",
)
go_repository(
name = "com_github_maxatome_go_testdeep",
build_file_proto_mode = "disable",
importpath = "github.com/maxatome/go-testdeep",
sum = "h1:Tgh5efyCYyJFGUYiT0qxBSIDeXw0F5zSoatlou685kk=",
version = "v1.11.0",
)

go_repository(
name = "com_github_mbilski_exhaustivestruct",
build_file_proto_mode = "disable",
Expand Down
1 change: 1 addition & 0 deletions executor/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,7 @@ go_test(
"//util/topsql/state",
"@com_github_golang_protobuf//proto",
"@com_github_gorilla_mux//:mux",
"@com_github_jarcoal_httpmock//:httpmock",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_fn//:fn",
Expand Down
89 changes: 53 additions & 36 deletions executor/infoschema_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -3030,14 +3030,17 @@ func (e *TiFlashSystemTableRetriever) initialize(sctx sessionctx.Context, tiflas
return nil
}

type tiFlashSQLExecuteResponseMetaColumn struct {
Name string `json:"name"`
Type string `json:"type"`
}

type tiFlashSQLExecuteResponse struct {
Meta []tiFlashSQLExecuteResponseMetaColumn `json:"meta"`
Data [][]interface{} `json:"data"`
}

func (e *TiFlashSystemTableRetriever) dataForTiFlashSystemTables(ctx sessionctx.Context, tidbDatabases string, tidbTables string) ([][]types.Datum, error) {
var columnNames []string //nolint: prealloc
for _, c := range e.outputCols {
if c.Name.O == "TIFLASH_INSTANCE" {
continue
}
columnNames = append(columnNames, c.Name.L)
}
maxCount := 1024
targetTable := strings.ToLower(strings.Replace(e.table.Name.O, "TIFLASH", "DT", 1))
var filters []string
Expand All @@ -3047,12 +3050,11 @@ func (e *TiFlashSystemTableRetriever) dataForTiFlashSystemTables(ctx sessionctx.
if len(tidbTables) > 0 {
filters = append(filters, fmt.Sprintf("tidb_table IN (%s)", strings.ReplaceAll(tidbTables, "\"", "'")))
}
sql := fmt.Sprintf("SELECT %s FROM system.%s", strings.Join(columnNames, ","), targetTable)
sql := fmt.Sprintf("SELECT * FROM system.%s", targetTable)
if len(filters) > 0 {
sql = fmt.Sprintf("%s WHERE %s", sql, strings.Join(filters, " AND "))
}
sql = fmt.Sprintf("%s LIMIT %d, %d", sql, e.rowIdx, maxCount)
notNumber := "nan"
instanceInfo := e.instanceInfos[e.instanceIdx]
url := instanceInfo.url
req, err := http.NewRequest(http.MethodGet, url, nil)
Expand All @@ -3061,6 +3063,7 @@ func (e *TiFlashSystemTableRetriever) dataForTiFlashSystemTables(ctx sessionctx.
}
q := req.URL.Query()
q.Add("query", sql)
q.Add("default_format", "JSONCompact")
req.URL.RawQuery = q.Encode()
resp, err := util.InternalHTTPClient().Do(req)
if err != nil {
Expand All @@ -3071,54 +3074,68 @@ func (e *TiFlashSystemTableRetriever) dataForTiFlashSystemTables(ctx sessionctx.
if err != nil {
return nil, errors.Trace(err)
}
records := strings.Split(string(body), "\n")
rows := make([][]types.Datum, 0, len(records))
for _, record := range records {
if len(record) == 0 {
continue
var result tiFlashSQLExecuteResponse
err = json.Unmarshal(body, &result)
if err != nil {
return nil, errors.Wrapf(err, "Failed to decode JSON from TiFlash")
}

// Map result columns back to our columns. It is possible that some columns cannot be
// recognized and some other columns are missing. This may happen during upgrading.
outputColIndexMap := map[string]int{} // Map from TiDB Column name to Output Column Index
for idx, c := range e.outputCols {
outputColIndexMap[c.Name.L] = idx
}
tiflashColIndexMap := map[int]int{} // Map from TiFlash Column index to Output Column Index
for tiFlashColIdx, col := range result.Meta {
if outputIdx, ok := outputColIndexMap[strings.ToLower(col.Name)]; ok {
tiflashColIndexMap[tiFlashColIdx] = outputIdx
}
fields := strings.Split(record, "\t")
if len(fields) < len(e.outputCols)-1 {
return nil, errors.Errorf("Record from tiflash doesn't match schema %v", fields)
}
outputRows := make([][]types.Datum, 0, len(result.Data))
for _, rowFields := range result.Data {
if len(rowFields) == 0 {
continue
}
row := make([]types.Datum, len(e.outputCols))
for index, column := range e.outputCols {
if column.Name.O == "TIFLASH_INSTANCE" {
outputRow := make([]types.Datum, len(e.outputCols))
for tiFlashColIdx, fieldValue := range rowFields {
outputIdx, ok := tiflashColIndexMap[tiFlashColIdx]
if !ok {
// Discard this field, we don't know which output column is the destination
continue
}
if fieldValue == nil {
continue
}
valStr := fmt.Sprint(fieldValue)
column := e.outputCols[outputIdx]
if column.GetType() == mysql.TypeVarchar {
row[index].SetString(fields[index], mysql.DefaultCollationName)
outputRow[outputIdx].SetString(valStr, mysql.DefaultCollationName)
} else if column.GetType() == mysql.TypeLonglong {
if fields[index] == notNumber {
continue
}
value, err := strconv.ParseInt(fields[index], 10, 64)
value, err := strconv.ParseInt(valStr, 10, 64)
if err != nil {
return nil, errors.Trace(err)
}
row[index].SetInt64(value)
outputRow[outputIdx].SetInt64(value)
} else if column.GetType() == mysql.TypeDouble {
if fields[index] == notNumber {
continue
}
value, err := strconv.ParseFloat(fields[index], 64)
value, err := strconv.ParseFloat(valStr, 64)
if err != nil {
return nil, errors.Trace(err)
}
row[index].SetFloat64(value)
outputRow[outputIdx].SetFloat64(value)
} else {
return nil, errors.Errorf("Meet column of unknown type %v", column)
}
}
row[len(e.outputCols)-1].SetString(instanceInfo.id, mysql.DefaultCollationName)
rows = append(rows, row)
outputRow[len(e.outputCols)-1].SetString(instanceInfo.id, mysql.DefaultCollationName)
outputRows = append(outputRows, outputRow)
}
e.rowIdx += len(rows)
if len(rows) < maxCount {
e.rowIdx += len(outputRows)
if len(outputRows) < maxCount {
e.instanceIdx += 1
e.rowIdx = 0
}
return rows, nil
return outputRows, nil
}

func (e *memtableRetriever) setDataForAttributes(ctx sessionctx.Context, is infoschema.InfoSchema) error {
Expand Down
94 changes: 91 additions & 3 deletions executor/infoschema_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@ package executor_test

import (
"fmt"
"os"
"strconv"
"strings"
"testing"
"time"

"github.com/jarcoal/httpmock"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/domain/infosync"
Expand Down Expand Up @@ -649,11 +651,97 @@ func TestSequences(t *testing.T) {
tk.MustQuery("SELECT TABLE_CATALOG, TABLE_SCHEMA, TABLE_NAME , TABLE_TYPE, ENGINE, TABLE_ROWS FROM information_schema.tables WHERE TABLE_TYPE='SEQUENCE' AND TABLE_NAME='seq2'").Check(testkit.Rows("def test seq2 SEQUENCE InnoDB 1"))
}

func TestTiFlashSystemTables(t *testing.T) {
func TestTiFlashSystemTableWithTiFlashV620(t *testing.T) {
httpmock.Activate()
defer httpmock.DeactivateAndReset()

instances := []string{
"tiflash,127.0.0.1:3933,127.0.0.1:7777,,",
"tikv,127.0.0.1:11080,127.0.0.1:10080,,",
}
fpName := "github.com/pingcap/tidb/infoschema/mockStoreServerInfo"
fpExpr := `return("` + strings.Join(instances, ";") + `")`
require.NoError(t, failpoint.Enable(fpName, fpExpr))
defer func() { require.NoError(t, failpoint.Disable(fpName)) }()

httpmock.RegisterResponder("GET", "http://127.0.0.1:7777/config",
httpmock.NewStringResponder(200, `
{
"raftstore-proxy": {},
"engine-store":{
"http_port":8123,
"tcp_port":9000
}
}
`))

data, err := os.ReadFile("testdata/tiflash_v620_dt_segments.json")
require.NoError(t, err)
httpmock.RegisterResponder("GET", "http://127.0.0.1:8123?default_format=JSONCompact&query=SELECT+%2A+FROM+system.dt_segments+LIMIT+0%2C+1024", httpmock.NewBytesResponder(200, data))

data, err = os.ReadFile("testdata/tiflash_v620_dt_tables.json")
require.NoError(t, err)
httpmock.RegisterResponder("GET", "http://127.0.0.1:8123?default_format=JSONCompact&query=SELECT+%2A+FROM+system.dt_tables+LIMIT+0%2C+1024", httpmock.NewBytesResponder(200, data))

store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustQuery("select * from information_schema.TIFLASH_SEGMENTS;").Check(testkit.Rows(
"db_1 t_10 mysql tables_priv 10 0 1 [-9223372036854775808,9223372036854775807) <nil> 0 0 <nil> <nil> <nil> <nil> <nil> <nil> <nil> <nil> <nil> <nil> 0 2032 <nil> <nil> <nil> <nil> <nil> <nil> <nil> <nil> <nil> 127.0.0.1:3933",
"db_1 t_8 mysql db 8 0 1 [-9223372036854775808,9223372036854775807) <nil> 0 0 <nil> <nil> <nil> <nil> <nil> <nil> <nil> <nil> <nil> <nil> 0 2032 <nil> <nil> <nil> <nil> <nil> <nil> <nil> <nil> <nil> 127.0.0.1:3933",
"db_2 t_70 test segment 70 0 1 [01,FA) <nil> 30511 50813627 0.6730359542460096 <nil> <nil> <nil> <nil> <nil> <nil> <nil> <nil> <nil> 3578860 409336 <nil> <nil> <nil> <nil> <nil> <nil> <nil> <nil> <nil> 127.0.0.1:3933",
))
tk.MustQuery("show warnings").Check(testkit.Rows())

tk.MustQuery("select * from information_schema.TIFLASH_TABLES;").Check(testkit.Rows(
"db_1 t_10 mysql tables_priv 10 0 1 0 0 0 <nil> 0 <nil> 0 <nil> <nil> 0 0 0 0 0 0 <nil> <nil> <nil> 0 0 0 0 <nil> <nil> 0 <nil> <nil> <nil> 0 <nil> <nil> <nil> 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 127.0.0.1:3933",
"db_1 t_8 mysql db 8 0 1 0 0 0 <nil> 0 <nil> 0 <nil> <nil> 0 0 0 0 0 0 <nil> <nil> <nil> 0 0 0 0 <nil> <nil> 0 <nil> <nil> <nil> 0 <nil> <nil> <nil> 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 127.0.0.1:3933",
"db_2 t_70 test segment 70 0 1 102000 169873868 0 0 0 <nil> 0 <nil> <nil> 0 102000 169873868 0 0 0 <nil> <nil> <nil> 1 102000 169873868 43867622 102000 169873868 0 <nil> <nil> <nil> 13 13 7846.153846153846 13067220.615384616 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 127.0.0.1:3933",
))
tk.MustQuery("show warnings").Check(testkit.Rows())
}

func TestTiFlashSystemTableWithTiFlashV630(t *testing.T) {
httpmock.Activate()
defer httpmock.DeactivateAndReset()

instances := []string{
"tiflash,127.0.0.1:3933,127.0.0.1:7777,,",
"tikv,127.0.0.1:11080,127.0.0.1:10080,,",
}
fpName := "github.com/pingcap/tidb/infoschema/mockStoreServerInfo"
fpExpr := `return("` + strings.Join(instances, ";") + `")`
require.NoError(t, failpoint.Enable(fpName, fpExpr))
defer func() { require.NoError(t, failpoint.Disable(fpName)) }()

httpmock.RegisterResponder("GET", "http://127.0.0.1:7777/config",
httpmock.NewStringResponder(200, `
{
"raftstore-proxy": {},
"engine-store":{
"http_port":8123,
"tcp_port":9000
}
}
`))

data, err := os.ReadFile("testdata/tiflash_v630_dt_segments.json")
require.NoError(t, err)
httpmock.RegisterResponder("GET", "http://127.0.0.1:8123?default_format=JSONCompact&query=SELECT+%2A+FROM+system.dt_segments+LIMIT+0%2C+1024", httpmock.NewBytesResponder(200, data))

store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("select * from information_schema.TIFLASH_TABLES;")
tk.MustExec("select * from information_schema.TIFLASH_SEGMENTS;")
tk.MustQuery("select * from information_schema.TIFLASH_SEGMENTS;").Check(testkit.Rows(
"db_1 t_10 mysql tables_priv 10 0 1 [-9223372036854775808,9223372036854775807) 0 0 0 <nil> 0 0 0 0 2 0 0 0 0 0 2032 3 0 0 1 1 0 0 0 0 127.0.0.1:3933",
"db_2 t_70 test segment 70 436272981189328904 1 [01,FA) 5 102000 169874232 0 0 0 0 0 2 0 0 0 0 0 2032 3 102000 169874232 1 68 102000 169874232 43951837 20 127.0.0.1:3933",
"db_2 t_75 test segment 75 0 1 [01,013130303030393535FF61653666642D6136FF61382D343032382DFF616436312D663736FF3062323736643461FF3600000000000000F8) 2 0 0 <nil> 0 0 1 1 110 0 0 4 4 0 2032 111 0 0 1 70 0 0 0 0 127.0.0.1:3933",
"db_2 t_75 test segment 75 0 113 [013130303030393535FF61653666642D6136FF61382D343032382DFF616436312D663736FF3062323736643461FF3600000000000000F8,013139393938363264FF33346535382D3735FF31382D343661612DFF626235392D636264FF3139333434623736FF3100000000000000F9) 2 10167 16932617 0.4887380741615029 0 0 0 0 114 4969 8275782 2 0 0 63992 112 5198 8656835 1 71 5198 8656835 2254100 1 127.0.0.1:3933",
"db_2 t_75 test segment 75 0 116 [013139393938363264FF33346535382D3735FF31382D343661612DFF626235392D636264FF3139333434623736FF3100000000000000F9,013330303131383034FF61323537662D6638FF63302D346466622DFF383235632D353361FF3236306338616662FF3400000000000000F8) 3 8 13322 0.5 3 4986 1 0 117 1 1668 4 3 4986 2032 115 4 6668 1 78 4 6668 6799 1 127.0.0.1:3933",
"db_2 t_75 test segment 75 0 125 [013330303131383034FF61323537662D6638FF63302D346466622DFF383235632D353361FF3236306338616662FF3400000000000000F8,013339393939613861FF30663062332D6537FF32372D346234642DFF396535632D363865FF3336323066383431FF6300000000000000F9) 2 8677 14451079 0.4024432407514118 3492 5816059 3 0 126 0 0 0 0 5816059 2032 124 5185 8635020 1 79 5185 8635020 2247938 1 127.0.0.1:3933",
"db_2 t_75 test segment 75 0 128 [013339393939613861FF30663062332D6537FF32372D346234642DFF396535632D363865FF3336323066383431FF6300000000000000F9,013730303031636230FF32663330652D3539FF62352D346134302DFF613539312D383930FF6132316364633466FF3200000000000000F8) 0 1 1668 1 0 0 0 0 129 1 1668 5 4 0 2032 127 0 0 1 78 4 6668 6799 1 127.0.0.1:3933",
"db_2 t_75 test segment 75 0 119 [013730303031636230FF32663330652D3539FF62352D346134302DFF613539312D383930FF6132316364633466FF3200000000000000F8,013739393939386561FF36393566612D3534FF64302D346437642DFF383136612D646335FF6432613130353533FF3200000000000000F9) 2 10303 17158730 0.489372027564787 0 0 0 0 120 5042 8397126 2 0 0 63992 118 5261 8761604 1 77 5261 8761604 2280506 1 127.0.0.1:3933",
"db_2 t_75 test segment 75 0 122 [013739393939386561FF36393566612D3534FF64302D346437642DFF383136612D646335FF6432613130353533FF3200000000000000F9,FA) 0 1 1663 1 0 0 0 0 123 1 1663 4 3 0 2032 121 0 0 1 78 4 6668 6799 1 127.0.0.1:3933",
))
tk.MustQuery("show warnings").Check(testkit.Rows())
}

func TestTablesPKType(t *testing.T) {
Expand Down
99 changes: 99 additions & 0 deletions executor/testdata/tiflash_v620_dt_segments.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
{
"meta":
[
{
"name": "database",
"type": "String"
},
{
"name": "table",
"type": "String"
},
{
"name": "tidb_database",
"type": "String"
},
{
"name": "tidb_table",
"type": "String"
},
{
"name": "table_id",
"type": "Int64"
},
{
"name": "is_tombstone",
"type": "UInt64"
},
{
"name": "segment_id",
"type": "UInt64"
},
{
"name": "range",
"type": "String"
},
{
"name": "rows",
"type": "UInt64"
},
{
"name": "size",
"type": "UInt64"
},
{
"name": "delete_ranges",
"type": "UInt64"
},
{
"name": "stable_size_on_disk",
"type": "UInt64"
},
{
"name": "delta_pack_count",
"type": "UInt64"
},
{
"name": "stable_pack_count",
"type": "UInt64"
},
{
"name": "avg_delta_pack_rows",
"type": "Float64"
},
{
"name": "avg_stable_pack_rows",
"type": "Float64"
},
{
"name": "delta_rate",
"type": "Float64"
},
{
"name": "delta_cache_size",
"type": "UInt64"
},
{
"name": "delta_index_size",
"type": "UInt64"
}
],

"data":
[
["db_1", "t_10", "mysql", "tables_priv", "10", "0", "1", "[-9223372036854775808,9223372036854775807)", "0", "0", "0", "0", "0", "0", null, null, null, "0", "2032"],
["db_1", "t_8", "mysql", "db", "8", "0", "1", "[-9223372036854775808,9223372036854775807)", "0", "0", "0", "0", "0", "0", null, null, null, "0", "2032"],
["db_2", "t_70", "test", "segment", "70", "0", "1", "[01,FA)", "30511", "50813627", "12", "4296273", "18", "2", 1140.8333333333333, 4988, 0.6730359542460096, "3578860", "409336"]
],

"rows": 3,

"rows_before_limit_at_least": 3,

"statistics":
{
"elapsed": 0.000075,
"rows_read": 3,
"bytes_read": 8324
}
}
Loading