""" Plain FastAPI admin UI backed by SQLAlchemy and Redis sessions. This replaces the previous fastapi-admin runtime path, which depended on Tortoise-oriented internals that do not match this project. """ import secrets import uuid from dataclasses import dataclass from html import escape import json from typing import Any import aioredis from fastapi import APIRouter, Depends, File, Form, Request, UploadFile from sqlalchemy import func, select from sqlalchemy.exc import IntegrityError from sqlalchemy.ext.asyncio import AsyncSession from starlette.responses import HTMLResponse, RedirectResponse from starlette.status import HTTP_303_SEE_OTHER, HTTP_401_UNAUTHORIZED from app.core.config import get_settings from app.database import get_db from app.models import Item, Session, Tryout, TryoutImportSnapshot, TryoutSnapshotQuestion, Website from app.services.ai_generation import ( SUPPORTED_MODELS, generate_question, get_ai_stats, save_ai_question, validate_ai_model, ) from app.services.irt_calibration import get_calibration_status from app.services.tryout_json_import import ( TryoutImportError, import_tryout_json_snapshot, preview_tryout_json_import, ) settings = get_settings() router = APIRouter(prefix="/admin", tags=["admin-web"]) SESSION_COOKIE = "access_token" SESSION_PREFIX = "admin:session:" IMPORT_PREVIEW_PREFIX = "admin:import-preview:" IMPORT_PREVIEW_TTL_SECONDS = 900 _admin_redis = None @dataclass class AdminPrincipal: username: str async def configure_admin_web() -> None: global _admin_redis if _admin_redis is not None: return if not settings.ADMIN_USERNAME or not settings.ADMIN_PASSWORD: raise RuntimeError("ENABLE_ADMIN=true requires ADMIN_USERNAME and ADMIN_PASSWORD to be set.") _admin_redis = aioredis.from_url( settings.REDIS_URL, encoding="utf-8", decode_responses=True, ) async def shutdown_admin_web() -> None: global _admin_redis if _admin_redis is None: return try: await _admin_redis.close() finally: _admin_redis = None async def _current_admin(request: Request) -> AdminPrincipal | None: if _admin_redis is None: return None token = request.cookies.get(SESSION_COOKIE) if not token: return None username = await _admin_redis.get(f"{SESSION_PREFIX}{token}") if not username: return None return AdminPrincipal(username=str(username)) def _login_redirect() -> RedirectResponse: return RedirectResponse(url="/admin/login", status_code=HTTP_303_SEE_OTHER) def _dashboard_redirect() -> RedirectResponse: return RedirectResponse(url="/admin/dashboard", status_code=HTTP_303_SEE_OTHER) def _render_auth_page( request: Request, title: str, subtitle: str, body: str, status_code: int = 200, ) -> HTMLResponse: remember_me_checked = "checked" if request.cookies.get("remember_me") == "on" else "" html = f""" {escape(title)}

{escape(title)}

{escape(subtitle)}

{body.replace("__REMEMBER_ME_CHECKED__", remember_me_checked)}
""" return HTMLResponse(html, status_code=status_code) def _render_admin_page(title: str, page_title: str, body: str) -> HTMLResponse: html = f""" {escape(title)}

{escape(page_title)}

{body}
""" return HTMLResponse(html) def _table(headers: list[str], rows: list[list[Any]]) -> str: head = "".join(f"{escape(str(header))}" for header in headers) body_rows = [] for row in rows: cols = "".join(f"{escape(str(value))}" for value in row) body_rows.append(f"{cols}") body = "".join(body_rows) or f"No data" return f"{head}{body}
" def _truncate(text: str | None, max_length: int = 120) -> str: if not text: return "" if len(text) <= max_length: return text return f"{text[: max_length - 3]}..." def _websites_form_body( websites: list[Website], error: str | None = None, success: str | None = None, site_name: str = "", site_url: str = "", ) -> str: error_html = f'
{escape(error)}
' if error else "" success_html = f'
{escape(success)}
' if success else "" body_rows = [] for website in websites: actions = f"""
Edit
""" body_rows.append( "" f"{website.id}" f"{escape(website.site_name)}" f"{escape(website.site_url)}" f"{actions}" "" ) if body_rows: websites_table = ( "" + "".join(body_rows) + "
IDNameURLActions
" ) else: websites_table = _table(["ID", "Name", "URL", "Actions"], []) return f"""

Register websites here so imports and tryout references can be tied to a known source site.

{success_html} {error_html}

Registered Websites

Use the website ID when importing read-only tryout snapshots.

{websites_table} """ def _website_edit_form_body( website: Website, error: str | None = None, success: str | None = None, site_name: str | None = None, site_url: str | None = None, ) -> str: error_html = f'
{escape(error)}
' if error else "" success_html = f'
{escape(success)}
' if success else "" display_name = website.site_name if site_name is None else site_name display_url = website.site_url if site_url is None else site_url return f"""

Website ID: {website.id}

