Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement file ID service #244

Merged
merged 6 commits into from
Nov 24, 2022
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file.
194 changes: 194 additions & 0 deletions plugins/contents/fps_contents/fileid.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
import asyncio
from typing import Dict, List, Optional
from uuid import uuid4

import aiosqlite
from anyio import Path
from fps.logging import get_configured_logger # type: ignore
from watchfiles import Change, awatch

watchfiles_logger = get_configured_logger("watchfiles.main", "warning")
logger = get_configured_logger("contents")


class Watcher:
    """Endless async iterator that yields the changes reported for one path.

    A producer hands a change over with ``notify()``; a consumer receives it by
    iterating with ``async for``. Only the most recently notified change is kept:
    a second ``notify()`` before the consumer wakes up replaces the first one.
    """

    def __init__(self, path: str) -> None:
        self.path = path
        # set by notify(), cleared each time a change is handed to the consumer
        self._event = asyncio.Event()

    def notify(self, change):
        """Record ``change`` and wake up the consumer waiting in ``__anext__``."""
        self._change = change
        self._event.set()

    def __aiter__(self):
        # the watcher is its own iterator
        return self

    async def __anext__(self):
        """Block until the next notification, then return the stored change."""
        event = self._event
        await event.wait()
        # re-arm the event so that the following iteration blocks again
        event.clear()
        return self._change


class Singleton(type):
    """Metaclass that creates at most one instance per class.

    The first call to the class builds the instance with the given arguments;
    every later call returns that same object and ignores its arguments.
    """

    # registry shared by all classes using this metaclass: class -> its instance
    _instances: Dict = {}

    def __call__(cls, *args, **kwargs):
        registry = cls._instances
        if cls in registry:
            return registry[cls]
        registry[cls] = super().__call__(*args, **kwargs)
        return registry[cls]


