Files
yellow-bank-soal/backend/app/core/auth.py
2026-06-20 01:43:39 +07:00

171 lines
5.1 KiB
Python

"""
Token-based authentication helpers for website-scoped access control.
"""
from __future__ import annotations
import base64
import hashlib
import hmac
import json
import time
from dataclasses import dataclass
from typing import Optional
from fastapi import Header, HTTPException, status
from app.core.config import get_settings
settings = get_settings()
@dataclass
class AuthContext:
website_id: Optional[int]
role: str
wp_user_id: Optional[str] = None
def _b64url_encode(raw: bytes) -> str:
return base64.urlsafe_b64encode(raw).rstrip(b"=").decode("ascii")
def _b64url_decode(raw: str) -> bytes:
padding = "=" * (-len(raw) % 4)
return base64.urlsafe_b64decode((raw + padding).encode("ascii"))
def issue_access_token(
website_id: int | None,
role: str = "student",
wp_user_id: str | None = None,
expires_in_seconds: int = 3600,
) -> str:
payload = {
"website_id": int(website_id) if website_id is not None else None,
"role": role,
"wp_user_id": wp_user_id,
"exp": int(time.time()) + int(expires_in_seconds),
}
payload_bytes = json.dumps(payload, separators=(",", ":"), sort_keys=True).encode("utf-8")
payload_b64 = _b64url_encode(payload_bytes)
sig = hmac.new(settings.SECRET_KEY.encode("utf-8"), payload_b64.encode("ascii"), hashlib.sha256).digest()
return f"{payload_b64}.{_b64url_encode(sig)}"
def decode_access_token(token: str) -> AuthContext:
try:
payload_b64, sig_b64 = token.split(".", 1)
except ValueError as exc:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid access token format",
) from exc
expected_sig = hmac.new(
settings.SECRET_KEY.encode("utf-8"),
payload_b64.encode("ascii"),
hashlib.sha256,
).digest()
provided_sig = _b64url_decode(sig_b64)
if not hmac.compare_digest(provided_sig, expected_sig):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid access token signature",
)
try:
payload = json.loads(_b64url_decode(payload_b64).decode("utf-8"))
except Exception as exc:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid access token payload",
) from exc
exp = int(payload.get("exp", 0))
if exp <= int(time.time()):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Access token has expired",
)
website_id = payload.get("website_id")
role = payload.get("role")
if not role:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Access token missing required claims",
)
if website_id is None and role != "system_admin":
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Access token missing website scope",
)
return AuthContext(
website_id=int(website_id) if website_id is not None else None,
role=str(role),
wp_user_id=payload.get("wp_user_id"),
)
def get_auth_context(
authorization: str | None = Header(None, alias="Authorization"),
x_website_id: str | None = Header(None, alias="X-Website-ID"),
) -> AuthContext:
if authorization is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Authorization header is required",
)
parts = authorization.split()
if len(parts) != 2 or parts[0].lower() != "bearer":
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid Authorization header format. Use: Bearer {token}",
)
context = decode_access_token(parts[1])
# If system_admin explicitly sets a website context via header, use it
if context.role == "system_admin" and x_website_id and x_website_id.isdigit():
context.website_id = int(x_website_id)
return context
def require_website_auth(
auth: AuthContext,
allowed_roles: set[str] | None = None,
) -> Optional[int]:
"""
Check if the authenticated user has required roles.
Returns the website_id if scoped to a specific website.
Returns None if the user is a system_admin with global access and no specific website context.
"""
if allowed_roles is not None and auth.role not in allowed_roles:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Insufficient permissions for this endpoint",
)
if auth.role == "system_admin":
if auth.website_id is not None:
return auth.website_id
return None
return auth.website_id
def ensure_website_scope_matches(
auth_website_id: int | None,
payload_website_id: int,
) -> None:
if auth_website_id is None:
return
if int(auth_website_id) != int(payload_website_id):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="website_id in payload must match authenticated website scope",
)