Skip to content

Commit

Permalink
Notebook runner refreshes by recloning and reexecuting
Browse files Browse the repository at this point in the history
  • Loading branch information
fajpunk committed May 29, 2024
1 parent 684bf93 commit e7a8ccc
Show file tree
Hide file tree
Showing 4 changed files with 121 additions and 6 deletions.
24 changes: 21 additions & 3 deletions src/mobu/services/business/notebookrunner.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,21 +70,33 @@ def annotations(self, cell_id: str | None = None) -> dict[str, str]:
return result

async def startup(self) -> None:
await self.initialize()
await super().startup()

async def cleanup(self) -> None:
shutil.rmtree(str(self._repo_dir))
self._repo_dir = None

async def initialize(self) -> None:
if self._repo_dir is None:
self._repo_dir = Path(TemporaryDirectory().name)
await self.clone_repo()

self._exclude_paths = {
(self._repo_dir / path) for path in self.options.exclude_dirs
}
self._notebook_paths = self.find_notebooks()
self.logger.info("Repository cloned and ready")
await super().startup()

async def shutdown(self) -> None:
shutil.rmtree(str(self._repo_dir))
self._repo_dir = None
await self.cleanup()
await super().shutdown()

async def refresh(self) -> None:
await self.cleanup()
await self.initialize()
self.refreshing = False

async def clone_repo(self) -> None:
url = self.options.repo_url
branch = self.options.repo_branch
Expand Down Expand Up @@ -158,6 +170,12 @@ async def execute_code(self, session: JupyterLabSession) -> None:
self.logger.info(msg)

for cell in self.read_notebook(self._notebook):
if self.refreshing:
await self.refresh()
self.logger.info(
"Recloning notebooks and forcing new execution"
)
return
code = "".join(cell["source"])
if "id" in cell:
cell_id = f'`{cell["id"]}` (#{cell["_index"]})'
Expand Down
84 changes: 83 additions & 1 deletion tests/business/notebookrunner_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@
from mobu.storage.git import Git

from ..support.gafaelfawr import mock_gafaelfawr
from ..support.util import wait_for_business
from ..support.jupyter import MockJupyter
from ..support.util import wait_for_business, wait_for_log_message


async def setup_git_repo(repo_path: Path) -> None:
Expand Down Expand Up @@ -207,6 +208,87 @@ async def test_run_recursive(
assert "Done with this cycle of notebooks" in r.text


@pytest.mark.asyncio
async def test_refresh(
client: AsyncClient,
jupyter: MockJupyter,
respx_mock: respx.Router,
tmp_path: Path,
) -> None:
mock_gafaelfawr(respx_mock)
cwd = Path.cwd()

# Set up a notebook repository.
source_path = Path(__file__).parent.parent / "notebooks"
repo_path = tmp_path / "notebooks"

shutil.copytree(str(source_path), str(repo_path))

# Set up git repo
await setup_git_repo(repo_path)

# Start a monkey. We have to do this in a try/finally block since the
# runner will change working directories, which because working
# directories are process-global may mess up future tests.
try:
r = await client.put(
"/mobu/flocks",
json={
"name": "test",
"count": 1,
"user_spec": {"username_prefix": "testuser"},
"scopes": ["exec:notebook"],
"business": {
"type": "NotebookRunner",
"options": {
"spawn_settle_time": 0,
"execution_idle_time": 1,
"idle_time": 1,
"max_executions": 1000,
"repo_url": str(repo_path),
"repo_branch": "main",
"working_directory": str(repo_path),
},
},
},
)
assert r.status_code == 201

# We should see a message from the notebook execution in the logs.
assert await wait_for_log_message(
client, "testuser1", msg="This is a test"
)

# Change the notebook and git commit it
notebook = repo_path / "test-notebook.ipynb"
contents = notebook.read_text()
new_contents = contents.replace("This is a test", "This is a NEW test")
notebook.write_text(new_contents)

git = Git(repo=repo_path)
await git.add(str(notebook))
await git.commit("-m", "Updating notebook")

jupyter.expected_session_name = "test-notebook.ipynb"
jupyter.expected_session_type = "notebook"

# Refresh the notebook
r = await client.put("/mobu/flocks/test")
assert r.status_code == 202

# The refresh should have forced a new execution
assert await wait_for_log_message(
client, "testuser1", msg="Deleting lab"
)

# We should see a message from the updated notebook.
assert await wait_for_log_message(
client, "testuser1", msg="This is a NEW test"
)
finally:
os.chdir(cwd)


@pytest.mark.asyncio
async def test_exclude_dirs(
client: AsyncClient, respx_mock: respx.Router, tmp_path: Path
Expand Down
6 changes: 4 additions & 2 deletions tests/support/jupyter.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ def __init__(self) -> None:
self.spawn_timeout = False
self.redirect_loop = False
self.lab_form: dict[str, dict[str, str]] = {}
self.expected_session_name = "(no notebook)"
self.expected_session_type = "console"
self._delete_at: dict[str, datetime | None] = {}
self._fail: dict[str, dict[JupyterAction, bool]] = {}
self._hub_xsrf = os.urandom(8).hex()
Expand Down Expand Up @@ -278,8 +280,8 @@ def create_session(self, request: Request) -> Response:
assert state == JupyterState.LAB_RUNNING
body = json.loads(request.content.decode())
assert body["kernel"]["name"] == "LSST"
assert body["name"] == "(no notebook)"
assert body["type"] == "console"
assert body["name"] == self.expected_session_name
assert body["type"] == self.expected_session_type
session = JupyterLabSession(
session_id=uuid4().hex, kernel_id=uuid4().hex
)
Expand Down
13 changes: 13 additions & 0 deletions tests/support/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,19 @@ async def wait_for_business(
return data


async def wait_for_log_message(
client: AsyncClient, username: str, *, flock: str = "test", msg: str
) -> bool:
"""Wait for one loop of business to complete and return its data."""
for _ in range(1, 10):
await asyncio.sleep(0.5)
r = await client.get(f"/mobu/flocks/{flock}/monkeys/{username}/log")
assert r.status_code == 200
if msg in r.text:
return True
return False


async def wait_for_flock_start(client: AsyncClient, flock: str) -> None:
"""Wait for all the monkeys in a flock to have started."""
for _ in range(1, 10):
Expand Down

0 comments on commit e7a8ccc

Please sign in to comment.