Skip to content

Commit

Permalink
in memory allocator
Browse files Browse the repository at this point in the history
  • Loading branch information
djshow832 committed May 11, 2021
1 parent 5460b5c commit 93a49ca
Show file tree
Hide file tree
Showing 4 changed files with 240 additions and 14 deletions.
117 changes: 117 additions & 0 deletions executor/insert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1590,3 +1590,120 @@ func (s *testSuite10) TestBinaryLiteralInsertToSet(c *C) {
tk.MustExec("insert into bintest(h) values(0x61)")
tk.MustQuery("select * from bintest").Check(testkit.Rows("a"))
}

var _ = SerialSuites(&testSuite13{&baseTestSuite{}})

type testSuite13 struct {
*baseTestSuite
}

func (s *testSuite13) TestGlobalTempTableAutoInc(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec(`use test`)
tk.MustExec("drop table if exists temp_test")
tk.MustExec("create global temporary table temp_test(id int primary key auto_increment) on commit delete rows")
defer tk.MustExec("drop table if exists temp_test")

// Data is cleared after transaction auto commits.
tk.MustExec("insert into temp_test(id) values(0)")
tk.MustQuery("select * from temp_test").Check(testkit.Rows())

// Data is not cleared inside a transaction.
tk.MustExec("begin")
tk.MustExec("insert into temp_test(id) values(0)")
tk.MustQuery("select * from temp_test").Check(testkit.Rows("1"))
tk.MustExec("commit")

// AutoID allocator is cleared.
tk.MustExec("begin")
tk.MustExec("insert into temp_test(id) values(0)")
tk.MustQuery("select * from temp_test").Check(testkit.Rows("1"))
// Test whether auto-inc is incremental
tk.MustExec("insert into temp_test(id) values(0)")
tk.MustQuery("select id from temp_test order by id").Check(testkit.Rows("1", "2"))
tk.MustExec("commit")

// multi-value insert
tk.MustExec("begin")
tk.MustExec("insert into temp_test(id) values(0), (0)")
tk.MustQuery("select id from temp_test order by id").Check(testkit.Rows("1", "2"))
tk.MustExec("insert into temp_test(id) values(0), (0)")
tk.MustQuery("select id from temp_test order by id").Check(testkit.Rows("1", "2", "3", "4"))
tk.MustExec("commit")

// rebase
tk.MustExec("begin")
tk.MustExec("insert into temp_test(id) values(10)")
tk.MustExec("insert into temp_test(id) values(0)")
tk.MustQuery("select id from temp_test order by id").Check(testkit.Rows("10", "11"))
tk.MustExec("insert into temp_test(id) values(20), (30)")
tk.MustExec("insert into temp_test(id) values(0), (0)")
tk.MustQuery("select id from temp_test order by id").Check(testkit.Rows("10", "11", "20", "30", "31", "32"))
tk.MustExec("commit")
}

func (s *testSuite13) TestGlobalTempTableRowID(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec(`use test`)
tk.MustExec("drop table if exists temp_test")
tk.MustExec("create global temporary table temp_test(id int) on commit delete rows")
defer tk.MustExec("drop table if exists temp_test")

// Data is cleared after transaction auto commits.
tk.MustExec("insert into temp_test(id) values(0)")
tk.MustQuery("select _tidb_rowid from temp_test").Check(testkit.Rows())

// Data is not cleared inside a transaction.
tk.MustExec("begin")
tk.MustExec("insert into temp_test(id) values(0)")
tk.MustQuery("select _tidb_rowid from temp_test").Check(testkit.Rows("1"))
tk.MustExec("commit")

// AutoID allocator is cleared.
tk.MustExec("begin")
tk.MustExec("insert into temp_test(id) values(0)")
tk.MustQuery("select _tidb_rowid from temp_test").Check(testkit.Rows("1"))
// Test whether row id is incremental
tk.MustExec("insert into temp_test(id) values(0)")
tk.MustQuery("select _tidb_rowid from temp_test order by _tidb_rowid").Check(testkit.Rows("1", "2"))
tk.MustExec("commit")

// multi-value insert
tk.MustExec("begin")
tk.MustExec("insert into temp_test(id) values(0), (0)")
tk.MustQuery("select _tidb_rowid from temp_test order by _tidb_rowid").Check(testkit.Rows("1", "2"))
tk.MustExec("insert into temp_test(id) values(0), (0)")
tk.MustQuery("select _tidb_rowid from temp_test order by _tidb_rowid").Check(testkit.Rows("1", "2", "3", "4"))
tk.MustExec("commit")
}

