Skip to content

Commit

Permalink
refactor(django_channels): move redis storage to its own file so redi…
Browse files Browse the repository at this point in the history
…s its not required and update imports
  • Loading branch information
cacosandon committed Apr 6, 2024
1 parent bfd11c4 commit 6c03d23
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 82 deletions.
2 changes: 1 addition & 1 deletion pycrdt_websocket/django_channels/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
from .storage.base_yroom_storage import BaseYRoomStorage as BaseYRoomStorage
from .yjs_consumer import YjsConsumer as YjsConsumer
from .yroom_storage import BaseYRoomStorage as BaseYRoomStorage
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
from abc import ABC, abstractmethod
from typing import Optional

import redis.asyncio as redis
from pycrdt import Doc


Expand Down Expand Up @@ -108,80 +107,3 @@ async def throttled_save_snapshot(self) -> None:
await self.save_snapshot()

self.last_saved_at = time.time()


class RedisYRoomStorage(BaseYRoomStorage):
"""A YRoom storage that uses Redis as main storage, without
persistent storage.
Args:
room_name: The name of the room.
"""

def __init__(self, room_name: str, save_throttle_interval: int | None = None) -> None:
super().__init__(room_name, save_throttle_interval)

self.redis_key = f"document:{self.room_name}"
self.redis = self._make_redis()

async def get_document(self) -> Doc:
snapshot = await self.redis.get(self.redis_key)

if not snapshot:
snapshot = await self.load_snapshot()

document = Doc()

if snapshot:
document.apply_update(snapshot)

return document

async def update_document(self, update: bytes):
await self.redis.watch(self.redis_key)

try:
current_document = await self.get_document()
updated_snapshot = self._apply_update_to_document(current_document, update)

async with self.redis.pipeline() as pipe:
while True:
try:
pipe.multi()
pipe.set(self.redis_key, updated_snapshot)

await pipe.execute()

break
except redis.WatchError:
current_document = await self.get_document()
updated_snapshot = self._apply_update_to_document(
current_document,
update,
)

continue
finally:
await self.redis.unwatch()

await self.throttled_save_snapshot()

async def load_snapshot(self) -> Optional[bytes]:
return None

async def save_snapshot(self) -> Optional[bytes]:
return None

async def close(self):
await self.save_snapshot()
await self.redis.close()

def _apply_update_to_document(self, document: Doc, update: bytes) -> bytes:
document.apply_update(update)

return document.get_update()

def _make_redis(self):
"""Makes a Redis client.
Defaults to a local client"""

return redis.Redis(host="localhost", port=6379, db=0)
83 changes: 83 additions & 0 deletions pycrdt_websocket/django_channels/storage/redis_yroom_storage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
from typing import Optional

import redis.asyncio as redis
from pycrdt import Doc

from .base_yroom_storage import BaseYRoomStorage


class RedisYRoomStorage(BaseYRoomStorage):
"""A YRoom storage that uses Redis as main storage, without
persistent storage.
Args:
room_name: The name of the room.
"""

def __init__(self, room_name: str, save_throttle_interval: int | None = None) -> None:
super().__init__(room_name, save_throttle_interval)

self.redis_key = f"document:{self.room_name}"
self.redis = self._make_redis()

async def get_document(self) -> Doc:
snapshot = await self.redis.get(self.redis_key)

if not snapshot:
snapshot = await self.load_snapshot()

document = Doc()

if snapshot:
document.apply_update(snapshot)

return document

async def update_document(self, update: bytes):
await self.redis.watch(self.redis_key)

try:
current_document = await self.get_document()
updated_snapshot = self._apply_update_to_document(current_document, update)

async with self.redis.pipeline() as pipe:
while True:
try:
pipe.multi()
pipe.set(self.redis_key, updated_snapshot)

await pipe.execute()

break
except redis.WatchError:
current_document = await self.get_document()
updated_snapshot = self._apply_update_to_document(
current_document,
update,
)

continue
finally:
await self.redis.unwatch()

await self.throttled_save_snapshot()

async def load_snapshot(self) -> Optional[bytes]:
return None

async def save_snapshot(self) -> Optional[bytes]:
return None

async def close(self):
await self.save_snapshot()
await self.redis.close()

def _apply_update_to_document(self, document: Doc, update: bytes) -> bytes:
document.apply_update(update)

return document.get_update()

def _make_redis(self):
"""Makes a Redis client.
Defaults to a local client"""

return redis.Redis(host="localhost", port=6379, db=0)
10 changes: 7 additions & 3 deletions pycrdt_websocket/django_channels/yjs_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from channels.generic.websocket import AsyncWebsocketConsumer # type: ignore[import-not-found]
from pycrdt import Doc

from pycrdt_websocket.django_channels.yroom_storage import BaseYRoomStorage
from pycrdt_websocket.django_channels.storage.base_yroom_storage import BaseYRoomStorage

from ..websocket import Websocket
from ..yutils import (
Expand Down Expand Up @@ -96,13 +96,14 @@ class YjsConsumer(AsyncWebsocketConsumer):
from channels.layers import get_channel_layer
from pycrdt_websocket.django_channels_consumer import YjsConsumer
from pycrdt_websocket.yutils import create_update_message
from pycrdt_websocket.django_channels.storage.redis_yroom_storage import RedisYRoomStorage
class DocConsumer(YjsConsumer):
def make_room_storage(self) -> BaseYRoomStorage:
# Modify the room storage here
return RedisYRoomStorage(self.room_name)
return RedisYRoomStorage(room_name=self.room_name)
def make_room_name(self) -> str:
# Modify the room name here
Expand Down Expand Up @@ -147,7 +148,10 @@ def make_room_storage(self) -> BaseYRoomStorage | None:
Defaults to not using any (just broadcast updates between consumers).
Example:
self.room_storage = RedisYRoomStorage(self.room_name)
self.room_storage = YourCustomRedisYRoomStorage(
room_name=self.room_name,
save_throttle_interval=5
)
"""
return None

Expand Down

0 comments on commit 6c03d23

Please sign in to comment.