Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes the initialization of rooms #198

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions jupyter_collaboration/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,10 @@

from jupyter_server.extension.application import ExtensionApp
from traitlets import Bool, Float, Type
from ypy_websocket.ystore import BaseYStore

from .handlers import DocSessionHandler, YDocWebSocketHandler
from .loaders import FileLoaderMapping
from .stores import SQLiteYStore
from .stores import BaseYStore, SQLiteYStore
from .utils import EVENTS_SCHEMA_PATH
from .websocketserver import JupyterWebsocketServer

Expand All @@ -22,6 +21,8 @@ class YDocExtension(ExtensionApp):
Enables Real Time Collaboration in JupyterLab
"""

_store: BaseYStore = None

disable_rtc = Bool(False, config=True, help="Whether to disable real time collaboration.")

file_poll_interval = Float(
Expand Down Expand Up @@ -80,10 +81,12 @@ def initialize_handlers(self):
for k, v in self.config.get(self.ystore_class.__name__, {}).items():
setattr(self.ystore_class, k, v)

# Instantiate the store
self._store = self.ystore_class(log=self.log)

self.ywebsocket_server = JupyterWebsocketServer(
rooms_ready=False,
auto_clean_rooms=False,
ystore_class=self.ystore_class,
log=self.log,
)

Expand All @@ -103,7 +106,7 @@ def initialize_handlers(self):
"document_cleanup_delay": self.document_cleanup_delay,
"document_save_delay": self.document_save_delay,
"file_loaders": self.file_loaders,
"ystore_class": self.ystore_class,
"store": self._store,
"ywebsocket_server": self.ywebsocket_server,
},
),
Expand Down
18 changes: 12 additions & 6 deletions jupyter_collaboration/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@
from tornado import web
from tornado.websocket import WebSocketHandler
from ypy_websocket.websocket_server import YRoom
from ypy_websocket.ystore import BaseYStore
from ypy_websocket.yutils import YMessageType, write_var_uint

from .loaders import FileLoaderMapping
from .rooms import DocumentRoom, TransientRoom
from .stores import BaseYStore
from .utils import (
JUPYTER_COLLABORATION_EVENTS_URI,
LogLevel,
Expand Down Expand Up @@ -62,6 +62,14 @@ def create_task(self, aw):
task.add_done_callback(self._background_tasks.discard)

async def prepare(self):
# NOTE: Initialize in the ExtensionApp.start_extension once
# https://github.com/jupyter-server/jupyter_server/issues/1329
# is done.
# We are temporarily initializing the store here because `start`
# is an async function
if self._store is not None and not self._store.initialized:
await self._store.initialize()

if not self._websocket_server.started.is_set():
self.create_task(self._websocket_server.start())
await self._websocket_server.started.wait()
Expand All @@ -84,15 +92,13 @@ async def prepare(self):
)

file = self._file_loaders[file_id]
updates_file_path = f".{file_type}:{file_id}.y"
ystore = self._ystore_class(path=updates_file_path, log=self.log)
self.room = DocumentRoom(
self._room_id,
file_format,
file_type,
file,
self.event_logger,
ystore,
self._store,
self.log,
self._document_save_delay,
)
Expand All @@ -111,15 +117,15 @@ def initialize(
self,
ywebsocket_server: JupyterWebsocketServer,
file_loaders: FileLoaderMapping,
ystore_class: type[BaseYStore],
store: BaseYStore,
document_cleanup_delay: float | None = 60.0,
document_save_delay: float | None = 1.0,
) -> None:
self._background_tasks = set()
# File ID manager cannot be passed as argument as the extension may load after this one
self._file_id_manager = self.settings["file_id_manager"]
self._file_loaders = file_loaders
self._ystore_class = ystore_class
self._store = store
self._cleanup_delay = document_cleanup_delay
self._document_save_delay = document_save_delay
self._websocket_server = ywebsocket_server
Expand Down
57 changes: 29 additions & 28 deletions jupyter_collaboration/rooms.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@

from jupyter_events import EventLogger
from jupyter_ydoc import ydocs as YDOCS
from ypy_websocket.stores import BaseYStore
from ypy_websocket.websocket_server import YRoom
from ypy_websocket.ystore import BaseYStore, YDocNotFound
from ypy_websocket.yutils import write_var_uint

from .loaders import FileLoader
Expand Down Expand Up @@ -104,36 +104,28 @@ async def initialize(self) -> None:
return

self.log.info("Initializing room %s", self._room_id)

model = await self._file.load_content(self._file_format, self._file_type, True)

async with self._update_lock:
# try to apply Y updates from the YStore for this document
read_from_source = True
if self.ystore is not None:
try:
await self.ystore.apply_updates(self.ydoc)
self._emit(
LogLevel.INFO,
"load",
"Content loaded from the store {}".format(
self.ystore.__class__.__qualname__
),
)
self.log.info(
"Content in room %s loaded from the ystore %s",
self._room_id,
self.ystore.__class__.__name__,
)
read_from_source = False
except YDocNotFound:
# YDoc not found in the YStore, create the document from the source file (no change history)
pass
if self.ystore is not None and await self.ystore.exists(self._room_id):
# Load the content from the store
await self.ystore.apply_updates(self._room_id, self.ydoc)
self._emit(
LogLevel.INFO,
"load",
"Content loaded from the store {}".format(
self.ystore.__class__.__qualname__
),
)
self.log.info(
"Content in room %s loaded from the ystore %s",
self._room_id,
self.ystore.__class__.__name__,
)

if not read_from_source:
# if YStore updates and source file are out-of-sync, resync updates with source
if self._document.source != model["content"]:
# TODO: Delete document from the store.
self._emit(
LogLevel.INFO, "initialize", "The file is out-of-sync with the ystore."
)
Expand All @@ -142,17 +134,26 @@ async def initialize(self) -> None:
self._file.path,
self.ystore.__class__.__name__,
)
read_from_source = True

if read_from_source:
doc = await self.ystore.get(self._room_id)
await self.ystore.remove(self._room_id)
version = 0
if "version" in doc:
version = doc["version"] + 1

await self.ystore.create(self._room_id, version)
await self.ystore.encode_state_as_update(self._room_id, self.ydoc)

else:
self._emit(LogLevel.INFO, "load", "Content loaded from disk.")
self.log.info(
"Content in room %s loaded from file %s", self._room_id, self._file.path
)
self._document.source = model["content"]

if self.ystore:
await self.ystore.encode_state_as_update(self.ydoc)
if self.ystore is not None:
await self.ystore.create(self._room_id, 0)
await self.ystore.encode_state_as_update(self._room_id, self.ydoc)

self._last_modified = model["last_modified"]
self._document.dirty = False
Expand Down
6 changes: 6 additions & 0 deletions jupyter_collaboration/stores/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# Copyright (c) Jupyter Development Team.
# Distributed under the terms of the Modified BSD License.

from .base_store import BaseYStore # noqa
from .stores import SQLiteYStore, TempFileYStore # noqa
from .utils import YDocExists, YDocNotFound # noqa
157 changes: 157 additions & 0 deletions jupyter_collaboration/stores/base_store.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
# Copyright (c) Jupyter Development Team.
# Distributed under the terms of the Modified BSD License.

from __future__ import annotations

from abc import ABC, abstractmethod
from inspect import isawaitable
from typing import AsyncIterator, Awaitable, Callable, cast

import y_py as Y
from anyio import Event


class BaseYStore(ABC):
    """
    Abstract interface shared by every document store.

    Concrete stores implement the persistence primitives (``initialize``,
    ``exists``, ``list``, ``get``, ``create``, ``remove``, ``write`` and
    ``read``). This base class builds ``get_metadata``,
    ``encode_state_as_update`` and ``apply_updates`` on top of them.
    """

    # NOTE(review): presumably the store format/schema version; confirm
    # against the concrete stores.
    version = 3
    # Optional hook producing the metadata for an update; it may return the
    # bytes directly or an awaitable resolving to them (see ``get_metadata``).
    metadata_callback: Callable[[], Awaitable[bytes] | bytes] | None = None

    # Location of the store; only declared here, subclasses assign it.
    _store_path: str
    # Event read by the ``initialized`` property; None means "not initialized".
    _initialized: Event | None = None

    @abstractmethod
    def __init__(
        self, path: str, metadata_callback: Callable[[], Awaitable[bytes] | bytes] | None = None
    ):
        """
        Initialize the object.

        NOTE(review): earlier documentation also listed a ``log`` argument
        that is not part of this signature; confirm whether concrete stores
        are expected to accept it.

        Arguments:
            path: The path where the store will be located.
            metadata_callback: An optional callback to call to get the metadata.
        """
        ...

    @abstractmethod
    async def initialize(self) -> None:
        """
        Initializes the store.
        """
        ...

    @abstractmethod
    async def exists(self, path: str) -> bool:
        """
        Returns True if the document exists, else returns False.

        Arguments:
            path: The document name/path.
        """
        ...

    @abstractmethod
    async def list(self) -> AsyncIterator[str]:
        """
        Iterates over the name/path of the documents stored.

        Returns:
            An async iterator of document names/paths.
        """
        ...

    @abstractmethod
    async def get(self, path: str, updates: bool = False) -> dict | None:
        """
        Returns the document's metadata or None if the document doesn't exist.

        Arguments:
            path: The document name/path.
            updates: Whether to return document's content or only the metadata.
        """
        ...

    @abstractmethod
    async def create(self, path: str, version: int) -> None:
        """
        Creates a new document.

        Arguments:
            path: The document name/path.
            version: Document version.
        """
        ...

    @abstractmethod
    async def remove(self, path: str) -> dict | None:
        """
        Removes a document.

        Arguments:
            path: The document name/path.

        Returns:
            A dict or None. NOTE(review): the meaning of the returned value is
            not specified by this interface; confirm with concrete stores.
        """
        ...

    @abstractmethod
    async def write(self, path: str, data: bytes) -> None:
        """
        Store a document update.

        Arguments:
            path: The document name/path.
            data: The update to store.
        """
        ...

    @abstractmethod
    async def read(self, path: str) -> AsyncIterator[tuple[bytes, bytes]]:
        """
        Async iterator for reading document's updates.

        Arguments:
            path: The document name/path.

        Returns:
            One tuple per stored update. The first element is the update
            itself (``apply_updates`` relies on this and ignores the rest).
            NOTE(review): earlier documentation described the tuple as
            (update, metadata, timestamp) while the annotation declares two
            ``bytes`` fields; confirm the shape against concrete stores.
        """
        ...

    @property
    def initialized(self) -> bool:
        """Whether the initialization event exists and has been set."""
        event = self._initialized
        return event is not None and event.is_set()

    async def get_metadata(self) -> bytes:
        """
        Returns:
            The metadata produced by ``metadata_callback``, or ``b""`` when no
            callback is configured.
        """
        callback = self.metadata_callback
        if callback is None:
            return b""

        result = callback()
        # The callback may be synchronous or asynchronous; only await when needed.
        if isawaitable(result):
            result = await result
        return cast(bytes, result)

    async def encode_state_as_update(self, path: str, ydoc: Y.YDoc) -> None:
        """Store a YDoc state.

        The whole state of ``ydoc`` is encoded as a single update and handed
        to ``write``.

        Arguments:
            path: The document name/path.
            ydoc: The YDoc from which to store the state.
        """
        state = Y.encode_state_as_update(ydoc)  # type: ignore
        await self.write(path, state)

    async def apply_updates(self, path: str, ydoc: Y.YDoc) -> None:
        """Apply all stored updates to the YDoc.

        Arguments:
            path: The document name/path.
            ydoc: The YDoc on which to apply the updates.
        """
        # Only the first field of each record is the update; the rest is ignored.
        async for update, *_ in self.read(path):  # type: ignore
            Y.apply_update(ydoc, update)  # type: ignore
Loading