Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

planner: support loading queries from statement_summary to run Index Advisor #56160

Merged
merged 4 commits into from
Sep 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions pkg/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2748,11 +2748,15 @@ func (e *RecommendIndexExec) Next(ctx context.Context, req *chunk.Chunk) error {
return fmt.Errorf("unsupported action: %s", e.Action)
}

results, err := indexadvisor.AdviseIndexes(ctx, e.Ctx(), &indexadvisor.Option{
opt := &indexadvisor.Option{
MaxNumIndexes: 3,
MaxIndexWidth: 3,
SpecifiedSQLs: []string{e.SQL},
})
}
if e.SQL != "" {
opt.SpecifiedSQLs = []string{e.SQL}
}

results, err := indexadvisor.AdviseIndexes(ctx, e.Ctx(), opt)

for _, r := range results {
req.AppendString(0, r.Database)
Expand Down
3 changes: 0 additions & 3 deletions pkg/planner/core/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -5737,9 +5737,6 @@ func (*PlanBuilder) buildRecommendIndex(v *ast.RecommendIndexStmt) (base.Plan, e

switch v.Action {
case "run":
if v.SQL == "" {
return nil, errors.New("recommend index SQL is empty")
}
schema := newColumnsWithNames(7)
schema.Append(buildColumnWithName("", "database", mysql.TypeVarchar, 64))
schema.Append(buildColumnWithName("", "table", mysql.TypeVarchar, 64))
Expand Down
3 changes: 3 additions & 0 deletions pkg/planner/indexadvisor/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ go_library(
deps = [
"//pkg/domain",
"//pkg/infoschema",
"//pkg/kv",
"//pkg/meta/model",
"//pkg/parser",
"//pkg/parser/ast",
Expand All @@ -24,10 +25,12 @@ go_library(
"//pkg/sessionctx",
"//pkg/types",
"//pkg/types/parser_driver",
"//pkg/util/chunk",
"//pkg/util/intest",
"//pkg/util/logutil",
"//pkg/util/parser",
"//pkg/util/set",
"//pkg/util/sqlexec",
"@com_github_google_uuid//:uuid",
"@org_uber_go_zap//:zap",
],
Expand Down
3 changes: 3 additions & 0 deletions pkg/planner/indexadvisor/algorithm.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,9 @@ func (aa *autoAdmin) heuristicCoveredIndexes(
var bestCoverIndex Index
var bestCoverIndexCost IndexSetCost
for i, coverIndex := range coverIndexSet.ToList() {
if candidateIndexes.Contains(coverIndex) {
continue // the new generated cover-index is duplicated
}
candidateIndexes.Add(coverIndex)
cost, err := evaluateIndexSetCost(querySet, aa.optimizer, candidateIndexes)
if err != nil {
Expand Down
49 changes: 41 additions & 8 deletions pkg/planner/indexadvisor/indexadvisor.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"errors"
"fmt"
"math"
"sort"
"strings"

Expand Down Expand Up @@ -92,8 +93,6 @@ func AdviseIndexes(ctx context.Context, sctx sessionctx.Context,
func prepareQuerySet(ctx context.Context, sctx sessionctx.Context,
defaultDB string, opt Optimizer, specifiedSQLs []string) (s.Set[Query], error) {
advisorLogger().Info("prepare target query set")
defer advisorLogger().Info("prepare target query set finished")

querySet := s.NewSet[Query]()
if len(specifiedSQLs) > 0 { // if target queries are specified
for _, sql := range specifiedSQLs {
Expand All @@ -104,9 +103,12 @@ func prepareQuerySet(ctx context.Context, sctx sessionctx.Context,
querySet = ctx.Value(TestKey("query_set")).(s.Set[Query])
} else {
var err error
if querySet, err = loadQuerySetFromStmtSummary(sctx, defaultDB); err != nil {
if querySet, err = loadQuerySetFromStmtSummary(sctx); err != nil {
return nil, err
}
if querySet.Size() == 0 {
return nil, errors.New("can't get any queries from statements_summary")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the msg is quite identifying for searching, no stack error is acceptable?

}
}
}

Expand All @@ -124,12 +126,39 @@ func prepareQuerySet(ctx context.Context, sctx sessionctx.Context,
if err != nil {
return nil, err
}
advisorLogger().Info("finish query preparation", zap.Int("num_query", querySet.Size()))
return querySet, nil
}

func loadQuerySetFromStmtSummary(sessionctx.Context, string) (s.Set[Query], error) {
// TODO: load target queries from statement_summary automatically
return nil, errors.New("not implemented yet")
func loadQuerySetFromStmtSummary(sctx sessionctx.Context) (s.Set[Query], error) {
sql := `SELECT any_value(schema_name) as schema_name,
any_value(query_sample_text) as query_sample_text,
sum(cast(exec_count as double)) as exec_count
FROM information_schema.statements_summary_history
WHERE stmt_type = "Select" AND
summary_begin_time >= date_sub(now(), interval 1 day) AND
prepared = 0 AND
upper(schema_name) not in ("MYSQL", "INFORMATION_SCHEMA", "METRICS_SCHEMA", "PERFORMANCE_SCHEMA")
GROUP BY digest
ORDER BY sum(exec_count) DESC
LIMIT 5000`
rows, err := exec(sctx, sql)
if err != nil {
return nil, err
}

querySet := s.NewSet[Query]()
for _, r := range rows {
schemaName := r.GetString(0)
queryText := r.GetString(1)
execCount := r.GetFloat64(2)
querySet.Add(Query{
SchemaName: schemaName,
Text: queryText,
Frequency: int(execCount),
})
}
return querySet, nil
}

func prepareRecommendation(indexes s.Set[Index], queries s.Set[Query], optimizer Optimizer) ([]*Recommendation, error) {
Expand Down Expand Up @@ -181,7 +210,7 @@ func prepareRecommendation(indexes s.Set[Index], queries s.Set[Query], optimizer
workloadCostBefore += costBefore * float64(query.Frequency)
workloadCostAfter += costAfter * float64(query.Frequency)

queryImprovement := (costBefore - costAfter) / costBefore
queryImprovement := round((costBefore-costAfter)/costBefore, 6)
if queryImprovement < 0.0001 {
continue // this query has no benefit
}
Expand All @@ -204,7 +233,7 @@ func prepareRecommendation(indexes s.Set[Index], queries s.Set[Query], optimizer
workloadCostBefore += 0.1
workloadCostAfter += 0.1
}
workloadImpact.WorkloadImprovement = (workloadCostBefore - workloadCostAfter) / workloadCostBefore
workloadImpact.WorkloadImprovement = round((workloadCostBefore-workloadCostAfter)/workloadCostBefore, 6)

if workloadImpact.WorkloadImprovement < 0.000001 || len(indexResult.TopImpactedQueries) == 0 {
continue // this index has no benefit
Expand All @@ -219,6 +248,10 @@ Range Predicate clause(s) in query '%v'`, cols, normText)
return results, nil
}

func round(v float64, n int) float64 {
return math.Round(v*math.Pow(10, float64(n))) / math.Pow(10, float64(n))
}

func gracefulIndexName(opt Optimizer, schema, tableName string, cols []string) string {
indexName := fmt.Sprintf("idx_%v", strings.Join(cols, "_"))
if len(indexName) > 64 {
Expand Down
2 changes: 2 additions & 0 deletions pkg/planner/indexadvisor/indexadvisor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@ func TestIndexAdvisorBasic1(t *testing.T) {
check(nil, t, tk, "test.t.a", option("select * from t where a=1"))
check(nil, t, tk, "test.t.a,test.t.b",
option("select * from t where a=1; select * from t where b=1"))
check(nil, t, tk, "test.t.a,test.t.b",
option("select a from t where a=1; select b from t where b=1"))
}

func TestIndexAdvisorBasic2(t *testing.T) {
Expand Down
24 changes: 24 additions & 0 deletions pkg/planner/indexadvisor/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,24 @@
package indexadvisor

import (
"context"
"fmt"
"sort"
"strings"

"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/parser"
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/parser/opcode"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/types"
driver "github.com/pingcap/tidb/pkg/types/parser_driver"
"github.com/pingcap/tidb/pkg/util/chunk"
"github.com/pingcap/tidb/pkg/util/logutil"
parser2 "github.com/pingcap/tidb/pkg/util/parser"
s "github.com/pingcap/tidb/pkg/util/set"
"github.com/pingcap/tidb/pkg/util/sqlexec"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -524,3 +529,22 @@ func evaluateIndexSetCost(

return IndexSetCost{workloadCost, totCols, strings.Join(keys, ",")}, nil
}

func exec(sctx sessionctx.Context, sql string) (ret []chunk.Row, err error) {
executor := sctx.(sqlexec.SQLExecutor)
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
result, err := executor.ExecuteInternal(ctx, sql)
if err != nil {
return nil, fmt.Errorf("execute %v failed: %v", sql, err)
}
if result == nil {
return nil, nil
}
defer func() {
closeErr := result.Close()
if err == nil {
err = closeErr
}
}()
return sqlexec.DrainRecordSet(context.Background(), result, 64)
}
2 changes: 1 addition & 1 deletion pkg/util/stmtsummary/v2/tests/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ go_test(
"table_test.go",
],
flaky = True,
shard_count = 12,
shard_count = 13,
deps = [
"//pkg/config",
"//pkg/kv",
Expand Down
20 changes: 20 additions & 0 deletions pkg/util/stmtsummary/v2/tests/table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,26 @@ import (
"github.com/stretchr/testify/require"
)

func TestStmtSummaryIndexAdvisor(t *testing.T) {
setupStmtSummary()
defer closeStmtSummary()
store := testkit.CreateMockStore(t)
tk := newTestKitWithRoot(t, store)
tk.MustExec(`use test`)
tk.MustExec(`create table t (a int, b int, c int)`)

tk.MustQueryToErr(`recommend index run`) // no query

tk.MustQuery(`select a from t where a=1`)
rs := tk.MustQuery(`recommend index run`).Sort().Rows()
require.Equal(t, rs[0][2], "idx_a")

tk.MustQuery(`select b from t where b=1`)
rs = tk.MustQuery(`recommend index run`).Sort().Rows()
require.Equal(t, rs[0][2], "idx_a")
require.Equal(t, rs[1][2], "idx_b")
}

func TestStmtSummaryTable(t *testing.T) {
setupStmtSummary()
defer closeStmtSummary()
Expand Down