Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add limit and offset parameters to routes that fetch lists from db #277

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/Dockerfile-dev
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM pyroapi:python3.8-alpine3.10
FROM pyronear/pyro-api:python3.8-alpine3.10

# copy requirements file
COPY requirements-dev.txt requirements-dev.txt
Expand Down
12 changes: 10 additions & 2 deletions src/app/api/crud/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
from fastapi import HTTPException, Path, status
from pydantic import BaseModel
from sqlalchemy import Table
from sqlalchemy.orm import Query
from sqlalchemy.sql import Select

from app.db import database

Expand Down Expand Up @@ -41,16 +43,22 @@ async def fetch_all(
query_filters: Optional[Dict[str, Any]] = None,
exclusions: Optional[Dict[str, Any]] = None,
limit: int = 50,
offset: Optional[int] = None,
query: Optional[Select] = None,
) -> List[Mapping[str, Any]]:
query = table.select().order_by(table.c.id.desc())
if query is None:
query = table.select()
if isinstance(query_filters, dict):
for key, value in query_filters.items():
query = query.where(getattr(table.c, key) == value)

if isinstance(exclusions, dict):
for key, value in exclusions.items():
query = query.where(getattr(table.c, key) != value)
return (await database.fetch_all(query=query.limit(limit)))[::-1]
query = query.order_by(table.c.id.desc()).limit(limit).offset(offset)
if isinstance(query, Query):
return [item.__dict__ for item in query[::-1]]
return (await database.fetch_all(query=query))[::-1]
Comment on lines +46 to +61
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea. Two questions though:

  • I think I understand this is an equivalent of the offset syntax in SQL. But here we seem to be making an arbitrary choice on order direction. Should we make that a boolean param?
  • regarding the query arg, we expect it to already have a .select()?

If we only want to paginate the queries, there are a few plugin options for this I think

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(paginate will prevent the case where we have a huge table and someone send limit=100000 which might blow up the RAM)



async def fetch_one(table: Table, query_filters: Dict[str, Any]) -> Optional[Mapping[str, Any]]:
Expand Down
13 changes: 9 additions & 4 deletions src/app/api/endpoints/accesses.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@
# This program is licensed under the Apache License 2.0.
# See LICENSE or go to <https://opensource.org/licenses/Apache-2.0> for full license details.

from typing import List
from typing import List, Optional

from fastapi import APIRouter, Path, Security
from fastapi import APIRouter, Path, Query, Security
from typing_extensions import Annotated

