Skip to content

Commit

Permalink
WIP: Improvements on ws_client. Now the client can returns an object …
Browse files Browse the repository at this point in the history
…to interact with websocket server
  • Loading branch information
mbohlool committed Feb 16, 2017
1 parent 1635150 commit df4c73b
Show file tree
Hide file tree
Showing 4 changed files with 191 additions and 39 deletions.
94 changes: 94 additions & 0 deletions examples/exec.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
import time

from kubernetes import config
from kubernetes.client import configuration
from kubernetes.client.apis import core_v1_api
from kubernetes.client.rest import ApiException

config.load_kube_config(
context="gke_cloud-kubernetes-dev_us-central1-f_mehdy-cluster")
configuration.assert_hostname = False
api = core_v1_api.CoreV1Api()
name = 'busybox-test2'

resp = None
try:
resp = api.read_namespaced_pod(name=name,
namespace='default')
except ApiException as e:
if e.status != 404:
print("Unknown error: %s" % e)
exit(1)

if not resp:
print("Pod %s does not exits. Creating it..." % name)
pod_manifest = {
'apiVersion': 'v1',
'kind': 'Pod',
'metadata': {
'name': name
},
'spec': {
'containers': [{
'image': 'busybox',
'name': 'sleep',
"args": [
"/bin/sh",
"-c",
"while true;do date;sleep 5; done"
]
}]
}
}
resp = api.create_namespaced_pod(body=pod_manifest,
namespace='default')
while True:
resp = api.read_namespaced_pod(name=name,
namespace='default')
if resp.status.phase != 'Pending':
break
time.sleep(1)
print "Done."

exec_command = [
'/bin/sh',
'-c',
'sleep 1; echo This message goes to stderr >&2; sleep 2; echo test2']
resp = api.connect_get_namespaced_pod_exec(name, 'default',
command=exec_command,
stderr=True, stdin=True,
stdout=True, tty=False,
_preload_content=False)
while resp.is_open():
resp.update(timeout=1)
if resp.peek_stdout():
print("STDOUT: %s" % resp.read_stdout())
if resp.peek_stderr():
print("STDERR: %s" % resp.read_stderr())

exit(1)
# This part does not work yet. resp.write_stdin does not work.

exec_command = ['/bin/sh']
resp = api.connect_get_namespaced_pod_exec(name, 'default',
command=exec_command,
stderr=True, stdin=True,
stdout=True, tty=False,
_preload_content=False)
commands = [
"echo test1",
"sleep 1",
"echo This message goes to stderr >&2",
"sleep 2",
"exit"
]
while resp.is_open():
resp.update(timeout=1)
if resp.peek_stdout():
print("STDOUT: %s" % resp.read_stdout())
if resp.peek_stderr():
print("STDERR: %s" % resp.read_stderr())
if commands:
c = commands.pop(0)
print "Running command... %s" % c
resp.write_stdin(c)
1 change: 1 addition & 0 deletions kubernetes/client/api_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,7 @@ def request(self, method, url, query_params=None, headers=None,
url,
query_params=query_params,
_request_timeout=_request_timeout,
_preload_content=_preload_content,
headers=headers)

if method == "GET":
Expand Down
134 changes: 95 additions & 39 deletions kubernetes/client/ws_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,33 +12,30 @@

from .rest import ApiException

import select
import certifi
import time
import collections
import websocket
from websocket import WebSocket, ABNF, enableTrace
import six
import ssl
from six.moves.urllib.parse import urlencode
from six.moves.urllib.parse import quote_plus

import socket

class WSClient:
def __init__(self, configuration, url, headers):
self.messages = []
self.errors = []
websocket.enableTrace(False)
header = None
enableTrace(False)
header = []
self._connected = False
self._stdout = ""
self._stderr = ""
self._all = ""

# We just need to pass the Authorization, ignore all the other
# http headers we get from the generated code
if 'Authorization' in headers:
header = "Authorization: %s" % headers['Authorization']

