Skip to content

Commit 6f125e4

Browse files
Merge pull request #17049 from knobunc/bug/bz1471899-change-router-locking
Automatic merge from submit-queue. Change the router reload suppression so that it doesn't block updates Change the router reload suppression so that it doesn't block updates This changes the locking so that a reload doesn't hold a lock of the router object for the duration of the reload so that updates that happen while the router is reloading can be processed immediately and batched up, then included when the next reload occurs. Before this, if a reload ran longer than the reload interval, only one event would be processed before triggering a new reload. Which would then lock out other event processing. This caused the router to not make any meaningful progress consuming events. A new module to do the rate limiting has been added. The module has have a top and bottom half. The top half simply calls the bottom half with a flag indicating the user has made a change. The flag simply tells the bottom half to register the desire to reload (so we can do it under a single lock acquisition). The bottom half is in charge of determining if it can immediately reload or if it has to wait. If it must wait, then it works out the earliest time it can reload and schedules a callback to itself for that time. If it determines it can reload, then it runs the reload code immediately. When the reload is complete, it calls itself again to make sure there was no other pending reload that had come in while the reload was running. Whenever the bottom half calls itself, it does it without the flag indicating the user made a change. Fixes bug 1471899 -- https://bugzilla.redhat.com/show_bug.cgi?id=1471899 @openshift/networking PTAL
2 parents f12efea + dac5ce6 commit 6f125e4

File tree

6 files changed

+243
-27
lines changed

6 files changed

+243
-27
lines changed

pkg/router/template/fake.go

+4-5
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,10 @@ package templaterouter
55
func NewFakeTemplateRouter() *templateRouter {
66
fakeCertManager, _ := newSimpleCertificateManager(newFakeCertificateManagerConfig(), &fakeCertWriter{})
77
return &templateRouter{
8-
state: map[string]ServiceAliasConfig{},
9-
serviceUnits: make(map[string]ServiceUnit),
10-
certManager: fakeCertManager,
11-
rateLimitedCommitFunction: nil,
12-
rateLimitedCommitStopChannel: make(chan struct{}),
8+
state: map[string]ServiceAliasConfig{},
9+
serviceUnits: make(map[string]ServiceUnit),
10+
certManager: fakeCertManager,
11+
rateLimitedCommitFunction: nil,
1312
}
1413
}
1514

