"""User router: account registration and login endpoints."""

from fastapi import APIRouter, HTTPException, status

from api.database import database, user_table
from api.models.user import UserIn
from api.security import (
    authenticate_user,
    create_access_token,
    get_password_hash,
    get_user,
)

router = APIRouter()


@router.post("/register", status_code=status.HTTP_201_CREATED)
async def register(user: UserIn):
    """Create a new user account.

    Looks the e-mail up with ``get_user``; if a row already exists the
    request is rejected with HTTP 400. Otherwise the password is hashed
    and a new row is inserted into ``user_table``.

    Returns:
        ``{"detail": "User created"}`` (sent with HTTP 201).

    Raises:
        HTTPException: 400 when the e-mail is already registered.
    """
    existing = await get_user(user.email)
    if existing:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="A user with that email already exists",
        )

    # Only the hash is stored; the plain-text password never reaches the DB.
    password_hash = get_password_hash(user.password)
    insert_stmt = user_table.insert().values(
        email=user.email,
        password=password_hash,
    )
    await database.execute(insert_stmt)
    return {"detail": "User created"}


@router.post("/login")
async def login(user: UserIn):
    """Exchange an e-mail/password pair for a bearer access token.

    The credentials are passed to ``authenticate_user``; the e-mail of the
    record it returns is then handed to ``create_access_token``.

    Returns:
        A dict with the keys ``access_token`` and ``token_type``.
    """
    account = await authenticate_user(user.email, user.password)
    token = create_access_token(account.email)
    return {"access_token": token, "token_type": "bearer"}
User router
"""Security helpers: password hashing, JWT creation, and current-user lookup."""

import datetime
from typing import Annotated

from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import ExpiredSignatureError, JWTError, jwt
from passlib.context import CryptContext

from api.database import database, user_table

# NOTE(review): a short signing key committed to source lets anyone who can
# read the repository forge tokens. Load it from configuration/environment
# before deploying.
SECRET_KEY = "2b31f2a"
ALGORITHM = "HS256"

# NOTE(review): the user router shown with this module registers "/login",
# not "/token" -- confirm which URL the generated API docs should point at.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
pwd_context = CryptContext(schemes=["bcrypt"])


def _credentials_error() -> HTTPException:
    """Build a fresh 401 "could not validate credentials" error.

    ``raise ... from ...`` stores ``__cause__`` and traceback data on the
    exception object itself, so one shared instance would carry state from
    one request into the next. Each failure therefore gets its own object.
    """
    return HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )


# Kept so that existing ``from api.security import credentials_exception``
# imports continue to work; code in this module raises fresh instances.
credentials_exception = _credentials_error()


def access_token_expire_minutes() -> int:
    """Return the lifetime of an access token, in minutes."""
    return 30


def create_access_token(email: str) -> str:
    """Create a signed JWT whose ``sub`` claim is *email*.

    The ``exp`` claim is set to the current UTC time plus
    ``access_token_expire_minutes()``, so the lifetime is defined in exactly
    one place instead of being duplicated as a literal here.
    """
    lifetime = datetime.timedelta(minutes=access_token_expire_minutes())
    expire = datetime.datetime.now(datetime.timezone.utc) + lifetime
    jwt_data = {"sub": email, "exp": expire}
    return jwt.encode(jwt_data, key=SECRET_KEY, algorithm=ALGORITHM)


def get_password_hash(password: str) -> str:
    """Hash *password* with the module's ``CryptContext``."""
    return pwd_context.hash(password)


def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Return whether *plain_password* matches *hashed_password*."""
    return pwd_context.verify(plain_password, hashed_password)


async def get_user(email: str):
    """Fetch the ``user_table`` row whose e-mail equals *email*.

    Returns:
        The row returned by ``database.fetch_one``, or ``None`` when no
        (truthy) row was found.
    """
    query = user_table.select().where(user_table.c.email == email)
    result = await database.fetch_one(query)
    return result if result else None


async def authenticate_user(email: str, password: str):
    """Return the user row for *email* if *password* is correct.

    Raises:
        HTTPException: 401 when the user does not exist or the password
            does not verify. Both cases produce the same error so the
            response does not reveal which one occurred.
    """
    user = await get_user(email)
    if not user:
        raise _credentials_error()
    if not verify_password(password, user.password):
        raise _credentials_error()
    return user


async def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]):
    """Resolve the bearer *token* to the user row it was issued for.

    Raises:
        HTTPException: 401 with detail ``"Token has expired"`` when the
            token's expiry has passed; 401 with the generic credentials
            detail when the token cannot be decoded, has no ``sub`` claim,
            or names a user that does not exist.
    """
    try:
        payload = jwt.decode(token, key=SECRET_KEY, algorithms=[ALGORITHM])
    except ExpiredSignatureError as e:
        # Must be handled before JWTError so that expiry gets its own message.
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Token has expired",
            headers={"WWW-Authenticate": "Bearer"},
        ) from e
    except JWTError as e:
        raise _credentials_error() from e

    email = payload.get("sub")
    if email is None:
        raise _credentials_error()

    user = await get_user(email=email)
    if user is None:
        raise _credentials_error()
    return user
Security module
The way authentication is configured right now makes it difficult to implement a logout. If we were to add a column called `session_active` to implement logout, or a column called `secret` to hold the secret so that we can log out, the current implementation would force us to call `get_user` several times. Given that, I am wondering whether there is any issue with the code.