clientv3: simplify watcher synchronization #6525
Conversation
Do we need to backport this then?
@gyuho probably, it fixes the new test case
@@ -284,6 +284,9 @@ func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) Watch
 	if ok {
 		select {
 		case ret := <-retc:
+			if ret == nil {
When does this happen? Can we add some comment? Thanks!
this happens if retc is closed; I changed it to ret, ok := <-retc so it's clearer that's what it's checking
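For background, a receive from a closed Go channel returns the zero value immediately, so a plain receive cannot distinguish a closed channel from a genuine nil result; the comma-ok form makes the closed case explicit. A standalone sketch of the distinction (not etcd code):

```go
package main

import "fmt"

func main() {
	// retc stands in for the watcher's response channel; close it without sending.
	retc := make(chan chan int)
	close(retc)

	// A plain receive on a closed channel yields the zero value (nil here),
	// which is easy to mistake for a real result.
	ret := <-retc
	fmt.Println(ret == nil) // true

	// The comma-ok form reports that the channel was closed.
	ret2, ok := <-retc
	fmt.Println(ret2, ok) // <nil> false
}
```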
@@ -118,8 +118,8 @@ type watchGrpcStream struct {

 	// mu protects the streams map
 	mu sync.RWMutex
it seems that this mu also protects closeErr?
@@ -118,8 +118,8 @@ type watchGrpcStream struct {

 	// mu protects the streams map
 	mu sync.RWMutex
-	// streams holds all active watchers
-	streams map[int64]*watcherStream
+	// substreams holds all active watchers
holds all active gRPC streams for watchers?
That's in watcher.streams; this tracks the watch ids that are in a single grpc stream (hence the rename from streams to substreams, since using "streams" twice is super confusing). Will clarify the comments a bit.
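Roughly, the naming distinction under discussion looks like this. A hypothetical sketch: the names follow the thread, but the key types and layout are assumptions, not the actual clientv3 code:

```go
// Package watchsketch is a stripped-down sketch of the two-level naming
// scheme; field types here are illustrative assumptions.
package watchsketch

// watcher owns one gRPC stream per distinct watch context.
type watcher struct {
	// streams: active gRPC streams, keyed by context identity (assumed).
	streams map[string]*watchGrpcStream
}

// watchGrpcStream multiplexes many logical watches over one gRPC stream.
type watchGrpcStream struct {
	// substreams: per-watch state within this single gRPC stream,
	// keyed by the server-assigned watch id.
	substreams map[int64]*watcherStream
}

// watcherStream is the state for one logical watch.
type watcherStream struct {
	id int64 // watch id assigned by the server
}
```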
-	w.mu.Lock()
-	w.streams[ws.id] = ws
-	w.mu.Unlock()
+	w.substreams[ws.id] = ws
why do we drop the lock here?
 	}
-	w.mu.Unlock()
-	return empty
+	delete(w.substreams, ws.id)
why do we drop the lock here?
 		w.closeStream(ws)
 	}

+	w.owner.mu.Lock()
probably add a delStream func on owner?
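A sketch of the suggested helper. Only the delStream idea comes from the comment above; the owner back-pointer and the ctxKey map key are assumptions made for illustration:

```go
package watchsketch

import "sync"

type watchGrpcStream struct {
	owner  *watcher
	ctxKey string // assumed map key for this stream
}

// watcher is the owner referenced in the hunk above (hypothetical layout).
type watcher struct {
	mu      sync.RWMutex
	streams map[string]*watchGrpcStream
}

// delStream encapsulates the lock/delete/unlock dance so stream-side code
// such as closeStream never touches owner.mu directly.
func (w *watcher) delStream(wgs *watchGrpcStream) {
	w.mu.Lock()
	delete(w.streams, wgs.ctxKey)
	w.mu.Unlock()
}
```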
-	w.mu.RLock()
-	defer w.mu.RUnlock()
-	ws, ok := w.streams[pbresp.WatchId]
+	ws, ok := w.substreams[pbresp.WatchId]
why do we drop the lock here?
@heyitsanthony OK. After reading the whole thing, I understand that we want to serialize the closing events in the single run routine. Can we update the description of the mutex then?
there's still some intrinsic raciness between the resume path and stream cancellation; going to rework this a little bit more so the substream goroutines don't have to reason about resume
Force-pushed from 40f82e0 to a6606b1.
Tore up the watcher code from the roots. The old code's resume path was totally broken and could livelock in some cases. Now, all watch registrations go through a queue.
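A minimal sketch of what queue-based registration might look like, assuming a resuming slice on watchGrpcStream; the addSubstream name and body here are illustrative, not the merged code:

```go
package watchsketch

type watcherStream struct{ id int64 }

type watchGrpcStream struct {
	// resuming is the registration queue: only the head has an outstanding
	// create request on the gRPC stream; later entries wait their turn.
	resuming []*watcherStream
}

// addSubstream enqueues a watch registration. Serializing registrations
// through the queue means a create response's watch id can always be
// matched to the head entry, avoiding the old resume races.
func (w *watchGrpcStream) addSubstream(ws *watcherStream) {
	w.resuming = append(w.resuming, ws)
	if len(w.resuming) == 1 {
		// head of the queue: issue the create request now (elided)
	}
}
```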
 	donec chan struct{}
 	// closing is set to true when stream should be scheduled to shutdown.
 	closing bool
+	// id is the registered watch id for on the grpc stream
s/for on/on/ ?
right, will fix
@@ -314,12 +320,7 @@ func (w *watcher) Close() (err error) {
 	}

 func (w *watchGrpcStream) Close() (err error) {
-	w.mu.Lock()
Q. Why do we remove this now? Was this causing any problem?
The sharing was making it unnecessarily difficult to reason about the watcherStream teardown path, so now watcherStream will post itself to watchGrpcStream.closingc and watchGrpcStream.run handles the final removal of watcherStream resources.
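A runnable sketch of that teardown pattern, with types pared down to the essentials; the field layout is an assumption based on the names in this thread:

```go
package main

import "fmt"

// Hypothetical, simplified types following the discussion above.
type watcherStream struct {
	id    int64
	donec chan struct{} // closed when the substream goroutine exits
}

type watchGrpcStream struct {
	substreams map[int64]*watcherStream // owned exclusively by run()
	closingc   chan *watcherStream      // substreams post themselves here on shutdown
}

// serveSubstream is the per-watch goroutine; instead of deleting itself
// from shared maps, it hands itself to run() for teardown.
func (w *watchGrpcStream) serveSubstream(ws *watcherStream) {
	// ... deliver watch events until canceled ...
	close(ws.donec)
	w.closingc <- ws
}

func main() {
	w := &watchGrpcStream{
		substreams: map[int64]*watcherStream{},
		closingc:   make(chan *watcherStream, 1),
	}
	ws := &watcherStream{id: 1, donec: make(chan struct{})}
	w.substreams[ws.id] = ws
	go w.serveSubstream(ws)

	// What run() does with each posted substream: the final removal happens
	// in one goroutine, so no mutex is needed around the map.
	closed := <-w.closingc
	delete(w.substreams, closed.id)
	fmt.Println("remaining substreams:", len(w.substreams))
}
```

The point of the design is that the per-watch goroutines only ever send on closingc; all map mutation stays inside the single run routine.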
Force-pushed from 6a3b548 to a64d996.
 	w.resumec = make(chan struct{})
 	w.joinSubstreams()
 	for _, ws := range w.substreams {
 		ws.id = -1
do we need to update the donec of wc as what we do for the resuming streams at line 654?
yes, but ws is appended into resuming and the following loop over resuming picks it up, so it's unnecessary here
ah. right.
 // streams are marked as nil in the queue since the head must wait for its inflight registration.
 func (w *watchGrpcStream) nextResume() *watcherStream {
 	for len(w.resuming) != 0 {
 		if w.resuming[0] != nil {
is it possible to handle the nil case outside the next resume? it seems like we can always remove the abandoned resuming at the caller side, no?
It's possible, but the caller side would have to reimplement this loop every time. I'd rather have it done in one place.
OK
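For reference, a self-contained sketch completing the truncated hunk above; the loop body is inferred from this discussion and may not match the merged code exactly:

```go
package watchsketch

type watcherStream struct{ id int64 }

type watchGrpcStream struct {
	resuming []*watcherStream // resume queue; nil marks an abandoned entry
}

// nextResume returns the head of the resume queue, discarding any
// abandoned (nil) entries along the way so callers never see them.
func (w *watchGrpcStream) nextResume() *watcherStream {
	for len(w.resuming) != 0 {
		if w.resuming[0] != nil {
			return w.resuming[0]
		}
		w.resuming = w.resuming[1:] // drop abandoned entry
	}
	return nil
}
```

Keeping the nil-skipping inside nextResume means every caller gets the queue compaction for free, which is the point made in the exchange above.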
@@ -283,7 +288,10 @@ func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) Watch
 	// receive channel
 	if ok {
 		select {
-		case ret := <-retc:
+		case ret, ok := <-wr.retc:
can we add some comments for this? it is not clear to me why we need to call Watch recursively.
I failed to figure out where we might close retc.
it doesn't any more; removed the close case
Force-pushed from a64d996 to a8aaefa.
LGTM. Fix CI?
Was more complicated than it needed to be and didn't really work in the first place. Restructured watcher registration to use a queue.
Force-pushed from a8aaefa to 5b50658.
Was more complicated than it needed to be.
Also fixes clobbering ids on resume and losing watcher channels when watchers are disconnected and canceled.
/cc @hongchaodeng