2
\$\begingroup\$
from fastapi import APIRouter, HTTPException, status from api.database import database, user_table from api.models.user import UserIn from api.security import authenticate_user, get_password_hash, get_user, create_access_token router = APIRouter() @router.post("/register", status_code=201) async def register(user: UserIn): if await get_user(user.email): raise HTTPException(status_code=400, detail="A user with that email already exists") hashed_password = get_password_hash(user.password) query = user_table.insert().values(email=user.email, password=hashed_password) await database.execute(query) return {"detail": "User created"} @router.post("/login") async def login(user: UserIn): user = await authenticate_user(user.email, user.password) access_token = create_access_token(user.email) return {"access_token": access_token, "token_type": "bearer"} 

User router

import datetime from typing import Annotated from fastapi import Depends, HTTPException, status from fastapi.security import OAuth2PasswordBearer from jose import ExpiredSignatureError, JWTError, jwt from passlib.context import CryptContext from api.database import database, user_table SECRET_KEY = "2b31f2a" ALGORITHM = "HS256" oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token") pwd_context = CryptContext(schemes=["bcrypt"]) credentials_exception = HTTPException( status_code=401, detail="Could not validate credentials", headers={"WWW-Authenticate": "Bearer"} ) def access_token_expire_minutes() -> int: return 30 def create_access_token(email: str): expire = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(minutes=30) jwt_data = {"sub": email, "exp": expire} encoded_jwt = jwt.encode(jwt_data, key=SECRET_KEY, algorithm=ALGORITHM) return encoded_jwt def get_password_hash(password: str) -> str: return pwd_context.hash(password) def verify_password(plain_password: str, hashed_password: str) -> bool: return pwd_context.verify(plain_password, hashed_password) async def get_user(email:str): query = user_table.select().where(user_table.c.email == email) result = await database.fetch_one(query) if result: return result async def authenticate_user(email: str, password: str): user = await get_user(email) if not user: raise credentials_exception if not verify_password(password, user.password): raise credentials_exception return user async def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]): try: payload = jwt.decode(token, key=SECRET_KEY, algorithms=[ALGORITHM]) email = payload.get("sub") if email is None: raise credentials_exception except ExpiredSignatureError as e: raise HTTPException( status_code=401, detail="Token has expired", headers={"WWW-Authenticate": "Bearer"} ) from e except JWTError as e: raise credentials_exception from e user = await get_user(email=email) if user is None: raise credentials_exception return user 

Security module

So the way it is configured right now makes it difficult to implement a logout, and the way it's implemented right now forces us to use get_user several times if we were to hold a column called session_active to implement logout, or a column called secret to hold the secret so we can logout, but then as I've said we need to call get_user several times if we were to do that, so I am wondering if there's any issue with the code.

\$\endgroup\$

    1 Answer 1

    1
    \$\begingroup\$

    secrets in the source ?!?

    SECRET_KEY = "2b31f2a" 

    This is a problem.

    Delete the line. And generate a new key, invalidating the old one. Secrets go into a vault, an HSM, or maybe into env vars or a text file.

    Source repos like git are good at many things, but managing secrets is not one of them.

    Maybe we wish to use the identifier PEPPER for this secret?

    hash vs cleartext

     query = user_table.insert().values(email=user.email, password=hashed_password) 

    I don't understand what's going on with that line. Let's leave aside that what we're assigning is clearly an insert command rather than a select query.

    Somewhere in the backend we're apparently passing around a cleartext password, yet we invite confusion between whether we persisted it in cleartext form or in hashed form? That's going to be a problem for every maintenance engineer who joins the project, and for each annual security review you conduct.

    If it's an argon2id hash, then call it a hash.

    Consider this subsequent line:

    async def login(user: UserIn): user = await authenticate_user(user.email, user.password) 

    At this point, I have no idea if user.password denotes some cleartext that, after a breach, could be used used to authenticate to unrelated sites, or if it actually denotes a hashed credential.

    non-SI units

    def access_token_expire_minutes() -> int: 

    Thank you for explaining the units very clearly; that's helpful.

    Consider adjusting by a factor of 60, so we can demote the "it's in seconds!" aspect to just a comment.

    hardcoded parameter

    This is very nice:

    import datetime as dt def create_access_token(email: str): expire = dt.datetime.now(dt.timezone.utc) + dt.timedelta(minutes=30) 

    Consider throwing that parameter into the signature as an optional kwarg:

    def create_access_token(email: str, expire_sec: int = 30 * 60): expire = dt.datetime.now(dt.timezone.utc) + dt.timedelta(seconds=expire_sec) 

    nit: Use $ black *.py to make this more legible, imposing "4 blanks per tab stop".

    Annotating email as str is helpful, thank you. Feel free to tell us about the -> return type, as well. View it with type(encoded_jwt), or use reveal_type() with mypy.

    implicit return

    async def get_user(email: str): ... if result: return result 

    This isn't great. And there's no signature annotation of a return type, which keeps mypy from being as helpful as it could be. Consider using $ mypy --strict ...

    If None comes back as the result, we just fall off the end of the function, implicitly returning None. Please tack on an explicit return None to such functions, to alert maintenance engineers of the "else" behavior.

    But here, the test is pointless. Unconditionally return result, and you're done. Let the signature annotation do the work of pointing out it's an optional row we're sending back.

    cached result

    Do we have "too many DB calls" that get a user? No, I don't think so, especially since we're asking for exact equality on an indexed column, hopefully one that has a UNIQUE constraint on it.

    But suppose the query takes more than a millisecond, maybe the RDBMS is across a WAN link or something silly like that, and async lookups aren't enough. Then you might consider decorating with @lru_cache(maxsize=1).

    If you want to enforce a timeout, write your own @property getter, or tack on a final def ... , epoch: int) parameter which will force a cache miss whenever the epoch changes. Make the epoch be current time rounded to two seconds, or whatever you're comfortable with.

    cipher choice

    pwd_context = CryptContext(schemes=["bcrypt"]) 

    Documentation written since 2015 will typically recommend a different scheme, a contest winner. The passlib guidance appears to be on the somewhat dated side.

    It would be helpful to future maintenance engineers to add a comment or docstring explaining the design rationale for not going with argon2id.

    explicit logout

    Consider adding an additional column to the user table, holding a timestamp. Use it to record the time of last login, and pass that timestamp around when making "is this request authenticated?" decisions.

    Then your /logout button simply sets it to a timestamp in January 1970, or maybe makes it NULL, to invalidate future replay attempts of that credential. End user will have to /login after that in order to put a fresh timestamp there.

    \$\endgroup\$