Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

First pass at making RemoteKernelManager independent #810

Merged
merged 1 commit into from
May 20, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/source/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ Jupyter Enterprise Gateway leverages local resource managers to distribute kerne
kernel-kubernetes
kernel-docker
kernel-conductor
kernel-library

.. toctree::
:maxdepth: 2
Expand Down
22 changes: 22 additions & 0 deletions docs/source/kernel-library.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
## Standalone Remote Kernel Execution
##### a.k.a "Library Mode"

Remote kernels can be executed by using the `RemoteKernelManager` class directly. This enables running kernels using `ProcessProxy`s without the Enterprise Gateway Webapp.

This can be useful in niche situations, for example, using [nbconvert](https://nbconvert.readthedocs.io/) or [nbclient](https://nbclient.readthedocs.io/) to execute a kernel on a remote cluster.

Sample code using nbclient 0.2.0:

```python
import nbformat
from nbclient import NotebookClient
from enterprise_gateway.services.kernels.remotemanager import RemoteKernelManager

with open("my_notebook.ipynb") as fp:
test_notebook = nbformat.read(fp, as_version=4)

client = NotebookClient(nb=test_notebook, kernel_manager_class=RemoteKernelManager)
client.execute()
```

The above code will execute the notebook on a kernel using the configured `ProcessProxy` (defaults to Kubernetes).
3 changes: 3 additions & 0 deletions docs/source/use-cases.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,6 @@ issues and manage network configurations that adhere to my corporate policy.

- **As an administrator**, I want to constrain the number of active kernels that each of my users can have at any
given time.

- **As a solution architect**, I want to easily integrate the ability to launch remote kernels with existing platforms,
so I can leverage my compute cluster in a customizable way.
392 changes: 5 additions & 387 deletions enterprise_gateway/enterprisegatewayapp.py

Large diffs are not rendered by default.

398 changes: 393 additions & 5 deletions enterprise_gateway/mixins.py

Large diffs are not rendered by default.

160 changes: 116 additions & 44 deletions enterprise_gateway/services/kernels/remotemanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@
from ipython_genutils.importstring import import_item
from notebook.services.kernels.kernelmanager import MappingKernelManager
from jupyter_client.ioloop.manager import IOLoopKernelManager
from traitlets import directional_link, log as traitlets_log

from ..processproxies.processproxy import LocalProcessProxy, RemoteProcessProxy
from ..sessions.kernelsessionmanager import KernelSessionManager
from enterprise_gateway.mixins import EnterpriseGatewayConfigMixin
golf-player marked this conversation as resolved.
Show resolved Hide resolved


def get_process_proxy_config(kernelspec):
Expand Down Expand Up @@ -44,6 +46,46 @@ def get_process_proxy_config(kernelspec):
return {"class_name": "enterprise_gateway.services.processproxies.processproxy.LocalProcessProxy", "config": {}}


def new_kernel_id(**kwargs):
"""
This method provides a mechanism by which clients can specify a kernel's id. In this case
that mechanism is via the per-kernel environment variable: KERNEL_ID. If specified, its value
will be validated and returned, otherwise the result from the provided method is returned.

NOTE: This method exists in jupyter_client.multikernelmanager.py for releases > 5.2.3. If you
find that this method is not getting invoked, then you likely need to update the version of
jupyter_client. The Enterprise Gateway dependency will be updated once new releases of
jupyter_client are more prevalent.

Returns
-------
kernel_id : str
The uuid string to associate with the new kernel
"""
log = kwargs.pop("log", None) or traitlets_log.get_logger()
kernel_id_fn = kwargs.pop("kernel_id_fn", None) or (lambda: unicode_type(uuid.uuid4()))
kevin-bates marked this conversation as resolved.
Show resolved Hide resolved

env = kwargs.get('env')
if env and env.get('KERNEL_ID'): # If there's a KERNEL_ID in the env, check it out
# convert string back to UUID - validating string in the process.
str_kernel_id = env.get('KERNEL_ID')
try:
str_v4_kernel_id = str(uuid.UUID(str_kernel_id, version=4))
if str_kernel_id != str_v4_kernel_id: # Given string is not uuid v4 compliant
raise ValueError("value is not uuid v4 compliant")
except ValueError as ve:
log.error("Invalid v4 UUID value detected in ['env']['KERNEL_ID']: '{}'! Error: {}".
format(str_kernel_id, ve))
raise ve
# user-provided id is valid, use it
kernel_id = unicode_type(str_kernel_id)
log.debug("Using user-provided kernel_id: {}".format(kernel_id))
else:
kernel_id = kernel_id_fn(**kwargs)

return kernel_id


class RemoteMappingKernelManager(MappingKernelManager):
"""Extends the MappingKernelManager with support for managing remote kernels via the process-proxy. """

Expand Down Expand Up @@ -161,44 +203,12 @@ def start_kernel_from_session(self, kernel_id, kernel_name, connection_info, pro
return True

def new_kernel_id(self, **kwargs):
"""Determines the kernel_id to use for a new kernel.

This method provides a mechanism by which clients can specify a kernel's id. In this case
that mechanism is via the per-kernel environment variable: KERNEL_ID. If specified, its value
will be validated and returned, otherwise the result from the superclass method is returned.

NOTE: This method exists in jupyter_client.multikernelmanager.py for releases > 5.2.3. If you
find that this method is not getting invoked, then you likely need to update the version of
jupyter_client. The Enterprise Gateway dependency will be updated once new releases of
jupyter_client are more prevalent.

Returns
-------
kernel_id : str
The uuid string to associate with the new kernel
"""
env = kwargs.get('env')
if env and env.get('KERNEL_ID'): # If there's a KERNEL_ID in the env, check it out
# convert string back to UUID - validating string in the process.
str_kernel_id = env.get('KERNEL_ID')
try:
str_v4_kernel_id = str(uuid.UUID(str_kernel_id, version=4))
if str_kernel_id != str_v4_kernel_id: # Given string is not uuid v4 compliant
raise ValueError("value is not uuid v4 compliant")
except ValueError as ve:
self.log.error("Invalid v4 UUID value detected in ['env']['KERNEL_ID']: '{}'! Error: {}".
format(str_kernel_id, ve))
raise ve
# user-provided id is valid, use it
kernel_id = unicode_type(str_kernel_id)
self.log.debug("Using user-provided kernel_id: {}".format(kernel_id))
else:
kernel_id = super(RemoteMappingKernelManager, self).new_kernel_id(**kwargs)
"""Determines the kernel_id to use for a new kernel."""

return kernel_id
return new_kernel_id(kernel_id_fn=super(RemoteMappingKernelManager, self).new_kernel_id, log=self.log)


class RemoteKernelManager(IOLoopKernelManager):
class RemoteKernelManager(EnterpriseGatewayConfigMixin, IOLoopKernelManager):
"""Extends the IOLoopKernelManager used by the MappingKernelManager.

This class is responsible for detecting that a remote kernel is desired, then launching the
Expand All @@ -211,7 +221,6 @@ def __init__(self, **kwargs):
self.process_proxy = None
self.response_address = None
self.sigint_value = None
self.port_range = None
self.kernel_id = None
self.user_overrides = {}
self.restarting = False # need to track whether we're in a restart situation or not
Expand All @@ -224,6 +233,43 @@ def __init__(self, **kwargs):
if hasattr(self, "cache_ports"):
self.cache_ports = False

if not self.connection_file:
self.kernel_id = new_kernel_id(log=self.log)

self._link_dependent_props()

if self.kernel_spec_manager is None:
self.kernel_spec_manager = self.kernel_spec_manager_class(
parent=self,
)

def _link_dependent_props(self):
"""
Ensure that RemoteKernelManager, when used as part of an EnterpriseGatewayApp,
has certain necessary configuration stay in sync with the app's configuration.

When RemoteKernelManager is used independently, this function is a no-op, and
default values or configuration set on this class is used.
"""
try:
eg_instance = self.parent.parent
except AttributeError:
return
dependent_props = ["authorized_users",
"unauthorized_users",
"port_range",
"impersonation_enabled",
"max_kernels_per_user",
"env_whitelist",
"env_process_whitelist",
"yarn_endpoint",
"alt_yarn_endpoint",
"yarn_endpoint_security_enabled",
"conductor_endpoint",
"remote_hosts"
]
self._links = [directional_link((eg_instance, prop), (self, prop)) for prop in dependent_props]
golf-player marked this conversation as resolved.
Show resolved Hide resolved

def start_kernel(self, **kwargs):
"""Starts a kernel in a separate process.

Expand All @@ -248,8 +294,8 @@ def _capture_user_overrides(self, **kwargs):
env = kwargs.get('env', {})
self.user_overrides.update({key: value for key, value in env.items()
if key.startswith('KERNEL_') or
key in self.parent.parent.env_process_whitelist or
key in self.parent.parent.env_whitelist})
key in self.env_process_whitelist or
key in self.env_whitelist})

