Skip to content

Commit

Permalink
fixup! Improvements on ws_client. Now the client can returns an objec…
Browse files Browse the repository at this point in the history
…t to interact with websocket server and reach each channel separately
  • Loading branch information
mbohlool committed Feb 17, 2017
1 parent 8e3fb91 commit fd956cd
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 10 deletions.
13 changes: 6 additions & 7 deletions kubernetes/client/api_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -347,13 +347,12 @@ def request(self, method, url, query_params=None, headers=None,
# FIXME(dims) : We need a better way to figure out which
# calls end up using web sockets
if url.endswith('/exec') and (method == "GET" or method == "POST"):
return ws_client.GET(self.config,
url,
query_params=query_params,
_request_timeout=_request_timeout,
_preload_content=_preload_content,
headers=headers)

return ws_client.websocket_call(self.config,
url,
query_params=query_params,
_request_timeout=_request_timeout,
_preload_content=_preload_content,
headers=headers)
if method == "GET":
return self.rest_client.GET(url,
query_params=query_params,
Expand Down
40 changes: 37 additions & 3 deletions kubernetes/client/ws_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,13 @@

class WSClient:
def __init__(self, configuration, url, headers):
"""A websocket client with support for channels.
Exec command uses different channels for different streams. for
example, 0 is stdin, 1 is stdout and 2 is stderr. Some other API calls
like port forwarding can forward different pods' streams to different
channels.
"""
enableTrace(False)
header = []
self._connected = False
Expand All @@ -37,7 +44,7 @@ def __init__(self, configuration, url, headers):

# We just need to pass the Authorization, ignore all the other
# http headers we get from the generated code
if 'Authorization' in headers:
if headers and 'Authorization' in headers:
header.append("Authorization: %s" % headers['Authorization'])

if url.startswith('wss://') and configuration.verify_ssl:
Expand All @@ -57,12 +64,15 @@ def __init__(self, configuration, url, headers):
self._connected = True

def peek_channel(self, channel, timeout=0):
"""Peek a channel and return part of the input,
empty string otherwise."""
self.update(timeout=timeout)
if channel in self._channels:
return self._channels[channel]
return ""

def read_channel(self, channel, timeout=0):
"""Read data from a channel."""
if channel not in self._channels:
ret = self.peek_channel(channel, timeout)
else:
Expand All @@ -72,6 +82,7 @@ def read_channel(self, channel, timeout=0):
return ret

def readline_channel(self, channel, timeout=None):
"""Read a line from a channel."""
if timeout is None:
timeout = float("inf")
start = time.time()
Expand All @@ -90,39 +101,57 @@ def readline_channel(self, channel, timeout=None):
self.update(timeout=(timeout - time.time() + start))

def write_channel(self, channel, data):
"""Write data to a channel."""
self.sock.send(chr(channel) + data)

def peek_stdout(self, timeout=0):
"""Same as peek_channel with channel=1."""
return self.peek_channel(STDOUT_CHANNEL, timeout=timeout)

def read_stdout(self, timeout=None):
"""Same as read_channel with channel=1."""
return self.read_channel(STDOUT_CHANNEL, timeout=timeout)

def readline_stdout(self, timeout=None):
"""Same as readline_channel with channel=1."""
return self.readline_channel(STDOUT_CHANNEL, timeout=timeout)

def peek_stderr(self, timeout=0):
"""Same as peek_channel with channel=2."""
return self.peek_channel(STDERR_CHANNEL, timeout=timeout)

def read_stderr(self, timeout=None):
"""Same as read_channel with channel=2."""
return self.read_channel(STDERR_CHANNEL, timeout=timeout)

def readline_stderr(self, timeout=None):
"""Same as readline_channel with channel=2."""
return self.readline_channel(STDERR_CHANNEL, timeout=timeout)

def read_all(self):
"""Read all of the inputs with the same order they recieved. The channel
information would be part of the string. This is useful for
non-interactive call where a set of command passed to the API call and
their result is needed after the call is concluded.
TODO: Maybe we can process this and return a more meaningful map with
channels mapped for each input.
"""
out = self._all
self._all = ""
self._channels = {}
return out

def is_open(self):
"""True if the connection is still alive."""
return self._connected

def write_stdin(self, data):
"""The same as write_channel with channel=0."""
self.write_channel(STDIN_CHANNEL, data)

def update(self, timeout=0):
"""Update channel buffers with at most one complete frame of input."""
if not self.is_open():
return
if not self.sock.connected:
Expand Down Expand Up @@ -150,6 +179,8 @@ def update(self, timeout=0):
self._channels[channel] += data

def run_forever(self, timeout=None):
"""Wait till connection is closed or timeout reached. Buffer any input
received during this time."""
if timeout:
start = time.time()
while self.is_open() and time.time() - start < timeout:
Expand All @@ -162,8 +193,11 @@ def run_forever(self, timeout=None):
WSResponse = collections.namedtuple('WSResponse', ['data'])


def GET(configuration, url, query_params, _request_timeout, _preload_content,
headers):
def websocket_call(configuration, url, query_params, _request_timeout,
_preload_content, headers):
"""An internal function to be called in api-client when a websocket
connection is required."""

# switch protocols from http to websocket
url = url.replace('http://', 'ws://')
url = url.replace('https://', 'wss://')
Expand Down

0 comments on commit fd956cd

Please sign in to comment.