diff --git a/config/config.go b/config/config.go index 1993ee1924afd..eaacab84b1c1d 100644 --- a/config/config.go +++ b/config/config.go @@ -36,6 +36,7 @@ type Config struct { Status Status `toml:"status" json:"status"` Performance Performance `toml:"performance" json:"performance"` XProtocol XProtocol `toml:"xprotocol" json:"xprotocol"` + PlanCache PlanCache `toml:"plan-cache" json:"plan-cache"` } // Log is the log section of config. @@ -88,6 +89,13 @@ type XProtocol struct { XSocket string `toml:"xsocket" json:"xsocket"` } +// PlanCache is the PlanCache section of the config. +type PlanCache struct { + Enabled bool `toml:"plan-cache-enabled" json:"plan-cache-enabled"` + Capacity int64 `toml:"plan-cache-capacity" json:"plan-cache-capacity"` + Shards int64 `toml:"plan-cache-shards" json:"plan-cache-shards"` +} + var defaultConf = Config{ Host: "0.0.0.0", Port: 4000, @@ -121,6 +129,11 @@ var defaultConf = Config{ XHost: "0.0.0.0", XPort: 14000, }, + PlanCache: PlanCache{ + Enabled: false, + Capacity: 2560, + Shards: 256, + }, } var globalConf = defaultConf diff --git a/config/config.toml.example b/config/config.toml.example index 88a47eeaf0e54..16c100a8969a0 100644 --- a/config/config.toml.example +++ b/config/config.toml.example @@ -114,3 +114,8 @@ xport = 14000 # The socket file to use for x protocol connection. xsocket = "" + +[plan-cache] +plan-cache-enabled = false +plan-cache-capacity = 2560 +plan-cache-shards = 256 diff --git a/ddl/ddl_db_change_test.go b/ddl/ddl_db_change_test.go index 6a254634faf70..c16d81e4ce76f 100644 --- a/ddl/ddl_db_change_test.go +++ b/ddl/ddl_db_change_test.go @@ -242,8 +242,7 @@ func (t *testExecInfo) parseSQLs(p *parser.Parser) error { return nil } -func (t *testExecInfo) compileSQL(idx int) error { - var err error +func (t *testExecInfo) compileSQL(idx int) (err error) { compiler := executor.Compiler{} for _, info := range t.sqlInfos { c := info.cases[idx] @@ -251,6 +250,7 @@ func (t *testExecInfo) compileSQL(idx int) error { se.PrepareTxnCtx() ctx := se.(context.Context) executor.ResetStmtCtx(ctx, c.rawStmt) + c.stmt, err = compiler.Compile(ctx, c.rawStmt) if err != nil { return errors.Trace(err) diff --git a/executor/adapter.go b/executor/adapter.go index 52a1fa7891e5c..9ddcb42aba72b 100644 --- a/executor/adapter.go +++ b/executor/adapter.go @@ -40,7 +40,7 @@ type processinfoSetter interface { type recordSet struct { fields []*ast.ResultField executor Executor - stmt *statement + stmt *ExecStmt processinfo processinfoSetter err error } @@ -90,23 +90,31 @@ func (a *recordSet) Close() error { return errors.Trace(err) } -// statement implements the ast.Statement interface, it builds a plan.Plan to an ast.Statement. -type statement struct { - is infoschema.InfoSchema // The InfoSchema cannot change during execution, so we hold a reference to it. +// ExecStmt implements the ast.Statement interface, it builds a plan.Plan to an ast.Statement. +type ExecStmt struct { + // InfoSchema stores a reference to the schema information. + InfoSchema infoschema.InfoSchema + // Plan stores a reference to the final physical plan. + Plan plan.Plan + // Expensive represents whether this query is an expensive one. + Expensive bool + // Cacheable represents whether the physical plan can be cached. + Cacheable bool + // Text represents the origin query text. + Text string ctx context.Context - text string - plan plan.Plan startTime time.Time isPreparedStmt bool - expensive bool } -func (a *statement) OriginText() string { - return a.text +// OriginText implements ast.Statement interface. +func (a *ExecStmt) OriginText() string { + return a.Text } -func (a *statement) IsPrepared() bool { +// IsPrepared implements ast.Statement interface. +func (a *ExecStmt) IsPrepared() bool { return a.isPreparedStmt } @@ -114,11 +122,11 @@ func (a *statement) IsPrepared() bool { // This function builds an Executor from a plan. If the Executor doesn't return result, // like the INSERT, UPDATE statements, it executes in this function, if the Executor returns // result, execution is done after this function returns, in the returned ast.RecordSet Next method. -func (a *statement) Exec(ctx context.Context) (ast.RecordSet, error) { +func (a *ExecStmt) Exec(ctx context.Context) (ast.RecordSet, error) { a.startTime = time.Now() a.ctx = ctx - if _, ok := a.plan.(*plan.Analyze); ok && ctx.GetSessionVars().InRestrictedSQL { + if _, ok := a.Plan.(*plan.Analyze); ok && ctx.GetSessionVars().InRestrictedSQL { oriStats := ctx.GetSessionVars().Systems[variable.TiDBBuildStatsConcurrency] oriScan := ctx.GetSessionVars().DistSQLScanConcurrency oriIndex := ctx.GetSessionVars().IndexSerialScanConcurrency @@ -148,7 +156,7 @@ func (a *statement) Exec(ctx context.Context) (ast.RecordSet, error) { if raw, ok := ctx.(processinfoSetter); ok { pi = raw sql := a.OriginText() - if simple, ok := a.plan.(*plan.Simple); ok && simple.Statement != nil { + if simple, ok := a.Plan.(*plan.Simple); ok && simple.Statement != nil { if ss, ok := simple.Statement.(ast.SensitiveStmtNode); ok { // Use SecureText to avoid leak password information. sql = ss.SecureText() @@ -169,7 +177,7 @@ func (a *statement) Exec(ctx context.Context) (ast.RecordSet, error) { }, nil } -func (a *statement) handleNoDelayExecutor(e Executor, ctx context.Context, pi processinfoSetter) (ast.RecordSet, error) { +func (a *ExecStmt) handleNoDelayExecutor(e Executor, ctx context.Context, pi processinfoSetter) (ast.RecordSet, error) { // Check if "tidb_snapshot" is set for the write executors. // In history read mode, we can not do write operations. switch e.(type) { @@ -203,18 +211,18 @@ func (a *statement) handleNoDelayExecutor(e Executor, ctx context.Context, pi pr } // buildExecutor build a executor from plan, prepared statement may need additional procedure. -func (a *statement) buildExecutor(ctx context.Context) (Executor, error) { +func (a *ExecStmt) buildExecutor(ctx context.Context) (Executor, error) { priority := kv.PriorityNormal - if _, ok := a.plan.(*plan.Execute); !ok { + if _, ok := a.Plan.(*plan.Execute); !ok { // Do not sync transaction for Execute statement, because the real optimization work is done in // "ExecuteExec.Build". var err error - isPointGet := IsPointGetWithPKOrUniqueKeyByAutoCommit(ctx, a.plan) + isPointGet := IsPointGetWithPKOrUniqueKeyByAutoCommit(ctx, a.Plan) if isPointGet { - log.Debugf("[%d][InitTxnWithStartTS] %s", ctx.GetSessionVars().ConnectionID, a.text) + log.Debugf("[%d][InitTxnWithStartTS] %s", ctx.GetSessionVars().ConnectionID, a.Text) err = ctx.InitTxnWithStartTS(math.MaxUint64) } else { - log.Debugf("[%d][ActivePendingTxn] %s", ctx.GetSessionVars().ConnectionID, a.text) + log.Debugf("[%d][ActivePendingTxn] %s", ctx.GetSessionVars().ConnectionID, a.Text) err = ctx.ActivePendingTxn() } if err != nil { @@ -227,17 +235,17 @@ func (a *statement) buildExecutor(ctx context.Context) (Executor, error) { switch { case isPointGet: priority = kv.PriorityHigh - case a.expensive: + case a.Expensive: priority = kv.PriorityLow } } } - if _, ok := a.plan.(*plan.Analyze); ok && ctx.GetSessionVars().InRestrictedSQL { + if _, ok := a.Plan.(*plan.Analyze); ok && ctx.GetSessionVars().InRestrictedSQL { priority = kv.PriorityLow } - b := newExecutorBuilder(ctx, a.is, priority) - e := b.build(a.plan) + b := newExecutorBuilder(ctx, a.InfoSchema, priority) + e := b.build(a.Plan) if b.err != nil { return nil, errors.Trace(b.err) } @@ -248,18 +256,18 @@ func (a *statement) buildExecutor(ctx context.Context) (Executor, error) { if err != nil { return nil, errors.Trace(err) } - a.text = executorExec.Stmt.Text() + a.Text = executorExec.Stmt.Text() a.isPreparedStmt = true - a.plan = executorExec.Plan + a.Plan = executorExec.Plan e = executorExec.StmtExec } return e, nil } -func (a *statement) logSlowQuery() { +func (a *ExecStmt) logSlowQuery() { cfg := config.GetGlobalConfig() costTime := time.Since(a.startTime) - sql := a.text + sql := a.Text if len(sql) > cfg.Log.QueryLogMaxLen { sql = sql[:cfg.Log.QueryLogMaxLen] + fmt.Sprintf("(len:%d)", len(sql)) } diff --git a/executor/builder.go b/executor/builder.go index a56c68fb41b7e..deedc1b8c735f 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -1207,7 +1207,7 @@ func (b *executorBuilder) buildIndexLookUpReader(v *plan.PhysicalIndexLookUpRead tableReaderSchema *expression.Schema ) table, _ := b.is.TableByID(is.Table.ID) - len := v.Schema().Len() + length := v.Schema().Len() if v.NeedColHandle { handleCol = v.Schema().TblID2Handle[is.Table.ID][0] } else if !is.OutOfOrder { @@ -1219,13 +1219,18 @@ func (b *executorBuilder) buildIndexLookUpReader(v *plan.PhysicalIndexLookUpRead RetType: types.NewFieldType(mysql.TypeLonglong), } tableReaderSchema.Append(handleCol) - len = tableReaderSchema.Len() + length = tableReaderSchema.Len() } - for i := 0; i < len; i++ { + for i := 0; i < length; i++ { tableReq.OutputOffsets = append(tableReq.OutputOffsets, uint32(i)) } + ranges := make([]*types.IndexRange, 0, len(is.Ranges)) + for _, rangeInPlan := range is.Ranges { + ranges = append(ranges, rangeInPlan.Clone()) + } + e := &IndexLookUpExecutor{ ctx: b.ctx, schema: v.Schema(), @@ -1235,7 +1240,7 @@ func (b *executorBuilder) buildIndexLookUpReader(v *plan.PhysicalIndexLookUpRead index: is.Index, keepOrder: !is.OutOfOrder, desc: is.Desc, - ranges: is.Ranges, + ranges: ranges, tableRequest: tableReq, columns: is.Columns, handleCol: handleCol, diff --git a/executor/compiler.go b/executor/compiler.go index 39040c6843e0d..0ca4a9c525248 100644 --- a/executor/compiler.go +++ b/executor/compiler.go @@ -22,36 +22,33 @@ import ( "github.com/pingcap/tidb/plan" ) -// Compiler compiles an ast.StmtNode to a stmt.Statement. +// Compiler compiles an ast.StmtNode to a physical plan. type Compiler struct { } -// Compile compiles an ast.StmtNode to an ast.Statement. -// After preprocessed and validated, it will be optimized to a plan, -// then wrappped to an adapter *statement as stmt.Statement. -func (c *Compiler) Compile(ctx context.Context, node ast.StmtNode) (ast.Statement, error) { - is := GetInfoSchema(ctx) - if err := plan.Preprocess(node, is, ctx); err != nil { +// Compile compiles an ast.StmtNode to a physical plan. +func (c *Compiler) Compile(ctx context.Context, stmtNode ast.StmtNode) (*ExecStmt, error) { + infoSchema := GetInfoSchema(ctx) + if err := plan.Preprocess(stmtNode, infoSchema, ctx); err != nil { return nil, errors.Trace(err) } // Validate should be after NameResolve. - if err := plan.Validate(node, false); err != nil { + if err := plan.Validate(stmtNode, false); err != nil { return nil, errors.Trace(err) } - p, err := plan.Optimize(ctx, node, is) + + finalPlan, err := plan.Optimize(ctx, stmtNode, infoSchema) if err != nil { return nil, errors.Trace(err) } - // Don't take restricted SQL into account for metrics. - isExpensive := stmtCount(node, p, ctx.GetSessionVars().InRestrictedSQL) - sa := &statement{ - is: is, - plan: p, - text: node.Text(), - expensive: isExpensive, - } - return sa, nil + return &ExecStmt{ + InfoSchema: infoSchema, + Plan: finalPlan, + Expensive: stmtCount(stmtNode, finalPlan, ctx.GetSessionVars().InRestrictedSQL), + Cacheable: plan.Cacheable(stmtNode), + Text: stmtNode.Text(), + }, nil } // GetInfoSchema gets TxnCtx InfoSchema if snapshot schema is not set, diff --git a/executor/prepared.go b/executor/prepared.go index 5da648277b234..cc23d75c78927 100644 --- a/executor/prepared.go +++ b/executor/prepared.go @@ -318,14 +318,14 @@ func CompileExecutePreparedStmt(ctx context.Context, ID uint32, args ...interfac value := ast.NewValueExpr(val) execPlan.UsingVars[i] = &expression.Constant{Value: value.Datum, RetType: &value.Type} } - sa := &statement{ - is: GetInfoSchema(ctx), - plan: execPlan, + stmt := &ExecStmt{ + InfoSchema: GetInfoSchema(ctx), + Plan: execPlan, } if prepared, ok := ctx.GetSessionVars().PreparedStmts[ID].(*Prepared); ok { - sa.text = prepared.Stmt.Text() + stmt.Text = prepared.Stmt.Text() } - return sa + return stmt } // ResetStmtCtx resets the StmtContext. diff --git a/plan/cache/key.go b/plan/cache/key.go new file mode 100644 index 0000000000000..8ae297744754e --- /dev/null +++ b/plan/cache/key.go @@ -0,0 +1,95 @@ +// Copyright 2017 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package cache + +import ( + "time" + + "github.com/pingcap/tidb/mysql" + "github.com/pingcap/tidb/sessionctx/variable" + "github.com/pingcap/tidb/util/codec" + "github.com/pingcap/tidb/util/hack" +) + +// Key is the interface that every key in LRU Cache should implement. +type Key interface { + Hash() []byte +} + +type sqlCacheKey struct { + user string + host string + database string + sql string + snapshot uint64 + schemaVersion int64 + sqlMode mysql.SQLMode + timezoneOffset int + readOnly bool // stores the current tidb-server status. + + hash []byte +} + +// Hash implements Key interface. +func (key *sqlCacheKey) Hash() []byte { + if key.hash == nil { + var ( + userBytes = hack.Slice(key.user) + hostBytes = hack.Slice(key.host) + dbBytes = hack.Slice(key.database) + sqlBytes = hack.Slice(key.sql) + bufferSize = len(userBytes) + len(hostBytes) + len(dbBytes) + len(sqlBytes) + 8*4 + 1 + ) + + key.hash = make([]byte, 0, bufferSize) + key.hash = append(key.hash, userBytes...) + key.hash = append(key.hash, hostBytes...) + key.hash = append(key.hash, dbBytes...) + key.hash = append(key.hash, sqlBytes...) + key.hash = codec.EncodeInt(key.hash, int64(key.snapshot)) + key.hash = codec.EncodeInt(key.hash, key.schemaVersion) + key.hash = codec.EncodeInt(key.hash, int64(key.sqlMode)) + key.hash = codec.EncodeInt(key.hash, int64(key.timezoneOffset)) + if key.readOnly { + key.hash = append(key.hash, '1') + } else { + key.hash = append(key.hash, '0') + } + } + return key.hash +} + +// NewSQLCacheKey creates a new sqlCacheKey object. +func NewSQLCacheKey(sessionVars *variable.SessionVars, sql string, schemaVersion int64, readOnly bool) Key { + timezoneOffset, user, host := 0, "", "" + if sessionVars.TimeZone != nil { + _, timezoneOffset = time.Now().In(sessionVars.TimeZone).Zone() + } + if sessionVars.User != nil { + user = sessionVars.User.Username + host = sessionVars.User.Hostname + } + + return &sqlCacheKey{ + user: user, + host: host, + database: sessionVars.CurrentDB, + sql: sql, + snapshot: sessionVars.SnapshotTS, + schemaVersion: schemaVersion, + sqlMode: sessionVars.SQLMode, + timezoneOffset: timezoneOffset, + readOnly: readOnly, + } +} diff --git a/plan/cache/sharded_lru.go b/plan/cache/sharded_lru.go new file mode 100644 index 0000000000000..c5c1a9fea8017 --- /dev/null +++ b/plan/cache/sharded_lru.go @@ -0,0 +1,69 @@ +// Copyright 2017 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package cache + +import ( + "sync" + + "github.com/spaolacci/murmur3" +) + +var ( + // PlanCacheEnabled stores the global config "plan-cache-enabled". + PlanCacheEnabled bool + // PlanCacheShards stores the global config "plan-cache-shards". + PlanCacheShards int64 + // PlanCacheCapacity stores the global config "plan-cache-capacity". + PlanCacheCapacity int64 + // GlobalPlanCache stores the global plan cache for every session in a tidb-server. + GlobalPlanCache *ShardedLRUCache +) + +// ShardedLRUCache is a sharded LRU Cache, thread safe. +type ShardedLRUCache struct { + shards []*SimpleLRUCache + locks []sync.RWMutex +} + +// NewShardedLRUCache creates a ShardedLRUCache. +func NewShardedLRUCache(capacity, shardCount int64) *ShardedLRUCache { + shardedLRUCache := &ShardedLRUCache{ + shards: make([]*SimpleLRUCache, 0, shardCount), + locks: make([]sync.RWMutex, shardCount), + } + for i := int64(0); i < shardCount; i++ { + shardedLRUCache.shards = append(shardedLRUCache.shards, NewSimpleLRUCache(capacity/shardCount)) + } + return shardedLRUCache +} + +// Get gets a value from a ShardedLRUCache. +func (s *ShardedLRUCache) Get(key Key) (Value, bool) { + id := int(murmur3.Sum32(key.Hash())) % len(s.shards) + + s.locks[id].Lock() + value, ok := s.shards[id].Get(key) + s.locks[id].Unlock() + + return value, ok +} + +// Put puts a (key, value) pair to a ShardedLRUCache. +func (s *ShardedLRUCache) Put(key Key, value Value) { + id := int(murmur3.Sum32(key.Hash())) % len(s.shards) + + s.locks[id].Lock() + s.shards[id].Put(key, value) + s.locks[id].Unlock() +} diff --git a/plan/cache/simple_lru.go b/plan/cache/simple_lru.go new file mode 100644 index 0000000000000..2e07d736aeded --- /dev/null +++ b/plan/cache/simple_lru.go @@ -0,0 +1,82 @@ +// Copyright 2017 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package cache + +import ( + "container/list" +) + +// cacheEntry wraps Key and Value. It's the value of list.Element. +type cacheEntry struct { + key Key + value Value +} + +// SimpleLRUCache is a simple least recently used cache, not thread-safe, use it carefully. +type SimpleLRUCache struct { + capacity int64 + size int64 + elements map[string]*list.Element + cache *list.List +} + +// NewSimpleLRUCache creates a SimpleLRUCache object, whose capacity is "capacity". +// NOTE: "capacity" should be a positive value. +func NewSimpleLRUCache(capacity int64) *SimpleLRUCache { + if capacity <= 0 { + panic("capacity of LRU Cache should be positive.") + } + return &SimpleLRUCache{ + capacity: capacity, + size: 0, + elements: make(map[string]*list.Element), + cache: list.New(), + } +} + +// Get tries to find the corresponding value according to the given key. +func (l *SimpleLRUCache) Get(key Key) (value Value, ok bool) { + element, exists := l.elements[string(key.Hash())] + if !exists { + return nil, false + } + l.cache.MoveToFront(element) + return element.Value.(*cacheEntry).value, true +} + +// Put puts the (key, value) pair into the LRU Cache. +func (l *SimpleLRUCache) Put(key Key, value Value) { + hash := string(key.Hash()) + element, exists := l.elements[hash] + if exists { + l.cache.MoveToFront(element) + return + } + + newCacheEntry := &cacheEntry{ + key: key, + value: value, + } + + element = l.cache.PushFront(newCacheEntry) + l.elements[hash] = element + l.size++ + + for l.size > l.capacity { + lru := l.cache.Back() + l.cache.Remove(lru) + delete(l.elements, string(lru.Value.(*cacheEntry).key.Hash())) + l.size-- + } +} diff --git a/plan/cache/simple_lru_test.go b/plan/cache/simple_lru_test.go new file mode 100644 index 0000000000000..cd0d0a7ff810f --- /dev/null +++ b/plan/cache/simple_lru_test.go @@ -0,0 +1,145 @@ +// Copyright 2017 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package cache + +import ( + "testing" + + . "github.com/pingcap/check" +) + +func TestT(t *testing.T) { + CustomVerboseFlag = true + TestingT(t) +} + +var _ = Suite(&testLRUCacheSuite{}) + +type testLRUCacheSuite struct { +} + +type mockCacheKey struct { + hash []byte + key int64 +} + +func (mk *mockCacheKey) Hash() []byte { + if mk.hash != nil { + return mk.hash + } + mk.hash = make([]byte, 8) + for i := uint(0); i < 8; i++ { + mk.hash[i] = byte((mk.key >> ((i - 1) * 8)) & 0xff) + } + return mk.hash +} + +func newMockHashKey(key int64) *mockCacheKey { + return &mockCacheKey{ + key: key, + } +} + +func (s *testLRUCacheSuite) TestPut(c *C) { + lru := NewSimpleLRUCache(3) + c.Assert(lru.capacity, Equals, int64(3)) + + keys := make([]*mockCacheKey, 5) + vals := make([]int64, 5) + + for i := 0; i < 5; i++ { + keys[i] = newMockHashKey(int64(i)) + vals[i] = int64(i) + lru.Put(keys[i], vals[i]) + } + c.Assert(lru.size, Equals, lru.capacity) + c.Assert(lru.size, Equals, int64(3)) + + // test for non-existent elements + for i := 0; i < 2; i++ { + hash := string(keys[i].Hash()) + element, exists := lru.elements[hash] + c.Assert(exists, IsFalse) + c.Assert(element, IsNil) + } + + // test for existent elements + root := lru.cache.Front() + c.Assert(root, NotNil) + for i := 4; i >= 2; i-- { + entry, ok := root.Value.(*cacheEntry) + c.Assert(ok, IsTrue) + c.Assert(entry, NotNil) + + // test key + key := entry.key + c.Assert(key, NotNil) + c.Assert(key, Equals, keys[i]) + + hash := string(keys[i].Hash()) + element, exists := lru.elements[hash] + c.Assert(exists, IsTrue) + c.Assert(element, NotNil) + c.Assert(element, Equals, root) + + // test value + value, ok := entry.value.(int64) + c.Assert(ok, IsTrue) + c.Assert(value, Equals, vals[i]) + + root = root.Next() + } + // test for end of double-linked list + c.Assert(root, IsNil) +} + +func (s *testLRUCacheSuite) TestGet(c *C) { + lru := NewSimpleLRUCache(3) + + keys := make([]*mockCacheKey, 5) + vals := make([]int64, 5) + + for i := 0; i < 5; i++ { + keys[i] = newMockHashKey(int64(i)) + vals[i] = int64(i) + lru.Put(keys[i], vals[i]) + } + + // test for non-existent elements + for i := 0; i < 2; i++ { + value, exists := lru.Get(keys[i]) + c.Assert(exists, IsFalse) + c.Assert(value, IsNil) + } + + for i := 2; i < 5; i++ { + value, exists := lru.Get(keys[i]) + c.Assert(exists, IsTrue) + c.Assert(value, NotNil) + c.Assert(value, Equals, vals[i]) + c.Assert(lru.size, Equals, int64(3)) + c.Assert(lru.capacity, Equals, int64(3)) + + root := lru.cache.Front() + c.Assert(root, NotNil) + + entry, ok := root.Value.(*cacheEntry) + c.Assert(ok, IsTrue) + c.Assert(entry.key, Equals, keys[i]) + + value, ok = entry.value.(int64) + c.Assert(ok, IsTrue) + c.Assert(value, Equals, vals[i]) + } +} diff --git a/plan/cache/value.go b/plan/cache/value.go new file mode 100644 index 0000000000000..956f8a8e28c9c --- /dev/null +++ b/plan/cache/value.go @@ -0,0 +1,39 @@ +// Copyright 2017 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package cache + +import ( + "github.com/pingcap/tidb/ast" + "github.com/pingcap/tidb/plan" +) + +// Value is the interface that every value in LRU Cache should implement. +type Value interface { +} + +// SQLCacheValue stores the cached Statement and StmtNode. +type SQLCacheValue struct { + StmtNode ast.StmtNode + Plan plan.Plan + Expensive bool +} + +// NewSQLCacheValue creates a SQLCacheValue. +func NewSQLCacheValue(ast ast.StmtNode, plan plan.Plan, expensive bool) *SQLCacheValue { + return &SQLCacheValue{ + StmtNode: ast, + Plan: plan, + Expensive: expensive, + } +} diff --git a/plan/cacheable_checker.go b/plan/cacheable_checker.go new file mode 100644 index 0000000000000..73ea81fa4fa86 --- /dev/null +++ b/plan/cacheable_checker.go @@ -0,0 +1,62 @@ +// Copyright 2017 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package plan + +import ( + "github.com/pingcap/tidb/ast" +) + +// Cacheable checks whether the input ast is cacheable. +func Cacheable(node ast.Node) bool { + if _, isSelect := node.(*ast.SelectStmt); !isSelect { + return false + } + checker := cacheableChecker{ + cacheable: true, + } + node.Accept(&checker) + return checker.cacheable +} + +// cacheableChecker checks whether a query's plan can be cached, querys that: +// 1. have ExistsSubqueryExpr, or +// 2. have VariableExpr +// will not be cached currently. +// NOTE: we can add more rules in the future. +type cacheableChecker struct { + cacheable bool +} + +// Enter implements Visitor interface. +func (checker *cacheableChecker) Enter(in ast.Node) (out ast.Node, skipChildren bool) { + switch node := in.(type) { + case *ast.VariableExpr, *ast.ExistsSubqueryExpr: + checker.cacheable = false + return in, true + case *ast.FuncCallExpr: + if node.FnName.L == ast.Now || node.FnName.L == ast.CurrentTimestamp || + node.FnName.L == ast.UTCTime || node.FnName.L == ast.Curtime || + node.FnName.L == ast.CurrentTime || node.FnName.L == ast.UTCTimestamp || + node.FnName.L == ast.UnixTimestamp { + checker.cacheable = false + return in, true + } + } + return in, false +} + +// Leave implements Visitor interface. +func (checker *cacheableChecker) Leave(in ast.Node) (out ast.Node, ok bool) { + return in, checker.cacheable +} diff --git a/plan/cacheable_checker_test.go b/plan/cacheable_checker_test.go new file mode 100644 index 0000000000000..e3248ba6bd841 --- /dev/null +++ b/plan/cacheable_checker_test.go @@ -0,0 +1,76 @@ +// Copyright 2017 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package plan + +import ( + . "github.com/pingcap/check" + "github.com/pingcap/tidb/ast" + "github.com/pingcap/tidb/model" +) + +var _ = Suite(&testCacheableSuite{}) + +type testCacheableSuite struct { +} + +func (s *testCacheableSuite) TestCacheable(c *C) { + // test non-SelectStmt + var stmt ast.Node = &ast.DeleteStmt{} + c.Assert(Cacheable(stmt), IsFalse) + + stmt = &ast.InsertStmt{} + c.Assert(Cacheable(stmt), IsFalse) + + stmt = &ast.UnionStmt{} + c.Assert(Cacheable(stmt), IsFalse) + + stmt = &ast.UpdateStmt{} + c.Assert(Cacheable(stmt), IsFalse) + + stmt = &ast.ShowStmt{} + c.Assert(Cacheable(stmt), IsFalse) + + stmt = &ast.LoadDataStmt{} + c.Assert(Cacheable(stmt), IsFalse) + + // test SelectStmt + whereExpr := &ast.FuncCallExpr{} + stmt = &ast.SelectStmt{ + Where: whereExpr, + } + c.Assert(Cacheable(stmt), IsTrue) + + whereExpr.FnName = model.NewCIStr("now") + c.Assert(Cacheable(stmt), IsFalse) + + whereExpr.FnName = model.NewCIStr("current_timestamp") + c.Assert(Cacheable(stmt), IsFalse) + + whereExpr.FnName = model.NewCIStr("utc_time") + c.Assert(Cacheable(stmt), IsFalse) + + whereExpr.FnName = model.NewCIStr("curtime") + c.Assert(Cacheable(stmt), IsFalse) + + whereExpr.FnName = model.NewCIStr("current_time") + c.Assert(Cacheable(stmt), IsFalse) + + whereExpr.FnName = model.NewCIStr("rand") + c.Assert(Cacheable(stmt), IsTrue) + + stmt = &ast.SelectStmt{ + Where: &ast.ExistsSubqueryExpr{}, + } + c.Assert(Cacheable(stmt), IsFalse) +} diff --git a/session.go b/session.go index 0e06eaee713b1..449c92c6e1a42 100644 --- a/session.go +++ b/session.go @@ -38,6 +38,7 @@ import ( "github.com/pingcap/tidb/meta" "github.com/pingcap/tidb/mysql" "github.com/pingcap/tidb/parser" + "github.com/pingcap/tidb/plan/cache" "github.com/pingcap/tidb/privilege" "github.com/pingcap/tidb/privilege/privileges" "github.com/pingcap/tidb/sessionctx" @@ -649,57 +650,102 @@ func (s *session) SetProcessInfo(sql string) { s.processInfo.Store(pi) } -func (s *session) Execute(sql string) ([]ast.RecordSet, error) { - s.PrepareTxnCtx() - startTS := time.Now() +func (s *session) executeStatement(connID uint64, stmtNode ast.StmtNode, stmt ast.Statement, recordSets []ast.RecordSet) ([]ast.RecordSet, error) { + s.SetValue(context.QueryString, stmt.OriginText()) - charset, collation := s.sessionVars.GetCharsetInfo() - connID := s.sessionVars.ConnectionID - rawStmts, err := s.ParseSQL(sql, charset, collation) + startTS := time.Now() + recordSet, err := runStmt(s, stmt) if err != nil { - log.Warnf("[%d] parse error:\n%v\n%s", connID, err, sql) + if !kv.ErrKeyExists.Equal(err) { + log.Warnf("[%d] session error:\n%v\n%s", connID, errors.ErrorStack(err), s) + } return nil, errors.Trace(err) } - sessionExecuteParseDuration.Observe(time.Since(startTS).Seconds()) + sessionExecuteRunDuration.Observe(time.Since(startTS).Seconds()) - var rs []ast.RecordSet - for _, rst := range rawStmts { - s.PrepareTxnCtx() - startTS := time.Now() - // Some executions are done in compile stage, so we reset them before compile. - executor.ResetStmtCtx(s, rst) - st, err1 := Compile(s, rst) - if err1 != nil { - log.Warnf("[%d] compile error:\n%v\n%s", connID, err1, sql) - err2 := s.RollbackTxn() - terror.Log(errors.Trace(err2)) - return nil, errors.Trace(err1) + if recordSet != nil { + recordSets = append(recordSets, recordSet) + } + logCrucialStmt(stmtNode, s.sessionVars.User) + return recordSets, nil +} + +func (s *session) Execute(sql string) (recordSets []ast.RecordSet, err error) { + s.PrepareTxnCtx() + var ( + cacheKey cache.Key + cacheValue cache.Value + useCachedPlan = false + connID = s.sessionVars.ConnectionID + ) + + if cache.PlanCacheEnabled { + schemaVersion := sessionctx.GetDomain(s).InfoSchema().SchemaMetaVersion() + readOnly := s.Txn() == nil || s.Txn().IsReadOnly() + + cacheKey = cache.NewSQLCacheKey(s.sessionVars, sql, schemaVersion, readOnly) + cacheValue, useCachedPlan = cache.GlobalPlanCache.Get(cacheKey) + } + + if useCachedPlan { + stmtNode := cacheValue.(*cache.SQLCacheValue).StmtNode + stmt := &executor.ExecStmt{ + InfoSchema: executor.GetInfoSchema(s), + Plan: cacheValue.(*cache.SQLCacheValue).Plan, + Expensive: cacheValue.(*cache.SQLCacheValue).Expensive, + Text: stmtNode.Text(), } - sessionExecuteCompileDuration.Observe(time.Since(startTS).Seconds()) - s.SetValue(context.QueryString, st.OriginText()) + s.PrepareTxnCtx() + executor.ResetStmtCtx(s, stmtNode) + if recordSets, err = s.executeStatement(connID, stmtNode, stmt, recordSets); err != nil { + return nil, errors.Trace(err) + } + } else { + charset, collation := s.sessionVars.GetCharsetInfo() - startTS = time.Now() - r, err := runStmt(s, st) + // Step1: Compile query string to abstract syntax trees(ASTs). + startTS := time.Now() + stmtNodes, err := s.ParseSQL(sql, charset, collation) if err != nil { - if !kv.ErrKeyExists.Equal(err) { - log.Warnf("[%d] session error:\n%v\n%s", connID, errors.ErrorStack(err), s) - } + log.Warnf("[%d] parse error:\n%v\n%s", connID, err, sql) return nil, errors.Trace(err) } - sessionExecuteRunDuration.Observe(time.Since(startTS).Seconds()) - if r != nil { - rs = append(rs, r) - } + sessionExecuteParseDuration.Observe(time.Since(startTS).Seconds()) - logCrucialStmt(rst, s.sessionVars.User) + compiler := executor.Compiler{} + for _, stmtNode := range stmtNodes { + s.PrepareTxnCtx() + + // Step2: Transform abstract syntax tree to a physical plan(stored in executor.ExecStmt). + startTS = time.Now() + // Some executions are done in compile stage, so we reset them before compile. + executor.ResetStmtCtx(s, stmtNode) + stmt, err := compiler.Compile(s, stmtNode) + if err != nil { + log.Warnf("[%d] compile error:\n%v\n%s", connID, err, sql) + terror.Log(errors.Trace(s.RollbackTxn())) + return nil, errors.Trace(err) + } + sessionExecuteCompileDuration.Observe(time.Since(startTS).Seconds()) + + // Step3: Cache the physical plan if possible. + if cache.PlanCacheEnabled && stmt.Cacheable && len(stmtNodes) == 1 { + cache.GlobalPlanCache.Put(cacheKey, cache.NewSQLCacheValue(stmtNode, stmt.Plan, stmt.Expensive)) + } + + // Step4: Execute the physical plan. + if recordSets, err = s.executeStatement(connID, stmtNode, stmt, recordSets); err != nil { + return nil, errors.Trace(err) + } + } } - if s.sessionVars.ClientCapability&mysql.ClientMultiResults == 0 && len(rs) > 1 { + if s.sessionVars.ClientCapability&mysql.ClientMultiResults == 0 && len(recordSets) > 1 { // return the first recordset if client doesn't support ClientMultiResults. - rs = rs[:1] + recordSets = recordSets[:1] } - return rs, nil + return recordSets, nil } // PrepareStmt is used for executing prepare statement in binary protocol diff --git a/tidb-server/main.go b/tidb-server/main.go index f55ab19b1135b..a869ccdeacc6b 100644 --- a/tidb-server/main.go +++ b/tidb-server/main.go @@ -33,6 +33,7 @@ import ( "github.com/pingcap/tidb/domain" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/plan" + "github.com/pingcap/tidb/plan/cache" "github.com/pingcap/tidb/privilege/privileges" "github.com/pingcap/tidb/server" "github.com/pingcap/tidb/sessionctx/binloginfo" @@ -316,6 +317,13 @@ func setGlobalVars() { plan.JoinConcurrency = cfg.Performance.JoinConcurrency plan.AllowCartesianProduct = cfg.Performance.CrossJoin privileges.SkipWithGrant = cfg.Security.SkipGrantTable + + cache.PlanCacheEnabled = cfg.PlanCache.Enabled + if cache.PlanCacheEnabled { + cache.PlanCacheCapacity = cfg.PlanCache.Capacity + cache.PlanCacheShards = cfg.PlanCache.Shards + cache.GlobalPlanCache = cache.NewShardedLRUCache(cache.PlanCacheCapacity, cache.PlanCacheShards) + } } func setupLog() { diff --git a/tidb.go b/tidb.go index c855ad7971304..7e1ddf3c041f6 100644 --- a/tidb.go +++ b/tidb.go @@ -146,13 +146,10 @@ func Parse(ctx context.Context, src string) ([]ast.StmtNode, error) { } // Compile is safe for concurrent use by multiple goroutines. -func Compile(ctx context.Context, rawStmt ast.StmtNode) (ast.Statement, error) { +func Compile(ctx context.Context, stmtNode ast.StmtNode) (ast.Statement, error) { compiler := executor.Compiler{} - st, err := compiler.Compile(ctx, rawStmt) - if err != nil { - return nil, errors.Trace(err) - } - return st, nil + stmt, err := compiler.Compile(ctx, stmtNode) + return stmt, errors.Trace(err) } // runStmt executes the ast.Statement and commit or rollback the current transaction. diff --git a/util/types/range.go b/util/types/range.go index 0676282549d3f..1d9ceca96705f 100644 --- a/util/types/range.go +++ b/util/types/range.go @@ -122,6 +122,23 @@ type IndexRange struct { HighExclude bool // High value is exclusive. } +// Clone clones a IndexRange. +func (ir *IndexRange) Clone() *IndexRange { + newRange := &IndexRange{ + LowVal: make([]Datum, 0, len(ir.LowVal)), + HighVal: make([]Datum, 0, len(ir.HighVal)), + LowExclude: ir.LowExclude, + HighExclude: ir.HighExclude, + } + for i, length := 0, len(ir.LowVal); i < length; i++ { + newRange.LowVal = append(newRange.LowVal, ir.LowVal[i]) + } + for i, length := 0, len(ir.HighVal); i < length; i++ { + newRange.HighVal = append(newRange.HighVal, ir.HighVal[i]) + } + return newRange +} + // IsPoint returns if the index range is a point. func (ir *IndexRange) IsPoint(sc *variable.StatementContext) bool { if len(ir.LowVal) != len(ir.HighVal) {