Complete Section 1 security/auth hardening

This commit is contained in:
dwindown
2026-04-30 11:35:56 +07:00
parent 432ffbcdb9
commit 12d2d9458f
15 changed files with 863 additions and 232 deletions

144
app/core/auth.py Normal file
View File

@@ -0,0 +1,144 @@
"""
Token-based authentication helpers for website-scoped access control.
"""
from __future__ import annotations
import base64
import hashlib
import hmac
import json
import time
from dataclasses import dataclass
from typing import Optional
from fastapi import Header, HTTPException, status
from app.core.config import get_settings
settings = get_settings()
@dataclass
class AuthContext:
website_id: int
role: str
wp_user_id: Optional[str] = None
def _b64url_encode(raw: bytes) -> str:
return base64.urlsafe_b64encode(raw).rstrip(b"=").decode("ascii")
def _b64url_decode(raw: str) -> bytes:
padding = "=" * (-len(raw) % 4)
return base64.urlsafe_b64decode((raw + padding).encode("ascii"))
def issue_access_token(
website_id: int,
role: str = "student",
wp_user_id: str | None = None,
expires_in_seconds: int = 3600,
) -> str:
payload = {
"website_id": int(website_id),
"role": role,
"wp_user_id": wp_user_id,
"exp": int(time.time()) + int(expires_in_seconds),
}
payload_bytes = json.dumps(payload, separators=(",", ":"), sort_keys=True).encode("utf-8")
payload_b64 = _b64url_encode(payload_bytes)
sig = hmac.new(settings.SECRET_KEY.encode("utf-8"), payload_b64.encode("ascii"), hashlib.sha256).digest()
return f"{payload_b64}.{_b64url_encode(sig)}"
def decode_access_token(token: str) -> AuthContext:
try:
payload_b64, sig_b64 = token.split(".", 1)
except ValueError as exc:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid access token format",
) from exc
expected_sig = hmac.new(
settings.SECRET_KEY.encode("utf-8"),
payload_b64.encode("ascii"),
hashlib.sha256,
).digest()
provided_sig = _b64url_decode(sig_b64)
if not hmac.compare_digest(provided_sig, expected_sig):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid access token signature",
)
try:
payload = json.loads(_b64url_decode(payload_b64).decode("utf-8"))
except Exception as exc:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid access token payload",
) from exc
exp = int(payload.get("exp", 0))
if exp <= int(time.time()):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Access token has expired",
)
website_id = payload.get("website_id")
role = payload.get("role")
if website_id is None or not role:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Access token missing required claims",
)
return AuthContext(
website_id=int(website_id),
role=str(role),
wp_user_id=payload.get("wp_user_id"),
)
def get_auth_context(
authorization: str | None = Header(None, alias="Authorization"),
) -> AuthContext:
if authorization is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Authorization header is required",
)
parts = authorization.split()
if len(parts) != 2 or parts[0].lower() != "bearer":
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid Authorization header format. Use: Bearer {token}",
)
return decode_access_token(parts[1])
def require_website_auth(
auth: AuthContext,
allowed_roles: set[str] | None = None,
) -> int:
if allowed_roles is not None and auth.role not in allowed_roles:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Insufficient permissions for this endpoint",
)
return auth.website_id
def ensure_website_scope_matches(
auth_website_id: int,
payload_website_id: int,
) -> None:
if int(auth_website_id) != int(payload_website_id):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="website_id in payload must match authenticated website scope",
)