Skip to content

Commit 488518d

Browse files
committed
Enable binary support for WSClient
Currently, under python 3, the WSClient decodes all data via UTF-8. This will break, e.g. capturing the stdout of tar or gzip. This adds a new 'binary' kwarg to the WSClient class and websocket_call function. If this is set to true, then the decoding will not happen, and all channels will be interpreted as binary. This does raise a slight complication, as the OpenAPI-generated client will convert the output to a string, no matter what, which it ends up doing by (effectively) calling repr(). This requires a bit of magic to recover the orignial bytes, and is inefficient. However, this is only the case when using the default _preload_content=True, setting this to False and manually calling read_all or read_channel, this issue does not arise.
1 parent 7712421 commit 488518d

File tree

3 files changed

+73
-14
lines changed

3 files changed

+73
-14
lines changed

kubernetes/base/stream/stream.py

+11-2
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,18 @@ def _websocket_request(websocket_request, force_kwargs, api_method, *args, **kwa
3030
except AttributeError:
3131
configuration = api_client.config
3232
prev_request = api_client.request
33+
binary = kwargs.pop('binary', False)
3334
try:
34-
api_client.request = functools.partial(websocket_request, configuration)
35-
return api_method(*args, **kwargs)
35+
api_client.request = functools.partial(websocket_request, configuration, binary=binary)
36+
out = api_method(*args, **kwargs)
37+
# The api_client insists on converting this to a string using its representation, so we have
38+
# to do this dance to strip it of the b' prefix and ' suffix, encode it byte-per-byte (latin1),
39+
# escape all of the unicode \x*'s, then encode it back byte-by-byte
40+
# However, if _preload_content=False is passed, then the entire WSClient is returned instead
41+
# of a response, and we want to leave it alone
42+
if binary and kwargs.get('_preload_content', True):
43+
out = out[2:-1].encode('latin1').decode('unicode_escape').encode('latin1')
44+
return out
3645
finally:
3746
api_client.request = prev_request
3847

kubernetes/base/stream/ws_client.py

+19-10
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,9 @@
2626
import six
2727
import yaml
2828

29+
2930
from six.moves.urllib.parse import urlencode, urlparse, urlunparse
30-
from six import StringIO
31+
from six import StringIO, BytesIO
3132

3233
from websocket import WebSocket, ABNF, enableTrace
3334
from base64 import urlsafe_b64decode
@@ -48,7 +49,7 @@ def getvalue(self):
4849

4950

5051
class WSClient:
51-
def __init__(self, configuration, url, headers, capture_all):
52+
def __init__(self, configuration, url, headers, capture_all, binary=False):
5253
"""A websocket client with support for channels.
5354
5455
Exec command uses different channels for different streams. for
@@ -58,8 +59,10 @@ def __init__(self, configuration, url, headers, capture_all):
5859
"""
5960
self._connected = False
6061
self._channels = {}
62+
self.binary = binary
63+
self.newline = '\n' if not self.binary else b'\n'
6164
if capture_all:
62-
self._all = StringIO()
65+
self._all = StringIO() if not self.binary else BytesIO()
6366
else:
6467
self._all = _IgnoredIO()
6568
self.sock = create_websocket(configuration, url, headers)
@@ -92,8 +95,8 @@ def readline_channel(self, channel, timeout=None):
9295
while self.is_open() and time.time() - start < timeout:
9396
if channel in self._channels:
9497
data = self._channels[channel]
95-
if "\n" in data:
96-
index = data.find("\n")
98+
if self.newline in data:
99+
index = data.find(self.newline)
97100
ret = data[:index]
98101
data = data[index+1:]
99102
if data:
@@ -197,10 +200,12 @@ def update(self, timeout=0):
197200
return
198201
elif op_code == ABNF.OPCODE_BINARY or op_code == ABNF.OPCODE_TEXT:
199202
data = frame.data
200-
if six.PY3:
203+
if six.PY3 and not self.binary:
201204
data = data.decode("utf-8", "replace")
202205
if len(data) > 1:
203-
channel = ord(data[0])
206+
channel = data[0]
207+
if six.PY3 and not self.binary:
208+
channel = ord(channel)
204209
data = data[1:]
205210
if data:
206211
if channel in [STDOUT_CHANNEL, STDERR_CHANNEL]:
@@ -518,13 +523,17 @@ def websocket_call(configuration, _method, url, **kwargs):
518523
_request_timeout = kwargs.get("_request_timeout", 60)
519524
_preload_content = kwargs.get("_preload_content", True)
520525
capture_all = kwargs.get("capture_all", True)
521-
526+
binary = kwargs.get('binary', False)
522527
try:
523-
client = WSClient(configuration, url, headers, capture_all)
528+
client = WSClient(configuration, url, headers, capture_all, binary=binary)
524529
if not _preload_content:
525530
return client
526531
client.run_forever(timeout=_request_timeout)
527-
return WSResponse('%s' % ''.join(client.read_all()))
532+
all = client.read_all()
533+
if binary:
534+
return WSResponse(all)
535+
else:
536+
return WSResponse('%s' % ''.join(all))
528537
except (Exception, KeyboardInterrupt, SystemExit) as e:
529538
raise ApiException(status=0, reason=str(e))
530539

kubernetes/e2e_test/test_client.py

+43-2
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
import unittest
2121
import uuid
2222
import six
23+
import io
24+
import gzip
2325

2426
from kubernetes.client import api_client
2527
from kubernetes.client.api import core_v1_api
@@ -118,15 +120,28 @@ def test_pod_apis(self):
118120
command=exec_command,
119121
stderr=False, stdin=False,
120122
stdout=True, tty=False)
121-
print('EXEC response : %s' % resp)
123+
print('EXEC response : %s (%s)' % (repr(resp), type(resp)))
124+
self.assertIsInstance(resp, str)
122125
self.assertEqual(3, len(resp.splitlines()))
123126