class FileIdManager(metaclass=Singleton):
    """Singleton that assigns random IDs to files and follows them across renames.

    On creation, a background task (re)creates the ``fileids`` table in the SQLite
    database at ``db_path``, indexes every path found under the current working
    directory, and then watches that directory for changes. The public coroutines
    wait for the initial indexing to finish before they touch the database.
    """

    db_path: str
    initialized: asyncio.Event
    watchers: Dict[str, List[Watcher]]

    def __init__(self, db_path: str = "fileid.db"):
        """Store the settings and start indexing/watching in the background.

        A task is created here, so an event loop must already be running.
        """
        self.db_path = db_path
        self.initialized = asyncio.Event()
        self.watchers = {}
        # The event loop only keeps weak references to tasks: hold on to this one
        # so that it cannot be garbage-collected while it is still running.
        self._watch_task = asyncio.create_task(self.watch_files())

    async def _fetch_one(self, query: str, value: str) -> Optional[str]:
        """Run a one-parameter query and return the first column of its first row.

        Waits for the initial indexing first. Returns ``None`` when no row matches.
        """
        await self.initialized.wait()
        async with aiosqlite.connect(self.db_path) as db:
            async with db.execute(query, (value,)) as cursor:
                row = await cursor.fetchone()
        return None if row is None else row[0]

    @staticmethod
    async def _is_indexed(db, path: str) -> bool:
        """Return whether ``path`` has a row in the ``fileids`` table."""
        async with db.execute("SELECT COUNT(*) FROM fileids WHERE path = ?", (path,)) as cursor:
            return bool((await cursor.fetchone())[0])

    async def get_id(self, path: str) -> Optional[str]:
        """Return the ID stored for ``path``, or ``None`` if the path is not indexed."""
        return await self._fetch_one("SELECT id FROM fileids WHERE path = ?", path)

    async def get_path(self, idx: str) -> Optional[str]:
        """Return the path stored for the ID ``idx``, or ``None`` if it is unknown."""
        return await self._fetch_one("SELECT path FROM fileids WHERE id = ?", idx)

    async def index(self, path: str) -> Optional[str]:
        """Make sure ``path`` is indexed and return its ID.

        Returns ``None`` when the file does not exist. When the path is already
        indexed (for instance by a concurrent call), the stored ID is returned
        instead of failing on the ``UNIQUE`` constraint of the ``path`` column.
        """
        await self.initialized.wait()
        async with aiosqlite.connect(self.db_path) as db:
            apath = Path(path)
            if not await apath.exists():
                return None
            try:
                mtime = (await apath.stat()).st_mtime
            except FileNotFoundError:
                # the file was removed between the existence check and the stat
                return None

            # OR IGNORE keeps an existing row untouched; reading the ID back then
            # yields the stored ID whether or not this call inserted the row.
            await db.execute(
                "INSERT OR IGNORE INTO fileids VALUES (?, ?, ?)", (uuid4().hex, path, mtime)
            )
            await db.commit()
            async with db.execute("SELECT id FROM fileids WHERE path = ?", (path,)) as cursor:
                row = await cursor.fetchone()
            return None if row is None else row[0]

    async def watch_files(self):
        """Build the index from scratch, then keep it in sync with the filesystem.

        Sets ``initialized`` once the initial scan is committed. After that, each
        batch of changes reported by ``awatch(".")`` is applied to the database and
        forwarded to the watchers registered for the affected paths.
        """
        async with aiosqlite.connect(self.db_path) as db:
            await db.execute("DROP TABLE IF EXISTS fileids")
            await db.execute(
                "CREATE TABLE fileids "
                "(id TEXT PRIMARY KEY, path TEXT NOT NULL UNIQUE, mtime REAL NOT NULL)"
            )
            await db.commit()

        async with aiosqlite.connect(self.db_path) as db:
            # index files
            async for path in Path().rglob("*"):
                try:
                    mtime = (await path.stat()).st_mtime
                except FileNotFoundError:
                    # The entry vanished while scanning. Skip it, otherwise the task
                    # dies before `initialized` is set and every caller hangs.
                    continue
                await db.execute(
                    "INSERT INTO fileids VALUES (?, ?, ?)", (uuid4().hex, str(path), mtime)
                )
            await db.commit()
            self.initialized.set()

            async for changes in awatch("."):
                # resolve the working directory once per batch
                cwd = await Path().absolute()
                # pair each raw change with its path relative to the working directory
                batch = [(item, Path(item[1]).relative_to(cwd)) for item in changes]
                await self._apply_changes(db, batch)
                for item, relative_path in batch:
                    for watcher in self.watchers.get(str(relative_path), []):
                        watcher.notify(item)

    async def _apply_changes(self, db, batch) -> None:
        """Apply one batch of ``(raw change, relative path)`` pairs to the database."""
        deleted_paths: List[str] = []
        added_paths: List[str] = []
        for (change, _), changed_path in batch:
            changed_path_str = str(changed_path)

            if change == Change.deleted:
                logger.debug("File %s was deleted", changed_path_str)
                if not await self._is_indexed(db, changed_path_str):
                    # path is not indexed, ignore
                    logger.debug("File %s is not indexed, ignoring", changed_path_str)
                    continue
                # path is indexed
                await maybe_rename(db, changed_path_str, deleted_paths, added_paths, False)
            elif change == Change.added:
                logger.debug("File %s was added", changed_path_str)
                await maybe_rename(db, changed_path_str, added_paths, deleted_paths, True)
            elif change == Change.modified:
                logger.debug("File %s was modified", changed_path_str)
                if changed_path_str == self.db_path:
                    continue
                if not await self._is_indexed(db, changed_path_str):
                    # path is not indexed, ignore
                    logger.debug("File %s is not indexed, ignoring", changed_path_str)
                    continue
                try:
                    mtime = (await changed_path.stat()).st_mtime
                except FileNotFoundError:
                    # the file is already gone again: don't let that stop the watcher
                    continue
                await db.execute(
                    "UPDATE fileids SET mtime = ? WHERE path = ?", (mtime, changed_path_str)
                )

        # paths still in either list were not matched as one side of a rename
        for path in deleted_paths + added_paths:
            await db.execute("DELETE FROM fileids WHERE path = ?", (path,))
        await db.commit()

    def watch(self, path: str) -> Watcher:
        """Register and return a new ``Watcher`` notified of changes to ``path``."""
        watcher = Watcher(path)
        self.watchers.setdefault(path, []).append(watcher)
        return watcher

    def unwatch(self, path: str, watcher: Watcher):
        """Unregister ``watcher`` from ``path``.

        Raises ``KeyError`` if nothing watches ``path`` and ``ValueError`` if
        ``watcher`` is not registered for it.
        """
        registered = self.watchers[path]
        registered.remove(watcher)
        if not registered:
            # drop the empty list so stale paths do not accumulate
            del self.watchers[path]


async def get_mtime(path, db) -> Optional[float]:
    """Return the modification time of ``path``, or ``None`` if it is unknown.

    With a truthy ``db`` the time recorded in the ``fileids`` table is returned;
    otherwise the file is stat'ed on the filesystem.
    """
    if not db:
        # no database handle: ask the filesystem
        try:
            stat_result = await Path(path).stat()
        except FileNotFoundError:
            return None
        return stat_result.st_mtime

    query = "SELECT mtime FROM fileids WHERE path = ?"
    async with db.execute(query, (path,)) as cursor:
        async for (recorded_mtime,) in cursor:
            return recorded_mtime
    # deleted file is not in database, shouldn't happen
    return None


