Skip to content

Commit

Permalink
infoschema: change data structure for lookup table by ID. (#2006)
Browse files Browse the repository at this point in the history
  • Loading branch information
coocood authored Nov 18, 2016
1 parent 85f5d30 commit abffed3
Show file tree
Hide file tree
Showing 2 changed files with 197 additions and 124 deletions.
179 changes: 109 additions & 70 deletions infoschema/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
package infoschema

import (
"sort"

"github.com/juju/errors"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/meta/autoid"
Expand All @@ -37,7 +39,7 @@ func (b *Builder) ApplyDiff(m *meta.Meta, diff *model.SchemaDiff) error {
b.applyDropSchema(diff.SchemaID)
return nil
}
roDBInfo, ok := b.is.schemas[diff.SchemaID]
roDBInfo, ok := b.is.SchemaByID(diff.SchemaID)
if !ok {
return ErrDatabaseNotExists
}
Expand All @@ -54,15 +56,18 @@ func (b *Builder) ApplyDiff(m *meta.Meta, diff *model.SchemaDiff) error {
oldTableID = diff.TableID
newTableID = diff.TableID
}
b.copySchemaTables(roDBInfo.Name.L)
b.copySortedTables(oldTableID, newTableID)

// We try to reuse the old allocator, so the cached auto ID can be reused.
var alloc autoid.Allocator
if oldTableID != 0 {
if tableIDIsValid(oldTableID) {
if oldTableID == newTableID {
alloc, _ = b.is.AllocByID(oldTableID)
}
b.applyDropTable(roDBInfo.Name.L, oldTableID)
b.applyDropTable(roDBInfo, oldTableID)
}
if newTableID != 0 {
if tableIDIsValid(newTableID) {
// All types except DropTable.
err := b.applyCreateTable(m, roDBInfo, newTableID, alloc)
if err != nil {
Expand All @@ -74,22 +79,40 @@ func (b *Builder) ApplyDiff(m *meta.Meta, diff *model.SchemaDiff) error {
return nil
}

// CopySortedTables copies sortedTables for old table and new table for later modification.
func (b *Builder) copySortedTables(oldTableID, newTableID int64) {
buckets := b.is.sortedTablesBuckets
if tableIDIsValid(oldTableID) {
bucketIdx := tableBucketIdx(oldTableID)
oldSortedTables := buckets[bucketIdx]
newSortedTables := make(sortedTables, len(oldSortedTables))
copy(newSortedTables, oldSortedTables)
buckets[bucketIdx] = newSortedTables
}
if tableIDIsValid(newTableID) && newTableID != oldTableID {
oldSortedTables := buckets[tableBucketIdx(newTableID)]
newSortedTables := make(sortedTables, len(oldSortedTables), len(oldSortedTables)+1)
copy(newSortedTables, oldSortedTables)
buckets[tableBucketIdx(newTableID)] = newSortedTables
}
}

// updateDBInfo clones a new DBInfo from old DBInfo, and update on the new one.
func (b *Builder) updateDBInfo(roDBInfo *model.DBInfo, oldTableID, newTableID int64) {
newDbInfo := new(model.DBInfo)
*newDbInfo = *roDBInfo
newDbInfo := *roDBInfo
newDbInfo.Tables = make([]*model.TableInfo, 0, len(roDBInfo.Tables))
if newTableID != 0 {
if tableIDIsValid(newTableID) {
// All types except DropTable.
newTblInfo := b.is.tables[newTableID].Meta()
newDbInfo.Tables = append(newDbInfo.Tables, newTblInfo)
if newTbl, ok := b.is.TableByID(newTableID); ok {
newDbInfo.Tables = append(newDbInfo.Tables, newTbl.Meta())
}
}
for _, tblInfo := range roDBInfo.Tables {
if tblInfo.ID != oldTableID && tblInfo.ID != newTableID {
newDbInfo.Tables = append(newDbInfo.Tables, tblInfo)
}
}
b.is.schemas[newDbInfo.ID] = newDbInfo
b.is.schemaMap[roDBInfo.Name.L].dbInfo = &newDbInfo
}

func (b *Builder) applyCreateSchema(m *meta.Meta, diff *model.SchemaDiff) error {
Expand All @@ -102,20 +125,18 @@ func (b *Builder) applyCreateSchema(m *meta.Meta, diff *model.SchemaDiff) error
// full load.
return ErrDatabaseNotExists
}
b.is.schemas[di.ID] = di
b.is.schemaNameToID[di.Name.L] = di.ID
b.is.schemaMap[di.Name.L] = &schemaTables{dbInfo: di, tables: make(map[string]table.Table)}
return nil
}

func (b *Builder) applyDropSchema(schemaID int64) {
di, ok := b.is.schemas[schemaID]
di, ok := b.is.SchemaByID(schemaID)
if !ok {
return
}
delete(b.is.schemas, di.ID)
delete(b.is.schemaNameToID, di.Name.L)
delete(b.is.schemaMap, di.Name.L)
for _, tbl := range di.Tables {
b.applyDropTable(di.Name.L, tbl.ID)
b.applyDropTable(di, tbl.ID)
}
}

Expand All @@ -136,58 +157,57 @@ func (b *Builder) applyCreateTable(m *meta.Meta, roDBInfo *model.DBInfo, tableID
if err != nil {
return errors.Trace(err)
}
b.is.tables[tblInfo.ID] = tbl
tn := makeTableName(roDBInfo.Name.L, tblInfo.Name.L)
b.is.tableNameToID[string(tn)] = tblInfo.ID
tableNames := b.is.schemaMap[roDBInfo.Name.L]
tableNames.tables[tblInfo.Name.L] = tbl
bucketIdx := tableBucketIdx(tableID)
sortedTables := b.is.sortedTablesBuckets[bucketIdx]
sortedTables = append(sortedTables, tbl)
sort.Sort(sortedTables)
b.is.sortedTablesBuckets[bucketIdx] = sortedTables
return nil
}

func (b *Builder) applyDropTable(schemaName string, tableID int64) {
tbl, ok := b.is.tables[tableID]
if !ok {
func (b *Builder) applyDropTable(di *model.DBInfo, tableID int64) {
bucketIdx := tableBucketIdx(tableID)
sortedTables := b.is.sortedTablesBuckets[bucketIdx]
idx := sortedTables.searchTable(tableID)
if idx == -1 {
return
}
tblInfo := tbl.Meta()
delete(b.is.tables, tblInfo.ID)
tn := makeTableName(schemaName, tblInfo.Name.L)
delete(b.is.tableNameToID, string(tn))
if tableNames, ok := b.is.schemaMap[di.Name.L]; ok {
delete(tableNames.tables, sortedTables[idx].Meta().Name.L)
}
// Remove the table in sorted table slice.
b.is.sortedTablesBuckets[bucketIdx] = append(sortedTables[0:idx], sortedTables[idx+1:]...)
}

// InitWithOldInfoSchema initializes an empty new InfoSchema by copies all the data from old InfoSchema.
func (b *Builder) InitWithOldInfoSchema() *Builder {
oldIS := b.handle.Get().(*infoSchema)
b.is.schemaMetaVersion = oldIS.schemaMetaVersion
b.copySchemaNames(oldIS)
b.copyTableNames(oldIS)
b.copySchemas(oldIS)
b.copyTables(oldIS)
b.copySchemasMap(oldIS)
copy(b.is.sortedTablesBuckets, oldIS.sortedTablesBuckets)
return b
}

func (b *Builder) copySchemaNames(oldIS *infoSchema) {
for k, v := range oldIS.schemaNameToID {
b.is.schemaNameToID[k] = v
func (b *Builder) copySchemasMap(oldIS *infoSchema) {
for k, v := range oldIS.schemaMap {
b.is.schemaMap[k] = v
}
}

func (b *Builder) copyTableNames(oldIS *infoSchema) {
b.is.tableNameToID = make(map[string]int64, len(oldIS.tableNameToID))
for k, v := range oldIS.tableNameToID {
b.is.tableNameToID[k] = v
// When a table in the database has changed, we should create a new schemaTables instance, then do modifications
// on the new one. Because old schemaTables must be read-only.
func (b *Builder) copySchemaTables(dbName string) {
oldSchemaTables := b.is.schemaMap[dbName]
newSchemaTables := &schemaTables{
dbInfo: oldSchemaTables.dbInfo,
tables: make(map[string]table.Table, len(oldSchemaTables.tables)),
}
}

func (b *Builder) copySchemas(oldIS *infoSchema) {
for k, v := range oldIS.schemas {
b.is.schemas[k] = v
}
}

func (b *Builder) copyTables(oldIS *infoSchema) {
b.is.tables = make(map[int64]table.Table, len(oldIS.tables))
for k, v := range oldIS.tables {
b.is.tables[k] = v
for k, v := range oldSchemaTables.tables {
newSchemaTables.tables[k] = v
}
b.is.schemaMap[dbName] = newSchemaTables
}

// InitWithDBInfos initializes an empty new InfoSchema with a slice of DBInfo and schema version.
Expand All @@ -199,47 +219,60 @@ func (b *Builder) InitWithDBInfos(dbInfos []*model.DBInfo, schemaVersion int64)
info := b.is
info.schemaMetaVersion = schemaVersion
for _, di := range dbInfos {
info.schemas[di.ID] = di
info.schemaNameToID[di.Name.L] = di.ID
schTbls := &schemaTables{
dbInfo: di,
tables: make(map[string]table.Table, len(di.Tables)),
}
info.schemaMap[di.Name.L] = schTbls
for _, t := range di.Tables {
alloc := autoid.NewAllocator(b.handle.store, di.ID)
var tbl table.Table
tbl, err = table.TableFromMeta(alloc, t)
if err != nil {
return nil, errors.Trace(err)
}
info.tables[t.ID] = tbl
tname := makeTableName(di.Name.L, t.Name.L)
info.tableNameToID[string(tname)] = t.ID
schTbls.tables[t.Name.L] = tbl
sortedTables := info.sortedTablesBuckets[tableBucketIdx(t.ID)]
info.sortedTablesBuckets[tableBucketIdx(t.ID)] = append(sortedTables, tbl)
}
}
for _, v := range info.sortedTablesBuckets {
sort.Sort(v)
}
return b, nil
}

func (b *Builder) initMemorySchemas() error {
info := b.is
info.schemaNameToID[infoSchemaDB.Name.L] = infoSchemaDB.ID
info.schemas[infoSchemaDB.ID] = infoSchemaDB
infoSchemaTblNames := &schemaTables{
dbInfo: infoSchemaDB,
tables: make(map[string]table.Table, len(infoSchemaDB.Tables)),
}

info.schemaMap[infoSchemaDB.Name.L] = infoSchemaTblNames
for _, t := range infoSchemaDB.Tables {
tbl := b.handle.memSchema.nameToTable[t.Name.L]
info.tables[t.ID] = tbl
tname := makeTableName(infoSchemaDB.Name.L, t.Name.L)
info.tableNameToID[string(tname)] = t.ID
infoSchemaTblNames.tables[t.Name.L] = tbl
bucketIdx := tableBucketIdx(t.ID)
info.sortedTablesBuckets[bucketIdx] = append(info.sortedTablesBuckets[bucketIdx], tbl)
}

perfHandle := b.handle.memSchema.perfHandle
psDB := perfHandle.GetDBMeta()
perfSchemaDB := perfHandle.GetDBMeta()
perfSchemaTblNames := &schemaTables{
dbInfo: perfSchemaDB,
tables: make(map[string]table.Table, len(perfSchemaDB.Tables)),
}
info.schemaMap[perfSchemaDB.Name.L] = perfSchemaTblNames

info.schemaNameToID[psDB.Name.L] = psDB.ID
info.schemas[psDB.ID] = psDB
for _, t := range psDB.Tables {
for _, t := range perfSchemaDB.Tables {
tbl, ok := perfHandle.GetTable(t.Name.O)
if !ok {
return ErrTableNotExists.Gen("table `%s` is missing.", t.Name)
}
info.tables[t.ID] = tbl
tname := makeTableName(psDB.Name.L, t.Name.L)
info.tableNameToID[string(tname)] = t.ID
perfSchemaTblNames.tables[t.Name.L] = tbl
bucketIdx := tableBucketIdx(t.ID)
info.sortedTablesBuckets[bucketIdx] = append(info.sortedTablesBuckets[bucketIdx], tbl)
}
return nil
}
Expand All @@ -259,10 +292,16 @@ func NewBuilder(handle *Handle) *Builder {
b := new(Builder)
b.handle = handle
b.is = &infoSchema{
schemaNameToID: map[string]int64{},
tableNameToID: map[string]int64{},
schemas: map[int64]*model.DBInfo{},
tables: map[int64]table.Table{},
schemaMap: map[string]*schemaTables{},
sortedTablesBuckets: make([]sortedTables, bucketCount),
}
return b
}

func tableBucketIdx(tableID int64) int {
return int(tableID % bucketCount)
}

func tableIDIsValid(tableID int64) bool {
return tableID != 0
}
Loading

0 comments on commit abffed3

Please sign in to comment.