From 6c03d23056847ab2d465652b49a2559033dcdae1 Mon Sep 17 00:00:00 2001 From: cacosandon Date: Sat, 6 Apr 2024 16:18:40 -0300 Subject: [PATCH] refactor(django_channels): move redis storage to its own file so redis its not required and update imports --- pycrdt_websocket/django_channels/__init__.py | 2 +- .../base_yroom_storage.py} | 78 ----------------- .../storage/redis_yroom_storage.py | 83 +++++++++++++++++++ .../django_channels/yjs_consumer.py | 10 ++- 4 files changed, 91 insertions(+), 82 deletions(-) rename pycrdt_websocket/django_channels/{yroom_storage.py => storage/base_yroom_storage.py} (62%) create mode 100644 pycrdt_websocket/django_channels/storage/redis_yroom_storage.py diff --git a/pycrdt_websocket/django_channels/__init__.py b/pycrdt_websocket/django_channels/__init__.py index 0f860bd..7be8da3 100644 --- a/pycrdt_websocket/django_channels/__init__.py +++ b/pycrdt_websocket/django_channels/__init__.py @@ -1,2 +1,2 @@ +from .storage.base_yroom_storage import BaseYRoomStorage as BaseYRoomStorage from .yjs_consumer import YjsConsumer as YjsConsumer -from .yroom_storage import BaseYRoomStorage as BaseYRoomStorage diff --git a/pycrdt_websocket/django_channels/yroom_storage.py b/pycrdt_websocket/django_channels/storage/base_yroom_storage.py similarity index 62% rename from pycrdt_websocket/django_channels/yroom_storage.py rename to pycrdt_websocket/django_channels/storage/base_yroom_storage.py index f41f08c..427b5af 100644 --- a/pycrdt_websocket/django_channels/yroom_storage.py +++ b/pycrdt_websocket/django_channels/storage/base_yroom_storage.py @@ -2,7 +2,6 @@ from abc import ABC, abstractmethod from typing import Optional -import redis.asyncio as redis from pycrdt import Doc @@ -108,80 +107,3 @@ async def throttled_save_snapshot(self) -> None: await self.save_snapshot() self.last_saved_at = time.time() - - -class RedisYRoomStorage(BaseYRoomStorage): - """A YRoom storage that uses Redis as main storage, without - persistent storage. - Args: - room_name: The name of the room. - """ - - def __init__(self, room_name: str, save_throttle_interval: int | None = None) -> None: - super().__init__(room_name, save_throttle_interval) - - self.redis_key = f"document:{self.room_name}" - self.redis = self._make_redis() - - async def get_document(self) -> Doc: - snapshot = await self.redis.get(self.redis_key) - - if not snapshot: - snapshot = await self.load_snapshot() - - document = Doc() - - if snapshot: - document.apply_update(snapshot) - - return document - - async def update_document(self, update: bytes): - await self.redis.watch(self.redis_key) - - try: - current_document = await self.get_document() - updated_snapshot = self._apply_update_to_document(current_document, update) - - async with self.redis.pipeline() as pipe: - while True: - try: - pipe.multi() - pipe.set(self.redis_key, updated_snapshot) - - await pipe.execute() - - break - except redis.WatchError: - current_document = await self.get_document() - updated_snapshot = self._apply_update_to_document( - current_document, - update, - ) - - continue - finally: - await self.redis.unwatch() - - await self.throttled_save_snapshot() - - async def load_snapshot(self) -> Optional[bytes]: - return None - - async def save_snapshot(self) -> Optional[bytes]: - return None - - async def close(self): - await self.save_snapshot() - await self.redis.close() - - def _apply_update_to_document(self, document: Doc, update: bytes) -> bytes: - document.apply_update(update) - - return document.get_update() - - def _make_redis(self): - """Makes a Redis client. - Defaults to a local client""" - - return redis.Redis(host="localhost", port=6379, db=0) diff --git a/pycrdt_websocket/django_channels/storage/redis_yroom_storage.py b/pycrdt_websocket/django_channels/storage/redis_yroom_storage.py new file mode 100644 index 0000000..51d1fb3 --- /dev/null +++ b/pycrdt_websocket/django_channels/storage/redis_yroom_storage.py @@ -0,0 +1,83 @@ +from typing import Optional + +import redis.asyncio as redis +from pycrdt import Doc + +from .base_yroom_storage import BaseYRoomStorage + + +class RedisYRoomStorage(BaseYRoomStorage): + """A YRoom storage that uses Redis as main storage, without + persistent storage. + Args: + room_name: The name of the room. + """ + + def __init__(self, room_name: str, save_throttle_interval: int | None = None) -> None: + super().__init__(room_name, save_throttle_interval) + + self.redis_key = f"document:{self.room_name}" + self.redis = self._make_redis() + + async def get_document(self) -> Doc: + snapshot = await self.redis.get(self.redis_key) + + if not snapshot: + snapshot = await self.load_snapshot() + + document = Doc() + + if snapshot: + document.apply_update(snapshot) + + return document + + async def update_document(self, update: bytes): + await self.redis.watch(self.redis_key) + + try: + current_document = await self.get_document() + updated_snapshot = self._apply_update_to_document(current_document, update) + + async with self.redis.pipeline() as pipe: + while True: + try: + pipe.multi() + pipe.set(self.redis_key, updated_snapshot) + + await pipe.execute() + + break + except redis.WatchError: + current_document = await self.get_document() + updated_snapshot = self._apply_update_to_document( + current_document, + update, + ) + + continue + finally: + await self.redis.unwatch() + + await self.throttled_save_snapshot() + + async def load_snapshot(self) -> Optional[bytes]: + return None + + async def save_snapshot(self) -> Optional[bytes]: + return None + + async def close(self): + await self.save_snapshot() + await self.redis.close() + + def _apply_update_to_document(self, document: Doc, update: bytes) -> bytes: + document.apply_update(update) + + return document.get_update() + + def _make_redis(self): + """Makes a Redis client. + Defaults to a local client""" + + return redis.Redis(host="localhost", port=6379, db=0) diff --git a/pycrdt_websocket/django_channels/yjs_consumer.py b/pycrdt_websocket/django_channels/yjs_consumer.py index e4999ea..ff750df 100644 --- a/pycrdt_websocket/django_channels/yjs_consumer.py +++ b/pycrdt_websocket/django_channels/yjs_consumer.py @@ -6,7 +6,7 @@ from channels.generic.websocket import AsyncWebsocketConsumer # type: ignore[import-not-found] from pycrdt import Doc -from pycrdt_websocket.django_channels.yroom_storage import BaseYRoomStorage +from pycrdt_websocket.django_channels.storage.base_yroom_storage import BaseYRoomStorage from ..websocket import Websocket from ..yutils import ( @@ -96,13 +96,14 @@ class YjsConsumer(AsyncWebsocketConsumer): from channels.layers import get_channel_layer from pycrdt_websocket.django_channels_consumer import YjsConsumer from pycrdt_websocket.yutils import create_update_message + from pycrdt_websocket.django_channels.storage.redis_yroom_storage import RedisYRoomStorage class DocConsumer(YjsConsumer): def make_room_storage(self) -> BaseYRoomStorage: # Modify the room storage here - return RedisYRoomStorage(self.room_name) + return RedisYRoomStorage(room_name=self.room_name) def make_room_name(self) -> str: # Modify the room name here @@ -147,7 +148,10 @@ def make_room_storage(self) -> BaseYRoomStorage | None: Defaults to not using any (just broadcast updates between consumers). Example: - self.room_storage = RedisYRoomStorage(self.room_name) + self.room_storage = YourCustomRedisYRoomStorage( + room_name=self.room_name, + save_throttle_interval=5 + ) """ return None