Skip to content

Commit

Permalink
feat: add http provider for rate limit config
Browse files Browse the repository at this point in the history
Signed-off-by: Lu Xie <luxie@roblox.com>
  • Loading branch information
rbx-luxie committed Sep 15, 2022
1 parent ea444b1 commit 2060c5e
Show file tree
Hide file tree
Showing 12 changed files with 411 additions and 54 deletions.
16 changes: 16 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -492,6 +492,8 @@ descriptors:
## Loading Configuration
### From local files
The Ratelimit service uses a library written by Lyft called [goruntime](https://github.com/lyft/goruntime) to do configuration loading. Goruntime monitors
a designated path, and watches for symlink swaps to files in the directory tree to reload configuration files.
Expand Down Expand Up @@ -525,6 +527,20 @@ For more information on how runtime works you can read its [README](https://gith
By default it is not possible to define multiple configuration files within `RUNTIME_SUBDIRECTORY` referencing the same domain.
To enable this behavior set `MERGE_DOMAIN_CONFIG` to `true`.
### From http provider
The Ratelimit service allows to load configuration from a http server. The endpoint can be configured via the settings package with the following environment variables:
```
HTTP_PROVIDER_ENABLED default:"false"
HTTP_PROVIDER_ENDPOINT
HTTP_PROVIDER_SUBPATH default:""
HTTP_PROVIDER_POLL_INTERVAL default:"10"
HTTP_PROVIDER_POLL_TIMEOUT default:"10"
```
Configuration will be reloaded intervally according to the value of `HTTP_POLL_INTERVAL` (second). `HTTP_PROVIDER_SUBPATH` is an array which allows you to have multiple endpoints for different config files.
## Log Format
A centralized log collection system works better with logs in json format. JSON format avoids the need for custom parsing rules.
Expand Down
91 changes: 91 additions & 0 deletions src/httpprovider/httpprovider.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package httpprovider

import (
"fmt"
"io"
"net/http"
"time"

logger "github.com/sirupsen/logrus"

"github.com/envoyproxy/ratelimit/src/config"
)

type HttpProvider struct {
Endpoint string
Subpath []string
PollInterval time.Duration
PollTimeout time.Duration
ConfigChan chan []config.RateLimitConfigToLoad
httpClient *http.Client
StopChan chan bool
}

func NewHttpProvider(endpoint string, subpath []string, pollInterval time.Duration, pollTimeout time.Duration) *HttpProvider {
return &HttpProvider{
Endpoint: endpoint,
Subpath: subpath,
PollInterval: pollInterval,
ConfigChan: make(chan []config.RateLimitConfigToLoad),
httpClient: &http.Client{
Timeout: pollTimeout,
},
StopChan: make(chan bool),
}
}

func (p *HttpProvider) Provide() error {
for {
select {
case <-p.StopChan:
return nil
default:
configs, err := p.fetchConfigurationData()
if err != nil {
logger.Errorf("Failed to fetch the configuration: %+v", err)
time.Sleep(p.PollInterval)
continue
}
p.ConfigChan <- configs
time.Sleep(p.PollInterval)
}
}
}

func (p *HttpProvider) Stop() {
p.StopChan <- true
}

// fetchConfigurationData fetches the configuration data from the configured endpoint.
func (p *HttpProvider) fetchConfigurationData() ([]config.RateLimitConfigToLoad, error) {
configs := []config.RateLimitConfigToLoad{}

urls := []string{}
if len(p.Subpath) == 0 {
urls = append(urls, p.Endpoint)
} else {
for _, path := range p.Subpath {
urls = append(urls, fmt.Sprintf("%s/%s", p.Endpoint, path))
}
}

for _, url := range urls {
res, err := p.httpClient.Get(url)
if err != nil {
return nil, err
}

defer res.Body.Close()

if res.StatusCode != http.StatusOK {
return nil, fmt.Errorf("received non-ok response code: %d from the endpoint: %s", res.StatusCode, url)
}

body, err := io.ReadAll(res.Body)
if err != nil {
return nil, err
}
configs = append(configs, config.RateLimitConfigToLoad{url, string(body)})
}
return configs, nil
}
7 changes: 7 additions & 0 deletions src/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (

pb "github.com/envoyproxy/go-control-plane/envoy/service/ratelimit/v3"

"github.com/envoyproxy/ratelimit/src/httpprovider"

"github.com/lyft/goruntime/loader"
stats "github.com/lyft/gostats"
"google.golang.org/grpc"
Expand Down Expand Up @@ -39,6 +41,11 @@ type Server interface {
*/
Runtime() loader.IFace

/**
* Returns the configuration http provider for the server.
*/
HttpProvider() *httpprovider.HttpProvider

/**
* Stops serving the grpc port (for integration testing).
*/
Expand Down
76 changes: 48 additions & 28 deletions src/server/server_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"strconv"
"sync"
"syscall"
"time"

"google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive"
Expand All @@ -34,6 +35,7 @@ import (
"google.golang.org/grpc/health"
healthpb "google.golang.org/grpc/health/grpc_health_v1"

"github.com/envoyproxy/ratelimit/src/httpprovider"
"github.com/envoyproxy/ratelimit/src/limiter"
"github.com/envoyproxy/ratelimit/src/settings"

Expand All @@ -60,6 +62,7 @@ type server struct {
store gostats.Store
scope gostats.Scope
runtime loader.IFace
httpProvider *httpprovider.HttpProvider
debugListener serverDebugListener
httpServer *http.Server
listenerMu sync.Mutex
Expand Down Expand Up @@ -151,6 +154,10 @@ func (server *server) Start() {
logger.Infof("Failed to start debug server '%+v'", err)
}()

if server.httpProvider != nil {
go server.httpProvider.Provide()
}

go server.startGrpc()

server.handleGracefulShutdown()
Expand Down Expand Up @@ -188,6 +195,10 @@ func (server *server) Runtime() loader.IFace {
return server.runtime
}

func (server *server) HttpProvider() *httpprovider.HttpProvider {
return server.httpProvider
}

func NewServer(s settings.Settings, name string, statsManager stats.Manager, localCache *freecache.Cache, opts ...settings.Option) Server {
return newServer(s, name, statsManager, localCache, opts...)
}
Expand Down Expand Up @@ -234,36 +245,42 @@ func newServer(s settings.Settings, name string, statsManager stats.Manager, loc
ret.store.AddStatGenerator(limiter.NewLocalCacheStats(localCache, ret.scope.Scope("localcache")))
}

// setup runtime
loaderOpts := make([]loader.Option, 0, 1)
if s.RuntimeIgnoreDotFiles {
loaderOpts = append(loaderOpts, loader.IgnoreDotFiles)
// setup configuration getter
if s.HttpProviderEnabled {
// setup http provider
ret.httpProvider = httpprovider.NewHttpProvider(s.HttpProviderEndpoint, s.HttpProviderSubpath, time.Duration(s.HttpProviderPollInterval)*time.Second, time.Duration(s.HttpProviderPollTimeout)*time.Second)
} else {
loaderOpts = append(loaderOpts, loader.AllowDotFiles)
}
var err error
if s.RuntimeWatchRoot {
ret.runtime, err = loader.New2(
s.RuntimePath,
s.RuntimeSubdirectory,
ret.store.ScopeWithTags("runtime", s.ExtraTags),
&loader.SymlinkRefresher{RuntimePath: s.RuntimePath},
loaderOpts...)
} else {
directoryRefresher := &loader.DirectoryRefresher{}
// Adding loader.Remove to the default set of goruntime's FileSystemOps.
directoryRefresher.WatchFileSystemOps(loader.Remove, loader.Write, loader.Create, loader.Chmod)

ret.runtime, err = loader.New2(
filepath.Join(s.RuntimePath, s.RuntimeSubdirectory),
"config",
ret.store.ScopeWithTags("runtime", s.ExtraTags),
directoryRefresher,
loaderOpts...)
}
// setup runtime
loaderOpts := make([]loader.Option, 0, 1)
if s.RuntimeIgnoreDotFiles {
loaderOpts = append(loaderOpts, loader.IgnoreDotFiles)
} else {
loaderOpts = append(loaderOpts, loader.AllowDotFiles)
}
var err error
if s.RuntimeWatchRoot {
ret.runtime, err = loader.New2(
s.RuntimePath,
s.RuntimeSubdirectory,
ret.store.ScopeWithTags("runtime", s.ExtraTags),
&loader.SymlinkRefresher{RuntimePath: s.RuntimePath},
loaderOpts...)
} else {
directoryRefresher := &loader.DirectoryRefresher{}
// Adding loader.Remove to the default set of goruntime's FileSystemOps.
directoryRefresher.WatchFileSystemOps(loader.Remove, loader.Write, loader.Create, loader.Chmod)

ret.runtime, err = loader.New2(
filepath.Join(s.RuntimePath, s.RuntimeSubdirectory),
"config",
ret.store.ScopeWithTags("runtime", s.ExtraTags),
directoryRefresher,
loaderOpts...)
}

if err != nil {
panic(err)
if err != nil {
panic(err)
}
}

// setup http router
Expand Down Expand Up @@ -331,6 +348,9 @@ func newServer(s settings.Settings, name string, statsManager stats.Manager, loc

func (server *server) Stop() {
server.grpcServer.GracefulStop()
if server.httpProvider != nil {
server.httpProvider.Stop()
}
server.listenerMu.Lock()
defer server.listenerMu.Unlock()
if server.debugListener.listener != nil {
Expand Down
61 changes: 37 additions & 24 deletions src/service/ratelimit.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,20 @@ type service struct {
globalShadowMode bool
}

func (this *service) reloadConfig(statsManager stats.Manager) {
func (this *service) loadRuntime() []config.RateLimitConfigToLoad {
files := []config.RateLimitConfigToLoad{}
snapshot := this.runtime.Snapshot()
for _, key := range snapshot.Keys() {
if this.runtimeWatchRoot && !strings.HasPrefix(key, "config.") {
continue
}

files = append(files, config.RateLimitConfigToLoad{key, snapshot.Get(key)})
}
return files
}

func (this *service) reloadConfig(statsManager stats.Manager, files []config.RateLimitConfigToLoad) {
defer func() {
if e := recover(); e != nil {
configError, ok := e.(config.RateLimitConfigError)
Expand All @@ -65,16 +78,6 @@ func (this *service) reloadConfig(statsManager stats.Manager) {
}
}()

files := []config.RateLimitConfigToLoad{}
snapshot := this.runtime.Snapshot()
for _, key := range snapshot.Keys() {
if this.runtimeWatchRoot && !strings.HasPrefix(key, "config.") {
continue
}

files = append(files, config.RateLimitConfigToLoad{key, snapshot.Get(key)})
}

rlSettings := settings.NewSettings()
newConfig := this.configLoader.Load(files, statsManager, rlSettings.MergeDomainConfigurations)
this.stats.ConfigLoadSuccess.Inc()
Expand Down Expand Up @@ -313,7 +316,7 @@ func (this *service) GetCurrentConfig() config.RateLimitConfig {
}

func NewService(runtime loader.IFace, cache limiter.RateLimitCache,
configLoader config.RateLimitConfigLoader, statsManager stats.Manager, runtimeWatchRoot bool, clock utils.TimeSource, shadowMode bool) RateLimitServiceServer {
configLoader config.RateLimitConfigLoader, statsManager stats.Manager, httpProviderChan chan []config.RateLimitConfigToLoad, httpProviderEnabled bool, runtimeWatchRoot bool, clock utils.TimeSource, shadowMode bool) RateLimitServiceServer {

newService := &service{
runtime: runtime,
Expand All @@ -328,18 +331,28 @@ func NewService(runtime loader.IFace, cache limiter.RateLimitCache,
customHeaderClock: clock,
}

runtime.AddUpdateCallback(newService.runtimeUpdateEvent)

newService.reloadConfig(statsManager)
go func() {
// No exit right now.
for {
logger.Debugf("waiting for runtime update")
<-newService.runtimeUpdateEvent
logger.Debugf("got runtime update and reloading config")
newService.reloadConfig(statsManager)
}
}()
if httpProviderEnabled {
go func() {
for {
logger.Debugf("waiting for http provider update")
configs := <-httpProviderChan
logger.Debugf("got http provider update and reloading config")
newService.reloadConfig(statsManager, configs)
}
}()
} else {
runtime.AddUpdateCallback(newService.runtimeUpdateEvent)
newService.reloadConfig(statsManager, newService.loadRuntime())
go func() {
// No exit right now.
for {
logger.Debugf("waiting for runtime update")
<-newService.runtimeUpdateEvent
logger.Debugf("got runtime update and reloading config")
newService.reloadConfig(statsManager, newService.loadRuntime())
}
}()
}

return newService
}
9 changes: 9 additions & 0 deletions src/service_cmd/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,11 +116,20 @@ func (runner *Runner) Run() {
runner.srv = srv
runner.mu.Unlock()

var httpProviderChan chan []config.RateLimitConfigToLoad
if s.HttpProviderEnabled {
httpProviderChan = srv.HttpProvider().ConfigChan
} else {
httpProviderChan = make(chan []config.RateLimitConfigToLoad)
}

service := ratelimit.NewService(
srv.Runtime(),
createLimiter(srv, s, localCache, runner.statsManager),
config.NewRateLimitConfigLoaderImpl(),
runner.statsManager,
httpProviderChan,
s.HttpProviderEnabled,
s.RuntimeWatchRoot,
utils.NewTimeSourceImpl(),
s.GlobalShadowMode,
Expand Down
7 changes: 7 additions & 0 deletions src/settings/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,13 @@ type Settings struct {
RuntimeIgnoreDotFiles bool `envconfig:"RUNTIME_IGNOREDOTFILES" default:"false"`
RuntimeWatchRoot bool `envconfig:"RUNTIME_WATCH_ROOT" default:"true"`

// Settings for configuraiton http provider
HttpProviderEnabled bool `envconfig:"HTTP_PROVIDER_ENABLED" default:"false"`
HttpProviderEndpoint string `envconfig:"HTTP_PROVIDER_ENDPOINT"`
HttpProviderSubpath []string `envconfig:"HTTP_PROVIDER_SUBPATH" default:""`
HttpProviderPollInterval int `envconfig:"HTTP_PROVIDER_POLL_INTERVAL" default:"10"`
HttpProviderPollTimeout int `envconfig:"HTTP_PROVIDER_POLL_TIMEOUT" default:"10"`

// Settings for all cache types
ExpirationJitterMaxSeconds int64 `envconfig:"EXPIRATION_JITTER_MAX_SECONDS" default:"300"`
LocalCacheSizeInBytes int `envconfig:"LOCAL_CACHE_SIZE_IN_BYTES" default:"0"`
Expand Down
Loading

0 comments on commit 2060c5e

Please sign in to comment.