Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

infoschema: change data structure for lookup table by ID. #2006

Merged
merged 5 commits into from
Nov 18, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
179 changes: 109 additions & 70 deletions infoschema/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
package infoschema

import (
"sort"

"github.com/juju/errors"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/meta/autoid"
Expand All @@ -37,7 +39,7 @@ func (b *Builder) ApplyDiff(m *meta.Meta, diff *model.SchemaDiff) error {
b.applyDropSchema(diff.SchemaID)
return nil
}
roDBInfo, ok := b.is.schemas[diff.SchemaID]
roDBInfo, ok := b.is.SchemaByID(diff.SchemaID)
if !ok {
return ErrDatabaseNotExists
}
Expand All @@ -54,15 +56,18 @@ func (b *Builder) ApplyDiff(m *meta.Meta, diff *model.SchemaDiff) error {
oldTableID = diff.TableID
newTableID = diff.TableID
}
b.copySchemaTables(roDBInfo.Name.L)
b.copySortedTables(oldTableID, newTableID)

// We try to reuse the old allocator, so the cached auto ID can be reused.
var alloc autoid.Allocator
if oldTableID != 0 {
if tableIDIsValid(oldTableID) {
if oldTableID == newTableID {
alloc, _ = b.is.AllocByID(oldTableID)
}
b.applyDropTable(roDBInfo.Name.L, oldTableID)
b.applyDropTable(roDBInfo, oldTableID)
}
if newTableID != 0 {
if tableIDIsValid(newTableID) {
// All types except DropTable.
err := b.applyCreateTable(m, roDBInfo, newTableID, alloc)
if err != nil {
Expand All @@ -74,22 +79,40 @@ func (b *Builder) ApplyDiff(m *meta.Meta, diff *model.SchemaDiff) error {
return nil
}

// CopySortedTables copies sortedTables for old table and new table for later modification.
func (b *Builder) copySortedTables(oldTableID, newTableID int64) {
buckets := b.is.sortedTablesBuckets
if tableIDIsValid(oldTableID) {
bucketIdx := tableBucketIdx(oldTableID)
oldSortedTables := buckets[bucketIdx]
newSortedTables := make(sortedTables, len(oldSortedTables))
copy(newSortedTables, oldSortedTables)
buckets[bucketIdx] = newSortedTables
}
if tableIDIsValid(newTableID) && newTableID != oldTableID {
oldSortedTables := buckets[tableBucketIdx(newTableID)]
newSortedTables := make(sortedTables, len(oldSortedTables), len(oldSortedTables)+1)
copy(newSortedTables, oldSortedTables)
buckets[tableBucketIdx(newTableID)] = newSortedTables
}
}

// updateDBInfo clones a new DBInfo from old DBInfo, and update on the new one.
func (b *Builder) updateDBInfo(roDBInfo *model.DBInfo, oldTableID, newTableID int64) {
newDbInfo := new(model.DBInfo)
*newDbInfo = *roDBInfo
newDbInfo := *roDBInfo
newDbInfo.Tables = make([]*model.TableInfo, 0, len(roDBInfo.Tables))
if newTableID != 0 {
if tableIDIsValid(newTableID) {
// All types except DropTable.
newTblInfo := b.is.tables[newTableID].Meta()
newDbInfo.Tables = append(newDbInfo.Tables, newTblInfo)
if newTbl, ok := b.is.TableByID(newTableID); ok {
newDbInfo.Tables = append(newDbInfo.Tables, newTbl.Meta())
}
}
for _, tblInfo := range roDBInfo.Tables {
if tblInfo.ID != oldTableID && tblInfo.ID != newTableID {
newDbInfo.Tables = append(newDbInfo.Tables, tblInfo)
}
}
b.is.schemas[newDbInfo.ID] = newDbInfo
b.is.schemaMap[roDBInfo.Name.L].dbInfo = &newDbInfo
}

func (b *Builder) applyCreateSchema(m *meta.Meta, diff *model.SchemaDiff) error {
Expand All @@ -102,20 +125,18 @@ func (b *Builder) applyCreateSchema(m *meta.Meta, diff *model.SchemaDiff) error
// full load.
return ErrDatabaseNotExists
}
b.is.schemas[di.ID] = di
b.is.schemaNameToID[di.Name.L] = di.ID
b.is.schemaMap[di.Name.L] = &schemaTables{dbInfo: di, tables: make(map[string]table.Table)}
return nil
}

func (b *Builder) applyDropSchema(schemaID int64) {
di, ok := b.is.schemas[schemaID]
di, ok := b.is.SchemaByID(schemaID)
if !ok {
return
}
delete(b.is.schemas, di.ID)
delete(b.is.schemaNameToID, di.Name.L)
delete(b.is.schemaMap, di.Name.L)
for _, tbl := range di.Tables {
b.applyDropTable(di.Name.L, tbl.ID)
b.applyDropTable(di, tbl.ID)
}
}

Expand All @@ -136,58 +157,57 @@ func (b *Builder) applyCreateTable(m *meta.Meta, roDBInfo *model.DBInfo, tableID
if err != nil {
return errors.Trace(err)
}
b.is.tables[tblInfo.ID] = tbl
tn := makeTableName(roDBInfo.Name.L, tblInfo.Name.L)
b.is.tableNameToID[string(tn)] = tblInfo.ID
tableNames := b.is.schemaMap[roDBInfo.Name.L]
tableNames.tables[tblInfo.Name.L] = tbl
bucketIdx := tableBucketIdx(tableID)
sortedTables := b.is.sortedTablesBuckets[bucketIdx]
sortedTables = append(sortedTables, tbl)
sort.Sort(sortedTables)
b.is.sortedTablesBuckets[bucketIdx] = sortedTables
return nil
}

func (b *Builder) applyDropTable(schemaName string, tableID int64) {
tbl, ok := b.is.tables[tableID]
if !ok {
func (b *Builder) applyDropTable(di *model.DBInfo, tableID int64) {
bucketIdx := tableBucketIdx(tableID)
sortedTables := b.is.sortedTablesBuckets[bucketIdx]
idx := sortedTables.searchTable(tableID)
if idx == -1 {
return
}
tblInfo := tbl.Meta()
delete(b.is.tables, tblInfo.ID)
tn := makeTableName(schemaName, tblInfo.Name.L)
delete(b.is.tableNameToID, string(tn))
if tableNames, ok := b.is.schemaMap[di.Name.L]; ok {
delete(tableNames.tables, sortedTables[idx].Meta().Name.L)
}
// Remove the table in sorted table slice.
b.is.sortedTablesBuckets[bucketIdx] = append(sortedTables[0:idx], sortedTables[idx+1:]...)
}

// InitWithOldInfoSchema initializes an empty new InfoSchema by copies all the data from old InfoSchema.
func (b *Builder) InitWithOldInfoSchema() *Builder {
oldIS := b.handle.Get().(*infoSchema)
b.is.schemaMetaVersion = oldIS.schemaMetaVersion
b.copySchemaNames(oldIS)
b.copyTableNames(oldIS)
b.copySchemas(oldIS)
b.copyTables(oldIS)
b.copySchemasMap(oldIS)
copy(b.is.sortedTablesBuckets, oldIS.sortedTablesBuckets)
return b
}

func (b *Builder) copySchemaNames(oldIS *infoSchema) {
for k, v := range oldIS.schemaNameToID {
b.is.schemaNameToID[k] = v
func (b *Builder) copySchemasMap(oldIS *infoSchema) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/schemasMap/schemaMap/

for k, v := range oldIS.schemaMap {
b.is.schemaMap[k] = v
}
}

func (b *Builder) copyTableNames(oldIS *infoSchema) {
b.is.tableNameToID = make(map[string]int64, len(oldIS.tableNameToID))
for k, v := range oldIS.tableNameToID {
b.is.tableNameToID[k] = v
// When a table in the database has changed, we should create a new schemaTables instance, then do modifications
// on the new one. Because old schemaTables must be read-only.
func (b *Builder) copySchemaTables(dbName string) {
oldSchemaTables := b.is.schemaMap[dbName]
newSchemaTables := &schemaTables{
dbInfo: oldSchemaTables.dbInfo,
tables: make(map[string]table.Table, len(oldSchemaTables.tables)),
}
}

func (b *Builder) copySchemas(oldIS *infoSchema) {
for k, v := range oldIS.schemas {
b.is.schemas[k] = v
}
}

func (b *Builder) copyTables(oldIS *infoSchema) {
b.is.tables = make(map[int64]table.Table, len(oldIS.tables))
for k, v := range oldIS.tables {
b.is.tables[k] = v
for k, v := range oldSchemaTables.tables {
newSchemaTables.tables[k] = v
}
b.is.schemaMap[dbName] = newSchemaTables
}

// InitWithDBInfos initializes an empty new InfoSchema with a slice of DBInfo and schema version.
Expand All @@ -199,47 +219,60 @@ func (b *Builder) InitWithDBInfos(dbInfos []*model.DBInfo, schemaVersion int64)
info := b.is
info.schemaMetaVersion = schemaVersion
for _, di := range dbInfos {
info.schemas[di.ID] = di
info.schemaNameToID[di.Name.L] = di.ID
schTbls := &schemaTables{
dbInfo: di,
tables: make(map[string]table.Table, len(di.Tables)),
}
info.schemaMap[di.Name.L] = schTbls
for _, t := range di.Tables {
alloc := autoid.NewAllocator(b.handle.store, di.ID)
var tbl table.Table
tbl, err = table.TableFromMeta(alloc, t)
if err != nil {
return nil, errors.Trace(err)
}
info.tables[t.ID] = tbl
tname := makeTableName(di.Name.L, t.Name.L)
info.tableNameToID[string(tname)] = t.ID
schTbls.tables[t.Name.L] = tbl
sortedTables := info.sortedTablesBuckets[tableBucketIdx(t.ID)]
info.sortedTablesBuckets[tableBucketIdx(t.ID)] = append(sortedTables, tbl)
}
}
for _, v := range info.sortedTablesBuckets {
sort.Sort(v)
}
return b, nil
}

func (b *Builder) initMemorySchemas() error {
info := b.is
info.schemaNameToID[infoSchemaDB.Name.L] = infoSchemaDB.ID
info.schemas[infoSchemaDB.ID] = infoSchemaDB
infoSchemaTblNames := &schemaTables{
dbInfo: infoSchemaDB,
tables: make(map[string]table.Table, len(infoSchemaDB.Tables)),
}

info.schemaMap[infoSchemaDB.Name.L] = infoSchemaTblNames
for _, t := range infoSchemaDB.Tables {
tbl := b.handle.memSchema.nameToTable[t.Name.L]
info.tables[t.ID] = tbl
tname := makeTableName(infoSchemaDB.Name.L, t.Name.L)
info.tableNameToID[string(tname)] = t.ID
infoSchemaTblNames.tables[t.Name.L] = tbl
bucketIdx := tableBucketIdx(t.ID)
info.sortedTablesBuckets[bucketIdx] = append(info.sortedTablesBuckets[bucketIdx], tbl)
}

perfHandle := b.handle.memSchema.perfHandle
psDB := perfHandle.GetDBMeta()
perfSchemaDB := perfHandle.GetDBMeta()
perfSchemaTblNames := &schemaTables{
dbInfo: perfSchemaDB,
tables: make(map[string]table.Table, len(perfSchemaDB.Tables)),
}
info.schemaMap[perfSchemaDB.Name.L] = perfSchemaTblNames

info.schemaNameToID[psDB.Name.L] = psDB.ID
info.schemas[psDB.ID] = psDB
for _, t := range psDB.Tables {
for _, t := range perfSchemaDB.Tables {
tbl, ok := perfHandle.GetTable(t.Name.O)
if !ok {
return ErrTableNotExists.Gen("table `%s` is missing.", t.Name)
}
info.tables[t.ID] = tbl
tname := makeTableName(psDB.Name.L, t.Name.L)
info.tableNameToID[string(tname)] = t.ID
perfSchemaTblNames.tables[t.Name.L] = tbl
bucketIdx := tableBucketIdx(t.ID)
info.sortedTablesBuckets[bucketIdx] = append(info.sortedTablesBuckets[bucketIdx], tbl)
}
return nil
}
Expand All @@ -259,10 +292,16 @@ func NewBuilder(handle *Handle) *Builder {
b := new(Builder)
b.handle = handle
b.is = &infoSchema{
schemaNameToID: map[string]int64{},
tableNameToID: map[string]int64{},
schemas: map[int64]*model.DBInfo{},
tables: map[int64]table.Table{},
schemaMap: map[string]*schemaTables{},
sortedTablesBuckets: make([]sortedTables, bucketCount),
}
return b
}

func tableBucketIdx(tableID int64) int {
return int(tableID % bucketCount)
}

func tableIDIsValid(tableID int64) bool {
return tableID != 0
}
Loading