Skip to content

Commit

Permalink
Adds in-memory cache option, improves scalability for IOT mode.
Browse files Browse the repository at this point in the history
This commit resolves #3227

In IOT mode, 10K nodes are connecting back to the proxies, putting
a lot of pressure on the proxy cache.

Before this commit, Proxy's only cache option were persistent
sqlite-backed caches. The advantage of those caches that Proxies
could continue working after reboots with Auth servers unavailable.

The disadvantage is that sqlite backend breaks down on many concurrent
reads due to performance issues.

This commit introduces the new cache configuration option, 'in-memory':

```yaml
teleport:
  cache:
    # default value sqlite,
    # the only supported values are sqlite or in-memory
    type: in-memory
```

This cache mode allows two m4.4xlarge proxies to handle 10K IOT mode connected
nodes with no issues.

The second part of the commit disables the cache reload on timer that caused
inconsistent view results for 10K displayed nodes with servers disappearing
from the view.

The third part of the commit increases the channels buffering discovery
requests 10x. The channels were overfilling in 10K nodes and nodes
were disconnected. The logic now does not treat the channel overflow
as a reason to close the connection. This is possible due to the changes
in the discovery protocol that allow target nodes to handle missing
entries, duplicate entries or conflicting values.
  • Loading branch information
klizhentas committed Feb 6, 2020
1 parent 8536b9d commit eadbc79
Show file tree
Hide file tree
Showing 9 changed files with 57 additions and 36 deletions.
1 change: 0 additions & 1 deletion lib/backend/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ func (c *CircularBuffer) Reset() {
defer c.Unlock()
// could close mulitple times
c.watchers.walk(func(w *BufferWatcher) {
c.Debugf("Closing watcher %p via reset.", w)
w.closeWatcher()
})
c.watchers = newWatcherTree()
Expand Down
2 changes: 1 addition & 1 deletion lib/backend/lite/lite.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ const (
busyTimeout = 10000
)

// GetName() is a part of backend API and it returns SQLite backend type
// GetName is a part of backend API and it returns SQLite backend type
// as it appears in `storage/type` section of Teleport YAML
func GetName() string {
return BackendName
Expand Down
6 changes: 6 additions & 0 deletions lib/backend/memory/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@ import (
log "github.com/sirupsen/logrus"
)

// GetName is a part of backend API and it returns in-memory backend type
// as it appears in `storage/type` section of Teleport YAML
func GetName() string {
return "in-memory"
}

const (
// defaultBTreeDegreee is a default degree of a B-Tree
defaultBTreeDegree = 8
Expand Down
18 changes: 2 additions & 16 deletions lib/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,6 @@ type Config struct {
Backend backend.Backend
// RetryPeriod is a period between cache retries on failures
RetryPeriod time.Duration
// ReloadPeriod is a period when cache performs full reload
ReloadPeriod time.Duration
// EventsC is a channel for event notifications,
// used in tests
EventsC chan CacheEvent
Expand Down Expand Up @@ -234,9 +232,6 @@ func (c *Config) CheckAndSetDefaults() error {
if c.RetryPeriod == 0 {
c.RetryPeriod = defaults.HighResPollingPeriod
}
if c.ReloadPeriod == 0 {
c.ReloadPeriod = defaults.LowResPollingPeriod
}
if c.Component == "" {
c.Component = teleport.ComponentCache
}
Expand Down Expand Up @@ -331,10 +326,7 @@ func (c *Cache) update() {
return
}
for {
// Reload period is here to protect against
// unknown cache going out of sync problems
// that we did not predict.
err := c.fetchAndWatch(retry, time.After(c.ReloadPeriod))
err := c.fetchAndWatch(retry)
if err != nil {
c.setCacheState(err)
if !c.isClosed() {
Expand Down Expand Up @@ -438,7 +430,7 @@ func (c *Cache) notify(event CacheEvent) {
// we assume that this cache will eventually end up in a correct state
// potentially lagging behind the state of the database.
//
func (c *Cache) fetchAndWatch(retry utils.Retry, reloadC <-chan time.Time) error {
func (c *Cache) fetchAndWatch(retry utils.Retry) error {
watcher, err := c.Events.NewWatcher(c.ctx, services.Watch{
QueueSize: c.QueueSize,
Name: c.Component,
Expand Down Expand Up @@ -467,9 +459,6 @@ func (c *Cache) fetchAndWatch(retry utils.Retry, reloadC <-chan time.Time) error
select {
case <-watcher.Done():
return trace.ConnectionProblem(watcher.Error(), "watcher is closed")
case <-reloadC:
c.Debugf("Triggering scheduled reload.")
return nil
case <-c.ctx.Done():
return trace.ConnectionProblem(c.ctx.Err(), "context is closing")
case event := <-watcher.Events():
Expand All @@ -488,9 +477,6 @@ func (c *Cache) fetchAndWatch(retry utils.Retry, reloadC <-chan time.Time) error
select {
case <-watcher.Done():
return trace.ConnectionProblem(watcher.Error(), "watcher is closed")
case <-reloadC:
c.Debugf("Triggering scheduled reload.")
return nil
case <-c.ctx.Done():
return trace.ConnectionProblem(c.ctx.Err(), "context is closing")
case event := <-watcher.Events():
Expand Down
11 changes: 7 additions & 4 deletions lib/config/configuration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/gravitational/teleport"
"github.com/gravitational/teleport/lib"
"github.com/gravitational/teleport/lib/backend/lite"
"github.com/gravitational/teleport/lib/backend/memory"
"github.com/gravitational/teleport/lib/defaults"
"github.com/gravitational/teleport/lib/fixtures"
"github.com/gravitational/teleport/lib/service"
Expand Down Expand Up @@ -513,11 +514,13 @@ func (s *ConfigTestSuite) TestParseCachePolicy(c *check.C) {
out *service.CachePolicy
err error
}{
{in: &CachePolicy{EnabledFlag: "yes", TTL: "never"}, out: &service.CachePolicy{Enabled: true, NeverExpires: true}},
{in: &CachePolicy{EnabledFlag: "yes", TTL: "10h"}, out: &service.CachePolicy{Enabled: true, NeverExpires: false, TTL: 10 * time.Hour}},
{in: &CachePolicy{EnabledFlag: "false", TTL: "10h"}, out: &service.CachePolicy{Enabled: false, NeverExpires: false, TTL: 10 * time.Hour}},
{in: &CachePolicy{EnabledFlag: "no"}, out: &service.CachePolicy{Enabled: false}},
{in: &CachePolicy{EnabledFlag: "yes", TTL: "never"}, out: &service.CachePolicy{Enabled: true, NeverExpires: true, Type: lite.GetName()}},
{in: &CachePolicy{EnabledFlag: "yes", TTL: "10h"}, out: &service.CachePolicy{Enabled: true, NeverExpires: false, TTL: 10 * time.Hour, Type: lite.GetName()}},
{in: &CachePolicy{Type: memory.GetName(), EnabledFlag: "false", TTL: "10h"}, out: &service.CachePolicy{Enabled: false, NeverExpires: false, TTL: 10 * time.Hour, Type: memory.GetName()}},
{in: &CachePolicy{Type: memory.GetName(), EnabledFlag: "yes", TTL: "never"}, out: &service.CachePolicy{Enabled: true, NeverExpires: true, Type: memory.GetName()}},
{in: &CachePolicy{EnabledFlag: "no"}, out: &service.CachePolicy{Type: lite.GetName(), Enabled: false}},
{in: &CachePolicy{EnabledFlag: "false", TTL: "zap"}, err: trace.BadParameter("bad format")},
{in: &CachePolicy{Type: "memsql"}, err: trace.BadParameter("unsupported backend")},
}
for i, tc := range tcs {
comment := check.Commentf("test case #%v", i)
Expand Down
21 changes: 13 additions & 8 deletions lib/config/fileconf.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,8 @@ type Global struct {

// CachePolicy is used to control local cache
type CachePolicy struct {
// Type is for cache type `sqlite` or `in-memory`
Type string `yaml:"type,omitempty"`
// EnabledFlag enables or disables cache
EnabledFlag string `yaml:"enabled,omitempty"`
// TTL sets maximum TTL for the cached values
Expand Down Expand Up @@ -431,19 +433,22 @@ func (c *CachePolicy) NeverExpires() bool {
// Parse parses cache policy from Teleport config
func (c *CachePolicy) Parse() (*service.CachePolicy, error) {
out := service.CachePolicy{
Type: c.Type,
Enabled: c.Enabled(),
NeverExpires: c.NeverExpires(),
}
if out.NeverExpires {
return &out, nil
}
var err error
if c.TTL != "" {
out.TTL, err = time.ParseDuration(c.TTL)
if err != nil {
return nil, trace.BadParameter("cache.ttl invalid duration: %v, accepted format '10h'", c.TTL)
if !out.NeverExpires {
var err error
if c.TTL != "" {
out.TTL, err = time.ParseDuration(c.TTL)
if err != nil {
return nil, trace.BadParameter("cache.ttl invalid duration: %v, accepted format '10h'", c.TTL)
}
}
}
if err := out.CheckAndSetDefaults(); err != nil {
return nil, trace.Wrap(err)
}
return &out, nil
}

Expand Down
7 changes: 5 additions & 2 deletions lib/reversetunnel/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func newRemoteConn(cfg *connConfig) *remoteConn {
}),
connConfig: cfg,
clock: clockwork.NewRealClock(),
newProxiesC: make(chan []services.Server, 10),
newProxiesC: make(chan []services.Server, 100),
}

c.closeContext, c.closeCancel = context.WithCancel(context.Background())
Expand Down Expand Up @@ -218,7 +218,10 @@ func (c *remoteConn) updateProxies(proxies []services.Server) error {
case c.newProxiesC <- proxies:
return nil
default:
return trace.ConnectionProblem(nil, "discovery channel overflow at %v", len(c.newProxiesC))
// Missing proxies update is no longer critical with more permissive
// discovery protocol that tolerates conflicting, stale or missing updates
c.log.Warnf("Discovery channel overflow at %v.", len(c.newProxiesC))
return nil
}
}

Expand Down
24 changes: 20 additions & 4 deletions lib/service/cfg.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright 2015 Gravitational, Inc.
Copyright 2015-2020 Gravitational, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -28,6 +28,7 @@ import (
"github.com/gravitational/teleport/lib/auth"
"github.com/gravitational/teleport/lib/backend"
"github.com/gravitational/teleport/lib/backend/lite"
"github.com/gravitational/teleport/lib/backend/memory"
"github.com/gravitational/teleport/lib/bpf"
"github.com/gravitational/teleport/lib/defaults"
"github.com/gravitational/teleport/lib/events"
Expand Down Expand Up @@ -225,6 +226,8 @@ func (cfg *Config) DebugDumpToYAML() string {

// CachePolicy sets caching policy for proxies and nodes
type CachePolicy struct {
// Type sets the cache type
Type string
// Enabled enables or disables caching
Enabled bool
// TTL sets maximum TTL for the cached values
Expand All @@ -246,6 +249,19 @@ func (c *CachePolicy) GetRecentTTL() time.Duration {
return *c.RecentTTL
}

// CheckAndSetDefaults checks and sets default values
func (c *CachePolicy) CheckAndSetDefaults() error {
switch c.Type {
case "", lite.GetName():
c.Type = lite.GetName()
case memory.GetName():
default:
return trace.BadParameter("unsupported cache type %q, supported values are %q and %q",
c.Type, lite.GetName(), memory.GetName())
}
return nil
}

// String returns human-friendly representation of the policy
func (c CachePolicy) String() string {
if !c.Enabled {
Expand All @@ -258,12 +274,12 @@ func (c CachePolicy) String() string {
recentCachePolicy = fmt.Sprintf("will cache frequently accessed items for %v", c.GetRecentTTL())
}
if c.NeverExpires {
return fmt.Sprintf("cache that will not expire in case if connection to database is lost, %v", recentCachePolicy)
return fmt.Sprintf("%v cache that will not expire in case if connection to database is lost, %v", c.Type, recentCachePolicy)
}
if c.TTL == 0 {
return fmt.Sprintf("cache that will expire after connection to database is lost after %v, %v", defaults.CacheTTL, recentCachePolicy)
return fmt.Sprintf("%v cache that will expire after connection to database is lost after %v, %v", c.Type, defaults.CacheTTL, recentCachePolicy)
}
return fmt.Sprintf("cache that will expire after connection to database is lost after %v, %v", c.TTL, recentCachePolicy)
return fmt.Sprintf("%v cache that will expire after connection to database is lost after %v, %v", c.Type, c.TTL, recentCachePolicy)
}

// ProxyConfig specifies configuration for proxy service
Expand Down
3 changes: 3 additions & 0 deletions lib/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -1263,6 +1263,7 @@ func (process *TeleportProcess) newAccessCache(cfg accessCacheConfig) (*cache.Ca
}
var cacheBackend backend.Backend
if cfg.inMemory {
process.Debugf("Creating in-memory backend for %v.", cfg.cacheName)
mem, err := memory.New(memory.Config{
Context: process.ExitContext(),
EventsOff: !cfg.events,
Expand All @@ -1273,6 +1274,7 @@ func (process *TeleportProcess) newAccessCache(cfg accessCacheConfig) (*cache.Ca
}
cacheBackend = mem
} else {
process.Debugf("Creating sqlite backend for %v.", cfg.cacheName)
path := filepath.Join(append([]string{process.Config.DataDir, "cache"}, cfg.cacheName...)...)
if err := os.MkdirAll(path, teleport.SharedDirMode); err != nil {
return nil, trace.ConvertSystemError(err)
Expand Down Expand Up @@ -1341,6 +1343,7 @@ func (process *TeleportProcess) newLocalCache(clt auth.ClientI, setupConfig cach
return clt, nil
}
cache, err := process.newAccessCache(accessCacheConfig{
inMemory: process.Config.CachePolicy.Type == memory.GetName(),
services: clt,
setup: process.setupCachePolicy(setupConfig),
cacheName: cacheName,
Expand Down

0 comments on commit eadbc79

Please sign in to comment.