diff --git a/CHANGELOG.md b/CHANGELOG.md index 24e7c5f65..80fc40f5a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,6 +23,7 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release. - Several non-critical data race issues (#218) - ConnectionPool does not properly handle disconnection with Opts.Reconnect set (#272) +- Watcher events loss with a small per-request timeout (#284) ## [1.10.0] - 2022-12-31 diff --git a/connection.go b/connection.go index 47505dfa7..65d21365c 100644 --- a/connection.go +++ b/connection.go @@ -1456,10 +1456,13 @@ func (conn *Connection) newWatcherImpl(key string, callback WatchCallback) (Watc st <- state if sendAck { - conn.Do(newWatchRequest(key)).Get() // We expect a reconnect and re-subscribe if it fails to // send the watch request. So it looks ok do not check a - // result. + // result. But we need to make sure that the re-watch + // request will not be finished by a small per-request + // timeout. + req := newWatchRequest(key).Context(context.Background()) + conn.Do(req).Get() } } @@ -1477,7 +1480,12 @@ func (conn *Connection) newWatcherImpl(key string, callback WatchCallback) (Watc if !conn.ClosedNow() { // conn.ClosedNow() check is a workaround for calling // Unregister from connectionClose(). - conn.Do(newUnwatchRequest(key)).Get() + // + // We need to make sure that the unwatch request will + // not be finished by a small per-request timeout to + // avoid lost of the request. + req := newUnwatchRequest(key).Context(context.Background()) + conn.Do(req).Get() } conn.watchMap.Delete(key) close(state.unready)