From b62e3638e0bfe0199ebd5f2af63928c2fe12039c Mon Sep 17 00:00:00 2001 From: Song Gao Date: Mon, 6 Mar 2023 12:31:11 +0800 Subject: [PATCH] domain: revise extract plan package format (#41876) ref pingcap/tidb#41130 --- domain/extract.go | 233 +++++++++++++++++++++++------------ domain/plan_replayer_dump.go | 36 +++--- server/extract.go | 27 ++-- server/extract_test.go | 6 + 4 files changed, 197 insertions(+), 105 deletions(-) diff --git a/domain/extract.go b/domain/extract.go index 896a65af8ed53..8f2e278f2c4a0 100644 --- a/domain/extract.go +++ b/domain/extract.go @@ -18,6 +18,7 @@ import ( "archive/zip" "context" "encoding/base64" + "encoding/json" "fmt" "math/rand" "os" @@ -41,7 +42,7 @@ import ( const ( // ExtractMetaFile indicates meta file for extract - ExtractMetaFile = "meta.txt" + ExtractMetaFile = "extract_meta.txt" ) const ( @@ -147,55 +148,33 @@ func (w *extractWorker) extractPlanTask(ctx context.Context, task *ExtractTask) return w.dumpExtractPlanPackage(p) } -func (w *extractWorker) collectRecords(ctx context.Context, task *ExtractTask) (map[stmtSummaryHistoryKey]stmtSummaryHistoryRecord, error) { +func (w *extractWorker) collectRecords(ctx context.Context, task *ExtractTask) (map[stmtSummaryHistoryKey]*stmtSummaryHistoryRecord, error) { w.Lock() defer w.Unlock() exec := w.sctx.(sqlexec.RestrictedSQLExecutor) ctx1 := kv.WithInternalSourceType(ctx, kv.InternalTxnStats) - rows, _, err := exec.ExecRestrictedSQL(ctx1, nil, fmt.Sprintf("SELECT STMT_TYPE, TABLE_NAMES, DIGEST, PLAN_DIGEST,QUERY_SAMPLE_TEXT, BINARY_PLAN FROM INFORMATION_SCHEMA.STATEMENTS_SUMMARY_HISTORY WHERE SUMMARY_END_TIME > '%s' OR SUMMARY_BEGIN_TIME < '%s'", + rows, _, err := exec.ExecRestrictedSQL(ctx1, nil, fmt.Sprintf("SELECT STMT_TYPE, DIGEST, PLAN_DIGEST,QUERY_SAMPLE_TEXT, BINARY_PLAN, TABLE_NAMES FROM INFORMATION_SCHEMA.STATEMENTS_SUMMARY_HISTORY WHERE SUMMARY_END_TIME > '%s' OR SUMMARY_BEGIN_TIME < '%s'", task.Begin.Format(types.TimeFormat), task.End.Format(types.TimeFormat))) if err != nil { return nil, err } - collectMap := make(map[stmtSummaryHistoryKey]stmtSummaryHistoryRecord, 0) - is := GetDomain(w.sctx).InfoSchema() + collectMap := make(map[stmtSummaryHistoryKey]*stmtSummaryHistoryRecord, 0) for _, row := range rows { - record := stmtSummaryHistoryRecord{} + record := &stmtSummaryHistoryRecord{} record.stmtType = row.GetString(0) - record.digest = row.GetString(2) - record.planDigest = row.GetString(3) - record.sql = row.GetString(4) - record.binaryPlan = row.GetString(5) + record.digest = row.GetString(1) + record.planDigest = row.GetString(2) + record.sql = row.GetString(3) + record.binaryPlan = row.GetString(4) + tableNames := row.GetString(5) key := stmtSummaryHistoryKey{ digest: record.digest, planDigest: record.planDigest, } record.tables = make([]tableNamePair, 0) - tables := row.GetString(1) - setRecord := true - - for _, t := range strings.Split(tables, ",") { - names := strings.Split(t, ".") - if len(names) != 2 { - setRecord = false - break - } - dbName := names[0] - tblName := names[1] - t, err := is.TableByName(model.NewCIStr(dbName), model.NewCIStr(tblName)) - if err != nil { - return nil, err - } - record.schemaName = dbName - // skip internal schema record - switch strings.ToLower(record.schemaName) { - case util.PerformanceSchemaName.L, util.InformationSchemaName.L, util.MetricSchemaName.L, "mysql": - setRecord = false - } - if !setRecord { - break - } - record.tables = append(record.tables, tableNamePair{DBName: dbName, TableName: tblName, IsView: t.Meta().IsView()}) + setRecord, 
err := w.handleTableNames(tableNames, record) + if err != nil { + return nil, err } if setRecord && checkRecordValid(record) { collectMap[key] = record @@ -204,7 +183,38 @@ func (w *extractWorker) collectRecords(ctx context.Context, task *ExtractTask) ( return collectMap, nil } -func checkRecordValid(r stmtSummaryHistoryRecord) bool { +func (w *extractWorker) handleTableNames(tableNames string, record *stmtSummaryHistoryRecord) (bool, error) { + is := GetDomain(w.sctx).InfoSchema() + for _, t := range strings.Split(tableNames, ",") { + names := strings.Split(t, ".") + if len(names) != 2 { + return false, nil + } + dbName := names[0] + tblName := names[1] + record.schemaName = dbName + // skip internal schema record + switch strings.ToLower(record.schemaName) { + case util.PerformanceSchemaName.L, util.InformationSchemaName.L, util.MetricSchemaName.L, "mysql": + return false, nil + } + exists := is.TableExists(model.NewCIStr(dbName), model.NewCIStr(tblName)) + if !exists { + return false, nil + } + t, err := is.TableByName(model.NewCIStr(dbName), model.NewCIStr(tblName)) + if err != nil { + return false, err + } + record.tables = append(record.tables, tableNamePair{DBName: dbName, TableName: tblName, IsView: t.Meta().IsView()}) + } + return true, nil +} + +func checkRecordValid(r *stmtSummaryHistoryRecord) bool { + if r.stmtType != "Select" { + return false + } if r.schemaName == "" { return false } @@ -214,31 +224,65 @@ func checkRecordValid(r stmtSummaryHistoryRecord) bool { return true } -func (w *extractWorker) packageExtractPlanRecords(ctx context.Context, records map[stmtSummaryHistoryKey]stmtSummaryHistoryRecord) (*extractPlanPackage, error) { +func (w *extractWorker) packageExtractPlanRecords(ctx context.Context, records map[stmtSummaryHistoryKey]*stmtSummaryHistoryRecord) (*extractPlanPackage, error) { p := &extractPlanPackage{} - p.sqls = make([]string, 0) - p.plans = make([]string, 0) - p.skippedSQLs = make([]string, 0) + p.records = records p.tables = make(map[tableNamePair]struct{}, 0) for _, record := range records { // skip the sql which has been cut off if strings.Contains(record.sql, "(len:") { - p.skippedSQLs = append(p.skippedSQLs, record.sql) + record.skip = true continue } - p.sqls = append(p.sqls, record.sql) plan, err := w.decodeBinaryPlan(ctx, record.binaryPlan) if err != nil { return nil, err } - p.plans = append(p.plans, plan) + record.plan = plan for _, tbl := range record.tables { p.tables[tbl] = struct{}{} } } + if err := w.handleIsView(ctx, p); err != nil { + return nil, err + } return p, nil } +func (w *extractWorker) handleIsView(ctx context.Context, p *extractPlanPackage) error { + is := GetDomain(w.sctx).InfoSchema() + tne := &tableNameExtractor{ + ctx: ctx, + executor: w.sctx.(sqlexec.RestrictedSQLExecutor), + is: is, + curDB: model.NewCIStr(""), + names: make(map[tableNamePair]struct{}), + cteNames: make(map[string]struct{}), + } + for v := range p.tables { + if v.IsView { + v, err := is.TableByName(model.NewCIStr(v.DBName), model.NewCIStr(v.TableName)) + if err != nil { + return err + } + sql := v.Meta().View.SelectStmt + node, err := tne.executor.ParseWithParams(tne.ctx, sql) + if err != nil { + return err + } + node.Accept(tne) + } + } + if tne.err != nil { + return tne.err + } + r := tne.getTablesAndViews() + for t := range r { + p.tables[t] = struct{}{} + } + return nil +} + func (w *extractWorker) decodeBinaryPlan(ctx context.Context, bPlan string) (string, error) { exec := w.sctx.(sqlexec.RestrictedSQLExecutor) ctx1 := 
kv.WithInternalSourceType(ctx, kv.InternalTxnStats) @@ -253,7 +297,11 @@ func (w *extractWorker) decodeBinaryPlan(ctx context.Context, bPlan string) (str // dumpExtractPlanPackage will dump the information about sqls collected in stmt_summary_history // The files will be organized into the following format: /* + |-extract_meta.txt |-meta.txt + |-config.toml + |-variables.toml + |-bindings.sql |-schema | |-schema_meta.txt | |-db1.table1.schema.txt @@ -269,8 +317,12 @@ func (w *extractWorker) decodeBinaryPlan(ctx context.Context, bPlan string) (str | |-.... |-table_tiflash_replica.txt |-sql - | |-sqls.sql - | |-skippedSQLs.sql + | |-digest1.sql + | |-digest2.sql + | |-.... + |-skippedSQLs + | |-digest1.sql + | |-... */ func (w *extractWorker) dumpExtractPlanPackage(p *extractPlanPackage) (name string, err error) { f, name, err := GenerateExtractFile() @@ -290,6 +342,14 @@ func (w *extractWorker) dumpExtractPlanPackage(p *extractPlanPackage) (name stri } }() + // Dump config + if err = dumpConfig(zw); err != nil { + return "", err + } + // Dump meta + if err = dumpMeta(zw); err != nil { + return "", err + } // dump extract plan task meta if err = dumpExtractMeta(ExtractPlanType, zw); err != nil { return "", err @@ -302,40 +362,34 @@ func (w *extractWorker) dumpExtractPlanPackage(p *extractPlanPackage) (name stri if err = dumpTiFlashReplica(w.sctx, zw, p.tables); err != nil { return "", err } - // Dump stats - if err = dumpStats(zw, p.tables, GetDomain(w.sctx)); err != nil { + // Dump variables + if err = dumpVariables(w.sctx, w.sctx.GetSessionVars(), zw); err != nil { return "", err } - // Dump sqls - if err = dumpExtractPlanSQLs(p.sqls, p.skippedSQLs, zw); err != nil { + // Dump global bindings + if err = dumpGlobalBindings(w.sctx, zw); err != nil { return "", err } - // dump plans - if err = dumpExtractPlans(p.plans, zw); err != nil { + // Dump stats + if err = dumpStats(zw, p.tables, GetDomain(w.sctx)); err != nil { return "", err } - return name, nil -} - -func dumpExtractPlanSQLs(sqls, skippedSQLs []string, zw *zip.Writer) error { - if err := dumpTargetSQLs(sqls, "sql/sqls.sql", zw); err != nil { - return err + // Dump sqls and plan + if err = dumpSQLRecords(p.records, zw); err != nil { + return "", err } - return dumpTargetSQLs(skippedSQLs, "sql/skippedSQLs.sql", zw) + return name, nil } -func dumpExtractPlans(plans []string, zw *zip.Writer) error { - zf, err := zw.Create("plans.txt") - if err != nil { - return err - } - for i, plan := range plans { - _, err = zf.Write([]byte(plan)) - if err != nil { - return err - } - if i < len(plans)-1 { - _, err = zf.Write([]byte("\n<--------->\n")) +func dumpSQLRecords(records map[stmtSummaryHistoryKey]*stmtSummaryHistoryRecord, zw *zip.Writer) error { + for key, record := range records { + if record.skip { + err := dumpSQLRecord(record, fmt.Sprintf("skippedSQLs/%v.json", key.digest), zw) + if err != nil { + return err + } + } else { + err := dumpSQLRecord(record, fmt.Sprintf("SQLs/%v.json", key.digest), zw) if err != nil { return err } @@ -344,16 +398,34 @@ func dumpExtractPlans(plans []string, zw *zip.Writer) error { return nil } -func dumpTargetSQLs(sqls []string, path string, zw *zip.Writer) error { +type singleSQLRecord struct { + Schema string `json:"schema"` + Plan string `json:"plan"` + SQL string `json:"sql"` + Digest string `json:"digest"` + BinaryPlan string `json:"binaryPlan"` +} + +// dumpSQLRecord dumps sql records into one file for each record, the format is in json. 
+func dumpSQLRecord(record *stmtSummaryHistoryRecord, path string, zw *zip.Writer) error { zf, err := zw.Create(path) if err != nil { return err } - for _, sql := range sqls { - _, err = zf.Write([]byte(fmt.Sprintf("%s;\n", sql))) - if err != nil { - return err - } + singleSQLRecord := &singleSQLRecord{ + Schema: record.schemaName, + Plan: record.plan, + SQL: record.sql, + Digest: record.digest, + BinaryPlan: record.binaryPlan, + } + content, err := json.Marshal(singleSQLRecord) + if err != nil { + return err + } + _, err = zf.Write(content) + if err != nil { + return err } return nil } @@ -372,10 +444,8 @@ func dumpExtractMeta(t ExtractType, zw *zip.Writer) error { } type extractPlanPackage struct { - sqls []string - plans []string - skippedSQLs []string - tables map[tableNamePair]struct{} + tables map[tableNamePair]struct{} + records map[stmtSummaryHistoryKey]*stmtSummaryHistoryRecord } type stmtSummaryHistoryKey struct { @@ -391,6 +461,9 @@ type stmtSummaryHistoryRecord struct { planDigest string sql string binaryPlan string + + plan string + skip bool } // GenerateExtractFile generates extract stmt file diff --git a/domain/plan_replayer_dump.go b/domain/plan_replayer_dump.go index 1e661a8ccdb74..7da17e24a4b3a 100644 --- a/domain/plan_replayer_dump.go +++ b/domain/plan_replayer_dump.go @@ -92,6 +92,22 @@ type tableNameExtractor struct { err error } +func (tne *tableNameExtractor) getTablesAndViews() map[tableNamePair]struct{} { + r := make(map[tableNamePair]struct{}) + for tablePair := range tne.names { + if tablePair.IsView { + r[tablePair] = struct{}{} + continue + } + // remove cte in table names + _, ok := tne.cteNames[tablePair.TableName] + if !ok { + r[tablePair] = struct{}{} + } + } + return r +} + func (tne *tableNameExtractor) Enter(in ast.Node) (ast.Node, bool) { if _, ok := in.(*ast.TableName); ok { return in, true @@ -109,14 +125,12 @@ func (tne *tableNameExtractor) Leave(in ast.Node) (ast.Node, bool) { tne.err = err return in, true } - if t.TableInfo != nil { + if tne.is.TableExists(t.Schema, t.Name) { tp := tableNamePair{DBName: t.Schema.L, TableName: t.Name.L, IsView: isView} if tp.DBName == "" { tp.DBName = tne.curDB.L } - if _, ok := tne.names[tp]; !ok { - tne.names[tp] = struct{}{} - } + tne.names[tp] = struct{}{} } } else if s, ok := in.(*ast.SelectStmt); ok { if s.With != nil && len(s.With.CTEs) > 0 { @@ -711,19 +725,7 @@ func extractTableNames(ctx context.Context, sctx sessionctx.Context, if tableExtractor.err != nil { return nil, tableExtractor.err } - r := make(map[tableNamePair]struct{}) - for tablePair := range tableExtractor.names { - if tablePair.IsView { - r[tablePair] = struct{}{} - continue - } - // remove cte in table names - _, ok := tableExtractor.cteNames[tablePair.TableName] - if !ok { - r[tablePair] = struct{}{} - } - } - return r, nil + return tableExtractor.getTablesAndViews(), nil } func getStatsForTable(do *Domain, pair tableNamePair) (*handle.JSONTable, error) { diff --git a/server/extract.go b/server/extract.go index 30e04c481b4e0..166d12ffbf29d 100644 --- a/server/extract.go +++ b/server/extract.go @@ -127,15 +127,26 @@ func buildExtractTask(req *http.Request) (*domain.ExtractTask, bool, error) { func buildExtractPlanTask(req *http.Request) (*domain.ExtractTask, bool, error) { beginStr := req.URL.Query().Get(pBegin) endStr := req.URL.Query().Get(pEnd) - begin, err := time.Parse(types.TimeFormat, beginStr) - if err != nil { - logutil.BgLogger().Error("extract task begin time failed", zap.Error(err), zap.String("begin", beginStr)) - return nil, 
false, err + var begin time.Time + var err error + if len(beginStr) < 1 { + begin = time.Now().Add(30 * time.Minute) + } else { + begin, err = time.Parse(types.TimeFormat, beginStr) + if err != nil { + logutil.BgLogger().Error("extract task begin time failed", zap.Error(err), zap.String("begin", beginStr)) + return nil, false, err + } } - end, err := time.Parse(types.TimeFormat, endStr) - if err != nil { - logutil.BgLogger().Error("extract task end time failed", zap.Error(err), zap.String("end", endStr)) - return nil, false, err + var end time.Time + if len(endStr) < 1 { + end = time.Now() + } else { + end, err = time.Parse(types.TimeFormat, endStr) + if err != nil { + logutil.BgLogger().Error("extract task end time failed", zap.Error(err), zap.String("end", endStr)) + return nil, false, err + } } isDumpStr := req.URL.Query().Get(pIsDump) isDump, err := strconv.ParseBool(isDumpStr) diff --git a/server/extract_test.go b/server/extract_test.go index b9dac858cdc4b..a595fbe7168bc 100644 --- a/server/extract_test.go +++ b/server/extract_test.go @@ -78,6 +78,12 @@ func TestExtractHandler(t *testing.T) { require.NoError(t, resp0.Body.Close()) }() require.Equal(t, resp0.StatusCode, http.StatusOK) + resp0, err = client.fetchStatus("/extract_task/dump?type=plan") + require.NoError(t, err) + defer func() { + require.NoError(t, resp0.Body.Close()) + }() + require.Equal(t, resp0.StatusCode, http.StatusOK) } func prepareData4ExtractPlanTask(t *testing.T, client *testServerClient) {
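
Note (editorial, not part of the commit): with this change each collected statement is written as its own JSON file, keyed by digest, under SQLs/ in the extract zip (or under skippedSQLs/ when the statement text was cut off), using the field names declared on singleSQLRecord. The sketch below shows one way a consumer might read those records back out of a generated archive. It is an assumption-laden illustration, not code from this patch: it uses only the Go standard library, and the archive name "extract.zip" is a placeholder for whatever file name GenerateExtractFile actually produced.

// Editorial sketch, not part of this patch: read the per-digest JSON records
// back out of an extract zip. singleSQLRecord mirrors the JSON tags defined
// in domain/extract.go above; "extract.zip" is a placeholder archive name.
package main

import (
	"archive/zip"
	"encoding/json"
	"fmt"
	"io"
	"log"
	"strings"
)

type singleSQLRecord struct {
	Schema     string `json:"schema"`
	Plan       string `json:"plan"`
	SQL        string `json:"sql"`
	Digest     string `json:"digest"`
	BinaryPlan string `json:"binaryPlan"`
}

func main() {
	r, err := zip.OpenReader("extract.zip") // placeholder name
	if err != nil {
		log.Fatal(err)
	}
	defer r.Close()

	for _, f := range r.File {
		// Statements whose text was truncated are dumped under skippedSQLs/ instead.
		if !strings.HasPrefix(f.Name, "SQLs/") {
			continue
		}
		rc, err := f.Open()
		if err != nil {
			log.Fatal(err)
		}
		data, err := io.ReadAll(rc)
		rc.Close()
		if err != nil {
			log.Fatal(err)
		}
		var rec singleSQLRecord
		if err := json.Unmarshal(data, &rec); err != nil {
			log.Fatal(err)
		}
		fmt.Printf("digest=%s schema=%s\nsql: %s\nplan:\n%s\n\n", rec.Digest, rec.Schema, rec.SQL, rec.Plan)
	}
}

As exercised by TestExtractHandler above, such an archive can be requested over the status port via /extract_task/dump?type=plan; the begin/end query parameters (pBegin/pEnd) are optional, and buildExtractPlanTask falls back to its defaults when they are omitted.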