From 79a58979a346dc4106d3b5bd93c9af3bcb93d480 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Julian=20T=C3=B6lle?= <julian.toelle@hetzner-cloud.de>
Date: Tue, 2 Jul 2024 15:32:10 +0200
Subject: [PATCH] fix(routes): many requests for outdated routes by rate
 limiting

Fixes #673

In `routes.ListRoutes()` we have to find the matching server/node for
every route in the network. We find the server by utilizing a cache that
maps every private IP to the corresponding server.

This cache has a feature that refreshes the list of servers if an entry
can not be found. This is sensible, as the server might have been just
created. This is also fatal, as this refresh happens for every single
cache access. If there are a lot of routes in the network that do not
belong to any server, we refresh the cache many times for each
`ListRoutes()`. This is even more serious, as `ListRoutes()` is being
called every 10-30 seconds (see #395).

This commit introduces a `rate.Limiter` in the `AllServersCache` which
only allows the refresh to happen every 30 seconds.
---
 go.mod                        |   2 +-
 go.sum                        |   5 --
 hcloud/routes.go              |  10 ++-
 internal/hcops/server.go      |   7 ++-
 internal/hcops/server_test.go | 112 +++++++++++++++++++++++++++++++++-
 5 files changed, 125 insertions(+), 11 deletions(-)

diff --git a/go.mod b/go.mod
index a53552314..e4944a346 100644
--- a/go.mod
+++ b/go.mod
@@ -8,6 +8,7 @@ require (
 	github.com/spf13/pflag v1.0.5
 	github.com/stretchr/testify v1.9.0
 	github.com/syself/hrobot-go v0.2.5
+	golang.org/x/time v0.3.0
 	k8s.io/api v0.29.1
 	k8s.io/apimachinery v0.29.1
 	k8s.io/client-go v0.29.1
@@ -88,7 +89,6 @@ require (
 	golang.org/x/sys v0.16.0 // indirect
 	golang.org/x/term v0.16.0 // indirect
 	golang.org/x/text v0.14.0 // indirect
-	golang.org/x/time v0.3.0 // indirect
 	google.golang.org/appengine v1.6.7 // indirect
 	google.golang.org/genproto v0.0.0-20230803162519-f966b187b2e5 // indirect
 	google.golang.org/genproto/googleapis/api v0.0.0-20230726155614-23370e0ffb3e // indirect
diff --git a/go.sum b/go.sum
index 8d1d9e0b0..b84ab2687 100644
--- a/go.sum
+++ b/go.sum
@@ -168,7 +168,6 @@ github.com/stoewer/go-strcase v1.2.0 h1:Z2iHWqGXH00XYgqDmNgQbIBxf3wrNq0F3feEy0ai
 github.com/stoewer/go-strcase v1.2.0/go.mod h1:IBiWB2sKIp3wVVQ3Y035++gc+knqhUQag1KpM8ahLw8=
 github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
 github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
-github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c=
 github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
 github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY=
 github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA=
@@ -179,8 +178,6 @@ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
 github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
 github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
 github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
-github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
-github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
 github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
 github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
 github.com/syself/hrobot-go v0.2.5 h1:Zs7GDFRd6fDn4YHYE9e5CGtRm6KYmMZwMMnm7OC/09g=
@@ -334,8 +331,6 @@ k8s.io/component-helpers v0.29.1 h1:54MMEDu6xeJmMtAKztsPwu0kJKr4+jCUzaEIn2UXRoc=
 k8s.io/component-helpers v0.29.1/go.mod h1:+I7xz4kfUgxWAPJIVKrqe4ml4rb9UGpazlOmhXYo+cY=
 k8s.io/controller-manager v0.29.1 h1:bTnJFF/OWooRVeJ4QLA1ApuPH+fjHSmcVMMeL7qvI2E=
 k8s.io/controller-manager v0.29.1/go.mod h1:fVhGGuBiB0B2yT2+OHXZaA88owVn5zkv18A+G9E9Qlw=
-k8s.io/klog/v2 v2.120.1 h1:QXU6cPEOIslTGvZaXvFWiP9VKyeet3sawzTOvdXb4Vw=
-k8s.io/klog/v2 v2.120.1/go.mod h1:3Jpz1GvMt720eyJH1ckRHK1EDfpxISzJ7I9OYgaDtPE=
 k8s.io/klog/v2 v2.130.0 h1:5nB3+3HpqKqXJIXNtJdtxcDCfaa9KL8StJgMzGJkUkM=
 k8s.io/klog/v2 v2.130.0/go.mod h1:3Jpz1GvMt720eyJH1ckRHK1EDfpxISzJ7I9OYgaDtPE=
 k8s.io/kms v0.29.1 h1:6dMOaxllwiAZ8p3Hys65b78MDG+hONpBBpk1rQsaEtk=
diff --git a/hcloud/routes.go b/hcloud/routes.go
index bcb8ff155..fd2b7054c 100644
--- a/hcloud/routes.go
+++ b/hcloud/routes.go
@@ -7,6 +7,7 @@ import (
 	"net"
 	"time"
 
+	"golang.org/x/time/rate"
 	"k8s.io/apimachinery/pkg/types"
 	cloudprovider "k8s.io/cloud-provider"
 	"k8s.io/klog/v2"
@@ -16,6 +17,10 @@ import (
 	"github.com/hetznercloud/hcloud-go/v2/hcloud"
 )
 
+var (
+	serversCacheMissRefreshRate = rate.Every(30 * time.Second)
+)
+
 type routes struct {
 	client      *hcloud.Client
 	network     *hcloud.Network
@@ -40,8 +45,9 @@ func newRoutes(client *hcloud.Client, networkID int64) (*routes, error) {
 		serverCache: &hcops.AllServersCache{
 			// client.Server.All will load ALL the servers in the project, even those
 			// that are not part of the Kubernetes cluster.
-			LoadFunc: client.Server.All,
-			Network:  networkObj,
+			LoadFunc:                client.Server.All,
+			Network:                 networkObj,
+			CacheMissRefreshLimiter: rate.NewLimiter(serversCacheMissRefreshRate, 1),
 		},
 	}, nil
 }
diff --git a/internal/hcops/server.go b/internal/hcops/server.go
index ea3fcc6a7..2a712289d 100644
--- a/internal/hcops/server.go
+++ b/internal/hcops/server.go
@@ -7,6 +7,7 @@ import (
 	"sync"
 	"time"
 
+	"golang.org/x/time/rate"
 	"k8s.io/klog/v2"
 
 	"github.com/hetznercloud/hcloud-cloud-controller-manager/internal/metrics"
@@ -22,6 +23,8 @@ type AllServersCache struct {
 	LoadFunc    func(context.Context) ([]*hcloud.Server, error)
 	LoadTimeout time.Duration
 	MaxAge      time.Duration
+	// Set to limit the amount of refreshes due to cache misses
+	CacheMissRefreshLimiter *rate.Limiter
 
 	// If set, only IPs in this network will be considered for [ByPrivateIP]
 	Network *hcloud.Network
@@ -97,8 +100,8 @@ func (c *AllServersCache) getFromCache(retrieveFromCacheMaps func() (*hcloud.Ser
 		return server, nil
 	}
 
-	// If the server was not in the cache, we want to refresh if we did not already in this call.
-	if !cacheRefreshed {
+	// If the server was not in the cache, we want to refresh if we did not already in this call and if there is available limit.
+	if !cacheRefreshed && c.CacheMissRefreshLimiter.Allow() {
 		if err := c.refreshCache(); err != nil {
 			return nil, fmt.Errorf("%s: %w", op, err)
 		}
diff --git a/internal/hcops/server_test.go b/internal/hcops/server_test.go
index 7bf90d88c..fc4382406 100644
--- a/internal/hcops/server_test.go
+++ b/internal/hcops/server_test.go
@@ -9,6 +9,7 @@ import (
 
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/mock"
+	"golang.org/x/time/rate"
 
 	"github.com/hetznercloud/hcloud-cloud-controller-manager/internal/hcops"
 	"github.com/hetznercloud/hcloud-cloud-controller-manager/internal/mocks"
@@ -150,6 +151,81 @@ func TestAllServersCache_CacheRefresh(t *testing.T) {
 	runAllServersCacheTests(t, "Cache refresh", tmpl, cacheOps)
 }
 
+func TestAllServersCache_CacheMissRefresh(t *testing.T) {
+	srv := &hcloud.Server{
+		ID:   1337,
+		Name: "cache-miss-refresh",
+		PrivateNet: []hcloud.ServerPrivateNet{
+			{
+				IP: net.ParseIP("10.0.0.9"),
+			},
+		},
+	}
+	cacheOps := newAllServersCacheOps(t, srv)
+	tmpl := allServersCacheTestCase{
+		SetUp: func(t *testing.T, tt *allServersCacheTestCase) {
+			tt.ServerClient.
+				On("All", mock.Anything).
+				Return([]*hcloud.Server{}, nil).Once()
+
+			tt.ServerClient.
+				On("All", mock.Anything).
+				Return([]*hcloud.Server{srv}, nil).Once()
+
+			// Setup initial cache
+			result, err := tt.Cache.ByName(srv.Name)
+			assert.Error(t, err)
+			assert.Nil(t, result)
+		},
+		Assert: func(t *testing.T, tt *allServersCacheTestCase) {
+			// All must be called only twice. Once during set-up because the cache is expired,
+			// and then again during the cache retrieval because the server was not found.
+			tt.ServerClient.AssertNumberOfCalls(t, "All", 2)
+		},
+		Expected: srv,
+	}
+
+	runAllServersCacheTests(t, "Cache refresh", tmpl, cacheOps)
+}
+
+func TestAllServersCache_CacheRefreshLimited(t *testing.T) {
+	srv := &hcloud.Server{
+		ID:   1337,
+		Name: "cache-refresh-limited",
+		PrivateNet: []hcloud.ServerPrivateNet{
+			{
+				IP: net.ParseIP("10.0.0.9"),
+			},
+		},
+	}
+	cacheOps := newAllServersCacheOps(t, srv)
+	tmpl := allServersCacheTestCase{
+		SetUp: func(t *testing.T, tt *allServersCacheTestCase) {
+			tt.ServerClient.
+				On("All", mock.Anything).
+				Return([]*hcloud.Server{}, nil)
+
+			result, err := tt.Cache.ByName(srv.Name)
+			assert.Error(t, err)
+			assert.Nil(t, result)
+
+			result, err = tt.Cache.ByName(srv.Name)
+			assert.Error(t, err)
+			assert.Nil(t, result)
+		},
+		Assert: func(t *testing.T, tt *allServersCacheTestCase) {
+			// All must be called only twice. Once during set-up because the cache is expired,
+			// and then again during set-up to trigger the limiter.
+
+			tt.ServerClient.AssertNumberOfCalls(t, "All", 2)
+		},
+		Expected:    nil,
+		ExpectedErr: hcops.ErrNotFound,
+	}
+
+	runAllServersCacheTests(t, "Cache refresh", tmpl, cacheOps)
+}
+
 func TestAllServersCache_NotFound(t *testing.T) {
 	srv := &hcloud.Server{
 		ID:   101010,
@@ -197,6 +273,39 @@ func TestAllServersCache_ClientError(t *testing.T) {
 	runAllServersCacheTests(t, "Not found", tmpl, cacheOps)
 }
 
+func TestAllServersCache_CacheMissRefreshClientError(t *testing.T) {
+	expectedErr := errors.New("cache-miss-refresh-client-error")
+	srv := &hcloud.Server{
+		ID:   202020,
+		Name: "cache-miss-refresh-client-error",
+		PrivateNet: []hcloud.ServerPrivateNet{
+			{
+				IP: net.ParseIP("10.0.0.5"),
+			},
+		},
+	}
+	cacheOps := newAllServersCacheOps(t, srv)
+	tmpl := allServersCacheTestCase{
+		SetUp: func(_ *testing.T, tt *allServersCacheTestCase) {
+			tt.ServerClient.
+				On("All", mock.Anything).
+				Return([]*hcloud.Server{}, nil).Once()
+
+			// Load the cache once
+			result, err := tt.Cache.ByName(srv.Name)
+			assert.Error(t, err)
+			assert.Nil(t, result)
+
+			tt.ServerClient.
+				On("All", mock.Anything).
+				Return(nil, expectedErr).Once()
+		},
+		ExpectedErr: expectedErr,
+	}
+
+	runAllServersCacheTests(t, "Not found", tmpl, cacheOps)
+}
+
 func TestAllServersCache_DuplicatePrivateIP(t *testing.T) {
 	// Regression test for https://github.com/hetznercloud/hcloud-cloud-controller-manager/issues/470
 
@@ -284,7 +393,8 @@ type allServersCacheTestCase struct {
 func (tt *allServersCacheTestCase) run(t *testing.T) {
 	tt.ServerClient = mocks.NewServerClient(t)
 	tt.Cache = &hcops.AllServersCache{
-		LoadFunc: tt.ServerClient.All,
+		LoadFunc:                tt.ServerClient.All,
+		CacheMissRefreshLimiter: rate.NewLimiter(rate.Every(1*time.Minute), 1),
 	}
 
 	if tt.SetUp != nil {