from app.api import crud
from app.api.deps import get_current_access
Expand All @@ -25,8 +26,12 @@ async def get_access(access_id: int = Path(..., gt=0), _=Security(get_current_ac


@router.get("/", response_model=List[AccessRead], summary="Get the list of all accesses")
async def fetch_accesses(_=Security(get_current_access, scopes=[AccessType.admin])):
async def fetch_accesses(
limit: Annotated[int, Query(description="maximum number of items", ge=1, le=1000)] = 50,
offset: Annotated[Optional[int], Query(description="number of items to skip", ge=0)] = None,
_=Security(get_current_access, scopes=[AccessType.admin]),
):
Comment on lines -28 to +33
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a reminder for me: how do we change those values when sending a request?

"""
Retrieves the list of all accesses and their information
"""
return await crud.fetch_all(accesses)
return await crud.fetch_all(accesses, limit=limit, offset=offset)
55 changes: 24 additions & 31 deletions src/app/api/endpoints/alerts.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@

from functools import partial
from string import Template
from typing import List, cast
from typing import List, Optional, cast

from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, Path, Security, status
from sqlalchemy import select
from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, Path, Query, Security, status
from typing_extensions import Annotated

from app.api import crud
from app.api.crud.authorizations import check_group_read, is_admin_access
Expand All @@ -18,7 +18,7 @@
from app.api.endpoints.notifications import send_notification
from app.api.endpoints.recipients import fetch_recipients_for_group
from app.api.external import post_request
from app.db import alerts, events, media
from app.db import alerts, media
from app.models import Access, AccessType, Alert, Device, Event
from app.schemas import AlertBase, AlertIn, AlertOut, DeviceOut, NotificationIn, RecipientOut

Expand Down Expand Up @@ -123,19 +123,22 @@ async def get_alert(

@router.get("/", response_model=List[AlertOut], summary="Get the list of all alerts")
async def fetch_alerts(
requester=Security(get_current_access, scopes=[AccessType.admin, AccessType.user]), session=Depends(get_db)
limit: Annotated[int, Query(description="maximum number of items", ge=1, le=1000)] = 50,
offset: Annotated[Optional[int], Query(description="number of items to skip", ge=0)] = None,
requester=Security(get_current_access, scopes=[AccessType.admin, AccessType.user]),
session=Depends(get_db),
):
"""
Retrieves the list of all alerts and their information
"""
if await is_admin_access(requester.id):
return await crud.fetch_all(alerts)
else:
retrieved_alerts = (
session.query(Alert).join(Device).join(Access).filter(Access.group_id == requester.group_id).all()
)
retrieved_alerts = [x.__dict__ for x in retrieved_alerts]
return retrieved_alerts
return await crud.fetch_all(
alerts,
query=None
if await is_admin_access(requester.id)
else session.query(Alert).join(Device).join(Access).filter(Access.group_id == requester.group_id),
limit=limit,
offset=offset,
)


@router.delete("/{alert_id}/", response_model=AlertOut, summary="Delete a specific alert")
Expand All @@ -148,25 +151,15 @@ async def delete_alert(alert_id: int = Path(..., gt=0), _=Security(get_current_a

@router.get("/ongoing", response_model=List[AlertOut], summary="Get the list of ongoing alerts")
async def fetch_ongoing_alerts(
requester=Security(get_current_access, scopes=[AccessType.admin, AccessType.user]), session=Depends(get_db)
limit: Annotated[int, Query(description="maximum number of items", ge=1, le=1000)] = 50,
offset: Annotated[Optional[int], Query(description="number of items to skip", ge=0)] = None,
requester=Security(get_current_access, scopes=[AccessType.admin, AccessType.user]),
session=Depends(get_db),
):
"""
Retrieves the list of ongoing alerts and their information
"""
if await is_admin_access(requester.id):
query = (
alerts.select().where(alerts.c.event_id.in_(select([events.c.id]).where(events.c.end_ts.is_(None))))
).order_by(alerts.c.id.desc())

return (await crud.base.database.fetch_all(query=query.limit(50)))[::-1]
else:
retrieved_alerts = (
session.query(Alert)
.join(Event)
.filter(Event.end_ts.is_(None))
.join(Device)
.join(Access)
.filter(Access.group_id == requester.group_id)
)
retrieved_alerts = [x.__dict__ for x in retrieved_alerts.all()]
return retrieved_alerts
query = session.query(Alert).join(Event).filter(Event.end_ts.is_(None))
if not await is_admin_access(requester.id):
query = query.join(Device).join(Access).filter(Access.group_id == requester.group_id)
return await crud.fetch_all(alerts, query=query, limit=limit, offset=offset)
33 changes: 21 additions & 12 deletions src/app/api/endpoints/devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@
# See LICENSE or go to <https://opensource.org/licenses/Apache-2.0> for full license details.

from datetime import datetime
from typing import List, cast
from typing import List, Optional, cast

from fastapi import APIRouter, Depends, HTTPException, Path, Security, status
from fastapi import APIRouter, Depends, HTTPException, Path, Query, Security, status
from typing_extensions import Annotated

from app.api import crud
from app.api.crud.authorizations import is_admin_access
Expand Down Expand Up @@ -80,18 +81,22 @@ async def get_my_device(me: DeviceOut = Security(get_current_device, scopes=["de

@router.get("/", response_model=List[DeviceOut], summary="Get the list of all devices")
async def fetch_devices(
requester=Security(get_current_access, scopes=[AccessType.admin, AccessType.user]), session=Depends(get_db)
limit: Annotated[int, Query(description="maximum number of items", ge=1, le=1000)] = 50,
offset: Annotated[Optional[int], Query(description="number of items to skip", ge=0)] = None,
requester=Security(get_current_access, scopes=[AccessType.admin, AccessType.user]),
session=Depends(get_db),
):
"""
Retrieves the list of all devices and their information
"""
if await is_admin_access(requester.id):
return await crud.fetch_all(devices)
else:
retrieved_devices = session.query(Device).join(Access).filter(Access.group_id == requester.group_id).all()
retrieved_devices = [x.__dict__ for x in retrieved_devices]

return retrieved_devices
return await crud.fetch_all(
devices,
query=None
if await is_admin_access(requester.id)
else session.query(Device).join(Access).filter(Access.group_id == requester.group_id),
limit=limit,
offset=offset,
)


@router.put("/{device_id}/", response_model=DeviceOut, summary="Update information about a specific device")
Expand All @@ -115,11 +120,15 @@ async def delete_device(device_id: int = Path(..., gt=0), _=Security(get_current
@router.get(
"/my-devices", response_model=List[DeviceOut], summary="Get the list of all devices belonging to the current user"
)
async def fetch_my_devices(me: UserRead = Security(get_current_user, scopes=[AccessType.admin, AccessType.user])):
async def fetch_my_devices(
limit: Annotated[int, Query(description="maximum number of items", ge=1, le=1000)] = 50,
offset: Annotated[Optional[int], Query(description="number of items to skip", ge=0)] = None,
me: UserRead = Security(get_current_user, scopes=[AccessType.admin, AccessType.user]),
):
"""
Retrieves the list of all devices and the information which are owned by the current user
"""
return await crud.fetch_all(devices, {"owner_id": me.id})
return await crud.fetch_all(devices, {"owner_id": me.id}, limit=limit, offset=offset)


@router.put("/heartbeat", response_model=DeviceOut, summary="Update the last ping of the current device")
Expand Down
87 changes: 46 additions & 41 deletions src/app/api/endpoints/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@
# This program is licensed under the Apache License 2.0.
# See LICENSE or go to <https://opensource.org/licenses/Apache-2.0> for full license details.

from typing import List, cast
from typing import List, Optional, cast

from fastapi import APIRouter, Depends, Path, Security, status
from fastapi import APIRouter, Depends, Path, Query, Security, status
from pydantic import PositiveInt
from sqlalchemy import and_
from typing_extensions import Annotated

from app.api import crud
from app.api.crud.authorizations import check_group_read, check_group_update, is_admin_access
Expand Down Expand Up @@ -44,40 +44,43 @@ async def get_event(

@router.get("/", response_model=List[EventOut], summary="Get the list of all events")
async def fetch_events(
requester=Security(get_current_access, scopes=[AccessType.admin, AccessType.user]), session=Depends(get_db)
limit: Annotated[int, Query(description="maximum number of items", ge=1, le=1000)] = 50,
offset: Annotated[Optional[int], Query(description="number of items to skip", ge=0)] = None,
requester=Security(get_current_access, scopes=[AccessType.admin, AccessType.user]),
session=Depends(get_db),
):
"""
Retrieves the list of all events and their information
"""
if await is_admin_access(requester.id):
return await crud.fetch_all(events)
else:
retrieved_events = (
session.query(Event).join(Alert).join(Device).join(Access).filter(Access.group_id == requester.group_id)
)
retrieved_events = [x.__dict__ for x in retrieved_events.all()]
return retrieved_events
return await crud.fetch_all(
events,
query=None
if await is_admin_access(requester.id)
else session.query(Event).join(Alert).join(Device).join(Access).filter(Access.group_id == requester.group_id),
limit=limit,
offset=offset,
)


@router.get("/past", response_model=List[EventOut], summary="Get the list of all past events")
async def fetch_past_events(
requester=Security(get_current_access, scopes=[AccessType.admin, AccessType.user]), session=Depends(get_db)
limit: Annotated[int, Query(description="maximum number of items", ge=1, le=1000)] = 50,
offset: Annotated[Optional[int], Query(description="number of items to skip", ge=0)] = None,
requester=Security(get_current_access, scopes=[AccessType.admin, AccessType.user]),
session=Depends(get_db),
):
"""
Retrieves the list of all events and their information
Retrieves the list of all events without end timestamp and their information
"""
if await is_admin_access(requester.id):
return await crud.fetch_all(events, exclusions={"end_ts": None})
else:
retrieved_events = (
session.query(Event)
.join(Alert)
.join(Device)
.join(Access)
.filter(and_(Access.group_id == requester.group_id, Event.end_ts.isnot(None)))
)
retrieved_events = [x.__dict__ for x in retrieved_events.all()]
return retrieved_events
return await crud.fetch_all(
events,
exclusions={"end_ts": None},
query=None
if await is_admin_access(requester.id)
else session.query(Event).join(Alert).join(Device).join(Access).filter(Access.group_id == requester.group_id),
limit=limit,
offset=offset,
)


@router.put("/{event_id}/", response_model=EventOut, summary="Update information about a specific event")
Expand Down Expand Up @@ -122,28 +125,30 @@ async def delete_event(
"/unacknowledged", response_model=List[EventOut], summary="Get the list of events that haven't been acknowledged"
)
async def fetch_unacknowledged_events(
requester=Security(get_current_access, scopes=[AccessType.admin, AccessType.user]), session=Depends(get_db)
limit: Annotated[int, Query(description="maximum number of items", ge=1, le=1000)] = 50,
offset: Annotated[Optional[int], Query(description="number of items to skip", ge=0)] = None,
requester=Security(get_current_access, scopes=[AccessType.admin, AccessType.user]),
session=Depends(get_db),
):
"""
Retrieves the list of non confirmed alerts and their information
Retrieves the list of unacknowledged alerts and their information
"""
if await is_admin_access(requester.id):
return await crud.fetch_all(events, {"is_acknowledged": False})
else:
retrieved_events = (
session.query(Event)
.join(Alert)
.join(Device)
.join(Access)
.filter(and_(Access.group_id == requester.group_id, Event.is_acknowledged.is_(False)))
)
retrieved_events = [x.__dict__ for x in retrieved_events.all()]
return retrieved_events
return await crud.fetch_all(
events,
{"is_acknowledged": False},
query=None
if await is_admin_access(requester.id)
else session.query(Event).join(Alert).join(Device).join(Access).filter(Access.group_id == requester.group_id),
limit=limit,
offset=offset,
)


@router.get("/{event_id}/alerts", response_model=List[AlertOut], summary="Get the list of alerts for event")
async def fetch_alerts_for_event(
event_id: PositiveInt,
limit: Annotated[int, Query(description="maximum number of items", ge=1, le=1000)] = 50,
offset: Annotated[Optional[int], Query(description="number of items to skip", ge=0)] = None,
requester=Security(get_current_access, scopes=[AccessType.admin, AccessType.user]),
session=Depends(get_db),
):
Expand All @@ -152,4 +157,4 @@ async def fetch_alerts_for_event(
"""
requested_group_id = await get_entity_group_id(events, event_id)
await check_group_read(requester.id, cast(int, requested_group_id))
return await crud.base.database.fetch_all(query=alerts.select().where(alerts.c.event_id == event_id))
return await crud.fetch_all(alerts, {"event_id": event_id}, limit=limit, offset=offset)
12 changes: 8 additions & 4 deletions src/app/api/endpoints/groups.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@
# This program is licensed under the Apache License 2.0.
# See LICENSE or go to <https://opensource.org/licenses/Apache-2.0> for full license details.

from typing import List
from typing import List, Optional

from fastapi import APIRouter, Path, Security, status
from fastapi import APIRouter, Path, Query, Security, status
from typing_extensions import Annotated

from app.api import crud
from app.api.deps import get_current_access
Expand Down Expand Up @@ -35,11 +36,14 @@ async def get_group(group_id: int = Path(..., gt=0)):


@router.get("/", response_model=List[GroupOut], summary="Get the list of all groups")
async def fetch_groups():
async def fetch_groups(
limit: Annotated[int, Query(description="maximum number of items", ge=1, le=1000)] = 50,
offset: Annotated[Optional[int], Query(description="number of items to skip", ge=0)] = None,
):
"""
Retrieves the list of all groups and their information
"""
return await crud.fetch_all(groups)
return await crud.fetch_all(groups, limit=limit, offset=offset)


@router.put("/{group_id}/", response_model=GroupOut, summary="Update information about a specific group")
Expand Down
Loading
Loading