{success_html} {error_html}
Back
""" def _tryout_import_form_body( websites: list[Website], recent_snapshots: list[TryoutImportSnapshot], error: str | None = None, success: str | None = None, selected_website_id: int | None = None, preview: dict[str, Any] | None = None, preview_token: str | None = None, upload_filename: str = "", ) -> str: error_html = f'
{escape(error)}
' if error else "" success_html = f'
{escape(success)}
' if success else "" website_options = [''] for website in websites: selected = "selected" if selected_website_id == website.id else "" website_options.append( f'' ) website_map = {website.id: website.site_name for website in websites} snapshot_rows = [] for snapshot in recent_snapshots: snapshot_rows.append( "" f"{snapshot.id}" f"{escape(website_map.get(snapshot.website_id, 'Unknown'))} (#{snapshot.website_id})" f"{escape(snapshot.source_tryout_id)}" f"{escape(snapshot.title)}" f"{snapshot.question_count}" f"{escape(str(snapshot.created_at))}" f"Browse" "" ) snapshots_table = ( "" + ("".join(snapshot_rows) if snapshot_rows else "") + "
Snapshot IDWebsiteTryout IDTitleQuestionsImported AtActions
No data
" ) preview_html = "" if preview: totals = preview.get("totals") or {} tryout_rows = [] for tryout in preview.get("tryouts") or []: diff = tryout.get("question_diff") or {} warnings = "; ".join(tryout.get("warnings") or []) or "-" tryout_rows.append( [ tryout.get("source_tryout_id"), tryout.get("title"), diff.get("total_questions", 0), diff.get("new_questions", 0), diff.get("updated_questions", 0), diff.get("unchanged_questions", 0), diff.get("removed_questions", 0), warnings, ] ) import_form = "" if preview_token and selected_website_id: import_form = f"""
""" preview_html = f"""

Preview Summary

File: {escape(upload_filename or "uploaded JSON")}

Tryouts{preview.get("tryout_count", 0)}
New Questions{totals.get("new_questions", 0)}
Updated Questions{totals.get("updated_questions", 0)}
Removed Questions{totals.get("removed_questions", 0)}
{_table( ["Tryout ID", "Title", "Total", "New", "Updated", "Unchanged", "Removed", "Warnings"], tryout_rows, )}
{import_form}
""" return f"""

Import Sejoli tryout JSON as read-only snapshot reference data. This does not create live item-bank questions.

Use this when the source tryout changes upstream. Re-import updates matching source question IDs, inserts new ones, and marks missing ones inactive.

{success_html} {error_html}
{preview_html}

Recent Snapshots

These are archived imports stored in PostgreSQL for traceability.

{snapshots_table} """ def _snapshot_slot_map(snapshot: TryoutImportSnapshot) -> dict[str, int]: slot_map: dict[str, int] = {} questions = (snapshot.raw_payload or {}).get("questions") or [] for index, question in enumerate(questions, start=1): source_question_id = str((question or {}).get("id") or "").strip() if source_question_id: slot_map[source_question_id] = index return slot_map def _snapshot_options_to_item_options(raw_options: list[dict[str, Any]] | list[Any]) -> dict[str, str]: item_options: dict[str, str] = {} for option in raw_options or []: if not isinstance(option, dict): continue increment = str(option.get("increment") or "").strip().upper() text = str(option.get("text") or option.get("label") or "").strip() if increment and text: item_options[increment] = text return item_options def _snapshot_questions_body( snapshot: TryoutImportSnapshot, questions: list[TryoutSnapshotQuestion], promoted_items_by_slot: dict[int, Item], error: str | None = None, success: str | None = None, ) -> str: error_html = f'
{escape(error)}
' if error else "" success_html = f'
{escape(success)}
' if success else "" slot_map = _snapshot_slot_map(snapshot) rows = [] for question in questions: slot = slot_map.get(question.source_question_id, 0) promoted_item = promoted_items_by_slot.get(slot) if promoted_item: action_html = ( f'Item #{promoted_item.id} already exists. ' f'Open in AI Playground' ) else: action_html = ( f'
' f'' f'' '' '
' ) rows.append( "" f"{slot or '-'}" f"{escape(question.source_question_id)}" f"{escape(question.correct_answer)}" f"{question.option_count}" f"{'Yes' if question.is_active else 'No'}" f"{escape(_truncate(question.question_title or question.question_html, 100))}" f"{action_html}" "" ) questions_table = ( "" + ("".join(rows) if rows else "") + "
SlotSource Question IDCorrectOptionsActiveStemAction
No data
" ) return f"""

Snapshot ID: {snapshot.id} | Website: {snapshot.website_id} | Tryout: {escape(snapshot.source_tryout_id)}

Promote selected snapshot questions into the live items table as sedang basis items for AI generation.

{success_html} {error_html} {questions_table}

Back to Tryout Import

