Skip to content

Commit

Permalink
switch to jupyter_events (#862)
Browse files Browse the repository at this point in the history
* switch to jupyter_events

* update to jupyter_events 0.2.0

* remove unused import

* fix payload in events unit tests

* update to use latest jupyter_events

* add unit test for extensions

* get tests working
  • Loading branch information
Zsailer authored Aug 29, 2022
1 parent 0028483 commit 63bb2a9
Show file tree
Hide file tree
Showing 11 changed files with 113 additions and 109 deletions.
5 changes: 3 additions & 2 deletions jupyter_server/base/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import prometheus_client
from jinja2 import TemplateNotFound
from jupyter_core.paths import is_hidden
from jupyter_events import EventLogger
from tornado import web
from tornado.log import app_log
from traitlets.config import Application
Expand Down Expand Up @@ -335,8 +336,8 @@ def config_manager(self):
return self.settings["config_manager"]

@property
def event_bus(self):
return self.settings["event_bus"]
def event_logger(self) -> EventLogger:
return self.settings["event_logger"]

# ---------------------------------------------------------------
# CORS
Expand Down
28 changes: 13 additions & 15 deletions jupyter_server/serverapp.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
from jupyter_client.session import Session
from jupyter_core.application import JupyterApp, base_aliases, base_flags
from jupyter_core.paths import jupyter_runtime_dir
from jupyter_events.logger import EventLogger
from nbformat.sign import NotebookNotary
from traitlets import (
Any,
Expand Down Expand Up @@ -123,7 +124,6 @@
AsyncContentsManager,
ContentsManager,
)
from jupyter_server.services.events.bus import EventBus
from jupyter_server.services.kernels.kernelmanager import (
AsyncMappingKernelManager,
MappingKernelManager,
Expand Down Expand Up @@ -212,7 +212,7 @@ def __init__(
session_manager,
kernel_spec_manager,
config_manager,
event_bus,
event_logger,
extra_services,
log,
base_url,
Expand Down Expand Up @@ -248,7 +248,7 @@ def __init__(
session_manager,
kernel_spec_manager,
config_manager,
event_bus,
event_logger,
extra_services,
log,
base_url,
Expand All @@ -270,7 +270,7 @@ def init_settings(
session_manager,
kernel_spec_manager,
config_manager,
event_bus,
event_logger,
extra_services,
log,
base_url,
Expand Down Expand Up @@ -359,7 +359,7 @@ def init_settings(
config_manager=config_manager,
authorizer=authorizer,
identity_provider=identity_provider,
event_bus=event_bus,
event_logger=event_logger,
# handlers
extra_services=extra_services,
# Jupyter stuff
Expand Down Expand Up @@ -770,7 +770,7 @@ class ServerApp(JupyterApp):
GatewaySessionManager,
GatewayClient,
Authorizer,
EventBus,
EventLogger,
]

subcommands = dict(
Expand Down Expand Up @@ -1552,10 +1552,10 @@ def _default_kernel_spec_manager_class(self):
),
)

event_bus = Instance(
EventBus,
event_logger = Instance(
EventLogger,
allow_none=True,
help="An EventBus for emitting structured event data from Jupyter Server and extensions.",
help="An EventLogger for emitting structured event data from Jupyter Server and extensions.",
)

info_file = Unicode()
Expand Down Expand Up @@ -1948,9 +1948,9 @@ def init_logging(self):
logger.parent = self.log
logger.setLevel(self.log.level)

def init_eventbus(self):
def init_event_logger(self):
"""Initialize the Event Bus."""
self.event_bus = EventBus.instance(parent=self)
self.event_logger = EventLogger(parent=self)

def init_webapp(self):
"""initialize tornado webapp"""
Expand Down Expand Up @@ -2012,7 +2012,7 @@ def init_webapp(self):
self.session_manager,
self.kernel_spec_manager,
self.config_manager,
self.event_bus,
self.event_logger,
self.extra_services,
self.log,
self.base_url,
Expand Down Expand Up @@ -2487,7 +2487,7 @@ def initialize(
if find_extensions:
self.find_server_extensions()
self.init_logging()
self.init_eventbus()
self.init_event_logger()
self.init_server_extensions()

# Special case the starter extension and load
Expand Down Expand Up @@ -2814,8 +2814,6 @@ async def _cleanup(self):
await self.cleanup_kernels()
if getattr(self, "session_manager", None):
self.session_manager.close()
if getattr(self, "event_bus", None):
self.event_bus.clear_instance()

def start_ioloop(self):
"""Start the IO Loop."""
Expand Down
15 changes: 0 additions & 15 deletions jupyter_server/services/events/bus.py

This file was deleted.

41 changes: 12 additions & 29 deletions jupyter_server/services/events/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,11 @@
.. versionadded:: 2.0
"""
import logging
import json
from datetime import datetime
from typing import Any, Dict, Optional

from jupyter_telemetry.eventlog import _skip_message
from pythonjsonlogger import jsonlogger
from jupyter_events import EventLogger
from tornado import web, websocket

from jupyter_server.auth import authorized
Expand All @@ -18,18 +17,6 @@
AUTH_RESOURCE = "events"


class WebSocketLoggingHandler(logging.Handler):
"""Python logging handler that routes records to a Tornado websocket."""

def __init__(self, websocket, *args, **kwargs):
super().__init__(*args, **kwargs)
self.websocket = websocket

def emit(self, record):
"""Emit the message across the websocket"""
self.websocket.write_message(record.msg)


class SubscribeWebsocket(
JupyterHandler,
websocket.WebSocketHandler,
Expand Down Expand Up @@ -58,26 +45,23 @@ async def get(self, *args, **kwargs):
res = super().get(*args, **kwargs)
await res

async def event_listener(self, logger: EventLogger, schema_id: str, data: dict) -> None:
capsule = dict(schema_id=schema_id, **data)
self.write_message(json.dumps(capsule))

def open(self):
"""Routes events that are emitted by Jupyter Server's
EventBus to a WebSocket client in the browser.
"""
self.logging_handler = WebSocketLoggingHandler(self)
# Add a JSON formatter to the handler.
formatter = jsonlogger.JsonFormatter(json_serializer=_skip_message)
self.logging_handler.setFormatter(formatter)
# To do: add an eventlog.add_handler method to jupyter_telemetry.
self.event_bus.log.addHandler(self.logging_handler)
self.event_bus.handlers.append(self.logging_handler)
self.event_logger.add_listener(listener=self.event_listener)

def on_close(self):
self.event_bus.log.removeHandler(self.logging_handler)
self.event_bus.handlers.remove(self.logging_handler)
self.event_logger.remove_listener(listener=self.event_listener)


def validate_model(data: Dict[str, Any]) -> None:
"""Validates for required fields in the JSON request body"""
required_keys = {"schema_name", "version", "event"}
required_keys = {"schema_id", "version", "data"}
for key in required_keys:
if key not in data:
raise web.HTTPError(400, f"Missing `{key}` in the JSON request body.")
Expand Down Expand Up @@ -115,10 +99,9 @@ async def post(self):

try:
validate_model(payload)
self.event_bus.record_event(
schema_name=payload.get("schema_name"),
version=payload.get("version"),
event=payload.get("event"),
self.event_logger.emit(
schema_id=payload.get("schema_id"),
data=payload.get("data"),
timestamp_override=get_timestamp(payload),
)
self.set_status(204)
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ dependencies = [
"tornado>=6.1.0",
"traitlets>=5.1",
"websocket-client",
"jupyter_telemetry"
"jupyter_events>=0.4.0"
]

[project.urls]
Expand Down
28 changes: 26 additions & 2 deletions tests/extension/mockextensions/app.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import os

from jupyter_events import EventLogger
from jupyter_events.schema_registry import SchemaRegistryException
from traitlets import List, Unicode

from jupyter_server.base.handlers import JupyterHandler
Expand All @@ -11,16 +13,28 @@

STATIC_PATH = os.path.join(os.path.dirname(__file__), "static")

# Function that makes these extensions discoverable
# by the test functions.
EVENT_SCHEMA = """\
$id: https://events.jupyter.org/mockapp/v1/test
version: 1
properties:
msg:
type: string
required:
- msg
"""


# Function that makes these extensions discoverable
# by the test functions.
def _jupyter_server_extension_points():
return [{"module": __name__, "app": MockExtensionApp}]


class MockExtensionHandler(ExtensionHandlerMixin, JupyterHandler):
def get(self):
self.event_logger.emit(
schema_id="https://events.jupyter.org/mockapp/v1/test", data={"msg": "Hello, world!"}
)
self.finish(self.config.mock_trait)


Expand All @@ -45,6 +59,16 @@ class MockExtensionApp(ExtensionAppJinjaMixin, ExtensionApp):
def get_extension_package():
return "tests.extension.mockextensions"

def initialize_settings(self):
# Only add this event if it hasn't already been added.
# Log the error if it fails, but don't crash the app.
try:
elogger: EventLogger = self.serverapp.event_logger
elogger.register_event_schema(EVENT_SCHEMA)
except SchemaRegistryException as err:
self.log.error(err)
pass

def initialize_handlers(self):
self.handlers.append(("/mock", MockExtensionHandler))
self.handlers.append(("/mock_template", MockExtensionTemplateHandler))
Expand Down
18 changes: 18 additions & 0 deletions tests/extension/test_app.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
import json
from io import StringIO
from logging import StreamHandler
from typing import Any

import pytest
Expand Down Expand Up @@ -171,3 +174,18 @@ async def _stop(*args):

# check the shutdown method was called twice
assert calls == 2


async def test_events(jp_serverapp, jp_fetch):
stream = StringIO()
handler = StreamHandler(stream)
jp_serverapp.event_logger.register_handler(handler)

await jp_fetch("mock")

handler.flush()
output = json.loads(stream.getvalue())
# Clear the sink.
stream.truncate(0)
stream.seek(0)
assert output["msg"] == "Hello, world!"
2 changes: 2 additions & 0 deletions tests/services/events/mock_event.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ type: object
properties:
event_message:
title: Event Messages
categories:
- unrestricted
description: |
Mock event message to read.
required:
Expand Down
9 changes: 4 additions & 5 deletions tests/services/events/mockextension/mock_extension.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,16 @@
class MockEventHandler(JupyterHandler):
def get(self):
# Emit an event.
self.event_bus.record_event(
schema_name="event.mockextension.jupyter.org/message",
version=1,
event={"event_message": "Hello world, from mock extension!"},
self.event_logger.emit(
schema_id="event.mockextension.jupyter.org/message",
data={"event_message": "Hello world, from mock extension!"},
)


def _load_jupyter_server_extension(serverapp):
# Register a schema with the EventBus
schema_file = pathlib.Path(__file__).parent / "mock_extension_event.yaml"
serverapp.event_bus.register_schema_file(schema_file)
serverapp.event_logger.register_event_schema(schema_file)
serverapp.web_app.add_handlers(
".*$", [(url_path_join(serverapp.base_url, "/mock/event"), MockEventHandler)]
)
2 changes: 2 additions & 0 deletions tests/services/events/mockextension/mock_extension_event.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ type: object
properties:
event_message:
title: Event Message
categories:
- unrestricted
description: |
Mock event message to read.
required:
Expand Down
Loading

0 comments on commit 63bb2a9

Please sign in to comment.