diff --git a/infoschema/cache.go b/infoschema/cache.go index 34cc08eca2231..3ceab0bffb00d 100644 --- a/infoschema/cache.go +++ b/infoschema/cache.go @@ -15,6 +15,7 @@ package infoschema import ( + "fmt" "sort" "sync" @@ -36,22 +37,27 @@ var ( // It only promised to cache the infoschema, if it is newer than all the cached. type InfoCache struct { mu sync.RWMutex - // cache is sorted by SchemaVersion in descending order - cache []InfoSchema - // record SnapshotTS of the latest schema Insert. - maxUpdatedSnapshotTS uint64 + // cache is sorted by both SchemaVersion and timestamp in descending order, assume they have same order + cache []schemaAndTimestamp +} + +type schemaAndTimestamp struct { + infoschema InfoSchema + timestamp int64 } // NewCache creates a new InfoCache. func NewCache(capacity int) *InfoCache { - return &InfoCache{cache: make([]InfoSchema, 0, capacity)} + return &InfoCache{ + cache: make([]schemaAndTimestamp, 0, capacity), + } } // Reset resets the cache. func (h *InfoCache) Reset(capacity int) { h.mu.Lock() defer h.mu.Unlock() - h.cache = make([]InfoSchema, 0, capacity) + h.cache = make([]schemaAndTimestamp, 0, capacity) } // GetLatest gets the newest information schema. @@ -61,18 +67,40 @@ func (h *InfoCache) GetLatest() InfoSchema { getLatestCounter.Inc() if len(h.cache) > 0 { hitLatestCounter.Inc() - return h.cache[0] + return h.cache[0].infoschema } return nil } +// GetSchemaByTimestamp returns the schema used at the specific timestamp +func (h *InfoCache) GetSchemaByTimestamp(ts uint64) (InfoSchema, error) { + h.mu.RLock() + defer h.mu.RUnlock() + return h.getSchemaByTimestampNoLock(ts) +} + +func (h *InfoCache) getSchemaByTimestampNoLock(ts uint64) (InfoSchema, error) { + i := sort.Search(len(h.cache), func(i int) bool { + return uint64(h.cache[i].timestamp) <= ts + }) + if i < len(h.cache) { + return h.cache[i].infoschema, nil + } + + return nil, fmt.Errorf("no schema cached for timestamp %d", ts) +} + // GetByVersion gets the information schema based on schemaVersion. Returns nil if it is not loaded. func (h *InfoCache) GetByVersion(version int64) InfoSchema { h.mu.RLock() defer h.mu.RUnlock() + return h.getByVersionNoLock(version) +} + +func (h *InfoCache) getByVersionNoLock(version int64) InfoSchema { getVersionCounter.Inc() i := sort.Search(len(h.cache), func(i int) bool { - return h.cache[i].SchemaMetaVersion() <= version + return h.cache[i].infoschema.SchemaMetaVersion() <= version }) // `GetByVersion` is allowed to load the latest schema that is less than argument `version`. @@ -93,9 +121,9 @@ func (h *InfoCache) GetByVersion(version int64) InfoSchema { // } // ``` - if i < len(h.cache) && (i != 0 || h.cache[i].SchemaMetaVersion() == version) { + if i < len(h.cache) && (i != 0 || h.cache[i].infoschema.SchemaMetaVersion() == version) { hitVersionCounter.Inc() - return h.cache[i] + return h.cache[i].infoschema } return nil } @@ -108,11 +136,9 @@ func (h *InfoCache) GetBySnapshotTS(snapshotTS uint64) InfoSchema { defer h.mu.RUnlock() getTSCounter.Inc() - if snapshotTS >= h.maxUpdatedSnapshotTS { - if len(h.cache) > 0 { - hitTSCounter.Inc() - return h.cache[0] - } + if schema, err := h.getSchemaByTimestampNoLock(snapshotTS); err == nil { + hitTSCounter.Inc() + return schema } return nil } @@ -125,16 +151,17 @@ func (h *InfoCache) Insert(is InfoSchema, snapshotTS uint64) bool { defer h.mu.Unlock() version := is.SchemaMetaVersion() + + // assume this is the timestamp order as well i := sort.Search(len(h.cache), func(i int) bool { - return h.cache[i].SchemaMetaVersion() <= version + return h.cache[i].infoschema.SchemaMetaVersion() <= version }) - if h.maxUpdatedSnapshotTS < snapshotTS { - h.maxUpdatedSnapshotTS = snapshotTS - } - // cached entry - if i < len(h.cache) && h.cache[i].SchemaMetaVersion() == version { + if i < len(h.cache) && h.cache[i].infoschema.SchemaMetaVersion() == version { + if h.cache[i].timestamp > int64(snapshotTS) { + h.cache[i].timestamp = int64(snapshotTS) + } return true } @@ -142,12 +169,18 @@ func (h *InfoCache) Insert(is InfoSchema, snapshotTS uint64) bool { // has free space, grown the slice h.cache = h.cache[:len(h.cache)+1] copy(h.cache[i+1:], h.cache[i:]) - h.cache[i] = is + h.cache[i] = schemaAndTimestamp{ + infoschema: is, + timestamp: int64(snapshotTS), + } return true } else if i < len(h.cache) { // drop older schema copy(h.cache[i+1:], h.cache[i:]) - h.cache[i] = is + h.cache[i] = schemaAndTimestamp{ + infoschema: is, + timestamp: int64(snapshotTS), + } return true } // older than all cached schemas, refuse to cache it diff --git a/infoschema/cache_test.go b/infoschema/cache_test.go index 83506bc4794d8..5d6e0e7f4e1b1 100644 --- a/infoschema/cache_test.go +++ b/infoschema/cache_test.go @@ -42,7 +42,7 @@ func TestInsert(t *testing.T) { ic.Insert(is5, 5) require.Equal(t, is5, ic.GetByVersion(5)) require.Equal(t, is2, ic.GetByVersion(2)) - require.Nil(t, ic.GetBySnapshotTS(2)) + require.Equal(t, is2, ic.GetBySnapshotTS(2)) require.Equal(t, is5, ic.GetBySnapshotTS(10)) // older @@ -59,7 +59,7 @@ func TestInsert(t *testing.T) { require.Equal(t, is5, ic.GetByVersion(5)) require.Equal(t, is2, ic.GetByVersion(2)) require.Nil(t, ic.GetByVersion(0)) - require.Nil(t, ic.GetBySnapshotTS(2)) + require.Equal(t, is2, ic.GetBySnapshotTS(2)) require.Equal(t, is6, ic.GetBySnapshotTS(10)) // replace 2, drop 2 @@ -91,7 +91,7 @@ func TestInsert(t *testing.T) { require.Nil(t, ic.GetByVersion(2)) require.Nil(t, ic.GetByVersion(0)) require.Nil(t, ic.GetBySnapshotTS(2)) - require.Nil(t, ic.GetBySnapshotTS(5)) + require.Equal(t, is5, ic.GetBySnapshotTS(5)) require.Equal(t, is6, ic.GetBySnapshotTS(10)) } @@ -129,3 +129,61 @@ func TestGetLatest(t *testing.T) { ic.Insert(is0, 0) require.Equal(t, is2, ic.GetLatest()) } + +func TestGetByTimestamp(t *testing.T) { + ic := infoschema.NewCache(16) + require.NotNil(t, ic) + require.Nil(t, ic.GetLatest()) + + is1 := infoschema.MockInfoSchemaWithSchemaVer(nil, 1) + ic.Insert(is1, 1) + require.Equal(t, is1, ic.GetLatest()) + _, err := ic.GetSchemaByTimestamp(0) + require.NotNil(t, err) + schema, err := ic.GetSchemaByTimestamp(1) + require.Nil(t, err) + require.Equal(t, int64(1), schema.SchemaMetaVersion()) + require.Equal(t, is1, ic.GetBySnapshotTS(1)) + schema, err = ic.GetSchemaByTimestamp(2) + require.Nil(t, err) + require.Equal(t, int64(1), schema.SchemaMetaVersion()) + require.Equal(t, is1, ic.GetBySnapshotTS(2)) + + is2 := infoschema.MockInfoSchemaWithSchemaVer(nil, 2) + ic.Insert(is2, 2) + require.Equal(t, is2, ic.GetLatest()) + _, err = ic.GetSchemaByTimestamp(0) + require.NotNil(t, err) + schema, err = ic.GetSchemaByTimestamp(1) + require.Nil(t, err) + require.Equal(t, int64(1), schema.SchemaMetaVersion()) + require.Equal(t, is1, ic.GetBySnapshotTS(1)) + schema, err = ic.GetSchemaByTimestamp(2) + require.Nil(t, err) + require.Equal(t, int64(2), schema.SchemaMetaVersion()) + require.Equal(t, is2, ic.GetBySnapshotTS(2)) + schema, err = ic.GetSchemaByTimestamp(3) + require.Nil(t, err) + require.Equal(t, int64(2), schema.SchemaMetaVersion()) + require.Equal(t, is2, ic.GetBySnapshotTS(3)) + + is0 := infoschema.MockInfoSchemaWithSchemaVer(nil, 0) + ic.Insert(is0, 0) + require.Equal(t, is2, ic.GetLatest()) + schema, err = ic.GetSchemaByTimestamp(0) + require.Nil(t, err) + require.Equal(t, int64(0), schema.SchemaMetaVersion()) + require.Equal(t, is0, ic.GetBySnapshotTS(0)) + schema, err = ic.GetSchemaByTimestamp(1) + require.Nil(t, err) + require.Equal(t, int64(1), schema.SchemaMetaVersion()) + require.Equal(t, is1, ic.GetBySnapshotTS(1)) + schema, err = ic.GetSchemaByTimestamp(2) + require.Nil(t, err) + require.Equal(t, int64(2), schema.SchemaMetaVersion()) + require.Equal(t, is2, ic.GetBySnapshotTS(2)) + schema, err = ic.GetSchemaByTimestamp(3) + require.Nil(t, err) + require.Equal(t, int64(2), schema.SchemaMetaVersion()) + require.Equal(t, is2, ic.GetBySnapshotTS(3)) +}