From 4351978826fcb75a6a369058c92ab97ca01b3d8c Mon Sep 17 00:00:00 2001 From: dwindown Date: Wed, 1 Apr 2026 21:13:23 +0700 Subject: [PATCH] Replace mounted fastapi-admin with plain FastAPI admin router --- app/admin_web.py | 463 +++++++++++++++++++++++++++++++++++++++++++++++ app/main.py | 26 +-- 2 files changed, 471 insertions(+), 18 deletions(-) create mode 100644 app/admin_web.py diff --git a/app/admin_web.py b/app/admin_web.py new file mode 100644 index 0000000..c6f68ac --- /dev/null +++ b/app/admin_web.py @@ -0,0 +1,463 @@ +""" +Plain FastAPI admin UI backed by SQLAlchemy and Redis sessions. + +This replaces the previous fastapi-admin runtime path, which depended on +Tortoise-oriented internals that do not match this project. +""" + +import secrets +import uuid +from dataclasses import dataclass +from html import escape +from typing import Any + +import aioredis +from fastapi import APIRouter, Form, Request +from sqlalchemy import func, select +from sqlalchemy.ext.asyncio import AsyncSession +from starlette.responses import HTMLResponse, RedirectResponse +from starlette.status import HTTP_303_SEE_OTHER, HTTP_401_UNAUTHORIZED + +from app.core.config import get_settings +from app.database import get_db +from app.models import Item, Session, Tryout +from app.services.irt_calibration import get_calibration_status + +settings = get_settings() +router = APIRouter(prefix="/admin", tags=["admin-web"]) + +SESSION_COOKIE = "access_token" +SESSION_PREFIX = "admin:session:" + +_admin_redis = None + + +@dataclass +class AdminPrincipal: + username: str + + +async def configure_admin_web() -> None: + global _admin_redis + + if _admin_redis is not None: + return + + if not settings.ADMIN_USERNAME or not settings.ADMIN_PASSWORD: + raise RuntimeError("ENABLE_ADMIN=true requires ADMIN_USERNAME and ADMIN_PASSWORD to be set.") + + _admin_redis = aioredis.from_url( + settings.REDIS_URL, + encoding="utf-8", + decode_responses=True, + ) + + +async def shutdown_admin_web() -> None: + global _admin_redis + + if _admin_redis is None: + return + + try: + await _admin_redis.close() + finally: + _admin_redis = None + + +async def _current_admin(request: Request) -> AdminPrincipal | None: + if _admin_redis is None: + return None + + token = request.cookies.get(SESSION_COOKIE) + if not token: + return None + + username = await _admin_redis.get(f"{SESSION_PREFIX}{token}") + if not username: + return None + + return AdminPrincipal(username=str(username)) + + +def _login_redirect() -> RedirectResponse: + return RedirectResponse(url="/admin/login", status_code=HTTP_303_SEE_OTHER) + + +def _dashboard_redirect() -> RedirectResponse: + return RedirectResponse(url="/admin/dashboard", status_code=HTTP_303_SEE_OTHER) + + +def _render_auth_page( + request: Request, + title: str, + subtitle: str, + body: str, + status_code: int = 200, +) -> HTMLResponse: + remember_me_checked = "checked" if request.cookies.get("remember_me") == "on" else "" + html = f""" + + + + + {escape(title)} + + + +
+

{escape(title)}

+

{escape(subtitle)}

+ {body.replace("__REMEMBER_ME_CHECKED__", remember_me_checked)} +
+ +""" + return HTMLResponse(html, status_code=status_code) + + +def _render_admin_page(title: str, page_title: str, body: str) -> HTMLResponse: + html = f""" + + + + + {escape(title)} + + + +
+ +
+
+

{escape(page_title)}

+ {body} +
+
+
+ +""" + return HTMLResponse(html) + + +def _table(headers: list[str], rows: list[list[Any]]) -> str: + head = "".join(f"{escape(str(header))}" for header in headers) + body_rows = [] + for row in rows: + cols = "".join(f"{escape(str(value))}" for value in row) + body_rows.append(f"{cols}") + body = "".join(body_rows) or f"No data" + return f"{head}{body}
" + + +@router.get("", include_in_schema=False) +@router.get("/", include_in_schema=False) +async def admin_root(request: Request): + admin = await _current_admin(request) + if admin: + return _dashboard_redirect() + return _login_redirect() + + +@router.get("/login", include_in_schema=False) +async def login_view(request: Request): + admin = await _current_admin(request) + if admin: + return _dashboard_redirect() + + body = """ +
+ + + + + + +
+