+144
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
package limiter
2+
3+
import (
4+
"sync"
5+
"time"
6+
7+
"github.com/golang/glog"
8+
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
9+
)
10+
11+
// HandlerFunc defines function signature for a CoalescingSerializingRateLimiter.
12+
type HandlerFunc func() error
13+
14+
// CoalescingSerializingRateLimiter guarantees that calls will not happen to the given function
15+
// more frequently than the given interval, and it guarantees that only one call will happen at a time.
16+
// The calls are not queued, i.e. if you make 5 calls to RegisterChange(), it does not guarantee that the
17+
// handler will be invoked 5 times, it merely guarantees it will be invoked once, and no more often than
18+
// the rate.
19+
// The calls to the handler will happen in the background and are expected to do their own locking if needed.
20+
type CoalescingSerializingRateLimiter struct {
21+
// handlerFunc is the function to rate limit and seriaize calls to.
22+
handlerFunc HandlerFunc
23+
24+
// callInterval is the minimum time between the starts of handler calls.
25+
callInterval time.Duration
26+
27+
// lastStart is the time the last run of the handler started.
28+
lastStart time.Time
29+
30+
// changeReqTime is nil if no change has been registered since the last handler run completed, otherwise it is the
31+
// time last change was registered.
32+
changeReqTime *time.Time
33+
34+
// handlerRunning indicates whether the Handler is actively running.
35+
handlerRunning bool
36+
37+
// lock protects the CoalescingSerializingRateLimiter structure from multiple threads manipulating it at once.
38+
lock sync.Mutex
39+
40+
// callbackTimer is the timer we use to make callbacks to re-run the function to decide if we need to do work.
41+
callbackTimer *time.Timer
42+
}
43+
44+
func NewCoalescingSerializingRateLimiter(interval time.Duration, handlerFunc HandlerFunc) *CoalescingSerializingRateLimiter {
45+
limiter := &CoalescingSerializingRateLimiter{
46+
handlerFunc: handlerFunc,
47+
callInterval: interval,
48+
lastStart: time.Time{},
49+
changeReqTime: nil,
50+
handlerRunning: false,
51+
}
52+
53+
return limiter
54+
}
55+
56+
// RegisterChange() indicates that the rate limited function should be called. It may not immediately run it, but it will cause it to run within
57+
// the ReloadInterval. It will always immediately return, the function will be run in the background. Not every call to RegisterChange() will
58+
// result in the function getting called. If it is called repeatedly while it is still within the ReloadInterval since the last run, it will
59+
// only run once when the time allows it.
60+
func (csrl *CoalescingSerializingRateLimiter) RegisterChange() {
61+
glog.V(8).Infof("RegisterChange called")
62+
63+
csrl.changeWorker(true)
64+
}
65+
66+
func (csrl *CoalescingSerializingRateLimiter) changeWorker(userChanged bool) {
67+
csrl.lock.Lock()
68+
defer csrl.lock.Unlock()
69+
70+
glog.V(8).Infof("changeWorker called")
71+
72+
if userChanged && csrl.changeReqTime == nil {
73+
// They just registered a change manually (and we aren't in the middle of a change)
74+
now := time.Now()
75+
csrl.changeReqTime = &now
76+
}
77+
78+
if csrl.handlerRunning {
79+
// We don't need to do anything else... there's a run in progress, and when it is done it will re-call this function at which point the work will then happen
80+
glog.V(8).Infof("The handler was already running (%v) started at %s, returning from the worker", csrl.handlerRunning, csrl.lastStart.String())
81+
return
82+
}
83+
84+
if csrl.changeReqTime == nil {
85+
// There's no work queued so we have nothing to do. We should only get here when
86+
// the function is re-called after a reload
87+
glog.V(8).Infof("No invoke requested time, so there's no queued work. Nothing to do.")
88+
return
89+
}
90+
91+
// There is no handler running, let's see if we should run yet, or schedule a callback
92+
now := time.Now()
93+
sinceLastRun := now.Sub(csrl.lastStart)
94+
untilNextCallback := csrl.callInterval - sinceLastRun
95+
glog.V(8).Infof("Checking reload; now: %v, lastStart: %v, sinceLast %v, limit %v, remaining %v", now, csrl.lastStart, sinceLastRun, csrl.callInterval, untilNextCallback)
96+
97+
if untilNextCallback > 0 {
98+
// We want to reload... but can't yet because some window is not satisfied
99+
if csrl.callbackTimer == nil {
100+
csrl.callbackTimer = time.AfterFunc(untilNextCallback, func() { csrl.changeWorker(false) })
101+
} else {
102+
// While we are resetting the timer, it should have fired and be stopped.
103+
// The first time the worker is called it will know the precise duration
104+
// until when a run would be valid and has scheduled a timer for that point
105+
csrl.callbackTimer.Reset(untilNextCallback)
106+
}
107+
108+
glog.V(8).Infof("Can't invoke the handler yet, need to delay %s, callback scheduled", untilNextCallback.String())
109+
110+
return
111+
}
112+
113+
// Otherwise we can reload immediately... let's do it!
114+
glog.V(8).Infof("Calling the handler function (for invoke time %v)", csrl.changeReqTime)
115+
csrl.handlerRunning = true
116+
csrl.changeReqTime = nil
117+
csrl.lastStart = now
118+
119+
// Go run the handler so we don't block the caller
120+
go csrl.runHandler()
121+
122+
return
123+
}
124+
125+
func (csrl *CoalescingSerializingRateLimiter) runHandler() {
126+
// Call the handler, but do it in its own function so we can cleanup in case the handler panics
127+
runHandler := func() error {
128+
defer func() {
129+
csrl.lock.Lock()
130+
csrl.handlerRunning = false
131+
csrl.lock.Unlock()
132+
}()
133+
134+
return csrl.handlerFunc()
135+
}
136+
if err := runHandler(); err != nil {
137+
utilruntime.HandleError(err)
138+
}
139+
140+
// Re-call the commit in case there is work waiting that came in while we were working
141+
// we want to call the top level commit in case the state has not changed
142+
glog.V(8).Infof("Re-Calling the worker after a reload in case work came in")
143+
csrl.changeWorker(false)
144+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
package limiter
2+
3+
import (
4+
"fmt"
5+
"sync"
6+
"testing"
7+
"time"
8+
)
9+
10+
type handler struct {
11+
count int
12+
sync.Mutex
13+
}
14+
15+
func (h *handler) handle() error {
16+
h.Lock()
17+
defer h.Unlock()
18+
h.count += 1
19+
return nil
20+
}
21+
22+
func (h *handler) counter() int {
23+
h.Lock()
24+
defer h.Unlock()
25+
return h.count
26+
}
27+
28+
func TestCoalescingSerializingRateLimiter(t *testing.T) {
29+
30+
fmt.Println("start")
31+
32+
tests := []struct {
33+
Name string
34+
Interval time.Duration
35+
Times int
36+
}{
37+
{
38+
Name: "3PO",
39+
Interval: 3 * time.Second,
40+
Times: 10,
41+
},
42+
{
43+
Name: "five-fer",
44+
Interval: 5 * time.Second,
45+
Times: 20,
46+
},
47+
{
48+
Name: "longjob",
49+
Interval: 2 * time.Second,
50+
Times: 20,
51+
},
52+
}
53+
54+
for _, tc := range tests {
55+
h := &handler{}
56+
rlf := NewCoalescingSerializingRateLimiter(tc.Interval, h.handle)
57+
58+
for i := 0; i < tc.Times; i++ {
59+
fmt.Println("start")
60+
rlf.RegisterChange()
61+
fmt.Println("end")
62+
}
63+
64+
select {
65+
case <-time.After(tc.Interval + 2*time.Second):
66+
fmt.Println("after")
67+
68+
counter := h.counter()
69+
if tc.Interval > 0 && counter >= tc.Times/2 {
70+
t.Errorf("For coalesced calls, expected number of invocations to be at least half. Expected: < %v Got: %v",
71+
tc.Times/2, counter)
72+
}
73+
}
74+
}
75+
}

pkg/router/template/router.go

+8-17
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import (
2222
cmdutil "github.com/openshift/origin/pkg/cmd/util"
2323
routeapi "github.com/openshift/origin/pkg/route/apis/route"
2424
"github.com/openshift/origin/pkg/router/controller"
25-
"github.com/openshift/origin/pkg/util/ratelimiter"
25+
"github.com/openshift/origin/pkg/router/template/limiter"
2626
)
2727

2828
const (
@@ -88,9 +88,7 @@ type templateRouter struct {
8888
allowWildcardRoutes bool
8989
// rateLimitedCommitFunction is a rate limited commit (persist state + refresh the backend)
9090
// function that coalesces and controls how often the router is reloaded.
91-
rateLimitedCommitFunction *ratelimiter.RateLimitedFunction
92-
// rateLimitedCommitStopChannel is the stop/terminate channel.
93-
rateLimitedCommitStopChannel chan struct{}
91+
rateLimitedCommitFunction *limiter.CoalescingSerializingRateLimiter
9492
// lock is a mutex used to prevent concurrent router reloads.
9593
lock sync.Mutex
9694
// If true, haproxy should only bind ports when it has route and endpoint state
@@ -206,12 +204,10 @@ func newTemplateRouter(cfg templateRouterCfg) (*templateRouter, error) {
206204
metricReload: metricsReload,
207205
metricWriteConfig: metricWriteConfig,
208206

209-
rateLimitedCommitFunction: nil,
210-
rateLimitedCommitStopChannel: make(chan struct{}),
207+
rateLimitedCommitFunction: nil,
211208
}
212209

213-
numSeconds := int(cfg.reloadInterval.Seconds())
214-
router.EnableRateLimiter(numSeconds, router.commitAndReload)
210+
router.EnableRateLimiter(cfg.reloadInterval, router.commitAndReload)
215211

216212
if err := router.writeDefaultCert(); err != nil {
217213
return nil, err
@@ -227,14 +223,9 @@ func newTemplateRouter(cfg templateRouterCfg) (*templateRouter, error) {
227223
return router, nil
228224
}
229225

230-
func (r *templateRouter) EnableRateLimiter(interval int, handlerFunc ratelimiter.HandlerFunc) {
231-
keyFunc := func(_ interface{}) (string, error) {
232-
return "templaterouter", nil
233-
}
234-
235-
r.rateLimitedCommitFunction = ratelimiter.NewRateLimitedFunction(keyFunc, interval, handlerFunc)
236-
r.rateLimitedCommitFunction.RunUntil(r.rateLimitedCommitStopChannel)
237-
glog.V(2).Infof("Template router will coalesce reloads within %v seconds of each other", interval)
226+
func (r *templateRouter) EnableRateLimiter(interval time.Duration, handlerFunc limiter.HandlerFunc) {
227+
r.rateLimitedCommitFunction = limiter.NewCoalescingSerializingRateLimiter(interval, handlerFunc)
228+
glog.V(2).Infof("Template router will coalesce reloads within %s of each other", interval.String())
238229
}
239230

240231
// secretToPem composes a PEM file at the output directory from an input private key and crt file.
@@ -328,7 +319,7 @@ func (r *templateRouter) Commit() {
328319
r.lock.Unlock()
329320

330321
if needsCommit {
331-
r.rateLimitedCommitFunction.Invoke(r.rateLimitedCommitFunction)
322+
r.rateLimitedCommitFunction.RegisterChange()
332323
}
333324
}
334325

test/end-to-end/router_test.go

+9-2
Original file line numberDiff line numberDiff line change
@@ -1348,10 +1348,15 @@ func createAndStartRouterContainerExtended(dockerCli *dockerClient.Client, maste
13481348
hostVols = append(hostVols, fmt.Sprintf("%[1]s:/usr/bin/openshift", binary))
13491349
}
13501350

1351+
logLevel := os.Getenv("OPENSHIFT_LOG_LEVEL")
1352+
if len(logLevel) == 0 {
1353+
logLevel = "4"
1354+
}
1355+
13511356
containerOpts := dockerClient.CreateContainerOptions{
13521357
Config: &dockerClient.Config{
13531358
Image: getRouterImage(),
1354-
Cmd: []string{"--master=" + masterIp, "--loglevel=4"},
1359+
Cmd: []string{"--master=" + masterIp, "--loglevel=" + logLevel},
13551360
Env: env,
13561361
ExposedPorts: exposedPorts,
13571362
VolumesFrom: vols,
@@ -1414,8 +1419,10 @@ func validateServer(server *tr.TestHttpService, t *testing.T) {
14141419

14151420
// cleanUp stops and removes the deployed router
14161421
func cleanUp(t *testing.T, dockerCli *dockerClient.Client, routerId string) {
1422+
getAllLogs, _ := strconv.ParseBool(os.Getenv("OPENSHIFT_GET_ALL_DOCKER_LOGS"))
1423+
14171424
dockerCli.StopContainer(routerId, 5)
1418-
if t.Failed() {
1425+
if t.Failed() || getAllLogs {
14191426
dockerCli.Logs(dockerClient.LogsOptions{
14201427
Container: routerId,
14211428
OutputStream: os.Stdout,

test/integration/router_without_haproxy_test.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ func stressRouter(t *testing.T, namespaceCount, routesPerNamespace, routerCount,
142142
plugins := []*templateplugin.TemplatePlugin{}
143143

144144
// Don't coalesce reloads to validate reload suppression during sync.
145-
reloadInterval := 0
145+
reloadInterval := 0 * time.Second
146146

147147
// Track reload counts indexed by router name.
148148
reloadedMap := make(map[string]int)
@@ -364,7 +364,7 @@ func (p *DelayPlugin) Commit() error {
364364

365365
// launchRateLimitedRouter launches a rate-limited template router
366366
// that communicates with the api via the provided clients.
367-
func launchRateLimitedRouter(t *testing.T, routeclient routeinternalclientset.Interface, projectclient projectinternalclientset.Interface, kc kclientset.Interface, name string, maxDelay int32, reloadInterval int, reloadedMap map[string]int) *templateplugin.TemplatePlugin {
367+
func launchRateLimitedRouter(t *testing.T, routeclient routeinternalclientset.Interface, projectclient projectinternalclientset.Interface, kc kclientset.Interface, name string, maxDelay int32, reloadInterval time.Duration, reloadedMap map[string]int) *templateplugin.TemplatePlugin {
368368
reloadedMap[name] = 0
369369
rateLimitingFunc := func() error {
370370
reloadedMap[name] += 1
@@ -385,7 +385,7 @@ func launchRateLimitedRouter(t *testing.T, routeclient routeinternalclientset.In
385385
return templatePlugin
386386
}
387387

388-
func initializeRouterPlugins(routeclient routeinternalclientset.Interface, projectclient projectinternalclientset.Interface, name string, reloadInterval int, rateLimitingFunc ratelimiter.HandlerFunc) (*templateplugin.TemplatePlugin, router.Plugin) {
388+
func initializeRouterPlugins(routeclient routeinternalclientset.Interface, projectclient projectinternalclientset.Interface, name string, reloadInterval time.Duration, rateLimitingFunc ratelimiter.HandlerFunc) (*templateplugin.TemplatePlugin, router.Plugin) {
389389
r := templateplugin.NewFakeTemplateRouter()
390390

391391
r.EnableRateLimiter(reloadInterval, func() error {

0 commit comments

Comments
 (0)