func (s *testSuite13) TestGlobalTempTableParallel(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec(`use test`)
tk.MustExec("drop table if exists temp_test")
tk.MustExec("create global temporary table temp_test(id int primary key auto_increment) on commit delete rows")
defer tk.MustExec("drop table if exists temp_test")

threads := 8
loops := 1
wg := sync.WaitGroup{}
wg.Add(threads)

insertFunc := func() {
defer wg.Done()
newTk := testkit.NewTestKitWithInit(c, s.store)
newTk.MustExec("begin")
for i := 0; i < loops; i++ {
newTk.MustExec("insert temp_test value(0)")
newTk.MustExec("insert temp_test value(0), (0)")
}
maxID := strconv.Itoa(loops * 3)
newTk.MustQuery("select max(id) from temp_test").Check(testkit.Rows(maxID))
newTk.MustExec("commit")
}

for i := 0; i < threads; i++ {
go insertFunc()
}
wg.Wait()
}
24 changes: 23 additions & 1 deletion sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/pingcap/parser"
"github.com/pingcap/parser/ast"
"github.com/pingcap/parser/auth"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/parser/terror"
pumpcli "github.com/pingcap/tidb-tools/tidb-binlog/pump_client"
Expand All @@ -49,6 +50,7 @@ import (
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/rowcodec"
"github.com/pingcap/tidb/util/stringutil"
"github.com/pingcap/tidb/util/tableutil"
"github.com/pingcap/tidb/util/timeutil"
"github.com/twmb/murmur3"
atomic2 "go.uber.org/atomic"
Expand Down Expand Up @@ -174,7 +176,9 @@ type TransactionContext struct {
// TableDeltaMap lock to prevent potential data race
tdmLock sync.Mutex

GlobalTemporaryTables map[int64]struct{}
// GlobalTemporaryTables is used to store transaction-specific information for global temporary tables.
// It can also be stored in sessionCtx with local temporary tables, but it's easier to clean this data after transaction ends.
GlobalTemporaryTables map[int64]tableutil.TempTable
}

// GetShard returns the shard prefix for the next `count` rowids.
Expand Down Expand Up @@ -1445,6 +1449,24 @@ func (s *SessionVars) LazyCheckKeyNotExists() bool {
return s.PresumeKeyNotExists || (s.TxnCtx.IsPessimistic && !s.StmtCtx.DupKeyAsWarning)
}

// GetTemporaryTable returns a TempTable by tableInfo.
func (s *SessionVars) GetTemporaryTable(tblInfo *model.TableInfo) tableutil.TempTable {
if tblInfo.TempTableType == model.TempTableGlobal {
if s.TxnCtx.GlobalTemporaryTables == nil {
s.TxnCtx.GlobalTemporaryTables = make(map[int64]tableutil.TempTable)
}
globalTempTables := s.TxnCtx.GlobalTemporaryTables
globalTempTable, ok := globalTempTables[tblInfo.ID]
if !ok {
globalTempTable = tableutil.TempTableFromMeta(tblInfo)
globalTempTables[tblInfo.ID] = globalTempTable
}
return globalTempTable
}
// TODO: check local temporary tables
return nil
}

// special session variables.
const (
SQLModeVar = "sql_mode"
Expand Down
73 changes: 60 additions & 13 deletions table/tables/tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/pingcap/tidb/sessionctx/binloginfo"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/statistics"
tikvstore "github.com/pingcap/tidb/store/tikv/kv"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/tablecodec"
Expand All @@ -46,6 +47,7 @@ import (
"github.com/pingcap/tidb/util/generatedexpr"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/stringutil"
"github.com/pingcap/tidb/util/tableutil"
"github.com/pingcap/tipb/go-binlog"
"github.com/pingcap/tipb/go-tipb"
"go.uber.org/zap"
Expand Down Expand Up @@ -322,8 +324,8 @@ func (t *TableCommon) UpdateRecord(ctx context.Context, sctx sessionctx.Context,
sh := memBuffer.Staging()
defer memBuffer.Cleanup(sh)

if meta := t.Meta(); meta.TempTableType == model.TempTableGlobal {
addTemporaryTableID(sctx, meta.ID)
if m := t.Meta(); m.TempTableType == model.TempTableGlobal {
addTemporaryTable(sctx, m)
}

var colIDs, binlogColIDs []int64
Expand Down Expand Up @@ -588,12 +590,9 @@ func TryGetCommonPkColumns(tbl table.Table) []*table.Column {
return pkCols
}

func addTemporaryTableID(sctx sessionctx.Context, id int64) {
txnCtx := sctx.GetSessionVars().TxnCtx
if txnCtx.GlobalTemporaryTables == nil {
txnCtx.GlobalTemporaryTables = make(map[int64]struct{})
}
txnCtx.GlobalTemporaryTables[id] = struct{}{}
func addTemporaryTable(sctx sessionctx.Context, tblInfo *model.TableInfo) {
tempTable := sctx.GetSessionVars().GetTemporaryTable(tblInfo)
tempTable.SetModified(true)
}

// AddRecord implements table.Table AddRecord interface.
Expand All @@ -608,8 +607,8 @@ func (t *TableCommon) AddRecord(sctx sessionctx.Context, r []types.Datum, opts .
fn.ApplyOn(&opt)
}

if meta := t.Meta(); meta.TempTableType == model.TempTableGlobal {
addTemporaryTableID(sctx, meta.ID)
if m := t.Meta(); m.TempTableType == model.TempTableGlobal {
addTemporaryTable(sctx, m)
}

var ctx context.Context
Expand Down Expand Up @@ -1010,8 +1009,8 @@ func (t *TableCommon) RemoveRecord(ctx sessionctx.Context, h kv.Handle, r []type
return err
}

if meta := t.Meta(); meta.TempTableType == model.TempTableGlobal {
addTemporaryTableID(ctx, meta.ID)
if m := t.Meta(); m.TempTableType == model.TempTableGlobal {
addTemporaryTable(ctx, m)
}

// The table has non-public column and this column is doing the operation of "modify/change column".
Expand Down Expand Up @@ -1370,7 +1369,14 @@ func OverflowShardBits(recordID int64, shardRowIDBits uint64, typeBitsLength uin

// Allocators implements table.Table Allocators interface.
func (t *TableCommon) Allocators(ctx sessionctx.Context) autoid.Allocators {
if ctx == nil || ctx.GetSessionVars().IDAllocator == nil {
if ctx == nil {
return t.allocs
} else if ctx.GetSessionVars().IDAllocator == nil {
// Use an independent allocator for global temporary tables.
if t.meta.TempTableType == model.TempTableGlobal {
alloc := ctx.GetSessionVars().GetTemporaryTable(t.meta).GetAutoIDAllocator()
return autoid.Allocators{alloc}
}
return t.allocs
}

Expand Down Expand Up @@ -1498,6 +1504,7 @@ func getDuplicateErrorHandleString(t table.Table, handle kv.Handle, row []types.
func init() {
table.TableFromMeta = TableFromMeta
table.MockTableFromMeta = MockTableFromMeta
tableutil.TempTableFromMeta = TempTableFromMeta
}

// sequenceCommon cache the sequence value.
Expand Down Expand Up @@ -1763,3 +1770,43 @@ func BuildTableScanFromInfos(tableInfo *model.TableInfo, columnInfos []*model.Co
}
return tsExec
}

// TemporaryTable is used to store transaction-specific or session-specific information for global / local temporary tables.
// For example, stats and autoID should have their own copies of data, instead of being shared by all sessions.
type TemporaryTable struct {
// Whether it's modified in this transaction.
modified bool
// The stats of this table. So far it's always pseudo stats.
stats *statistics.Table
// The autoID allocator of this table.
autoIDAllocator autoid.Allocator
}

// TempTableFromMeta builds a TempTable from model.TableInfo.
func TempTableFromMeta(tblInfo *model.TableInfo) tableutil.TempTable {
return &TemporaryTable{
modified: false,
stats: statistics.PseudoTable(tblInfo),
autoIDAllocator: autoid.NewAllocatorFromTempTblInfo(tblInfo),
}
}

// GetAutoIDAllocator is implemented from TempTable.GetAutoIDAllocator.
func (t *TemporaryTable) GetAutoIDAllocator() autoid.Allocator {
return t.autoIDAllocator
}

// SetModified is implemented from TempTable.SetModified.
func (t *TemporaryTable) SetModified(modified bool) {
t.modified = modified
}

// GetModified is implemented from TempTable.GetModified.
func (t *TemporaryTable) GetModified() bool {
return t.modified
}

// GetStats is implemented from TempTable.GetStats.
func (t *TemporaryTable) GetStats() interface{} {
return t.stats
}
40 changes: 40 additions & 0 deletions util/tableutil/tableutil.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package tableutil

import (
"github.com/pingcap/parser/model"
"github.com/pingcap/tidb/meta/autoid"
)

// TempTable is used to store transaction-specific or session-specific information for global / local temporary tables.
// For example, stats and autoID should have their own copies of data, instead of being shared by all sessions.
type TempTable interface {
// GetAutoIDAllocator gets the autoID allocator of this table.
GetAutoIDAllocator() autoid.Allocator

// SetModified sets that the table is modified.
SetModified(bool)

// GetModified queries whether the table is modified.
GetModified() bool

// The stats of this table (*statistics.Table).
// Define the return type as interface{} here to avoid cycle imports.
GetStats() interface{}
}

// TempTableFromMeta builds a TempTable from *model.TableInfo.
// Currently, it is assigned to tables.TempTableFromMeta in tidb package's init function.
var TempTableFromMeta func(tblInfo *model.TableInfo) TempTable

0 comments on commit 93a49ca

Please sign in to comment.