From 2dcbe9ebe5e0dd4e7a1b8fe09695155027cc6c63 Mon Sep 17 00:00:00 2001 From: ShuNing Date: Fri, 11 Jun 2021 22:42:35 +0800 Subject: [PATCH] cherry pick #25322 to release-5.1 Signed-off-by: ti-srebot --- ddl/ddl_test.go | 2 +- ddl/util/syncer_test.go | 4 +-- domain/domain.go | 8 +++-- infoschema/cache.go | 33 +++++++++++++++--- infoschema/cache_test.go | 75 ++++++++++++++++++++++++---------------- metrics/domain.go | 2 +- owner/manager_test.go | 8 ++--- 7 files changed, 87 insertions(+), 45 deletions(-) diff --git a/ddl/ddl_test.go b/ddl/ddl_test.go index 79635bfc0933b..f8ee530826435 100644 --- a/ddl/ddl_test.go +++ b/ddl/ddl_test.go @@ -89,7 +89,7 @@ func TestT(t *testing.T) { func testNewDDLAndStart(ctx context.Context, c *C, options ...Option) *ddl { // init infoCache and a stub infoSchema ic := infoschema.NewCache(2) - ic.Insert(infoschema.MockInfoSchemaWithSchemaVer(nil, 0)) + ic.Insert(infoschema.MockInfoSchemaWithSchemaVer(nil, 0), 0) options = append(options, WithInfoCache(ic)) d := newDDL(ctx, options...) err := d.Start(nil) diff --git a/ddl/util/syncer_test.go b/ddl/util/syncer_test.go index 5a9d41d47e3b8..cd3b13ea40fd1 100644 --- a/ddl/util/syncer_test.go +++ b/ddl/util/syncer_test.go @@ -71,7 +71,7 @@ func TestSyncerSimple(t *testing.T) { cli := clus.RandClient() ctx := goctx.Background() ic := infoschema.NewCache(2) - ic.Insert(infoschema.MockInfoSchemaWithSchemaVer(nil, 0)) + ic.Insert(infoschema.MockInfoSchemaWithSchemaVer(nil, 0), 0) d := NewDDL( ctx, WithEtcdClient(cli), @@ -115,7 +115,7 @@ func TestSyncerSimple(t *testing.T) { } ic2 := infoschema.NewCache(2) - ic2.Insert(infoschema.MockInfoSchemaWithSchemaVer(nil, 0)) + ic2.Insert(infoschema.MockInfoSchemaWithSchemaVer(nil, 0), 0) d1 := NewDDL( ctx, WithEtcdClient(cli), diff --git a/domain/domain.go b/domain/domain.go index f7cbb26078921..1d66bbf2656f5 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -125,7 +125,7 @@ func (do *Domain) loadInfoSchema(startTS uint64) (infoschema.InfoSchema, bool, i if currentSchemaVersion != 0 && neededSchemaVersion > currentSchemaVersion && neededSchemaVersion-currentSchemaVersion < 100 { is, relatedChanges, err := do.tryLoadSchemaDiffs(m, currentSchemaVersion, neededSchemaVersion) if err == nil { - do.infoCache.Insert(is) + do.infoCache.Insert(is, startTS) logutil.BgLogger().Info("diff load InfoSchema success", zap.Int64("currentSchemaVersion", currentSchemaVersion), zap.Int64("neededSchemaVersion", neededSchemaVersion), @@ -158,7 +158,7 @@ func (do *Domain) loadInfoSchema(startTS uint64) (infoschema.InfoSchema, bool, i zap.Duration("start time", time.Since(startTime))) is := newISBuilder.Build() - do.infoCache.Insert(is) + do.infoCache.Insert(is, startTS) return is, false, currentSchemaVersion, nil, nil } @@ -290,6 +290,10 @@ func (do *Domain) InfoSchema() infoschema.InfoSchema { // GetSnapshotInfoSchema gets a snapshot information schema. func (do *Domain) GetSnapshotInfoSchema(snapshotTS uint64) (infoschema.InfoSchema, error) { + // if the snapshotTS is new enough, we can get infoschema directly through sanpshotTS. + if is := do.infoCache.GetBySnapshotTS(snapshotTS); is != nil { + return is, nil + } is, _, _, _, err := do.loadInfoSchema(snapshotTS) return is, err } diff --git a/infoschema/cache.go b/infoschema/cache.go index 4c3371b1bc354..34dba0cf95e42 100644 --- a/infoschema/cache.go +++ b/infoschema/cache.go @@ -27,6 +27,8 @@ type InfoCache struct { mu sync.RWMutex // cache is sorted by SchemaVersion in descending order cache []InfoSchema + // record SnapshotTS of the latest schema Insert. + maxUpdatedSnapshotTS uint64 } // NewCache creates a new InfoCache. @@ -38,9 +40,9 @@ func NewCache(capcity int) *InfoCache { func (h *InfoCache) GetLatest() InfoSchema { h.mu.RLock() defer h.mu.RUnlock() - metrics.InfoCacheCounters.WithLabelValues("get").Inc() + metrics.InfoCacheCounters.WithLabelValues("get", "latest").Inc() if len(h.cache) > 0 { - metrics.InfoCacheCounters.WithLabelValues("hit").Inc() + metrics.InfoCacheCounters.WithLabelValues("hit", "latest").Inc() return h.cache[0] } return nil @@ -50,21 +52,38 @@ func (h *InfoCache) GetLatest() InfoSchema { func (h *InfoCache) GetByVersion(version int64) InfoSchema { h.mu.RLock() defer h.mu.RUnlock() - metrics.InfoCacheCounters.WithLabelValues("get").Inc() + metrics.InfoCacheCounters.WithLabelValues("get", "version").Inc() i := sort.Search(len(h.cache), func(i int) bool { return h.cache[i].SchemaMetaVersion() <= version }) if i < len(h.cache) && h.cache[i].SchemaMetaVersion() == version { - metrics.InfoCacheCounters.WithLabelValues("hit").Inc() + metrics.InfoCacheCounters.WithLabelValues("hit", "version").Inc() return h.cache[i] } return nil } +// GetBySnapshotTS gets the information schema based on snapshotTS. +// If the snapshotTS is new than maxUpdatedSnapshotTS, that's mean it can directly use +// the latest infoschema. otherwise, will return nil. +func (h *InfoCache) GetBySnapshotTS(snapshotTS uint64) InfoSchema { + h.mu.RLock() + defer h.mu.RUnlock() + + metrics.InfoCacheCounters.WithLabelValues("get", "ts").Inc() + if snapshotTS >= h.maxUpdatedSnapshotTS { + if len(h.cache) > 0 { + metrics.InfoCacheCounters.WithLabelValues("hit", "ts").Inc() + return h.cache[0] + } + } + return nil +} + // Insert will **TRY** to insert the infoschema into the cache. // It only promised to cache the newest infoschema. // It returns 'true' if it is cached, 'false' otherwise. -func (h *InfoCache) Insert(is InfoSchema) bool { +func (h *InfoCache) Insert(is InfoSchema, snapshotTS uint64) bool { h.mu.Lock() defer h.mu.Unlock() @@ -73,6 +92,10 @@ func (h *InfoCache) Insert(is InfoSchema) bool { return h.cache[i].SchemaMetaVersion() <= version }) + if h.maxUpdatedSnapshotTS < snapshotTS { + h.maxUpdatedSnapshotTS = snapshotTS + } + // cached entry if i < len(h.cache) && h.cache[i].SchemaMetaVersion() == version { return true diff --git a/infoschema/cache_test.go b/infoschema/cache_test.go index a8e9ddcc0df5a..404ae5afcefbf 100644 --- a/infoschema/cache_test.go +++ b/infoschema/cache_test.go @@ -33,63 +33,78 @@ func (s *testInfoCacheSuite) TestInsert(c *C) { c.Assert(ic, NotNil) is2 := infoschema.MockInfoSchemaWithSchemaVer(nil, 2) - ic.Insert(is2) - c.Assert(ic.GetByVersion(2), NotNil) + ic.Insert(is2, 2) + c.Assert(ic.GetByVersion(2), DeepEquals, is2) + c.Assert(ic.GetBySnapshotTS(2), DeepEquals, is2) + c.Assert(ic.GetBySnapshotTS(10), DeepEquals, is2) + c.Assert(ic.GetBySnapshotTS(0), IsNil) // newer is5 := infoschema.MockInfoSchemaWithSchemaVer(nil, 5) - ic.Insert(is5) - c.Assert(ic.GetByVersion(5), NotNil) - c.Assert(ic.GetByVersion(2), NotNil) + ic.Insert(is5, 5) + c.Assert(ic.GetByVersion(5), DeepEquals, is5) + c.Assert(ic.GetByVersion(2), DeepEquals, is2) + c.Assert(ic.GetBySnapshotTS(2), IsNil) + c.Assert(ic.GetBySnapshotTS(10), DeepEquals, is5) // older is0 := infoschema.MockInfoSchemaWithSchemaVer(nil, 0) - ic.Insert(is0) - c.Assert(ic.GetByVersion(5), NotNil) - c.Assert(ic.GetByVersion(2), NotNil) - c.Assert(ic.GetByVersion(0), NotNil) + ic.Insert(is0, 0) + c.Assert(ic.GetByVersion(5), DeepEquals, is5) + c.Assert(ic.GetByVersion(2), DeepEquals, is2) + c.Assert(ic.GetByVersion(0), DeepEquals, is0) // replace 5, drop 0 is6 := infoschema.MockInfoSchemaWithSchemaVer(nil, 6) - ic.Insert(is6) - c.Assert(ic.GetByVersion(6), NotNil) - c.Assert(ic.GetByVersion(5), NotNil) - c.Assert(ic.GetByVersion(2), NotNil) + ic.Insert(is6, 6) + c.Assert(ic.GetByVersion(6), DeepEquals, is6) + c.Assert(ic.GetByVersion(5), DeepEquals, is5) + c.Assert(ic.GetByVersion(2), DeepEquals, is2) c.Assert(ic.GetByVersion(0), IsNil) + c.Assert(ic.GetBySnapshotTS(2), IsNil) + c.Assert(ic.GetBySnapshotTS(10), DeepEquals, is6) // replace 2, drop 2 is3 := infoschema.MockInfoSchemaWithSchemaVer(nil, 3) - ic.Insert(is3) - c.Assert(ic.GetByVersion(6), NotNil) - c.Assert(ic.GetByVersion(5), NotNil) - c.Assert(ic.GetByVersion(3), NotNil) + ic.Insert(is3, 3) + c.Assert(ic.GetByVersion(6), DeepEquals, is6) + c.Assert(ic.GetByVersion(5), DeepEquals, is5) + c.Assert(ic.GetByVersion(3), DeepEquals, is3) c.Assert(ic.GetByVersion(2), IsNil) c.Assert(ic.GetByVersion(0), IsNil) + c.Assert(ic.GetBySnapshotTS(2), IsNil) + c.Assert(ic.GetBySnapshotTS(10), DeepEquals, is6) // insert 2, but failed silently - ic.Insert(is2) - c.Assert(ic.GetByVersion(6), NotNil) - c.Assert(ic.GetByVersion(5), NotNil) - c.Assert(ic.GetByVersion(3), NotNil) + ic.Insert(is2, 2) + c.Assert(ic.GetByVersion(6), DeepEquals, is6) + c.Assert(ic.GetByVersion(5), DeepEquals, is5) + c.Assert(ic.GetByVersion(3), DeepEquals, is3) c.Assert(ic.GetByVersion(2), IsNil) c.Assert(ic.GetByVersion(0), IsNil) + c.Assert(ic.GetBySnapshotTS(2), IsNil) + c.Assert(ic.GetBySnapshotTS(10), DeepEquals, is6) // insert 5, but it is already in - ic.Insert(is5) - c.Assert(ic.GetByVersion(6), NotNil) - c.Assert(ic.GetByVersion(5), NotNil) - c.Assert(ic.GetByVersion(3), NotNil) + ic.Insert(is5, 5) + c.Assert(ic.GetByVersion(6), DeepEquals, is6) + c.Assert(ic.GetByVersion(5), DeepEquals, is5) + c.Assert(ic.GetByVersion(3), DeepEquals, is3) c.Assert(ic.GetByVersion(2), IsNil) c.Assert(ic.GetByVersion(0), IsNil) + c.Assert(ic.GetBySnapshotTS(2), IsNil) + c.Assert(ic.GetBySnapshotTS(5), IsNil) + c.Assert(ic.GetBySnapshotTS(10), DeepEquals, is6) + } func (s *testInfoCacheSuite) TestGetByVersion(c *C) { ic := infoschema.NewCache(2) c.Assert(ic, NotNil) is1 := infoschema.MockInfoSchemaWithSchemaVer(nil, 1) - ic.Insert(is1) + ic.Insert(is1, 1) is3 := infoschema.MockInfoSchemaWithSchemaVer(nil, 3) - ic.Insert(is3) + ic.Insert(is3, 3) c.Assert(ic.GetByVersion(1), Equals, is1) c.Assert(ic.GetByVersion(3), Equals, is3) @@ -104,16 +119,16 @@ func (s *testInfoCacheSuite) TestGetLatest(c *C) { c.Assert(ic.GetLatest(), IsNil) is1 := infoschema.MockInfoSchemaWithSchemaVer(nil, 1) - ic.Insert(is1) + ic.Insert(is1, 1) c.Assert(ic.GetLatest(), Equals, is1) // newer change the newest is2 := infoschema.MockInfoSchemaWithSchemaVer(nil, 2) - ic.Insert(is2) + ic.Insert(is2, 2) c.Assert(ic.GetLatest(), Equals, is2) // older schema doesn't change the newest is0 := infoschema.MockInfoSchemaWithSchemaVer(nil, 0) - ic.Insert(is0) + ic.Insert(is0, 0) c.Assert(ic.GetLatest(), Equals, is2) } diff --git a/metrics/domain.go b/metrics/domain.go index a05b25dd6a46a..3bc9782afe420 100644 --- a/metrics/domain.go +++ b/metrics/domain.go @@ -45,7 +45,7 @@ var ( Subsystem: "domain", Name: "infocache_counters", Help: "Counters of infoCache: get/hit.", - }, []string{LblType}) + }, []string{LblAction, LblType}) // InfoCacheCounterGet is the total number of getting entry. InfoCacheCounterGet = "get" // InfoCacheCounterHit is the cache hit numbers for get. diff --git a/owner/manager_test.go b/owner/manager_test.go index e239419057291..17536cd16610a 100644 --- a/owner/manager_test.go +++ b/owner/manager_test.go @@ -74,7 +74,7 @@ func TestSingle(t *testing.T) { cli := clus.RandClient() ctx := goctx.Background() ic := infoschema.NewCache(2) - ic.Insert(infoschema.MockInfoSchemaWithSchemaVer(nil, 0)) + ic.Insert(infoschema.MockInfoSchemaWithSchemaVer(nil, 0), 0) d := NewDDL( ctx, WithEtcdClient(cli), @@ -147,7 +147,7 @@ func TestCluster(t *testing.T) { cli := clus.Client(0) ic := infoschema.NewCache(2) - ic.Insert(infoschema.MockInfoSchemaWithSchemaVer(nil, 0)) + ic.Insert(infoschema.MockInfoSchemaWithSchemaVer(nil, 0), 0) d := NewDDL( goctx.Background(), WithEtcdClient(cli), @@ -165,7 +165,7 @@ func TestCluster(t *testing.T) { } cli1 := clus.Client(1) ic2 := infoschema.NewCache(2) - ic2.Insert(infoschema.MockInfoSchemaWithSchemaVer(nil, 0)) + ic2.Insert(infoschema.MockInfoSchemaWithSchemaVer(nil, 0), 0) d1 := NewDDL( goctx.Background(), WithEtcdClient(cli1), @@ -200,7 +200,7 @@ func TestCluster(t *testing.T) { // d3 (not owner) stop cli3 := clus.Client(3) ic3 := infoschema.NewCache(2) - ic3.Insert(infoschema.MockInfoSchemaWithSchemaVer(nil, 0)) + ic3.Insert(infoschema.MockInfoSchemaWithSchemaVer(nil, 0), 0) d3 := NewDDL( goctx.Background(), WithEtcdClient(cli3),