domain: revise extract plan package format (#41876)
ref #41130
Yisaer authored Mar 6, 2023
1 parent 9e27518 commit b62e363
Showing 4 changed files with 197 additions and 105 deletions.
233 changes: 153 additions & 80 deletions domain/extract.go
@@ -18,6 +18,7 @@ import (
"archive/zip"
"context"
"encoding/base64"
"encoding/json"
"fmt"
"math/rand"
"os"
@@ -41,7 +42,7 @@ import (

const (
// ExtractMetaFile indicates meta file for extract
ExtractMetaFile = "meta.txt"
ExtractMetaFile = "extract_meta.txt"
)

const (
@@ -147,55 +148,33 @@ func (w *extractWorker) extractPlanTask(ctx context.Context, task *ExtractTask)
return w.dumpExtractPlanPackage(p)
}

func (w *extractWorker) collectRecords(ctx context.Context, task *ExtractTask) (map[stmtSummaryHistoryKey]stmtSummaryHistoryRecord, error) {
func (w *extractWorker) collectRecords(ctx context.Context, task *ExtractTask) (map[stmtSummaryHistoryKey]*stmtSummaryHistoryRecord, error) {
w.Lock()
defer w.Unlock()
exec := w.sctx.(sqlexec.RestrictedSQLExecutor)
ctx1 := kv.WithInternalSourceType(ctx, kv.InternalTxnStats)
rows, _, err := exec.ExecRestrictedSQL(ctx1, nil, fmt.Sprintf("SELECT STMT_TYPE, TABLE_NAMES, DIGEST, PLAN_DIGEST,QUERY_SAMPLE_TEXT, BINARY_PLAN FROM INFORMATION_SCHEMA.STATEMENTS_SUMMARY_HISTORY WHERE SUMMARY_END_TIME > '%s' OR SUMMARY_BEGIN_TIME < '%s'",
rows, _, err := exec.ExecRestrictedSQL(ctx1, nil, fmt.Sprintf("SELECT STMT_TYPE, DIGEST, PLAN_DIGEST,QUERY_SAMPLE_TEXT, BINARY_PLAN, TABLE_NAMES FROM INFORMATION_SCHEMA.STATEMENTS_SUMMARY_HISTORY WHERE SUMMARY_END_TIME > '%s' OR SUMMARY_BEGIN_TIME < '%s'",
task.Begin.Format(types.TimeFormat), task.End.Format(types.TimeFormat)))
if err != nil {
return nil, err
}
collectMap := make(map[stmtSummaryHistoryKey]stmtSummaryHistoryRecord, 0)
is := GetDomain(w.sctx).InfoSchema()
collectMap := make(map[stmtSummaryHistoryKey]*stmtSummaryHistoryRecord, 0)
for _, row := range rows {
record := stmtSummaryHistoryRecord{}
record := &stmtSummaryHistoryRecord{}
record.stmtType = row.GetString(0)
record.digest = row.GetString(2)
record.planDigest = row.GetString(3)
record.sql = row.GetString(4)
record.binaryPlan = row.GetString(5)
record.digest = row.GetString(1)
record.planDigest = row.GetString(2)
record.sql = row.GetString(3)
record.binaryPlan = row.GetString(4)
tableNames := row.GetString(5)
key := stmtSummaryHistoryKey{
digest: record.digest,
planDigest: record.planDigest,
}
record.tables = make([]tableNamePair, 0)
tables := row.GetString(1)
setRecord := true

for _, t := range strings.Split(tables, ",") {
names := strings.Split(t, ".")
if len(names) != 2 {
setRecord = false
break
}
dbName := names[0]
tblName := names[1]
t, err := is.TableByName(model.NewCIStr(dbName), model.NewCIStr(tblName))
if err != nil {
return nil, err
}
record.schemaName = dbName
// skip internal schema record
switch strings.ToLower(record.schemaName) {
case util.PerformanceSchemaName.L, util.InformationSchemaName.L, util.MetricSchemaName.L, "mysql":
setRecord = false
}
if !setRecord {
break
}
record.tables = append(record.tables, tableNamePair{DBName: dbName, TableName: tblName, IsView: t.Meta().IsView()})
setRecord, err := w.handleTableNames(tableNames, record)
if err != nil {
return nil, err
}
if setRecord && checkRecordValid(record) {
collectMap[key] = record
@@ -204,7 +183,38 @@ func (w *extractWorker) collectRecords(ctx context.Context, task *ExtractTask) (
return collectMap, nil
}

func checkRecordValid(r stmtSummaryHistoryRecord) bool {
func (w *extractWorker) handleTableNames(tableNames string, record *stmtSummaryHistoryRecord) (bool, error) {
is := GetDomain(w.sctx).InfoSchema()
for _, t := range strings.Split(tableNames, ",") {
names := strings.Split(t, ".")
if len(names) != 2 {
return false, nil
}
dbName := names[0]
tblName := names[1]
record.schemaName = dbName
// skip internal schema record
switch strings.ToLower(record.schemaName) {
case util.PerformanceSchemaName.L, util.InformationSchemaName.L, util.MetricSchemaName.L, "mysql":
return false, nil
}
exists := is.TableExists(model.NewCIStr(dbName), model.NewCIStr(tblName))
if !exists {
return false, nil
}
t, err := is.TableByName(model.NewCIStr(dbName), model.NewCIStr(tblName))
if err != nil {
return false, err
}
record.tables = append(record.tables, tableNamePair{DBName: dbName, TableName: tblName, IsView: t.Meta().IsView()})
}
return true, nil
}

func checkRecordValid(r *stmtSummaryHistoryRecord) bool {
if r.stmtType != "Select" {
return false
}
if r.schemaName == "" {
return false
}
@@ -214,31 +224,65 @@ func checkRecordValid(r stmtSummaryHistoryRecord) bool {
return true
}

func (w *extractWorker) packageExtractPlanRecords(ctx context.Context, records map[stmtSummaryHistoryKey]stmtSummaryHistoryRecord) (*extractPlanPackage, error) {
func (w *extractWorker) packageExtractPlanRecords(ctx context.Context, records map[stmtSummaryHistoryKey]*stmtSummaryHistoryRecord) (*extractPlanPackage, error) {
p := &extractPlanPackage{}
p.sqls = make([]string, 0)
p.plans = make([]string, 0)
p.skippedSQLs = make([]string, 0)
p.records = records
p.tables = make(map[tableNamePair]struct{}, 0)
for _, record := range records {
// skip the sql which has been cut off
if strings.Contains(record.sql, "(len:") {
p.skippedSQLs = append(p.skippedSQLs, record.sql)
record.skip = true
continue
}
p.sqls = append(p.sqls, record.sql)
plan, err := w.decodeBinaryPlan(ctx, record.binaryPlan)
if err != nil {
return nil, err
}
p.plans = append(p.plans, plan)
record.plan = plan
for _, tbl := range record.tables {
p.tables[tbl] = struct{}{}
}
}
if err := w.handleIsView(ctx, p); err != nil {
return nil, err
}
return p, nil
}

func (w *extractWorker) handleIsView(ctx context.Context, p *extractPlanPackage) error {
is := GetDomain(w.sctx).InfoSchema()
tne := &tableNameExtractor{
ctx: ctx,
executor: w.sctx.(sqlexec.RestrictedSQLExecutor),
is: is,
curDB: model.NewCIStr(""),
names: make(map[tableNamePair]struct{}),
cteNames: make(map[string]struct{}),
}
for v := range p.tables {
if v.IsView {
v, err := is.TableByName(model.NewCIStr(v.DBName), model.NewCIStr(v.TableName))
if err != nil {
return err
}
sql := v.Meta().View.SelectStmt
node, err := tne.executor.ParseWithParams(tne.ctx, sql)
if err != nil {
return err
}
node.Accept(tne)
}
}
if tne.err != nil {
return tne.err
}
r := tne.getTablesAndViews()
for t := range r {
p.tables[t] = struct{}{}
}
return nil
}

func (w *extractWorker) decodeBinaryPlan(ctx context.Context, bPlan string) (string, error) {
exec := w.sctx.(sqlexec.RestrictedSQLExecutor)
ctx1 := kv.WithInternalSourceType(ctx, kv.InternalTxnStats)
@@ -253,7 +297,11 @@ func (w *extractWorker) decodeBinaryPlan(ctx context.Context, bPlan string) (str
// dumpExtractPlanPackage will dump the information about sqls collected in stmt_summary_history
// The files will be organized into the following format:
/*
|-extract_meta.txt
|-meta.txt
|-config.toml
|-variables.toml
|-bindings.sql
|-schema
| |-schema_meta.txt
| |-db1.table1.schema.txt
@@ -269,8 +317,12 @@ func (w *extractWorker) decodeBinaryPlan(ctx context.Context, bPlan string) (str
| |-....
|-table_tiflash_replica.txt
|-sql
| |-sqls.sql
| |-skippedSQLs.sql
| |-digest1.sql
| |-digest2.sql
| |-....
|-skippedSQLs
| |-digest1.sql
| |-...
*/
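
As a rough illustration only (not part of this commit), a consumer of the revised package could walk the per-digest JSON records along these lines. The names extractedSQLRecord and readExtractPackage are hypothetical; the JSON fields mirror the singleSQLRecord struct added further down in this diff, and the SQLs/ and skippedSQLs/ prefixes match the paths used by dumpSQLRecords below.

package main

import (
	"archive/zip"
	"encoding/json"
	"fmt"
	"io"
	"strings"
)

// extractedSQLRecord mirrors the per-digest JSON layout written by dumpSQLRecord.
type extractedSQLRecord struct {
	Schema     string `json:"schema"`
	Plan       string `json:"plan"`
	SQL        string `json:"sql"`
	Digest     string `json:"digest"`
	BinaryPlan string `json:"binaryPlan"`
}

// readExtractPackage prints the digest and schema of every SQL record in an extract package zip.
func readExtractPackage(path string) error {
	r, err := zip.OpenReader(path)
	if err != nil {
		return err
	}
	defer r.Close()
	for _, f := range r.File {
		// Only SQLs/ and skippedSQLs/ hold per-digest JSON records; the other
		// entries (schema, stats, config, variables, bindings, ...) use their own formats.
		if !strings.HasPrefix(f.Name, "SQLs/") && !strings.HasPrefix(f.Name, "skippedSQLs/") {
			continue
		}
		rc, err := f.Open()
		if err != nil {
			return err
		}
		data, err := io.ReadAll(rc)
		rc.Close()
		if err != nil {
			return err
		}
		var rec extractedSQLRecord
		if err := json.Unmarshal(data, &rec); err != nil {
			return err
		}
		fmt.Printf("%s: digest=%s schema=%s\n", f.Name, rec.Digest, rec.Schema)
	}
	return nil
}

func main() {
	// "extract_package.zip" is a placeholder; actual packages are named by GenerateExtractFile.
	if err := readExtractPackage("extract_package.zip"); err != nil {
		fmt.Println(err)
	}
}
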
func (w *extractWorker) dumpExtractPlanPackage(p *extractPlanPackage) (name string, err error) {
f, name, err := GenerateExtractFile()
@@ -290,6 +342,14 @@ func (w *extractWorker) dumpExtractPlanPackage(p *extractPlanPackage) (name stri
}
}()

// Dump config
if err = dumpConfig(zw); err != nil {
return "", err
}
// Dump meta
if err = dumpMeta(zw); err != nil {
return "", err
}
// dump extract plan task meta
if err = dumpExtractMeta(ExtractPlanType, zw); err != nil {
return "", err
@@ -302,40 +362,34 @@ func (w *extractWorker) dumpExtractPlanPackage(p *extractPlanPackage) (name stri
if err = dumpTiFlashReplica(w.sctx, zw, p.tables); err != nil {
return "", err
}
// Dump stats
if err = dumpStats(zw, p.tables, GetDomain(w.sctx)); err != nil {
// Dump variables
if err = dumpVariables(w.sctx, w.sctx.GetSessionVars(), zw); err != nil {
return "", err
}
// Dump sqls
if err = dumpExtractPlanSQLs(p.sqls, p.skippedSQLs, zw); err != nil {
// Dump global bindings
if err = dumpGlobalBindings(w.sctx, zw); err != nil {
return "", err
}
// dump plans
if err = dumpExtractPlans(p.plans, zw); err != nil {
// Dump stats
if err = dumpStats(zw, p.tables, GetDomain(w.sctx)); err != nil {
return "", err
}
return name, nil
}

func dumpExtractPlanSQLs(sqls, skippedSQLs []string, zw *zip.Writer) error {
if err := dumpTargetSQLs(sqls, "sql/sqls.sql", zw); err != nil {
return err
// Dump sqls and plan
if err = dumpSQLRecords(p.records, zw); err != nil {
return "", err
}
return dumpTargetSQLs(skippedSQLs, "sql/skippedSQLs.sql", zw)
return name, nil
}

func dumpExtractPlans(plans []string, zw *zip.Writer) error {
zf, err := zw.Create("plans.txt")
if err != nil {
return err
}
for i, plan := range plans {
_, err = zf.Write([]byte(plan))
if err != nil {
return err
}
if i < len(plans)-1 {
_, err = zf.Write([]byte("\n<--------->\n"))
func dumpSQLRecords(records map[stmtSummaryHistoryKey]*stmtSummaryHistoryRecord, zw *zip.Writer) error {
for key, record := range records {
if record.skip {
err := dumpSQLRecord(record, fmt.Sprintf("skippedSQLs/%v.json", key.digest), zw)
if err != nil {
return err
}
} else {
err := dumpSQLRecord(record, fmt.Sprintf("SQLs/%v.json", key.digest), zw)
if err != nil {
return err
}
@@ -344,16 +398,34 @@ func dumpExtractPlans(plans []string, zw *zip.Writer) error {
return nil
}

func dumpTargetSQLs(sqls []string, path string, zw *zip.Writer) error {
type singleSQLRecord struct {
Schema string `json:"schema"`
Plan string `json:"plan"`
SQL string `json:"sql"`
Digest string `json:"digest"`
BinaryPlan string `json:"binaryPlan"`
}

// dumpSQLRecord dumps sql records into one file for each record, the format is in json.
func dumpSQLRecord(record *stmtSummaryHistoryRecord, path string, zw *zip.Writer) error {
zf, err := zw.Create(path)
if err != nil {
return err
}
for _, sql := range sqls {
_, err = zf.Write([]byte(fmt.Sprintf("%s;\n", sql)))
if err != nil {
return err
}
singleSQLRecord := &singleSQLRecord{
Schema: record.schemaName,
Plan: record.plan,
SQL: record.sql,
Digest: record.digest,
BinaryPlan: record.binaryPlan,
}
content, err := json.Marshal(singleSQLRecord)
if err != nil {
return err
}
_, err = zf.Write(content)
if err != nil {
return err
}
return nil
}
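
Illustrative only, not part of this change: with made-up values, a single record produced by dumpSQLRecord above marshals to the JSON shown in the trailing comment.

// exampleSingleSQLRecordJSON shows the shape of one dumped record using made-up values.
func exampleSingleSQLRecordJSON() ([]byte, error) {
	rec := &singleSQLRecord{
		Schema:     "test",
		Plan:       "TableReader_7 root ...",
		SQL:        "select * from t where a = 1",
		Digest:     "digest1",
		BinaryPlan: "<base64 binary plan>",
	}
	// Written as SQLs/digest1.json, or skippedSQLs/digest1.json when record.skip is set:
	// {"schema":"test","plan":"TableReader_7 root ...","sql":"select * from t where a = 1","digest":"digest1","binaryPlan":"<base64 binary plan>"}
	return json.Marshal(rec)
}
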
@@ -372,10 +444,8 @@ func dumpExtractMeta(t ExtractType, zw *zip.Writer) error {
}

type extractPlanPackage struct {
sqls []string
plans []string
skippedSQLs []string
tables map[tableNamePair]struct{}
tables map[tableNamePair]struct{}
records map[stmtSummaryHistoryKey]*stmtSummaryHistoryRecord
}

type stmtSummaryHistoryKey struct {
@@ -391,6 +461,9 @@ type stmtSummaryHistoryRecord struct {
planDigest string
sql string
binaryPlan string

plan string
skip bool
}

// GenerateExtractFile generates extract stmt file