async def maybe_rename(
    db, changed_path: str, changed_paths: List[str], other_paths: List[str], is_added_path
) -> None:
    """Detect whether an added/deleted path is one half of a rename.

    ``changed_path`` is compared, by modification time, with every path in
    ``other_paths`` (the deleted paths when ``changed_path`` was added, and the
    added paths when it was deleted). On a match the row in ``fileids`` is moved
    to the new path and the matching entry is removed from ``other_paths``.
    Without a match ``changed_path`` is appended to ``changed_paths``. Nothing
    happens when the modification time of ``changed_path`` cannot be determined.
    """
    # check if the same file was added/deleted, this would be a rename
    # (a falsy source makes get_mtime() read the filesystem instead of the database)
    if is_added_path:
        own_source, other_source = None, db
    else:
        own_source, other_source = db, None

    own_mtime = await get_mtime(changed_path, own_source)
    if own_mtime is None:
        return

    for candidate in other_paths:
        if own_mtime != await get_mtime(candidate, other_source):
            continue
        # same files, according to modification times
        if is_added_path:
            old_path, new_path = candidate, changed_path
        else:
            old_path, new_path = changed_path, candidate
        logger.debug("File %s was renamed to %s", old_path, new_path)
        await db.execute("UPDATE fileids SET path = ? WHERE path = ?", (new_path, old_path))
        # safe although we are iterating: we return right away
        other_paths.remove(candidate)
        return

    changed_paths.append(changed_path)
1 change: 0 additions & 1 deletion plugins/contents/fps_contents/watchfiles.py

This file was deleted.

2 changes: 1 addition & 1 deletion plugins/contents/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ name = "fps_contents"
description = "An FPS plugin for the contents API"
keywords = [ "jupyter", "server", "fastapi", "pluggy", "plugins",]
requires-python = ">=3.7"
dependencies = [ "fps >=0.0.8", "fps-auth-base", "anyio", "watchfiles >=0.16.1,<1",]
dependencies = [ "fps >=0.0.8", "fps-auth-base", "anyio", "watchfiles >=0.16.1,<1", "aiosqlite >=0.17.0,<1", "anyio>=3.6.2,<4"]
dynamic = [ "version",]
[[project.authors]]
name = "Jupyter Development Team"
Expand Down
2 changes: 1 addition & 1 deletion plugins/lab/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ name = "fps_lab"
description = "An FPS plugin for the JupyterLab/RetroLab API"
keywords = [ "jupyter", "server", "fastapi", "pluggy", "plugins",]
requires-python = ">=3.7"
dependencies = [ "fps >=0.0.8", "fps-auth-base", "fps-frontend", "aiofiles", "babel", "json5",]
dependencies = [ "fps >=0.0.8", "fps-auth-base", "fps-frontend", "babel", "json5",]
dynamic = [ "version",]
[[project.authors]]
name = "Jupyter Development Team"
Expand Down
6 changes: 6 additions & 0 deletions plugins/yjs/fps_yjs/models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from pydantic import BaseModel


class CreateRoomId(BaseModel):
    # Body of a request asking for a room ID: two required string fields.
    # NOTE: both names shadow Python builtins, but they are the keys expected in
    # the request body, so they must not be renamed.
    format: str
    type: str
90 changes: 64 additions & 26 deletions plugins/yjs/fps_yjs/routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,28 @@
from pathlib import Path
from typing import Optional, Tuple

from fastapi import APIRouter, Depends, WebSocketDisconnect
from fastapi import (
APIRouter,
Depends,
HTTPException,
Request,
Response,
WebSocketDisconnect,
status,
)
from fastapi.responses import PlainTextResponse
from fps.hooks import register_router # type: ignore
from fps_contents.routes import read_content, write_content # type: ignore

try:
from fps_contents.watchfiles import awatch

has_awatch = True
except ImportError:
has_awatch = False
from fps_auth_base import websocket_auth # type: ignore
from fps_auth_base import User, current_user
from fps_contents.fileid import FileIdManager
from fps_contents.routes import read_content, write_content # type: ignore
from jupyter_ydoc import ydocs as YDOCS # type: ignore
from ypy_websocket.websocket_server import WebsocketServer, YRoom # type: ignore
from ypy_websocket.ystore import BaseYStore, SQLiteYStore, YDocNotFound # type: ignore
from ypy_websocket.yutils import YMessageType # type: ignore

from .models import CreateRoomId

YFILE = YDOCS["file"]
AWARENESS = 1
RENAME_SESSION = 127
Expand All @@ -35,6 +41,12 @@ def to_datetime(iso_date: str) -> datetime:
return datetime.fromisoformat(iso_date.rstrip("Z"))


@router.on_event("startup")
async def startup():
    """Instantiate the ``FileIdManager`` singleton when the application starts."""
    # start indexing in the background
    FileIdManager()


