1
\$\begingroup\$

I like the idea of being able to authenticate with your own private key using the Nostr protocol. The protocol is based on events, so you prove your identity by signing an event.

It's similar to WalletConnect, as used in web3 and MetaMask, but it is not directly related to cryptocurrency; it is used in the Nostr social network.

I've created this code based on the FastAPI authentication tutorial.

I would like to know how close my code is to the OAuth 2.0 specification. I'm using the user's public key as the username and the signature of the signed event as the password.

I'm also passing the JSON-formatted event data to make it easier to validate without storing the historical data of events in the backend.

The code is below; more details are in this code snippet, which includes a test vector.

# Based on https://fastapi.tiangolo.com/tutorial/security/oauth2-jwt/
# and https://github.com/monty888/monstr/blob/master/src/monstr/event/event.py
"""FastAPI login flow where a signed Nostr event replaces the password.

The client sends its public key as ``username``, the event signature as
``password`` and the JSON-encoded event as ``json_data``.  The server
recomputes the event id, verifies the signature against the event's public
key and, on success, issues a short-lived JWT whose subject is that key.
"""
import hashlib
import json
import logging
from datetime import datetime, timedelta, timezone
from typing import Annotated

import jwt
import secp256k1
from fastapi import Depends, FastAPI, Form, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jwt.exceptions import InvalidTokenError
from passlib.context import CryptContext
from pydantic import BaseModel

logger = logging.getLogger(__name__)

# to get a string like this run:
# openssl rand -hex 32
# NOTE(review): a signing key committed to source control must be treated as
# compromised; load it from the environment or a secret store in deployment.
SECRET_KEY = "497664fb69cf00d6fa47eacb8401d421cc22a01beefe0041e9a5882a1252a538"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

# Maximum distance (in seconds, past or future) between an event's
# ``created_at`` and the server clock for the event to be accepted as a login
# credential.  Without a bound, any event a user ever signed stays a valid
# "password" forever.
MAX_EVENT_AGE_SECONDS = 300

## List of banned accounts
banned_list = []


class CustomOAuth2PasswordRequestForm(OAuth2PasswordRequestForm):
    """Custom OAuth2 password request form with additional JSON data.

    ``username`` carries the public key, ``password`` the event signature and
    ``json_data`` the JSON-encoded event that was signed.
    """

    def __init__(
        self,
        username: str = Form(...),
        password: str = Form(...),
        json_data: str = Form(...),
    ):
        super().__init__(username=username, password=password)
        self.json_data = json_data


class Token(BaseModel):
    """Token model for JWT token response."""

    access_token: str
    token_type: str


class TokenData(BaseModel):
    """Token data model for decoded JWT token."""

    username: str | None = None


class User(BaseModel):
    """User model with public key."""

    user_pubkey: str


class UserInDB(User):
    """User model with hashed password."""

    hashed_password: str


class PublicKeyRequest(BaseModel):
    """Request model for public key."""

    pubkey: str


class EventToSign(BaseModel):
    """Event model to be signed."""

    id: str
    pubkey: str
    created_at: int
    kind: int
    tags: list
    content: str
    sig: str


pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
app = FastAPI()


