Using one session in FastAPI #7760
Hi, I'm learning aiohttp for the first time (coming from simple Requests). I read "Unless you are connecting to a large, unknown number of different servers over the lifetime of your application, it is suggested you use a single session for the lifetime of your application to benefit from connection pooling."

Question: what is the proper way to use a single session (per worker) in a FastAPI app? On my local dev server I'm using the code below (based on this snippet). So far it works as expected, but I'd like to know whether it can be improved or made more reliable before I use it in production.

import asyncio
from random import randint
from socket import AF_INET

import aiohttp
from fastapi import FastAPI

MAX_CONNECTIONS = 100
TIMEOUT = 60  # seconds


class HttpSession:
    aiohttp_client = None

    @classmethod
    def get_session(cls):
        # Lazily create the shared session on first use
        if cls.aiohttp_client is None:
            timeout = aiohttp.ClientTimeout(total=TIMEOUT)
            connector = aiohttp.TCPConnector(family=AF_INET, limit_per_host=MAX_CONNECTIONS)
            cls.aiohttp_client = aiohttp.ClientSession(timeout=timeout, connector=connector)
        return cls.aiohttp_client

    @classmethod
    async def close_session(cls):
        if cls.aiohttp_client:
            await cls.aiohttp_client.close()
            cls.aiohttp_client = None

    @classmethod
    async def get(cls, url):
        client = cls.get_session()
        try:
            async with client.get(url) as response:
                if response.status != 200:
                    # Return a dict (the original braces built a one-element set)
                    return {'ERROR OCCURRED': await response.text()}
                json_result = await response.json()
        except Exception as err:
            return {'ERROR': err}
        return json_result


app = FastAPI()
tasks = []  # keep references so background tasks are not garbage-collected


@app.on_event('startup')
async def on_startup():
    HttpSession.get_session()


@app.on_event('shutdown')
async def on_shutdown():
    await HttpSession.close_session()


@app.get('/work')
async def get_work():
    """Initiate long-running task"""
    loop = asyncio.get_event_loop()
    task = loop.create_task(do_work())
    tasks.append(task)
    return {'message': 'Work initiated...'}


async def do_work():
    """Simulate slow, unreliable web service"""
    delay = randint(2000, 10000)  # milliseconds
    url_200 = 'https://run.mocky.io/v3/22bc0d64-50c6-464c-ba58-52f8f2ec72f4'
    url_503 = 'https://run.mocky.io/v3/c8adb86c-cdaa-42ff-bbe6-c423845dc4f0'
    url = url_200 if randint(0, 1) else url_503
    try:
        response = await HttpSession.get(f'{url}?mocky-delay={delay}ms')
        print(response)
    except Exception as err:
        print(err)
    finally:
        tasks.remove(asyncio.current_task())


if __name__ == '__main__':  # local dev
    import uvicorn
    uvicorn.run(app, host='localhost', port=8000)
Replies: 2 comments 1 reply
Update: after reading https://medium.com/@benshearlaw/how-to-use-httpx-request-client-with-fastapi-16255a9984a4, I replaced the startup/shutdown events with this:
Thanks for the advice, @Dreamsorcerer, I will look into going with your suggestions.
That's fine. My only suggestion would be to avoid a singleton (and save yourself the wrapper code). In aiohttp, we'd use cleanup_ctx (same thing as lifespan in FastAPI/Starlette) and just save it to the app object:
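A minimal sketch of that cleanup_ctx pattern, assuming aiohttp 3.9+ (for `web.AppKey`). The names `client_session_ctx`, `handler`, and `create_app`, and the placeholder URL, are illustrative and not taken from the original reply:

```python
import aiohttp
from aiohttp import web

# Typed key under which the shared session is stored on the app object.
client = web.AppKey("client", aiohttp.ClientSession)


async def client_session_ctx(app: web.Application):
    """Open one ClientSession at startup and close it at cleanup."""
    async with aiohttp.ClientSession() as session:
        app[client] = session
        yield  # the application serves requests while suspended here


async def handler(request: web.Request) -> web.Response:
    # Hypothetical handler; the upstream URL is a placeholder.
    async with request.app[client].get("https://example.com/") as resp:
        return web.Response(text=await resp.text())


def create_app() -> web.Application:
    app = web.Application()
    app.cleanup_ctx.append(client_session_ctx)
    app.router.add_get("/", handler)
    return app
```

Because the session lives on the app object rather than in a class attribute, no singleton wrapper is needed, and the `async with` block ensures the session is closed when the app shuts down.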
Then you'd be able to use it in any handler like
async with request.app[client].get(...) as resp:
You can do something similar with FastAPI, but it won't be typed. Or, you can use their dependency injection stuff to make it typed (though I personally think i…