From 324b1305491bde1fdd56e9af2f6a3e326a7d7947 Mon Sep 17 00:00:00 2001 From: Kevinz Date: Fri, 7 Mar 2025 11:41:37 +0800 Subject: [PATCH 01/12] feat: add switch control when watching events whether deserialization is required --- kubernetes/base/watch/watch.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/kubernetes/base/watch/watch.py b/kubernetes/base/watch/watch.py index da81f97029..2461d4dfe6 100644 --- a/kubernetes/base/watch/watch.py +++ b/kubernetes/base/watch/watch.py @@ -172,6 +172,7 @@ def stream(self, func, *args, **kwargs): # We want to ensure we are returning within that timeout. disable_retries = ('timeout_seconds' in kwargs) retry_after_410 = False + deserialize = kwargs.pop('deserialize', True) while True: resp = func(*args, **kwargs) try: @@ -179,7 +180,11 @@ def stream(self, func, *args, **kwargs): # unmarshal when we are receiving events from watch, # return raw string when we are streaming log if watch_arg == "watch": - event = self.unmarshal_event(line, return_type) + if deserialize: + event = self.unmarshal_event(line, return_type) + else: + # Only do basic JSON parsing, no deserialize + event = json.loads(line) if isinstance(event, dict) \ and event['type'] == 'ERROR': obj = event['raw_object'] From 0c352c4295111935fe4966fbbe6fee7f383086f4 Mon Sep 17 00:00:00 2001 From: Kevinz Date: Fri, 14 Mar 2025 12:12:45 +0800 Subject: [PATCH 02/12] feat: set default deserialize value is False and add test code --- kubernetes/base/watch/watch.py | 2 +- kubernetes/base/watch/watch_test.py | 38 +++++++++++++++++++++++++++++ 2 files changed, 39 insertions(+), 1 deletion(-) diff --git a/kubernetes/base/watch/watch.py b/kubernetes/base/watch/watch.py index 2461d4dfe6..891eab33fe 100644 --- a/kubernetes/base/watch/watch.py +++ b/kubernetes/base/watch/watch.py @@ -172,7 +172,7 @@ def stream(self, func, *args, **kwargs): # We want to ensure we are returning within that timeout. disable_retries = ('timeout_seconds' in kwargs) retry_after_410 = False - deserialize = kwargs.pop('deserialize', True) + deserialize = kwargs.pop('deserialize', False) while True: resp = func(*args, **kwargs) try: diff --git a/kubernetes/base/watch/watch_test.py b/kubernetes/base/watch/watch_test.py index c5bc5c378c..69995270c4 100644 --- a/kubernetes/base/watch/watch_test.py +++ b/kubernetes/base/watch/watch_test.py @@ -489,6 +489,44 @@ def test_watch_with_error_event_and_timeout_param(self): fake_resp.close.assert_called_once() fake_resp.release_conn.assert_called_once() + def test_watch_with_deserialize_param(self): + """test watch.stream() deserialize param""" + # prepare test data + test_json = '{"type": "ADDED", "object": {"metadata": {"name": "test1", "resourceVersion": "1"}, "spec": {}, "status": {}}}' + fake_resp = Mock() + fake_resp.close = Mock() + fake_resp.release_conn = Mock() + fake_resp.stream = Mock(return_value=[test_json + '\n']) + + fake_api = Mock() + fake_api.get_namespaces = Mock(return_value=fake_resp) + fake_api.get_namespaces.__doc__ = ':return: V1NamespaceList' + + # test case with deserialize=True + w = Watch() + for e in w.stream(fake_api.get_namespaces, deserialize=True): + self.assertEqual("ADDED", e['type']) + # Verify that the object is deserialized correctly + self.assertTrue(hasattr(e['object'], 'metadata')) + self.assertEqual("test1", e['object'].metadata.name) + self.assertEqual("1", e['object'].metadata.resource_version) + # Verify that the original object is saved + self.assertEqual(json.loads(test_json)['object'], e['raw_object']) + + # test case with deserialize=False + w = Watch() + for e in w.stream(fake_api.get_namespaces, deserialize=False): + self.assertEqual("ADDED", e['type']) + # The validation object remains in the original dictionary format + self.assertIsInstance(e['object'], dict) + self.assertEqual("test1", e['object']['metadata']['name']) + self.assertEqual("1", e['object']['metadata']['resourceVersion']) + + # verify the api is called twice + fake_api.get_namespaces.assert_has_calls([ + call(_preload_content=False, watch=True), + call(_preload_content=False, watch=True) + ]) if __name__ == '__main__': unittest.main() From 04855e6478305d843968f19db3fd2164480cca3b Mon Sep 17 00:00:00 2001 From: Raj Bhargav <72274012+p172913@users.noreply.github.com> Date: Sat, 8 Mar 2025 20:45:32 +0530 Subject: [PATCH 03/12] Changes for issue 2358 Changes made in wacth.py to print Empty newlines that are skipped when watching pod logs. --- kubernetes/base/watch/watch.py | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/kubernetes/base/watch/watch.py b/kubernetes/base/watch/watch.py index 891eab33fe..4f3f6169be 100644 --- a/kubernetes/base/watch/watch.py +++ b/kubernetes/base/watch/watch.py @@ -71,6 +71,7 @@ def iter_resp_lines(resp): # Split by newline (safe for utf-8 because multi-byte sequences cannot contain the newline byte) next_newline = buffer.find(b'\n') + last_was_empty = False # Set empty-line flag while next_newline != -1: # Convert bytes to a valid utf-8 string, replacing any invalid utf-8 with the '�' character line = buffer[:next_newline].decode( @@ -78,6 +79,11 @@ def iter_resp_lines(resp): buffer = buffer[next_newline+1:] if line: yield line + last_was_empty = False # Reset empty-line flag + else: + if not last_was_empty: + yield '\n' # Only print one empty line + last_was_empty = True # Mark that we handled an empty line next_newline = buffer.find(b'\n') @@ -176,6 +182,7 @@ def stream(self, func, *args, **kwargs): while True: resp = func(*args, **kwargs) try: + last_was_empty = False # Set empty line false for line in iter_resp_lines(resp): # unmarshal when we are receiving events from watch, # return raw string when we are streaming log @@ -203,7 +210,12 @@ def stream(self, func, *args, **kwargs): retry_after_410 = False yield event else: - yield line + if line: + yield line # Normal non-empty line + last_was_empty = False + elif not last_was_empty: + yield '/n' # Only yield one empty line + last_was_empty = True if self._stop: break finally: From 3f22287cdc7f656514416979fa39025ef32f89d1 Mon Sep 17 00:00:00 2001 From: Raj Bhargav <72274012+p172913@users.noreply.github.com> Date: Thu, 13 Mar 2025 13:57:29 +0530 Subject: [PATCH 04/12] Create test_pod_logs.py This file is to test whether empty line are printed when watch function in the kubernetes python client is used. --- kubernetes/test/test_pod_logs.py | 33 ++++++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) create mode 100644 kubernetes/test/test_pod_logs.py diff --git a/kubernetes/test/test_pod_logs.py b/kubernetes/test/test_pod_logs.py new file mode 100644 index 0000000000..0beaa0606f --- /dev/null +++ b/kubernetes/test/test_pod_logs.py @@ -0,0 +1,33 @@ +from kubernetes import client, config, watch + +pod_name = "demo-bug" + + +config.load_kube_config() + + +api = client.CoreV1Api() +namespace = config.list_kube_config_contexts()[1]["context"]["namespace"] + +pod_manifest = { + "apiVersion": "v1", + "kind": "Pod", + "metadata": { + "name": pod_name, + }, + "spec": { + "containers": [{"image": "hello-world", "name": pod_name}], + }, +} +api.create_namespaced_pod(body=pod_manifest, namespace=namespace) + +input("\n\nSubmit when running\n\n") + +w = watch.Watch() +for e in w.stream( + api.read_namespaced_pod_log, + name=pod_name, + namespace=namespace, + follow=True, +): + print(e) From 7b89a76caee44f1f5298a19a0d09609676e077c7 Mon Sep 17 00:00:00 2001 From: Kevinz Date: Tue, 1 Apr 2025 14:11:09 +0800 Subject: [PATCH 05/12] fix: fix conflict --- kubernetes/base/watch/watch_test.py | 86 ++++++++++++++++++++++++++++- 1 file changed, 85 insertions(+), 1 deletion(-) diff --git a/kubernetes/base/watch/watch_test.py b/kubernetes/base/watch/watch_test.py index 69995270c4..fded0ec897 100644 --- a/kubernetes/base/watch/watch_test.py +++ b/kubernetes/base/watch/watch_test.py @@ -14,12 +14,18 @@ import unittest +import os + +import time + from unittest.mock import Mock, call -from kubernetes import client +from kubernetes import client,config from .watch import Watch +from kubernetes.client import ApiException + class WatchTests(unittest.TestCase): def setUp(self): @@ -99,6 +105,9 @@ def test_watch_with_interspersed_newlines(self): # Note that "timeout_seconds" below is not a timeout; rather, it disables retries and is # the only way to do so. Without that, the stream will re-read the test data forever. for e in w.stream(fake_api.get_namespaces, timeout_seconds=1): + # Here added a statement for exception for empty lines. + if e is None: + continue count += 1 self.assertEqual("test%d" % count, e['object'].metadata.name) self.assertEqual(3, count) @@ -488,6 +497,11 @@ def test_watch_with_error_event_and_timeout_param(self): amt=None, decode_content=False) fake_resp.close.assert_called_once() fake_resp.release_conn.assert_called_once() + + @classmethod + def setUpClass(cls): + cls.api = Mock() + cls.namespace = "default" def test_watch_with_deserialize_param(self): """test watch.stream() deserialize param""" @@ -527,6 +541,76 @@ def test_watch_with_deserialize_param(self): call(_preload_content=False, watch=True), call(_preload_content=False, watch=True) ]) + def test_pod_log_empty_lines(self): + pod_name = "demo-bug" + # Manifest with busybax to keep pod engaged for sometiem + pod_manifest = { + "apiVersion": "v1", + "kind": "Pod", + "metadata": {"name": pod_name}, + "spec": { + "containers": [{ + "image": "busybox", + "name": "my-container", + "command": ["sh", "-c", "while true; do echo Hello from Docker ; sleep 10; done"] + }] + }, + } + + try: + self.api.create_namespaced_pod = Mock() + self.api.read_namespaced_pod = Mock() + self.api.delete_namespaced_pod = Mock() + self.api.read_namespaced_pod_log = Mock() + + #pod creating step + self.api.create_namespaced_pod.return_value = None + + #Checking pod status + mock_pod = Mock() + mock_pod.status.phase = "Running" + self.api.read_namespaced_pod.return_value = mock_pod + + # Printing at pod output + self.api.read_namespaced_pod_log.return_value = iter(["Hello from Docker\n"]) + + # Wait for the pod to reach 'Running' + timeout = 60 + start_time = time.time() + while time.time() - start_time < timeout: + pod = self.api.read_namespaced_pod(name=pod_name, namespace=self.namespace) + if pod.status.phase == "Running": + break + time.sleep(2) + else: + self.fail("Pod did not reach 'Running' state within timeout") + + # Reading and streaming logs using Watch (mocked) + w = Watch() + log_output = [] + #Mock logs used for this test + w.stream = Mock(return_value=[ + "Hello from Docker", + "\n", # Empty line + "Another log line", + "\n", # Another empty line + "Final log" + ]) + for event in w.stream(self.api.read_namespaced_pod_log, name=pod_name, namespace=self.namespace, follow=True): + log_output.append(event) + print(event) + + # Print outputs + print(f"Captured logs: {log_output}") + # self.assertTrue(any("Hello from Docker" in line for line in log_output)) + self.assertTrue(any(line.strip() == "" for line in log_output), "No empty lines found in logs") + + except ApiException as e: + self.fail(f"Kubernetes API exception: {e}") + finally: + #checking pod is calling for delete + self.api.delete_namespaced_pod(name=pod_name, namespace=self.namespace) + self.api.delete_namespaced_pod.assert_called_once_with(name=pod_name, namespace=self.namespace) if __name__ == '__main__': unittest.main() From 33f18929ae4edcf961ca68825866c94152fc1c88 Mon Sep 17 00:00:00 2001 From: Raj Bhargav <72274012+p172913@users.noreply.github.com> Date: Tue, 18 Mar 2025 13:48:38 +0530 Subject: [PATCH 06/12] Delete kubernetes/test/test_pod_logs.py Deleting file with tests added in test/test_pod_los.py --- kubernetes/test/test_pod_logs.py | 33 -------------------------------- 1 file changed, 33 deletions(-) delete mode 100644 kubernetes/test/test_pod_logs.py diff --git a/kubernetes/test/test_pod_logs.py b/kubernetes/test/test_pod_logs.py deleted file mode 100644 index 0beaa0606f..0000000000 --- a/kubernetes/test/test_pod_logs.py +++ /dev/null @@ -1,33 +0,0 @@ -from kubernetes import client, config, watch - -pod_name = "demo-bug" - - -config.load_kube_config() - - -api = client.CoreV1Api() -namespace = config.list_kube_config_contexts()[1]["context"]["namespace"] - -pod_manifest = { - "apiVersion": "v1", - "kind": "Pod", - "metadata": { - "name": pod_name, - }, - "spec": { - "containers": [{"image": "hello-world", "name": pod_name}], - }, -} -api.create_namespaced_pod(body=pod_manifest, namespace=namespace) - -input("\n\nSubmit when running\n\n") - -w = watch.Watch() -for e in w.stream( - api.read_namespaced_pod_log, - name=pod_name, - namespace=namespace, - follow=True, -): - print(e) From 899ed5ff22ed2a3cade0b058d6fd00f22a11d86b Mon Sep 17 00:00:00 2001 From: Raj Bhargav <72274012+p172913@users.noreply.github.com> Date: Wed, 19 Mar 2025 14:15:44 +0530 Subject: [PATCH 07/12] Update watch.py Changes made in unmarshal_event for not having issues with empty lines. --- kubernetes/base/watch/watch.py | 45 +++++++++++++++++++--------------- 1 file changed, 25 insertions(+), 20 deletions(-) diff --git a/kubernetes/base/watch/watch.py b/kubernetes/base/watch/watch.py index 4f3f6169be..6f6df0880a 100644 --- a/kubernetes/base/watch/watch.py +++ b/kubernetes/base/watch/watch.py @@ -82,7 +82,7 @@ def iter_resp_lines(resp): last_was_empty = False # Reset empty-line flag else: if not last_was_empty: - yield '\n' # Only print one empty line + yield '' # Only print one empty line last_was_empty = True # Mark that we handled an empty line next_newline = buffer.find(b'\n') @@ -113,24 +113,29 @@ def get_watch_argument_name(self, func): return 'watch' def unmarshal_event(self, data, return_type): - js = json.loads(data) - js['raw_object'] = js['object'] - # BOOKMARK event is treated the same as ERROR for a quick fix of - # decoding exception - # TODO: make use of the resource_version in BOOKMARK event for more - # efficient WATCH - if return_type and js['type'] != 'ERROR' and js['type'] != 'BOOKMARK': - obj = SimpleNamespace(data=json.dumps(js['raw_object'])) - js['object'] = self._api_client.deserialize(obj, return_type) - if hasattr(js['object'], 'metadata'): - self.resource_version = js['object'].metadata.resource_version - # For custom objects that we don't have model defined, json - # deserialization results in dictionary - elif (isinstance(js['object'], dict) and 'metadata' in js['object'] - and 'resourceVersion' in js['object']['metadata']): - self.resource_version = js['object']['metadata'][ - 'resourceVersion'] - return js + if not data or data.isspace(): + return None + try: + js = json.loads(data) + js['raw_object'] = js['object'] + # BOOKMARK event is treated the same as ERROR for a quick fix of + # decoding exception + # TODO: make use of the resource_version in BOOKMARK event for more + # efficient WATCH + if return_type and js['type'] != 'ERROR' and js['type'] != 'BOOKMARK': + obj = SimpleNamespace(data=json.dumps(js['raw_object'])) + js['object'] = self._api_client.deserialize(obj, return_type) + if hasattr(js['object'], 'metadata'): + self.resource_version = js['object'].metadata.resource_version + # For custom objects that we don't have model defined, json + # deserialization results in dictionary + elif (isinstance(js['object'], dict) and 'metadata' in js['object'] + and 'resourceVersion' in js['object']['metadata']): + self.resource_version = js['object']['metadata'][ + 'resourceVersion'] + return js + except json.JSONDecodeError: + return None def stream(self, func, *args, **kwargs): """Watch an API resource and stream the result back via a generator. @@ -214,7 +219,7 @@ def stream(self, func, *args, **kwargs): yield line # Normal non-empty line last_was_empty = False elif not last_was_empty: - yield '/n' # Only yield one empty line + yield '' # Only yield one empty line last_was_empty = True if self._stop: break From 285afc7d21903f0d012518fcff960c709a01b061 Mon Sep 17 00:00:00 2001 From: Raj Bhargav <72274012+p172913@users.noreply.github.com> Date: Thu, 20 Mar 2025 00:27:16 +0530 Subject: [PATCH 08/12] Update watch_test.py Removed pod_manifest from watch_test.py. --- kubernetes/base/watch/watch_test.py | 15 +-------------- 1 file changed, 1 insertion(+), 14 deletions(-) diff --git a/kubernetes/base/watch/watch_test.py b/kubernetes/base/watch/watch_test.py index fded0ec897..373bb56a5f 100644 --- a/kubernetes/base/watch/watch_test.py +++ b/kubernetes/base/watch/watch_test.py @@ -543,20 +543,7 @@ def test_watch_with_deserialize_param(self): ]) def test_pod_log_empty_lines(self): pod_name = "demo-bug" - # Manifest with busybax to keep pod engaged for sometiem - pod_manifest = { - "apiVersion": "v1", - "kind": "Pod", - "metadata": {"name": pod_name}, - "spec": { - "containers": [{ - "image": "busybox", - "name": "my-container", - "command": ["sh", "-c", "while true; do echo Hello from Docker ; sleep 10; done"] - }] - }, - } - + try: self.api.create_namespaced_pod = Mock() self.api.read_namespaced_pod = Mock() From 38a4b364e6d07b72cb03b32385b6603ed4403e07 Mon Sep 17 00:00:00 2001 From: Raj Bhargav <72274012+p172913@users.noreply.github.com> Date: Thu, 20 Mar 2025 01:44:56 +0530 Subject: [PATCH 09/12] Update watch_test.py Changes made to check whether entire log is printed or not. --- kubernetes/base/watch/watch_test.py | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/kubernetes/base/watch/watch_test.py b/kubernetes/base/watch/watch_test.py index 373bb56a5f..1d56a1beaf 100644 --- a/kubernetes/base/watch/watch_test.py +++ b/kubernetes/base/watch/watch_test.py @@ -578,9 +578,9 @@ def test_pod_log_empty_lines(self): #Mock logs used for this test w.stream = Mock(return_value=[ "Hello from Docker", - "\n", # Empty line + "", # Empty line "Another log line", - "\n", # Another empty line + "", # Another empty line "Final log" ]) for event in w.stream(self.api.read_namespaced_pod_log, name=pod_name, namespace=self.namespace, follow=True): @@ -590,7 +590,16 @@ def test_pod_log_empty_lines(self): # Print outputs print(f"Captured logs: {log_output}") # self.assertTrue(any("Hello from Docker" in line for line in log_output)) - self.assertTrue(any(line.strip() == "" for line in log_output), "No empty lines found in logs") + # self.assertTrue(any(line.strip() == "" for line in log_output), "No empty lines found in logs") + expected_log = [ + "Hello from Docker", + "", + "Another log line", + "", + "Final log" + ] + + self.assertEqual(log_output, expected_log, "Captured logs do not match expected logs") except ApiException as e: self.fail(f"Kubernetes API exception: {e}") From 9420d74c8b95836e4a470e2bf97e61e15cc89a69 Mon Sep 17 00:00:00 2001 From: Raj Bhargav <72274012+p172913@users.noreply.github.com> Date: Thu, 20 Mar 2025 11:21:20 +0530 Subject: [PATCH 10/12] Update watch.py Changes made in watch.py to print multiple empty line if necessary. --- kubernetes/base/watch/watch.py | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/kubernetes/base/watch/watch.py b/kubernetes/base/watch/watch.py index 6f6df0880a..1499f098ed 100644 --- a/kubernetes/base/watch/watch.py +++ b/kubernetes/base/watch/watch.py @@ -71,7 +71,6 @@ def iter_resp_lines(resp): # Split by newline (safe for utf-8 because multi-byte sequences cannot contain the newline byte) next_newline = buffer.find(b'\n') - last_was_empty = False # Set empty-line flag while next_newline != -1: # Convert bytes to a valid utf-8 string, replacing any invalid utf-8 with the '�' character line = buffer[:next_newline].decode( @@ -79,11 +78,8 @@ def iter_resp_lines(resp): buffer = buffer[next_newline+1:] if line: yield line - last_was_empty = False # Reset empty-line flag else: - if not last_was_empty: - yield '' # Only print one empty line - last_was_empty = True # Mark that we handled an empty line + yield '' # Only print one empty line next_newline = buffer.find(b'\n') @@ -187,7 +183,6 @@ def stream(self, func, *args, **kwargs): while True: resp = func(*args, **kwargs) try: - last_was_empty = False # Set empty line false for line in iter_resp_lines(resp): # unmarshal when we are receiving events from watch, # return raw string when we are streaming log @@ -217,10 +212,8 @@ def stream(self, func, *args, **kwargs): else: if line: yield line # Normal non-empty line - last_was_empty = False - elif not last_was_empty: + else: yield '' # Only yield one empty line - last_was_empty = True if self._stop: break finally: From 015e9339e336c6be52e06cb44af472ed83b3a6a7 Mon Sep 17 00:00:00 2001 From: Raj Bhargav <72274012+p172913@users.noreply.github.com> Date: Thu, 20 Mar 2025 11:23:23 +0530 Subject: [PATCH 11/12] Update watch_test.py As per request added few empty lines to test case in watch_test.py --- kubernetes/base/watch/watch_test.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/kubernetes/base/watch/watch_test.py b/kubernetes/base/watch/watch_test.py index 1d56a1beaf..b9cc29c2e9 100644 --- a/kubernetes/base/watch/watch_test.py +++ b/kubernetes/base/watch/watch_test.py @@ -578,9 +578,12 @@ def test_pod_log_empty_lines(self): #Mock logs used for this test w.stream = Mock(return_value=[ "Hello from Docker", - "", # Empty line + "", + "", + "\n\n", "Another log line", - "", # Another empty line + "", + "\n", "Final log" ]) for event in w.stream(self.api.read_namespaced_pod_log, name=pod_name, namespace=self.namespace, follow=True): @@ -594,8 +597,11 @@ def test_pod_log_empty_lines(self): expected_log = [ "Hello from Docker", "", + "", + "\n\n", "Another log line", "", + "\n", "Final log" ] From 81150d39a34258675e3ea1966a5cfee238e7fddb Mon Sep 17 00:00:00 2001 From: Kevinz Date: Fri, 14 Mar 2025 12:12:45 +0800 Subject: [PATCH 12/12] feat: set default deserialize value is False and add test code --- kubernetes/base/watch/watch_test.py | 39 +++++++++++++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/kubernetes/base/watch/watch_test.py b/kubernetes/base/watch/watch_test.py index b9cc29c2e9..a8d925b096 100644 --- a/kubernetes/base/watch/watch_test.py +++ b/kubernetes/base/watch/watch_test.py @@ -614,5 +614,44 @@ def test_pod_log_empty_lines(self): self.api.delete_namespaced_pod(name=pod_name, namespace=self.namespace) self.api.delete_namespaced_pod.assert_called_once_with(name=pod_name, namespace=self.namespace) + def test_watch_with_deserialize_param(self): + """test watch.stream() deserialize param""" + # prepare test data + test_json = '{"type": "ADDED", "object": {"metadata": {"name": "test1", "resourceVersion": "1"}, "spec": {}, "status": {}}}' + fake_resp = Mock() + fake_resp.close = Mock() + fake_resp.release_conn = Mock() + fake_resp.stream = Mock(return_value=[test_json + '\n']) + + fake_api = Mock() + fake_api.get_namespaces = Mock(return_value=fake_resp) + fake_api.get_namespaces.__doc__ = ':return: V1NamespaceList' + + # test case with deserialize=True + w = Watch() + for e in w.stream(fake_api.get_namespaces, deserialize=True): + self.assertEqual("ADDED", e['type']) + # Verify that the object is deserialized correctly + self.assertTrue(hasattr(e['object'], 'metadata')) + self.assertEqual("test1", e['object'].metadata.name) + self.assertEqual("1", e['object'].metadata.resource_version) + # Verify that the original object is saved + self.assertEqual(json.loads(test_json)['object'], e['raw_object']) + + # test case with deserialize=False + w = Watch() + for e in w.stream(fake_api.get_namespaces, deserialize=False): + self.assertEqual("ADDED", e['type']) + # The validation object remains in the original dictionary format + self.assertIsInstance(e['object'], dict) + self.assertEqual("test1", e['object']['metadata']['name']) + self.assertEqual("1", e['object']['metadata']['resourceVersion']) + + # verify the api is called twice + fake_api.get_namespaces.assert_has_calls([ + call(_preload_content=False, watch=True), + call(_preload_content=False, watch=True) + ]) + if __name__ == '__main__': unittest.main()