Skip to content

Commit

Permalink
schemastore: fix truncate table and gc (#344)
Browse files Browse the repository at this point in the history
* add some log

* add truncate table test

* fix gc and tests
  • Loading branch information
lidezhu authored Sep 27, 2024
1 parent ec39652 commit ef5af30
Show file tree
Hide file tree
Showing 8 changed files with 745 additions and 564 deletions.
16 changes: 10 additions & 6 deletions logservice/schemastore/disk_format.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,7 @@ func writeSchemaSnapshotAndMeta(
db *pebble.DB,
tiStore kv.Storage,
snapTs uint64,
onlyTableID bool,
needTableInfo bool,
) (map[int64]*BasicDatabaseInfo, map[int64]*BasicTableInfo, error) {
meta := logpuller.GetSnapshotMeta(tiStore, snapTs)
start := time.Now()
Expand All @@ -407,8 +407,12 @@ func writeSchemaSnapshotAndMeta(
log.Fatal("list databases failed", zap.Error(err))
}

databaseMap := make(map[int64]*BasicDatabaseInfo)
tablesInKVSnap := make(map[int64]*BasicTableInfo)
var databaseMap map[int64]*BasicDatabaseInfo
var tablesInKVSnap map[int64]*BasicTableInfo
if needTableInfo {
databaseMap = make(map[int64]*BasicDatabaseInfo)
tablesInKVSnap = make(map[int64]*BasicTableInfo)
}
for _, dbInfo := range dbInfos {
if filter.IsSysSchema(dbInfo.Name.O) {
continue
Expand All @@ -422,15 +426,15 @@ func writeSchemaSnapshotAndMeta(
log.Fatal("get tables failed", zap.Error(err))
}
var tables map[int64]bool
if !onlyTableID {
if needTableInfo {
tables = make(map[int64]bool)
}
for _, rawTable := range rawTables {
if !isTableRawKey(rawTable.Field) {
continue
}
tableID, tableName := writeTableInfoToBatch(batch, snapTs, dbInfo, rawTable.Value)
if !onlyTableID {
if needTableInfo {
tablesInKVSnap[tableID] = &BasicTableInfo{
SchemaID: dbInfo.ID,
Name: tableName,
Expand All @@ -447,7 +451,7 @@ func writeSchemaSnapshotAndMeta(
batch = db.NewBatch()
}
}
if !onlyTableID {
if needTableInfo {
databaseInfo := &BasicDatabaseInfo{
Name: dbInfo.Name.O,
Tables: tables,
Expand Down
67 changes: 43 additions & 24 deletions logservice/schemastore/multi_version.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package schemastore

import (
"errors"
"fmt"
"math"
"sort"
"sync"
Expand Down Expand Up @@ -103,7 +104,12 @@ func (v *versionedTableInfoStore) getTableInfo(ts uint64) (*common.TableInfo, er
}

if ts >= v.deleteVersion {
return nil, errors.New("table info deleted")
log.Error("table info deleted",
zap.Any("ts", ts),
zap.Any("tableID", v.tableID),
zap.Any("infos", v.infos),
zap.Any("deleteVersion", v.deleteVersion))
return nil, fmt.Errorf("table info deleted %d", v.tableID)
}

target := sort.Search(len(v.infos), func(i int) bool {
Expand All @@ -120,25 +126,33 @@ func (v *versionedTableInfoStore) getTableInfo(ts uint64) (*common.TableInfo, er
return v.infos[target-1].info, nil
}

// only keep one item with the largest version <= gcTS
func (v *versionedTableInfoStore) gc(gcTs uint64) {
// only keep one item with the largest version <= gcTS, return whether the store should be totally removed
func (v *versionedTableInfoStore) gc(gcTs uint64) bool {
v.mu.Lock()
defer v.mu.Unlock()
if !v.initialized {
return
return false
}
if len(v.infos) == 0 {
log.Fatal("no table info found", zap.Int64("tableID", v.tableID))
}

if gcTs >= v.deleteVersion {
return true
}

target := sort.Search(len(v.infos), func(i int) bool {
return v.infos[i].version > gcTs
})
if target == 0 {
return
return false
}

v.infos = v.infos[target-1:]
if len(v.infos) == 0 {
log.Panic("should not happen")
}
return false
}

func assertEmpty(infos []*tableInfoItem, event PersistedDDLEvent) {
Expand All @@ -148,7 +162,7 @@ func assertEmpty(infos []*tableInfoItem, event PersistedDDLEvent) {
zap.Any("lastVersion", infos[len(infos)-1].version),
zap.Any("lastTableInfoVersion", infos[len(infos)-1].info.Version),
zap.String("query", event.Query),
zap.Int64("tableID", event.TableID),
zap.Int64("tableID", event.CurrentTableID),
zap.Uint64("finishedTs", event.FinishedTs),
zap.Int64("schemaVersion", event.SchemaVersion))
}
Expand Down Expand Up @@ -192,22 +206,16 @@ func (v *versionedTableInfoStore) applyDDL(event PersistedDDLEvent) {
}

// lock must be hold by the caller
// TODO: filter old ddl: there may be some pending ddls which is also written to disk and applied to table info store already
func (v *versionedTableInfoStore) doApplyDDL(event PersistedDDLEvent) {
if len(v.infos) != 0 && uint64(event.FinishedTs) <= v.infos[len(v.infos)-1].version {
log.Panic("ddl job finished ts should be monotonically increasing")
}
if len(v.infos) > 0 {
// TODO: FinishedTS is not enough, need schema version. But currently there should be no duplicate ddl,
// so the following check is useless
if uint64(event.FinishedTs) <= v.infos[len(v.infos)-1].version {
log.Info("ignore job",
zap.Int64("tableID", int64(v.tableID)),
zap.String("query", event.Query),
zap.Uint64("finishedTS", event.FinishedTs),
zap.Any("infosLen", len(v.infos)))
return
}
// TODO: add a unit test
// TODO: whether need add schema version check
if len(v.infos) != 0 && event.FinishedTs <= v.infos[len(v.infos)-1].version {
log.Warn("already applied ddl, ignore it.",
zap.Int64("tableID", v.tableID),
zap.String("query", event.Query),
zap.Uint64("finishedTS", event.FinishedTs),
zap.Int("infosLen", len(v.infos)))
return
}

switch model.ActionType(event.Type) {
Expand All @@ -221,18 +229,29 @@ func (v *versionedTableInfoStore) doApplyDDL(event PersistedDDLEvent) {
break
}
assertEmpty(v.infos, event)
info := common.WrapTableInfo(event.SchemaID, event.SchemaName, event.FinishedTs, event.TableInfo)
info := common.WrapTableInfo(event.CurrentSchemaID, event.CurrentSchemaName, event.FinishedTs, event.TableInfo)
info.InitPreSQLs()
v.infos = append(v.infos, &tableInfoItem{version: uint64(event.FinishedTs), info: info})
case model.ActionRenameTable,
model.ActionAddColumn,
model.ActionDropColumn:
assertNonEmpty(v.infos, event)
info := common.WrapTableInfo(event.SchemaID, event.SchemaName, event.FinishedTs, event.TableInfo)
info := common.WrapTableInfo(event.CurrentSchemaID, event.CurrentSchemaName, event.FinishedTs, event.TableInfo)
info.InitPreSQLs()
v.infos = append(v.infos, &tableInfoItem{version: uint64(event.FinishedTs), info: info})
case model.ActionDropTable, model.ActionTruncateTable:
case model.ActionDropTable:
v.deleteVersion = uint64(event.FinishedTs)
case model.ActionTruncateTable:
if v.tableID == event.CurrentTableID {
info := common.WrapTableInfo(event.CurrentSchemaID, event.CurrentSchemaName, event.FinishedTs, event.TableInfo)
info.InitPreSQLs()
v.infos = append(v.infos, &tableInfoItem{version: uint64(event.FinishedTs), info: info})
} else {
if v.tableID != event.PrevTableID {
log.Panic("should not happen")
}
v.deleteVersion = uint64(event.FinishedTs)
}
default:
// TODO: identify unexpected ddl or specify all expected ddl
}
Expand Down
Loading

0 comments on commit ef5af30

Please sign in to comment.