fix: harden admin access, repair ORM joins, and add migration/tests

This commit is contained in:
dwindown
2026-04-01 14:59:54 +07:00
parent de592d140e
commit 16ab13e911
21 changed files with 1275 additions and 368 deletions

View File

@@ -5,18 +5,29 @@ Provides admin panel for managing tryouts, items, sessions, users, and tryout st
Includes custom actions for calibration, AI generation toggle, and normalization reset.
"""
import secrets
import uuid
from dataclasses import dataclass
from typing import Any, Dict, Optional
from fastapi import Request
import aioredis
from fastapi import Depends, Form, HTTPException, Request
from fastapi_admin import constants
from fastapi_admin.app import app as admin_app
from fastapi_admin.depends import get_current_admin, get_resources
from fastapi_admin.providers import Provider
from fastapi_admin.resources import (
Field,
Link,
Model,
)
from fastapi_admin.template import templates
from fastapi_admin.widgets import displays, inputs
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from starlette.middleware.base import BaseHTTPMiddleware, RequestResponseEndpoint
from starlette.responses import RedirectResponse
from starlette.status import HTTP_303_SEE_OTHER, HTTP_401_UNAUTHORIZED
from app.core.config import get_settings
from app.database import get_db
@@ -29,77 +40,175 @@ settings = get_settings()
# Authentication Provider
# =============================================================================
class AdminAuthProvider:
"""
Authentication provider for FastAPI Admin.
@dataclass
class AdminPrincipal:
"""Minimal admin user object expected by fastapi-admin templates."""
Supports two modes:
1. WordPress JWT token integration (production)
2. Basic auth for testing (development)
pk: str
username: str
avatar: str = ""
class EnvCredentialProvider(Provider):
"""
FastAPI-Admin provider backed by env credentials and Redis session tokens.
Compatible with fastapi-admin 1.0.x provider API without requiring
Tortoise admin models.
"""
async def login(
name = "env_credential_provider"
access_token = "access_token"
def __init__(
self,
username: str,
password: str,
) -> Optional[str]:
"""
Authenticate user and return token.
login_path: str = "/login",
logout_path: str = "/logout",
login_title: str = "Admin Login",
login_logo_url: str | None = None,
expire_seconds: int = 3600,
template: str = "providers/login/login.html",
) -> None:
self.username = username
self.password = password
self.login_path = login_path
self.logout_path = logout_path
self.login_title = login_title
self.login_logo_url = login_logo_url
self.expire_seconds = expire_seconds
self.template = template
Args:
username: Username
password: Password
async def register(self, app: "FastAPIAdmin") -> None:
await super().register(app)
app.get(self.login_path)(self.login_view)
app.post(self.login_path)(self.login)
app.get(self.logout_path)(self.logout)
app.get("/password")(self.password_view)
app.post("/password")(self.password)
app.add_middleware(BaseHTTPMiddleware, dispatch=self.authenticate)
Returns:
Access token if authentication successful, None otherwise
"""
# Development mode: basic auth
if settings.ENVIRONMENT == "development":
# Allow admin/admin or admin/password for testing
if (username == "admin" and password in ["admin", "password"]):
return f"dev_token_{username}"
async def login_view(self, request: Request):
return templates.TemplateResponse(
self.template,
context={
"request": request,
"login_logo_url": self.login_logo_url,
"login_title": self.login_title,
},
)
# Production mode: WordPress JWT token validation
# For now, return None - implement WordPress integration when needed
return None
async def login(
self,
request: Request,
username: str = Form(...),
password: str = Form(...),
remember_me: Optional[str] = Form(None),
):
if not (
secrets.compare_digest(username, self.username)
and secrets.compare_digest(password, self.password)
):
return templates.TemplateResponse(
self.template,
status_code=HTTP_401_UNAUTHORIZED,
context={
"request": request,
"error": "Invalid username or password",
"login_logo_url": self.login_logo_url,
"login_title": self.login_title,
},
)
async def logout(self, request: Request) -> bool:
"""
Logout user.
response = RedirectResponse(url=request.app.admin_path, status_code=HTTP_303_SEE_OTHER)
expire = self.expire_seconds
if remember_me == "on":
expire = max(self.expire_seconds, 3600 * 24 * 30)
response.set_cookie("remember_me", "on")
else:
response.delete_cookie("remember_me")
Args:
request: FastAPI request
token = uuid.uuid4().hex
response.set_cookie(
self.access_token,
token,
expires=expire,
path=request.app.admin_path,
httponly=True,
)
await request.app.redis.set(constants.LOGIN_USER.format(token=token), self.username, ex=expire)
return response
Returns:
True if logout successful
"""
return True
async def authenticate(self, request: Request, call_next: RequestResponseEndpoint):
token = request.cookies.get(self.access_token)
path = request.scope["path"]
admin = None
async def get_current_user(self, request: Request) -> Optional[dict]:
"""
Get current authenticated user.
if token:
key = constants.LOGIN_USER.format(token=token)
username = await request.app.redis.get(key)
if username:
admin = AdminPrincipal(pk=str(username), username=str(username))
Args:
request: FastAPI request
request.state.admin = admin
Returns:
User data if authenticated, None otherwise
"""
token = request.cookies.get("admin_token") or request.headers.get("Authorization")
if path.endswith(self.login_path) and admin:
return RedirectResponse(url=request.app.admin_path, status_code=HTTP_303_SEE_OTHER)
if not token:
return None
return await call_next(request)
# Development mode: validate dev token
if settings.ENVIRONMENT == "development" and token.startswith("dev_token_"):
username = token.replace("dev_token_", "")
return {
"id": 1,
"username": username,
"is_superuser": True,
}
async def logout(self, request: Request):
response = RedirectResponse(
url=request.app.admin_path + self.login_path,
status_code=HTTP_303_SEE_OTHER,
)
token = request.cookies.get(self.access_token)
if token:
await request.app.redis.delete(constants.LOGIN_USER.format(token=token))
response.delete_cookie(self.access_token, path=request.app.admin_path)
return response
return None
async def password_view(self, request: Request, resources=Depends(get_resources)):
return templates.TemplateResponse(
"providers/login/password.html",
context={"request": request, "resources": resources},
)
async def password(
self,
request: Request,
old_password: str = Form(...),
new_password: str = Form(...),
re_new_password: str = Form(...),
admin: AdminPrincipal = Depends(get_current_admin),
resources=Depends(get_resources),
):
_ = admin
if not secrets.compare_digest(old_password, self.password):
return templates.TemplateResponse(
"providers/login/password.html",
context={
"request": request,
"resources": resources,
"error": "Old password is incorrect",
},
)
if new_password != re_new_password:
return templates.TemplateResponse(
"providers/login/password.html",
context={
"request": request,
"resources": resources,
"error": "New passwords do not match",
},
)
# Password is env-configured and immutable at runtime.
raise HTTPException(
status_code=400,
detail="Password rotation via UI is disabled. Update ADMIN_PASSWORD in environment.",
)
# =============================================================================
@@ -604,7 +713,8 @@ def create_admin_app() -> Any:
# admin_app.settings.site_description = "Admin Panel for Adaptive Question Bank System"
# Register authentication provider
# admin_app.settings.auth_provider = AdminAuthProvider()
# NOTE: fastapi-admin 1.0.4 requires provider registration via app.configure(...).
# Keep provider implementation here for future integration during startup configure.
# Register model resources
admin_app.register(TryoutResource)
@@ -621,5 +731,55 @@ def create_admin_app() -> Any:
return admin_app
_admin_configured = False
_admin_redis = None
async def configure_admin_app() -> None:
"""Configure fastapi-admin runtime (redis + auth provider)."""
global _admin_configured, _admin_redis
if _admin_configured:
return
if not settings.ADMIN_USERNAME or not settings.ADMIN_PASSWORD:
raise RuntimeError(
"ENABLE_ADMIN=true requires ADMIN_USERNAME and ADMIN_PASSWORD to be set."
)
_admin_redis = aioredis.from_url(
settings.REDIS_URL,
encoding="utf-8",
decode_responses=True,
)
provider = EnvCredentialProvider(
username=settings.ADMIN_USERNAME,
password=settings.ADMIN_PASSWORD,
login_title="IRT Bank Soal Admin",
expire_seconds=settings.ADMIN_SESSION_EXPIRE_SECONDS,
)
await admin_app.configure(
redis=_admin_redis,
admin_path="/admin",
providers=[provider],
)
_admin_configured = True
async def shutdown_admin_app() -> None:
"""Close admin redis client cleanly."""
global _admin_redis
if _admin_redis is None:
return
try:
await _admin_redis.close()
finally:
_admin_redis = None
# Export admin app for mounting in main.py
admin = create_admin_app()