""" async def _basis_items_for_playground(db: AsyncSession, limit: int = 20) -> list[Item]: result = await db.execute( select(Item) .where(Item.level == "sedang") .order_by(Item.created_at.desc(), Item.id.desc()) .limit(limit) ) return list(result.scalars().all()) async def _find_or_create_demo_basis_item(db: AsyncSession) -> Item: result = await db.execute( select(Item) .where( Item.level == "sedang", Item.generated_by == "manual", Item.tryout_id == "demo-tryout", ) .order_by(Item.id.asc()) .limit(1) ) existing_item = result.scalar_one_or_none() if existing_item: return existing_item website_result = await db.execute( select(Website).where(Website.site_url == "https://demo.local").limit(1) ) website = website_result.scalar_one_or_none() if website is None: website = Website(site_url="https://demo.local", site_name="Demo Website") db.add(website) await db.flush() tryout_result = await db.execute( select(Tryout) .where(Tryout.website_id == website.id, Tryout.tryout_id == "demo-tryout") .limit(1) ) tryout = tryout_result.scalar_one_or_none() if tryout is None: tryout = Tryout( website_id=website.id, tryout_id="demo-tryout", name="Demo AI Playground Tryout", description="Seed data for the AI playground.", scoring_mode="ctt", selection_mode="fixed", normalization_mode="static", ai_generation_enabled=True, ) db.add(tryout) await db.flush() item = Item( tryout_id=tryout.tryout_id, website_id=website.id, slot=1, level="sedang", stem="Sebuah toko memberi diskon 20% untuk sebuah tas. Jika harga setelah diskon adalah Rp240.000, berapakah harga tas sebelum diskon?", options={ "A": "Rp260.000", "B": "Rp300.000", "C": "Rp320.000", "D": "Rp360.000", }, correct_answer="B", explanation="Harga setelah diskon 20% berarti 80% dari harga awal. Jadi harga awal = 240.000 / 0,8 = 300.000.", generated_by="manual", calibrated=False, calibration_sample_size=0, ) db.add(item) await db.flush() await db.commit() await db.refresh(item) return item async def _load_websites(db: AsyncSession) -> list[Website]: result = await db.execute(select(Website).order_by(Website.id.asc())) return list(result.scalars().all()) async def _recent_snapshots(db: AsyncSession, limit: int = 20) -> list[TryoutImportSnapshot]: result = await db.execute( select(TryoutImportSnapshot).order_by(TryoutImportSnapshot.id.desc()).limit(limit) ) return list(result.scalars().all()) async def _ensure_operational_tryout(snapshot: TryoutImportSnapshot, db: AsyncSession) -> Tryout: result = await db.execute( select(Tryout).where( Tryout.website_id == snapshot.website_id, Tryout.tryout_id == snapshot.source_tryout_id, ) ) tryout = result.scalar_one_or_none() if tryout: return tryout tryout = Tryout( website_id=snapshot.website_id, tryout_id=snapshot.source_tryout_id, name=snapshot.title, description=f"Operational tryout basis created from imported snapshot #{snapshot.id}.", scoring_mode="ctt", selection_mode="fixed", normalization_mode="static", ai_generation_enabled=True, ) db.add(tryout) await db.flush() return tryout @router.get("", include_in_schema=False) @router.get("/", include_in_schema=False) async def admin_root(request: Request): admin = await _current_admin(request) if admin: return _dashboard_redirect() return _login_redirect() @router.get("/login", include_in_schema=False) async def login_view(request: Request): admin = await _current_admin(request) if admin: return _dashboard_redirect() body = """

Direct environment-backed admin access.

""" return _render_auth_page( request, "Admin Login", "Use the configured admin credentials to access the dashboard.", body, ) @router.post("/login", include_in_schema=False) async def login_submit( request: Request, username: str = Form(...), password: str = Form(...), remember_me: str | None = Form(None), ): if not ( secrets.compare_digest(username, settings.ADMIN_USERNAME) and secrets.compare_digest(password, settings.ADMIN_PASSWORD) ): body = f"""
Invalid username or password.
""" return _render_auth_page( request, "Admin Login", "Use the configured admin credentials to access the dashboard.", body, status_code=HTTP_401_UNAUTHORIZED, ) expire = settings.ADMIN_SESSION_EXPIRE_SECONDS response = _dashboard_redirect() if remember_me == "on": expire = max(expire, 3600 * 24 * 30) response.set_cookie("remember_me", "on", expires=expire, path="/admin") else: response.delete_cookie("remember_me", path="/admin") token = uuid.uuid4().hex response.set_cookie( SESSION_COOKIE, token, expires=expire, path="/admin", httponly=True, samesite="lax", ) await _admin_redis.set(f"{SESSION_PREFIX}{token}", settings.ADMIN_USERNAME, ex=expire) return response @router.get("/logout", include_in_schema=False) async def logout(request: Request): token = request.cookies.get(SESSION_COOKIE) if token and _admin_redis is not None: await _admin_redis.delete(f"{SESSION_PREFIX}{token}") response = _login_redirect() response.delete_cookie(SESSION_COOKIE, path="/admin") response.delete_cookie("remember_me", path="/admin") return response @router.get("/password", include_in_schema=False) async def password_view(request: Request): admin = await _current_admin(request) if not admin: return _login_redirect() body = f"""

Signed in as {escape(admin.username)}.

Password changes are disabled in the UI for this deployment.

Update ADMIN_PASSWORD in the server environment, then restart the app.

Session expiry is currently set to {settings.ADMIN_SESSION_EXPIRE_SECONDS} seconds.

Back to dashboard

""" return _render_auth_page( request, "Password Management", "Runtime password rotation is intentionally disabled.", body, ) @router.post("/password", include_in_schema=False) async def password_submit( request: Request, old_password: str = Form(...), new_password: str = Form(...), re_new_password: str = Form(...), ): _ = (old_password, new_password, re_new_password) admin = await _current_admin(request) if not admin: return _login_redirect() body = """
Password rotation via UI is disabled.

Update ADMIN_PASSWORD in the server environment, then restart the app.

Back to dashboard

""" return _render_auth_page( request, "Password Management", "Runtime password rotation is intentionally disabled.", body, status_code=400, ) @router.get("/dashboard", include_in_schema=False) async def dashboard_view(request: Request, db: AsyncSession = Depends(get_db)): admin = await _current_admin(request) if not admin: return _login_redirect() tryouts = await db.scalar(select(func.count()).select_from(Tryout)) or 0 items = await db.scalar(select(func.count()).select_from(Item)) or 0 sessions = await db.scalar(select(func.count()).select_from(Session)) or 0 completed_sessions = ( await db.scalar(select(func.count()).select_from(Session).where(Session.is_completed.is_(True))) or 0 ) body = f"""

