Skip to content

Commit

Permalink
bugfix: watcher events loss
Browse files Browse the repository at this point in the history
Watchers may behave incorrectly if a request timeout is too small. A
re-IPROTO_WATCH request may be not send to a server. It could lead to
loss of events stream.

It also could lead to a lost IPROTO_UNREGISTER request, but a user
won't see the problem.

Closes #284
  • Loading branch information
oleg-jukovec committed May 10, 2023
1 parent 986a9a1 commit a76dcd6
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 3 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
- Several non-critical data race issues (#218)
- ConnectionPool does not properly handle disconnection with Opts.Reconnect
set (#272)
- Watcher events loss with a small per-request timeout (#284)

## [1.10.0] - 2022-12-31

Expand Down
14 changes: 11 additions & 3 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -1456,10 +1456,13 @@ func (conn *Connection) newWatcherImpl(key string, callback WatchCallback) (Watc
st <- state

if sendAck {
conn.Do(newWatchRequest(key)).Get()
// We expect a reconnect and re-subscribe if it fails to
// send the watch request. So it looks ok do not check a
// result.
// result. But we need to make sure that the re-watch
// request will not be finished by a small per-request
// timeout.
req := newWatchRequest(key).Context(context.Background())
conn.Do(req).Get()
}
}

Expand All @@ -1477,7 +1480,12 @@ func (conn *Connection) newWatcherImpl(key string, callback WatchCallback) (Watc
if !conn.ClosedNow() {
// conn.ClosedNow() check is a workaround for calling
// Unregister from connectionClose().
conn.Do(newUnwatchRequest(key)).Get()
//
// We need to make sure that the unwatch request will
// not be finished by a small per-request timeout to
// avoid lost of the request.
req := newUnwatchRequest(key).Context(context.Background())
conn.Do(req).Get()
}
conn.watchMap.Delete(key)
close(state.unready)
Expand Down

0 comments on commit a76dcd6

Please sign in to comment.