class Event:
    """Event class to handle Nostr events.

    Holds the event fields and provides id computation, signing and
    signature verification.  Hex strings are used for the id, the signature
    and the public key.
    """

    def __init__(
        self,
        id=None,
        sig=None,
        kind=None,
        content=None,
        tags=None,
        pub_key=None,
        created_at=None,
    ):
        self._id = id
        self._sig = sig
        self._kind = kind
        self._created_at = created_at
        self._content = str(content)
        self._pub_key = pub_key
        self._tags = tags

    def serialize(self):
        """Return the compact JSON array that is hashed to form the event id.

        Raises:
            ValueError: if no public key has been set on the event.
        """
        if self._pub_key is None:
            raise ValueError("Event::serialize can't be done unless pub key is set")
        return json.dumps(
            [0, self._pub_key, self._created_at, self._kind, self._tags, self._content],
            separators=(",", ":"),
            ensure_ascii=False,
        )

    def _get_id(self):
        """Compute, store and return the SHA-256 hex digest of ``serialize()``."""
        evt_str = self.serialize()
        self._id = hashlib.sha256(evt_str.encode("utf-8")).hexdigest()
        return self._id

    def sign(self, event_id, priv_key):
        """Sign ``event_id`` (hex) with ``priv_key``; store and return the hex signature."""
        pk = secp256k1.PrivateKey()
        pk.deserialize(priv_key)
        id_bytes = bytes.fromhex(event_id)
        sig = pk.schnorr_sign(id_bytes, bip340tag="", raw=True)
        self._sig = sig.hex()
        return self._sig

    def is_valid(self):
        """Return whether the stored signature verifies for the stored id and public key.

        May raise if the id, signature or public key are not well-formed;
        callers handling untrusted input should treat any exception as a
        failed verification.
        """
        # The key is stored without a prefix byte; "02" is prepended so the
        # library can parse it as a compressed public key.
        pub_key = secp256k1.PublicKey(bytes.fromhex("02" + self._pub_key), raw=True)
        return pub_key.schnorr_verify(
            msg=bytes.fromhex(self._id),
            schnorr_sig=bytes.fromhex(self._sig),
            bip340tag="",
            raw=True,
        )


def compute_event_id(event_data):
    """Compute the event ID based on the event data.

    ``event_data`` must provide the keys ``kind``, ``content``, ``tags``,
    ``pubkey`` and ``created_at``.
    """
    event = Event(
        kind=event_data["kind"],
        content=event_data["content"],
        tags=event_data["tags"],
        pub_key=event_data["pubkey"],
        created_at=event_data["created_at"],
    )
    return event._get_id()


def compute_event_sig(event_id, private_key):
    """Compute the event signature based on the event ID and private key."""
    return Event().sign(event_id, private_key)


def verify_nostr_event(sig, event):
    """Verify a JSON-encoded Nostr event against the signature ``sig``.

    The event id is recomputed from the event fields and must match the id
    the client claims; ``sig`` must then verify for that id under the event's
    public key.  The ``sig`` field inside the JSON is ignored.

    Args:
        sig: hex-encoded signature supplied by the client.
        event: JSON string with ``id``, ``pubkey``, ``created_at``, ``kind``,
            ``tags`` and ``content``.

    Returns:
        True only if the id matches and the signature verifies; False for
        any malformed or invalid input.
    """
    try:
        event_data = json.loads(event)
        claimed_id = event_data["id"]
        candidate = Event(
            id=claimed_id,
            sig=sig,
            pub_key=event_data["pubkey"],
            created_at=event_data["created_at"],
            kind=event_data["kind"],
            content=event_data["content"],
            tags=event_data["tags"],
        )
        recomputed_id = candidate._get_id()
    except (KeyError, TypeError, ValueError):
        # Covers invalid JSON (JSONDecodeError is a ValueError), JSON that is
        # not an object, missing fields and a null public key.
        logger.info("verify_nostr_event: malformed event")
        return False

    if claimed_id != recomputed_id:
        logger.info("verify_nostr_event: event id does not match its contents")
        return False

    try:
        signature_ok = bool(candidate.is_valid())
    except Exception:  # noqa: BLE001 - fail closed on any crypto/parsing error
        # Bad hex raises ValueError; the secp256k1 binding may raise other
        # exception types for malformed keys or signatures.  Whatever goes
        # wrong here, the event is not authenticated.
        logger.info("verify_nostr_event: signature check raised", exc_info=True)
        return False

    if not signature_ok:
        logger.info("verify_nostr_event: signature does not verify")
        return False

    logger.debug("verify_nostr_event: event valid")
    return True


def verify_password(plain_password, hashed_password):
    """Verify the given plain password against the hashed password."""
    return pwd_context.verify(plain_password, hashed_password)


def get_password_hash(password):
    """Hash the given password."""
    return pwd_context.hash(password)


def get_user(pubkey):
    """Get user by public key."""
    return {"user_pubkey": pubkey}


def _is_fresh(created_at, max_age_seconds=MAX_EVENT_AGE_SECONDS):
    """Return whether ``created_at`` (unix seconds) is within the allowed window of now."""
    # bool is a subclass of int; reject it explicitly along with non-integers.
    if isinstance(created_at, bool) or not isinstance(created_at, int):
        return False
    now = int(datetime.now(timezone.utc).timestamp())
    return abs(now - created_at) <= max_age_seconds