Signed in as {escape(admin.username)}.

Tryouts{tryouts}
Items{items}
Sessions{sessions}
Completed Sessions{completed_sessions}

Open AI Playground

""" return _render_admin_page("IRT Bank Soal Admin", "Dashboard", body) @router.get("/websites", include_in_schema=False) async def websites_view(request: Request, db: AsyncSession = Depends(get_db)): admin = await _current_admin(request) if not admin: return _login_redirect() result = await db.execute(select(Website).order_by(Website.id.asc())) websites = list(result.scalars().all()) body = _websites_form_body(websites) return _render_admin_page("Websites", "Websites", body) @router.post("/websites", include_in_schema=False) async def websites_submit( request: Request, db: AsyncSession = Depends(get_db), site_name: str = Form(...), site_url: str = Form(...), ): admin = await _current_admin(request) if not admin: return _login_redirect() normalized_name = site_name.strip() normalized_url = site_url.strip().rstrip("/") if not normalized_name: result = await db.execute(select(Website).order_by(Website.id.asc())) websites = list(result.scalars().all()) body = _websites_form_body( websites, error="Website name is required.", site_name=site_name, site_url=site_url, ) return _render_admin_page("Websites", "Websites", body) if not normalized_url.startswith(("http://", "https://")): result = await db.execute(select(Website).order_by(Website.id.asc())) websites = list(result.scalars().all()) body = _websites_form_body( websites, error="Website URL must start with http:// or https://.", site_name=site_name, site_url=site_url, ) return _render_admin_page("Websites", "Websites", body) website = Website(site_name=normalized_name, site_url=normalized_url) db.add(website) try: await db.commit() except IntegrityError: await db.rollback() result = await db.execute(select(Website).order_by(Website.id.asc())) websites = list(result.scalars().all()) body = _websites_form_body( websites, error="Website URL already exists.", site_name=site_name, site_url=site_url, ) return _render_admin_page("Websites", "Websites", body) result = await db.execute(select(Website).order_by(Website.id.asc())) websites = list(result.scalars().all()) body = _websites_form_body( websites, success=f"Website added successfully with ID {website.id}.", ) return _render_admin_page("Websites", "Websites", body) @router.get("/websites/{website_id}/edit", include_in_schema=False) async def website_edit_view( website_id: int, request: Request, db: AsyncSession = Depends(get_db), ): admin = await _current_admin(request) if not admin: return _login_redirect() website = await db.get(Website, website_id) if website is None: result = await db.execute(select(Website).order_by(Website.id.asc())) websites = list(result.scalars().all()) body = _websites_form_body(websites, error=f"Website not found: {website_id}") return _render_admin_page("Websites", "Websites", body) body = _website_edit_form_body(website) return _render_admin_page("Edit Website", "Edit Website", body) @router.post("/websites/{website_id}/edit", include_in_schema=False) async def website_edit_submit( website_id: int, request: Request, db: AsyncSession = Depends(get_db), site_name: str = Form(...), site_url: str = Form(...), ): admin = await _current_admin(request) if not admin: return _login_redirect() website = await db.get(Website, website_id) if website is None: result = await db.execute(select(Website).order_by(Website.id.asc())) websites = list(result.scalars().all()) body = _websites_form_body(websites, error=f"Website not found: {website_id}") return _render_admin_page("Websites", "Websites", body) normalized_name = site_name.strip() normalized_url = site_url.strip().rstrip("/") if not normalized_name: body = _website_edit_form_body( website, error="Website name is required.", site_name=site_name, site_url=site_url, ) return _render_admin_page("Edit Website", "Edit Website", body) if not normalized_url.startswith(("http://", "https://")): body = _website_edit_form_body( website, error="Website URL must start with http:// or https://.", site_name=site_name, site_url=site_url, ) return _render_admin_page("Edit Website", "Edit Website", body) website.site_name = normalized_name website.site_url = normalized_url try: await db.commit() except IntegrityError: await db.rollback() body = _website_edit_form_body( website, error="Website URL already exists.", site_name=site_name, site_url=site_url, ) return _render_admin_page("Edit Website", "Edit Website", body) await db.refresh(website) body = _website_edit_form_body( website, success=f"Website #{website.id} updated successfully.", ) return _render_admin_page("Edit Website", "Edit Website", body) @router.post("/websites/{website_id}/delete", include_in_schema=False) async def website_delete_submit( website_id: int, request: Request, db: AsyncSession = Depends(get_db), ): admin = await _current_admin(request) if not admin: return _login_redirect() website = await db.get(Website, website_id) if website is None: result = await db.execute(select(Website).order_by(Website.id.asc())) websites = list(result.scalars().all()) body = _websites_form_body(websites, error=f"Website not found: {website_id}") return _render_admin_page("Websites", "Websites", body) deleted_label = f"{website.site_name} ({website.site_url})" await db.delete(website) await db.commit() result = await db.execute(select(Website).order_by(Website.id.asc())) websites = list(result.scalars().all()) body = _websites_form_body( websites, success=f"Website deleted successfully: {deleted_label}", ) return _render_admin_page("Websites", "Websites", body) @router.get("/tryout-import", include_in_schema=False) async def tryout_import_view(request: Request, db: AsyncSession = Depends(get_db)): admin = await _current_admin(request) if not admin: return _login_redirect() websites = await _load_websites(db) snapshots = await _recent_snapshots(db) body = _tryout_import_form_body(websites, snapshots) return _render_admin_page("Tryout Import", "Tryout Import", body) @router.post("/tryout-import/preview", include_in_schema=False) async def tryout_import_preview( request: Request, db: AsyncSession = Depends(get_db), website_id: int = Form(...), file: UploadFile = File(...), ): admin = await _current_admin(request) if not admin: return _login_redirect() websites = await _load_websites(db) snapshots = await _recent_snapshots(db) if not file.filename or not file.filename.lower().endswith(".json"): body = _tryout_import_form_body( websites, snapshots, error="File must be .json format.", selected_website_id=website_id, ) return _render_admin_page("Tryout Import", "Tryout Import", body) try: payload_bytes = await file.read() payload_text = payload_bytes.decode("utf-8") payload = json.loads(payload_text) except UnicodeDecodeError: body = _tryout_import_form_body( websites, snapshots, error="File must be UTF-8 encoded JSON.", selected_website_id=website_id, ) return _render_admin_page("Tryout Import", "Tryout Import", body) except json.JSONDecodeError as exc: body = _tryout_import_form_body( websites, snapshots, error=f"Invalid JSON file: {exc}", selected_website_id=website_id, ) return _render_admin_page("Tryout Import", "Tryout Import", body) try: preview = await preview_tryout_json_import(payload, website_id, db) except TryoutImportError as exc: body = _tryout_import_form_body( websites, snapshots, error=str(exc), selected_website_id=website_id, ) return _render_admin_page("Tryout Import", "Tryout Import", body) preview_token = uuid.uuid4().hex await _admin_redis.set( f"{IMPORT_PREVIEW_PREFIX}{preview_token}", payload_text, ex=IMPORT_PREVIEW_TTL_SECONDS, ) body = _tryout_import_form_body( websites, snapshots, selected_website_id=website_id, preview=preview, preview_token=preview_token, upload_filename=file.filename or "", ) return _render_admin_page("Tryout Import", "Tryout Import", body) @router.post("/tryout-import", include_in_schema=False) async def tryout_import_submit( request: Request, db: AsyncSession = Depends(get_db), website_id: int = Form(...), preview_token: str = Form(...), ): admin = await _current_admin(request) if not admin: return _login_redirect() websites = await _load_websites(db) snapshots = await _recent_snapshots(db) payload_text = await _admin_redis.get(f"{IMPORT_PREVIEW_PREFIX}{preview_token}") if not payload_text: body = _tryout_import_form_body( websites, snapshots, error="Preview token expired. Upload the JSON again and preview before importing.", selected_website_id=website_id, ) return _render_admin_page("Tryout Import", "Tryout Import", body) try: payload = json.loads(payload_text) result = await import_tryout_json_snapshot(payload, website_id, db) await db.commit() except TryoutImportError as exc: await db.rollback() body = _tryout_import_form_body( websites, snapshots, error=str(exc), selected_website_id=website_id, ) return _render_admin_page("Tryout Import", "Tryout Import", body) except Exception: await db.rollback() raise finally: await _admin_redis.delete(f"{IMPORT_PREVIEW_PREFIX}{preview_token}") updated_snapshots = await _recent_snapshots(db) imported_tryouts = result.get("imported_tryouts") or [] imported_count = sum((row.get("question_count") or 0) for row in imported_tryouts) body = _tryout_import_form_body( websites, updated_snapshots, success=( f"Imported {len(imported_tryouts)} tryout snapshot(s) and archived {imported_count} source question reference row(s)." ), selected_website_id=website_id, ) return _render_admin_page("Tryout Import", "Tryout Import", body) @router.get("/snapshot-questions", include_in_schema=False) async def snapshot_questions_view( request: Request, snapshot_id: int, db: AsyncSession = Depends(get_db), ): admin = await _current_admin(request) if not admin: return _login_redirect() snapshot = await db.get(TryoutImportSnapshot, snapshot_id) if snapshot is None: websites = await _load_websites(db) snapshots = await _recent_snapshots(db) body = _tryout_import_form_body( websites, snapshots, error=f"Snapshot not found: {snapshot_id}", ) return _render_admin_page("Tryout Import", "Tryout Import", body) result = await db.execute( select(TryoutSnapshotQuestion) .where( TryoutSnapshotQuestion.website_id == snapshot.website_id, TryoutSnapshotQuestion.source_tryout_id == snapshot.source_tryout_id, ) .order_by(TryoutSnapshotQuestion.source_question_id.asc()) ) questions = list(result.scalars().all()) slot_map = _snapshot_slot_map(snapshot) item_result = await db.execute( select(Item).where( Item.website_id == snapshot.website_id, Item.tryout_id == snapshot.source_tryout_id, Item.level == "sedang", ) ) promoted_items = list(item_result.scalars().all()) promoted_items_by_slot = {item.slot: item for item in promoted_items} questions.sort(key=lambda question: (slot_map.get(question.source_question_id, 10**9), question.source_question_id)) body = _snapshot_questions_body(snapshot, questions, promoted_items_by_slot) return _render_admin_page("Snapshot Questions", "Snapshot Questions", body) @router.post("/snapshot-questions/promote", include_in_schema=False) async def snapshot_question_promote( request: Request, snapshot_id: int = Form(...), snapshot_question_id: int = Form(...), db: AsyncSession = Depends(get_db), ): admin = await _current_admin(request) if not admin: return _login_redirect() snapshot = await db.get(TryoutImportSnapshot, snapshot_id) question = await db.get(TryoutSnapshotQuestion, snapshot_question_id) if snapshot is None or question is None: websites = await _load_websites(db) snapshots = await _recent_snapshots(db) body = _tryout_import_form_body( websites, snapshots, error="Snapshot or snapshot question not found.", ) return _render_admin_page("Tryout Import", "Tryout Import", body) if ( question.website_id != snapshot.website_id or question.source_tryout_id != snapshot.source_tryout_id ): body = _snapshot_questions_body(snapshot, [], {}, error="Snapshot question does not belong to the selected snapshot.") return _render_admin_page("Snapshot Questions", "Snapshot Questions", body) slot_map = _snapshot_slot_map(snapshot) slot = slot_map.get(question.source_question_id) if not slot: max_slot = ( await db.scalar( select(func.max(Item.slot)).where( Item.website_id == snapshot.website_id, Item.tryout_id == snapshot.source_tryout_id, Item.level == "sedang", ) ) or 0 ) slot = max_slot + 1 options = _snapshot_options_to_item_options(question.raw_options) if not options: item_result = await db.execute( select(Item).where( Item.website_id == snapshot.website_id, Item.tryout_id == snapshot.source_tryout_id, Item.level == "sedang", ) ) promoted_items_by_slot = {item.slot: item for item in item_result.scalars().all()} question_result = await db.execute( select(TryoutSnapshotQuestion) .where( TryoutSnapshotQuestion.website_id == snapshot.website_id, TryoutSnapshotQuestion.source_tryout_id == snapshot.source_tryout_id, ) .order_by(TryoutSnapshotQuestion.source_question_id.asc()) ) questions = list(question_result.scalars().all()) questions.sort(key=lambda row: (slot_map.get(row.source_question_id, 10**9), row.source_question_id)) body = _snapshot_questions_body( snapshot, questions, promoted_items_by_slot, error="Snapshot question has no usable option text, so it cannot be promoted into the live item bank.", ) return _render_admin_page("Snapshot Questions", "Snapshot Questions", body) await _ensure_operational_tryout(snapshot, db) existing_item_result = await db.execute( select(Item).where( Item.website_id == snapshot.website_id, Item.tryout_id == snapshot.source_tryout_id, Item.slot == slot, Item.level == "sedang", ) ) existing_item = existing_item_result.scalar_one_or_none() if existing_item is None: existing_item = Item( tryout_id=snapshot.source_tryout_id, website_id=snapshot.website_id, slot=slot, level="sedang", stem=question.question_html, options=options, correct_answer=question.correct_answer, explanation=question.explanation_html, generated_by="manual", calibrated=False, calibration_sample_size=0, ) db.add(existing_item) await db.commit() await db.refresh(existing_item) success_message = ( f"Snapshot question {question.source_question_id} promoted successfully as Item #{existing_item.id}. " f"Use that Basis Item ID in AI Playground." ) else: success_message = ( f"Snapshot question {question.source_question_id} is already available as Item #{existing_item.id}. " f"Use that Basis Item ID in AI Playground." ) question_result = await db.execute( select(TryoutSnapshotQuestion) .where( TryoutSnapshotQuestion.website_id == snapshot.website_id, TryoutSnapshotQuestion.source_tryout_id == snapshot.source_tryout_id, ) .order_by(TryoutSnapshotQuestion.source_question_id.asc()) ) questions = list(question_result.scalars().all()) item_result = await db.execute( select(Item).where( Item.website_id == snapshot.website_id, Item.tryout_id == snapshot.source_tryout_id, Item.level == "sedang", ) ) promoted_items_by_slot = {item.slot: item for item in item_result.scalars().all()} questions.sort(key=lambda row: (slot_map.get(row.source_question_id, 10**9), row.source_question_id)) body = _snapshot_questions_body(snapshot, questions, promoted_items_by_slot, success=success_message) return _render_admin_page("Snapshot Questions", "Snapshot Questions", body) @router.get("/calibration-status", include_in_schema=False) async def calibration_status_view(request: Request, db: AsyncSession = Depends(get_db)): admin = await _current_admin(request) if not admin: return _login_redirect() result = await db.execute(select(Tryout.tryout_id, Tryout.name, Tryout.website_id).order_by(Tryout.id)) tryouts = result.all() rows = [] for tryout_id, name, website_id in tryouts: status = await get_calibration_status(tryout_id, website_id, db) rows.append( [ tryout_id, name, status["total_items"], status["calibrated_items"], f'{status["calibration_percentage"]:.2f}%', "Yes" if status["ready_for_irt"] else "No", ] ) body = _table( ["Tryout ID", "Name", "Total Items", "Calibrated", "Calibration %", "Ready for IRT"], rows, ) return _render_admin_page("Calibration Status", "Calibration Status", body) @router.get("/item-statistics", include_in_schema=False) async def item_statistics_view(request: Request, db: AsyncSession = Depends(get_db)): admin = await _current_admin(request) if not admin: return _login_redirect() result = await db.execute(select(Item.level).distinct()) levels = result.scalars().all() rows = [] for level in levels: item_result = await db.execute(select(Item).where(Item.level == level).order_by(Item.slot).limit(10)) items = item_result.scalars().all() total_responses = sum(item.calibration_sample_size or 0 for item in items) calibrated_count = sum(1 for item in items if item.calibrated) calibration_percentage = (calibrated_count / len(items) * 100) if items else 0 avg_correctness = sum(item.ctt_p or 0 for item in items) / len(items) if items else 0 rows.append( [ level, len(items), calibrated_count, f"{calibration_percentage:.2f}%", total_responses, f"{avg_correctness:.4f}", ] ) body = _table( ["Level", "Total Items", "Calibrated", "Calibration %", "Responses", "Avg Correctness"], rows, ) return _render_admin_page("Item Statistics", "Item Statistics", body) @router.get("/session-overview", include_in_schema=False) async def session_overview_view(request: Request, db: AsyncSession = Depends(get_db)): admin = await _current_admin(request) if not admin: return _login_redirect() result = await db.execute(select(Session).order_by(Session.created_at.desc()).limit(50)) sessions = result.scalars().all() rows = [ [ session.session_id, session.wp_user_id, session.tryout_id, "Yes" if session.is_completed else "No", session.scoring_mode_used, session.total_benar, session.NM, session.NN, session.theta, ] for session in sessions ] body = _table( ["Session ID", "WP User", "Tryout", "Completed", "Mode", "Benar", "NM", "NN", "Theta"], rows, ) return _render_admin_page("Session Overview", "Session Overview", body) def _ai_form_body( key_configured: bool, stats: dict[str, Any], error: str | None = None, success: str | None = None, result: dict[str, Any] | None = None, basis_items: list[Item] | None = None, basis_item_id: str = "", target_level: str = "mudah", ai_model: str = settings.OPENROUTER_MODEL_QWEN, ) -> str: error_html = f'
{escape(error)}
' if error else "" success_html = f'
{escape(success)}
' if success else "" options_html = "".join( f'' for model, label in SUPPORTED_MODELS.items() ) basis_items = basis_items or [] basis_rows = [ [ item.id, item.tryout_id, item.slot, item.website_id, _truncate(item.stem, 90), ] for item in basis_items ] basis_table = _table( ["Item ID", "Tryout", "Slot", "Website", "Stem"], basis_rows, ) seed_callout = "" if not basis_items: seed_callout = """
No sedang basis items found yet. Seed one demo website, tryout, and basis item to test AI generation immediately.
""" result_html = "" if result: options = result.get("options") or {} save_html = "" if result.get("basis_item_id") and not result.get("existing_item_id"): save_html = f"""
""" elif result.get("existing_item_id"): save_html = f"""
Slot {escape(str(result.get("slot", "")))} already has a {escape(str(result.get("target_level", "")))} item for this tryout. Existing item ID: {escape(str(result.get("existing_item_id")))}.
""" result_html = f"""