def format_kernel_cmd(self, extra_arguments=None):
""" Replace templated args (e.g. {response_address}, {port_range}, or {kernel_id}). """
Expand Down Expand Up @@ -325,26 +371,28 @@ def restart_kernel(self, now=False, **kwargs):
kernel.
"""
self.restarting = True
kernel_id = os.path.basename(self.connection_file).replace('kernel-', '').replace('.json', '')
kernel_id = self.kernel_id or os.path.basename(self.connection_file).replace('kernel-', '').replace('.json', '')
# Check if this is a remote process proxy and if now = True. If so, check its connection count. If no
# connections, shutdown else perform the restart. Note: auto-restart sets now=True, but handlers use
# the default value (False).
if isinstance(self.process_proxy, RemoteProcessProxy) and now:
if self.parent._kernel_connections.get(kernel_id, 0) == 0:
if isinstance(self.process_proxy, RemoteProcessProxy) and now and self.mapping_kernel_manager:
golf-player marked this conversation as resolved.
Show resolved Hide resolved
if self.mapping_kernel_manager._kernel_connections.get(kernel_id, 0) == 0:
self.log.warning("Remote kernel ({}) will not be automatically restarted since there are no "
"clients connected at this time.".format(kernel_id))
# Use the parent mapping kernel manager so activity monitoring and culling is also shutdown
self.parent.shutdown_kernel(kernel_id, now=now)
self.mapping_kernel_manager.shutdown_kernel(kernel_id, now=now)
return
super(RemoteKernelManager, self).restart_kernel(now, **kwargs)
if isinstance(self.process_proxy, RemoteProcessProxy): # for remote kernels...
# Re-establish activity watching...
if self._activity_stream:
self._activity_stream.close()
self._activity_stream = None
self.parent.start_watching_activity(kernel_id)
if self.mapping_kernel_manager:
self.mapping_kernel_manager.start_watching_activity(kernel_id)
# Refresh persisted state.
self.parent.parent.kernel_session_manager.refresh_session(kernel_id)
if self.kernel_session_manager:
self.kernel_session_manager.refresh_session(kernel_id)
self.restarting = False

def signal_kernel(self, signum):
Expand Down Expand Up @@ -423,3 +471,27 @@ def _get_process_proxy(self):
format(self.kernel_spec.display_name, process_proxy_class_name))
process_proxy_class = import_item(process_proxy_class_name)
self.process_proxy = process_proxy_class(kernel_manager=self, proxy_config=process_proxy_cfg.get('config'))

kevin-bates marked this conversation as resolved.
Show resolved Hide resolved
# When this class is used by an EnterpriseGatewayApp instance, it will be able to
# access the app's configuration using the traitlet parent chain.
# When it's used independently, it should fall back to safe defaults.
@property
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should add a comment block somewhere (thinking prior to these properties) that talks about when self.parent would not exist. This might warrant an entry in the docs as well - although I don't mind keeping this on the down-low for the time being. 😉

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will do. Is there somewhere specific in the docs where something like this might go?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm okay deferring this for now, although if we were to add it, the following locations might be worthy of updates:

I'm trying to think of what we'd call the topic. The term "Library Mode" popped into my head just now. I'm not enamored with that, but I'd like the topic to convey a more direct capability that utilizes the heart of EG. "Integration Mode" - something that allows third-parties to use RMK directly. Ideas welcome!

The topic should include an example - much like the one you opened the issue with - based on the final results. (Btw, I'd like to have those details anyway so I could try things out against my Hadoop YARN cluster.)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like library mode. Just anything that conveys the meaning that this is JEG but without the gateway webapp.

Standalone Remote Kernel Execution if verbosity isn't an issue.

The example actually becomes really simple.

import nbformat
from nbclient import NotebookClient
from enterprise_gateway.services.kernels.remotemanager import RemoteKernelManager

with open("/home/ish/notebooks/Untitled.ipynb") as fp:
    test_notebook = nbformat.read(fp, as_version=4)

client = NotebookClient(nb=test_notebook, kernel_manager_class=RemoteKernelManager)
client.execute()

If you'd like a full docker with kernelspecs and config, I can do that, but I don't have experience with Yarn so it may take a while.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like "Standalone Remote Kernel Execution" as the topic. Perhaps in the opening statement, we can toss in an "a.k.a., Library Mode". 😄 This way we have a way to quickly identify the context when communicating this functionality or discussing it in general.

The sample is excellent - I'm really happy with how this is turning out. Let's not worry about a docker image for the moment. Given the example, I can run this against my YARN cluster, although probably not today (sorry).

def kernel_session_manager(self):
try:
return self.parent.parent.kernel_session_manager
except AttributeError:
return None

@property
def cull_idle_timeout(self):
try:
return self.parent.cull_idle_timeout
except AttributeError:
return 0

@property
def mapping_kernel_manager(self):
try:
return self.parent
except AttributeError:
return None
2 changes: 1 addition & 1 deletion enterprise_gateway/services/processproxies/conductor.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def __init__(self, kernel_manager, proxy_config):
self.env = None
self.rest_credential = None
self.conductor_endpoint = proxy_config.get('conductor_endpoint',
kernel_manager.parent.parent.conductor_endpoint)
kernel_manager.conductor_endpoint)

def launch_process(self, kernel_cmd, **kwargs):
"""Launches the specified process within a Conductor cluster environment."""
Expand Down
2 changes: 1 addition & 1 deletion enterprise_gateway/services/processproxies/distributed.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ def __init__(self, kernel_manager, proxy_config):
if proxy_config.get('remote_hosts'):
self.hosts = proxy_config.get('remote_hosts').split(',')
else:
self.hosts = kernel_manager.parent.parent.remote_hosts # from command line or env
self.hosts = kernel_manager.remote_hosts # from command line or env

def launch_process(self, kernel_cmd, **kwargs):
"""Launches a kernel process on a selected host."""
Expand Down
34 changes: 20 additions & 14 deletions enterprise_gateway/services/processproxies/processproxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,12 @@ def __init__(self, kernel_manager, proxy_config):
# relaunch (see jupyter_client.manager.start_kernel().
self.kernel_manager.ip = '0.0.0.0'
self.log = kernel_manager.log

# extract the kernel_id string from the connection file and set the KERNEL_ID environment variable
self.kernel_manager.kernel_id = os.path.basename(self.kernel_manager.connection_file). \
replace('kernel-', '').replace('.json', '')
if self.kernel_manager.kernel_id is None:
self.kernel_manager.kernel_id = os.path.basename(self.kernel_manager.connection_file). \
replace('kernel-', '').replace('.json', '')

self.kernel_id = self.kernel_manager.kernel_id
self.kernel_launch_timeout = default_kernel_launch_timeout
self.lower_port = 0
Expand All @@ -127,15 +130,15 @@ def __init__(self, kernel_manager, proxy_config):

# Handle authorization sets...
# Take union of unauthorized users...
self.unauthorized_users = self.kernel_manager.parent.parent.unauthorized_users
self.unauthorized_users = self.kernel_manager.unauthorized_users
if proxy_config.get('unauthorized_users'):
self.unauthorized_users = self.unauthorized_users.union(proxy_config.get('unauthorized_users').split(','))

# Let authorized users override global value - if set on kernelspec...
if proxy_config.get('authorized_users'):
self.authorized_users = set(proxy_config.get('authorized_users').split(','))
else:
self.authorized_users = self.kernel_manager.parent.parent.authorized_users
self.authorized_users = self.kernel_manager.authorized_users

# Represents the local process (from popen) if applicable. Note that we could have local_proc = None even when
# the subclass is a LocalProcessProxy (or YarnProcessProxy). This will happen if EG is restarted and the
Expand Down Expand Up @@ -432,7 +435,7 @@ def _enforce_authorization(self, **kwargs):

# Although it may already be set in the env, just override in case it was only set via command line or config
# Convert to string since execve() (called by Popen in base classes) wants string values.
env_dict['EG_IMPERSONATION_ENABLED'] = str(self.kernel_manager.parent.parent.impersonation_enabled)
env_dict['EG_IMPERSONATION_ENABLED'] = str(self.kernel_manager.impersonation_enabled)

# Ensure KERNEL_USERNAME is set
kernel_username = KernelSessionManager.get_kernel_username(**kwargs)
Expand Down Expand Up @@ -460,16 +463,19 @@ def _enforce_limits(self, **kwargs):

# if kernels-per-user is configured, ensure that this next kernel is still within the limit. If this
# is due to a restart, skip enforcement since we're re-using that id.
max_kernels_per_user = self.kernel_manager.parent.parent.max_kernels_per_user
max_kernels_per_user = self.kernel_manager.max_kernels_per_user
if max_kernels_per_user >= 0 and not self.kernel_manager.restarting:
env_dict = kwargs.get('env')
username = env_dict['KERNEL_USERNAME']
current_kernel_count = self.kernel_manager.parent.parent.kernel_session_manager.active_sessions(username)
if current_kernel_count >= max_kernels_per_user:
error_message = "A max kernels per user limit has been set to {} and user '{}' currently has {} " \
"active {}.".format(max_kernels_per_user, username, current_kernel_count,
"kernel" if max_kernels_per_user == 1 else "kernels")
self.log_and_raise(http_status_code=403, reason=error_message)

# Per user limits are only meaningful if a session manager exists.
if self.kernel_manager.kernel_session_manager:
current_kernel_count = self.kernel_manager.kernel_session_manager.active_sessions(username)
if current_kernel_count >= max_kernels_per_user:
error_message = "A max kernels per user limit has been set to {} and user '{}' currently has {} " \
"active {}.".format(max_kernels_per_user, username, current_kernel_count,
"kernel" if max_kernels_per_user == 1 else "kernels")
self.log_and_raise(http_status_code=403, reason=error_message)

def get_process_info(self):
"""Captures the base information necessary for kernel persistence relative to process proxies.
Expand All @@ -494,7 +500,7 @@ def load_process_info(self, process_info):
def _validate_port_range(self, proxy_config):
"""Validates the port range configuration option to ensure appropriate values."""
# Let port_range override global value - if set on kernelspec...
port_range = self.kernel_manager.parent.parent.port_range
port_range = self.kernel_manager.port_range
if proxy_config.get('port_range'):
port_range = proxy_config.get('port_range')

Expand Down Expand Up @@ -797,7 +803,7 @@ def _spawn_ssh_tunnel(self, kernel_channel, local_port, remote_port, remote_ip,
return pexpect.spawn(cmd, env=os.environ.copy().pop('SSH_ASKPASS', None))

def _get_keep_alive_interval(self, kernel_channel):
cull_idle_timeout = self.kernel_manager.parent.cull_idle_timeout
cull_idle_timeout = self.kernel_manager.cull_idle_timeout

if (kernel_channel == KernelChannel.COMMUNICATION or
kernel_channel == KernelChannel.CONTROL or
Expand Down
Loading