-
Notifications
You must be signed in to change notification settings - Fork 3.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Improvements on ws_client #125
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,94 @@ | ||
import time | ||
|
||
from kubernetes import config | ||
from kubernetes.client import configuration | ||
from kubernetes.client.apis import core_v1_api | ||
from kubernetes.client.rest import ApiException | ||
|
||
config.load_kube_config() | ||
configuration.assert_hostname = False | ||
api = core_v1_api.CoreV1Api() | ||
name = 'busybox-test' | ||
|
||
resp = None | ||
try: | ||
resp = api.read_namespaced_pod(name=name, | ||
namespace='default') | ||
except ApiException as e: | ||
if e.status != 404: | ||
print("Unknown error: %s" % e) | ||
exit(1) | ||
|
||
if not resp: | ||
print("Pod %s does not exits. Creating it..." % name) | ||
pod_manifest = { | ||
'apiVersion': 'v1', | ||
'kind': 'Pod', | ||
'metadata': { | ||
'name': name | ||
}, | ||
'spec': { | ||
'containers': [{ | ||
'image': 'busybox', | ||
'name': 'sleep', | ||
"args": [ | ||
"/bin/sh", | ||
"-c", | ||
"while true;do date;sleep 5; done" | ||
] | ||
}] | ||
} | ||
} | ||
resp = api.create_namespaced_pod(body=pod_manifest, | ||
namespace='default') | ||
while True: | ||
resp = api.read_namespaced_pod(name=name, | ||
namespace='default') | ||
if resp.status.phase != 'Pending': | ||
break | ||
time.sleep(1) | ||
print("Done.") | ||
|
||
|
||
# calling exec and wait for response. | ||
exec_command = [ | ||
'/bin/sh', | ||
'-c', | ||
'echo This message goes to stderr >&2; echo This message goes to stdout'] | ||
resp = api.connect_get_namespaced_pod_exec(name, 'default', | ||
command=exec_command, | ||
stderr=True, stdin=False, | ||
stdout=True, tty=False) | ||
print("Response: " + resp) | ||
|
||
# Calling exec interactively. | ||
exec_command = ['/bin/sh'] | ||
resp = api.connect_get_namespaced_pod_exec(name, 'default', | ||
command=exec_command, | ||
stderr=True, stdin=True, | ||
stdout=True, tty=False, | ||
|
||
_preload_content=False) | ||
commands = [ | ||
"echo test1", | ||
"echo \"This message goes to stderr\" >&2", | ||
] | ||
while resp.is_open(): | ||
resp.update(timeout=1) | ||
if resp.peek_stdout(): | ||
print("STDOUT: %s" % resp.read_stdout()) | ||
if resp.peek_stderr(): | ||
print("STDERR: %s" % resp.read_stderr()) | ||
if commands: | ||
c = commands.pop(0) | ||
print("Running command... %s\n" % c) | ||
resp.write_stdin(c + "\n") | ||
else: | ||
break | ||
|
||
resp.write_stdin("date\n") | ||
sdate = resp.readline_stdout(timeout=3) | ||
print("Server date command returns: %s" % sdate) | ||
resp.write_stdin("whoami\n") | ||
user = resp.readline_stdout(timeout=3) | ||
print("Server user is: %s" % user) |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -12,33 +12,40 @@ | |
|
||
from .rest import ApiException | ||
|
||
import select | ||
import certifi | ||
import time | ||
import collections | ||
import websocket | ||
from websocket import WebSocket, ABNF, enableTrace | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just a suggestion however can we organize these, There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we have pep8 and sort_import running automatically and checking these. We cannot organize them the way we want unless we want to remove those checks. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I see. |
||
import six | ||
import ssl | ||
from six.moves.urllib.parse import urlencode | ||
from six.moves.urllib.parse import quote_plus | ||
|
||
STDIN_CHANNEL = 0 | ||
STDOUT_CHANNEL = 1 | ||
STDERR_CHANNEL = 2 | ||
|
||
|
||
class WSClient: | ||
def __init__(self, configuration, url, headers): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we add some documentation to this? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I will add some documentation for methods in this class. init specifically does not need docs in my opinion as this class is only created inside this file. |
||
self.messages = [] | ||
self.errors = [] | ||
websocket.enableTrace(False) | ||
header = None | ||
"""A websocket client with support for channels. | ||
Exec command uses different channels for different streams. for | ||
example, 0 is stdin, 1 is stdout and 2 is stderr. Some other API calls | ||
like port forwarding can forward different pods' streams to different | ||
channels. | ||
""" | ||
enableTrace(False) | ||
header = [] | ||
self._connected = False | ||
self._channels = {} | ||
self._all = "" | ||
|
||
# We just need to pass the Authorization, ignore all the other | ||
# http headers we get from the generated code | ||
if 'Authorization' in headers: | ||
header = "Authorization: %s" % headers['Authorization'] | ||
|
||
self.ws = websocket.WebSocketApp(url, | ||
on_message=self.on_message, | ||
on_error=self.on_error, | ||
on_close=self.on_close, | ||
header=[header] if header else None) | ||
self.ws.on_open = self.on_open | ||
if headers and 'authorization' in headers: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we support both There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we should be consistent. We used lowercase |
||
header.append("authorization: %s" % headers['authorization']) | ||
|
||
if url.startswith('wss://') and configuration.verify_ssl: | ||
ssl_opts = { | ||
|
@@ -52,30 +59,145 @@ def __init__(self, configuration, url, headers): | |
else: | ||
ssl_opts = {'cert_reqs': ssl.CERT_NONE} | ||
|
||
self.ws.run_forever(sslopt=ssl_opts) | ||
|
||
def on_message(self, ws, message): | ||
if message[0] == '\x01': | ||
message = message[1:] | ||
if message: | ||
if six.PY3 and isinstance(message, six.binary_type): | ||
message = message.decode('utf-8') | ||
self.messages.append(message) | ||
|
||
def on_error(self, ws, error): | ||
self.errors.append(error) | ||
|
||
def on_close(self, ws): | ||
pass | ||
|
||
def on_open(self, ws): | ||
pass | ||
self.sock = WebSocket(sslopt=ssl_opts, skip_utf8_validation=False) | ||
self.sock.connect(url, header=header) | ||
self._connected = True | ||
|
||
def peek_channel(self, channel, timeout=0): | ||
"""Peek a channel and return part of the input, | ||
empty string otherwise.""" | ||
self.update(timeout=timeout) | ||
if channel in self._channels: | ||
return self._channels[channel] | ||
return "" | ||
|
||
def read_channel(self, channel, timeout=0): | ||
"""Read data from a channel.""" | ||
if channel not in self._channels: | ||
ret = self.peek_channel(channel, timeout) | ||
else: | ||
ret = self._channels[channel] | ||
if channel in self._channels: | ||
del self._channels[channel] | ||
return ret | ||
|
||
def readline_channel(self, channel, timeout=None): | ||
"""Read a line from a channel.""" | ||
if timeout is None: | ||
timeout = float("inf") | ||
start = time.time() | ||
while self.is_open() and time.time() - start < timeout: | ||
if channel in self._channels: | ||
data = self._channels[channel] | ||
if "\n" in data: | ||
index = data.find("\n") | ||
ret = data[:index] | ||
data = data[index+1:] | ||
if data: | ||
self._channels[channel] = data | ||
else: | ||
del self._channels[channel] | ||
return ret | ||
self.update(timeout=(timeout - time.time() + start)) | ||
|
||
def write_channel(self, channel, data): | ||
"""Write data to a channel.""" | ||
self.sock.send(chr(channel) + data) | ||
|
||
def peek_stdout(self, timeout=0): | ||
"""Same as peek_channel with channel=1.""" | ||
return self.peek_channel(STDOUT_CHANNEL, timeout=timeout) | ||
|
||
def read_stdout(self, timeout=None): | ||
"""Same as read_channel with channel=1.""" | ||
return self.read_channel(STDOUT_CHANNEL, timeout=timeout) | ||
|
||
def readline_stdout(self, timeout=None): | ||
"""Same as readline_channel with channel=1.""" | ||
return self.readline_channel(STDOUT_CHANNEL, timeout=timeout) | ||
|
||
def peek_stderr(self, timeout=0): | ||
"""Same as peek_channel with channel=2.""" | ||
return self.peek_channel(STDERR_CHANNEL, timeout=timeout) | ||
|
||
def read_stderr(self, timeout=None): | ||
"""Same as read_channel with channel=2.""" | ||
return self.read_channel(STDERR_CHANNEL, timeout=timeout) | ||
|
||
def readline_stderr(self, timeout=None): | ||
"""Same as readline_channel with channel=2.""" | ||
return self.readline_channel(STDERR_CHANNEL, timeout=timeout) | ||
|
||
def read_all(self): | ||
"""Read all of the inputs with the same order they recieved. The channel | ||
information would be part of the string. This is useful for | ||
non-interactive call where a set of command passed to the API call and | ||
their result is needed after the call is concluded. | ||
TODO: Maybe we can process this and return a more meaningful map with | ||
channels mapped for each input. | ||
""" | ||
out = self._all | ||
self._all = "" | ||
self._channels = {} | ||
return out | ||
|
||
def is_open(self): | ||
"""True if the connection is still alive.""" | ||
return self._connected | ||
|
||
def write_stdin(self, data): | ||
"""The same as write_channel with channel=0.""" | ||
self.write_channel(STDIN_CHANNEL, data) | ||
|
||
def update(self, timeout=0): | ||
"""Update channel buffers with at most one complete frame of input.""" | ||
if not self.is_open(): | ||
return | ||
if not self.sock.connected: | ||
self._connected = False | ||
return | ||
r, _, _ = select.select( | ||
(self.sock.sock, ), (), (), timeout) | ||
if r: | ||
op_code, frame = self.sock.recv_data_frame(True) | ||
if op_code == ABNF.OPCODE_CLOSE: | ||
self._connected = False | ||
return | ||
elif op_code == ABNF.OPCODE_BINARY or op_code == ABNF.OPCODE_TEXT: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is there any other There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think so. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't know much about sockets however I see that there are more codes https://github.com/websocket-client/websocket-client/blob/master/websocket/_abnf.py#L102 where I thought we may have to cover our bases but I would like to leave that up to someone else who is more experienced in the area. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've modeled this from WebSocketApp here: https://github.com/websocket-client/websocket-client/blob/master/websocket/_app.py#L205. Ping and Pong are optional (and we can support it later). Cont can be ignored and it is only useful if you want to get notified on an incomplete frame. however, we may improve this later by doing something on "Cont" (like running update again). |
||
data = frame.data | ||
if six.PY3: | ||
data = data.decode("utf-8") | ||
self._all += data | ||
if len(data) > 1: | ||
channel = ord(data[0]) | ||
data = data[1:] | ||
if data: | ||
if channel not in self._channels: | ||
self._channels[channel] = data | ||
else: | ||
self._channels[channel] += data | ||
|
||
def run_forever(self, timeout=None): | ||
"""Wait till connection is closed or timeout reached. Buffer any input | ||
received during this time.""" | ||
if timeout: | ||
start = time.time() | ||
while self.is_open() and time.time() - start < timeout: | ||
self.update(timeout=(timeout - time.time() + start)) | ||
else: | ||
while self.is_open(): | ||
self.update(timeout=None) | ||
|
||
|
||
WSResponse = collections.namedtuple('WSResponse', ['data']) | ||
|
||
|
||
def GET(configuration, url, query_params, _request_timeout, headers): | ||
def websocket_call(configuration, url, query_params, _request_timeout, | ||
_preload_content, headers): | ||
"""An internal function to be called in api-client when a websocket | ||
connection is required.""" | ||
|
||
# switch protocols from http to websocket | ||
url = url.replace('http://', 'ws://') | ||
url = url.replace('https://', 'wss://') | ||
|
@@ -105,10 +227,11 @@ def GET(configuration, url, query_params, _request_timeout, headers): | |
else: | ||
url += '&command=' + quote_plus(commands) | ||
|
||
client = WSClient(configuration, url, headers) | ||
if client.errors: | ||
raise ApiException( | ||
status=0, | ||
reason='\n'.join([str(error) for error in client.errors]) | ||
) | ||
return WSResponse('%s' % ''.join(client.messages)) | ||
try: | ||
client = WSClient(configuration, url, headers) | ||
if not _preload_content: | ||
return client | ||
client.run_forever(timeout=_request_timeout) | ||
return WSResponse('%s' % ''.join(client.read_all())) | ||
except (Exception, KeyboardInterrupt, SystemExit) as e: | ||
raise ApiException(status=0, reason=str(e)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You mentioned in the comments that
stdin
is not working, can we get rid of that argument here?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Awesome!