Preview Result

Model: {escape(result.get("ai_model", ""))}

Basis Item: #{escape(str(result.get("basis_item_id", "")))} | Tryout: {escape(result.get("tryout_id", ""))} | Slot: {escape(str(result.get("slot", "")))}

Stem:
{escape(result.get("stem", ""))}

Options:

{_table(["Key", "Text"], [[key, value] for key, value in options.items()])}

Correct: {escape(result.get("correct", ""))}

Explanation:
{escape(result.get("explanation", ""))}

{save_html}
""" return f"""

OpenRouter key configured: {"Yes" if key_configured else "No"}

Total AI-generated items: {stats.get("total_ai_items", 0)}

{success_html} {error_html} {seed_callout}

Available Sedang Basis Items

The generator needs a sedang item. Use one of these IDs, or seed demo data if the table is empty.

{basis_table} {result_html} """ @router.get("/ai-playground", include_in_schema=False) async def ai_playground_view(request: Request, db: AsyncSession = Depends(get_db)): admin = await _current_admin(request) if not admin: return _login_redirect() stats = await get_ai_stats(db) basis_items = await _basis_items_for_playground(db) basis_item_id = request.query_params.get("basis_item_id", "") body = _ai_form_body( bool(settings.OPENROUTER_API_KEY), stats, basis_items=basis_items, basis_item_id=str(basis_item_id or ""), ) return _render_admin_page("AI Playground", "AI Playground", body) @router.post("/ai-playground/seed-demo", include_in_schema=False) async def ai_playground_seed_demo(request: Request, db: AsyncSession = Depends(get_db)): admin = await _current_admin(request) if not admin: return _login_redirect() demo_item = await _find_or_create_demo_basis_item(db) stats = await get_ai_stats(db) basis_items = await _basis_items_for_playground(db) body = _ai_form_body( bool(settings.OPENROUTER_API_KEY), stats, success=f"Demo basis item is ready: item #{demo_item.id}, tryout {demo_item.tryout_id}, slot {demo_item.slot}.", basis_items=basis_items, basis_item_id=str(demo_item.id), ) return _render_admin_page("AI Playground", "AI Playground", body) @router.post("/ai-playground", include_in_schema=False) async def ai_playground_submit( request: Request, db: AsyncSession = Depends(get_db), basis_item_id: int = Form(...), target_level: str = Form(...), ai_model: str = Form(...), ): admin = await _current_admin(request) if not admin: return _login_redirect() stats = await get_ai_stats(db) basis_items = await _basis_items_for_playground(db) if not settings.OPENROUTER_API_KEY: body = _ai_form_body( False, stats, error="OPENROUTER_API_KEY is not configured in the environment.", basis_items=basis_items, basis_item_id=str(basis_item_id), target_level=target_level, ai_model=ai_model, ) return _render_admin_page("AI Playground", "AI Playground", body) if target_level not in {"mudah", "sulit"}: body = _ai_form_body( True, stats, error="Target level must be mudah or sulit.", basis_items=basis_items, basis_item_id=str(basis_item_id), target_level=target_level, ai_model=ai_model, ) return _render_admin_page("AI Playground", "AI Playground", body) if not validate_ai_model(ai_model): body = _ai_form_body( True, stats, error="Unsupported AI model.", basis_items=basis_items, basis_item_id=str(basis_item_id), target_level=target_level, ai_model=ai_model, ) return _render_admin_page("AI Playground", "AI Playground", body) result = await db.execute(select(Item).where(Item.id == basis_item_id)) basis_item = result.scalar_one_or_none() if not basis_item: body = _ai_form_body( True, stats, error=f"Basis item not found: {basis_item_id}", basis_items=basis_items, basis_item_id=str(basis_item_id), target_level=target_level, ai_model=ai_model, ) return _render_admin_page("AI Playground", "AI Playground", body) if basis_item.level != "sedang": body = _ai_form_body( True, stats, error=f"Basis item must be sedang level, got: {basis_item.level}", basis_items=basis_items, basis_item_id=str(basis_item_id), target_level=target_level, ai_model=ai_model, ) return _render_admin_page("AI Playground", "AI Playground", body) generated = await generate_question( basis_item=basis_item, target_level=target_level, ai_model=ai_model, ) if not generated: body = _ai_form_body( True, stats, error="AI generation failed. Check OPENROUTER_API_KEY, model availability, and server logs.", basis_items=basis_items, basis_item_id=str(basis_item_id), target_level=target_level, ai_model=ai_model, ) return _render_admin_page("AI Playground", "AI Playground", body) existing_item_result = await db.execute( select(Item.id).where( Item.tryout_id == basis_item.tryout_id, Item.website_id == basis_item.website_id, Item.slot == basis_item.slot, Item.level == target_level, ) ) existing_item_id = existing_item_result.scalar_one_or_none() body = _ai_form_body( True, stats, basis_items=basis_items, result={ "basis_item_id": basis_item.id, "tryout_id": basis_item.tryout_id, "website_id": basis_item.website_id, "slot": basis_item.slot, "target_level": target_level, "stem": generated.stem, "options": generated.options, "correct": generated.correct, "explanation": generated.explanation or "", "ai_model": ai_model, "existing_item_id": existing_item_id, }, basis_item_id=str(basis_item_id), target_level=target_level, ai_model=ai_model, ) return _render_admin_page("AI Playground", "AI Playground", body) @router.post("/ai-playground/save", include_in_schema=False) async def ai_playground_save( request: Request, db: AsyncSession = Depends(get_db), basis_item_id: int = Form(...), tryout_id: str = Form(...), website_id: int = Form(...), slot: int = Form(...), target_level: str = Form(...), ai_model: str = Form(...), stem: str = Form(...), options_json: str = Form(...), correct: str = Form(...), explanation: str = Form(""), ): admin = await _current_admin(request) if not admin: return _login_redirect() stats = await get_ai_stats(db) basis_items = await _basis_items_for_playground(db) if target_level not in {"mudah", "sulit"}: body = _ai_form_body( bool(settings.OPENROUTER_API_KEY), stats, error="Only mudah or sulit generated items can be saved from the playground.", basis_items=basis_items, ) return _render_admin_page("AI Playground", "AI Playground", body) try: options = json.loads(options_json) except json.JSONDecodeError: body = _ai_form_body( bool(settings.OPENROUTER_API_KEY), stats, error="Generated options payload is invalid.", basis_items=basis_items, ) return _render_admin_page("AI Playground", "AI Playground", body) existing_result = await db.execute( select(Item.id).where( Item.tryout_id == tryout_id, Item.website_id == website_id, Item.slot == slot, Item.level == target_level, ) ) existing_item_id = existing_result.scalar_one_or_none() if existing_item_id: body = _ai_form_body( bool(settings.OPENROUTER_API_KEY), stats, error=f"Item already exists at tryout={tryout_id}, slot={slot}, level={target_level} (item #{existing_item_id}).", basis_items=basis_items, ) return _render_admin_page("AI Playground", "AI Playground", body) from app.schemas.ai import GeneratedQuestion item_id = await save_ai_question( generated_data=GeneratedQuestion( stem=stem, options=options, correct=correct, explanation=explanation or None, ), tryout_id=tryout_id, website_id=website_id, basis_item_id=basis_item_id, slot=slot, level=target_level, ai_model=ai_model, db=db, ) if not item_id: body = _ai_form_body( bool(settings.OPENROUTER_API_KEY), stats, error="Failed to save generated item. Check server logs for the database error.", basis_items=basis_items, ) return _render_admin_page("AI Playground", "AI Playground", body) await db.commit() updated_stats = await get_ai_stats(db) updated_basis_items = await _basis_items_for_playground(db) body = _ai_form_body( bool(settings.OPENROUTER_API_KEY), updated_stats, success=f"Generated item saved successfully as item #{item_id}.", basis_items=updated_basis_items, basis_item_id=str(basis_item_id), target_level=target_level, ai_model=ai_model, ) return _render_admin_page("AI Playground", "AI Playground", body) @router.get("/tryout/list", include_in_schema=False) @router.get("/item/list", include_in_schema=False) @router.get("/user/list", include_in_schema=False) @router.get("/session/list", include_in_schema=False) @router.get("/tryoutstats/list", include_in_schema=False) async def legacy_admin_paths(request: Request): admin = await _current_admin(request) if not admin: return _login_redirect() return _dashboard_redirect()