@router.websocket("/api/yjs/{path:path}")
async def websocket_endpoint(
path,
Expand Down Expand Up @@ -126,9 +138,14 @@ def __init__(self, websocket, path, permissions):
self.room = self.websocket_server.get_room(self.websocket.path)
self.set_file_info(path)

def get_file_info(self) -> Tuple[str, str, str]:
async def get_file_info(self) -> Tuple[str, str, str]:
room_name = self.websocket_server.get_room_name(self.room)
file_format, file_type, file_path = room_name.split(":", 2)
file_format, file_type, file_id = room_name.split(":", 2)
file_path = await FileIdManager().get_path(file_id)
if file_path is None:
raise RuntimeError(f"File {self.room.document.path} cannot be found anymore")
if file_path != self.room.document.path:
self.room.document.path = file_path
return file_format, file_type, file_path

def set_file_info(self, value: str) -> None:
Expand All @@ -145,7 +162,7 @@ async def serve(self):
self.room.cleaner.cancel()

if not self.room.is_transient and not self.room.ready:
file_format, file_type, file_path = self.get_file_info()
file_format, file_type, file_path = await self.get_file_info()
is_notebook = file_type == "notebook"
model = await read_content(file_path, True, as_json=is_notebook)
self.last_modified = to_datetime(model.last_modified)
Expand Down Expand Up @@ -212,22 +229,20 @@ async def on_message(self, message: bytes) -> bool:
return skip

async def watch_file(self):
if has_awatch:
file_format, file_type, file_path = self.get_file_info()
async for changes in awatch(file_path):
await self.maybe_load_document()
else:
# contents plugin doesn't provide watcher, fall back to polling
poll_interval = 1 # FIXME: pass in config
if not poll_interval:
self.room.watcher = None
return
while True:
await asyncio.sleep(poll_interval)
file_format, file_type, file_path = await self.get_file_info()
while True:
watcher = FileIdManager().watch(file_path)
async for changes in watcher:
file_format, file_type, new_file_path = await self.get_file_info()
if new_file_path != file_path:
# file was renamed
FileIdManager().unwatch(file_path, watcher)
file_path = new_file_path
break
await self.maybe_load_document()

async def maybe_load_document(self):
file_format, file_type, file_path = self.get_file_info()
file_format, file_type, file_path = await self.get_file_info()
model = await read_content(file_path, False)
# do nothing if the file was saved by us
if self.last_modified < to_datetime(model.last_modified):
Expand Down Expand Up @@ -266,7 +281,7 @@ async def maybe_save_document(self):
await asyncio.sleep(1)
# if the room cannot be found, don't save
try:
file_format, file_type, file_path = self.get_file_info()
file_format, file_type, file_path = await self.get_file_info()
except Exception:
return
is_notebook = file_type == "notebook"
Expand All @@ -293,4 +308,27 @@ async def maybe_save_document(self):
self.room.document.dirty = False


@router.put("/api/yjs/roomid/{path:path}", status_code=200, response_class=PlainTextResponse)
async def create_roomid(
    path,
    request: Request,
    response: Response,
    user: User = Depends(current_user(permissions={"contents": ["read"]})),
):
    """Return the room ID ``<format>:<type>:<file id>`` for the file at ``path``.

    Responds with 200 when the file already has an ID and with 201 when the file
    had to be indexed first.

    Raises:
        HTTPException: 422 if the request body is not a JSON object holding the
            ``format`` and ``type`` strings, 404 if the file does not exist.
    """
    # we need to process the request manually
    # see https://github.com/tiangolo/fastapi/issues/3373#issuecomment-1306003451
    try:
        create_room_id = CreateRoomId(**(await request.json()))
    except (ValueError, TypeError) as exc:
        # ValueError: invalid JSON or failed model validation;
        # TypeError: the JSON document is not an object (cannot be unpacked with **)
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
            detail="Request body must be a JSON object with 'format' and 'type' strings",
        ) from exc

    ws_url = f"{create_room_id.format}:{create_room_id.type}:"
    file_id_manager = FileIdManager()
    idx = await file_id_manager.get_id(path)
    if idx is None:
        idx = await file_id_manager.index(path)
        if idx is None:
            raise HTTPException(status_code=404, detail=f"File {path} does not exist")
        # the file was not indexed before this request
        response.status_code = status.HTTP_201_CREATED
    return ws_url + idx


r = register_router(router)
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ dynamic = ["version"]
requires-python = ">=3.7"
dependencies = [
"fastapi>=0.87.0",
"fps>=0.0.19",
"fps>=0.0.21",
"fps-uvicorn>=0.0.19",
"fps-auth-base>=0.0.42",
"fps-contents>=0.0.42",
Expand Down
Loading