From 794930369cc1d4c1aa21c42654ccae9cef67c04b Mon Sep 17 00:00:00 2001 From: Javad Zarezadeh Date: Tue, 27 Aug 2024 18:18:41 +0330 Subject: [PATCH] Changing all status codes from raw numbers to using fastapi.status --- backend/app/api/deps.py | 11 ++++-- backend/app/api/routes/items.py | 26 ++++++++++---- backend/app/api/routes/login.py | 25 ++++++++----- backend/app/api/routes/users.py | 33 ++++++++++------- backend/app/api/routes/utils.py | 4 +-- backend/app/tests/api/routes/test_items.py | 23 ++++++------ backend/app/tests/api/routes/test_login.py | 15 ++++---- backend/app/tests/api/routes/test_users.py | 41 +++++++++++----------- 8 files changed, 108 insertions(+), 70 deletions(-) diff --git a/backend/app/api/deps.py b/backend/app/api/deps.py index c2b83c841d..3dc0d04c79 100644 --- a/backend/app/api/deps.py +++ b/backend/app/api/deps.py @@ -40,9 +40,13 @@ def get_current_user(session: SessionDep, token: TokenDep) -> User: ) user = session.get(User, token_data.sub) if not user: - raise HTTPException(status_code=404, detail="User not found") + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, detail="User not found" + ) if not user.is_active: - raise HTTPException(status_code=400, detail="Inactive user") + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, detail="Inactive user" + ) return user @@ -52,6 +56,7 @@ def get_current_user(session: SessionDep, token: TokenDep) -> User: def get_current_active_superuser(current_user: CurrentUser) -> User: if not current_user.is_superuser: raise HTTPException( - status_code=403, detail="The user doesn't have enough privileges" + status_code=status.HTTP_403_FORBIDDEN, + detail="The user doesn't have enough privileges", ) return current_user diff --git a/backend/app/api/routes/items.py b/backend/app/api/routes/items.py index 67196c2366..09032d3827 100644 --- a/backend/app/api/routes/items.py +++ b/backend/app/api/routes/items.py @@ -1,7 +1,7 @@ import uuid from typing import Any -from fastapi import APIRouter, HTTPException +from fastapi import APIRouter, HTTPException, status from sqlmodel import func, select from app.api.deps import CurrentUser, SessionDep @@ -48,9 +48,13 @@ def read_item(session: SessionDep, current_user: CurrentUser, id: uuid.UUID) -> """ item = session.get(Item, id) if not item: - raise HTTPException(status_code=404, detail="Item not found") + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, detail="Item not found" + ) if not current_user.is_superuser and (item.owner_id != current_user.id): - raise HTTPException(status_code=400, detail="Not enough permissions") + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, detail="Not enough permissions" + ) return item @@ -81,9 +85,13 @@ def update_item( """ item = session.get(Item, id) if not item: - raise HTTPException(status_code=404, detail="Item not found") + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, detail="Item not found" + ) if not current_user.is_superuser and (item.owner_id != current_user.id): - raise HTTPException(status_code=400, detail="Not enough permissions") + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, detail="Not enough permissions" + ) update_dict = item_in.model_dump(exclude_unset=True) item.sqlmodel_update(update_dict) session.add(item) @@ -101,9 +109,13 @@ def delete_item( """ item = session.get(Item, id) if not item: - raise HTTPException(status_code=404, detail="Item not found") + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, detail="Item not found" + ) if not current_user.is_superuser and (item.owner_id != current_user.id): - raise HTTPException(status_code=400, detail="Not enough permissions") + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, detail="Not enough permissions" + ) session.delete(item) session.commit() return Message(message="Item deleted successfully") diff --git a/backend/app/api/routes/login.py b/backend/app/api/routes/login.py index fe7e94d5c1..844cc27b5a 100644 --- a/backend/app/api/routes/login.py +++ b/backend/app/api/routes/login.py @@ -1,7 +1,7 @@ from datetime import timedelta from typing import Annotated, Any -from fastapi import APIRouter, Depends, HTTPException +from fastapi import APIRouter, Depends, HTTPException, status from fastapi.responses import HTMLResponse from fastapi.security import OAuth2PasswordRequestForm @@ -32,9 +32,14 @@ def login_access_token( session=session, email=form_data.username, password=form_data.password ) if not user: - raise HTTPException(status_code=400, detail="Incorrect email or password") + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="Incorrect email or password", + ) elif not user.is_active: - raise HTTPException(status_code=400, detail="Inactive user") + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, detail="Inactive user" + ) access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES) return Token( access_token=security.create_access_token( @@ -60,7 +65,7 @@ def recover_password(email: str, session: SessionDep) -> Message: if not user: raise HTTPException( - status_code=404, + status_code=status.HTTP_404_NOT_FOUND, detail="The user with this email does not exist in the system.", ) password_reset_token = generate_password_reset_token(email=email) @@ -82,15 +87,19 @@ def reset_password(session: SessionDep, body: NewPassword) -> Message: """ email = verify_password_reset_token(token=body.token) if not email: - raise HTTPException(status_code=400, detail="Invalid token") + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid token" + ) user = crud.get_user_by_email(session=session, email=email) if not user: raise HTTPException( - status_code=404, + status_code=status.HTTP_404_NOT_FOUND, detail="The user with this email does not exist in the system.", ) elif not user.is_active: - raise HTTPException(status_code=400, detail="Inactive user") + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, detail="Inactive user" + ) hashed_password = get_password_hash(password=body.new_password) user.hashed_password = hashed_password session.add(user) @@ -111,7 +120,7 @@ def recover_password_html_content(email: str, session: SessionDep) -> Any: if not user: raise HTTPException( - status_code=404, + status_code=status.HTTP_404_NOT_FOUND, detail="The user with this username does not exist in the system.", ) password_reset_token = generate_password_reset_token(email=email) diff --git a/backend/app/api/routes/users.py b/backend/app/api/routes/users.py index c636b094ee..928e1901ff 100644 --- a/backend/app/api/routes/users.py +++ b/backend/app/api/routes/users.py @@ -1,7 +1,7 @@ import uuid from typing import Any -from fastapi import APIRouter, Depends, HTTPException +from fastapi import APIRouter, Depends, HTTPException, status from sqlmodel import col, delete, func, select from app import crud @@ -58,7 +58,7 @@ def create_user(*, session: SessionDep, user_in: UserCreate) -> Any: user = crud.get_user_by_email(session=session, email=user_in.email) if user: raise HTTPException( - status_code=400, + status_code=status.HTTP_400_BAD_REQUEST, detail="The user with this email already exists in the system.", ) @@ -87,7 +87,8 @@ def update_user_me( existing_user = crud.get_user_by_email(session=session, email=user_in.email) if existing_user and existing_user.id != current_user.id: raise HTTPException( - status_code=409, detail="User with this email already exists" + status_code=status.HTTP_409_CONFLICT, + detail="User with this email already exists", ) user_data = user_in.model_dump(exclude_unset=True) current_user.sqlmodel_update(user_data) @@ -105,10 +106,13 @@ def update_password_me( Update own password. """ if not verify_password(body.current_password, current_user.hashed_password): - raise HTTPException(status_code=400, detail="Incorrect password") + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, detail="Incorrect password" + ) if body.current_password == body.new_password: raise HTTPException( - status_code=400, detail="New password cannot be the same as the current one" + status_code=status.HTTP_400_BAD_REQUEST, + detail="New password cannot be the same as the current one", ) hashed_password = get_password_hash(body.new_password) current_user.hashed_password = hashed_password @@ -132,7 +136,8 @@ def delete_user_me(session: SessionDep, current_user: CurrentUser) -> Any: """ if current_user.is_superuser: raise HTTPException( - status_code=403, detail="Super users are not allowed to delete themselves" + status_code=status.HTTP_403_FORBIDDEN, + detail="Super users are not allowed to delete themselves", ) statement = delete(Item).where(col(Item.owner_id) == current_user.id) session.exec(statement) # type: ignore @@ -149,7 +154,7 @@ def register_user(session: SessionDep, user_in: UserRegister) -> Any: user = crud.get_user_by_email(session=session, email=user_in.email) if user: raise HTTPException( - status_code=400, + status_code=status.HTTP_400_BAD_REQUEST, detail="The user with this email already exists in the system", ) user_create = UserCreate.model_validate(user_in) @@ -169,7 +174,7 @@ def read_user_by_id( return user if not current_user.is_superuser: raise HTTPException( - status_code=403, + status_code=status.HTTP_403_FORBIDDEN, detail="The user doesn't have enough privileges", ) return user @@ -193,14 +198,15 @@ def update_user( db_user = session.get(User, user_id) if not db_user: raise HTTPException( - status_code=404, + status_code=status.HTTP_404_NOT_FOUND, detail="The user with this id does not exist in the system", ) if user_in.email: existing_user = crud.get_user_by_email(session=session, email=user_in.email) if existing_user and existing_user.id != user_id: raise HTTPException( - status_code=409, detail="User with this email already exists" + status_code=status.HTTP_409_CONFLICT, + detail="User with this email already exists", ) db_user = crud.update_user(session=session, db_user=db_user, user_in=user_in) @@ -216,10 +222,13 @@ def delete_user( """ user = session.get(User, user_id) if not user: - raise HTTPException(status_code=404, detail="User not found") + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, detail="User not found" + ) if user == current_user: raise HTTPException( - status_code=403, detail="Super users are not allowed to delete themselves" + status_code=status.HTTP_403_FORBIDDEN, + detail="Super users are not allowed to delete themselves", ) statement = delete(Item).where(col(Item.owner_id) == user_id) session.exec(statement) # type: ignore diff --git a/backend/app/api/routes/utils.py b/backend/app/api/routes/utils.py index 82f6d2b821..8a38e7e78c 100644 --- a/backend/app/api/routes/utils.py +++ b/backend/app/api/routes/utils.py @@ -1,4 +1,4 @@ -from fastapi import APIRouter, Depends +from fastapi import APIRouter, Depends, status from pydantic.networks import EmailStr from app.api.deps import get_current_active_superuser @@ -11,7 +11,7 @@ @router.post( "/test-email/", dependencies=[Depends(get_current_active_superuser)], - status_code=201, + status_code=status.HTTP_201_CREATED, ) def test_email(email_to: EmailStr) -> Message: """ diff --git a/backend/app/tests/api/routes/test_items.py b/backend/app/tests/api/routes/test_items.py index c215238a69..80ec5aecf7 100644 --- a/backend/app/tests/api/routes/test_items.py +++ b/backend/app/tests/api/routes/test_items.py @@ -1,5 +1,6 @@ import uuid +from fastapi import status from fastapi.testclient import TestClient from sqlmodel import Session @@ -16,7 +17,7 @@ def test_create_item( headers=superuser_token_headers, json=data, ) - assert response.status_code == 200 + assert response.status_code == status.HTTP_200_OK content = response.json() assert content["title"] == data["title"] assert content["description"] == data["description"] @@ -32,7 +33,7 @@ def test_read_item( f"{settings.API_V1_STR}/items/{item.id}", headers=superuser_token_headers, ) - assert response.status_code == 200 + assert response.status_code == status.HTTP_200_OK content = response.json() assert content["title"] == item.title assert content["description"] == item.description @@ -47,7 +48,7 @@ def test_read_item_not_found( f"{settings.API_V1_STR}/items/{uuid.uuid4()}", headers=superuser_token_headers, ) - assert response.status_code == 404 + assert response.status_code == status.HTTP_404_NOT_FOUND content = response.json() assert content["detail"] == "Item not found" @@ -60,7 +61,7 @@ def test_read_item_not_enough_permissions( f"{settings.API_V1_STR}/items/{item.id}", headers=normal_user_token_headers, ) - assert response.status_code == 400 + assert response.status_code == status.HTTP_400_BAD_REQUEST content = response.json() assert content["detail"] == "Not enough permissions" @@ -74,7 +75,7 @@ def test_read_items( f"{settings.API_V1_STR}/items/", headers=superuser_token_headers, ) - assert response.status_code == 200 + assert response.status_code == status.HTTP_200_OK content = response.json() assert len(content["data"]) >= 2 @@ -89,7 +90,7 @@ def test_update_item( headers=superuser_token_headers, json=data, ) - assert response.status_code == 200 + assert response.status_code == status.HTTP_200_OK content = response.json() assert content["title"] == data["title"] assert content["description"] == data["description"] @@ -106,7 +107,7 @@ def test_update_item_not_found( headers=superuser_token_headers, json=data, ) - assert response.status_code == 404 + assert response.status_code == status.HTTP_404_NOT_FOUND content = response.json() assert content["detail"] == "Item not found" @@ -121,7 +122,7 @@ def test_update_item_not_enough_permissions( headers=normal_user_token_headers, json=data, ) - assert response.status_code == 400 + assert response.status_code == status.HTTP_400_BAD_REQUEST content = response.json() assert content["detail"] == "Not enough permissions" @@ -134,7 +135,7 @@ def test_delete_item( f"{settings.API_V1_STR}/items/{item.id}", headers=superuser_token_headers, ) - assert response.status_code == 200 + assert response.status_code == status.HTTP_200_OK content = response.json() assert content["message"] == "Item deleted successfully" @@ -146,7 +147,7 @@ def test_delete_item_not_found( f"{settings.API_V1_STR}/items/{uuid.uuid4()}", headers=superuser_token_headers, ) - assert response.status_code == 404 + assert response.status_code == status.HTTP_404_NOT_FOUND content = response.json() assert content["detail"] == "Item not found" @@ -159,6 +160,6 @@ def test_delete_item_not_enough_permissions( f"{settings.API_V1_STR}/items/{item.id}", headers=normal_user_token_headers, ) - assert response.status_code == 400 + assert response.status_code == status.HTTP_400_BAD_REQUEST content = response.json() assert content["detail"] == "Not enough permissions" diff --git a/backend/app/tests/api/routes/test_login.py b/backend/app/tests/api/routes/test_login.py index 34fe8ee560..3657a89667 100644 --- a/backend/app/tests/api/routes/test_login.py +++ b/backend/app/tests/api/routes/test_login.py @@ -1,5 +1,6 @@ from unittest.mock import patch +from fastapi import status from fastapi.testclient import TestClient from sqlmodel import Session, select @@ -16,7 +17,7 @@ def test_get_access_token(client: TestClient) -> None: } r = client.post(f"{settings.API_V1_STR}/login/access-token", data=login_data) tokens = r.json() - assert r.status_code == 200 + assert r.status_code == status.HTTP_200_OK assert "access_token" in tokens assert tokens["access_token"] @@ -27,7 +28,7 @@ def test_get_access_token_incorrect_password(client: TestClient) -> None: "password": "incorrect", } r = client.post(f"{settings.API_V1_STR}/login/access-token", data=login_data) - assert r.status_code == 400 + assert r.status_code == status.HTTP_400_BAD_REQUEST def test_use_access_token( @@ -38,7 +39,7 @@ def test_use_access_token( headers=superuser_token_headers, ) result = r.json() - assert r.status_code == 200 + assert r.status_code == status.HTTP_200_OK assert "email" in result @@ -54,7 +55,7 @@ def test_recovery_password( f"{settings.API_V1_STR}/password-recovery/{email}", headers=normal_user_token_headers, ) - assert r.status_code == 200 + assert r.status_code == status.HTTP_200_OK assert r.json() == {"message": "Password recovery email sent"} @@ -66,7 +67,7 @@ def test_recovery_password_user_not_exits( f"{settings.API_V1_STR}/password-recovery/{email}", headers=normal_user_token_headers, ) - assert r.status_code == 404 + assert r.status_code == status.HTTP_404_NOT_FOUND def test_reset_password( @@ -79,7 +80,7 @@ def test_reset_password( headers=superuser_token_headers, json=data, ) - assert r.status_code == 200 + assert r.status_code == status.HTTP_200_OK assert r.json() == {"message": "Password updated successfully"} user_query = select(User).where(User.email == settings.FIRST_SUPERUSER) @@ -100,5 +101,5 @@ def test_reset_password_invalid_token( response = r.json() assert "detail" in response - assert r.status_code == 400 + assert r.status_code == status.HTTP_400_BAD_REQUEST assert response["detail"] == "Invalid token" diff --git a/backend/app/tests/api/routes/test_users.py b/backend/app/tests/api/routes/test_users.py index ba9be65426..1126ef0211 100644 --- a/backend/app/tests/api/routes/test_users.py +++ b/backend/app/tests/api/routes/test_users.py @@ -1,6 +1,7 @@ import uuid from unittest.mock import patch +from fastapi import status from fastapi.testclient import TestClient from sqlmodel import Session, select @@ -109,7 +110,7 @@ def test_get_existing_user_permissions_error( f"{settings.API_V1_STR}/users/{uuid.uuid4()}", headers=normal_user_token_headers, ) - assert r.status_code == 403 + assert r.status_code == status.HTTP_403_FORBIDDEN assert r.json() == {"detail": "The user doesn't have enough privileges"} @@ -128,7 +129,7 @@ def test_create_user_existing_username( json=data, ) created_user = r.json() - assert r.status_code == 400 + assert r.status_code == status.HTTP_400_BAD_REQUEST assert "_id" not in created_user @@ -143,7 +144,7 @@ def test_create_user_by_normal_user( headers=normal_user_token_headers, json=data, ) - assert r.status_code == 403 + assert r.status_code == status.HTTP_403_FORBIDDEN def test_retrieve_users( @@ -179,7 +180,7 @@ def test_update_user_me( headers=normal_user_token_headers, json=data, ) - assert r.status_code == 200 + assert r.status_code == status.HTTP_200_OK updated_user = r.json() assert updated_user["email"] == email assert updated_user["full_name"] == full_name @@ -204,7 +205,7 @@ def test_update_password_me( headers=superuser_token_headers, json=data, ) - assert r.status_code == 200 + assert r.status_code == status.HTTP_200_OK updated_user = r.json() assert updated_user["message"] == "Password updated successfully" @@ -226,7 +227,7 @@ def test_update_password_me( ) db.refresh(user_db) - assert r.status_code == 200 + assert r.status_code == status.HTTP_200_OK assert verify_password(settings.FIRST_SUPERUSER_PASSWORD, user_db.hashed_password) @@ -240,7 +241,7 @@ def test_update_password_me_incorrect_password( headers=superuser_token_headers, json=data, ) - assert r.status_code == 400 + assert r.status_code == status.HTTP_400_BAD_REQUEST updated_user = r.json() assert updated_user["detail"] == "Incorrect password" @@ -259,7 +260,7 @@ def test_update_user_me_email_exists( headers=normal_user_token_headers, json=data, ) - assert r.status_code == 409 + assert r.status_code == status.HTTP_409_CONFLICT assert r.json()["detail"] == "User with this email already exists" @@ -275,7 +276,7 @@ def test_update_password_me_same_password_error( headers=superuser_token_headers, json=data, ) - assert r.status_code == 400 + assert r.status_code == status.HTTP_400_BAD_REQUEST updated_user = r.json() assert ( updated_user["detail"] == "New password cannot be the same as the current one" @@ -291,7 +292,7 @@ def test_register_user(client: TestClient, db: Session) -> None: f"{settings.API_V1_STR}/users/signup", json=data, ) - assert r.status_code == 200 + assert r.status_code == status.HTTP_200_OK created_user = r.json() assert created_user["email"] == username assert created_user["full_name"] == full_name @@ -316,7 +317,7 @@ def test_register_user_already_exists_error(client: TestClient) -> None: f"{settings.API_V1_STR}/users/signup", json=data, ) - assert r.status_code == 400 + assert r.status_code == status.HTTP_400_BAD_REQUEST assert r.json()["detail"] == "The user with this email already exists in the system" @@ -334,7 +335,7 @@ def test_update_user( headers=superuser_token_headers, json=data, ) - assert r.status_code == 200 + assert r.status_code == status.HTTP_200_OK updated_user = r.json() assert updated_user["full_name"] == "Updated_full_name" @@ -355,7 +356,7 @@ def test_update_user_not_exists( headers=superuser_token_headers, json=data, ) - assert r.status_code == 404 + assert r.status_code == status.HTTP_404_NOT_FOUND assert r.json()["detail"] == "The user with this id does not exist in the system" @@ -378,7 +379,7 @@ def test_update_user_email_exists( headers=superuser_token_headers, json=data, ) - assert r.status_code == 409 + assert r.status_code == status.HTTP_409_CONFLICT assert r.json()["detail"] == "User with this email already exists" @@ -402,7 +403,7 @@ def test_delete_user_me(client: TestClient, db: Session) -> None: f"{settings.API_V1_STR}/users/me", headers=headers, ) - assert r.status_code == 200 + assert r.status_code == status.HTTP_200_OK deleted_user = r.json() assert deleted_user["message"] == "User deleted successfully" result = db.exec(select(User).where(User.id == user_id)).first() @@ -420,7 +421,7 @@ def test_delete_user_me_as_superuser( f"{settings.API_V1_STR}/users/me", headers=superuser_token_headers, ) - assert r.status_code == 403 + assert r.status_code == status.HTTP_403_FORBIDDEN response = r.json() assert response["detail"] == "Super users are not allowed to delete themselves" @@ -437,7 +438,7 @@ def test_delete_user_super_user( f"{settings.API_V1_STR}/users/{user_id}", headers=superuser_token_headers, ) - assert r.status_code == 200 + assert r.status_code == status.HTTP_200_OK deleted_user = r.json() assert deleted_user["message"] == "User deleted successfully" result = db.exec(select(User).where(User.id == user_id)).first() @@ -451,7 +452,7 @@ def test_delete_user_not_found( f"{settings.API_V1_STR}/users/{uuid.uuid4()}", headers=superuser_token_headers, ) - assert r.status_code == 404 + assert r.status_code == status.HTTP_404_NOT_FOUND assert r.json()["detail"] == "User not found" @@ -466,7 +467,7 @@ def test_delete_user_current_super_user_error( f"{settings.API_V1_STR}/users/{user_id}", headers=superuser_token_headers, ) - assert r.status_code == 403 + assert r.status_code == status.HTTP_403_FORBIDDEN assert r.json()["detail"] == "Super users are not allowed to delete themselves" @@ -482,5 +483,5 @@ def test_delete_user_without_privileges( f"{settings.API_V1_STR}/users/{user.id}", headers=normal_user_token_headers, ) - assert r.status_code == 403 + assert r.status_code == status.HTTP_403_FORBIDDEN assert r.json()["detail"] == "The user doesn't have enough privileges"