Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DM-44461: Refresh notebooks #349

Merged
merged 6 commits into from
May 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ repos:
- id: check-toml

- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.4.4
rev: v0.4.6
hooks:
- id: ruff
args: [--fix, --exit-non-zero-on-fix]
Expand Down
6 changes: 6 additions & 0 deletions changelog.d/20240530_151558_danfuchs_notebook_refresh.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
### New features

- `NotebookRunner` flocks can now pick up changes to their notebooks without having to restart the whole mobu process. This refresh can happen via:
- GitHub `push` webhook post to `/mobu/github/webhook` with changes to a repo and branch that matches the flock config
- `monkeyflocker refresh <flock>`
- `POST` to `/mobu/flocks/{flock}/refresh`
176 changes: 86 additions & 90 deletions requirements/dev.txt

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions requirements/main.in
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ uvicorn[standard]
# Other dependencies.
aiojobs
click!=8.1.4,!=8.1.5 # see https://github.com/pallets/click/issues/2558
gidgethub
httpx
httpx-sse
jinja2
Expand Down
364 changes: 183 additions & 181 deletions requirements/main.txt

Large diffs are not rendered by default.

11 changes: 11 additions & 0 deletions src/mobu/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,17 @@ class Configuration(BaseSettings):
examples=["gt-vilSCi1ifK_MyuaQgMD2dQ.d6SIJhowv5Hs3GvujOyUig"],
)

github_webhook_secret: str | None = Field(
None,
title="Github webhook secret",
description=(
"Any repo that wants mobu to automatically respawn labs when"
" notebooks change must use this secret in its webhook"
" configuration in GitHub."
),
validation_alias="MOBU_GITHUB_WEBHOOK_SECRET",
)

name: str = Field(
"mobu",
title="Name of application",
Expand Down
4 changes: 4 additions & 0 deletions src/mobu/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,17 @@
from datetime import timedelta

__all__ = [
"GITHUB_WEBHOOK_WAIT_SECONDS",
"NOTEBOOK_REPO_URL",
"NOTEBOOK_REPO_BRANCH",
"TOKEN_LIFETIME",
"USERNAME_REGEX",
"WEBSOCKET_OPEN_TIMEOUT",
]

GITHUB_WEBHOOK_WAIT_SECONDS = 1
"""GitHub needs some time to reach the state described in a webhook payload."""

NOTEBOOK_REPO_URL = "https://github.com/lsst-sqre/notebook-demo.git"
"""Default notebook repository for NotebookRunner."""

Expand Down
23 changes: 22 additions & 1 deletion src/mobu/dependencies/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,12 @@
"""

from dataclasses import dataclass
from typing import Annotated
from typing import Annotated, Any

from fastapi import Depends, Request
from safir.dependencies.gafaelfawr import auth_logger_dependency
from safir.dependencies.http_client import http_client_dependency
from safir.dependencies.logger import logger_dependency
from structlog.stdlib import BoundLogger

from ..factory import Factory, ProcessContext
Expand All @@ -21,6 +22,7 @@
"ContextDependency",
"RequestContext",
"context_dependency",
"anonymous_context_dependency",
]


Expand All @@ -40,6 +42,17 @@ class RequestContext:
factory: Factory
"""Component factory."""

def rebind_logger(self, **values: Any) -> None:
    """Bind extra key/value pairs into this request's logging context.

    The rebound logger replaces ``self.logger`` and is also passed to the
    factory through ``set_logger`` so both hold the same logger.

    Parameters
    ----------
    **values
        Additional values that should be added to the logging context.
    """
    rebound = self.logger.bind(**values)
    self.logger = rebound
    self.factory.set_logger(rebound)


class ContextDependency:
"""Provide a per-request context as a FastAPI dependency.
Expand Down Expand Up @@ -90,3 +103,11 @@ async def aclose(self) -> None:

context_dependency = ContextDependency()
"""The dependency that will return the per-request context."""


async def anonymous_context_dependency(
    request: Request,
    logger: Annotated[BoundLogger, Depends(logger_dependency)],
) -> RequestContext:
    """Build the per-request context for requests without Gafaelfawr auth.

    Delegates to ``context_dependency``, supplying the logger obtained from
    ``logger_dependency``.
    """
    request_context = await context_dependency(request=request, logger=logger)
    return request_context
13 changes: 13 additions & 0 deletions src/mobu/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,3 +101,16 @@ def create_solitary(self, solitary_config: SolitaryConfig) -> Solitary:
http_client=self._context.http_client,
logger=self._logger,
)