def authenticate_user(sig, event):
    """Authenticate user by verifying the event signature.

    Args:
        sig: hex-encoded signature supplied by the client.
        event: JSON string of the signed event.

    Returns:
        The event's public key when the event is well-formed, recent and
        correctly signed; otherwise False.
    """
    try:
        event_json = json.loads(event)
        pubkey = event_json["pubkey"]
        created_at = event_json["created_at"]
    except (KeyError, TypeError, ValueError):
        return False
    if not pubkey:
        return False
    # NOTE(review): the freshness window limits replay of old signed events
    # but does not eliminate it; a server-issued single-use challenge embedded
    # in the event would be needed for that.
    if not _is_fresh(created_at):
        logger.info("authenticate_user: event is outside the accepted time window")
        return False
    if not verify_nostr_event(sig, event):
        return False
    return pubkey


def create_access_token(data: dict, expires_delta: timedelta | None = None):
    """Create a JWT access token.

    Args:
        data: claims to embed; copied, not mutated.
        expires_delta: token lifetime; 15 minutes when omitted.

    Returns:
        The encoded JWT string.
    """
    lifetime = expires_delta if expires_delta else timedelta(minutes=15)
    to_encode = data.copy()
    to_encode.update({"exp": datetime.now(timezone.utc) + lifetime})
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)


async def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]):
    """Get the current user from the JWT token.

    Raises:
        HTTPException: 401 if the token is invalid, expired or has no subject.
    """
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except InvalidTokenError:
        logger.info("get_current_user: invalid token")
        raise credentials_exception
    username = payload.get("sub")
    if username is None:
        raise credentials_exception
    token_data = TokenData(username=username)
    user = get_user(token_data.username)
    if user is None:
        raise credentials_exception
    return user


async def get_current_active_user(
    current_user: Annotated[User, Depends(get_current_user)],
):
    """Get the current active user, ensuring they are not banned.

    Raises:
        HTTPException: 400 if the user is on ``banned_list``.
    """
    # ``get_current_user`` yields a dict, while ``banned_list`` is described as
    # a list of accounts; match on the public key as well as on the whole
    # record so either representation in the list takes effect.
    pubkey = current_user["user_pubkey"]
    if current_user in banned_list or pubkey in banned_list:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user


@app.post("/token")
async def login_for_access_token(
    form_data: Annotated[CustomOAuth2PasswordRequestForm, Depends()],
) -> Token:
    """Login endpoint to get an access token.

    The token subject is the public key whose signature was verified.  The
    submitted ``username`` must equal that key; otherwise a caller could
    prove ownership of one key and receive a token for another identity.

    Raises:
        HTTPException: 401 if the event is invalid or the username does not
            match the signing key.
    """
    pubkey = authenticate_user(form_data.password, form_data.json_data)
    if not pubkey or pubkey != form_data.username:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": pubkey}, expires_delta=access_token_expires
    )
    return Token(access_token=access_token, token_type="bearer")


@app.get("/users/me/", response_model=User)
async def read_users_me(
    current_user: Annotated[User, Depends(get_current_active_user)],
):
    """Get the current user's information."""
    return current_user


@app.get("/users/me/items/")
async def read_own_items(
    current_user: Annotated[User, Depends(get_current_active_user)],
):
    """Get the current user's items."""
    return [{"item_id": "Foo", "owner": current_user}]


@app.post("/get_event_to_sign", response_model=EventToSign)
async def get_event_to_sign(public_key_request: PublicKeyRequest):
    """Generate an event to be signed by the user.

    Returns an unsigned event (empty ``sig``) stamped with the current time
    and carrying the id computed from its fields.
    """
    event_data = {
        "pubkey": public_key_request.pubkey,
        "created_at": int(datetime.now(timezone.utc).timestamp()),
        "kind": 1,
        "tags": [],
        "content": "Please sign this event",
    }
    return {
        "id": compute_event_id(event_data),
        **event_data,
        "sig": "",  # The signature will be added by the user
    }
\$\endgroup\$

    0

    Start asking to get answers

    Find the answer to your question by asking.

    Ask question

    Explore related questions

    See similar questions with these tags.