From 4b1eed4946a1e1acafbe008d24848aa59ab51c94 Mon Sep 17 00:00:00 2001 From: golf-player <> Date: Fri, 24 Apr 2020 02:36:04 -0500 Subject: [PATCH] Make RemoteKernelManager more independent Make RemoteKernelManager (RKM) able to be run without a (grand)parent EnterpriseGatewayApp (EGA) instance so it can be used by nbclient. The config which RKM with EGA was being referred to using traitlet lineage like `self.parent.parent.property_name`. Change both to inherit from a Configurable mixin which contains all of EGA's previous config. Link the attributes of the RKM instance with EGA if available to keep old behaviour. Modify other properties RKM uses that are not traits to become `@property`s which fall back to sane defaults if running independently. Change RKM to be able to generate a kernel id if necessary. Change ProcessProxy to use provided kernel ids. Move kernel id generation logic out of RemoteMappingKernelManager. Resolves #803 --- docs/source/index.rst | 1 + docs/source/kernel-library.md | 22 + docs/source/use-cases.md | 3 + enterprise_gateway/enterprisegatewayapp.py | 392 +---------------- enterprise_gateway/mixins.py | 398 +++++++++++++++++- .../services/kernels/remotemanager.py | 160 +++++-- .../services/processproxies/conductor.py | 2 +- .../services/processproxies/distributed.py | 2 +- .../services/processproxies/processproxy.py | 34 +- .../services/processproxies/yarn.py | 6 +- .../tests/test_enterprise_gateway.py | 18 +- 11 files changed, 574 insertions(+), 464 deletions(-) create mode 100644 docs/source/kernel-library.md diff --git a/docs/source/index.rst b/docs/source/index.rst index 86243b0b6..7d2da65b7 100644 --- a/docs/source/index.rst +++ b/docs/source/index.rst @@ -41,6 +41,7 @@ Jupyter Enterprise Gateway leverages local resource managers to distribute kerne kernel-kubernetes kernel-docker kernel-conductor + kernel-library .. toctree:: :maxdepth: 2 diff --git a/docs/source/kernel-library.md b/docs/source/kernel-library.md new file mode 100644 index 000000000..013a6b920 --- /dev/null +++ b/docs/source/kernel-library.md @@ -0,0 +1,22 @@ +## Standalone Remote Kernel Execution +##### a.k.a "Library Mode" + +Remote kernels can be executed by using the `RemoteKernelManager` class directly. This enables running kernels using `ProcessProxy`s without the Enterprise Gateway Webapp. + +This can be useful in niche situations, for example, using [nbconvert](https://nbconvert.readthedocs.io/) or [nbclient](https://nbclient.readthedocs.io/) to execute a kernel on a remote cluster. + +Sample code using nbclient 0.2.0: + +```python +import nbformat +from nbclient import NotebookClient +from enterprise_gateway.services.kernels.remotemanager import RemoteKernelManager + +with open("my_notebook.ipynb") as fp: + test_notebook = nbformat.read(fp, as_version=4) + +client = NotebookClient(nb=test_notebook, kernel_manager_class=RemoteKernelManager) +client.execute() +``` + +The above code will execute the notebook on a kernel using the configured `ProcessProxy` (defaults to Kubernetes). \ No newline at end of file diff --git a/docs/source/use-cases.md b/docs/source/use-cases.md index 91126bd41..3e00d124a 100644 --- a/docs/source/use-cases.md +++ b/docs/source/use-cases.md @@ -22,3 +22,6 @@ issues and manage network configurations that adhere to my corporate policy. - **As an administrator**, I want to constrain the number of active kernels that each of my users can have at any given time. + +- **As a solution architect**, I want to easily integrate the ability to launch remote kernels with existing platforms, +so I can leverage my compute cluster in a customizable way. diff --git a/enterprise_gateway/enterprisegatewayapp.py b/enterprise_gateway/enterprisegatewayapp.py index 92330ce50..09c15d282 100644 --- a/enterprise_gateway/enterprisegatewayapp.py +++ b/enterprise_gateway/enterprisegatewayapp.py @@ -12,8 +12,6 @@ import time import weakref -from distutils.util import strtobool - # Install the pyzmq ioloop. This has to be done before anything else from # tornado is imported. from zmq.eventloop import ioloop @@ -21,13 +19,11 @@ from tornado import httpserver from tornado import web -from tornado.log import enable_pretty_logging, LogFormatter +from tornado.log import enable_pretty_logging -from traitlets import default, List, Set, Unicode, Type, Instance, Bool, CBool, Integer, observe from traitlets.config import Configurable from jupyter_core.application import JupyterApp, base_aliases from jupyter_client.kernelspec import KernelSpecManager -from notebook.services.kernels.kernelmanager import MappingKernelManager from notebook.notebookapp import random_ports from notebook.utils import url_path_join @@ -39,10 +35,12 @@ from .services.kernelspecs.handlers import default_handlers as default_kernelspec_handlers from .services.sessions.handlers import default_handlers as default_session_handlers -from .services.sessions.kernelsessionmanager import KernelSessionManager, FileKernelSessionManager +from .services.sessions.kernelsessionmanager import FileKernelSessionManager from .services.sessions.sessionmanager import SessionManager from .services.kernels.remotemanager import RemoteMappingKernelManager +from .mixins import EnterpriseGatewayConfigMixin + # Add additional command line aliases aliases = dict(base_aliases) @@ -56,7 +54,7 @@ }) -class EnterpriseGatewayApp(JupyterApp): +class EnterpriseGatewayApp(EnterpriseGatewayConfigMixin, JupyterApp): """Application that provisions Jupyter kernels and proxies HTTP/Websocket traffic to the kernels. @@ -79,386 +77,6 @@ class EnterpriseGatewayApp(JupyterApp): # Enable some command line shortcuts aliases = aliases - # Server IP / PORT binding - port_env = 'EG_PORT' - port_default_value = 8888 - port = Integer(port_default_value, config=True, - help='Port on which to listen (EG_PORT env var)') - - @default('port') - def port_default(self): - return int(os.getenv(self.port_env, os.getenv('KG_PORT', self.port_default_value))) - - port_retries_env = 'EG_PORT_RETRIES' - port_retries_default_value = 50 - port_retries = Integer(port_retries_default_value, config=True, - help="""Number of ports to try if the specified port is not available - (EG_PORT_RETRIES env var)""") - - @default('port_retries') - def port_retries_default(self): - return int(os.getenv(self.port_retries_env, os.getenv('KG_PORT_RETRIES', self.port_retries_default_value))) - - ip_env = 'EG_IP' - ip_default_value = '127.0.0.1' - ip = Unicode(ip_default_value, config=True, - help='IP address on which to listen (EG_IP env var)') - - @default('ip') - def ip_default(self): - return os.getenv(self.ip_env, os.getenv('KG_IP', self.ip_default_value)) - - # Base URL - base_url_env = 'EG_BASE_URL' - base_url_default_value = '/' - base_url = Unicode(base_url_default_value, config=True, - help='The base path for mounting all API resources (EG_BASE_URL env var)') - - @default('base_url') - def base_url_default(self): - return os.getenv(self.base_url_env, os.getenv('KG_BASE_URL', self.base_url_default_value)) - - # Token authorization - auth_token_env = 'EG_AUTH_TOKEN' - auth_token = Unicode(config=True, - help='Authorization token required for all requests (EG_AUTH_TOKEN env var)') - - @default('auth_token') - def _auth_token_default(self): - return os.getenv(self.auth_token_env, os.getenv('KG_AUTH_TOKEN', '')) - - # Begin CORS headers - allow_credentials_env = 'EG_ALLOW_CREDENTIALS' - allow_credentials = Unicode(config=True, - help='Sets the Access-Control-Allow-Credentials header. (EG_ALLOW_CREDENTIALS env var)') - - @default('allow_credentials') - def allow_credentials_default(self): - return os.getenv(self.allow_credentials_env, os.getenv('KG_ALLOW_CREDENTIALS', '')) - - allow_headers_env = 'EG_ALLOW_HEADERS' - allow_headers = Unicode(config=True, - help='Sets the Access-Control-Allow-Headers header. (EG_ALLOW_HEADERS env var)') - - @default('allow_headers') - def allow_headers_default(self): - return os.getenv(self.allow_headers_env, os.getenv('KG_ALLOW_HEADERS', '')) - - allow_methods_env = 'EG_ALLOW_METHODS' - allow_methods = Unicode(config=True, - help='Sets the Access-Control-Allow-Methods header. (EG_ALLOW_METHODS env var)') - - @default('allow_methods') - def allow_methods_default(self): - return os.getenv(self.allow_methods_env, os.getenv('KG_ALLOW_METHODS', '')) - - allow_origin_env = 'EG_ALLOW_ORIGIN' - allow_origin = Unicode(config=True, - help='Sets the Access-Control-Allow-Origin header. (EG_ALLOW_ORIGIN env var)') - - @default('allow_origin') - def allow_origin_default(self): - return os.getenv(self.allow_origin_env, os.getenv('KG_ALLOW_ORIGIN', '')) - - expose_headers_env = 'EG_EXPOSE_HEADERS' - expose_headers = Unicode(config=True, - help='Sets the Access-Control-Expose-Headers header. (EG_EXPOSE_HEADERS env var)') - - @default('expose_headers') - def expose_headers_default(self): - return os.getenv(self.expose_headers_env, os.getenv('KG_EXPOSE_HEADERS', '')) - - trust_xheaders_env = 'EG_TRUST_XHEADERS' - trust_xheaders = CBool(False, config=True, - help="""Use x-* header values for overriding the remote-ip, useful when - application is behing a proxy. (EG_TRUST_XHEADERS env var)""") - - @default('trust_xheaders') - def trust_xheaders_default(self): - return strtobool(os.getenv(self.trust_xheaders_env, os.getenv('KG_TRUST_XHEADERS', 'False'))) - - certfile_env = 'EG_CERTFILE' - certfile = Unicode(None, config=True, allow_none=True, - help='The full path to an SSL/TLS certificate file. (EG_CERTFILE env var)') - - @default('certfile') - def certfile_default(self): - return os.getenv(self.certfile_env, os.getenv('KG_CERTFILE')) - - keyfile_env = 'EG_KEYFILE' - keyfile = Unicode(None, config=True, allow_none=True, - help='The full path to a private key file for usage with SSL/TLS. (EG_KEYFILE env var)') - - @default('keyfile') - def keyfile_default(self): - return os.getenv(self.keyfile_env, os.getenv('KG_KEYFILE')) - - client_ca_env = 'EG_CLIENT_CA' - client_ca = Unicode(None, config=True, allow_none=True, - help="""The full path to a certificate authority certificate for SSL/TLS - client authentication. (EG_CLIENT_CA env var)""") - - @default('client_ca') - def client_ca_default(self): - return os.getenv(self.client_ca_env, os.getenv('KG_CLIENT_CA')) - - max_age_env = 'EG_MAX_AGE' - max_age = Unicode(config=True, - help='Sets the Access-Control-Max-Age header. (EG_MAX_AGE env var)') - - @default('max_age') - def max_age_default(self): - return os.getenv(self.max_age_env, os.getenv('KG_MAX_AGE', '')) - # End CORS headers - - max_kernels_env = 'EG_MAX_KERNELS' - max_kernels = Integer(None, config=True, - allow_none=True, - help="""Limits the number of kernel instances allowed to run by this gateway. - Unbounded by default. (EG_MAX_KERNELS env var)""") - - @default('max_kernels') - def max_kernels_default(self): - val = os.getenv(self.max_kernels_env, os.getenv('KG_MAX_KERNELS')) - return val if val is None else int(val) - - default_kernel_name_env = 'EG_DEFAULT_KERNEL_NAME' - default_kernel_name = Unicode(config=True, - help='Default kernel name when spawning a kernel (EG_DEFAULT_KERNEL_NAME env var)') - - @default('default_kernel_name') - def default_kernel_name_default(self): - # defaults to Jupyter's default kernel name on empty string - return os.getenv(self.default_kernel_name_env, os.getenv('KG_DEFAULT_KERNEL_NAME', '')) - - list_kernels_env = 'EG_LIST_KERNELS' - list_kernels = Bool(config=True, - help="""Permits listing of the running kernels using API endpoints /api/kernels - and /api/sessions. (EG_LIST_KERNELS env var) Note: Jupyter Notebook - allows this by default but Jupyter Enterprise Gateway does not.""") - - @default('list_kernels') - def list_kernels_default(self): - return os.getenv(self.list_kernels_env, os.getenv('KG_LIST_KERNELS', 'False')).lower() == 'true' - - env_whitelist_env = 'EG_ENV_WHITELIST' - env_whitelist = List(config=True, - help="""Environment variables allowed to be set when a client requests a - new kernel. (EG_ENV_WHITELIST env var)""") - - @default('env_whitelist') - def env_whitelist_default(self): - return os.getenv(self.env_whitelist_env, os.getenv('KG_ENV_WHITELIST', '')).split(',') - - env_process_whitelist_env = 'EG_ENV_PROCESS_WHITELIST' - env_process_whitelist = List(config=True, - help="""Environment variables allowed to be inherited - from the spawning process by the kernel. (EG_ENV_PROCESS_WHITELIST env var)""") - - @default('env_process_whitelist') - def env_process_whitelist_default(self): - return os.getenv(self.env_process_whitelist_env, os.getenv('KG_ENV_PROCESS_WHITELIST', '')).split(',') - - # Remote hosts - remote_hosts_env = 'EG_REMOTE_HOSTS' - remote_hosts_default_value = 'localhost' - remote_hosts = List(default_value=[remote_hosts_default_value], config=True, - help="""Bracketed comma-separated list of hosts on which DistributedProcessProxy - kernels will be launched e.g., ['host1','host2']. (EG_REMOTE_HOSTS env var - - non-bracketed, just comma-separated)""") - - @default('remote_hosts') - def remote_hosts_default(self): - return os.getenv(self.remote_hosts_env, self.remote_hosts_default_value).split(',') - - # Yarn endpoint - yarn_endpoint_env = 'EG_YARN_ENDPOINT' - yarn_endpoint = Unicode(None, config=True, allow_none=True, - help="""The http url specifying the YARN Resource Manager. Note: If this value is NOT set, - the YARN library will use the files within the local HADOOP_CONFIG_DIR to determine the - active resource manager. (EG_YARN_ENDPOINT env var)""") - - @default('yarn_endpoint') - def yarn_endpoint_default(self): - return os.getenv(self.yarn_endpoint_env) - - # Alt Yarn endpoint - alt_yarn_endpoint_env = 'EG_ALT_YARN_ENDPOINT' - alt_yarn_endpoint = Unicode(None, config=True, allow_none=True, - help="""The http url specifying the alternate YARN Resource Manager. This value should - be set when YARN Resource Managers are configured for high availability. Note: If both - YARN endpoints are NOT set, the YARN library will use the files within the local - HADOOP_CONFIG_DIR to determine the active resource manager. - (EG_ALT_YARN_ENDPOINT env var)""") - - @default('alt_yarn_endpoint') - def alt_yarn_endpoint_default(self): - return os.getenv(self.alt_yarn_endpoint_env) - - yarn_endpoint_security_enabled_env = 'EG_YARN_ENDPOINT_SECURITY_ENABLED' - yarn_endpoint_security_enabled_default_value = False - yarn_endpoint_security_enabled = Bool(yarn_endpoint_security_enabled_default_value, config=True, - help="""Is YARN Kerberos/SPNEGO Security enabled (True/False). - (EG_YARN_ENDPOINT_SECURITY_ENABLED env var)""") - - @default('yarn_endpoint_security_enabled') - def yarn_endpoint_security_enabled_default(self): - return bool(os.getenv(self.yarn_endpoint_security_enabled_env, - self.yarn_endpoint_security_enabled_default_value)) - - # Conductor endpoint - conductor_endpoint_env = 'EG_CONDUCTOR_ENDPOINT' - conductor_endpoint_default_value = None - conductor_endpoint = Unicode(conductor_endpoint_default_value, config=True, - help="""The http url for accessing the Conductor REST API. - (EG_CONDUCTOR_ENDPOINT env var)""") - - @default('conductor_endpoint') - def conductor_endpoint_default(self): - return os.getenv(self.conductor_endpoint_env, self.conductor_endpoint_default_value) - - _log_formatter_cls = LogFormatter # traitlet default is LevelFormatter - - @default('log_format') - def _default_log_format(self): - """override default log format to include milliseconds""" - return u"%(color)s[%(levelname)1.1s %(asctime)s.%(msecs).03d %(name)s]%(end_color)s %(message)s" - - # Impersonation enabled - impersonation_enabled_env = 'EG_IMPERSONATION_ENABLED' - impersonation_enabled = Bool(False, config=True, - help="""Indicates whether impersonation will be performed during kernel launch. - (EG_IMPERSONATION_ENABLED env var)""") - - @default('impersonation_enabled') - def impersonation_enabled_default(self): - return bool(os.getenv(self.impersonation_enabled_env, 'false').lower() == 'true') - - # Unauthorized users - unauthorized_users_env = 'EG_UNAUTHORIZED_USERS' - unauthorized_users_default_value = 'root' - unauthorized_users = Set(default_value={unauthorized_users_default_value}, config=True, - help="""Comma-separated list of user names (e.g., ['root','admin']) against which - KERNEL_USERNAME will be compared. Any match (case-sensitive) will prevent the - kernel's launch and result in an HTTP 403 (Forbidden) error. - (EG_UNAUTHORIZED_USERS env var - non-bracketed, just comma-separated)""") - - @default('unauthorized_users') - def unauthorized_users_default(self): - return os.getenv(self.unauthorized_users_env, self.unauthorized_users_default_value).split(',') - - # Authorized users - authorized_users_env = 'EG_AUTHORIZED_USERS' - authorized_users = Set(config=True, - help="""Comma-separated list of user names (e.g., ['bob','alice']) against which - KERNEL_USERNAME will be compared. Any match (case-sensitive) will allow the kernel's - launch, otherwise an HTTP 403 (Forbidden) error will be raised. The set of unauthorized - users takes precedence. This option should be used carefully as it can dramatically limit - who can launch kernels. (EG_AUTHORIZED_USERS env var - non-bracketed, - just comma-separated)""") - - @default('authorized_users') - def authorized_users_default(self): - au_env = os.getenv(self.authorized_users_env) - return au_env.split(',') if au_env is not None else [] - - # Port range - port_range_env = 'EG_PORT_RANGE' - port_range_default_value = "0..0" - port_range = Unicode(port_range_default_value, config=True, - help="""Specifies the lower and upper port numbers from which ports are created. - The bounded values are separated by '..' (e.g., 33245..34245 specifies a range of 1000 ports - to be randomly selected). A range of zero (e.g., 33245..33245 or 0..0) disables port-range - enforcement. (EG_PORT_RANGE env var)""") - - @default('port_range') - def port_range_default(self): - return os.getenv(self.port_range_env, self.port_range_default_value) - - # Max Kernels per User - max_kernels_per_user_env = 'EG_MAX_KERNELS_PER_USER' - max_kernels_per_user_default_value = -1 - max_kernels_per_user = Integer(max_kernels_per_user_default_value, config=True, - help="""Specifies the maximum number of kernels a user can have active - simultaneously. A value of -1 disables enforcement. - (EG_MAX_KERNELS_PER_USER env var)""") - - @default('max_kernels_per_user') - def max_kernels_per_user_default(self): - return int(os.getenv(self.max_kernels_per_user_env, self.max_kernels_per_user_default_value)) - - ws_ping_interval_env = 'EG_WS_PING_INTERVAL_SECS' - ws_ping_interval_default_value = 30 - ws_ping_interval = Integer(ws_ping_interval_default_value, config=True, - help="""Specifies the ping interval(in seconds) that should be used by zmq port - associated withspawned kernels.Set this variable to 0 to disable ping mechanism. - (EG_WS_PING_INTERVAL_SECS env var)""") - - @default('ws_ping_interval') - def ws_ping_interval_default(self): - return int(os.getenv(self.ws_ping_interval_env, self.ws_ping_interval_default_value)) - - # Dynamic Update Interval - dynamic_config_interval_env = 'EG_DYNAMIC_CONFIG_INTERVAL' - dynamic_config_interval_default_value = 0 - dynamic_config_interval = Integer(dynamic_config_interval_default_value, min=0, config=True, - help="""Specifies the number of seconds configuration files are polled for - changes. A value of 0 or less disables dynamic config updates. - (EG_DYNAMIC_CONFIG_INTERVAL env var)""") - - @default('dynamic_config_interval') - def dynamic_config_interval_default(self): - return int(os.getenv(self.dynamic_config_interval_env, self.dynamic_config_interval_default_value)) - - @observe('dynamic_config_interval') - def dynamic_config_interval_changed(self, event): - prev_val = event['old'] - self.dynamic_config_interval = event['new'] - if self.dynamic_config_interval != prev_val: - # Values are different. Stop the current poller. If new value is > 0, start a poller. - if self.dynamic_config_poller: - self.dynamic_config_poller.stop() - self.dynamic_config_poller = None - - if self.dynamic_config_interval <= 0: - self.log.warning("Dynamic configuration updates have been disabled and cannot be re-enabled " - "without restarting Enterprise Gateway!") - elif prev_val > 0: # The interval has been changed, but still positive - self.init_dynamic_configs() # Restart the poller - - dynamic_config_poller = None - - kernel_spec_manager = Instance(KernelSpecManager, allow_none=True) - - kernel_spec_manager_class = Type( - default_value=KernelSpecManager, - config=True, - help=""" - The kernel spec manager class to use. Must be a subclass - of `jupyter_client.kernelspec.KernelSpecManager`. - """ - ) - - kernel_manager_class = Type( - klass=MappingKernelManager, - default_value=RemoteMappingKernelManager, - config=True, - help=""" - The kernel manager class to use. Must be a subclass - of `notebook.services.kernels.MappingKernelManager`. - """ - ) - - kernel_session_manager_class = Type( - klass=KernelSessionManager, - default_value=FileKernelSessionManager, - config=True, - help=""" - The kernel session manager class to use. Must be a subclass - of `enterprise_gateway.services.sessions.KernelSessionManager`. - """ - ) - def initialize(self, argv=None): """Initializes the base class, configurable manager instances, the Tornado web app, and the tornado HTTP server. diff --git a/enterprise_gateway/mixins.py b/enterprise_gateway/mixins.py index 84bcc45cb..b923d28d2 100644 --- a/enterprise_gateway/mixins.py +++ b/enterprise_gateway/mixins.py @@ -2,14 +2,17 @@ # Distributed under the terms of the Modified BSD License. """Mixins for Tornado handlers.""" +from distutils.util import strtobool +from http.client import responses import json +import os import traceback + from tornado import web -try: - # py3 - from http.client import responses -except ImportError: - from httplib import responses +from tornado.log import LogFormatter + +from traitlets import default, List, Set, Unicode, Type, Instance, Bool, CBool, Integer, observe +from traitlets.config import Configurable class CORSMixin(object): @@ -129,3 +132,388 @@ def write_error(self, status_code, **kwargs): self.set_header('Content-Type', 'application/json') self.set_status(status_code, reason=reply['reason']) self.finish(json.dumps(reply)) + + +class EnterpriseGatewayConfigMixin(Configurable): + # Server IP / PORT binding + port_env = 'EG_PORT' + port_default_value = 8888 + port = Integer(port_default_value, config=True, + help='Port on which to listen (EG_PORT env var)') + + @default('port') + def port_default(self): + return int(os.getenv(self.port_env, os.getenv('KG_PORT', self.port_default_value))) + + port_retries_env = 'EG_PORT_RETRIES' + port_retries_default_value = 50 + port_retries = Integer(port_retries_default_value, config=True, + help="""Number of ports to try if the specified port is not available + (EG_PORT_RETRIES env var)""") + + @default('port_retries') + def port_retries_default(self): + return int(os.getenv(self.port_retries_env, os.getenv('KG_PORT_RETRIES', self.port_retries_default_value))) + + ip_env = 'EG_IP' + ip_default_value = '127.0.0.1' + ip = Unicode(ip_default_value, config=True, + help='IP address on which to listen (EG_IP env var)') + + @default('ip') + def ip_default(self): + return os.getenv(self.ip_env, os.getenv('KG_IP', self.ip_default_value)) + + # Base URL + base_url_env = 'EG_BASE_URL' + base_url_default_value = '/' + base_url = Unicode(base_url_default_value, config=True, + help='The base path for mounting all API resources (EG_BASE_URL env var)') + + @default('base_url') + def base_url_default(self): + return os.getenv(self.base_url_env, os.getenv('KG_BASE_URL', self.base_url_default_value)) + + # Token authorization + auth_token_env = 'EG_AUTH_TOKEN' + auth_token = Unicode(config=True, + help='Authorization token required for all requests (EG_AUTH_TOKEN env var)') + + @default('auth_token') + def _auth_token_default(self): + return os.getenv(self.auth_token_env, os.getenv('KG_AUTH_TOKEN', '')) + + # Begin CORS headers + allow_credentials_env = 'EG_ALLOW_CREDENTIALS' + allow_credentials = Unicode(config=True, + help='Sets the Access-Control-Allow-Credentials header. (EG_ALLOW_CREDENTIALS env var)') + + @default('allow_credentials') + def allow_credentials_default(self): + return os.getenv(self.allow_credentials_env, os.getenv('KG_ALLOW_CREDENTIALS', '')) + + allow_headers_env = 'EG_ALLOW_HEADERS' + allow_headers = Unicode(config=True, + help='Sets the Access-Control-Allow-Headers header. (EG_ALLOW_HEADERS env var)') + + @default('allow_headers') + def allow_headers_default(self): + return os.getenv(self.allow_headers_env, os.getenv('KG_ALLOW_HEADERS', '')) + + allow_methods_env = 'EG_ALLOW_METHODS' + allow_methods = Unicode(config=True, + help='Sets the Access-Control-Allow-Methods header. (EG_ALLOW_METHODS env var)') + + @default('allow_methods') + def allow_methods_default(self): + return os.getenv(self.allow_methods_env, os.getenv('KG_ALLOW_METHODS', '')) + + allow_origin_env = 'EG_ALLOW_ORIGIN' + allow_origin = Unicode(config=True, + help='Sets the Access-Control-Allow-Origin header. (EG_ALLOW_ORIGIN env var)') + + @default('allow_origin') + def allow_origin_default(self): + return os.getenv(self.allow_origin_env, os.getenv('KG_ALLOW_ORIGIN', '')) + + expose_headers_env = 'EG_EXPOSE_HEADERS' + expose_headers = Unicode(config=True, + help='Sets the Access-Control-Expose-Headers header. (EG_EXPOSE_HEADERS env var)') + + @default('expose_headers') + def expose_headers_default(self): + return os.getenv(self.expose_headers_env, os.getenv('KG_EXPOSE_HEADERS', '')) + + trust_xheaders_env = 'EG_TRUST_XHEADERS' + trust_xheaders = CBool(False, config=True, + help="""Use x-* header values for overriding the remote-ip, useful when + application is behing a proxy. (EG_TRUST_XHEADERS env var)""") + + @default('trust_xheaders') + def trust_xheaders_default(self): + return strtobool(os.getenv(self.trust_xheaders_env, os.getenv('KG_TRUST_XHEADERS', 'False'))) + + certfile_env = 'EG_CERTFILE' + certfile = Unicode(None, config=True, allow_none=True, + help='The full path to an SSL/TLS certificate file. (EG_CERTFILE env var)') + + @default('certfile') + def certfile_default(self): + return os.getenv(self.certfile_env, os.getenv('KG_CERTFILE')) + + keyfile_env = 'EG_KEYFILE' + keyfile = Unicode(None, config=True, allow_none=True, + help='The full path to a private key file for usage with SSL/TLS. (EG_KEYFILE env var)') + + @default('keyfile') + def keyfile_default(self): + return os.getenv(self.keyfile_env, os.getenv('KG_KEYFILE')) + + client_ca_env = 'EG_CLIENT_CA' + client_ca = Unicode(None, config=True, allow_none=True, + help="""The full path to a certificate authority certificate for SSL/TLS + client authentication. (EG_CLIENT_CA env var)""") + + @default('client_ca') + def client_ca_default(self): + return os.getenv(self.client_ca_env, os.getenv('KG_CLIENT_CA')) + + max_age_env = 'EG_MAX_AGE' + max_age = Unicode(config=True, + help='Sets the Access-Control-Max-Age header. (EG_MAX_AGE env var)') + + @default('max_age') + def max_age_default(self): + return os.getenv(self.max_age_env, os.getenv('KG_MAX_AGE', '')) + # End CORS headers + + max_kernels_env = 'EG_MAX_KERNELS' + max_kernels = Integer(None, config=True, + allow_none=True, + help="""Limits the number of kernel instances allowed to run by this gateway. + Unbounded by default. (EG_MAX_KERNELS env var)""") + + @default('max_kernels') + def max_kernels_default(self): + val = os.getenv(self.max_kernels_env, os.getenv('KG_MAX_KERNELS')) + return val if val is None else int(val) + + default_kernel_name_env = 'EG_DEFAULT_KERNEL_NAME' + default_kernel_name = Unicode(config=True, + help='Default kernel name when spawning a kernel (EG_DEFAULT_KERNEL_NAME env var)') + + @default('default_kernel_name') + def default_kernel_name_default(self): + # defaults to Jupyter's default kernel name on empty string + return os.getenv(self.default_kernel_name_env, os.getenv('KG_DEFAULT_KERNEL_NAME', '')) + + list_kernels_env = 'EG_LIST_KERNELS' + list_kernels = Bool(config=True, + help="""Permits listing of the running kernels using API endpoints /api/kernels + and /api/sessions. (EG_LIST_KERNELS env var) Note: Jupyter Notebook + allows this by default but Jupyter Enterprise Gateway does not.""") + + @default('list_kernels') + def list_kernels_default(self): + return os.getenv(self.list_kernels_env, os.getenv('KG_LIST_KERNELS', 'False')).lower() == 'true' + + env_whitelist_env = 'EG_ENV_WHITELIST' + env_whitelist = List(config=True, + help="""Environment variables allowed to be set when a client requests a + new kernel. (EG_ENV_WHITELIST env var)""") + + @default('env_whitelist') + def env_whitelist_default(self): + return os.getenv(self.env_whitelist_env, os.getenv('KG_ENV_WHITELIST', '')).split(',') + + env_process_whitelist_env = 'EG_ENV_PROCESS_WHITELIST' + env_process_whitelist = List(config=True, + help="""Environment variables allowed to be inherited + from the spawning process by the kernel. (EG_ENV_PROCESS_WHITELIST env var)""") + + @default('env_process_whitelist') + def env_process_whitelist_default(self): + return os.getenv(self.env_process_whitelist_env, os.getenv('KG_ENV_PROCESS_WHITELIST', '')).split(',') + + # Remote hosts + remote_hosts_env = 'EG_REMOTE_HOSTS' + remote_hosts_default_value = 'localhost' + remote_hosts = List(default_value=[remote_hosts_default_value], config=True, + help="""Bracketed comma-separated list of hosts on which DistributedProcessProxy + kernels will be launched e.g., ['host1','host2']. (EG_REMOTE_HOSTS env var + - non-bracketed, just comma-separated)""") + + @default('remote_hosts') + def remote_hosts_default(self): + return os.getenv(self.remote_hosts_env, self.remote_hosts_default_value).split(',') + + # Yarn endpoint + yarn_endpoint_env = 'EG_YARN_ENDPOINT' + yarn_endpoint = Unicode(None, config=True, allow_none=True, + help="""The http url specifying the YARN Resource Manager. Note: If this value is NOT set, + the YARN library will use the files within the local HADOOP_CONFIG_DIR to determine the + active resource manager. (EG_YARN_ENDPOINT env var)""") + + @default('yarn_endpoint') + def yarn_endpoint_default(self): + return os.getenv(self.yarn_endpoint_env) + + # Alt Yarn endpoint + alt_yarn_endpoint_env = 'EG_ALT_YARN_ENDPOINT' + alt_yarn_endpoint = Unicode(None, config=True, allow_none=True, + help="""The http url specifying the alternate YARN Resource Manager. This value should + be set when YARN Resource Managers are configured for high availability. Note: If both + YARN endpoints are NOT set, the YARN library will use the files within the local + HADOOP_CONFIG_DIR to determine the active resource manager. + (EG_ALT_YARN_ENDPOINT env var)""") + + @default('alt_yarn_endpoint') + def alt_yarn_endpoint_default(self): + return os.getenv(self.alt_yarn_endpoint_env) + + yarn_endpoint_security_enabled_env = 'EG_YARN_ENDPOINT_SECURITY_ENABLED' + yarn_endpoint_security_enabled_default_value = False + yarn_endpoint_security_enabled = Bool(yarn_endpoint_security_enabled_default_value, config=True, + help="""Is YARN Kerberos/SPNEGO Security enabled (True/False). + (EG_YARN_ENDPOINT_SECURITY_ENABLED env var)""") + + @default('yarn_endpoint_security_enabled') + def yarn_endpoint_security_enabled_default(self): + return bool(os.getenv(self.yarn_endpoint_security_enabled_env, + self.yarn_endpoint_security_enabled_default_value)) + + # Conductor endpoint + conductor_endpoint_env = 'EG_CONDUCTOR_ENDPOINT' + conductor_endpoint_default_value = None + conductor_endpoint = Unicode(conductor_endpoint_default_value, + allow_none=True, + config=True, + help="""The http url for accessing the Conductor REST API. + (EG_CONDUCTOR_ENDPOINT env var)""") + + @default('conductor_endpoint') + def conductor_endpoint_default(self): + return os.getenv(self.conductor_endpoint_env, self.conductor_endpoint_default_value) + + _log_formatter_cls = LogFormatter # traitlet default is LevelFormatter + + @default('log_format') + def _default_log_format(self): + """override default log format to include milliseconds""" + return u"%(color)s[%(levelname)1.1s %(asctime)s.%(msecs).03d %(name)s]%(end_color)s %(message)s" + + # Impersonation enabled + impersonation_enabled_env = 'EG_IMPERSONATION_ENABLED' + impersonation_enabled = Bool(False, config=True, + help="""Indicates whether impersonation will be performed during kernel launch. + (EG_IMPERSONATION_ENABLED env var)""") + + @default('impersonation_enabled') + def impersonation_enabled_default(self): + return bool(os.getenv(self.impersonation_enabled_env, 'false').lower() == 'true') + + # Unauthorized users + unauthorized_users_env = 'EG_UNAUTHORIZED_USERS' + unauthorized_users_default_value = 'root' + unauthorized_users = Set(default_value={unauthorized_users_default_value}, config=True, + help="""Comma-separated list of user names (e.g., ['root','admin']) against which + KERNEL_USERNAME will be compared. Any match (case-sensitive) will prevent the + kernel's launch and result in an HTTP 403 (Forbidden) error. + (EG_UNAUTHORIZED_USERS env var - non-bracketed, just comma-separated)""") + + @default('unauthorized_users') + def unauthorized_users_default(self): + return os.getenv(self.unauthorized_users_env, self.unauthorized_users_default_value).split(',') + + # Authorized users + authorized_users_env = 'EG_AUTHORIZED_USERS' + authorized_users = Set(config=True, + help="""Comma-separated list of user names (e.g., ['bob','alice']) against which + KERNEL_USERNAME will be compared. Any match (case-sensitive) will allow the kernel's + launch, otherwise an HTTP 403 (Forbidden) error will be raised. The set of unauthorized + users takes precedence. This option should be used carefully as it can dramatically limit + who can launch kernels. (EG_AUTHORIZED_USERS env var - non-bracketed, + just comma-separated)""") + + @default('authorized_users') + def authorized_users_default(self): + au_env = os.getenv(self.authorized_users_env) + return au_env.split(',') if au_env is not None else [] + + # Port range + port_range_env = 'EG_PORT_RANGE' + port_range_default_value = "0..0" + port_range = Unicode(port_range_default_value, config=True, + help="""Specifies the lower and upper port numbers from which ports are created. + The bounded values are separated by '..' (e.g., 33245..34245 specifies a range of 1000 ports + to be randomly selected). A range of zero (e.g., 33245..33245 or 0..0) disables port-range + enforcement. (EG_PORT_RANGE env var)""") + + @default('port_range') + def port_range_default(self): + return os.getenv(self.port_range_env, self.port_range_default_value) + + # Max Kernels per User + max_kernels_per_user_env = 'EG_MAX_KERNELS_PER_USER' + max_kernels_per_user_default_value = -1 + max_kernels_per_user = Integer(max_kernels_per_user_default_value, config=True, + help="""Specifies the maximum number of kernels a user can have active + simultaneously. A value of -1 disables enforcement. + (EG_MAX_KERNELS_PER_USER env var)""") + + @default('max_kernels_per_user') + def max_kernels_per_user_default(self): + return int(os.getenv(self.max_kernels_per_user_env, self.max_kernels_per_user_default_value)) + + ws_ping_interval_env = 'EG_WS_PING_INTERVAL_SECS' + ws_ping_interval_default_value = 30 + ws_ping_interval = Integer(ws_ping_interval_default_value, config=True, + help="""Specifies the ping interval(in seconds) that should be used by zmq port + associated withspawned kernels.Set this variable to 0 to disable ping mechanism. + (EG_WS_PING_INTERVAL_SECS env var)""") + + @default('ws_ping_interval') + def ws_ping_interval_default(self): + return int(os.getenv(self.ws_ping_interval_env, self.ws_ping_interval_default_value)) + + # Dynamic Update Interval + dynamic_config_interval_env = 'EG_DYNAMIC_CONFIG_INTERVAL' + dynamic_config_interval_default_value = 0 + dynamic_config_interval = Integer(dynamic_config_interval_default_value, min=0, config=True, + help="""Specifies the number of seconds configuration files are polled for + changes. A value of 0 or less disables dynamic config updates. + (EG_DYNAMIC_CONFIG_INTERVAL env var)""") + + @default('dynamic_config_interval') + def dynamic_config_interval_default(self): + return int(os.getenv(self.dynamic_config_interval_env, self.dynamic_config_interval_default_value)) + + @observe('dynamic_config_interval') + def dynamic_config_interval_changed(self, event): + prev_val = event['old'] + self.dynamic_config_interval = event['new'] + if self.dynamic_config_interval != prev_val: + # Values are different. Stop the current poller. If new value is > 0, start a poller. + if self.dynamic_config_poller: + self.dynamic_config_poller.stop() + self.dynamic_config_poller = None + + if self.dynamic_config_interval <= 0: + self.log.warning("Dynamic configuration updates have been disabled and cannot be re-enabled " + "without restarting Enterprise Gateway!") + # The interval has been changed, but still positive + elif prev_val > 0 and hasattr(self, "init_dynamic_configs"): + self.init_dynamic_configs() # Restart the poller + + dynamic_config_poller = None + + kernel_spec_manager = Instance("jupyter_client.kernelspec.KernelSpecManager", allow_none=True) + + kernel_spec_manager_class = Type( + default_value="jupyter_client.kernelspec.KernelSpecManager", + config=True, + help=""" + The kernel spec manager class to use. Must be a subclass + of `jupyter_client.kernelspec.KernelSpecManager`. + """ + ) + + kernel_manager_class = Type( + klass="notebook.services.kernels.kernelmanager.MappingKernelManager", + default_value="enterprise_gateway.services.kernels.remotemanager.RemoteMappingKernelManager", + config=True, + help=""" + The kernel manager class to use. Must be a subclass + of `notebook.services.kernels.MappingKernelManager`. + """ + ) + + kernel_session_manager_class = Type( + klass="enterprise_gateway.services.sessions.kernelsessionmanager.KernelSessionManager", + default_value="enterprise_gateway.services.sessions.kernelsessionmanager.FileKernelSessionManager", + config=True, + help=""" + The kernel session manager class to use. Must be a subclass + of `enterprise_gateway.services.sessions.KernelSessionManager`. + """ + ) diff --git a/enterprise_gateway/services/kernels/remotemanager.py b/enterprise_gateway/services/kernels/remotemanager.py index 475a6a323..622b2f0b8 100644 --- a/enterprise_gateway/services/kernels/remotemanager.py +++ b/enterprise_gateway/services/kernels/remotemanager.py @@ -12,9 +12,11 @@ from ipython_genutils.importstring import import_item from notebook.services.kernels.kernelmanager import MappingKernelManager from jupyter_client.ioloop.manager import IOLoopKernelManager +from traitlets import directional_link, log as traitlets_log from ..processproxies.processproxy import LocalProcessProxy, RemoteProcessProxy from ..sessions.kernelsessionmanager import KernelSessionManager +from enterprise_gateway.mixins import EnterpriseGatewayConfigMixin def get_process_proxy_config(kernelspec): @@ -44,6 +46,46 @@ def get_process_proxy_config(kernelspec): return {"class_name": "enterprise_gateway.services.processproxies.processproxy.LocalProcessProxy", "config": {}} +def new_kernel_id(**kwargs): + """ + This method provides a mechanism by which clients can specify a kernel's id. In this case + that mechanism is via the per-kernel environment variable: KERNEL_ID. If specified, its value + will be validated and returned, otherwise the result from the provided method is returned. + + NOTE: This method exists in jupyter_client.multikernelmanager.py for releases > 5.2.3. If you + find that this method is not getting invoked, then you likely need to update the version of + jupyter_client. The Enterprise Gateway dependency will be updated once new releases of + jupyter_client are more prevalent. + + Returns + ------- + kernel_id : str + The uuid string to associate with the new kernel + """ + log = kwargs.pop("log", None) or traitlets_log.get_logger() + kernel_id_fn = kwargs.pop("kernel_id_fn", None) or (lambda: unicode_type(uuid.uuid4())) + + env = kwargs.get('env') + if env and env.get('KERNEL_ID'): # If there's a KERNEL_ID in the env, check it out + # convert string back to UUID - validating string in the process. + str_kernel_id = env.get('KERNEL_ID') + try: + str_v4_kernel_id = str(uuid.UUID(str_kernel_id, version=4)) + if str_kernel_id != str_v4_kernel_id: # Given string is not uuid v4 compliant + raise ValueError("value is not uuid v4 compliant") + except ValueError as ve: + log.error("Invalid v4 UUID value detected in ['env']['KERNEL_ID']: '{}'! Error: {}". + format(str_kernel_id, ve)) + raise ve + # user-provided id is valid, use it + kernel_id = unicode_type(str_kernel_id) + log.debug("Using user-provided kernel_id: {}".format(kernel_id)) + else: + kernel_id = kernel_id_fn(**kwargs) + + return kernel_id + + class RemoteMappingKernelManager(MappingKernelManager): """Extends the MappingKernelManager with support for managing remote kernels via the process-proxy. """ @@ -161,44 +203,12 @@ def start_kernel_from_session(self, kernel_id, kernel_name, connection_info, pro return True def new_kernel_id(self, **kwargs): - """Determines the kernel_id to use for a new kernel. - - This method provides a mechanism by which clients can specify a kernel's id. In this case - that mechanism is via the per-kernel environment variable: KERNEL_ID. If specified, its value - will be validated and returned, otherwise the result from the superclass method is returned. - - NOTE: This method exists in jupyter_client.multikernelmanager.py for releases > 5.2.3. If you - find that this method is not getting invoked, then you likely need to update the version of - jupyter_client. The Enterprise Gateway dependency will be updated once new releases of - jupyter_client are more prevalent. - - Returns - ------- - kernel_id : str - The uuid string to associate with the new kernel - """ - env = kwargs.get('env') - if env and env.get('KERNEL_ID'): # If there's a KERNEL_ID in the env, check it out - # convert string back to UUID - validating string in the process. - str_kernel_id = env.get('KERNEL_ID') - try: - str_v4_kernel_id = str(uuid.UUID(str_kernel_id, version=4)) - if str_kernel_id != str_v4_kernel_id: # Given string is not uuid v4 compliant - raise ValueError("value is not uuid v4 compliant") - except ValueError as ve: - self.log.error("Invalid v4 UUID value detected in ['env']['KERNEL_ID']: '{}'! Error: {}". - format(str_kernel_id, ve)) - raise ve - # user-provided id is valid, use it - kernel_id = unicode_type(str_kernel_id) - self.log.debug("Using user-provided kernel_id: {}".format(kernel_id)) - else: - kernel_id = super(RemoteMappingKernelManager, self).new_kernel_id(**kwargs) + """Determines the kernel_id to use for a new kernel.""" - return kernel_id + return new_kernel_id(kernel_id_fn=super(RemoteMappingKernelManager, self).new_kernel_id, log=self.log) -class RemoteKernelManager(IOLoopKernelManager): +class RemoteKernelManager(EnterpriseGatewayConfigMixin, IOLoopKernelManager): """Extends the IOLoopKernelManager used by the MappingKernelManager. This class is responsible for detecting that a remote kernel is desired, then launching the @@ -211,7 +221,6 @@ def __init__(self, **kwargs): self.process_proxy = None self.response_address = None self.sigint_value = None - self.port_range = None self.kernel_id = None self.user_overrides = {} self.restarting = False # need to track whether we're in a restart situation or not @@ -224,6 +233,43 @@ def __init__(self, **kwargs): if hasattr(self, "cache_ports"): self.cache_ports = False + if not self.connection_file: + self.kernel_id = new_kernel_id(log=self.log) + + self._link_dependent_props() + + if self.kernel_spec_manager is None: + self.kernel_spec_manager = self.kernel_spec_manager_class( + parent=self, + ) + + def _link_dependent_props(self): + """ + Ensure that RemoteKernelManager, when used as part of an EnterpriseGatewayApp, + has certain necessary configuration stay in sync with the app's configuration. + + When RemoteKernelManager is used independently, this function is a no-op, and + default values or configuration set on this class is used. + """ + try: + eg_instance = self.parent.parent + except AttributeError: + return + dependent_props = ["authorized_users", + "unauthorized_users", + "port_range", + "impersonation_enabled", + "max_kernels_per_user", + "env_whitelist", + "env_process_whitelist", + "yarn_endpoint", + "alt_yarn_endpoint", + "yarn_endpoint_security_enabled", + "conductor_endpoint", + "remote_hosts" + ] + self._links = [directional_link((eg_instance, prop), (self, prop)) for prop in dependent_props] + def start_kernel(self, **kwargs): """Starts a kernel in a separate process. @@ -248,8 +294,8 @@ def _capture_user_overrides(self, **kwargs): env = kwargs.get('env', {}) self.user_overrides.update({key: value for key, value in env.items() if key.startswith('KERNEL_') or - key in self.parent.parent.env_process_whitelist or - key in self.parent.parent.env_whitelist}) + key in self.env_process_whitelist or + key in self.env_whitelist}) def format_kernel_cmd(self, extra_arguments=None): """ Replace templated args (e.g. {response_address}, {port_range}, or {kernel_id}). """ @@ -325,16 +371,16 @@ def restart_kernel(self, now=False, **kwargs): kernel. """ self.restarting = True - kernel_id = os.path.basename(self.connection_file).replace('kernel-', '').replace('.json', '') + kernel_id = self.kernel_id or os.path.basename(self.connection_file).replace('kernel-', '').replace('.json', '') # Check if this is a remote process proxy and if now = True. If so, check its connection count. If no # connections, shutdown else perform the restart. Note: auto-restart sets now=True, but handlers use # the default value (False). - if isinstance(self.process_proxy, RemoteProcessProxy) and now: - if self.parent._kernel_connections.get(kernel_id, 0) == 0: + if isinstance(self.process_proxy, RemoteProcessProxy) and now and self.mapping_kernel_manager: + if self.mapping_kernel_manager._kernel_connections.get(kernel_id, 0) == 0: self.log.warning("Remote kernel ({}) will not be automatically restarted since there are no " "clients connected at this time.".format(kernel_id)) # Use the parent mapping kernel manager so activity monitoring and culling is also shutdown - self.parent.shutdown_kernel(kernel_id, now=now) + self.mapping_kernel_manager.shutdown_kernel(kernel_id, now=now) return super(RemoteKernelManager, self).restart_kernel(now, **kwargs) if isinstance(self.process_proxy, RemoteProcessProxy): # for remote kernels... @@ -342,9 +388,11 @@ def restart_kernel(self, now=False, **kwargs): if self._activity_stream: self._activity_stream.close() self._activity_stream = None - self.parent.start_watching_activity(kernel_id) + if self.mapping_kernel_manager: + self.mapping_kernel_manager.start_watching_activity(kernel_id) # Refresh persisted state. - self.parent.parent.kernel_session_manager.refresh_session(kernel_id) + if self.kernel_session_manager: + self.kernel_session_manager.refresh_session(kernel_id) self.restarting = False def signal_kernel(self, signum): @@ -423,3 +471,27 @@ def _get_process_proxy(self): format(self.kernel_spec.display_name, process_proxy_class_name)) process_proxy_class = import_item(process_proxy_class_name) self.process_proxy = process_proxy_class(kernel_manager=self, proxy_config=process_proxy_cfg.get('config')) + + # When this class is used by an EnterpriseGatewayApp instance, it will be able to + # access the app's configuration using the traitlet parent chain. + # When it's used independently, it should fall back to safe defaults. + @property + def kernel_session_manager(self): + try: + return self.parent.parent.kernel_session_manager + except AttributeError: + return None + + @property + def cull_idle_timeout(self): + try: + return self.parent.cull_idle_timeout + except AttributeError: + return 0 + + @property + def mapping_kernel_manager(self): + try: + return self.parent + except AttributeError: + return None diff --git a/enterprise_gateway/services/processproxies/conductor.py b/enterprise_gateway/services/processproxies/conductor.py index 29eb9bd8d..5ded63f2b 100644 --- a/enterprise_gateway/services/processproxies/conductor.py +++ b/enterprise_gateway/services/processproxies/conductor.py @@ -32,7 +32,7 @@ def __init__(self, kernel_manager, proxy_config): self.env = None self.rest_credential = None self.conductor_endpoint = proxy_config.get('conductor_endpoint', - kernel_manager.parent.parent.conductor_endpoint) + kernel_manager.conductor_endpoint) def launch_process(self, kernel_cmd, **kwargs): """Launches the specified process within a Conductor cluster environment.""" diff --git a/enterprise_gateway/services/processproxies/distributed.py b/enterprise_gateway/services/processproxies/distributed.py index 65bc39cb3..bf61a1008 100644 --- a/enterprise_gateway/services/processproxies/distributed.py +++ b/enterprise_gateway/services/processproxies/distributed.py @@ -28,7 +28,7 @@ def __init__(self, kernel_manager, proxy_config): if proxy_config.get('remote_hosts'): self.hosts = proxy_config.get('remote_hosts').split(',') else: - self.hosts = kernel_manager.parent.parent.remote_hosts # from command line or env + self.hosts = kernel_manager.remote_hosts # from command line or env def launch_process(self, kernel_cmd, **kwargs): """Launches a kernel process on a selected host.""" diff --git a/enterprise_gateway/services/processproxies/processproxy.py b/enterprise_gateway/services/processproxies/processproxy.py index 2524cd1f9..9d8b4ef5a 100644 --- a/enterprise_gateway/services/processproxies/processproxy.py +++ b/enterprise_gateway/services/processproxies/processproxy.py @@ -116,9 +116,12 @@ def __init__(self, kernel_manager, proxy_config): # relaunch (see jupyter_client.manager.start_kernel(). self.kernel_manager.ip = '0.0.0.0' self.log = kernel_manager.log + # extract the kernel_id string from the connection file and set the KERNEL_ID environment variable - self.kernel_manager.kernel_id = os.path.basename(self.kernel_manager.connection_file). \ - replace('kernel-', '').replace('.json', '') + if self.kernel_manager.kernel_id is None: + self.kernel_manager.kernel_id = os.path.basename(self.kernel_manager.connection_file). \ + replace('kernel-', '').replace('.json', '') + self.kernel_id = self.kernel_manager.kernel_id self.kernel_launch_timeout = default_kernel_launch_timeout self.lower_port = 0 @@ -127,7 +130,7 @@ def __init__(self, kernel_manager, proxy_config): # Handle authorization sets... # Take union of unauthorized users... - self.unauthorized_users = self.kernel_manager.parent.parent.unauthorized_users + self.unauthorized_users = self.kernel_manager.unauthorized_users if proxy_config.get('unauthorized_users'): self.unauthorized_users = self.unauthorized_users.union(proxy_config.get('unauthorized_users').split(',')) @@ -135,7 +138,7 @@ def __init__(self, kernel_manager, proxy_config): if proxy_config.get('authorized_users'): self.authorized_users = set(proxy_config.get('authorized_users').split(',')) else: - self.authorized_users = self.kernel_manager.parent.parent.authorized_users + self.authorized_users = self.kernel_manager.authorized_users # Represents the local process (from popen) if applicable. Note that we could have local_proc = None even when # the subclass is a LocalProcessProxy (or YarnProcessProxy). This will happen if EG is restarted and the @@ -432,7 +435,7 @@ def _enforce_authorization(self, **kwargs): # Although it may already be set in the env, just override in case it was only set via command line or config # Convert to string since execve() (called by Popen in base classes) wants string values. - env_dict['EG_IMPERSONATION_ENABLED'] = str(self.kernel_manager.parent.parent.impersonation_enabled) + env_dict['EG_IMPERSONATION_ENABLED'] = str(self.kernel_manager.impersonation_enabled) # Ensure KERNEL_USERNAME is set kernel_username = KernelSessionManager.get_kernel_username(**kwargs) @@ -460,16 +463,19 @@ def _enforce_limits(self, **kwargs): # if kernels-per-user is configured, ensure that this next kernel is still within the limit. If this # is due to a restart, skip enforcement since we're re-using that id. - max_kernels_per_user = self.kernel_manager.parent.parent.max_kernels_per_user + max_kernels_per_user = self.kernel_manager.max_kernels_per_user if max_kernels_per_user >= 0 and not self.kernel_manager.restarting: env_dict = kwargs.get('env') username = env_dict['KERNEL_USERNAME'] - current_kernel_count = self.kernel_manager.parent.parent.kernel_session_manager.active_sessions(username) - if current_kernel_count >= max_kernels_per_user: - error_message = "A max kernels per user limit has been set to {} and user '{}' currently has {} " \ - "active {}.".format(max_kernels_per_user, username, current_kernel_count, - "kernel" if max_kernels_per_user == 1 else "kernels") - self.log_and_raise(http_status_code=403, reason=error_message) + + # Per user limits are only meaningful if a session manager exists. + if self.kernel_manager.kernel_session_manager: + current_kernel_count = self.kernel_manager.kernel_session_manager.active_sessions(username) + if current_kernel_count >= max_kernels_per_user: + error_message = "A max kernels per user limit has been set to {} and user '{}' currently has {} " \ + "active {}.".format(max_kernels_per_user, username, current_kernel_count, + "kernel" if max_kernels_per_user == 1 else "kernels") + self.log_and_raise(http_status_code=403, reason=error_message) def get_process_info(self): """Captures the base information necessary for kernel persistence relative to process proxies. @@ -494,7 +500,7 @@ def load_process_info(self, process_info): def _validate_port_range(self, proxy_config): """Validates the port range configuration option to ensure appropriate values.""" # Let port_range override global value - if set on kernelspec... - port_range = self.kernel_manager.parent.parent.port_range + port_range = self.kernel_manager.port_range if proxy_config.get('port_range'): port_range = proxy_config.get('port_range') @@ -797,7 +803,7 @@ def _spawn_ssh_tunnel(self, kernel_channel, local_port, remote_port, remote_ip, return pexpect.spawn(cmd, env=os.environ.copy().pop('SSH_ASKPASS', None)) def _get_keep_alive_interval(self, kernel_channel): - cull_idle_timeout = self.kernel_manager.parent.cull_idle_timeout + cull_idle_timeout = self.kernel_manager.cull_idle_timeout if (kernel_channel == KernelChannel.COMMUNICATION or kernel_channel == KernelChannel.CONTROL or diff --git a/enterprise_gateway/services/processproxies/yarn.py b/enterprise_gateway/services/processproxies/yarn.py index bb0731c4d..80a83d98a 100644 --- a/enterprise_gateway/services/processproxies/yarn.py +++ b/enterprise_gateway/services/processproxies/yarn.py @@ -41,14 +41,14 @@ def __init__(self, kernel_manager, proxy_config): self.yarn_endpoint \ = proxy_config.get('yarn_endpoint', - kernel_manager.parent.parent.yarn_endpoint) + kernel_manager.yarn_endpoint) self.alt_yarn_endpoint \ = proxy_config.get('alt_yarn_endpoint', - kernel_manager.parent.parent.alt_yarn_endpoint) + kernel_manager.alt_yarn_endpoint) self.yarn_endpoint_security_enabled \ = proxy_config.get('yarn_endpoint_security_enabled', - kernel_manager.parent.parent.yarn_endpoint_security_enabled) + kernel_manager.yarn_endpoint_security_enabled) endpoints = None if self.yarn_endpoint: diff --git a/enterprise_gateway/tests/test_enterprise_gateway.py b/enterprise_gateway/tests/test_enterprise_gateway.py index 72089da28..837b76ec7 100644 --- a/enterprise_gateway/tests/test_enterprise_gateway.py +++ b/enterprise_gateway/tests/test_enterprise_gateway.py @@ -26,8 +26,8 @@ def setUp(self): def test_max_kernels_per_user(self): """Number of kernels should be limited per user.""" - app = self.get_app() - app.settings['kernel_manager'].parent.max_kernels_per_user = 1 + self.get_app() + self.app.max_kernels_per_user = 1 # Request a kernel for bob bob_response = yield self.http_client.fetch( @@ -74,9 +74,9 @@ def test_max_kernels_per_user(self): def test_authorization(self): """Verify authorized users can start a kernel, unauthorized users cannot""" - app = self.get_app() - app.settings['kernel_manager'].parent.authorized_users = {'bob', 'alice', 'bad_guy'} - app.settings['kernel_manager'].parent.unauthorized_users = {'bad_guy'} + self.get_app() + self.app.authorized_users = {'bob', 'alice', 'bad_guy'} + self.app.unauthorized_users = {'bad_guy'} # Request a kernel for alice alice_response = yield self.http_client.fetch( @@ -100,7 +100,7 @@ def test_port_range(self): """Verify port-range behaviors are correct""" app = self.get_app() - app.settings['kernel_manager'].parent.port_range = "10000..10999" # range too small + self.app.port_range = "10000..10999" # range too small # Request a kernel for alice - 500 expected alice_response = yield self.http_client.fetch( self.get_url('/api/kernels'), @@ -110,7 +110,7 @@ def test_port_range(self): ) self.assertEqual(alice_response.code, 500) - app.settings['kernel_manager'].parent.port_range = "100..11099" # invalid lower port + self.app.port_range = "100..11099" # invalid lower port # Request a kernel for alice - 500 expected alice_response = yield self.http_client.fetch( self.get_url('/api/kernels'), @@ -120,7 +120,7 @@ def test_port_range(self): ) self.assertEqual(alice_response.code, 500) - app.settings['kernel_manager'].parent.port_range = "10000..65537" # invalid upper port + self.app.port_range = "10000..65537" # invalid upper port # Request a kernel for alice - 500 expected alice_response = yield self.http_client.fetch( self.get_url('/api/kernels'), @@ -130,7 +130,7 @@ def test_port_range(self): ) self.assertEqual(alice_response.code, 500) - app.settings['kernel_manager'].parent.port_range = "30000..31000" # valid range + self.app.port_range = "30000..31000" # valid range # Request a kernel for alice - 201 expected alice_response = yield self.http_client.fetch( self.get_url('/api/kernels'),