From cd13de32c8a4ae1024d7ed5452fcf639883f8d21 Mon Sep 17 00:00:00 2001 From: adam Date: Mon, 26 Jul 2021 13:32:18 -0700 Subject: [PATCH 1/9] WIP --- requirements/main.txt | 34 +++---- src/mobu/business/jupyterpythonloop.py | 31 +++--- src/mobu/business/notebookrunner.py | 37 ++++--- src/mobu/jupyterclient.py | 135 +++++++++++++++++-------- 4 files changed, 147 insertions(+), 90 deletions(-) diff --git a/requirements/main.txt b/requirements/main.txt index fc132230..17473102 100644 --- a/requirements/main.txt +++ b/requirements/main.txt @@ -59,23 +59,23 @@ asgiref==3.4.1 \ --hash=sha256:4ef1ab46b484e3c706329cedeff284a5d40824200638503f5768edb6de7d58e9 \ --hash=sha256:ffc141aa908e6f175673e7b1b3b7af4fdb0ecb738fc5c8b88f69f055c2415214 # via uvicorn -astropy==4.2.1 \ - --hash=sha256:009a26f795adad1f0b26ba3a434e5be9cfa82cb629ba87c0547b567bad6e1695 \ - --hash=sha256:03428ca1baa4fba99e37d3767c12c038c456a27176bcb8f407f9b2b0743ef8ee \ - --hash=sha256:09965d5e8ffd7e96e7fcc596b631f366cf729df75efb792918cb6637acf1ad4e \ - --hash=sha256:12c76c119f7a0a8fcf0e72269be9faa88319f12e4ba346180d910e58fda36bf2 \ - --hash=sha256:2035ca521d86c88ea6d8da07f977a9727f0d7d8f85b5c287558c1891f885e548 \ - --hash=sha256:223610cc612aaddac890fefd9e20dc6a39c92ef01692354e2bcb273c79fb8842 \ - --hash=sha256:2d4d328892c7b09a23361f44182cf89be3dadaec60a270bd4fe754f3829052a4 \ - --hash=sha256:2ff194e15b03afd575f278e2187b71d7ee9d85f302356050b2257b6c4788f1cc \ - --hash=sha256:3d5516ba20e6cbc208250dd8f414243839cc40e957616e3f336a517967ee34d0 \ - --hash=sha256:5be2f01d1b35202c0989f4502d25fe850ae5e891acbd3be107eaf6eeab81826d \ - --hash=sha256:6d8c8bc1eef048ad873395d2a620b9b5f308bef9a508f542e6dc3b33fbfbe66d \ - --hash=sha256:a1f707283822c2f7df97d9de151c29d49ed9cc0bf3ae952f91012d7a4c5872a7 \ - --hash=sha256:a6164013de3732a67a5a1a2743565f5aaef0d895ce33d5aef482d88b05318893 \ - --hash=sha256:c327cfdede8d5fce1224153b8d3a060226161ddc2e1b2170f076aaddb4953965 \ - --hash=sha256:db694c10eb3cc10068859ba1eab30b38b7e821dbfff142960c5a99c4af059747 \ - --hash=sha256:ed483e472241153daec45f4b0c318c2c63d9f47305b78e6e63d32fc388c18427 +astropy==4.3 \ + --hash=sha256:134c894381be801d7c2992b997d7f57ae7ae41a36955e883186df8bff5839c7b \ + --hash=sha256:2361972fc3bef5e37f9f8d3237634511428e93f17d42fbd13ee965689bd47350 \ + --hash=sha256:32c9752bec83da4905cdf55e6be586941be06acca671fbfda0bc1f2ef94aea79 \ + --hash=sha256:33501223e48224177eb72c7c746b000cf9a40473cb9bcbcb8c9ac3256c8a6f6d \ + --hash=sha256:6c1d1f0896e5d33851442865aa0ec4a2643dfaecb8853e9f4dca6d0541798744 \ + --hash=sha256:7f09d43326a3d8cb3098aad61fe6353497d6583abf5d1abe34076180c28b00a0 \ + --hash=sha256:819bfaec7a9d0f3968006d0528d9c3af5affd6d36644b7ff2b0c0b0435c19cfe \ + --hash=sha256:83a98369265a5f90acca884ef046e791e2a6d17d51871c7016ecb954c7b34adb \ + --hash=sha256:859aee27f0b4cee8ea9d8d35cb186cfba58597e618931e277054c483f58b5265 \ + --hash=sha256:8e72ec80c1846b87267616a7739ba7639acd32c59880e113994b771bbb274e1e \ + --hash=sha256:979ea0469eaebc55be444f47be73dbbe7cc1d60679bc066a860ce796840c2e98 \ + --hash=sha256:a9d0d202e5b35fbf602470175f0461306f7b0b373e59aaa242e220542ef572d2 \ + --hash=sha256:c4eeceb341d07f4ce53ca387a48f399f615a59fd8d67be711d886066c1e64f20 \ + --hash=sha256:cdf785c4479eec741df612ab05681c8eb0ed99d5e915e3376ff874cccba1e35e \ + --hash=sha256:fb2cf1e8dca429fdefabe9c963ad8e99d639b76551d4f213838862adaf213b7a \ + --hash=sha256:fcb02b46b5266d80086f66ca0042f69e34bc45546a8aca9ff62ca716d9b4c08e # via pyvo async-timeout==3.0.1 \ --hash=sha256:0c3c816a028d47f659d6ff5c745cb2acf1f966da1fe5c19c77a70282b25f4c5f \ diff --git a/src/mobu/business/jupyterpythonloop.py b/src/mobu/business/jupyterpythonloop.py index b63cb641..2e0d9cd6 100644 --- a/src/mobu/business/jupyterpythonloop.py +++ b/src/mobu/business/jupyterpythonloop.py @@ -6,6 +6,7 @@ import asyncio +from ..jupyterclient import JupyterLabSession from .jupyterloginloop import JupyterLoginLoop __all__ = ["JupyterPythonLoop"] @@ -18,21 +19,23 @@ class JupyterPythonLoop(JupyterLoginLoop): """Run simple Python code in a loop inside a lab kernel.""" async def lab_business(self) -> None: - kernel = await self.create_kernel() + session = await self.create_session() for count in range(MAX_EXECUTIONS): - await self.execute_code(kernel, "print(2+2, end='')") + await self.execute_code(session, "2+2") await self.lab_wait() - await self.delete_kernel(kernel) + await self.delete_session(session) - async def create_kernel(self) -> str: - self.logger.info("create_kernel") - with self.timings.start("create_kernel"): - kernel = await self._client.create_kernel() - return kernel + async def create_session(self) -> JupyterLabSession: + self.logger.info("create_session") + with self.timings.start("create_session"): + session = await self._client.create_labsession() + return session - async def execute_code(self, kernel: str, code: str) -> None: + async def execute_code( + self, session: JupyterLabSession, code: str + ) -> None: with self.timings.start("execute_code", {"code": code}) as sw: - reply = await self._client.run_python(kernel, code) + reply = await self._client.run_python(session, code) sw.annotation["result"] = reply self.logger.info(f"{code} -> {reply}") @@ -40,7 +43,7 @@ async def lab_wait(self) -> None: with self.timings.start("lab_wait"): await asyncio.sleep(SLEEP_TIME) - async def delete_kernel(self, kernel: str) -> None: - self.logger.info("delete_kernel") - with self.timings.start("delete_kernel"): - await self._client.delete_kernel(kernel) + async def delete_session(self, session: JupyterLabSession) -> None: + self.logger.info("delete_session") + with self.timings.start("delete_session"): + await self._client.delete_labsession(session) diff --git a/src/mobu/business/notebookrunner.py b/src/mobu/business/notebookrunner.py index f9ddc784..d2339886 100644 --- a/src/mobu/business/notebookrunner.py +++ b/src/mobu/business/notebookrunner.py @@ -16,7 +16,7 @@ import git -from ..jupyterclient import NotebookException +from ..jupyterclient import JupyterLabSession, NotebookException from ..models.business import BusinessData from .jupyterloginloop import JupyterLoginLoop @@ -100,7 +100,7 @@ async def lab_business(self) -> None: await self.ensure_lab() await self.lab_settle() - kernel = await self.create_kernel() + session = await self.create_session() self.logger.info(f"Starting notebook: {self.notebook.name}") cells = self.read_notebook(self.notebook.name, self.notebook.path) @@ -113,10 +113,10 @@ async def lab_business(self) -> None: for cell in cells: self.running_code = "".join(cell["source"]) - await self.execute_code(kernel, self.running_code) + await self.execute_code(session, self.running_code) self.running_code = None - await self.delete_kernel(kernel) + await self.delete_session(session) self.logger.info(f"Success running notebook: {self.notebook.name}") async def lab_settle(self) -> None: @@ -129,23 +129,30 @@ def read_notebook(self, name: str, path: str) -> List[Dict[str, Any]]: cells = json.loads(notebook_text)["cells"] return [c for c in cells if c["cell_type"] == "code"] - async def create_kernel(self) -> str: - self.logger.info("create_kernel") - with self.timings.start("create_kernel"): - kernel = await self._client.create_kernel() - return kernel + async def create_session(self) -> JupyterLabSession: + self.logger.info("create_session") + notebook_name = "" + if self.notebook: + notebook_name = self.notebook.name + with self.timings.start("create_session"): + session = await self._client.create_labsession( + notebook_name=notebook_name, + ) + return session - async def execute_code(self, kernel: str, code: str) -> None: + async def execute_code( + self, session: JupyterLabSession, code: str + ) -> None: self.logger.info("Executing:\n%s\n", code) with self.timings.start("run_code", {"code": code}) as sw: - reply = await self._client.run_python(kernel, code) + reply = await self._client.run_python(session, code) sw.annotation["result"] = reply self.logger.info(f"Result:\n{reply}\n") - async def delete_kernel(self, kernel: str) -> None: - self.logger.info(f"Deleting kernel {kernel}") - with self.timings.start("delete_kernel"): - await self._client.delete_kernel(kernel) + async def delete_session(self, session: JupyterLabSession) -> None: + self.logger.info(f"Deleting session {session}") + with self.timings.start("delete_session"): + await self._client.delete_labsession(session) def dump(self) -> BusinessData: data = super().dump() diff --git a/src/mobu/jupyterclient.py b/src/mobu/jupyterclient.py index f23d9c64..fd77ab8f 100644 --- a/src/mobu/jupyterclient.py +++ b/src/mobu/jupyterclient.py @@ -10,17 +10,23 @@ import random import re import string +from dataclasses import dataclass from http.cookies import BaseCookie from typing import TYPE_CHECKING from uuid import uuid4 -from aiohttp import ClientResponse, ClientSession, TCPConnector +from aiohttp import ( + ClientResponse, + ClientSession, + ClientWebSocketResponse, + TCPConnector, +) from .config import config from .exceptions import NotebookException if TYPE_CHECKING: - from typing import Any + from typing import Any, Optional from aiohttp.client import _RequestContextManager, _WSRequestContextManager from structlog import BoundLogger @@ -31,6 +37,16 @@ __all__ = ["JupyterClient"] +@dataclass(frozen=True) +class JupyterLabSession: + """This holds the information a client needs to talk to the Lab in order + to execute code.""" + + session_id: str = "" + kernel_id: str = "" + websocket: Optional[ClientWebSocketResponse] = None + + class JupyterClientSession: """Wrapper around `aiohttp.ClientSession` using token authentication. @@ -212,42 +228,75 @@ async def delete_lab(self) -> None: if r.status not in [200, 202, 204]: await self._raise_error("Error deleting lab", r) - async def create_kernel(self, kernel_name: str = "LSST") -> str: - kernel_url = ( - self.jupyter_url + f"user/{self.user.username}/api/kernels" + async def create_labsession( + self, kernel_name: str = "LSST", notebook_name: str = "" + ) -> JupyterLabSession: + session_url = ( + self.jupyter_url + f"user/{self.user.username}/api/sessions" ) - body = {"name": kernel_name} + session_type = "console" + if notebook_name != "": + session_type = "notebook" + body = { + "kernel": {"name": kernel_name}, + "name": notebook_name or "(no notebook)", + "path": uuid4().hex, + "type": session_type, + } - async with self.session.post(kernel_url, json=body) as r: + async with self.session.post(session_url, json=body) as r: if r.status != 201: - await self._raise_error("Error creating kernel", r) + await self._raise_error("Error creating session", r) response = await r.json() - return response["id"] + session_id = response["id"] + kernel_id = response["kernel"]["id"] + ws = await self._websocket_connect(kernel_id) + labsession = JupyterLabSession( + session_id=session_id, kernel_id=kernel_id, websocket=ws + ) + self.log.info(f"Created JupyterLabSession {labsession}") + return labsession + + async def _websocket_connect( + self, kernel_id: str + ) -> Optional[ClientWebSocketResponse]: + # It's Optional for easier mocking: we will mock out this call in + # the test suite rather than trying to mock the WebSocket itself. + channels_url = ( + self.jupyter_url + + f"user/{self.user.username}/api/kernels/" + + f"{kernel_id}/channels" + ) + self.log.info(f"Attempting WebSocket connection to {channels_url}") + return await self.session.ws_connect(channels_url) - async def delete_kernel(self, kernel_id: str) -> None: - kernel_url = ( + async def delete_labsession(self, session: JupyterLabSession) -> None: + session_url = ( self.jupyter_url - + f"user/{self.user.username}/api/kernels/{kernel_id}" + + f"user/{self.user.username}/api/kernels/{session.session_id}" ) - async with self.session.delete(kernel_url, raise_for_status=True) as r: + async with self.session.delete( + session_url, raise_for_status=True + ) as r: if r.status != 204: - self.log.warning(f"Delete kernel {kernel_id}: {r}") + self.log.warning(f"Delete session {session}: {r}") + if session.websocket is not None: + await session.websocket.close() return - async def run_python(self, kernel_id: str, code: str) -> str: - kernel_url = ( - self.jupyter_url - + f"user/{self.user.username}/api/kernels/{kernel_id}/channels" - ) + async def run_python(self, session: JupyterLabSession, code: str) -> str: + if not session.websocket: + self.log.error("Cannot run_python without a websocket!") + raise Exception("No WebSocket for code execution: {session}") msg_id = uuid4().hex msg = { "header": { - "username": "", + "username": self.user.username, "version": "5.0", - "session": "", + "session": session.session_id, "msg_id": msg_id, "msg_type": "execute_request", }, @@ -264,29 +313,27 @@ async def run_python(self, kernel_id: str, code: str) -> str: "buffers": {}, } - async with self.session.ws_connect(kernel_url) as ws: - await ws.send_json(msg) - - while True: - r = await ws.receive_json() - self.log.debug(f"Recieved kernel message: {r}") - msg_type = r["msg_type"] - if msg_type == "error": - error_message = "".join(r["content"]["traceback"]) - raise NotebookException(self._ansi_escape(error_message)) - elif ( - msg_type == "stream" - and msg_id == r["parent_header"]["msg_id"] - ): - return r["content"]["text"] - elif msg_type == "execute_reply": - status = r["content"]["status"] - if status == "ok": - return "" - else: - raise NotebookException( - f"Error content status is {status}" - ) + await session.websocket.send_json(msg) + + while True: + r = await session.websocket.receive_json() + self.log.debug(f"Recieved kernel message: {r}") + msg_type = r["msg_type"] + if msg_type == "error": + error_message = "".join(r["content"]["traceback"]) + raise NotebookException(self._ansi_escape(error_message)) + elif ( + msg_type == "stream" and msg_id == r["parent_header"]["msg_id"] + ): + return r["content"]["text"] + elif msg_type == "execute_reply": + status = r["content"]["status"] + if status == "ok": + return "" + else: + raise NotebookException( + f"Error content status is {status}" + ) async def _raise_error(self, msg: str, r: ClientResponse) -> None: raise Exception(f"{msg}: {r.status} {r.url}: {r.headers}") From 62376d6d0bc163a71710ff68c9506c72a153b54b Mon Sep 17 00:00:00 2001 From: adam Date: Mon, 26 Jul 2021 16:19:10 -0700 Subject: [PATCH 2/9] Fix tests --- src/mobu/jupyterclient.py | 2 +- tests/conftest.py | 6 ++++- tests/support/jupyterhub.py | 45 ++++++++++++++++++++++++------------- 3 files changed, 35 insertions(+), 18 deletions(-) diff --git a/src/mobu/jupyterclient.py b/src/mobu/jupyterclient.py index fd77ab8f..5dc6100d 100644 --- a/src/mobu/jupyterclient.py +++ b/src/mobu/jupyterclient.py @@ -274,7 +274,7 @@ async def _websocket_connect( async def delete_labsession(self, session: JupyterLabSession) -> None: session_url = ( self.jupyter_url - + f"user/{self.user.username}/api/kernels/{session.session_id}" + + f"user/{self.user.username}/api/sessions/{session.session_id}" ) async with self.session.delete( session_url, raise_for_status=True diff --git a/tests/conftest.py b/tests/conftest.py index e2c1c903..9aa1d64c 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -67,7 +67,11 @@ def jupyterhub(mock_aioresponses: aioresponses) -> Iterator[None]: # test JupyterClient.run_python. For now, just mock it out entirely. with patch.object(JupyterClient, "run_python") as mock: mock.return_value = "4" - yield + # Same problem, but up a layer now that we're using sessions and + # reusing the websocket. + with patch.object(JupyterClient, "_websocket_connect") as mock2: + mock2.return_value = None + yield @pytest.fixture diff --git a/tests/support/jupyterhub.py b/tests/support/jupyterhub.py index 899159d0..b381a8e5 100644 --- a/tests/support/jupyterhub.py +++ b/tests/support/jupyterhub.py @@ -11,6 +11,7 @@ from aioresponses import CallbackResult from mobu.config import config +from mobu.jupyterclient import JupyterLabSession if TYPE_CHECKING: from re import Pattern @@ -43,7 +44,7 @@ class MockJupyterHub: """ def __init__(self) -> None: - self.kernels: Dict[str, str] = {} + self.sessions: Dict[str, Any] = {} self.state: Dict[str, JupyterHubState] = {} def login(self, url: str, **kwargs: Any) -> CallbackResult: @@ -97,24 +98,36 @@ def delete_lab(self, url: str, **kwargs: Any) -> CallbackResult: self.state[user] = JupyterHubState.LOGGED_OUT return CallbackResult(status=202) - def create_kernel(self, url: str, **kwargs: Any) -> CallbackResult: + def create_session(self, url: str, **kwargs: Any) -> CallbackResult: user = self._get_user(kwargs["headers"]["Authorization"]) - assert str(url).endswith(f"/user/{user}/api/kernels") - assert user not in self.kernels + assert str(url).endswith(f"/user/{user}/api/sessions") + assert user not in self.sessions state = self.state.get(user, JupyterHubState.LOGGED_OUT) assert state == JupyterHubState.LAB_RUNNING - assert kwargs["json"] == {"name": "LSST"} - kernel = uuid4().hex - self.kernels[user] = kernel - return CallbackResult(status=201, payload={"id": kernel}) + assert kwargs["json"]["kernel"]["name"] == "LSST" + assert kwargs["json"]["name"] == "(no notebook)" + assert kwargs["json"]["type"] == "console" + session = JupyterLabSession( + session_id=uuid4().hex, kernel_id=uuid4().hex, websocket=None + ) + self.sessions[user] = session + return CallbackResult( + status=201, + payload={ + "id": session.session_id, + "kernel": {"id": session.kernel_id}, + }, + ) - def delete_kernel(self, url: str, **kwargs: Any) -> CallbackResult: + def delete_session(self, url: str, **kwargs: Any) -> CallbackResult: user = self._get_user(kwargs["headers"]["Authorization"]) - kernel = self.kernels[user] - assert str(url).endswith(f"/user/{user}/api/kernels/{kernel}") + session = self.sessions[user] + assert str(url).endswith( + f"/user/{user}/api/sessions/{session.session_id}" + ) state = self.state.get(user, JupyterHubState.LOGGED_OUT) assert state == JupyterHubState.LAB_RUNNING - del self.kernels[user] + del self.sessions[user] return CallbackResult(status=204) @staticmethod @@ -151,12 +164,12 @@ def mock_jupyterhub(mocked: aioresponses) -> None: repeat=True, ) mocked.post( - _url("user/[^/]+/api/kernels", regex=True), - callback=mock.create_kernel, + _url("user/[^/]+/api/sessions", regex=True), + callback=mock.create_session, repeat=True, ) mocked.delete( - _url("user/[^/]+/api/kernels/[^/]+$", regex=True), - callback=mock.delete_kernel, + _url("user/[^/]+/api/sessions/[^/]+$", regex=True), + callback=mock.delete_session, repeat=True, ) From 952c33e43828290d265a5ff6c1108526cd572888 Mon Sep 17 00:00:00 2001 From: adam Date: Mon, 26 Jul 2021 16:42:47 -0700 Subject: [PATCH 3/9] jupyterhub->jupyter in test framework (since it also does lab) --- tests/autostart_test.py | 2 +- tests/business/jupyterloginloop_test.py | 2 +- tests/business/jupyterpythonloop_test.py | 2 +- tests/conftest.py | 8 +-- tests/handlers/flock_test.py | 6 +-- tests/monkeyflocker_test.py | 4 +- tests/support/{jupyterhub.py => jupyter.py} | 58 ++++++++++----------- 7 files changed, 41 insertions(+), 41 deletions(-) rename tests/support/{jupyterhub.py => jupyter.py} (77%) diff --git a/tests/autostart_test.py b/tests/autostart_test.py index 57d0c961..04828d4d 100644 --- a/tests/autostart_test.py +++ b/tests/autostart_test.py @@ -47,7 +47,7 @@ @pytest.fixture(autouse=True) def configure_autostart( - tmp_path: Path, jupyterhub: None, mock_aioresponses: aioresponses + tmp_path: Path, jupyter: None, mock_aioresponses: aioresponses ) -> Iterator[None]: """Set up the autostart configuration.""" mock_gafaelfawr(mock_aioresponses) diff --git a/tests/business/jupyterloginloop_test.py b/tests/business/jupyterloginloop_test.py index 130273f5..4695826a 100644 --- a/tests/business/jupyterloginloop_test.py +++ b/tests/business/jupyterloginloop_test.py @@ -17,7 +17,7 @@ @pytest.mark.asyncio async def test_run( - client: AsyncClient, jupyterhub: None, mock_aioresponses: aioresponses + client: AsyncClient, jupyter: None, mock_aioresponses: aioresponses ) -> None: mock_gafaelfawr(mock_aioresponses) diff --git a/tests/business/jupyterpythonloop_test.py b/tests/business/jupyterpythonloop_test.py index 5eea24f7..48fd44f9 100644 --- a/tests/business/jupyterpythonloop_test.py +++ b/tests/business/jupyterpythonloop_test.py @@ -17,7 +17,7 @@ @pytest.mark.asyncio async def test_run( - client: AsyncClient, jupyterhub: None, mock_aioresponses: aioresponses + client: AsyncClient, jupyter: None, mock_aioresponses: aioresponses ) -> None: mock_gafaelfawr(mock_aioresponses) diff --git a/tests/conftest.py b/tests/conftest.py index 9aa1d64c..2961785d 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -14,7 +14,7 @@ from mobu.config import config from mobu.jupyterclient import JupyterClient from tests.support.gafaelfawr import make_gafaelfawr_token -from tests.support.jupyterhub import mock_jupyterhub +from tests.support.jupyter import mock_jupyter if TYPE_CHECKING: from typing import AsyncIterator, Iterator @@ -59,9 +59,9 @@ async def client(app: FastAPI) -> AsyncIterator[AsyncClient]: @pytest.fixture -def jupyterhub(mock_aioresponses: aioresponses) -> Iterator[None]: - """Mock out JupyterHub.""" - mock_jupyterhub(mock_aioresponses) +def jupyter(mock_aioresponses: aioresponses) -> Iterator[None]: + """Mock out JupyterHub/Lab.""" + mock_jupyter(mock_aioresponses) # aioresponses has no mechanism to mock ws_connect, so we can't properly # test JupyterClient.run_python. For now, just mock it out entirely. diff --git a/tests/handlers/flock_test.py b/tests/handlers/flock_test.py index 3fb8ff0d..4878fc7e 100644 --- a/tests/handlers/flock_test.py +++ b/tests/handlers/flock_test.py @@ -18,7 +18,7 @@ @pytest.mark.asyncio async def test_start_stop( - client: AsyncClient, jupyterhub: None, mock_aioresponses: aioresponses + client: AsyncClient, jupyter: None, mock_aioresponses: aioresponses ) -> None: mock_gafaelfawr(mock_aioresponses) @@ -116,7 +116,7 @@ async def test_start_stop( @pytest.mark.asyncio async def test_user_list( - client: AsyncClient, jupyterhub: None, mock_aioresponses: aioresponses + client: AsyncClient, jupyter: None, mock_aioresponses: aioresponses ) -> None: mock_gafaelfawr(mock_aioresponses) @@ -195,7 +195,7 @@ async def test_user_list( @pytest.mark.asyncio async def test_errors( - client: AsyncClient, jupyterhub: None, mock_aioresponses: aioresponses + client: AsyncClient, jupyter: None, mock_aioresponses: aioresponses ) -> None: mock_gafaelfawr(mock_aioresponses) diff --git a/tests/monkeyflocker_test.py b/tests/monkeyflocker_test.py index e1c51edc..5d3073d1 100644 --- a/tests/monkeyflocker_test.py +++ b/tests/monkeyflocker_test.py @@ -30,7 +30,7 @@ from mobu.config import config from mobu.main import app from tests.support.gafaelfawr import make_gafaelfawr_token, mock_gafaelfawr -from tests.support.jupyterhub import mock_jupyterhub +from tests.support.jupyter import mock_jupyter @app.on_event("startup") @@ -39,7 +39,7 @@ async def startup_event() -> None: mocked = aioresponses() mocked.start() mock_gafaelfawr(mocked) - mock_jupyterhub(mocked) + mock_jupyter(mocked) """ FLOCK_CONFIG = """ diff --git a/tests/support/jupyterhub.py b/tests/support/jupyter.py similarity index 77% rename from tests/support/jupyterhub.py rename to tests/support/jupyter.py index b381a8e5..7f132cf1 100644 --- a/tests/support/jupyterhub.py +++ b/tests/support/jupyter.py @@ -1,4 +1,4 @@ -"""A mock JupyterHub for tests.""" +"""A mock JupyterHub/Lab for tests.""" from __future__ import annotations @@ -20,7 +20,7 @@ from aioresponses import aioresponses -class JupyterHubState(Enum): +class JupyterState(Enum): LOGGED_OUT = "logged out" LOGGED_IN = "logged in" SPAWN_PENDING = "spawn pending" @@ -28,7 +28,7 @@ class JupyterHubState(Enum): def _url(route: str, regex: bool = False) -> Union[str, Pattern[str]]: - """Construct a URL for JupyterHub.""" + """Construct a URL for JupyterHub/Proxy.""" if not regex: return f"{config.environment_url}/nb/{route}" @@ -36,40 +36,40 @@ def _url(route: str, regex: bool = False) -> Union[str, Pattern[str]]: return re.compile(prefix + route) -class MockJupyterHub: - """A mock JupyterHub state machine. +class MockJupyter: + """A mock Jupyter state machine. This should be invoked via mocked HTTP calls so that tests can simulate - making REST calls to the real JupyterHub. + making REST calls to the real JupyterHub/Lab. """ def __init__(self) -> None: self.sessions: Dict[str, Any] = {} - self.state: Dict[str, JupyterHubState] = {} + self.state: Dict[str, JupyterState] = {} def login(self, url: str, **kwargs: Any) -> CallbackResult: user = self._get_user(kwargs["headers"]["Authorization"]) - self.state[user] = JupyterHubState.LOGGED_IN + self.state[user] = JupyterState.LOGGED_IN return CallbackResult(status=200) def hub(self, url: str, **kwargs: Any) -> CallbackResult: user = self._get_user(kwargs["headers"]["Authorization"]) - state = self.state.get(user, JupyterHubState.LOGGED_OUT) - if state == JupyterHubState.LOGGED_OUT: + state = self.state.get(user, JupyterState.LOGGED_OUT) + if state == JupyterState.LOGGED_OUT: redirect_to = _url("hub/login") - elif state == JupyterHubState.LOGGED_IN: + elif state == JupyterState.LOGGED_IN: redirect_to = _url("hub/spawn") - elif state == JupyterHubState.SPAWN_PENDING: + elif state == JupyterState.SPAWN_PENDING: redirect_to = _url(f"hub/spawn-pending/{user}") - elif state == JupyterHubState.LAB_RUNNING: + elif state == JupyterState.LAB_RUNNING: redirect_to = _url(f"hub/spawn-pending/{user}") return CallbackResult(status=307, headers={"Location": redirect_to}) def spawn(self, url: str, **kwargs: Any) -> CallbackResult: user = self._get_user(kwargs["headers"]["Authorization"]) - state = self.state.get(user, JupyterHubState.LOGGED_OUT) - assert state == JupyterHubState.LOGGED_IN - self.state[user] = JupyterHubState.SPAWN_PENDING + state = self.state.get(user, JupyterState.LOGGED_OUT) + assert state == JupyterState.LOGGED_IN + self.state[user] = JupyterState.SPAWN_PENDING return CallbackResult( status=302, headers={"Location": f"/nb/hub/spawn-pending/{user}"}, @@ -78,9 +78,9 @@ def spawn(self, url: str, **kwargs: Any) -> CallbackResult: def finish_spawn(self, url: str, **kwargs: Any) -> CallbackResult: user = self._get_user(kwargs["headers"]["Authorization"]) assert str(url).endswith(f"/hub/spawn-pending/{user}") - state = self.state.get(user, JupyterHubState.LOGGED_OUT) - assert state == JupyterHubState.SPAWN_PENDING - self.state[user] = JupyterHubState.LAB_RUNNING + state = self.state.get(user, JupyterState.LOGGED_OUT) + assert state == JupyterState.SPAWN_PENDING + self.state[user] = JupyterState.LAB_RUNNING return CallbackResult( status=307, headers={"Location": _url(f"user/{user}/lab")} ) @@ -88,22 +88,22 @@ def finish_spawn(self, url: str, **kwargs: Any) -> CallbackResult: def lab(self, url: str, **kwargs: Any) -> CallbackResult: user = self._get_user(kwargs["headers"]["Authorization"]) assert str(url).endswith(f"/user/{user}/lab") - state = self.state.get(user, JupyterHubState.LOGGED_OUT) - assert state == JupyterHubState.LAB_RUNNING + state = self.state.get(user, JupyterState.LOGGED_OUT) + assert state == JupyterState.LAB_RUNNING return CallbackResult(status=200) def delete_lab(self, url: str, **kwargs: Any) -> CallbackResult: user = self._get_user(kwargs["headers"]["Authorization"]) assert str(url).endswith(f"/users/{user}/server") - self.state[user] = JupyterHubState.LOGGED_OUT + self.state[user] = JupyterState.LOGGED_OUT return CallbackResult(status=202) def create_session(self, url: str, **kwargs: Any) -> CallbackResult: user = self._get_user(kwargs["headers"]["Authorization"]) assert str(url).endswith(f"/user/{user}/api/sessions") assert user not in self.sessions - state = self.state.get(user, JupyterHubState.LOGGED_OUT) - assert state == JupyterHubState.LAB_RUNNING + state = self.state.get(user, JupyterState.LOGGED_OUT) + assert state == JupyterState.LAB_RUNNING assert kwargs["json"]["kernel"]["name"] == "LSST" assert kwargs["json"]["name"] == "(no notebook)" assert kwargs["json"]["type"] == "console" @@ -125,8 +125,8 @@ def delete_session(self, url: str, **kwargs: Any) -> CallbackResult: assert str(url).endswith( f"/user/{user}/api/sessions/{session.session_id}" ) - state = self.state.get(user, JupyterHubState.LOGGED_OUT) - assert state == JupyterHubState.LAB_RUNNING + state = self.state.get(user, JupyterState.LOGGED_OUT) + assert state == JupyterState.LAB_RUNNING del self.sessions[user] return CallbackResult(status=204) @@ -139,13 +139,13 @@ def _get_user(authorization: str) -> str: return user.decode() -def mock_jupyterhub(mocked: aioresponses) -> None: - """Set up a mock JupyterHub that always returns success. +def mock_jupyter(mocked: aioresponses) -> None: + """Set up a mock JupyterHub/Lab that always returns success. Currently only handles a lab spawn and then shutdown. Behavior will eventually be configurable. """ - mock = MockJupyterHub() + mock = MockJupyter() mocked.get(_url("hub/login"), callback=mock.login, repeat=True) mocked.get(_url("hub"), callback=mock.hub, repeat=True) mocked.get(_url("hub/spawn"), repeat=True) From 06cba4d09b8fade9084842579698360725d07b15 Mon Sep 17 00:00:00 2001 From: adam Date: Tue, 27 Jul 2021 09:08:32 -0700 Subject: [PATCH 4/9] fix merge conflicts --- src/monkeyflocker/client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/monkeyflocker/client.py b/src/monkeyflocker/client.py index 72e8e343..5f6f005e 100644 --- a/src/monkeyflocker/client.py +++ b/src/monkeyflocker/client.py @@ -67,7 +67,7 @@ async def start(self, spec_file: Path) -> None: assert self._session, "Must be used as a context manager" with spec_file.open("r") as f: spec = yaml.safe_load(f) - self._logger.info("Starting flock %s", spec["name"]) + self._logger.info(f"Starting flock {spec['name']}") url = urljoin(self._base_url, "/mobu/flocks") await self._session.put(url, json=spec) self._logger.info("Flock %s started", spec["name"]) From a19386553af8e0c01bfc26bb7e2e6492fce29318 Mon Sep 17 00:00:00 2001 From: adam Date: Tue, 27 Jul 2021 09:30:26 -0700 Subject: [PATCH 5/9] update deps --- requirements/main.txt | 34 +++++++++++++++++----------------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/requirements/main.txt b/requirements/main.txt index 17473102..2c016761 100644 --- a/requirements/main.txt +++ b/requirements/main.txt @@ -59,23 +59,23 @@ asgiref==3.4.1 \ --hash=sha256:4ef1ab46b484e3c706329cedeff284a5d40824200638503f5768edb6de7d58e9 \ --hash=sha256:ffc141aa908e6f175673e7b1b3b7af4fdb0ecb738fc5c8b88f69f055c2415214 # via uvicorn -astropy==4.3 \ - --hash=sha256:134c894381be801d7c2992b997d7f57ae7ae41a36955e883186df8bff5839c7b \ - --hash=sha256:2361972fc3bef5e37f9f8d3237634511428e93f17d42fbd13ee965689bd47350 \ - --hash=sha256:32c9752bec83da4905cdf55e6be586941be06acca671fbfda0bc1f2ef94aea79 \ - --hash=sha256:33501223e48224177eb72c7c746b000cf9a40473cb9bcbcb8c9ac3256c8a6f6d \ - --hash=sha256:6c1d1f0896e5d33851442865aa0ec4a2643dfaecb8853e9f4dca6d0541798744 \ - --hash=sha256:7f09d43326a3d8cb3098aad61fe6353497d6583abf5d1abe34076180c28b00a0 \ - --hash=sha256:819bfaec7a9d0f3968006d0528d9c3af5affd6d36644b7ff2b0c0b0435c19cfe \ - --hash=sha256:83a98369265a5f90acca884ef046e791e2a6d17d51871c7016ecb954c7b34adb \ - --hash=sha256:859aee27f0b4cee8ea9d8d35cb186cfba58597e618931e277054c483f58b5265 \ - --hash=sha256:8e72ec80c1846b87267616a7739ba7639acd32c59880e113994b771bbb274e1e \ - --hash=sha256:979ea0469eaebc55be444f47be73dbbe7cc1d60679bc066a860ce796840c2e98 \ - --hash=sha256:a9d0d202e5b35fbf602470175f0461306f7b0b373e59aaa242e220542ef572d2 \ - --hash=sha256:c4eeceb341d07f4ce53ca387a48f399f615a59fd8d67be711d886066c1e64f20 \ - --hash=sha256:cdf785c4479eec741df612ab05681c8eb0ed99d5e915e3376ff874cccba1e35e \ - --hash=sha256:fb2cf1e8dca429fdefabe9c963ad8e99d639b76551d4f213838862adaf213b7a \ - --hash=sha256:fcb02b46b5266d80086f66ca0042f69e34bc45546a8aca9ff62ca716d9b4c08e +astropy==4.3.post1 \ + --hash=sha256:1b9f976f32675ae715bd38fcfb3540fcf01f6be1d75266c39b6fb085adac9410 \ + --hash=sha256:1d879b91bbd61d69682c72b773d9c8851eff23e51cf1fb86b32dc80e783be643 \ + --hash=sha256:460e07cb72789976421689833556a0a364dba89e25fe9183e669702dbcbf72c2 \ + --hash=sha256:52c1038bad27e7ae1154eda0447ba790f13cb9bcb98e04c60493c2e547a87e6c \ + --hash=sha256:607739958f8fcc37d4f2efdb8b9bbcc3bf063c880e2d83104cf72a81c3e89b91 \ + --hash=sha256:6779cc4c840128d12a6721d6fc835c826646967b019daaeacc6e71ed79850973 \ + --hash=sha256:6ac307416a8718e4f4ed42f42c14587754fe6df38d822533c6d795db64a29453 \ + --hash=sha256:8062ece72d6fe6f4349219c9e2e0f213847de4f7f3bc3796ce7949bc1a202bf9 \ + --hash=sha256:8d70811b68f702d7b2e21fc0d644f88baf977482b1f915011460cc1b7fa285ea \ + --hash=sha256:a1b57aa21fdc8526dbca96b69cdcaba83066d7688ff52b87b7cdf5362e75f112 \ + --hash=sha256:a420917baf9e02252e3477da265675167a324716f8a09e78607577b5bd27a159 \ + --hash=sha256:be62ba888f0dd82a7265698a602e64ca2466f1f17bd6c4ef085a055c5968e458 \ + --hash=sha256:dc8acf92a20c9c916f25b17dbb0f618f91751941ec36f240452871e72fb608ca \ + --hash=sha256:e2670fc08868cd8d0d1e653dcfcf224b506235b30434742538702f74c6859d24 \ + --hash=sha256:e95ddb6fb0850fe28690e498fe8d2c3cee72d54c098083a2fe0e01b7b3895771 \ + --hash=sha256:ffe13ed14298e819f2be5224aa649c44f8ddc9fcf32b5d4296fc62624ae867cd # via pyvo async-timeout==3.0.1 \ --hash=sha256:0c3c816a028d47f659d6ff5c745cb2acf1f966da1fe5c19c77a70282b25f4c5f \ From bb75bba6c71141839363343dfc60ba6b7d798b87 Mon Sep 17 00:00:00 2001 From: adam Date: Tue, 27 Jul 2021 09:35:00 -0700 Subject: [PATCH 6/9] tweak formatting --- src/monkeyflocker/client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/monkeyflocker/client.py b/src/monkeyflocker/client.py index 5f6f005e..05f9a486 100644 --- a/src/monkeyflocker/client.py +++ b/src/monkeyflocker/client.py @@ -70,7 +70,7 @@ async def start(self, spec_file: Path) -> None: self._logger.info(f"Starting flock {spec['name']}") url = urljoin(self._base_url, "/mobu/flocks") await self._session.put(url, json=spec) - self._logger.info("Flock %s started", spec["name"]) + self._logger.info(f"Flock {spec['name']} started") async def report(self, name: str, output: Path) -> None: """Generate status and output data for all monkeys.""" From 26b42c7809dd93c7b8147e3a7e2c1fa9d7b51b5f Mon Sep 17 00:00:00 2001 From: adam Date: Tue, 27 Jul 2021 11:06:30 -0700 Subject: [PATCH 7/9] make delays configurable --- src/mobu/business/jupyterloginloop.py | 5 ++--- src/mobu/business/jupyterpythonloop.py | 15 +++------------ src/mobu/business/notebookrunner.py | 1 + src/mobu/models/business.py | 24 +++++++++++++++++++++++- src/monkeyflocker/client.py | 4 ++-- tests/business/jupyterloginloop_test.py | 1 + tests/business/jupyterpythonloop_test.py | 1 + 7 files changed, 33 insertions(+), 18 deletions(-) diff --git a/src/mobu/business/jupyterloginloop.py b/src/mobu/business/jupyterloginloop.py index c53e439e..825ca0e7 100644 --- a/src/mobu/business/jupyterloginloop.py +++ b/src/mobu/business/jupyterloginloop.py @@ -77,8 +77,7 @@ async def lab_business(self) -> None: Placeholder function intended to be overridden by subclasses. """ - with self.timings.start("lab_wait"): - await asyncio.sleep(60) + await self.idle() async def idle(self) -> None: """Executed at the end of each iteration. @@ -87,7 +86,7 @@ async def idle(self) -> None: behavior. """ with self.timings.start("idle"): - await asyncio.sleep(60) + await asyncio.sleep(self.config.idle_time) async def stop(self) -> None: with self.timings.start("delete_lab_on_stop"): diff --git a/src/mobu/business/jupyterpythonloop.py b/src/mobu/business/jupyterpythonloop.py index 2e0d9cd6..6d4c92ae 100644 --- a/src/mobu/business/jupyterpythonloop.py +++ b/src/mobu/business/jupyterpythonloop.py @@ -4,25 +4,20 @@ over again. """ -import asyncio - from ..jupyterclient import JupyterLabSession from .jupyterloginloop import JupyterLoginLoop __all__ = ["JupyterPythonLoop"] -MAX_EXECUTIONS = 20 -SLEEP_TIME = 1 - class JupyterPythonLoop(JupyterLoginLoop): """Run simple Python code in a loop inside a lab kernel.""" async def lab_business(self) -> None: session = await self.create_session() - for count in range(MAX_EXECUTIONS): - await self.execute_code(session, "2+2") - await self.lab_wait() + for count in range(self.config.max_executions): + await self.execute_code(session, self.config.code) + await self.idle() await self.delete_session(session) async def create_session(self) -> JupyterLabSession: @@ -39,10 +34,6 @@ async def execute_code( sw.annotation["result"] = reply self.logger.info(f"{code} -> {reply}") - async def lab_wait(self) -> None: - with self.timings.start("lab_wait"): - await asyncio.sleep(SLEEP_TIME) - async def delete_session(self, session: JupyterLabSession) -> None: self.logger.info("delete_session") with self.timings.start("delete_session"): diff --git a/src/mobu/business/notebookrunner.py b/src/mobu/business/notebookrunner.py index d2339886..fa17ea7a 100644 --- a/src/mobu/business/notebookrunner.py +++ b/src/mobu/business/notebookrunner.py @@ -116,6 +116,7 @@ async def lab_business(self) -> None: await self.execute_code(session, self.running_code) self.running_code = None + await self.idle() await self.delete_session(session) self.logger.info(f"Success running notebook: {self.notebook.name}") diff --git a/src/mobu/models/business.py b/src/mobu/models/business.py index 691107b2..38045c59 100644 --- a/src/mobu/models/business.py +++ b/src/mobu/models/business.py @@ -39,6 +39,13 @@ class BusinessConfig(BaseModel): example=10, ) + code: str = Field( + "print(2+2)", + title="Python code to execute", + description="Only used by JupyterPythonLoop", + example="print(2+2)", + ) + repo_url: str = Field( NOTEBOOK_REPO_URL, title="Git URL of notebook repository to execute", @@ -52,12 +59,27 @@ class BusinessConfig(BaseModel): ) settle_time: int = Field( - 0, + 10, title="How long to wait after lab creation in seconds", description="Only used by the NotebookRunner", example=10, ) + idle_time: int = Field( + 20, + title="How long to wait at end of lab loop in seconds", + description="Used by JupyterLoginLoop, JupyterPythonLoop, " + " and NotebookRunner", + example=20, + ) + + max_executions: int = Field( + 25, + title="How many cells to execute in a given kernel session", + description="Only used by JupyterPythonLoop", + example=25, + ) + class BusinessData(BaseModel): """Status of a running business.""" diff --git a/src/monkeyflocker/client.py b/src/monkeyflocker/client.py index 05f9a486..2c3e0bbe 100644 --- a/src/monkeyflocker/client.py +++ b/src/monkeyflocker/client.py @@ -77,7 +77,7 @@ async def report(self, name: str, output: Path) -> None: assert self._session, "Must be used as a context manager" output.mkdir(parents=True, exist_ok=True) - self._logger.info("Getting status of monkeys in flock %s", name) + self._logger.info(f"Getting status of monkeys in flock {name}") flock_url = urljoin(self._base_url, f"/mobu/flocks/{name}") async with self._session.get(flock_url) as r: data = await r.json() @@ -85,7 +85,7 @@ async def report(self, name: str, output: Path) -> None: for monkey in monkeys: user = monkey["name"] - self._logger.info("Requesting log for %s", user) + self._logger.info(f"Requesting log for {user}") log_url = flock_url + f"/monkeys/{user}/log" async with self._session.get(log_url) as r: (output / f"{user}_log.txt").write_text(await r.text()) diff --git a/tests/business/jupyterloginloop_test.py b/tests/business/jupyterloginloop_test.py index 4695826a..c51e0838 100644 --- a/tests/business/jupyterloginloop_test.py +++ b/tests/business/jupyterloginloop_test.py @@ -28,6 +28,7 @@ async def test_run( "count": 1, "user_spec": {"username_prefix": "testuser", "uid_start": 1000}, "scopes": ["exec:notebook"], + "options": {"idle_time": 2}, "business": "JupyterLoginLoop", }, ) diff --git a/tests/business/jupyterpythonloop_test.py b/tests/business/jupyterpythonloop_test.py index 48fd44f9..cfd39acb 100644 --- a/tests/business/jupyterpythonloop_test.py +++ b/tests/business/jupyterpythonloop_test.py @@ -28,6 +28,7 @@ async def test_run( "count": 1, "user_spec": {"username_prefix": "testuser", "uid_start": 1000}, "scopes": ["exec:notebook"], + "options": {"idle_time": 2, "settle_time": 2, "max_executions": 2}, "business": "JupyterPythonLoop", }, ) From 1510794e094285401a949c40f83325f89bc1ac45 Mon Sep 17 00:00:00 2001 From: adam Date: Tue, 27 Jul 2021 13:24:29 -0700 Subject: [PATCH 8/9] Address review issues --- src/mobu/business/jupyterloginloop.py | 42 ++++++++++++++++++++---- src/mobu/business/jupyterpythonloop.py | 3 +- src/mobu/business/notebookrunner.py | 27 ++------------- src/mobu/jupyterclient.py | 16 ++++----- src/mobu/models/business.py | 20 +++++++++-- tests/business/jupyterpythonloop_test.py | 2 +- tests/conftest.py | 4 +-- tests/support/jupyter.py | 2 +- 8 files changed, 68 insertions(+), 48 deletions(-) diff --git a/src/mobu/business/jupyterloginloop.py b/src/mobu/business/jupyterloginloop.py index 825ca0e7..d10e1c35 100644 --- a/src/mobu/business/jupyterloginloop.py +++ b/src/mobu/business/jupyterloginloop.py @@ -7,6 +7,7 @@ from __future__ import annotations import asyncio +from datetime import datetime, timedelta, timezone from typing import TYPE_CHECKING from ..jupyterclient import JupyterClient @@ -37,11 +38,13 @@ def __init__( ) -> None: super().__init__(logger, business_config, user) self._client = JupyterClient(user, logger, business_config) + self._last_login = datetime.fromtimestamp(0, tz=timezone.utc) async def run(self) -> None: self.logger.info("Starting up...") await self.startup() while True: + await self.reauth_if_needed() self.logger.info("Starting next iteration") try: await self.ensure_lab() @@ -51,7 +54,7 @@ async def run(self) -> None: except Exception: self.failure_count += 1 raise - await self.idle() + await self.lab_idle() async def startup(self) -> None: """Run before the start of the first iteration and then not again.""" @@ -60,6 +63,7 @@ async def startup(self) -> None: async def hub_login(self) -> None: with self.timings.start("hub_login"): await self._client.hub_login() + self._last_login = self._now() async def ensure_lab(self) -> None: with self.timings.start("ensure_lab"): @@ -77,16 +81,42 @@ async def lab_business(self) -> None: Placeholder function intended to be overridden by subclasses. """ - await self.idle() + with self.timings.start("lab_wait"): + await asyncio.sleep(5) - async def idle(self) -> None: - """Executed at the end of each iteration. + async def lab_idle(self) -> None: + """Executed at the end of each iteration for a given lab. Intended to be overridden by subclasses if they want different idle behavior. """ - with self.timings.start("idle"): - await asyncio.sleep(self.config.idle_time) + delay = self.config.lab_idle_time + if delay > 0: + with self.timings.start("idle"): + await asyncio.sleep(delay) + + async def execution_idle(self) -> None: + """Executed between each unit of work execution (usually a Lab + cell). + """ + delay = self.config.execution_idle_time + if delay > 0: + with self.timings.start("execution_idle"): + await asyncio.sleep(self.config.execution_idle_time) + + def _now(self) -> datetime: + return datetime.now(timezone.utc) + + async def reauth_if_needed(self) -> None: + now = self._now() + elapsed = now - self._last_login + if elapsed > timedelta(self.config.reauth_interval): + await self.hub_reauth() + + async def hub_reauth(self) -> None: + self.logger.info("Reauthenticating to Hub") + with self.timings.start("hub_reauth"): + await self._client.hub_login() async def stop(self) -> None: with self.timings.start("delete_lab_on_stop"): diff --git a/src/mobu/business/jupyterpythonloop.py b/src/mobu/business/jupyterpythonloop.py index 6d4c92ae..749816da 100644 --- a/src/mobu/business/jupyterpythonloop.py +++ b/src/mobu/business/jupyterpythonloop.py @@ -14,10 +14,11 @@ class JupyterPythonLoop(JupyterLoginLoop): """Run simple Python code in a loop inside a lab kernel.""" async def lab_business(self) -> None: + await self.reauth_if_needed() session = await self.create_session() for count in range(self.config.max_executions): await self.execute_code(session, self.config.code) - await self.idle() + await self.execution_idle() await self.delete_session(session) async def create_session(self) -> JupyterLabSession: diff --git a/src/mobu/business/notebookrunner.py b/src/mobu/business/notebookrunner.py index fa17ea7a..2a13322c 100644 --- a/src/mobu/business/notebookrunner.py +++ b/src/mobu/business/notebookrunner.py @@ -9,7 +9,6 @@ import asyncio import json import os -from datetime import datetime, timedelta, timezone from pathlib import Path from tempfile import TemporaryDirectory from typing import TYPE_CHECKING @@ -44,7 +43,6 @@ def __init__( self.notebook: Optional[os.DirEntry] = None self.running_code: Optional[str] = None self._failed_notebooks: List[str] = [] - self._last_login = datetime.fromtimestamp(0, tz=timezone.utc) self._repo_dir = TemporaryDirectory() self._repo: Optional[git.Repo] = None self._notebook_iterator: Optional[Iterator[os.DirEntry]] = None @@ -80,7 +78,6 @@ async def startup(self) -> None: self._notebook_iterator = os.scandir(self._repo_dir.name) self.logger.info("Repository cloned and ready") await super().startup() - self._last_login = self._now() await self.initial_delete_lab() def clone_repo(self) -> None: @@ -108,15 +105,14 @@ async def lab_business(self) -> None: iteration = f"{count + 1}/{self.config.notebook_iterations}" msg = f"Notebook '{self.notebook.name}' iteration {iteration}" self.logger.info(msg) - - await self._reauth_if_needed() + await self.reauth_if_needed() for cell in cells: self.running_code = "".join(cell["source"]) await self.execute_code(session, self.running_code) + await self.execution_idle() self.running_code = None - await self.idle() await self.delete_session(session) self.logger.info(f"Success running notebook: {self.notebook.name}") @@ -132,9 +128,7 @@ def read_notebook(self, name: str, path: str) -> List[Dict[str, Any]]: async def create_session(self) -> JupyterLabSession: self.logger.info("create_session") - notebook_name = "" - if self.notebook: - notebook_name = self.notebook.name + notebook_name = self.notebook.name if self.notebook else None with self.timings.start("create_session"): session = await self._client.create_labsession( notebook_name=notebook_name, @@ -173,18 +167,3 @@ def _next_notebook(self) -> None: ) self._notebook_iterator = os.scandir(self._repo_dir.name) self._next_notebook() - - def _now(self) -> datetime: - return datetime.now(timezone.utc) - - async def _reauth_if_needed(self) -> None: - now = self._now() - elapsed = now - self._last_login - if elapsed > timedelta(minutes=45): - await self.hub_reauth() - self._last_login = now - - async def hub_reauth(self) -> None: - self.logger.info("Reauthenticating to Hub") - with self.timings.start("hub_reauth"): - await self._client.hub_login() diff --git a/src/mobu/jupyterclient.py b/src/mobu/jupyterclient.py index 5dc6100d..4908620a 100644 --- a/src/mobu/jupyterclient.py +++ b/src/mobu/jupyterclient.py @@ -42,9 +42,9 @@ class JupyterLabSession: """This holds the information a client needs to talk to the Lab in order to execute code.""" - session_id: str = "" - kernel_id: str = "" - websocket: Optional[ClientWebSocketResponse] = None + session_id: str + kernel_id: str + websocket: Optional[ClientWebSocketResponse] class JupyterClientSession: @@ -229,14 +229,12 @@ async def delete_lab(self) -> None: await self._raise_error("Error deleting lab", r) async def create_labsession( - self, kernel_name: str = "LSST", notebook_name: str = "" + self, kernel_name: str = "LSST", notebook_name: Optional[str] = None ) -> JupyterLabSession: session_url = ( self.jupyter_url + f"user/{self.user.username}/api/sessions" ) - session_type = "console" - if notebook_name != "": - session_type = "notebook" + session_type = "notebook" if notebook_name else "console" body = { "kernel": {"name": kernel_name}, "name": notebook_name or "(no notebook)", @@ -260,9 +258,7 @@ async def create_labsession( async def _websocket_connect( self, kernel_id: str - ) -> Optional[ClientWebSocketResponse]: - # It's Optional for easier mocking: we will mock out this call in - # the test suite rather than trying to mock the WebSocket itself. + ) -> ClientWebSocketResponse: channels_url = ( self.jupyter_url + f"user/{self.user.username}/api/kernels/" diff --git a/src/mobu/models/business.py b/src/mobu/models/business.py index 38045c59..fe867b1f 100644 --- a/src/mobu/models/business.py +++ b/src/mobu/models/business.py @@ -65,14 +65,28 @@ class BusinessConfig(BaseModel): example=10, ) - idle_time: int = Field( + lab_idle_time: int = Field( 20, title="How long to wait at end of lab loop in seconds", - description="Used by JupyterLoginLoop, JupyterPythonLoop, " - " and NotebookRunner", + description="Used by JupyterLoginLoop", example=20, ) + execution_idle_time: int = Field( + 0, + title="How long to wait between cell executions in seconds", + description="Used by JupyterPythonLoop and NotebookRunner", + example=1, + ) + + reauth_interval: int = Field( + 2700, + title="Time between reauthentication attempts in seconds", + description="Used by JupyterLoginLoop, JupyterPythonLoop, and" + " NotebookRunner", + example=2700, + ) + max_executions: int = Field( 25, title="How many cells to execute in a given kernel session", diff --git a/tests/business/jupyterpythonloop_test.py b/tests/business/jupyterpythonloop_test.py index cfd39acb..a924978a 100644 --- a/tests/business/jupyterpythonloop_test.py +++ b/tests/business/jupyterpythonloop_test.py @@ -28,7 +28,7 @@ async def test_run( "count": 1, "user_spec": {"username_prefix": "testuser", "uid_start": 1000}, "scopes": ["exec:notebook"], - "options": {"idle_time": 2, "settle_time": 2, "max_executions": 2}, + "options": {"max_executions": 3}, "business": "JupyterPythonLoop", }, ) diff --git a/tests/conftest.py b/tests/conftest.py index 2961785d..76e2c161 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -3,7 +3,7 @@ from __future__ import annotations from typing import TYPE_CHECKING -from unittest.mock import patch +from unittest.mock import MagicMock, patch import pytest from aioresponses import aioresponses @@ -70,7 +70,7 @@ def jupyter(mock_aioresponses: aioresponses) -> Iterator[None]: # Same problem, but up a layer now that we're using sessions and # reusing the websocket. with patch.object(JupyterClient, "_websocket_connect") as mock2: - mock2.return_value = None + mock2.return_value = MagicMock() yield diff --git a/tests/support/jupyter.py b/tests/support/jupyter.py index 7f132cf1..e29df981 100644 --- a/tests/support/jupyter.py +++ b/tests/support/jupyter.py @@ -44,7 +44,7 @@ class MockJupyter: """ def __init__(self) -> None: - self.sessions: Dict[str, Any] = {} + self.sessions: Dict[str, JupyterLabSession] = {} self.state: Dict[str, JupyterState] = {} def login(self, url: str, **kwargs: Any) -> CallbackResult: From a128f4eee890cc6c454ab3f1de6b1e0ce5b97427 Mon Sep 17 00:00:00 2001 From: adam Date: Tue, 27 Jul 2021 14:06:26 -0700 Subject: [PATCH 9/9] fix test cases post-review-cleanup --- src/mobu/monkey.py | 11 +++++++++-- tests/business/jupyterpythonloop_test.py | 2 +- tests/conftest.py | 4 ++-- 3 files changed, 12 insertions(+), 5 deletions(-) diff --git a/src/mobu/monkey.py b/src/mobu/monkey.py index 5ff750be..609a1bfc 100644 --- a/src/mobu/monkey.py +++ b/src/mobu/monkey.py @@ -11,6 +11,7 @@ import structlog from aiohttp import ClientSession +from aiohttp.client_exceptions import ClientConnectorError from aiojobs import Scheduler from aiojobs._job import Job @@ -108,7 +109,13 @@ async def _runner(self) -> None: self.state = MonkeyState.STOPPING self.log.info("Shutting down") run = False - await self.business.stop() + try: + await self.business.stop() + except ClientConnectorError: + # Ripping down async sessions can cause various parts of + # a communication in flight to fail. Just swallow it, + # since we're shutting down anyway. + pass self.state = MonkeyState.FINISHED except Exception as e: self.state = MonkeyState.ERROR @@ -129,7 +136,7 @@ async def stop(self) -> None: if self._job: try: await self._job.close(timeout=0) - except asyncio.TimeoutError: + except (asyncio.TimeoutError, asyncio.exceptions.CancelledError): # Close will normally wait for a timeout to occur before # throwing a timeout exception, but we'll just shut it down # right away and eat the exception. diff --git a/tests/business/jupyterpythonloop_test.py b/tests/business/jupyterpythonloop_test.py index a924978a..90c5bf7b 100644 --- a/tests/business/jupyterpythonloop_test.py +++ b/tests/business/jupyterpythonloop_test.py @@ -41,7 +41,7 @@ async def test_run( "business": { "failure_count": 0, "name": "JupyterPythonLoop", - "success_count": 0, + "success_count": ANY, "timings": ANY, }, "restart": False, diff --git a/tests/conftest.py b/tests/conftest.py index 76e2c161..33bf9043 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -3,7 +3,7 @@ from __future__ import annotations from typing import TYPE_CHECKING -from unittest.mock import MagicMock, patch +from unittest.mock import AsyncMock, patch import pytest from aioresponses import aioresponses @@ -70,7 +70,7 @@ def jupyter(mock_aioresponses: aioresponses) -> Iterator[None]: # Same problem, but up a layer now that we're using sessions and # reusing the websocket. with patch.object(JupyterClient, "_websocket_connect") as mock2: - mock2.return_value = MagicMock() + mock2.return_value = AsyncMock() yield