def set_logger(self, logger: BoundLogger) -> None:
    """Replace the internal logger.

    Used by the context dependency to update the logger for all
    newly-created components when it's rebound with additional context.

    Parameters
    ----------
    logger
        New logger.
    """
    # Only the stored reference is swapped; this method touches nothing else.
    self._logger = logger
14 changes: 14 additions & 0 deletions src/mobu/handlers/external.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,20 @@ async def get_flock(
return context.manager.get_flock(flock).dump()


# Route: POST /flocks/{flock}/refresh. Declares a 202 status and a 404
# "Flock not found" error response.
# Documented with comments rather than a docstring because FastAPI would
# publish a handler docstring as the OpenAPI description.
@external_router.post(
    "/flocks/{flock}/refresh",
    responses={404: {"description": "Flock not found", "model": ErrorModel}},
    status_code=202,
    summary="Signal a flock to refresh",
)
async def refresh_flock(
    flock: str,
    context: Annotated[RequestContext, Depends(context_dependency)],
) -> None:
    # Log the request, then hand the flock name to the manager.
    context.logger.info("Signaling flock to refresh", flock=flock)
    # NOTE(review): presumably the manager raises for an unknown flock, which
    # would yield the 404 declared above -- confirm against the manager.
    context.manager.refresh_flock(flock)


@external_router.delete(
"/flocks/{flock}",
responses={404: {"description": "Flock not found", "model": ErrorModel}},
Expand Down
45 changes: 45 additions & 0 deletions src/mobu/handlers/github.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
"""Handlers for requests from GitHub, ``/mobu/github``."""

import asyncio
from typing import Annotated

from fastapi import APIRouter, Depends
from gidgethub.sansio import Event
from safir.slack.webhook import SlackRouteErrorHandler

from ..config import config
from ..constants import GITHUB_WEBHOOK_WAIT_SECONDS
from ..dependencies.context import RequestContext, anonymous_context_dependency
from .github_webhooks import webhook_router

github_router = APIRouter(route_class=SlackRouteErrorHandler)


@github_router.post(
    "/webhook",
    summary="GitHub webhooks",
    description="This endpoint receives webhook events from GitHub.",
    status_code=202,
)
async def post_github_webhook(
    context: Annotated[RequestContext, Depends(anonymous_context_dependency)],
) -> None:
    """Process GitHub webhook events.

    This should be exposed via a Gafaelfawr anonymous ingress.

    The request body is validated against the configured webhook secret,
    the delivery ID is bound to the logger, and the event is dispatched to
    the registered webhook handlers after a short wait.

    Raises
    ------
    fastapi.HTTPException
        With status 403 if the signature is missing, unexpected, or does
        not match the configured secret; with the status reported by
        gidgethub (e.g. 415) if the request is otherwise malformed.
    """
    # Imported locally so this handler is self-contained and does not
    # depend on changes to the module's import block.
    import gidgethub
    from fastapi import HTTPException

    webhook_secret = config.github_webhook_secret
    body = await context.request.body()

    # This endpoint is reachable anonymously, so a bad signature or content
    # type is a client error, not an internal server error.
    try:
        event = Event.from_http(
            context.request.headers, body, secret=webhook_secret
        )
    except gidgethub.ValidationFailure as e:
        context.logger.warning(
            "GitHub webhook signature validation failed", error=str(e)
        )
        raise HTTPException(
            status_code=403, detail="Invalid GitHub webhook signature"
        ) from e
    except gidgethub.BadRequest as e:
        context.logger.warning("Malformed GitHub webhook", error=str(e))
        raise HTTPException(
            status_code=int(e.status_code), detail=str(e)
        ) from e

    # Bind the X-GitHub-Delivery header to the logger context; this
    # identifies the webhook request in GitHub's API and UI for
    # diagnostics
    context.rebind_logger(github_delivery=event.delivery_id)

    context.logger.debug("Received GitHub webhook", payload=event.data)
    # Give GitHub some time to reach internal consistency.
    await asyncio.sleep(GITHUB_WEBHOOK_WAIT_SECONDS)
    await webhook_router.dispatch(event=event, context=context)
39 changes: 39 additions & 0 deletions src/mobu/handlers/github_webhooks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
"""Github webhook handlers."""

from gidgethub import routing
from gidgethub.sansio import Event

from ..dependencies.context import RequestContext

__all__ = ["webhook_router"]

webhook_router = routing.Router()


@webhook_router.register("push")
async def handle_push(event: Event, context: RequestContext) -> None:
    """Handle a push event.

    Refreshes every flock whose configured repository URL and branch match
    the pushed ref. Pushes to refs that are not branches, and pushes that
    match no flock, are logged at debug level and ignored.

    Parameters
    ----------
    event
        The webhook event; ``ref`` and ``repository.clone_url`` are read
        from its payload.
    context
        Request context providing the logger and the flock manager.
    """
    ref = event.data["ref"]
    url = event.data["repository"]["clone_url"]
    context.rebind_logger(ref=ref, url=url)

    # Branch names may themselves contain slashes (``refs/heads/feature/x``),
    # so strip the fixed prefix instead of splitting on the last slash.
    branch_prefix = "refs/heads/"
    if not ref.startswith(branch_prefix):
        context.logger.debug(
            "github webhook ignored: ref is not a branch",
        )
        return
    branch = ref.removeprefix(branch_prefix)

    flocks = context.manager.list_flocks_for_repo(
        repo_url=url, repo_branch=branch
    )
    if not flocks:
        context.logger.debug(
            "github webhook ignored: no flocks match repo and branch",
        )
        return

    for flock in flocks:
        context.manager.refresh_flock(flock)

    context.logger.info("github webhook handled")
2 changes: 2 additions & 0 deletions src/mobu/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from .config import config
from .dependencies.context import context_dependency
from .handlers.external import external_router
from .handlers.github import github_router
from .handlers.internal import internal_router
from .status import post_status

Expand Down Expand Up @@ -69,6 +70,7 @@ async def lifespan(app: FastAPI) -> AsyncIterator[None]:
# Attach the routers.
app.include_router(internal_router)
app.include_router(external_router, prefix=config.path_prefix)
app.include_router(github_router, prefix=f"{config.path_prefix}/github")

# Add middleware.
app.add_middleware(XForwardedMiddleware)
Expand Down
4 changes: 4 additions & 0 deletions src/mobu/models/business/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ class BusinessData(BaseModel):

success_count: int = Field(..., title="Number of successes", examples=[25])

refreshing: bool = Field(
..., title="If the business is currently in the process of refreshing"
)

timings: list[StopwatchData] = Field(..., title="Timings of events")

model_config = ConfigDict(extra="forbid")
5 changes: 5 additions & 0 deletions src/mobu/services/business/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ def __init__(
self.timings = Timings()
self.control: Queue[BusinessCommand] = Queue()
self.stopping = False
self.refreshing = False

# Methods that should be overridden by child classes if needed.

Expand Down Expand Up @@ -204,6 +205,9 @@ async def stop(self) -> None:
await self.control.join()
self.logger.info("Stopped")

def signal_refresh(self) -> None:
    """Flag this business as needing a refresh.

    This only sets ``refreshing``; acting on the flag (and clearing it) is
    left to code that checks it.
    """
    self.refreshing = True

# Utility functions that can be used by child classes.

async def pause(self, interval: timedelta) -> bool:
Expand Down Expand Up @@ -299,6 +303,7 @@ def dump(self) -> BusinessData:
name=type(self).__name__,
failure_count=self.failure_count,
success_count=self.success_count,
refreshing=self.refreshing,
timings=self.timings.dump(),
)

Expand Down
23 changes: 20 additions & 3 deletions src/mobu/services/business/notebookrunner.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,21 +70,34 @@ def annotations(self, cell_id: str | None = None) -> dict[str, str]:
return result

async def startup(self) -> None:
    """Run ``initialize`` and then the parent class's ``startup``."""
    await self.initialize()
    await super().startup()

async def cleanup(self) -> None:
    """Delete the cloned repository directory, if one is recorded.

    Does nothing when ``_repo_dir`` is ``None``. After a successful
    removal, ``_repo_dir`` is reset to ``None``.
    """
    repo_dir = self._repo_dir
    if repo_dir is None:
        # str(None) is "None": rmtree would either raise or delete an
        # unrelated directory of that name in the working directory.
        return
    shutil.rmtree(str(repo_dir))
    self._repo_dir = None

async def initialize(self) -> None:
if self._repo_dir is None:
self._repo_dir = Path(TemporaryDirectory().name)
await self.clone_repo()

self._exclude_paths = {
(self._repo_dir / path) for path in self.options.exclude_dirs
}
self._notebook_paths = self.find_notebooks()
self.logger.info("Repository cloned and ready")
await super().startup()

async def shutdown(self) -> None:
shutil.rmtree(str(self._repo_dir))
self._repo_dir = None
await self.cleanup()
await super().shutdown()

async def refresh(self) -> None:
    """Discard the current clone, re-initialize, and clear the refresh flag."""
    self.logger.info("Recloning notebooks and forcing new execution")
    await self.cleanup()
    await self.initialize()
    # Reached only if both steps above succeed; if either raises, the flag
    # stays set.
    self.refreshing = False

async def clone_repo(self) -> None:
url = self.options.repo_url
branch = self.options.repo_branch
Expand Down Expand Up @@ -151,6 +164,10 @@ async def open_session(

async def execute_code(self, session: JupyterLabSession) -> None:
for count in range(self.options.max_executions):
if self.refreshing:
await self.refresh()
return

self._notebook = self.next_notebook()

iteration = f"{count + 1}/{self.options.max_executions}"
Expand Down
23 changes: 23 additions & 0 deletions src/mobu/services/flock.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@
from structlog.stdlib import BoundLogger

from ..exceptions import MonkeyNotFoundError
from ..models.business.notebookrunner import (
NotebookRunnerConfig,
NotebookRunnerOptions,
)
from ..models.flock import FlockConfig, FlockData, FlockSummary
from ..models.user import AuthenticatedUser, User, UserSpec
from ..storage.gafaelfawr import GafaelfawrStorage
Expand Down Expand Up @@ -130,6 +134,25 @@ async def stop(self) -> None:
awaits = [m.stop() for m in self._monkeys.values()]
await asyncio.gather(*awaits)

def signal_refresh(self) -> None:
    """Signal all the monkeys to refresh their business."""
    self._logger.info("Signaling monkeys to refresh")
    # Each monkey is only signalled here; nothing is awaited.
    for monkey in self._monkeys.values():
        monkey.signal_refresh()

def uses_repo(self, repo_url: str, repo_branch: str) -> bool:
    """Report whether this flock runs notebooks from the given repo branch.

    Parameters
    ----------
    repo_url
        Repository URL to compare against the flock's configured URL.
    repo_branch
        Branch name to compare against the flock's configured branch.

    Returns
    -------
    bool
        `True` only if the flock's business is a notebook runner whose
        options name exactly this URL and branch.
    """
    flock_config = self._config
    if not isinstance(flock_config, FlockConfig):
        return False

    business = flock_config.business
    if not isinstance(business, NotebookRunnerConfig):
        return False

    options = business.options
    if not isinstance(options, NotebookRunnerOptions):
        return False

    configured = (options.repo_url, options.repo_branch)
    return configured == (repo_url, repo_branch)

def _create_monkey(self, user: AuthenticatedUser) -> Monkey:
"""Create a monkey that will run as a given user."""
return Monkey(
Expand Down
Loading