self.ws = websocket.WebSocketApp(url,
on_message=self.on_message,
on_error=self.on_error,
on_close=self.on_close,
header=[header] if header else None)
self.ws.on_open = self.on_open
header.append("Authorization: %s" % headers['Authorization'])

if url.startswith('wss://') and configuration.verify_ssl:
ssl_opts = {
Expand All @@ -52,30 +49,87 @@ def __init__(self, configuration, url, headers):
else:
ssl_opts = {'cert_reqs': ssl.CERT_NONE}

self.ws.run_forever(sslopt=ssl_opts)

def on_message(self, ws, message):
if message[0] == '\x01':
message = message[1:]
if message:
if six.PY3 and isinstance(message, six.binary_type):
message = message.decode('utf-8')
self.messages.append(message)

def on_error(self, ws, error):
self.errors.append(error)

def on_close(self, ws):
pass

def on_open(self, ws):
pass
self.sock = WebSocket(sslopt=ssl_opts, skip_utf8_validation=False)
self.sock.connect(url, header=header)
self._connected = True

def peek_stdout(self):
self.update()
return self._stdout

def read_stdout(self):
if not self._stdout:
self.update(timeout=None)
ret = self._stdout
self._stdout = ""
return ret

def peek_stderr(self):
self.update()
return self._stderr

def read_stderr(self):
if not self._stderr:
self.update(timeout=None)
ret = self._stderr
self._stderr = ""
return ret

def read_all(self):
out = self._all
self._all = ""
self._stdout = ""
self._stderr = ""
return out

def is_open(self):
return self._connected

# TODO: This method does not seem to work.
def write_stdin(self, data):
self.sock.send(data)

def update(self, timeout=0):
if not self.is_open():
return
if not self.sock.connected:
self._connected = False
return
r, _, _ = select.select(
(self.sock.sock, ), (), (), timeout)
if r:
op_code, frame = self.sock.recv_data_frame(True)
if op_code == ABNF.OPCODE_CLOSE:
self._connected = False
return
elif op_code == ABNF.OPCODE_BINARY or op_code == ABNF.OPCODE_TEXT:
data = frame.data
if six.PY3:
data = data.decode("utf-8")
if data[0] == '\x01':
data = data[1:]
if data:
self._all += data
if data[0] == '\x02':
self._stderr += data[1:]
else:
self._stdout += data

def run_forever(self, timeout=None):
if timeout:
start = time.time()
while self.is_open() and time.time() - start < timeout:
self.update(timeout=(timeout - time.time() + start))
else:
while self.is_open():
self.update(timeout=None)


WSResponse = collections.namedtuple('WSResponse', ['data'])


def GET(configuration, url, query_params, _request_timeout, headers):
def GET(configuration, url, query_params, _request_timeout, _preload_content,
headers):
# switch protocols from http to websocket
url = url.replace('http://', 'ws://')
url = url.replace('https://', 'wss://')
Expand Down Expand Up @@ -105,10 +159,12 @@ def GET(configuration, url, query_params, _request_timeout, headers):
else:
url += '&command=' + quote_plus(commands)

client = WSClient(configuration, url, headers)
if client.errors:
raise ApiException(
status=0,
reason='\n'.join([str(error) for error in client.errors])
)
try:
client = WSClient(configuration, url, headers)
if not _preload_content:
return client
client.run_forever(timeout=_request_timeout)
return WSResponse('%s' % ''.join(client.read_all()))
except (Exception, KeyboardInterrupt, SystemExit) as e:
raise ApiException(status=0, reason=str(e))
return WSResponse('%s' % ''.join(client.messages))
1 change: 1 addition & 0 deletions kubernetes/e2e_test/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,5 @@ def get_e2e_configuration():
if config.host is None:
raise unittest.SkipTest('Unable to find a running Kubernetes instance')
print('Running test against : %s' % config.host)
config.assert_hostname = False
return config

0 comments on commit df4c73b

Please sign in to comment.