127+
exec_command = ['/bin/sh',
128+
'-c',
129+
'echo -n "This is a test string" | gzip']
130+
resp = stream(api.connect_get_namespaced_pod_exec, name, 'default',
131+
command=exec_command,
132+
stderr=False, stdin=False,
133+
stdout=True, tty=False,
134+
binary=True)
135+
print('EXEC response : %s (%s)' % (repr(resp), type(resp)))
136+
self.assertIsInstance(resp, bytes)
137+
self.assertEqual("This is a test string", gzip.decompress(resp).decode('utf-8'))
138+
124139
exec_command = 'uptime'
125140
resp = stream(api.connect_post_namespaced_pod_exec, name, 'default',
126141
command=exec_command,
127142
stderr=False, stdin=False,
128143
stdout=True, tty=False)
129-
print('EXEC response : %s' % resp)
144+
print('EXEC response : %s' % repr(resp))
130145
self.assertEqual(1, len(resp.splitlines()))
131146

132147
resp = stream(api.connect_post_namespaced_pod_exec, name, 'default',
@@ -154,6 +169,32 @@ def test_pod_apis(self):
154169
resp.update(timeout=5)
155170
self.assertFalse(resp.is_open())
156171

172+
resp = stream(api.connect_post_namespaced_pod_exec, name, 'default',
173+
command='/bin/sh',
174+
stderr=True, stdin=True,
175+
stdout=True, tty=False,
176+
binary=True,
177+
_preload_content=False)
178+
resp.write_stdin(b"echo test string 1\n")
179+
line = resp.readline_stdout(timeout=5)
180+
self.assertFalse(resp.peek_stderr())
181+
self.assertEqual(b"test string 1", line)
182+
resp.write_stdin(b"echo test string 2 >&2\n")
183+
line = resp.readline_stderr(timeout=5)
184+
self.assertFalse(resp.peek_stdout())
185+
self.assertEqual(b"test string 2", line)
186+
resp.write_stdin(b"exit\n")
187+
resp.update(timeout=5)
188+
while True:
189+
line = resp.read_channel(ERROR_CHANNEL)
190+
if len(line) != 0:
191+
break
192+
time.sleep(1)
193+
status = json.loads(line)
194+
self.assertEqual(status['status'], 'Success')
195+
resp.update(timeout=5)
196+
self.assertFalse(resp.is_open())
197+
157198
number_of_pods = len(api.list_pod_for_all_namespaces().items)
158199
self.assertTrue(number_of_pods > 0)
159200

0 commit comments

Comments
 (0)