Direct environment-backed admin access.

+ """ + return _render_auth_page( + request, + "Admin Login", + "Use the configured admin credentials to access the dashboard.", + body, + ) + + +@router.post("/login", include_in_schema=False) +async def login_submit( + request: Request, + username: str = Form(...), + password: str = Form(...), + remember_me: str | None = Form(None), +): + if not ( + secrets.compare_digest(username, settings.ADMIN_USERNAME) + and secrets.compare_digest(password, settings.ADMIN_PASSWORD) + ): + body = f""" +
Invalid username or password.
+
+ + + + + + +
+ """ + return _render_auth_page( + request, + "Admin Login", + "Use the configured admin credentials to access the dashboard.", + body, + status_code=HTTP_401_UNAUTHORIZED, + ) + + expire = settings.ADMIN_SESSION_EXPIRE_SECONDS + response = _dashboard_redirect() + if remember_me == "on": + expire = max(expire, 3600 * 24 * 30) + response.set_cookie("remember_me", "on", expires=expire, path="/admin") + else: + response.delete_cookie("remember_me", path="/admin") + + token = uuid.uuid4().hex + response.set_cookie( + SESSION_COOKIE, + token, + expires=expire, + path="/admin", + httponly=True, + samesite="lax", + ) + await _admin_redis.set(f"{SESSION_PREFIX}{token}", settings.ADMIN_USERNAME, ex=expire) + return response + + +@router.get("/logout", include_in_schema=False) +async def logout(request: Request): + token = request.cookies.get(SESSION_COOKIE) + if token and _admin_redis is not None: + await _admin_redis.delete(f"{SESSION_PREFIX}{token}") + + response = _login_redirect() + response.delete_cookie(SESSION_COOKIE, path="/admin") + response.delete_cookie("remember_me", path="/admin") + return response + + +@router.get("/password", include_in_schema=False) +async def password_view(request: Request): + admin = await _current_admin(request) + if not admin: + return _login_redirect() + + body = f""" +

Signed in as {escape(admin.username)}.

+

Password changes are disabled in the UI for this deployment.

+

Update ADMIN_PASSWORD in the server environment, then restart the app.

+

Session expiry is currently set to {settings.ADMIN_SESSION_EXPIRE_SECONDS} seconds.

+

Back to dashboard

+ """ + return _render_auth_page( + request, + "Password Management", + "Runtime password rotation is intentionally disabled.", + body, + ) + + +@router.post("/password", include_in_schema=False) +async def password_submit( + request: Request, + old_password: str = Form(...), + new_password: str = Form(...), + re_new_password: str = Form(...), +): + _ = (old_password, new_password, re_new_password) + admin = await _current_admin(request) + if not admin: + return _login_redirect() + + body = """ +
Password rotation via UI is disabled.
+

Update ADMIN_PASSWORD in the server environment, then restart the app.

+

Back to dashboard

+ """ + return _render_auth_page( + request, + "Password Management", + "Runtime password rotation is intentionally disabled.", + body, + status_code=400, + ) + + +@router.get("/dashboard", include_in_schema=False) +async def dashboard_view(request: Request, db: AsyncSession = Depends(get_db)): + admin = await _current_admin(request) + if not admin: + return _login_redirect() + + tryouts = await db.scalar(select(func.count()).select_from(Tryout)) or 0 + items = await db.scalar(select(func.count()).select_from(Item)) or 0 + sessions = await db.scalar(select(func.count()).select_from(Session)) or 0 + completed_sessions = ( + await db.scalar(select(func.count()).select_from(Session).where(Session.is_completed.is_(True))) + or 0 + ) + + body = f""" +

Signed in as {escape(admin.username)}.

