-
-
Notifications
You must be signed in to change notification settings - Fork 387
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Bug: No cleanup for SSE-endpoints if client tears connection down #3772
Comments
Temporary solution is to shield import contextlib
import sys
import typing
import anyio
import litestar
import redis.asyncio
from litestar.response import ServerSentEvent
AsyncContextManagerReturnType = typing.TypeVar("AsyncContextManagerReturnType")
def shield_async_context_manager_aexit(
async_context_manager: typing.AsyncContextManager[AsyncContextManagerReturnType],
) -> typing.AsyncContextManager[AsyncContextManagerReturnType]:
@contextlib.asynccontextmanager
async def _inner() -> typing.AsyncGenerator[AsyncContextManagerReturnType, None]:
async_context_manager_result: typing.Final = (
await async_context_manager.__aenter__()
)
try:
yield async_context_manager_result
finally:
with anyio.CancelScope(shield=True):
await async_context_manager.__aexit__(*sys.exc_info())
return _inner()
@contextlib.asynccontextmanager
async def create_redis_client() -> typing.AsyncGenerator[redis.asyncio.Redis, None]:
redis_client = redis.asyncio.Redis(host="localhost", port="6379")
try:
await redis_client.initialize()
print("initialized redis client")
yield redis_client
finally:
await redis_client.aclose()
print("closed redis client")
redis_list_key = "whatever"
async def _iter_sse_session_events_as_str() -> typing.AsyncIterable[str]:
async with shield_async_context_manager_aexit(
create_redis_client()
) as redis_client:
while True:
try:
# BLPOP blocks redis client until an item in list is available,
# i. e. you can't do anything with the client while waiting here.
_list_key, event_content = await redis_client.blpop(redis_list_key)
except BaseException as exception:
print("caught an exception from blpop:", exception)
raise exception
yield event_content
@litestar.get("/sse")
async def listen_to_sse_session_events() -> ServerSentEvent:
return ServerSentEvent(_iter_sse_session_events_as_str())
app = litestar.Litestar([listen_to_sse_session_events]) |
any reason you create the redis connection inside the generator ? that seems highly inneficient, usually that's what lifespan is for |
BLPOP is a blocking operation. To serve to more than one request we would have to create connection-per-request. |
Running into this issue as well |
Anyways, I found out that this is intended and simply a quirk with async generators and async context managers in Python. Nothing you can do about it except using |
At first I though this was just a side effect of not having implemented cancelling when a client disconnects (#2496), but on second look, this just seems to be an unexpected behaviour in the with the interplay of route handlers and generators, as @winstxnhdw mentioned. Even if Litestar were to cancel the route handler task on client disconnect, this wouldn't have the result you want, since the handler task finishes as soon as you return the Maybe we could provide some functionality to handle this special case, something like adding |
Description
This leads to resource leakage. Possibly relevant to #2958.
URL to code causing the issue
https://github.com/vrslev/litestar-sse-cleanup-issue
MCVE
No response
Steps to reproduce
Screenshots
No response
Logs
Litestar Version
2.12.1
Platform
Note
While we are open for sponsoring on GitHub Sponsors and
OpenCollective, we also utilize Polar.sh to engage in pledge-based sponsorship.
Check out all issues funded or available for funding on our Polar.sh dashboard
The text was updated successfully, but these errors were encountered: