Skip to content

Commit

Permalink
karmada-search: Fix lock race affects watch RestChan not close, causi…
Browse files Browse the repository at this point in the history
…ng client watch api to hang

Signed-off-by: xigang <wangxiang2014@gmail.com>
  • Loading branch information
xigang committed Nov 11, 2023
1 parent 1b2c6ed commit 249d571
Showing 1 changed file with 40 additions and 9 deletions.
49 changes: 40 additions & 9 deletions pkg/search/proxy/store/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"sort"
"sync"
"time"

"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -166,22 +167,47 @@ func (c *multiClusterContinue) String() string {
return base64.RawURLEncoding.EncodeToString(buf)
}

// nothing will ever be sent down this channel
var neverExitWatch <-chan time.Time = make(chan time.Time)

// timeoutFactory abstracts watch timeout logic for testing
type TimeoutFactory interface {
TimeoutCh() (<-chan time.Time, func() bool)
}

// realTimeoutFactory implements timeoutFactory
type realTimeoutFactory struct {
timeout time.Duration
}

// TimeoutCh returns a channel which will receive something when the watch times out,
// and a cleanup function to call when this happens.
func (w *realTimeoutFactory) TimeoutCh() (<-chan time.Time, func() bool) {
if w.timeout == 0 {
return neverExitWatch, func() bool { return false }
}
t := time.NewTimer(w.timeout)
return t.C, t.Stop
}

type decoratedWatcher struct {
watcher watch.Interface
decorator func(watch.Event)
}

type watchMux struct {
lock sync.RWMutex
sources []decoratedWatcher
result chan watch.Event
done chan struct{}
lock sync.RWMutex
sources []decoratedWatcher
result chan watch.Event
done chan struct{}
timeoutFactory TimeoutFactory
}

func newWatchMux() *watchMux {
return &watchMux{
result: make(chan watch.Event),
done: make(chan struct{}),
result: make(chan watch.Event, 100),
done: make(chan struct{}),
timeoutFactory: &realTimeoutFactory{timeout: time.Second * 10},
}
}

Expand Down Expand Up @@ -225,8 +251,12 @@ func (w *watchMux) Stop() {
}

func (w *watchMux) startWatchSource(source watch.Interface, decorator func(watch.Event)) {
defer func() { _ = recover() }()
defer source.Stop()
defer w.Stop()

timeoutCh, cleanup := w.timeoutFactory.TimeoutCh()
defer cleanup()
for {
var copyEvent watch.Event
select {
Expand Down Expand Up @@ -255,8 +285,9 @@ func (w *watchMux) startWatchSource(source watch.Interface, decorator func(watch
select {
case <-w.done:
return
default:
w.result <- copyEvent
case <-timeoutCh:
return
case w.result <- copyEvent:
}
}()
}
Expand Down Expand Up @@ -386,4 +417,4 @@ func BuildMultiClusterResourceVersion(clusterResourceMap map[string]string) stri
m.set(cluster, rv)
}
return m.String()
}
}

0 comments on commit 249d571

Please sign in to comment.