From df4c73bb8134974585c378c48160189075604f2f Mon Sep 17 00:00:00 2001 From: mbohlool Date: Wed, 15 Feb 2017 12:37:43 -0800 Subject: [PATCH] WIP: Improvements on ws_client. Now the client can returns an object to interact with websocket server --- examples/exec.py | 94 ++++++++++++++++++++++ kubernetes/client/api_client.py | 1 + kubernetes/client/ws_client.py | 134 ++++++++++++++++++++++---------- kubernetes/e2e_test/base.py | 1 + 4 files changed, 191 insertions(+), 39 deletions(-) create mode 100644 examples/exec.py diff --git a/examples/exec.py b/examples/exec.py new file mode 100644 index 0000000000..3c59e51ccf --- /dev/null +++ b/examples/exec.py @@ -0,0 +1,94 @@ +import time + +from kubernetes import config +from kubernetes.client import configuration +from kubernetes.client.apis import core_v1_api +from kubernetes.client.rest import ApiException + +config.load_kube_config( + context="gke_cloud-kubernetes-dev_us-central1-f_mehdy-cluster") +configuration.assert_hostname = False +api = core_v1_api.CoreV1Api() +name = 'busybox-test2' + +resp = None +try: + resp = api.read_namespaced_pod(name=name, + namespace='default') +except ApiException as e: + if e.status != 404: + print("Unknown error: %s" % e) + exit(1) + +if not resp: + print("Pod %s does not exits. Creating it..." % name) + pod_manifest = { + 'apiVersion': 'v1', + 'kind': 'Pod', + 'metadata': { + 'name': name + }, + 'spec': { + 'containers': [{ + 'image': 'busybox', + 'name': 'sleep', + "args": [ + "/bin/sh", + "-c", + "while true;do date;sleep 5; done" + ] + }] + } + } + resp = api.create_namespaced_pod(body=pod_manifest, + namespace='default') + while True: + resp = api.read_namespaced_pod(name=name, + namespace='default') + if resp.status.phase != 'Pending': + break + time.sleep(1) + print "Done." + +exec_command = [ + '/bin/sh', + '-c', + 'sleep 1; echo This message goes to stderr >&2; sleep 2; echo test2'] +resp = api.connect_get_namespaced_pod_exec(name, 'default', + command=exec_command, + stderr=True, stdin=True, + stdout=True, tty=False, + _preload_content=False) +while resp.is_open(): + resp.update(timeout=1) + if resp.peek_stdout(): + print("STDOUT: %s" % resp.read_stdout()) + if resp.peek_stderr(): + print("STDERR: %s" % resp.read_stderr()) + +exit(1) +# This part does not work yet. resp.write_stdin does not work. + +exec_command = ['/bin/sh'] +resp = api.connect_get_namespaced_pod_exec(name, 'default', + command=exec_command, + stderr=True, stdin=True, + stdout=True, tty=False, + _preload_content=False) +commands = [ + "echo test1", + "sleep 1", + "echo This message goes to stderr >&2", + "sleep 2", + "exit" +] +while resp.is_open(): + resp.update(timeout=1) + if resp.peek_stdout(): + print("STDOUT: %s" % resp.read_stdout()) + if resp.peek_stderr(): + print("STDERR: %s" % resp.read_stderr()) + if commands: + c = commands.pop(0) + print "Running command... %s" % c + resp.write_stdin(c) diff --git a/kubernetes/client/api_client.py b/kubernetes/client/api_client.py index 6dbe7137d3..139cc67007 100644 --- a/kubernetes/client/api_client.py +++ b/kubernetes/client/api_client.py @@ -351,6 +351,7 @@ def request(self, method, url, query_params=None, headers=None, url, query_params=query_params, _request_timeout=_request_timeout, + _preload_content=_preload_content, headers=headers) if method == "GET": diff --git a/kubernetes/client/ws_client.py b/kubernetes/client/ws_client.py index b143400bee..6b3ef0d8bd 100644 --- a/kubernetes/client/ws_client.py +++ b/kubernetes/client/ws_client.py @@ -12,33 +12,30 @@ from .rest import ApiException +import select import certifi +import time import collections -import websocket +from websocket import WebSocket, ABNF, enableTrace import six import ssl from six.moves.urllib.parse import urlencode from six.moves.urllib.parse import quote_plus - +import socket class WSClient: def __init__(self, configuration, url, headers): - self.messages = [] - self.errors = [] - websocket.enableTrace(False) - header = None + enableTrace(False) + header = [] + self._connected = False + self._stdout = "" + self._stderr = "" + self._all = "" # We just need to pass the Authorization, ignore all the other # http headers we get from the generated code if 'Authorization' in headers: - header = "Authorization: %s" % headers['Authorization'] - - self.ws = websocket.WebSocketApp(url, - on_message=self.on_message, - on_error=self.on_error, - on_close=self.on_close, - header=[header] if header else None) - self.ws.on_open = self.on_open + header.append("Authorization: %s" % headers['Authorization']) if url.startswith('wss://') and configuration.verify_ssl: ssl_opts = { @@ -52,30 +49,87 @@ def __init__(self, configuration, url, headers): else: ssl_opts = {'cert_reqs': ssl.CERT_NONE} - self.ws.run_forever(sslopt=ssl_opts) - - def on_message(self, ws, message): - if message[0] == '\x01': - message = message[1:] - if message: - if six.PY3 and isinstance(message, six.binary_type): - message = message.decode('utf-8') - self.messages.append(message) - - def on_error(self, ws, error): - self.errors.append(error) - - def on_close(self, ws): - pass - - def on_open(self, ws): - pass + self.sock = WebSocket(sslopt=ssl_opts, skip_utf8_validation=False) + self.sock.connect(url, header=header) + self._connected = True + + def peek_stdout(self): + self.update() + return self._stdout + + def read_stdout(self): + if not self._stdout: + self.update(timeout=None) + ret = self._stdout + self._stdout = "" + return ret + + def peek_stderr(self): + self.update() + return self._stderr + + def read_stderr(self): + if not self._stderr: + self.update(timeout=None) + ret = self._stderr + self._stderr = "" + return ret + + def read_all(self): + out = self._all + self._all = "" + self._stdout = "" + self._stderr = "" + return out + + def is_open(self): + return self._connected + + # TODO: This method does not seem to work. + def write_stdin(self, data): + self.sock.send(data) + + def update(self, timeout=0): + if not self.is_open(): + return + if not self.sock.connected: + self._connected = False + return + r, _, _ = select.select( + (self.sock.sock, ), (), (), timeout) + if r: + op_code, frame = self.sock.recv_data_frame(True) + if op_code == ABNF.OPCODE_CLOSE: + self._connected = False + return + elif op_code == ABNF.OPCODE_BINARY or op_code == ABNF.OPCODE_TEXT: + data = frame.data + if six.PY3: + data = data.decode("utf-8") + if data[0] == '\x01': + data = data[1:] + if data: + self._all += data + if data[0] == '\x02': + self._stderr += data[1:] + else: + self._stdout += data + + def run_forever(self, timeout=None): + if timeout: + start = time.time() + while self.is_open() and time.time() - start < timeout: + self.update(timeout=(timeout - time.time() + start)) + else: + while self.is_open(): + self.update(timeout=None) WSResponse = collections.namedtuple('WSResponse', ['data']) -def GET(configuration, url, query_params, _request_timeout, headers): +def GET(configuration, url, query_params, _request_timeout, _preload_content, + headers): # switch protocols from http to websocket url = url.replace('http://', 'ws://') url = url.replace('https://', 'wss://') @@ -105,10 +159,12 @@ def GET(configuration, url, query_params, _request_timeout, headers): else: url += '&command=' + quote_plus(commands) - client = WSClient(configuration, url, headers) - if client.errors: - raise ApiException( - status=0, - reason='\n'.join([str(error) for error in client.errors]) - ) + try: + client = WSClient(configuration, url, headers) + if not _preload_content: + return client + client.run_forever(timeout=_request_timeout) + return WSResponse('%s' % ''.join(client.read_all())) + except (Exception, KeyboardInterrupt, SystemExit) as e: + raise ApiException(status=0, reason=str(e)) return WSResponse('%s' % ''.join(client.messages)) diff --git a/kubernetes/e2e_test/base.py b/kubernetes/e2e_test/base.py index 5f04ab7e8e..ee19b14ac4 100644 --- a/kubernetes/e2e_test/base.py +++ b/kubernetes/e2e_test/base.py @@ -42,4 +42,5 @@ def get_e2e_configuration(): if config.host is None: raise unittest.SkipTest('Unable to find a running Kubernetes instance') print('Running test against : %s' % config.host) + config.assert_hostname = False return config