20
20
from kubernetes import client
21
21
22
22
PYDOC_RETURN_LABEL = ":return:"
23
+ PYDOC_FOLLOW_PARAM = ":param bool follow:"
23
24
24
25
# Removing this suffix from return type name should give us event's object
25
26
# type. e.g., if list_namespaces() returns "NamespaceList" type,
@@ -65,7 +66,7 @@ def __init__(self, return_type=None):
65
66
self ._raw_return_type = return_type
66
67
self ._stop = False
67
68
self ._api_client = client .ApiClient ()
68
- self .resource_version = 0
69
+ self .resource_version = None
69
70
70
71
def stop (self ):
71
72
self ._stop = True
@@ -78,8 +79,17 @@ def get_return_type(self, func):
78
79
return return_type [:- len (TYPE_LIST_SUFFIX )]
79
80
return return_type
80
81
82
+ def get_watch_argument_name (self , func ):
83
+ if PYDOC_FOLLOW_PARAM in pydoc .getdoc (func ):
84
+ return 'follow'
85
+ else :
86
+ return 'watch'
87
+
81
88
def unmarshal_event (self , data , return_type ):
82
- js = json .loads (data )
89
+ try :
90
+ js = json .loads (data )
91
+ except ValueError :
92
+ return data
83
93
js ['raw_object' ] = js ['object' ]
84
94
if return_type :
85
95
obj = SimpleNamespace (data = json .dumps (js ['raw_object' ]))
@@ -122,7 +132,7 @@ def stream(self, func, *args, **kwargs):
122
132
123
133
self ._stop = False
124
134
return_type = self .get_return_type (func )
125
- kwargs ['watch' ] = True
135
+ kwargs [self . get_watch_argument_name ( func ) ] = True
126
136
kwargs ['_preload_content' ] = False
127
137
if 'resource_version' in kwargs :
128
138
self .resource_version = kwargs ['resource_version' ]
@@ -136,9 +146,12 @@ def stream(self, func, *args, **kwargs):
136
146
if self ._stop :
137
147
break
138
148
finally :
139
- kwargs ['resource_version' ] = self .resource_version
140
149
resp .close ()
141
150
resp .release_conn ()
151
+ if self .resource_version is not None :
152
+ kwargs ['resource_version' ] = self .resource_version
153
+ else :
154
+ self ._stop = True
142
155
143
156
if timeouts or self ._stop :
144
157
break
0 commit comments