+
+
Tryouts{tryouts}
+
Items{items}
+
Sessions{sessions}
+
Completed Sessions{completed_sessions}
+
+ """ + return _render_admin_page("IRT Bank Soal Admin", "Dashboard", body) + + +@router.get("/calibration-status", include_in_schema=False) +async def calibration_status_view(request: Request, db: AsyncSession = Depends(get_db)): + admin = await _current_admin(request) + if not admin: + return _login_redirect() + + result = await db.execute(select(Tryout.tryout_id, Tryout.name, Tryout.website_id).order_by(Tryout.id)) + tryouts = result.all() + + rows = [] + for tryout_id, name, website_id in tryouts: + status = await get_calibration_status(tryout_id, website_id, db) + rows.append( + [ + tryout_id, + name, + status["total_items"], + status["calibrated_items"], + f'{status["calibration_percentage"]:.2f}%', + "Yes" if status["ready_for_irt"] else "No", + ] + ) + + body = _table( + ["Tryout ID", "Name", "Total Items", "Calibrated", "Calibration %", "Ready for IRT"], + rows, + ) + return _render_admin_page("Calibration Status", "Calibration Status", body) + + +@router.get("/item-statistics", include_in_schema=False) +async def item_statistics_view(request: Request, db: AsyncSession = Depends(get_db)): + admin = await _current_admin(request) + if not admin: + return _login_redirect() + + result = await db.execute(select(Item.level).distinct()) + levels = result.scalars().all() + + rows = [] + for level in levels: + item_result = await db.execute(select(Item).where(Item.level == level).order_by(Item.slot).limit(10)) + items = item_result.scalars().all() + total_responses = sum(item.calibration_sample_size or 0 for item in items) + calibrated_count = sum(1 for item in items if item.calibrated) + calibration_percentage = (calibrated_count / len(items) * 100) if items else 0 + avg_correctness = sum(item.ctt_p or 0 for item in items) / len(items) if items else 0 + rows.append( + [ + level, + len(items), + calibrated_count, + f"{calibration_percentage:.2f}%", + total_responses, + f"{avg_correctness:.4f}", + ] + ) + + body = _table( + ["Level", "Total Items", "Calibrated", "Calibration %", "Responses", "Avg Correctness"], + rows, + ) + return _render_admin_page("Item Statistics", "Item Statistics", body) + + +@router.get("/session-overview", include_in_schema=False) +async def session_overview_view(request: Request, db: AsyncSession = Depends(get_db)): + admin = await _current_admin(request) + if not admin: + return _login_redirect() + + result = await db.execute(select(Session).order_by(Session.created_at.desc()).limit(50)) + sessions = result.scalars().all() + + rows = [ + [ + session.session_id, + session.wp_user_id, + session.tryout_id, + "Yes" if session.is_completed else "No", + session.scoring_mode_used, + session.total_benar, + session.NM, + session.NN, + session.theta, + ] + for session in sessions + ] + body = _table( + ["Session ID", "WP User", "Tryout", "Completed", "Mode", "Benar", "NM", "NN", "Theta"], + rows, + ) + return _render_admin_page("Session Overview", "Session Overview", body) + + +@router.get("/tryout/list", include_in_schema=False) +@router.get("/item/list", include_in_schema=False) +@router.get("/user/list", include_in_schema=False) +@router.get("/session/list", include_in_schema=False) +@router.get("/tryoutstats/list", include_in_schema=False) +async def legacy_admin_paths(request: Request): + admin = await _current_admin(request) + if not admin: + return _login_redirect() + return _dashboard_redirect() diff --git a/app/main.py b/app/main.py index c9d70de..8569774 100644 --- a/app/main.py +++ b/app/main.py @@ -15,9 +15,12 @@ from typing import AsyncGenerator from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware -from starlette.responses import RedirectResponse -from starlette.status import HTTP_303_SEE_OTHER +from app.admin_web import ( + configure_admin_web, + router as admin_web_router, + shutdown_admin_web, +) from app.core.config import get_settings from app.database import close_db, init_db from app.routers import ( @@ -43,17 +46,13 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]: # Startup: Initialize database await init_db() if settings.ENABLE_ADMIN: - from app.admin import configure_admin_app - - await configure_admin_app() + await configure_admin_web() yield # Shutdown: Close database connections if settings.ENABLE_ADMIN: - from app.admin import shutdown_admin_app - - await shutdown_admin_app() + await shutdown_admin_web() await close_db() @@ -177,21 +176,12 @@ app.include_router( ) if settings.ENABLE_ADMIN: - from app.admin import admin as admin_app - - @app.get("/admin", include_in_schema=False) - async def admin_entrypoint(): - # Avoid Starlette mount slash-normalization redirect, which can emit an - # absolute URL based on proxy headers such as https://127.0.0.1/admin/. - return RedirectResponse(url="/admin/", status_code=HTTP_303_SEE_OTHER) - app.include_router( ai_router, prefix=f"{settings.API_V1_STR}", ) - # Mount FastAPI Admin panel - app.mount("/admin", admin_app) + app.include_router(admin_web_router) # Include admin API router for custom actions app.include_router(