From eadbc79f4a5c0dfec1a8c4d4d4eb60f360f0aa0b Mon Sep 17 00:00:00 2001 From: Sasha Klizhentas Date: Sun, 2 Feb 2020 11:36:53 -0800 Subject: [PATCH] Adds in-memory cache option, improves scalability for IOT mode. This commit resolves #3227 In IOT mode, 10K nodes are connecting back to the proxies, putting a lot of pressure on the proxy cache. Before this commit, Proxy's only cache option were persistent sqlite-backed caches. The advantage of those caches that Proxies could continue working after reboots with Auth servers unavailable. The disadvantage is that sqlite backend breaks down on many concurrent reads due to performance issues. This commit introduces the new cache configuration option, 'in-memory': ```yaml teleport: cache: # default value sqlite, # the only supported values are sqlite or in-memory type: in-memory ``` This cache mode allows two m4.4xlarge proxies to handle 10K IOT mode connected nodes with no issues. The second part of the commit disables the cache reload on timer that caused inconsistent view results for 10K displayed nodes with servers disappearing from the view. The third part of the commit increases the channels buffering discovery requests 10x. The channels were overfilling in 10K nodes and nodes were disconnected. The logic now does not treat the channel overflow as a reason to close the connection. This is possible due to the changes in the discovery protocol that allow target nodes to handle missing entries, duplicate entries or conflicting values. --- lib/backend/buffer.go | 1 - lib/backend/lite/lite.go | 2 +- lib/backend/memory/memory.go | 6 ++++++ lib/cache/cache.go | 18 ++---------------- lib/config/configuration_test.go | 11 +++++++---- lib/config/fileconf.go | 21 +++++++++++++-------- lib/reversetunnel/conn.go | 7 +++++-- lib/service/cfg.go | 24 ++++++++++++++++++++---- lib/service/service.go | 3 +++ 9 files changed, 57 insertions(+), 36 deletions(-) diff --git a/lib/backend/buffer.go b/lib/backend/buffer.go index 3c005e3325792..f94b034521463 100644 --- a/lib/backend/buffer.go +++ b/lib/backend/buffer.go @@ -72,7 +72,6 @@ func (c *CircularBuffer) Reset() { defer c.Unlock() // could close mulitple times c.watchers.walk(func(w *BufferWatcher) { - c.Debugf("Closing watcher %p via reset.", w) w.closeWatcher() }) c.watchers = newWatcherTree() diff --git a/lib/backend/lite/lite.go b/lib/backend/lite/lite.go index 03aa63df348eb..4dcc5e18500da 100644 --- a/lib/backend/lite/lite.go +++ b/lib/backend/lite/lite.go @@ -49,7 +49,7 @@ const ( busyTimeout = 10000 ) -// GetName() is a part of backend API and it returns SQLite backend type +// GetName is a part of backend API and it returns SQLite backend type // as it appears in `storage/type` section of Teleport YAML func GetName() string { return BackendName diff --git a/lib/backend/memory/memory.go b/lib/backend/memory/memory.go index 0479adffa5f82..b799ad2e777a1 100644 --- a/lib/backend/memory/memory.go +++ b/lib/backend/memory/memory.go @@ -32,6 +32,12 @@ import ( log "github.com/sirupsen/logrus" ) +// GetName is a part of backend API and it returns in-memory backend type +// as it appears in `storage/type` section of Teleport YAML +func GetName() string { + return "in-memory" +} + const ( // defaultBTreeDegreee is a default degree of a B-Tree defaultBTreeDegree = 8 diff --git a/lib/cache/cache.go b/lib/cache/cache.go index 9bf30b21e0820..8aabec266e8fe 100644 --- a/lib/cache/cache.go +++ b/lib/cache/cache.go @@ -155,8 +155,6 @@ type Config struct { Backend backend.Backend // RetryPeriod is a period between cache retries on failures RetryPeriod time.Duration - // ReloadPeriod is a period when cache performs full reload - ReloadPeriod time.Duration // EventsC is a channel for event notifications, // used in tests EventsC chan CacheEvent @@ -234,9 +232,6 @@ func (c *Config) CheckAndSetDefaults() error { if c.RetryPeriod == 0 { c.RetryPeriod = defaults.HighResPollingPeriod } - if c.ReloadPeriod == 0 { - c.ReloadPeriod = defaults.LowResPollingPeriod - } if c.Component == "" { c.Component = teleport.ComponentCache } @@ -331,10 +326,7 @@ func (c *Cache) update() { return } for { - // Reload period is here to protect against - // unknown cache going out of sync problems - // that we did not predict. - err := c.fetchAndWatch(retry, time.After(c.ReloadPeriod)) + err := c.fetchAndWatch(retry) if err != nil { c.setCacheState(err) if !c.isClosed() { @@ -438,7 +430,7 @@ func (c *Cache) notify(event CacheEvent) { // we assume that this cache will eventually end up in a correct state // potentially lagging behind the state of the database. // -func (c *Cache) fetchAndWatch(retry utils.Retry, reloadC <-chan time.Time) error { +func (c *Cache) fetchAndWatch(retry utils.Retry) error { watcher, err := c.Events.NewWatcher(c.ctx, services.Watch{ QueueSize: c.QueueSize, Name: c.Component, @@ -467,9 +459,6 @@ func (c *Cache) fetchAndWatch(retry utils.Retry, reloadC <-chan time.Time) error select { case <-watcher.Done(): return trace.ConnectionProblem(watcher.Error(), "watcher is closed") - case <-reloadC: - c.Debugf("Triggering scheduled reload.") - return nil case <-c.ctx.Done(): return trace.ConnectionProblem(c.ctx.Err(), "context is closing") case event := <-watcher.Events(): @@ -488,9 +477,6 @@ func (c *Cache) fetchAndWatch(retry utils.Retry, reloadC <-chan time.Time) error select { case <-watcher.Done(): return trace.ConnectionProblem(watcher.Error(), "watcher is closed") - case <-reloadC: - c.Debugf("Triggering scheduled reload.") - return nil case <-c.ctx.Done(): return trace.ConnectionProblem(c.ctx.Err(), "context is closing") case event := <-watcher.Events(): diff --git a/lib/config/configuration_test.go b/lib/config/configuration_test.go index f6027a42cdff6..0dd7e98f72910 100644 --- a/lib/config/configuration_test.go +++ b/lib/config/configuration_test.go @@ -29,6 +29,7 @@ import ( "github.com/gravitational/teleport" "github.com/gravitational/teleport/lib" "github.com/gravitational/teleport/lib/backend/lite" + "github.com/gravitational/teleport/lib/backend/memory" "github.com/gravitational/teleport/lib/defaults" "github.com/gravitational/teleport/lib/fixtures" "github.com/gravitational/teleport/lib/service" @@ -513,11 +514,13 @@ func (s *ConfigTestSuite) TestParseCachePolicy(c *check.C) { out *service.CachePolicy err error }{ - {in: &CachePolicy{EnabledFlag: "yes", TTL: "never"}, out: &service.CachePolicy{Enabled: true, NeverExpires: true}}, - {in: &CachePolicy{EnabledFlag: "yes", TTL: "10h"}, out: &service.CachePolicy{Enabled: true, NeverExpires: false, TTL: 10 * time.Hour}}, - {in: &CachePolicy{EnabledFlag: "false", TTL: "10h"}, out: &service.CachePolicy{Enabled: false, NeverExpires: false, TTL: 10 * time.Hour}}, - {in: &CachePolicy{EnabledFlag: "no"}, out: &service.CachePolicy{Enabled: false}}, + {in: &CachePolicy{EnabledFlag: "yes", TTL: "never"}, out: &service.CachePolicy{Enabled: true, NeverExpires: true, Type: lite.GetName()}}, + {in: &CachePolicy{EnabledFlag: "yes", TTL: "10h"}, out: &service.CachePolicy{Enabled: true, NeverExpires: false, TTL: 10 * time.Hour, Type: lite.GetName()}}, + {in: &CachePolicy{Type: memory.GetName(), EnabledFlag: "false", TTL: "10h"}, out: &service.CachePolicy{Enabled: false, NeverExpires: false, TTL: 10 * time.Hour, Type: memory.GetName()}}, + {in: &CachePolicy{Type: memory.GetName(), EnabledFlag: "yes", TTL: "never"}, out: &service.CachePolicy{Enabled: true, NeverExpires: true, Type: memory.GetName()}}, + {in: &CachePolicy{EnabledFlag: "no"}, out: &service.CachePolicy{Type: lite.GetName(), Enabled: false}}, {in: &CachePolicy{EnabledFlag: "false", TTL: "zap"}, err: trace.BadParameter("bad format")}, + {in: &CachePolicy{Type: "memsql"}, err: trace.BadParameter("unsupported backend")}, } for i, tc := range tcs { comment := check.Commentf("test case #%v", i) diff --git a/lib/config/fileconf.go b/lib/config/fileconf.go index 3c94ccc788ed5..0ce6c64e651d9 100644 --- a/lib/config/fileconf.go +++ b/lib/config/fileconf.go @@ -397,6 +397,8 @@ type Global struct { // CachePolicy is used to control local cache type CachePolicy struct { + // Type is for cache type `sqlite` or `in-memory` + Type string `yaml:"type,omitempty"` // EnabledFlag enables or disables cache EnabledFlag string `yaml:"enabled,omitempty"` // TTL sets maximum TTL for the cached values @@ -431,19 +433,22 @@ func (c *CachePolicy) NeverExpires() bool { // Parse parses cache policy from Teleport config func (c *CachePolicy) Parse() (*service.CachePolicy, error) { out := service.CachePolicy{ + Type: c.Type, Enabled: c.Enabled(), NeverExpires: c.NeverExpires(), } - if out.NeverExpires { - return &out, nil - } - var err error - if c.TTL != "" { - out.TTL, err = time.ParseDuration(c.TTL) - if err != nil { - return nil, trace.BadParameter("cache.ttl invalid duration: %v, accepted format '10h'", c.TTL) + if !out.NeverExpires { + var err error + if c.TTL != "" { + out.TTL, err = time.ParseDuration(c.TTL) + if err != nil { + return nil, trace.BadParameter("cache.ttl invalid duration: %v, accepted format '10h'", c.TTL) + } } } + if err := out.CheckAndSetDefaults(); err != nil { + return nil, trace.Wrap(err) + } return &out, nil } diff --git a/lib/reversetunnel/conn.go b/lib/reversetunnel/conn.go index 080c7a6558c1e..60e111b9722f7 100644 --- a/lib/reversetunnel/conn.go +++ b/lib/reversetunnel/conn.go @@ -108,7 +108,7 @@ func newRemoteConn(cfg *connConfig) *remoteConn { }), connConfig: cfg, clock: clockwork.NewRealClock(), - newProxiesC: make(chan []services.Server, 10), + newProxiesC: make(chan []services.Server, 100), } c.closeContext, c.closeCancel = context.WithCancel(context.Background()) @@ -218,7 +218,10 @@ func (c *remoteConn) updateProxies(proxies []services.Server) error { case c.newProxiesC <- proxies: return nil default: - return trace.ConnectionProblem(nil, "discovery channel overflow at %v", len(c.newProxiesC)) + // Missing proxies update is no longer critical with more permissive + // discovery protocol that tolerates conflicting, stale or missing updates + c.log.Warnf("Discovery channel overflow at %v.", len(c.newProxiesC)) + return nil } } diff --git a/lib/service/cfg.go b/lib/service/cfg.go index 666ec31cc0191..7f26b1f8e166d 100644 --- a/lib/service/cfg.go +++ b/lib/service/cfg.go @@ -1,5 +1,5 @@ /* -Copyright 2015 Gravitational, Inc. +Copyright 2015-2020 Gravitational, Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -28,6 +28,7 @@ import ( "github.com/gravitational/teleport/lib/auth" "github.com/gravitational/teleport/lib/backend" "github.com/gravitational/teleport/lib/backend/lite" + "github.com/gravitational/teleport/lib/backend/memory" "github.com/gravitational/teleport/lib/bpf" "github.com/gravitational/teleport/lib/defaults" "github.com/gravitational/teleport/lib/events" @@ -225,6 +226,8 @@ func (cfg *Config) DebugDumpToYAML() string { // CachePolicy sets caching policy for proxies and nodes type CachePolicy struct { + // Type sets the cache type + Type string // Enabled enables or disables caching Enabled bool // TTL sets maximum TTL for the cached values @@ -246,6 +249,19 @@ func (c *CachePolicy) GetRecentTTL() time.Duration { return *c.RecentTTL } +// CheckAndSetDefaults checks and sets default values +func (c *CachePolicy) CheckAndSetDefaults() error { + switch c.Type { + case "", lite.GetName(): + c.Type = lite.GetName() + case memory.GetName(): + default: + return trace.BadParameter("unsupported cache type %q, supported values are %q and %q", + c.Type, lite.GetName(), memory.GetName()) + } + return nil +} + // String returns human-friendly representation of the policy func (c CachePolicy) String() string { if !c.Enabled { @@ -258,12 +274,12 @@ func (c CachePolicy) String() string { recentCachePolicy = fmt.Sprintf("will cache frequently accessed items for %v", c.GetRecentTTL()) } if c.NeverExpires { - return fmt.Sprintf("cache that will not expire in case if connection to database is lost, %v", recentCachePolicy) + return fmt.Sprintf("%v cache that will not expire in case if connection to database is lost, %v", c.Type, recentCachePolicy) } if c.TTL == 0 { - return fmt.Sprintf("cache that will expire after connection to database is lost after %v, %v", defaults.CacheTTL, recentCachePolicy) + return fmt.Sprintf("%v cache that will expire after connection to database is lost after %v, %v", c.Type, defaults.CacheTTL, recentCachePolicy) } - return fmt.Sprintf("cache that will expire after connection to database is lost after %v, %v", c.TTL, recentCachePolicy) + return fmt.Sprintf("%v cache that will expire after connection to database is lost after %v, %v", c.Type, c.TTL, recentCachePolicy) } // ProxyConfig specifies configuration for proxy service diff --git a/lib/service/service.go b/lib/service/service.go index 6106411368048..5fb2fc347b725 100644 --- a/lib/service/service.go +++ b/lib/service/service.go @@ -1263,6 +1263,7 @@ func (process *TeleportProcess) newAccessCache(cfg accessCacheConfig) (*cache.Ca } var cacheBackend backend.Backend if cfg.inMemory { + process.Debugf("Creating in-memory backend for %v.", cfg.cacheName) mem, err := memory.New(memory.Config{ Context: process.ExitContext(), EventsOff: !cfg.events, @@ -1273,6 +1274,7 @@ func (process *TeleportProcess) newAccessCache(cfg accessCacheConfig) (*cache.Ca } cacheBackend = mem } else { + process.Debugf("Creating sqlite backend for %v.", cfg.cacheName) path := filepath.Join(append([]string{process.Config.DataDir, "cache"}, cfg.cacheName...)...) if err := os.MkdirAll(path, teleport.SharedDirMode); err != nil { return nil, trace.ConvertSystemError(err) @@ -1341,6 +1343,7 @@ func (process *TeleportProcess) newLocalCache(clt auth.ClientI, setupConfig cach return clt, nil } cache, err := process.newAccessCache(accessCacheConfig{ + inMemory: process.Config.CachePolicy.Type == memory.GetName(), services: clt, setup: process.setupCachePolicy(setupConfig), cacheName: cacheName,