Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Changing all status codes from raw numbers to using fastapi.status #1319

Open
wants to merge 16 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions backend/app/api/deps.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,13 @@ def get_current_user(session: SessionDep, token: TokenDep) -> User:
)
user = session.get(User, token_data.sub)
if not user:
raise HTTPException(status_code=404, detail="User not found")
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="User not found"
)
if not user.is_active:
raise HTTPException(status_code=400, detail="Inactive user")
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST, detail="Inactive user"
)
return user


Expand All @@ -52,6 +56,7 @@ def get_current_user(session: SessionDep, token: TokenDep) -> User:
def get_current_active_superuser(current_user: CurrentUser) -> User:
if not current_user.is_superuser:
raise HTTPException(
status_code=403, detail="The user doesn't have enough privileges"
status_code=status.HTTP_403_FORBIDDEN,
detail="The user doesn't have enough privileges",
)
return current_user
26 changes: 19 additions & 7 deletions backend/app/api/routes/items.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import uuid
from typing import Any

from fastapi import APIRouter, HTTPException
from fastapi import APIRouter, HTTPException, status
from sqlmodel import func, select

from app.api.deps import CurrentUser, SessionDep
Expand Down Expand Up @@ -48,9 +48,13 @@ def read_item(session: SessionDep, current_user: CurrentUser, id: uuid.UUID) ->
"""
item = session.get(Item, id)
if not item:
raise HTTPException(status_code=404, detail="Item not found")
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Item not found"
)
if not current_user.is_superuser and (item.owner_id != current_user.id):
raise HTTPException(status_code=400, detail="Not enough permissions")
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST, detail="Not enough permissions"
)
return item


Expand Down Expand Up @@ -81,9 +85,13 @@ def update_item(
"""
item = session.get(Item, id)
if not item:
raise HTTPException(status_code=404, detail="Item not found")
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Item not found"
)
if not current_user.is_superuser and (item.owner_id != current_user.id):
raise HTTPException(status_code=400, detail="Not enough permissions")
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST, detail="Not enough permissions"
)
update_dict = item_in.model_dump(exclude_unset=True)
item.sqlmodel_update(update_dict)
session.add(item)
Expand All @@ -101,9 +109,13 @@ def delete_item(
"""
item = session.get(Item, id)
if not item:
raise HTTPException(status_code=404, detail="Item not found")
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Item not found"
)
if not current_user.is_superuser and (item.owner_id != current_user.id):
raise HTTPException(status_code=400, detail="Not enough permissions")
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST, detail="Not enough permissions"
)
session.delete(item)
session.commit()
return Message(message="Item deleted successfully")
25 changes: 17 additions & 8 deletions backend/app/api/routes/login.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from datetime import timedelta
from typing import Annotated, Any

from fastapi import APIRouter, Depends, HTTPException
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.responses import HTMLResponse
from fastapi.security import OAuth2PasswordRequestForm

Expand Down Expand Up @@ -32,9 +32,14 @@ def login_access_token(
session=session, email=form_data.username, password=form_data.password
)
if not user:
raise HTTPException(status_code=400, detail="Incorrect email or password")
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Incorrect email or password",
)
elif not user.is_active:
raise HTTPException(status_code=400, detail="Inactive user")
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST, detail="Inactive user"
)
access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
return Token(
access_token=security.create_access_token(
Expand All @@ -60,7 +65,7 @@ def recover_password(email: str, session: SessionDep) -> Message:

if not user:
raise HTTPException(
status_code=404,
status_code=status.HTTP_404_NOT_FOUND,
detail="The user with this email does not exist in the system.",
)
password_reset_token = generate_password_reset_token(email=email)
Expand All @@ -82,15 +87,19 @@ def reset_password(session: SessionDep, body: NewPassword) -> Message:
"""
email = verify_password_reset_token(token=body.token)
if not email:
raise HTTPException(status_code=400, detail="Invalid token")
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid token"
)
user = crud.get_user_by_email(session=session, email=email)
if not user:
raise HTTPException(
status_code=404,
status_code=status.HTTP_404_NOT_FOUND,
detail="The user with this email does not exist in the system.",
)
elif not user.is_active:
raise HTTPException(status_code=400, detail="Inactive user")
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST, detail="Inactive user"
)
hashed_password = get_password_hash(password=body.new_password)
user.hashed_password = hashed_password
session.add(user)
Expand All @@ -111,7 +120,7 @@ def recover_password_html_content(email: str, session: SessionDep) -> Any:

if not user:
raise HTTPException(
status_code=404,
status_code=status.HTTP_404_NOT_FOUND,
detail="The user with this username does not exist in the system.",
)
password_reset_token = generate_password_reset_token(email=email)
Expand Down
33 changes: 21 additions & 12 deletions backend/app/api/routes/users.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import uuid
from typing import Any

from fastapi import APIRouter, Depends, HTTPException
from fastapi import APIRouter, Depends, HTTPException, status
from sqlmodel import col, delete, func, select

from app import crud
Expand Down Expand Up @@ -58,7 +58,7 @@ def create_user(*, session: SessionDep, user_in: UserCreate) -> Any:
user = crud.get_user_by_email(session=session, email=user_in.email)
if user:
raise HTTPException(
status_code=400,
status_code=status.HTTP_400_BAD_REQUEST,
detail="The user with this email already exists in the system.",
)

Expand Down Expand Up @@ -87,7 +87,8 @@ def update_user_me(
existing_user = crud.get_user_by_email(session=session, email=user_in.email)
if existing_user and existing_user.id != current_user.id:
raise HTTPException(
status_code=409, detail="User with this email already exists"
status_code=status.HTTP_409_CONFLICT,
detail="User with this email already exists",
)
user_data = user_in.model_dump(exclude_unset=True)
current_user.sqlmodel_update(user_data)
Expand All @@ -105,10 +106,13 @@ def update_password_me(
Update own password.
"""
if not verify_password(body.current_password, current_user.hashed_password):
raise HTTPException(status_code=400, detail="Incorrect password")
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST, detail="Incorrect password"
)
if body.current_password == body.new_password:
raise HTTPException(
status_code=400, detail="New password cannot be the same as the current one"
status_code=status.HTTP_400_BAD_REQUEST,
detail="New password cannot be the same as the current one",
)
hashed_password = get_password_hash(body.new_password)
current_user.hashed_password = hashed_password
Expand All @@ -132,7 +136,8 @@ def delete_user_me(session: SessionDep, current_user: CurrentUser) -> Any:
"""
if current_user.is_superuser:
raise HTTPException(
status_code=403, detail="Super users are not allowed to delete themselves"
status_code=status.HTTP_403_FORBIDDEN,
detail="Super users are not allowed to delete themselves",
)
statement = delete(Item).where(col(Item.owner_id) == current_user.id)
session.exec(statement) # type: ignore
Expand All @@ -149,7 +154,7 @@ def register_user(session: SessionDep, user_in: UserRegister) -> Any:
user = crud.get_user_by_email(session=session, email=user_in.email)
if user:
raise HTTPException(
status_code=400,
status_code=status.HTTP_400_BAD_REQUEST,
detail="The user with this email already exists in the system",
)
user_create = UserCreate.model_validate(user_in)
Expand All @@ -169,7 +174,7 @@ def read_user_by_id(
return user
if not current_user.is_superuser:
raise HTTPException(
status_code=403,
status_code=status.HTTP_403_FORBIDDEN,
detail="The user doesn't have enough privileges",
)
return user
Expand All @@ -193,14 +198,15 @@ def update_user(
db_user = session.get(User, user_id)
if not db_user:
raise HTTPException(
status_code=404,
status_code=status.HTTP_404_NOT_FOUND,
detail="The user with this id does not exist in the system",
)
if user_in.email:
existing_user = crud.get_user_by_email(session=session, email=user_in.email)
if existing_user and existing_user.id != user_id:
raise HTTPException(
status_code=409, detail="User with this email already exists"
status_code=status.HTTP_409_CONFLICT,
detail="User with this email already exists",
)

db_user = crud.update_user(session=session, db_user=db_user, user_in=user_in)
Expand All @@ -216,10 +222,13 @@ def delete_user(
"""
user = session.get(User, user_id)
if not user:
raise HTTPException(status_code=404, detail="User not found")
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="User not found"
)
if user == current_user:
raise HTTPException(
status_code=403, detail="Super users are not allowed to delete themselves"
status_code=status.HTTP_403_FORBIDDEN,
detail="Super users are not allowed to delete themselves",
)
statement = delete(Item).where(col(Item.owner_id) == user_id)
session.exec(statement) # type: ignore
Expand Down
4 changes: 2 additions & 2 deletions backend/app/api/routes/utils.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from fastapi import APIRouter, Depends
from fastapi import APIRouter, Depends, status
from pydantic.networks import EmailStr

from app.api.deps import get_current_active_superuser
Expand All @@ -11,7 +11,7 @@
@router.post(
"/test-email/",
dependencies=[Depends(get_current_active_superuser)],
status_code=201,
status_code=status.HTTP_201_CREATED,
)
def test_email(email_to: EmailStr) -> Message:
"""
Expand Down
23 changes: 12 additions & 11 deletions backend/app/tests/api/routes/test_items.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import uuid

from fastapi import status
from fastapi.testclient import TestClient
from sqlmodel import Session

Expand All @@ -16,7 +17,7 @@ def test_create_item(
headers=superuser_token_headers,
json=data,
)
assert response.status_code == 200
assert response.status_code == status.HTTP_200_OK
content = response.json()
assert content["title"] == data["title"]
assert content["description"] == data["description"]
Expand All @@ -32,7 +33,7 @@ def test_read_item(
f"{settings.API_V1_STR}/items/{item.id}",
headers=superuser_token_headers,
)
assert response.status_code == 200
assert response.status_code == status.HTTP_200_OK
content = response.json()
assert content["title"] == item.title
assert content["description"] == item.description
Expand All @@ -47,7 +48,7 @@ def test_read_item_not_found(
f"{settings.API_V1_STR}/items/{uuid.uuid4()}",
headers=superuser_token_headers,
)
assert response.status_code == 404
assert response.status_code == status.HTTP_404_NOT_FOUND
content = response.json()
assert content["detail"] == "Item not found"

Expand All @@ -60,7 +61,7 @@ def test_read_item_not_enough_permissions(
f"{settings.API_V1_STR}/items/{item.id}",
headers=normal_user_token_headers,
)
assert response.status_code == 400
assert response.status_code == status.HTTP_400_BAD_REQUEST
content = response.json()
assert content["detail"] == "Not enough permissions"

Expand All @@ -74,7 +75,7 @@ def test_read_items(
f"{settings.API_V1_STR}/items/",
headers=superuser_token_headers,
)
assert response.status_code == 200
assert response.status_code == status.HTTP_200_OK
content = response.json()
assert len(content["data"]) >= 2

Expand All @@ -89,7 +90,7 @@ def test_update_item(
headers=superuser_token_headers,
json=data,
)
assert response.status_code == 200
assert response.status_code == status.HTTP_200_OK
content = response.json()
assert content["title"] == data["title"]
assert content["description"] == data["description"]
Expand All @@ -106,7 +107,7 @@ def test_update_item_not_found(
headers=superuser_token_headers,
json=data,
)
assert response.status_code == 404
assert response.status_code == status.HTTP_404_NOT_FOUND
content = response.json()
assert content["detail"] == "Item not found"

Expand All @@ -121,7 +122,7 @@ def test_update_item_not_enough_permissions(
headers=normal_user_token_headers,
json=data,
)
assert response.status_code == 400
assert response.status_code == status.HTTP_400_BAD_REQUEST
content = response.json()
assert content["detail"] == "Not enough permissions"

Expand All @@ -134,7 +135,7 @@ def test_delete_item(
f"{settings.API_V1_STR}/items/{item.id}",
headers=superuser_token_headers,
)
assert response.status_code == 200
assert response.status_code == status.HTTP_200_OK
content = response.json()
assert content["message"] == "Item deleted successfully"

Expand All @@ -146,7 +147,7 @@ def test_delete_item_not_found(
f"{settings.API_V1_STR}/items/{uuid.uuid4()}",
headers=superuser_token_headers,
)
assert response.status_code == 404
assert response.status_code == status.HTTP_404_NOT_FOUND
content = response.json()
assert content["detail"] == "Item not found"

Expand All @@ -159,6 +160,6 @@ def test_delete_item_not_enough_permissions(
f"{settings.API_V1_STR}/items/{item.id}",
headers=normal_user_token_headers,
)
assert response.status_code == 400
assert response.status_code == status.HTTP_400_BAD_REQUEST
content = response.json()
assert content["detail"] == "Not enough permissions"
Loading