diff --git a/domain/BUILD.bazel b/domain/BUILD.bazel index 55c4249803d4d..b0ef87f4ab7e1 100644 --- a/domain/BUILD.bazel +++ b/domain/BUILD.bazel @@ -52,6 +52,7 @@ go_library( "//sessionctx/sessionstates", "//sessionctx/variable", "//statistics/handle", + "//store/helper", "//telemetry", "//ttl/ttlworker", "//types", diff --git a/domain/domain.go b/domain/domain.go index 844f9d0a6b0e6..111552dec542a 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -63,6 +63,7 @@ import ( "github.com/pingcap/tidb/sessionctx/sessionstates" "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/statistics/handle" + "github.com/pingcap/tidb/store/helper" "github.com/pingcap/tidb/telemetry" "github.com/pingcap/tidb/ttl/ttlworker" "github.com/pingcap/tidb/types" @@ -107,7 +108,7 @@ func NewMockDomain() *Domain { do := &Domain{ infoCache: infoschema.NewCache(1), } - do.infoCache.Insert(infoschema.MockInfoSchema(nil), 1) + do.infoCache.Insert(infoschema.MockInfoSchema(nil), 0) return do } @@ -202,6 +203,12 @@ func (do *Domain) loadInfoSchema(startTS uint64) (infoschema.InfoSchema, bool, i if err != nil { return nil, false, 0, nil, err } + // fetch the commit timestamp of the schema diff + schemaTs, err := do.getTimestampForSchemaVersionWithNonEmptyDiff(m, neededSchemaVersion) + if err != nil { + logutil.BgLogger().Warn("failed to get schema version", zap.Error(err), zap.Int64("version", neededSchemaVersion)) + schemaTs = 0 + } if is := do.infoCache.GetByVersion(neededSchemaVersion); is != nil { return is, true, 0, nil, nil @@ -223,7 +230,7 @@ func (do *Domain) loadInfoSchema(startTS uint64) (infoschema.InfoSchema, bool, i if currentSchemaVersion != 0 && neededSchemaVersion > currentSchemaVersion && neededSchemaVersion-currentSchemaVersion < 100 { is, relatedChanges, err := do.tryLoadSchemaDiffs(m, currentSchemaVersion, neededSchemaVersion) if err == nil { - do.infoCache.Insert(is, startTS) + do.infoCache.Insert(is, uint64(schemaTs)) logutil.BgLogger().Info("diff load InfoSchema success", zap.Int64("currentSchemaVersion", currentSchemaVersion), zap.Int64("neededSchemaVersion", neededSchemaVersion), @@ -262,10 +269,27 @@ func (do *Domain) loadInfoSchema(startTS uint64) (infoschema.InfoSchema, bool, i zap.Duration("start time", time.Since(startTime))) is := newISBuilder.Build() - do.infoCache.Insert(is, startTS) + do.infoCache.Insert(is, uint64(schemaTs)) return is, false, currentSchemaVersion, nil, nil } +// Returns the timestamp of a schema version, which is the commit timestamp of the schema diff +func (do *Domain) getTimestampForSchemaVersionWithNonEmptyDiff(m *meta.Meta, version int64) (int64, error) { + tikvStore, ok := do.Store().(helper.Storage) + if ok { + helper := helper.NewHelper(tikvStore) + data, err := helper.GetMvccByEncodedKey(m.EncodeSchemaDiffKey(version)) + if err != nil { + return 0, err + } + if len(data.Info.Writes) == 0 { + return 0, errors.Errorf("There is no Write MVCC info for the schema version") + } + return int64(data.Info.Writes[0].CommitTs), nil + } + return 0, errors.Errorf("cannot get store from domain") +} + func (do *Domain) sysFacHack() (pools.Resource, error) { // TODO: Here we create new sessions with sysFac in DDL, // which will use `do` as Domain instead of call `domap.Get`. diff --git a/infoschema/cache.go b/infoschema/cache.go index 24c16f88b7a32..da351c81bf2d2 100644 --- a/infoschema/cache.go +++ b/infoschema/cache.go @@ -19,6 +19,8 @@ import ( "sync" infoschema_metrics "github.com/pingcap/tidb/infoschema/metrics" + "github.com/pingcap/tidb/util/logutil" + "go.uber.org/zap" ) // InfoCache handles information schema, including getting and setting. @@ -26,22 +28,27 @@ import ( // It only promised to cache the infoschema, if it is newer than all the cached. type InfoCache struct { mu sync.RWMutex - // cache is sorted by SchemaVersion in descending order - cache []InfoSchema - // record SnapshotTS of the latest schema Insert. - maxUpdatedSnapshotTS uint64 + // cache is sorted by both SchemaVersion and timestamp in descending order, assume they have same order + cache []schemaAndTimestamp +} + +type schemaAndTimestamp struct { + infoschema InfoSchema + timestamp int64 } // NewCache creates a new InfoCache. func NewCache(capacity int) *InfoCache { - return &InfoCache{cache: make([]InfoSchema, 0, capacity)} + return &InfoCache{ + cache: make([]schemaAndTimestamp, 0, capacity), + } } // Reset resets the cache. func (h *InfoCache) Reset(capacity int) { h.mu.Lock() defer h.mu.Unlock() - h.cache = make([]InfoSchema, 0, capacity) + h.cache = make([]schemaAndTimestamp, 0, capacity) } // GetLatest gets the newest information schema. @@ -51,18 +58,44 @@ func (h *InfoCache) GetLatest() InfoSchema { infoschema_metrics.GetLatestCounter.Inc() if len(h.cache) > 0 { infoschema_metrics.HitLatestCounter.Inc() - return h.cache[0] + return h.cache[0].infoschema } return nil } +func (h *InfoCache) getSchemaByTimestampNoLock(ts uint64) (InfoSchema, bool) { + logutil.BgLogger().Debug("SCHEMA CACHE get schema", zap.Uint64("timestamp", ts)) + // search one by one instead of binary search, because the timestamp of a schema could be 0 + // this is ok because the size of h.cache is small (currently set to 16) + // moreover, the most likely hit element in the array is the first one in steady mode + // thus it may have better performance than binary search + for i, is := range h.cache { + if is.timestamp == 0 || (i > 0 && h.cache[i-1].infoschema.SchemaMetaVersion() != is.infoschema.SchemaMetaVersion()+1) { + // the schema version doesn't have a timestamp or there is a gap in the schema cache + // ignore all the schema cache equals or less than this version in search by timestamp + break + } + if ts >= uint64(is.timestamp) { + // found the largest version before the given ts + return is.infoschema, true + } + } + + logutil.BgLogger().Debug("SCHEMA CACHE no schema found") + return nil, false +} + // GetByVersion gets the information schema based on schemaVersion. Returns nil if it is not loaded. func (h *InfoCache) GetByVersion(version int64) InfoSchema { h.mu.RLock() defer h.mu.RUnlock() + return h.getByVersionNoLock(version) +} + +func (h *InfoCache) getByVersionNoLock(version int64) InfoSchema { infoschema_metrics.GetVersionCounter.Inc() i := sort.Search(len(h.cache), func(i int) bool { - return h.cache[i].SchemaMetaVersion() <= version + return h.cache[i].infoschema.SchemaMetaVersion() <= version }) // `GetByVersion` is allowed to load the latest schema that is less than argument `version`. @@ -83,26 +116,24 @@ func (h *InfoCache) GetByVersion(version int64) InfoSchema { // } // ``` - if i < len(h.cache) && (i != 0 || h.cache[i].SchemaMetaVersion() == version) { + if i < len(h.cache) && (i != 0 || h.cache[i].infoschema.SchemaMetaVersion() == version) { infoschema_metrics.HitVersionCounter.Inc() - return h.cache[i] + return h.cache[i].infoschema } return nil } // GetBySnapshotTS gets the information schema based on snapshotTS. -// If the snapshotTS is new than maxUpdatedSnapshotTS, that's mean it can directly use -// the latest infoschema. otherwise, will return nil. +// It searches the schema cache and find the schema with max schema ts that equals or smaller than given snapshot ts +// Where the schema ts is the commitTs of the txn creates the schema diff func (h *InfoCache) GetBySnapshotTS(snapshotTS uint64) InfoSchema { h.mu.RLock() defer h.mu.RUnlock() infoschema_metrics.GetTSCounter.Inc() - if snapshotTS >= h.maxUpdatedSnapshotTS { - if len(h.cache) > 0 { - infoschema_metrics.HitTSCounter.Inc() - return h.cache[0] - } + if schema, ok := h.getSchemaByTimestampNoLock(snapshotTS); ok { + infoschema_metrics.HitTSCounter.Inc() + return schema } return nil } @@ -110,21 +141,25 @@ func (h *InfoCache) GetBySnapshotTS(snapshotTS uint64) InfoSchema { // Insert will **TRY** to insert the infoschema into the cache. // It only promised to cache the newest infoschema. // It returns 'true' if it is cached, 'false' otherwise. -func (h *InfoCache) Insert(is InfoSchema, snapshotTS uint64) bool { +// schemaTs is the commitTs of the txn creates the schema diff, which indicates since when the schema version is taking effect +func (h *InfoCache) Insert(is InfoSchema, schemaTS uint64) bool { + logutil.BgLogger().Debug("INSERT SCHEMA", zap.Uint64("schema ts", schemaTS), zap.Int64("schema version", is.SchemaMetaVersion())) h.mu.Lock() defer h.mu.Unlock() version := is.SchemaMetaVersion() + + // assume this is the timestamp order as well i := sort.Search(len(h.cache), func(i int) bool { - return h.cache[i].SchemaMetaVersion() <= version + return h.cache[i].infoschema.SchemaMetaVersion() <= version }) - if h.maxUpdatedSnapshotTS < snapshotTS { - h.maxUpdatedSnapshotTS = snapshotTS - } - // cached entry - if i < len(h.cache) && h.cache[i].SchemaMetaVersion() == version { + if i < len(h.cache) && h.cache[i].infoschema.SchemaMetaVersion() == version { + // update timestamp if it is not 0 and cached one is 0 + if schemaTS > 0 && h.cache[i].timestamp == 0 { + h.cache[i].timestamp = int64(schemaTS) + } return true } @@ -132,14 +167,21 @@ func (h *InfoCache) Insert(is InfoSchema, snapshotTS uint64) bool { // has free space, grown the slice h.cache = h.cache[:len(h.cache)+1] copy(h.cache[i+1:], h.cache[i:]) - h.cache[i] = is - return true + h.cache[i] = schemaAndTimestamp{ + infoschema: is, + timestamp: int64(schemaTS), + } } else if i < len(h.cache) { // drop older schema copy(h.cache[i+1:], h.cache[i:]) - h.cache[i] = is - return true + h.cache[i] = schemaAndTimestamp{ + infoschema: is, + timestamp: int64(schemaTS), + } + } else { + // older than all cached schemas, refuse to cache it + return false } - // older than all cached schemas, refuse to cache it - return false + + return true } diff --git a/infoschema/cache_test.go b/infoschema/cache_test.go index 83506bc4794d8..b05ffa531fae5 100644 --- a/infoschema/cache_test.go +++ b/infoschema/cache_test.go @@ -42,6 +42,7 @@ func TestInsert(t *testing.T) { ic.Insert(is5, 5) require.Equal(t, is5, ic.GetByVersion(5)) require.Equal(t, is2, ic.GetByVersion(2)) + // there is a gap in schema cache, so don't use this version require.Nil(t, ic.GetBySnapshotTS(2)) require.Equal(t, is5, ic.GetBySnapshotTS(10)) @@ -59,7 +60,9 @@ func TestInsert(t *testing.T) { require.Equal(t, is5, ic.GetByVersion(5)) require.Equal(t, is2, ic.GetByVersion(2)) require.Nil(t, ic.GetByVersion(0)) + // there is a gap in schema cache, so don't use this version require.Nil(t, ic.GetBySnapshotTS(2)) + require.Equal(t, is5, ic.GetBySnapshotTS(5)) require.Equal(t, is6, ic.GetBySnapshotTS(10)) // replace 2, drop 2 @@ -91,7 +94,7 @@ func TestInsert(t *testing.T) { require.Nil(t, ic.GetByVersion(2)) require.Nil(t, ic.GetByVersion(0)) require.Nil(t, ic.GetBySnapshotTS(2)) - require.Nil(t, ic.GetBySnapshotTS(5)) + require.Equal(t, is5, ic.GetBySnapshotTS(5)) require.Equal(t, is6, ic.GetBySnapshotTS(10)) } @@ -129,3 +132,36 @@ func TestGetLatest(t *testing.T) { ic.Insert(is0, 0) require.Equal(t, is2, ic.GetLatest()) } + +func TestGetByTimestamp(t *testing.T) { + ic := infoschema.NewCache(16) + require.NotNil(t, ic) + require.Nil(t, ic.GetLatest()) + + is1 := infoschema.MockInfoSchemaWithSchemaVer(nil, 1) + ic.Insert(is1, 1) + require.Nil(t, ic.GetBySnapshotTS(0)) + require.Equal(t, is1, ic.GetBySnapshotTS(1)) + require.Equal(t, is1, ic.GetBySnapshotTS(2)) + + is3 := infoschema.MockInfoSchemaWithSchemaVer(nil, 3) + ic.Insert(is3, 3) + require.Equal(t, is3, ic.GetLatest()) + require.Nil(t, ic.GetBySnapshotTS(0)) + // there is a gap, no schema returned for ts 2 + require.Nil(t, ic.GetBySnapshotTS(2)) + require.Equal(t, is3, ic.GetBySnapshotTS(3)) + require.Equal(t, is3, ic.GetBySnapshotTS(4)) + + is2 := infoschema.MockInfoSchemaWithSchemaVer(nil, 2) + // schema version 2 doesn't have timestamp set + // thus all schema before ver 2 cannot be searched by timestamp anymore + // because the ts of ver 2 is not accurate + ic.Insert(is2, 0) + require.Equal(t, is3, ic.GetLatest()) + require.Nil(t, ic.GetBySnapshotTS(0)) + require.Nil(t, ic.GetBySnapshotTS(1)) + require.Nil(t, ic.GetBySnapshotTS(2)) + require.Equal(t, is3, ic.GetBySnapshotTS(3)) + require.Equal(t, is3, ic.GetBySnapshotTS(4)) +} diff --git a/meta/meta.go b/meta/meta.go index 9de7cd5909fbd..d9bb14076320b 100644 --- a/meta/meta.go +++ b/meta/meta.go @@ -441,6 +441,12 @@ func (m *Meta) GetSchemaVersionWithNonEmptyDiff() (int64, error) { return v, err } +// EncodeSchemaDiffKey returns the raw kv key for a schema diff +func (m *Meta) EncodeSchemaDiffKey(schemaVersion int64) kv.Key { + diffKey := m.schemaDiffKey(schemaVersion) + return m.txn.EncodeStringDataKey(diffKey) +} + // GetSchemaVersion gets current global schema version. func (m *Meta) GetSchemaVersion() (int64, error) { return m.txn.GetInt64(mSchemaVersionKey) diff --git a/structure/string.go b/structure/string.go index bdec023107fab..00d0195b59f05 100644 --- a/structure/string.go +++ b/structure/string.go @@ -27,13 +27,13 @@ func (t *TxStructure) Set(key []byte, value []byte) error { if t.readWriter == nil { return ErrWriteOnSnapshot } - ek := t.encodeStringDataKey(key) + ek := t.EncodeStringDataKey(key) return t.readWriter.Set(ek, value) } // Get gets the string value of a key. func (t *TxStructure) Get(key []byte) ([]byte, error) { - ek := t.encodeStringDataKey(key) + ek := t.EncodeStringDataKey(key) value, err := t.reader.Get(context.TODO(), ek) if kv.ErrNotExist.Equal(err) { err = nil @@ -58,7 +58,7 @@ func (t *TxStructure) Inc(key []byte, step int64) (int64, error) { if t.readWriter == nil { return 0, ErrWriteOnSnapshot } - ek := t.encodeStringDataKey(key) + ek := t.EncodeStringDataKey(key) // txn Inc will lock this key, so we don't lock it here. n, err := kv.IncInt64(t.readWriter, ek, step) if kv.ErrNotExist.Equal(err) { @@ -72,7 +72,7 @@ func (t *TxStructure) Clear(key []byte) error { if t.readWriter == nil { return ErrWriteOnSnapshot } - ek := t.encodeStringDataKey(key) + ek := t.EncodeStringDataKey(key) err := t.readWriter.Delete(ek) if kv.ErrNotExist.Equal(err) { err = nil diff --git a/structure/type.go b/structure/type.go index 8e1437c2c4e4b..e92128be6eded 100644 --- a/structure/type.go +++ b/structure/type.go @@ -43,7 +43,8 @@ const ( // Make linter happy, since encodeHashMetaKey is unused in this repo. var _ = (&TxStructure{}).encodeHashMetaKey -func (t *TxStructure) encodeStringDataKey(key []byte) kv.Key { +// EncodeStringDataKey will encode string key. +func (t *TxStructure) EncodeStringDataKey(key []byte) kv.Key { // for codec Encode, we may add extra bytes data, so here and following encode // we will use extra length like 4 for a little optimization. ek := make([]byte, 0, len(t.prefix)+len(key)+24)