-
Notifications
You must be signed in to change notification settings - Fork 6
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #7 from hotosm/feat-google-login
Google Login Implementation
- Loading branch information
Showing
8 changed files
with
1,323 additions
and
764 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,122 @@ | ||
"""Core logic for Google OAuth.""" | ||
|
||
import base64 | ||
import json | ||
import logging | ||
|
||
from itsdangerous import BadSignature, SignatureExpired | ||
from itsdangerous.url_safe import URLSafeSerializer | ||
from requests_oauthlib import OAuth2Session | ||
from pydantic import BaseModel | ||
|
||
|
||
log = logging.getLogger(__name__) | ||
|
||
|
||
class Login(BaseModel): | ||
login_url: str | ||
|
||
|
||
class Token(BaseModel): | ||
access_token: str | ||
|
||
|
||
class Auth: | ||
"""Main class for Google login.""" | ||
|
||
def __init__( | ||
self, | ||
authorization_url, | ||
token_url, | ||
client_id, | ||
client_secret, | ||
secret_key, | ||
login_redirect_uri, | ||
scope, | ||
): | ||
"""Set object params and get OAuth2 session.""" | ||
self.authorization_url = authorization_url | ||
self.token_url = token_url | ||
self.client_secret = client_secret | ||
self.secret_key = secret_key | ||
self.oauth = OAuth2Session( | ||
client_id, | ||
redirect_uri=login_redirect_uri, | ||
scope=scope, | ||
) | ||
|
||
def login(self) -> dict: | ||
"""Generate login URL from Google session. | ||
Provides a login URL using the session created by Google | ||
client id and redirect uri supplied. | ||
Returns: | ||
dict: {'login_url': 'URL'} | ||
""" | ||
login_url, _ = self.oauth.authorization_url(self.authorization_url) | ||
return json.loads(Login(login_url=login_url).model_dump_json()) | ||
|
||
def callback(self, callback_url: str) -> str: | ||
"""Performs token exchange between Google and the callback website. | ||
Core will use Oauth secret key from configuration while deserializing token, | ||
provides access token that can be used for authorized endpoints. | ||
Args: | ||
callback_url(str): Absolute URL should be passed which | ||
is catched from login_redirect_uri. | ||
Returns: | ||
access_token(str): The decoded access token. | ||
""" | ||
self.oauth.fetch_token( | ||
self.token_url, | ||
authorization_response=callback_url, | ||
client_secret=self.client_secret, | ||
) | ||
|
||
user_api_url = "https://www.googleapis.com/oauth2/v1/userinfo" | ||
resp = self.oauth.get(user_api_url) | ||
if resp.status_code != 200: | ||
raise ValueError("Invalid response from Google") | ||
data = resp.json().get("user") | ||
serializer = URLSafeSerializer(self.secret_key) | ||
user_data = { | ||
"id": data.get("id"), | ||
"username": data.get("display_name"), | ||
"img_url": data.get("img").get("href") if data.get("img") else None, | ||
} | ||
token = serializer.dumps(user_data) | ||
access_token = base64.b64encode(bytes(token, "utf-8")).decode("utf-8") | ||
token = Token(access_token=access_token) | ||
return json.loads(token.model_dump_json()) | ||
|
||
def deserialize_access_token(self, access_token: str) -> dict: | ||
"""Returns the userdata as JSON from access token. | ||
Can be used for login required decorator or to check | ||
the access token provided. | ||
Args: | ||
access_token(str): The access token from Auth.callback() | ||
Returns: | ||
user_data(dict): A JSON of user data from Google. | ||
""" | ||
deserializer = URLSafeSerializer(self.secret_key) | ||
|
||
try: | ||
decoded_token = base64.b64decode(access_token) | ||
except Exception as e: | ||
log.error(e) | ||
log.error(f"Could not decode token: {access_token}") | ||
raise ValueError("Could not decode token") from e | ||
|
||
try: | ||
user_data = deserializer.loads(decoded_token) | ||
except (SignatureExpired, BadSignature) as e: | ||
log.error(e) | ||
raise ValueError("Auth token is invalid or expired") from e | ||
|
||
return user_data |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,53 @@ | ||
import os | ||
import json | ||
from loguru import logger as log | ||
from fastapi import Depends, Request | ||
from fastapi.responses import JSONResponse | ||
from sqlalchemy.orm import Session | ||
from app.db import database | ||
from app.users.user_routes import router | ||
from app.users.user_deps import init_google_auth, login_required | ||
from app.users.user_schemas import AuthUser | ||
from app.config import settings | ||
|
||
if settings.DEBUG: | ||
os.environ["OAUTHLIB_INSECURE_TRANSPORT"] = "1" | ||
|
||
|
||
@router.get("/google-login") | ||
async def login_url(google_auth=Depends(init_google_auth)): | ||
"""Get Login URL for Google Oauth Application. | ||
The application must be registered on google oauth. | ||
Open the download url returned to get access_token. | ||
Args: | ||
request: The GET request. | ||
google_auth: The Auth object. | ||
Returns: | ||
login_url (string): URL to authorize user in Google OAuth. | ||
Includes URL params: client_id, redirect_uri, permission scope. | ||
""" | ||
login_url = google_auth.login() | ||
log.debug(f"Login URL returned: {login_url}") | ||
return JSONResponse(content=login_url, status_code=200) | ||
|
||
|
||
@router.get("/callback/") | ||
async def callback(request: Request, google_auth=Depends(init_google_auth)): | ||
"""Performs token exchange between Google and DTM API""" | ||
|
||
callback_url = str(request.url) | ||
access_token = google_auth.callback(callback_url).get("access_token") | ||
return json.loads(access_token) | ||
|
||
|
||
@router.get("/my-info/") | ||
async def my_data( | ||
db: Session = Depends(database.get_db), | ||
user_data: AuthUser = Depends(login_required), | ||
): | ||
"""Read access token and get user details from Google""" | ||
|
||
return user_data |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.