Skip to content
This repository was archived by the owner on Mar 13, 2022. It is now read-only.

Commit 5eb6633

Browse files
committed
fix watching with a specified resource version
The watch code reset the version to the last found in the response. When you first list existing objects and then start watching from that resource version the existing versions are older than the version you wanted and the watch starts from the wrong version after the first restart. This leads to for example already deleted objects ending in the stream again. Fix this by not resetting to an older version than the one specified in the watch. It does not handle overflows of the resource version but they are 64 bit integers so they should not realistically overflow even in the most loaded clusters. Closes kubernetes-client/python#700
1 parent 5c242ea commit 5eb6633

File tree

2 files changed

+50
-6
lines changed

2 files changed

+50
-6
lines changed

watch/watch.py

+12-4
Original file line numberDiff line numberDiff line change
@@ -83,13 +83,14 @@ def unmarshal_event(self, data, return_type):
8383
obj = SimpleNamespace(data=json.dumps(js['raw_object']))
8484
js['object'] = self._api_client.deserialize(obj, return_type)
8585
if hasattr(js['object'], 'metadata'):
86-
self.resource_version = js['object'].metadata.resource_version
86+
self.resource_version = int(
87+
js['object'].metadata.resource_version)
8788
# For custom objects that we don't have model defined, json
8889
# deserialization results in dictionary
8990
elif (isinstance(js['object'], dict) and 'metadata' in js['object']
9091
and 'resourceVersion' in js['object']['metadata']):
91-
self.resource_version = js['object']['metadata'][
92-
'resourceVersion']
92+
self.resource_version = int(
93+
js['object']['metadata']['resourceVersion'])
9394
return js
9495

9596
def stream(self, func, *args, **kwargs):
@@ -122,6 +123,7 @@ def stream(self, func, *args, **kwargs):
122123
return_type = self.get_return_type(func)
123124
kwargs['watch'] = True
124125
kwargs['_preload_content'] = False
126+
min_resource_version = int(kwargs.get('resource_version', 0))
125127

126128
timeouts = ('timeout_seconds' in kwargs)
127129
while True:
@@ -132,7 +134,13 @@ def stream(self, func, *args, **kwargs):
132134
if self._stop:
133135
break
134136
finally:
135-
kwargs['resource_version'] = self.resource_version
137+
# if the existing objects are older than the requested version
138+
# continue to watch from the requested resource version
139+
# does not handle overflow though that should take a few
140+
# hundred years
141+
kwargs['resource_version'] = max(
142+
self.resource_version, min_resource_version
143+
)
136144
resp.close()
137145
resp.release_conn()
138146

watch/watch_test.py

+38-2
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ def test_watch_with_decode(self):
4949
# make sure decoder worked and updated Watch.resource_version
5050
self.assertEqual(
5151
"%d" % count, e['object'].metadata.resource_version)
52-
self.assertEqual("%d" % count, w.resource_version)
52+
self.assertEqual(count, w.resource_version)
5353
count += 1
5454
# make sure we can stop the watch and the last event with won't be
5555
# returned
@@ -62,6 +62,42 @@ def test_watch_with_decode(self):
6262
fake_resp.close.assert_called_once()
6363
fake_resp.release_conn.assert_called_once()
6464

65+
def test_watch_resource_version_set(self):
66+
#
67+
fake_resp = Mock()
68+
fake_resp.close = Mock()
69+
fake_resp.release_conn = Mock()
70+
values = [
71+
'{"type": "ADDED", "object": {"metadata": {"name": "test1",'
72+
'"resourceVersion": "1"}, "spec": {}, "status": {}}}\n',
73+
'{"type": "ADDED", "object": {"metadata": {"name": "test2",'
74+
'"resourceVersion": "2"}, "spec": {}, "sta',
75+
'tus": {}}}\n'
76+
'{"type": "ADDED", "object": {"metadata": {"name": "test3",'
77+
'"resourceVersion": "3"}, "spec": {}, "status": {}}}\n'
78+
]
79+
fake_resp.read_chunked = Mock(
80+
return_value=values)
81+
82+
fake_api = Mock()
83+
fake_api.get_namespaces = Mock(return_value=fake_resp)
84+
fake_api.get_namespaces.__doc__ = ':return: V1NamespaceList'
85+
86+
w = Watch()
87+
count = 1
88+
# ensure we keep our requested resource version when the existing
89+
# versions are older than the requested version
90+
# needed for the list existing objects, then watch from there use case
91+
for e in w.stream(fake_api.get_namespaces, resource_version=5):
92+
count += 1
93+
if count % 3 == 0:
94+
fake_api.get_namespaces.assert_called_once_with(
95+
_preload_content=False, watch=True, resource_version=5)
96+
fake_api.get_namespaces.reset_mock()
97+
# returned
98+
if count == len(values) * 3:
99+
w.stop()
100+
65101
def test_watch_stream_twice(self):
66102
w = Watch(float)
67103
for step in ['first', 'second']:
@@ -148,7 +184,7 @@ def test_unmarshal_with_custom_object(self):
148184
# Watch.resource_version
149185
self.assertTrue(isinstance(event['object'], dict))
150186
self.assertEqual("1", event['object']['metadata']['resourceVersion'])
151-
self.assertEqual("1", w.resource_version)
187+
self.assertEqual(1, w.resource_version)
152188

153189
def test_watch_with_exception(self):
154190
fake_resp = Mock()

0 commit comments

Comments
 (0)