Skip to content

Commit 83b7f49

Browse files
djshow832 authored and xhebox committed
router: refresh backends when the backends are empty (pingcap#173)
1 parent d538835 commit 83b7f49

File tree

4 files changed

+67
-1
lines changed

4 files changed

+67
-1
lines changed

pkg/manager/router/backend_observer.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,7 @@ type BackendObserver struct {
122122
eventReceiver BackendEventReceiver
123123
wg waitgroup.WaitGroup
124124
cancelFunc context.CancelFunc
125+
refreshChan chan struct{}
125126
}
126127

127128
// StartBackendObserver creates a BackendObserver and starts watching.
@@ -152,6 +153,7 @@ func NewBackendObserver(logger *zap.Logger, eventReceiver BackendEventReceiver,
152153
httpCli: httpCli,
153154
httpTLS: httpTLS,
154155
eventReceiver: eventReceiver,
156+
refreshChan: make(chan struct{}),
155157
}
156158
bo.fetcher = backendFetcher
157159
return bo, nil
@@ -166,6 +168,15 @@ func (bo *BackendObserver) Start() {
166168
})
167169
}
168170

171+
// Refresh asks the observer to start a new observe round right away instead of
// waiting for healthCheckInterval to elapse. It never blocks the caller: the
// request is delivered only if the observe loop is currently waiting on
// refreshChan; otherwise it is silently dropped.
172+
func (bo *BackendObserver) Refresh() {
173+
// Non-blocking send. refreshChan is unbuffered, so the send succeeds only when
// the observe loop is parked in its select. If it is not (presumably because a
// round is already in progress), fall through to default and skip this request.
174+
select {
175+
case bo.refreshChan <- struct{}{}:
176+
default:
177+
}
178+
}
179+
169180
func (bo *BackendObserver) observe(ctx context.Context) {
170181
for ctx.Err() == nil {
171182
backendInfo := bo.fetcher.GetBackendList(ctx)
@@ -176,6 +187,7 @@ func (bo *BackendObserver) observe(ctx context.Context) {
176187
bo.notifyIfChanged(backendStatus)
177188
select {
178189
case <-time.After(bo.config.healthCheckInterval):
190+
case <-bo.refreshChan:
179191
case <-ctx.Done():
180192
return
181193
}

pkg/manager/router/router_score.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,11 @@ func (router *ScoreBasedRouter) routeOnce(excluded []string) string {
9090
return backend.addr
9191
}
9292
}
93+
// No available backends, maybe the health check result is outdated during rolling restart.
94+
// Refresh the backends asynchronously in this case.
95+
if router.observer != nil {
96+
router.observer.Refresh()
97+
}
9398
return ""
9499
}
95100

pkg/manager/router/router_test.go

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -668,3 +668,52 @@ func TestConcurrency(t *testing.T) {
668668
cancel()
669669
router.Close()
670670
}
671+
672+
// TestRefresh verifies that a backend added after routing found no available
// backend becomes routable within a few seconds, even though the periodic
// health check interval is set to one minute.
673+
func TestRefresh(t *testing.T) {
674+
// backends is read by the fetcher callback and appended to by the test body;
// m guards both accesses.
backends := make([]string, 0)
675+
var m sync.Mutex
676+
fetcher := NewExternalFetcher(func() []string {
677+
m.Lock()
678+
defer m.Unlock()
679+
return backends
680+
})
681+
// Create a router with a very long health check interval (one minute), far
// longer than the 3-second deadline below, so the test does not pass by simply
// waiting for the next periodic check.
682+
lg := logger.CreateLoggerForTest(t)
683+
rt := &ScoreBasedRouter{
684+
logger: lg,
685+
backends: list.New(),
686+
}
687+
cfg := NewDefaultHealthCheckConfig()
688+
cfg.healthCheckInterval = time.Minute
689+
observer, err := StartBackendObserver(lg, rt, nil, cfg, fetcher)
690+
require.NoError(t, err)
691+
// Attach the observer under the router's lock.
rt.Lock()
692+
rt.observer = observer
693+
rt.Unlock()
694+
defer rt.Close()
695+
// The initial backends are empty, so Next must return an empty address.
// NOTE(review): presumably this Next call is what triggers the observer's
// Refresh via routeOnce — confirm against the selector implementation.
696+
selector := rt.GetBackendSelector()
697+
addr := selector.Next()
698+
require.True(t, len(addr) == 0)
699+
// Create a new backend and add its SQL address to the list returned by the fetcher.
700+
server := newBackendServer(t)
701+
m.Lock()
702+
backends = append(backends, server.sqlAddr)
703+
m.Unlock()
704+
defer server.close()
705+
// The backends should be refreshed very soon: poll Next every 100ms until it
// returns a non-empty address, and fail if that has not happened within 3 seconds.
706+
timer := time.NewTimer(3 * time.Second)
707+
defer timer.Stop()
708+
for {
709+
select {
710+
case <-timer.C:
711+
t.Fatal("timeout")
712+
case <-time.After(100 * time.Millisecond):
713+
addr = selector.Next()
714+
if len(addr) > 0 {
715+
return
716+
}
717+
}
718+
}
719+
}

pkg/proxy/backend/backend_conn.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ import (
2323
)
2424

2525
const (
26-
DialTimeout = 5 * time.Second
26+
DialTimeout = 2 * time.Second
2727
)
2828

2929
type BackendConnection struct {

0 commit comments

Comments
 (0)