Files
yellow-bank-soal/app/admin_web.py
2026-04-04 06:32:21 +07:00

1769 lines
66 KiB
Python

"""
Plain FastAPI admin UI backed by SQLAlchemy and Redis sessions.
This replaces the previous fastapi-admin runtime path, which depended on
Tortoise-oriented internals that do not match this project.
"""
import secrets
import uuid
from dataclasses import dataclass
from html import escape
import json
from typing import Any
import aioredis
from fastapi import APIRouter, Depends, File, Form, Request, UploadFile
from sqlalchemy import func, select
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.asyncio import AsyncSession
from starlette.responses import HTMLResponse, RedirectResponse
from starlette.status import HTTP_303_SEE_OTHER, HTTP_401_UNAUTHORIZED
from app.core.config import get_settings
from app.database import get_db
from app.models import Item, Session, Tryout, TryoutImportSnapshot, TryoutSnapshotQuestion, Website
from app.services.ai_generation import (
SUPPORTED_MODELS,
generate_question,
get_ai_stats,
save_ai_question,
validate_ai_model,
)
from app.services.irt_calibration import get_calibration_status
from app.services.tryout_json_import import (
TryoutImportError,
import_tryout_json_snapshot,
preview_tryout_json_import,
)
# Settings are resolved once when this module is imported.
settings = get_settings()
# Every route declared on this router is served under the /admin prefix.
router = APIRouter(prefix="/admin", tags=["admin-web"])
# Name of the cookie that carries the admin session token.
SESSION_COOKIE = "access_token"
# Redis key prefix for admin sessions; the full key is prefix + token.
SESSION_PREFIX = "admin:session:"
# Redis key prefix for import previews (usage is outside this section;
# presumably prefix + preview token -- confirm against the import routes).
IMPORT_PREVIEW_PREFIX = "admin:import-preview:"
# 900 seconds = 15 minutes.
IMPORT_PREVIEW_TTL_SECONDS = 900
# Redis client; created by configure_admin_web() and reset to None by
# shutdown_admin_web().
_admin_redis = None
@dataclass
class AdminPrincipal:
    """Identity of the admin resolved from a valid session cookie."""

    # Username stored in Redis under the session token.
    username: str
async def configure_admin_web() -> None:
    """Create the module-level Redis client used for admin sessions.

    Calling this more than once is a no-op after the first success.

    Raises:
        RuntimeError: if ADMIN_USERNAME or ADMIN_PASSWORD is empty/unset.
    """
    global _admin_redis
    already_configured = _admin_redis is not None
    if already_configured:
        return
    credentials_present = settings.ADMIN_USERNAME and settings.ADMIN_PASSWORD
    if not credentials_present:
        raise RuntimeError("ENABLE_ADMIN=true requires ADMIN_USERNAME and ADMIN_PASSWORD to be set.")
    client_options = {"encoding": "utf-8", "decode_responses": True}
    _admin_redis = aioredis.from_url(settings.REDIS_URL, **client_options)
async def shutdown_admin_web() -> None:
    """Close the admin Redis client, if one exists, and forget it.

    The module-level reference is cleared even when ``close()`` raises.
    """
    global _admin_redis
    client = _admin_redis
    if client is None:
        return
    try:
        await client.close()
    finally:
        _admin_redis = None
async def _current_admin(request: Request) -> AdminPrincipal | None:
    """Resolve the signed-in admin from the session cookie.

    Returns ``None`` when the Redis client is not configured, the cookie is
    absent or empty, or Redis holds no value for the token.
    """
    if _admin_redis is None:
        return None
    session_token = request.cookies.get(SESSION_COOKIE)
    if not session_token:
        return None
    stored_username = await _admin_redis.get(f"{SESSION_PREFIX}{session_token}")
    return AdminPrincipal(username=str(stored_username)) if stored_username else None
def _login_redirect() -> RedirectResponse:
    """Return a 303 redirect to the admin login page."""
    login_url = "/admin/login"
    return RedirectResponse(login_url, status_code=HTTP_303_SEE_OTHER)
def _dashboard_redirect() -> RedirectResponse:
    """Return a 303 redirect to the admin dashboard."""
    dashboard_url = "/admin/dashboard"
    return RedirectResponse(dashboard_url, status_code=HTTP_303_SEE_OTHER)
def _render_auth_page(
    request: Request,
    title: str,
    subtitle: str,
    body: str,
    status_code: int = 200,
) -> HTMLResponse:
    """Render the sidebar-less page shell used by the login/password screens.

    ``title`` and ``subtitle`` are HTML-escaped. ``body`` is inserted as raw
    markup after its ``__REMEMBER_ME_CHECKED__`` placeholder is replaced.
    """
    # Tick the "remember me" box when the browser still carries that cookie.
    remembered = request.cookies.get("remember_me") == "on"
    checkbox_state = "checked" if remembered else ""
    panel_markup = body.replace("__REMEMBER_ME_CHECKED__", checkbox_state)
    safe_title = escape(title)
    safe_subtitle = escape(subtitle)
    document = f"""<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>{safe_title}</title>
<style>
body {{ margin: 0; min-height: 100vh; display: grid; place-items: center; background: linear-gradient(135deg, #f8fafc, #e2e8f0); font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", sans-serif; color: #0f172a; }}
.panel {{ width: min(420px, calc(100vw - 32px)); background: rgba(255,255,255,0.96); border-radius: 18px; box-shadow: 0 18px 60px rgba(15, 23, 42, 0.14); padding: 28px; }}
h1 {{ margin: 0 0 8px; font-size: 28px; }}
p {{ margin: 0 0 20px; color: #475569; }}
label {{ display: block; font-size: 14px; font-weight: 600; margin: 14px 0 8px; }}
input {{ width: 100%; box-sizing: border-box; border: 1px solid #cbd5e1; border-radius: 10px; padding: 12px 14px; font-size: 15px; }}
.row {{ display: flex; align-items: center; gap: 10px; margin-top: 16px; color: #334155; font-size: 14px; }}
.row input {{ width: auto; }}
button {{ width: 100%; margin-top: 18px; border: 0; border-radius: 10px; padding: 12px 14px; background: #0f172a; color: #fff; font-size: 15px; font-weight: 600; cursor: pointer; }}
.error {{ margin: 0 0 16px; padding: 12px 14px; border-radius: 10px; background: #fef2f2; color: #991b1b; border: 1px solid #fecaca; }}
.muted {{ color: #64748b; font-size: 13px; margin-top: 14px; }}
a {{ color: #0f172a; }}
</style>
</head>
<body>
<main class="panel">
<h1>{safe_title}</h1>
<p>{safe_subtitle}</p>
{panel_markup}
</main>
</body>
</html>"""
    return HTMLResponse(content=document, status_code=status_code)
def _render_admin_page(title: str, page_title: str, body: str) -> HTMLResponse:
    """Render the main admin layout (sidebar plus one content card).

    ``title`` goes into ``<title>`` and ``page_title`` into the card heading;
    both are HTML-escaped. ``body`` is inserted as raw markup.
    """
    document_title = escape(title)
    card_heading = escape(page_title)
    document = f"""<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>{document_title}</title>
<style>
body {{ margin: 0; font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", sans-serif; background: #f5f7fb; color: #162033; }}
.layout {{ display: grid; grid-template-columns: 240px 1fr; min-height: 100vh; }}
.sidebar {{ background: #0f172a; color: #e2e8f0; padding: 24px 16px; }}
.sidebar h1 {{ font-size: 18px; margin: 0 0 24px; }}
.sidebar a {{ display: block; color: #cbd5e1; text-decoration: none; padding: 10px 12px; border-radius: 8px; margin-bottom: 8px; }}
.sidebar a:hover {{ background: #1e293b; color: #fff; }}
.content {{ padding: 32px; }}
.card {{ background: #fff; border-radius: 14px; padding: 24px; box-shadow: 0 8px 30px rgba(15, 23, 42, 0.08); }}
.grid {{ display: grid; grid-template-columns: repeat(auto-fit, minmax(180px, 1fr)); gap: 16px; margin-top: 20px; }}
.stat {{ padding: 18px; border: 1px solid #e2e8f0; border-radius: 12px; background: #f8fafc; }}
.stat strong {{ display: block; font-size: 26px; margin-top: 6px; }}
table {{ width: 100%; border-collapse: collapse; margin-top: 16px; background: #fff; border-radius: 12px; overflow: hidden; }}
th, td {{ padding: 12px 14px; border-bottom: 1px solid #e5e7eb; text-align: left; vertical-align: top; font-size: 14px; }}
th {{ background: #f8fafc; font-weight: 600; }}
input, select, textarea {{ width: 100%; box-sizing: border-box; border: 1px solid #cbd5e1; border-radius: 10px; padding: 12px 14px; font-size: 15px; }}
label {{ display: block; font-size: 14px; font-weight: 600; margin: 14px 0 8px; }}
button {{ border: 0; border-radius: 10px; padding: 12px 14px; background: #0f172a; color: #fff; font-size: 15px; font-weight: 600; cursor: pointer; }}
.actions {{ display: flex; gap: 12px; flex-wrap: wrap; margin-top: 18px; }}
.error {{ margin: 0 0 16px; padding: 12px 14px; border-radius: 10px; background: #fef2f2; color: #991b1b; border: 1px solid #fecaca; }}
.success {{ margin: 0 0 16px; padding: 12px 14px; border-radius: 10px; background: #ecfdf5; color: #166534; border: 1px solid #86efac; }}
.muted {{ color: #64748b; font-size: 14px; }}
</style>
</head>
<body>
<div class="layout">
<aside class="sidebar">
<h1>IRT Bank Soal Admin</h1>
<a href="/admin/dashboard">Dashboard</a>
<a href="/admin/websites">Websites</a>
<a href="/admin/tryout-import">Tryout Import</a>
<a href="/admin/calibration-status">Calibration Status</a>
<a href="/admin/item-statistics">Item Statistics</a>
<a href="/admin/session-overview">Session Overview</a>
<a href="/admin/ai-playground">AI Playground</a>
<a href="/admin/password">Password Info</a>
<a href="/admin/logout">Logout</a>
</aside>
<main class="content">
<div class="card">
<h2>{card_heading}</h2>
{body}
</div>
</main>
</div>
</body>
</html>"""
    return HTMLResponse(content=document)
def _table(headers: list[str], rows: list[list[Any]]) -> str:
    """Build an HTML table; every header and cell is stringified and escaped.

    When no row markup is produced, a single "No data" row spanning all
    columns is emitted instead.
    """
    header_markup = "".join(["<th>" + escape(str(label)) + "</th>" for label in headers])
    row_markup = "".join(
        "<tr>" + "".join(f"<td>{escape(str(cell))}</td>" for cell in row) + "</tr>"
        for row in rows
    )
    if not row_markup:
        row_markup = f'<tr><td colspan="{len(headers)}">No data</td></tr>'
    return f"<table><thead><tr>{header_markup}</tr></thead><tbody>{row_markup}</tbody></table>"
def _truncate(text: str | None, max_length: int = 120) -> str:
if not text:
return ""
if len(text) <= max_length:
return text
return f"{text[: max_length - 3]}..."
def _websites_form_body(
    websites: list[Website],
    error: str | None = None,
    success: str | None = None,
    site_name: str = "",
    site_url: str = "",
) -> str:
    """Build the body of the Websites page: add form plus the listing.

    Args:
        websites: Registered websites to list, each with Edit/Delete actions.
        error: Optional message shown in an error banner.
        success: Optional message shown in a success banner.
        site_name: Value echoed back into the name field.
        site_url: Value echoed back into the URL field.

    Returns:
        An HTML fragment intended to be placed inside the admin page shell.
    """
    error_html = f'<div class="error">{escape(error)}</div>' if error else ""
    success_html = f'<div class="success">{escape(success)}</div>' if success else ""
    body_rows = []
    for website in websites:
        # The confirmation text lives inside JavaScript inside an HTML
        # attribute. HTML-escaping alone is not enough: the browser decodes
        # entities before running the script, so a quote in the site name
        # would terminate the JS string. Encode it as a JS string literal
        # first (json.dumps), then escape that literal for the attribute.
        confirm_message = (
            f"Delete website {website.site_name} and all related tryouts, "
            "items, sessions, and snapshots?"
        )
        confirm_js = escape(json.dumps(confirm_message))
        actions = f"""
<div class="actions" style="margin-top:0">
<a href="/admin/websites/{website.id}/edit" style="display:inline-block;padding:8px 12px;border-radius:8px;background:#0f172a;color:#fff;text-decoration:none;">Edit</a>
<form method="post" action="/admin/websites/{website.id}/delete" onsubmit="return confirm({confirm_js});" style="margin:0">
<button type="submit" style="background:#991b1b;">Delete</button>
</form>
</div>
"""
        body_rows.append(
            "<tr>"
            f"<td>{website.id}</td>"
            f"<td>{escape(website.site_name)}</td>"
            f"<td>{escape(website.site_url)}</td>"
            f"<td>{actions}</td>"
            "</tr>"
        )
    if body_rows:
        # Built by hand because the Actions column holds raw markup that the
        # escaping _table() helper would neutralise.
        websites_table = (
            "<table><thead><tr><th>ID</th><th>Name</th><th>URL</th><th>Actions</th></tr></thead><tbody>"
            + "".join(body_rows)
            + "</tbody></table>"
        )
    else:
        websites_table = _table(["ID", "Name", "URL", "Actions"], [])
    return f"""
<p class="muted">Register websites here so imports and tryout references can be tied to a known source site.</p>
{success_html}
{error_html}
<form method="post" action="/admin/websites" autocomplete="off">
<label for="site_name">Website Name</label>
<input id="site_name" name="site_name" type="text" value="{escape(site_name)}" placeholder="Sejoli Demo Site">
<label for="site_url">Website URL</label>
<input id="site_url" name="site_url" type="url" value="{escape(site_url)}" placeholder="https://example.com">
<button type="submit">Add Website</button>
</form>
<h3 style="margin-top:24px">Registered Websites</h3>
<p class="muted">Use the website ID when importing read-only tryout snapshots.</p>
{websites_table}
"""
def _website_edit_form_body(
    website: Website,
    error: str | None = None,
    success: str | None = None,
    site_name: str | None = None,
    site_url: str | None = None,
) -> str:
    """Build the body of the Edit Website page.

    ``site_name`` / ``site_url`` override the stored values in the form
    fields when given (``None`` means "show the value on ``website``").
    """
    banner_success = "" if not success else f'<div class="success">{escape(success)}</div>'
    banner_error = "" if not error else f'<div class="error">{escape(error)}</div>'
    if site_name is None:
        name_value = website.site_name
    else:
        name_value = site_name
    if site_url is None:
        url_value = website.site_url
    else:
        url_value = site_url
    return f"""
<p class="muted">Website ID: <strong>{website.id}</strong></p>
{banner_success}
{banner_error}
<form method="post" action="/admin/websites/{website.id}/edit" autocomplete="off">
<label for="site_name">Website Name</label>
<input id="site_name" name="site_name" type="text" value="{escape(name_value)}">
<label for="site_url">Website URL</label>
<input id="site_url" name="site_url" type="url" value="{escape(url_value)}">
<div class="actions">
<button type="submit">Save Changes</button>
<a href="/admin/websites" style="display:inline-block;padding:12px 14px;border-radius:10px;background:#e2e8f0;color:#0f172a;text-decoration:none;font-size:15px;font-weight:600;">Back</a>
</div>
</form>
"""
def _tryout_import_form_body(
    websites: list[Website],
    recent_snapshots: list[TryoutImportSnapshot],
    error: str | None = None,
    success: str | None = None,
    selected_website_id: int | None = None,
    preview: dict[str, Any] | None = None,
    preview_token: str | None = None,
    upload_filename: str = "",
) -> str:
    """Build the body of the Tryout Import page.

    The fragment contains the upload/preview form, an optional preview
    summary (shown when ``preview`` is truthy, with an "Import Snapshot"
    form when both ``preview_token`` and ``selected_website_id`` are set),
    and a table of recent snapshots.
    """
    success_html = "" if not success else f'<div class="success">{escape(success)}</div>'
    error_html = "" if not error else f'<div class="error">{escape(error)}</div>'

    # <select> options: a placeholder followed by one entry per website.
    option_tags = ['<option value="">Select website</option>'] + [
        f'<option value="{site.id}" {"selected" if selected_website_id == site.id else ""}>'
        f"{escape(site.site_name)} (#{site.id})</option>"
        for site in websites
    ]

    # Recent snapshots listing; website names are looked up by id.
    site_names = {site.id: site.site_name for site in websites}
    browse_style = (
        "display:inline-block;padding:8px 12px;border-radius:8px;"
        "background:#0f172a;color:#fff;text-decoration:none;"
    )
    snapshot_rows = []
    for snap in recent_snapshots:
        cells = [
            f"{snap.id}",
            f"{escape(site_names.get(snap.website_id, 'Unknown'))} (#{snap.website_id})",
            escape(snap.source_tryout_id),
            escape(snap.title),
            f"{snap.question_count}",
            escape(str(snap.created_at)),
            f'<a href="/admin/snapshot-questions?snapshot_id={snap.id}" style="{browse_style}">Browse</a>',
        ]
        snapshot_rows.append("<tr>" + "".join(f"<td>{cell}</td>" for cell in cells) + "</tr>")
    snapshot_rows_markup = "".join(snapshot_rows) if snapshot_rows else '<tr><td colspan="7">No data</td></tr>'
    snapshots_table = (
        "<table><thead><tr><th>Snapshot ID</th><th>Website</th><th>Tryout ID</th><th>Title</th><th>Questions</th><th>Imported At</th><th>Actions</th></tr></thead><tbody>"
        + snapshot_rows_markup
        + "</tbody></table>"
    )

    preview_html = ""
    if preview:
        totals = preview.get("totals") or {}
        diff_columns = (
            "total_questions",
            "new_questions",
            "updated_questions",
            "unchanged_questions",
            "removed_questions",
        )
        tryout_rows = []
        for entry in preview.get("tryouts") or []:
            diff = entry.get("question_diff") or {}
            warning_text = "; ".join(entry.get("warnings") or []) or "-"
            tryout_rows.append(
                [
                    entry.get("source_tryout_id"),
                    entry.get("title"),
                    *(diff.get(column, 0) for column in diff_columns),
                    warning_text,
                ]
            )
        summary_table = _table(
            ["Tryout ID", "Title", "Total", "New", "Updated", "Unchanged", "Removed", "Warnings"],
            tryout_rows,
        )
        # The confirm-import form is only offered when there is a stored
        # preview token and a chosen website to import into.
        import_form = ""
        if preview_token and selected_website_id:
            import_form = f"""
<form method="post" action="/admin/tryout-import" autocomplete="off">
<input type="hidden" name="website_id" value="{selected_website_id}">
<input type="hidden" name="preview_token" value="{escape(preview_token)}">
<button type="submit">Import Snapshot</button>
</form>
"""
        file_label = escape(upload_filename or "uploaded JSON")
        preview_html = f"""
<h3 style="margin-top:24px">Preview Summary</h3>
<p class="muted">File: <strong>{file_label}</strong></p>
<div class="grid">
<div class="stat">Tryouts<strong>{preview.get("tryout_count", 0)}</strong></div>
<div class="stat">New Questions<strong>{totals.get("new_questions", 0)}</strong></div>
<div class="stat">Updated Questions<strong>{totals.get("updated_questions", 0)}</strong></div>
<div class="stat">Removed Questions<strong>{totals.get("removed_questions", 0)}</strong></div>
</div>
{summary_table}
<div class="actions">{import_form}</div>
"""

    options_markup = "".join(option_tags)
    return f"""
<p class="muted">Import Sejoli tryout JSON as read-only snapshot reference data. This does not create live item-bank questions.</p>
<p class="muted">Use this when the source tryout changes upstream. Re-import updates matching source question IDs, inserts new ones, and marks missing ones inactive.</p>
{success_html}
{error_html}
<form method="post" action="/admin/tryout-import/preview" enctype="multipart/form-data" autocomplete="off">
<label for="website_id">Website</label>
<select id="website_id" name="website_id">{options_markup}</select>
<label for="file">Tryout Export JSON</label>
<input id="file" name="file" type="file" accept=".json,application/json">
<button type="submit">Preview Import</button>
</form>
{preview_html}
<h3 style="margin-top:24px">Recent Snapshots</h3>
<p class="muted">These are archived imports stored in PostgreSQL for traceability.</p>
{snapshots_table}
"""
def _snapshot_slot_map(snapshot: TryoutImportSnapshot) -> dict[str, int]:
    """Map each source question id to its 1-based position in the snapshot.

    Positions come from the order of ``raw_payload["questions"]``. Entries
    that are not dicts, or that have no usable ``id``, are skipped but still
    occupy their position, so the numbering of later entries is unaffected.
    A missing or malformed payload yields an empty map instead of raising.

    Returns:
        ``{stripped source question id (as str): 1-based position}``. When an
        id occurs more than once, the last position wins.
    """
    payload = snapshot.raw_payload
    if not isinstance(payload, dict):
        return {}
    questions = payload.get("questions")
    if not isinstance(questions, list):
        return {}
    slot_map: dict[str, int] = {}
    for index, question in enumerate(questions, start=1):
        # Same defensive stance as _snapshot_options_to_item_options():
        # the payload is imported JSON, so tolerate non-dict entries.
        if not isinstance(question, dict):
            continue
        source_question_id = str(question.get("id") or "").strip()
        if source_question_id:
            slot_map[source_question_id] = index
    return slot_map
def _snapshot_options_to_item_options(raw_options: list[dict[str, Any]] | list[Any]) -> dict[str, str]:
item_options: dict[str, str] = {}
for option in raw_options or []:
if not isinstance(option, dict):
continue
increment = str(option.get("increment") or "").strip().upper()
text = str(option.get("text") or option.get("label") or "").strip()
if increment and text:
item_options[increment] = text
return item_options
def _snapshot_questions_body(
    snapshot: TryoutImportSnapshot,
    questions: list[TryoutSnapshotQuestion],
    promoted_items_by_slot: dict[int, Item],
    error: str | None = None,
    success: str | None = None,
) -> str:
    """Build the body of the snapshot question browser.

    Each question is shown with its slot (position in the snapshot payload,
    ``-`` when unknown). Questions whose slot already has an entry in
    ``promoted_items_by_slot`` link to the AI playground; the others get a
    checkbox for the bulk "promote" form.
    """
    success_html = "" if not success else f'<div class="success">{escape(success)}</div>'
    error_html = "" if not error else f'<div class="error">{escape(error)}</div>'
    slot_map = _snapshot_slot_map(snapshot)

    def _render_row(question) -> str:
        slot = slot_map.get(question.source_question_id, 0)
        promoted = promoted_items_by_slot.get(slot)
        if not promoted:
            checkbox = f'<input type="checkbox" name="snapshot_question_ids" value="{question.id}">'
            action = "Ready to promote"
        else:
            checkbox = ""
            action = (
                f"Item #{promoted.id} already exists. "
                f'<a href="/admin/ai-playground?basis_item_id={promoted.id}">Open in AI Playground</a>'
            )
        cells = [
            checkbox,
            f"{slot or '-'}",
            escape(question.source_question_id),
            escape(question.correct_answer),
            f"{question.option_count}",
            "Yes" if question.is_active else "No",
            escape(_truncate(question.question_title or question.question_html, 100)),
            action,
        ]
        return "<tr>" + "".join(f"<td>{cell}</td>" for cell in cells) + "</tr>"

    rendered_rows = [_render_row(question) for question in questions]
    rows_markup = "".join(rendered_rows) if rendered_rows else '<tr><td colspan="8">No data</td></tr>'
    questions_table = "".join(
        [
            '<form method="post" action="/admin/snapshot-questions/promote-bulk">',
            f'<input type="hidden" name="snapshot_id" value="{snapshot.id}">',
            '<div class="actions" style="margin:16px 0">',
            '<button type="submit">Promote Selected as Basis Items</button>',
            "</div>",
            "<table><thead><tr><th><input type=\"checkbox\" onclick=\"document.querySelectorAll('input[name=&quot;snapshot_question_ids&quot;]').forEach(el => el.checked = this.checked)\"></th><th>Slot</th><th>Source Question ID</th><th>Correct</th><th>Options</th><th>Active</th><th>Stem</th><th>Action</th></tr></thead><tbody>",
            rows_markup,
            "</tbody></table>",
            "</form>",
        ]
    )
    return f"""
<p class="muted">Snapshot ID: <strong>{snapshot.id}</strong> | Website: <strong>{snapshot.website_id}</strong> | Tryout: <strong>{escape(snapshot.source_tryout_id)}</strong></p>
<p class="muted">Promote selected snapshot questions into the live <code>items</code> table as <code>sedang</code> basis items for AI generation.</p>
{success_html}
{error_html}
{questions_table}
<p style="margin-top:20px"><a href="/admin/tryout-import">Back to Tryout Import</a></p>
"""
async def _basis_items_for_playground(db: AsyncSession, limit: int = 20) -> list[Item]:
    """Return up to ``limit`` items with level ``sedang``, newest first.

    Ordering is by ``created_at`` descending, then ``id`` descending.
    """
    newest_first = (Item.created_at.desc(), Item.id.desc())
    statement = select(Item).where(Item.level == "sedang").order_by(*newest_first).limit(limit)
    rows = await db.execute(statement)
    return list(rows.scalars().all())
async def _find_or_create_demo_basis_item(db: AsyncSession) -> Item:
    """Return the demo basis item, creating it (and its parents) if needed.

    Looks for a manual ``sedang`` item in tryout ``demo-tryout``. When none
    exists, the demo website and demo tryout are looked up or created, a
    seed item is inserted, and the transaction is committed.
    """
    demo_tryout_id = "demo-tryout"
    demo_site_url = "https://demo.local"

    item_lookup = (
        select(Item)
        .where(
            Item.level == "sedang",
            Item.generated_by == "manual",
            Item.tryout_id == demo_tryout_id,
        )
        .order_by(Item.id.asc())
        .limit(1)
    )
    found = (await db.execute(item_lookup)).scalar_one_or_none()
    if found:
        return found

    # Parent website: reuse or create. Flush so its id is available below.
    website_lookup = select(Website).where(Website.site_url == demo_site_url).limit(1)
    website = (await db.execute(website_lookup)).scalar_one_or_none()
    if website is None:
        website = Website(site_url=demo_site_url, site_name="Demo Website")
        db.add(website)
        await db.flush()

    # Parent tryout: reuse or create under that website.
    tryout_lookup = (
        select(Tryout)
        .where(Tryout.website_id == website.id, Tryout.tryout_id == demo_tryout_id)
        .limit(1)
    )
    tryout = (await db.execute(tryout_lookup)).scalar_one_or_none()
    if tryout is None:
        tryout = Tryout(
            website_id=website.id,
            tryout_id=demo_tryout_id,
            name="Demo AI Playground Tryout",
            description="Seed data for the AI playground.",
            scoring_mode="ctt",
            selection_mode="fixed",
            normalization_mode="static",
            ai_generation_enabled=True,
        )
        db.add(tryout)
        await db.flush()

    seed_options = {
        "A": "Rp260.000",
        "B": "Rp300.000",
        "C": "Rp320.000",
        "D": "Rp360.000",
    }
    seed_item = Item(
        tryout_id=tryout.tryout_id,
        website_id=website.id,
        slot=1,
        level="sedang",
        stem="Sebuah toko memberi diskon 20% untuk sebuah tas. Jika harga setelah diskon adalah Rp240.000, berapakah harga tas sebelum diskon?",
        options=seed_options,
        correct_answer="B",
        explanation="Harga setelah diskon 20% berarti 80% dari harga awal. Jadi harga awal = 240.000 / 0,8 = 300.000.",
        generated_by="manual",
        calibrated=False,
        calibration_sample_size=0,
    )
    db.add(seed_item)
    await db.flush()
    await db.commit()
    # Reload after commit so the returned instance has current column values.
    await db.refresh(seed_item)
    return seed_item
async def _load_websites(db: AsyncSession) -> list[Website]:
    """Return every website, ordered by ascending id."""
    statement = select(Website).order_by(Website.id.asc())
    rows = await db.execute(statement)
    return list(rows.scalars().all())
async def _recent_snapshots(db: AsyncSession, limit: int = 20) -> list[TryoutImportSnapshot]:
    """Return up to ``limit`` import snapshots, highest id first."""
    statement = select(TryoutImportSnapshot).order_by(TryoutImportSnapshot.id.desc()).limit(limit)
    rows = await db.execute(statement)
    return list(rows.scalars().all())
async def _ensure_operational_tryout(snapshot: TryoutImportSnapshot, db: AsyncSession) -> Tryout:
    """Return the live tryout matching ``snapshot``, creating it if absent.

    The match is on website id and source tryout id. A newly created tryout
    is flushed but not committed; committing is left to the caller.
    """
    lookup = select(Tryout).where(
        Tryout.website_id == snapshot.website_id,
        Tryout.tryout_id == snapshot.source_tryout_id,
    )
    existing = (await db.execute(lookup)).scalar_one_or_none()
    if existing:
        return existing

    created = Tryout(
        website_id=snapshot.website_id,
        tryout_id=snapshot.source_tryout_id,
        name=snapshot.title,
        description=f"Operational tryout basis created from imported snapshot #{snapshot.id}.",
        scoring_mode="ctt",
        selection_mode="fixed",
        normalization_mode="static",
        ai_generation_enabled=True,
    )
    db.add(created)
    await db.flush()
    return created
async def _load_snapshot_question_context(
    snapshot: TryoutImportSnapshot,
    db: AsyncSession,
) -> tuple[list[TryoutSnapshotQuestion], dict[int, Item], dict[str, int]]:
    """Load everything the snapshot question browser needs.

    Returns:
        A tuple of (snapshot questions ordered by payload position, then by
        source question id; ``sedang`` items of the same website/tryout keyed
        by slot; the source-question-id -> slot map).
    """
    question_query = (
        select(TryoutSnapshotQuestion)
        .where(
            TryoutSnapshotQuestion.website_id == snapshot.website_id,
            TryoutSnapshotQuestion.source_tryout_id == snapshot.source_tryout_id,
        )
        .order_by(TryoutSnapshotQuestion.source_question_id.asc())
    )
    fetched_questions = list((await db.execute(question_query)).scalars().all())

    item_query = select(Item).where(
        Item.website_id == snapshot.website_id,
        Item.tryout_id == snapshot.source_tryout_id,
        Item.level == "sedang",
    )
    promoted_items_by_slot: dict[int, Item] = {}
    for item in (await db.execute(item_query)).scalars().all():
        promoted_items_by_slot[item.slot] = item

    slot_map = _snapshot_slot_map(snapshot)
    unknown_slot = 10**9  # questions absent from the payload sort last

    def _display_order(row):
        return (slot_map.get(row.source_question_id, unknown_slot), row.source_question_id)

    ordered_questions = sorted(fetched_questions, key=_display_order)
    return ordered_questions, promoted_items_by_slot, slot_map
async def _promote_snapshot_question_to_item(
    snapshot: TryoutImportSnapshot,
    question: TryoutSnapshotQuestion,
    db: AsyncSession,
) -> tuple[Item | None, str]:
    """Create a live ``sedang`` item from one snapshot question.

    Args:
        snapshot: Snapshot the question is being promoted from.
        question: Snapshot question to promote.
        db: Session to work in. New rows are flushed, not committed.

    Returns:
        ``(item, status)`` where status is one of:
        ``"mismatch"`` (question belongs to another website/tryout, item is
        ``None``), ``"missing_options"`` (no usable options, item is
        ``None``), ``"existing"`` (an item already occupies the slot) or
        ``"created"``.
    """
    if (
        question.website_id != snapshot.website_id
        or question.source_tryout_id != snapshot.source_tryout_id
    ):
        return None, "mismatch"

    # Validate options before touching the database; the outcome does not
    # depend on the slot, so checking first saves a query on bad input.
    options = _snapshot_options_to_item_options(question.raw_options)
    if not options:
        return None, "missing_options"

    slot_map = _snapshot_slot_map(snapshot)
    slot = slot_map.get(question.source_question_id)
    if not slot:
        # Question is not part of this snapshot's payload: give it a slot
        # after every existing item AND after every position the payload
        # reserves. Using only the item maximum could hand out a position
        # that belongs to a payload question not promoted yet, which would
        # later be reported as "existing" and never be created.
        max_item_slot = (
            await db.scalar(
                select(func.max(Item.slot)).where(
                    Item.website_id == snapshot.website_id,
                    Item.tryout_id == snapshot.source_tryout_id,
                    Item.level == "sedang",
                )
            )
            or 0
        )
        max_reserved_slot = max(slot_map.values(), default=0)
        slot = max(max_item_slot, max_reserved_slot) + 1

    await _ensure_operational_tryout(snapshot, db)

    existing_item_result = await db.execute(
        select(Item).where(
            Item.website_id == snapshot.website_id,
            Item.tryout_id == snapshot.source_tryout_id,
            Item.slot == slot,
            Item.level == "sedang",
        )
    )
    existing_item = existing_item_result.scalar_one_or_none()
    if existing_item is not None:
        return existing_item, "existing"

    item = Item(
        tryout_id=snapshot.source_tryout_id,
        website_id=snapshot.website_id,
        slot=slot,
        level="sedang",
        stem=question.question_html,
        options=options,
        correct_answer=question.correct_answer,
        explanation=question.explanation_html,
        generated_by="manual",
        calibrated=False,
        calibration_sample_size=0,
    )
    db.add(item)
    await db.flush()
    return item, "created"
@router.get("", include_in_schema=False)
@router.get("/", include_in_schema=False)
async def admin_root(request: Request):
    """Send signed-in admins to the dashboard and everyone else to login."""
    signed_in = await _current_admin(request)
    return _dashboard_redirect() if signed_in else _login_redirect()
@router.get("/login", include_in_schema=False)
async def login_view(request: Request):
    """Show the login form, or skip to the dashboard when already signed in."""
    if await _current_admin(request):
        return _dashboard_redirect()
    # __REMEMBER_ME_CHECKED__ is filled in by _render_auth_page().
    login_form = """
<form method="post" action="/admin/login" autocomplete="off">
<label for="username">Username</label>
<input id="username" name="username" type="text" autocomplete="username">
<label for="password">Password</label>
<input id="password" name="password" type="password" autocomplete="current-password">
<label class="row"><input type="checkbox" name="remember_me" __REMEMBER_ME_CHECKED__> Remember me</label>
<button type="submit">Sign in</button>
</form>
<p class="muted">Direct environment-backed admin access.</p>
"""
    page_title = "Admin Login"
    page_subtitle = "Use the configured admin credentials to access the dashboard."
    return _render_auth_page(request, page_title, page_subtitle, login_form)
@router.post("/login", include_in_schema=False)
async def login_submit(
    request: Request,
    username: str = Form(...),
    password: str = Form(...),
    remember_me: str | None = Form(None),
):
    """Validate admin credentials and start a Redis-backed session.

    On failure the login form is re-rendered with HTTP 401. On success a
    random session token is stored in Redis (value: the admin username, TTL:
    the session lifetime), the session cookie is set, and the browser is
    redirected to the dashboard. Ticking "remember me" extends the lifetime
    to at least 30 days.

    Raises:
        RuntimeError: if credentials are valid but the admin Redis client
            has not been configured.
    """
    # Compare as UTF-8 bytes: compare_digest() only accepts ASCII-only str
    # and raises TypeError otherwise, which would turn a non-ASCII username
    # or password into a server error. Both checks always run so a wrong
    # username does not skip the password comparison.
    username_ok = secrets.compare_digest(
        username.encode("utf-8"), settings.ADMIN_USERNAME.encode("utf-8")
    )
    password_ok = secrets.compare_digest(
        password.encode("utf-8"), settings.ADMIN_PASSWORD.encode("utf-8")
    )
    if not (username_ok and password_ok):
        body = f"""
<div class="error">Invalid username or password.</div>
<form method="post" action="/admin/login" autocomplete="off">
<label for="username">Username</label>
<input id="username" name="username" type="text" autocomplete="username" value="{escape(username)}">
<label for="password">Password</label>
<input id="password" name="password" type="password" autocomplete="current-password">
<label class="row"><input type="checkbox" name="remember_me" __REMEMBER_ME_CHECKED__> Remember me</label>
<button type="submit">Sign in</button>
</form>
"""
        return _render_auth_page(
            request,
            "Admin Login",
            "Use the configured admin credentials to access the dashboard.",
            body,
            status_code=HTTP_401_UNAUTHORIZED,
        )

    if _admin_redis is None:
        # Fail with a clear message instead of an AttributeError on None.
        raise RuntimeError("Admin web is not configured; call configure_admin_web() first.")

    expire = settings.ADMIN_SESSION_EXPIRE_SECONDS
    response = _dashboard_redirect()
    if remember_me == "on":
        expire = max(expire, 3600 * 24 * 30)
        response.set_cookie("remember_me", "on", expires=expire, path="/admin")
    else:
        response.delete_cookie("remember_me", path="/admin")

    # Session tokens are credentials: draw them from the secrets module.
    token = secrets.token_urlsafe(32)
    # Persist the session before handing the token to the browser.
    await _admin_redis.set(f"{SESSION_PREFIX}{token}", settings.ADMIN_USERNAME, ex=expire)
    response.set_cookie(
        SESSION_COOKIE,
        token,
        expires=expire,
        path="/admin",
        httponly=True,
        samesite="lax",
    )
    return response
@router.get("/logout", include_in_schema=False)
async def logout(request: Request):
    """Drop the Redis session (if any), clear cookies, and go to login."""
    session_token = request.cookies.get(SESSION_COOKIE)
    can_revoke = bool(session_token) and _admin_redis is not None
    if can_revoke:
        await _admin_redis.delete(f"{SESSION_PREFIX}{session_token}")
    response = _login_redirect()
    for cookie_name in (SESSION_COOKIE, "remember_me"):
        response.delete_cookie(cookie_name, path="/admin")
    return response
@router.get("/password", include_in_schema=False)
async def password_view(request: Request):
    """Explain that password changes happen via the environment, not the UI."""
    admin = await _current_admin(request)
    if admin is None:
        return _login_redirect()
    signed_in_as = escape(admin.username)
    session_seconds = settings.ADMIN_SESSION_EXPIRE_SECONDS
    info_markup = f"""
<p class="muted">Signed in as <strong>{signed_in_as}</strong>.</p>
<p>Password changes are disabled in the UI for this deployment.</p>
<p>Update <code>ADMIN_PASSWORD</code> in the server environment, then restart the app.</p>
<p>Session expiry is currently set to <strong>{session_seconds}</strong> seconds.</p>
<p><a href="/admin/dashboard">Back to dashboard</a></p>
"""
    return _render_auth_page(
        request,
        "Password Management",
        "Runtime password rotation is intentionally disabled.",
        info_markup,
    )
@router.post("/password", include_in_schema=False)
async def password_submit(
    request: Request,
    old_password: str = Form(...),
    new_password: str = Form(...),
    re_new_password: str = Form(...),
):
    """Reject password-change submissions with an explanatory 400 page.

    The three form fields are accepted but deliberately ignored.
    """
    del old_password, new_password, re_new_password  # intentionally unused
    if not await _current_admin(request):
        return _login_redirect()
    refusal_markup = """
<div class="error">Password rotation via UI is disabled.</div>
<p>Update <code>ADMIN_PASSWORD</code> in the server environment, then restart the app.</p>
<p><a href="/admin/dashboard">Back to dashboard</a></p>
"""
    return _render_auth_page(
        request,
        "Password Management",
        "Runtime password rotation is intentionally disabled.",
        refusal_markup,
        status_code=400,
    )
@router.get("/dashboard", include_in_schema=False)
async def dashboard_view(request: Request, db: AsyncSession = Depends(get_db)):
    """Render the dashboard with row counts for tryouts, items and sessions."""
    admin = await _current_admin(request)
    if not admin:
        return _login_redirect()

    async def _count(model, *criteria) -> int:
        # COUNT(*) over ``model``, optionally filtered; a None result is 0.
        statement = select(func.count()).select_from(model)
        if criteria:
            statement = statement.where(*criteria)
        return await db.scalar(statement) or 0

    tryout_total = await _count(Tryout)
    item_total = await _count(Item)
    session_total = await _count(Session)
    completed_total = await _count(Session, Session.is_completed.is_(True))

    body = f"""
<p class="muted">Signed in as <strong>{escape(admin.username)}</strong>.</p>
<div class="grid">
<div class="stat">Tryouts<strong>{tryout_total}</strong></div>
<div class="stat">Items<strong>{item_total}</strong></div>
<div class="stat">Sessions<strong>{session_total}</strong></div>
<div class="stat">Completed Sessions<strong>{completed_total}</strong></div>
</div>
<p style="margin-top:20px"><a href="/admin/ai-playground">Open AI Playground</a></p>
"""
    return _render_admin_page("IRT Bank Soal Admin", "Dashboard", body)
@router.get("/websites", include_in_schema=False)
async def websites_view(request: Request, db: AsyncSession = Depends(get_db)):
    """List registered websites together with the "add website" form."""
    if not await _current_admin(request):
        return _login_redirect()
    # _load_websites() runs the same id-ordered query this view needs.
    registered = await _load_websites(db)
    return _render_admin_page("Websites", "Websites", _websites_form_body(registered))
@router.post("/websites", include_in_schema=False)
async def websites_submit(
    request: Request,
    db: AsyncSession = Depends(get_db),
    site_name: str = Form(...),
    site_url: str = Form(...),
):
    """Register a new website from the "add website" form.

    The name is stripped; the URL is stripped and loses trailing slashes.
    Validation failures and duplicate URLs re-render the page with an error
    banner and the submitted values echoed back; success re-renders the page
    with the new website's id in a success banner.
    """
    admin = await _current_admin(request)
    if not admin:
        return _login_redirect()

    async def _reject(message: str) -> HTMLResponse:
        # Single rejection path: reload the listing via the shared helper and
        # echo the raw submitted values back into the form.
        websites = await _load_websites(db)
        body = _websites_form_body(
            websites,
            error=message,
            site_name=site_name,
            site_url=site_url,
        )
        return _render_admin_page("Websites", "Websites", body)

    normalized_name = site_name.strip()
    normalized_url = site_url.strip().rstrip("/")
    if not normalized_name:
        return await _reject("Website name is required.")
    if not normalized_url.startswith(("http://", "https://")):
        return await _reject("Website URL must start with http:// or https://.")

    website = Website(site_name=normalized_name, site_url=normalized_url)
    db.add(website)
    try:
        await db.commit()
    except IntegrityError:
        # NOTE(review): any integrity violation is reported as a duplicate
        # URL -- confirm that is the only constraint this insert can hit.
        await db.rollback()
        return await _reject("Website URL already exists.")

    # NOTE(review): the listing is loaded BEFORE website.id is read, as in the
    # original ordering, so the id is read only after the rows have been
    # re-selected. Keep this order unless the session's expiry settings are
    # confirmed.
    websites = await _load_websites(db)
    body = _websites_form_body(
        websites,
        success=f"Website added successfully with ID {website.id}.",
    )
    return _render_admin_page("Websites", "Websites", body)
@router.get("/websites/{website_id}/edit", include_in_schema=False)
async def website_edit_view(
    website_id: int,
    request: Request,
    db: AsyncSession = Depends(get_db),
):
    """Show the edit form for one website.

    An unknown id falls back to the websites listing with an error banner.
    """
    if not await _current_admin(request):
        return _login_redirect()
    website = await db.get(Website, website_id)
    if website is not None:
        return _render_admin_page("Edit Website", "Edit Website", _website_edit_form_body(website))
    # _load_websites() runs the same id-ordered query used for the listing.
    registered = await _load_websites(db)
    listing = _websites_form_body(registered, error=f"Website not found: {website_id}")
    return _render_admin_page("Websites", "Websites", listing)
@router.post("/websites/{website_id}/edit", include_in_schema=False)
async def website_edit_submit(
    website_id: int,
    request: Request,
    db: AsyncSession = Depends(get_db),
    site_name: str = Form(...),
    site_url: str = Form(...),
):
    """Validate and persist edits to an existing website.

    Args:
        website_id: Primary key of the website being edited.
        request: Incoming request, used to resolve the current admin.
        db: Async database session.
        site_name: Submitted name; surrounding whitespace is stripped.
        site_url: Submitted URL; surrounding whitespace and trailing
            slashes are stripped before validation and storage.

    Returns:
        The ``_login_redirect()`` response when no admin is resolved, the
        website list page with an error when the ID is unknown, or the edit
        page carrying either a validation error or a success message.
    """
    admin = await _current_admin(request)
    if not admin:
        return _login_redirect()

    website = await db.get(Website, website_id)
    if website is None:
        result = await db.execute(select(Website).order_by(Website.id.asc()))
        websites = list(result.scalars().all())
        body = _websites_form_body(websites, error=f"Website not found: {website_id}")
        return _render_admin_page("Websites", "Websites", body)

    def _render_edit(**form_kwargs: Any):
        # Single place that renders the edit page for every outcome below.
        body = _website_edit_form_body(website, **form_kwargs)
        return _render_admin_page("Edit Website", "Edit Website", body)

    normalized_name = site_name.strip()
    normalized_url = site_url.strip().rstrip("/")

    if not normalized_name:
        return _render_edit(
            error="Website name is required.",
            site_name=site_name,
            site_url=site_url,
        )
    if not normalized_url.startswith(("http://", "https://")):
        return _render_edit(
            error="Website URL must start with http:// or https://.",
            site_name=site_name,
            site_url=site_url,
        )

    website.site_name = normalized_name
    website.site_url = normalized_url
    try:
        await db.commit()
    except IntegrityError:
        await db.rollback()
        # A rollback expires every instance in the session. Reload the row so
        # rendering the form does not trigger an implicit lazy load, which an
        # AsyncSession cannot perform outside of an awaited call.
        await db.refresh(website)
        return _render_edit(
            error="Website URL already exists.",
            site_name=site_name,
            site_url=site_url,
        )

    await db.refresh(website)
    return _render_edit(success=f"Website #{website.id} updated successfully.")
@router.post("/websites/{website_id}/delete", include_in_schema=False)
async def website_delete_submit(
    website_id: int,
    request: Request,
    db: AsyncSession = Depends(get_db),
):
    """Delete a website and re-render the website list with the outcome.

    Returns the ``_login_redirect()`` response when no admin is resolved.
    An unknown ID renders the list with a "not found" error. If the database
    rejects the delete with an ``IntegrityError``, the transaction is rolled
    back and the list is rendered with an error instead of propagating the
    exception.
    """
    admin = await _current_admin(request)
    if not admin:
        return _login_redirect()

    website = await db.get(Website, website_id)
    if website is None:
        result = await db.execute(select(Website).order_by(Website.id.asc()))
        websites = list(result.scalars().all())
        body = _websites_form_body(websites, error=f"Website not found: {website_id}")
        return _render_admin_page("Websites", "Websites", body)

    # Capture the label before deleting; the instance is unusable afterwards.
    deleted_label = f"{website.site_name} ({website.site_url})"
    await db.delete(website)
    try:
        await db.commit()
    except IntegrityError:
        # Presumably a foreign-key violation from rows that still point at
        # this website; report it the same way the sibling handlers report
        # constraint failures.
        await db.rollback()
        outcome = {
            "error": (
                "Website could not be deleted; other records may still "
                f"reference it: {deleted_label}"
            )
        }
    else:
        outcome = {"success": f"Website deleted successfully: {deleted_label}"}

    result = await db.execute(select(Website).order_by(Website.id.asc()))
    websites = list(result.scalars().all())
    body = _websites_form_body(websites, **outcome)
    return _render_admin_page("Websites", "Websites", body)
@router.get("/tryout-import", include_in_schema=False)
async def tryout_import_view(request: Request, db: AsyncSession = Depends(get_db)):
    """Render the tryout import page with the website list and recent snapshots."""
    if not await _current_admin(request):
        return _login_redirect()

    website_rows = await _load_websites(db)
    recent = await _recent_snapshots(db)
    return _render_admin_page(
        "Tryout Import",
        "Tryout Import",
        _tryout_import_form_body(website_rows, recent),
    )
@router.post("/tryout-import/preview", include_in_schema=False)
async def tryout_import_preview(
    request: Request,
    db: AsyncSession = Depends(get_db),
    website_id: int = Form(...),
    file: UploadFile = File(...),
):
    """Parse an uploaded tryout JSON file and show an import preview.

    The decoded JSON text is cached in Redis under a fresh random token for
    ``IMPORT_PREVIEW_TTL_SECONDS`` so the follow-up import request can reuse
    it without a second upload.

    Args:
        request: Incoming request, used to resolve the current admin.
        db: Async database session.
        website_id: Website selected in the import form.
        file: Uploaded file; its name must end with ``.json``.

    Returns:
        The ``_login_redirect()`` response when no admin is resolved,
        otherwise the import page with either an error or the preview.
    """
    admin = await _current_admin(request)
    if not admin:
        return _login_redirect()

    websites = await _load_websites(db)
    snapshots = await _recent_snapshots(db)

    def _render_error(message: str):
        # Every failure re-renders the same form with the selection preserved.
        body = _tryout_import_form_body(
            websites,
            snapshots,
            error=message,
            selected_website_id=website_id,
        )
        return _render_admin_page("Tryout Import", "Tryout Import", body)

    if not file.filename or not file.filename.lower().endswith(".json"):
        return _render_error("File must be .json format.")

    payload_bytes = await file.read()
    try:
        # "utf-8-sig" accepts plain UTF-8 and also strips a leading BOM, which
        # json.loads would otherwise reject as invalid JSON.
        payload_text = payload_bytes.decode("utf-8-sig")
    except UnicodeDecodeError:
        return _render_error("File must be UTF-8 encoded JSON.")

    try:
        payload = json.loads(payload_text)
    except json.JSONDecodeError as exc:
        return _render_error(f"Invalid JSON file: {exc}")

    try:
        preview = await preview_tryout_json_import(payload, website_id, db)
    except TryoutImportError as exc:
        return _render_error(str(exc))

    preview_token = uuid.uuid4().hex
    await _admin_redis.set(
        f"{IMPORT_PREVIEW_PREFIX}{preview_token}",
        payload_text,
        ex=IMPORT_PREVIEW_TTL_SECONDS,
    )
    body = _tryout_import_form_body(
        websites,
        snapshots,
        selected_website_id=website_id,
        preview=preview,
        preview_token=preview_token,
        upload_filename=file.filename or "",
    )
    return _render_admin_page("Tryout Import", "Tryout Import", body)
@router.post("/tryout-import", include_in_schema=False)
async def tryout_import_submit(
    request: Request,
    db: AsyncSession = Depends(get_db),
    website_id: int = Form(...),
    preview_token: str = Form(...),
):
    """Import the JSON payload cached by a previous preview request.

    The payload is looked up in Redis by ``preview_token``. The cached entry
    is deleted once an import attempt has been made, whether it succeeded or
    failed, so a failed import requires a new upload and preview.

    Args:
        request: Incoming request, used to resolve the current admin.
        db: Async database session.
        website_id: Website selected in the import form.
        preview_token: Token issued by the preview step.

    Returns:
        The ``_login_redirect()`` response when no admin is resolved,
        otherwise the import page with an error or a success summary.

    Raises:
        Exception: Anything other than ``TryoutImportError`` raised during
            the import is re-raised after the transaction is rolled back.
    """
    admin = await _current_admin(request)
    if not admin:
        return _login_redirect()

    websites = await _load_websites(db)
    snapshots = await _recent_snapshots(db)
    preview_key = f"{IMPORT_PREVIEW_PREFIX}{preview_token}"

    payload_text = await _admin_redis.get(preview_key)
    if not payload_text:
        body = _tryout_import_form_body(
            websites,
            snapshots,
            error="Preview token expired. Upload the JSON again and preview before importing.",
            selected_website_id=website_id,
        )
        return _render_admin_page("Tryout Import", "Tryout Import", body)

    try:
        payload = json.loads(payload_text)
        result = await import_tryout_json_snapshot(payload, website_id, db)
        await db.commit()
    except TryoutImportError as exc:
        await db.rollback()
        # A rollback expires every ORM instance held by the session. Assuming
        # the loaders return ORM rows (TODO confirm), reload them so rendering
        # does not touch expired state, which an AsyncSession cannot lazy-load.
        websites = await _load_websites(db)
        snapshots = await _recent_snapshots(db)
        body = _tryout_import_form_body(
            websites,
            snapshots,
            error=str(exc),
            selected_website_id=website_id,
        )
        return _render_admin_page("Tryout Import", "Tryout Import", body)
    except Exception:
        await db.rollback()
        raise
    finally:
        # The token is single-use regardless of the import outcome.
        await _admin_redis.delete(preview_key)

    updated_snapshots = await _recent_snapshots(db)
    imported_tryouts = result.get("imported_tryouts") or []
    imported_count = sum((row.get("question_count") or 0) for row in imported_tryouts)
    body = _tryout_import_form_body(
        websites,
        updated_snapshots,
        success=(
            f"Imported {len(imported_tryouts)} tryout snapshot(s) and archived {imported_count} source question reference row(s)."
        ),
        selected_website_id=website_id,
    )
    return _render_admin_page("Tryout Import", "Tryout Import", body)
@router.get("/snapshot-questions", include_in_schema=False)
async def snapshot_questions_view(
    request: Request,
    snapshot_id: int,
    db: AsyncSession = Depends(get_db),
):
    """List the questions stored for one import snapshot.

    When ``snapshot_id`` does not match a snapshot, the tryout import page is
    rendered with a "not found" error instead.
    """
    if not await _current_admin(request):
        return _login_redirect()

    snapshot = await db.get(TryoutImportSnapshot, snapshot_id)
    if snapshot is not None:
        context = await _load_snapshot_question_context(snapshot, db)
        # Only the first two members of the context triple are needed here.
        questions, promoted_by_slot, _ = context
        page_body = _snapshot_questions_body(snapshot, questions, promoted_by_slot)
        return _render_admin_page("Snapshot Questions", "Snapshot Questions", page_body)

    website_rows = await _load_websites(db)
    recent = await _recent_snapshots(db)
    fallback_body = _tryout_import_form_body(
        website_rows,
        recent,
        error=f"Snapshot not found: {snapshot_id}",
    )
    return _render_admin_page("Tryout Import", "Tryout Import", fallback_body)
@router.post("/snapshot-questions/promote-bulk", include_in_schema=False)
async def snapshot_question_promote_bulk(
    request: Request,
    snapshot_id: int = Form(...),
    snapshot_question_ids: list[int] | None = Form(None),
    db: AsyncSession = Depends(get_db),
):
    """Promote several snapshot questions to items in one request.

    Each selected question is passed to
    ``_promote_snapshot_question_to_item`` and the returned status is
    tallied. All changes are committed together and a summary of the tallies
    is shown on the snapshot questions page.

    Args:
        request: Incoming request, used to resolve the current admin.
        snapshot_id: Snapshot the selection belongs to.
        snapshot_question_ids: IDs ticked in the form; may be missing/empty.
        db: Async database session.

    Returns:
        The ``_login_redirect()`` response when no admin is resolved, the
        import page with an error when the snapshot is unknown, or the
        snapshot questions page with an error or a summary message.
    """
    admin = await _current_admin(request)
    if not admin:
        return _login_redirect()

    snapshot = await db.get(TryoutImportSnapshot, snapshot_id)
    if snapshot is None:
        websites = await _load_websites(db)
        snapshots = await _recent_snapshots(db)
        body = _tryout_import_form_body(
            websites,
            snapshots,
            error=f"Snapshot not found: {snapshot_id}",
        )
        return _render_admin_page("Tryout Import", "Tryout Import", body)

    if not snapshot_question_ids:
        questions, promoted_items_by_slot, _ = await _load_snapshot_question_context(snapshot, db)
        body = _snapshot_questions_body(
            snapshot,
            questions,
            promoted_items_by_slot,
            error="Select at least one snapshot question to promote.",
        )
        return _render_admin_page("Snapshot Questions", "Snapshot Questions", body)

    question_result = await db.execute(
        select(TryoutSnapshotQuestion).where(
            TryoutSnapshotQuestion.id.in_(snapshot_question_ids)
        )
    )
    selected_questions = list(question_result.scalars().all())

    created_items: list[Item] = []
    reused_count = 0
    missing_option_count = 0
    mismatch_count = 0
    for question in selected_questions:
        item, status = await _promote_snapshot_question_to_item(snapshot, question, db)
        if status == "created" and item is not None:
            created_items.append(item)
        elif status == "existing" and item is not None:
            reused_count += 1
        elif status == "missing_options":
            missing_option_count += 1
        elif status == "mismatch":
            mismatch_count += 1
    await db.commit()

    questions, promoted_items_by_slot, _ = await _load_snapshot_question_context(snapshot, db)

    success_parts = []
    if created_items:
        success_parts.append(f"created {len(created_items)} item(s)")
    if reused_count:
        success_parts.append(f"reused {reused_count} existing item(s)")
    if missing_option_count:
        success_parts.append(f"skipped {missing_option_count} question(s) with missing option text")
    if mismatch_count:
        success_parts.append(f"skipped {mismatch_count} mismatched question(s)")
    if not success_parts:
        # None of the submitted IDs produced a countable outcome (for example
        # the IDs no longer exist); say so instead of emitting an empty list.
        success_parts.append("no snapshot questions were promoted")

    success_message = "Bulk promote finished: " + ", ".join(success_parts) + "."
    if created_items:
        success_message += f" Latest basis item ID: {created_items[-1].id}."
    body = _snapshot_questions_body(snapshot, questions, promoted_items_by_slot, success=success_message)
    return _render_admin_page("Snapshot Questions", "Snapshot Questions", body)
@router.get("/calibration-status", include_in_schema=False)
async def calibration_status_view(request: Request, db: AsyncSession = Depends(get_db)):
    """Render a table with the calibration status of every tryout.

    One ``get_calibration_status`` call is made per tryout, in ``Tryout.id``
    order.
    """
    if not await _current_admin(request):
        return _login_redirect()

    tryout_query = select(Tryout.tryout_id, Tryout.name, Tryout.website_id).order_by(Tryout.id)
    tryout_rows = (await db.execute(tryout_query)).all()

    table_rows = []
    for tryout_id, name, website_id in tryout_rows:
        report = await get_calibration_status(tryout_id, website_id, db)
        table_rows.append(
            [
                tryout_id,
                name,
                report["total_items"],
                report["calibrated_items"],
                f'{report["calibration_percentage"]:.2f}%',
                "Yes" if report["ready_for_irt"] else "No",
            ]
        )

    headers = ["Tryout ID", "Name", "Total Items", "Calibrated", "Calibration %", "Ready for IRT"]
    return _render_admin_page(
        "Calibration Status",
        "Calibration Status",
        _table(headers, table_rows),
    )
@router.get("/item-statistics", include_in_schema=False)
async def item_statistics_view(request: Request, db: AsyncSession = Depends(get_db)):
    """Render per-level item statistics.

    For every distinct ``Item.level`` the table shows the number of items,
    how many are calibrated, the calibration percentage, the summed
    calibration sample size, and the mean ``ctt_p`` (missing values count as
    zero). All items of a level are included; only the columns needed for
    the aggregates are fetched, in a single query.
    """
    admin = await _current_admin(request)
    if not admin:
        return _login_redirect()

    result = await db.execute(
        select(
            Item.level,
            Item.calibration_sample_size,
            Item.calibrated,
            Item.ctt_p,
        ).order_by(Item.level)
    )

    # level -> running totals; dict insertion order keeps the query's order.
    totals: dict[Any, dict[str, Any]] = {}
    for level, sample_size, calibrated, ctt_p in result.all():
        bucket = totals.setdefault(
            level,
            {"count": 0, "calibrated": 0, "responses": 0, "ctt_sum": 0},
        )
        bucket["count"] += 1
        bucket["responses"] += sample_size or 0
        bucket["ctt_sum"] += ctt_p or 0
        if calibrated:
            bucket["calibrated"] += 1

    rows = []
    for level, bucket in totals.items():
        # Every bucket holds at least one item, so the divisions are safe.
        count = bucket["count"]
        calibration_percentage = bucket["calibrated"] / count * 100
        avg_correctness = bucket["ctt_sum"] / count
        rows.append(
            [
                level,
                count,
                bucket["calibrated"],
                f"{calibration_percentage:.2f}%",
                bucket["responses"],
                f"{avg_correctness:.4f}",
            ]
        )

    body = _table(
        ["Level", "Total Items", "Calibrated", "Calibration %", "Responses", "Avg Correctness"],
        rows,
    )
    return _render_admin_page("Item Statistics", "Item Statistics", body)
@router.get("/session-overview", include_in_schema=False)
async def session_overview_view(request: Request, db: AsyncSession = Depends(get_db)):
    """Render a table of the 50 most recently created sessions."""
    if not await _current_admin(request):
        return _login_redirect()

    newest_first = select(Session).order_by(Session.created_at.desc()).limit(50)
    fetched = await db.execute(newest_first)

    table_rows = []
    for record in fetched.scalars().all():
        completed_label = "Yes" if record.is_completed else "No"
        table_rows.append(
            [
                record.session_id,
                record.wp_user_id,
                record.tryout_id,
                completed_label,
                record.scoring_mode_used,
                record.total_benar,
                record.NM,
                record.NN,
                record.theta,
            ]
        )

    headers = ["Session ID", "WP User", "Tryout", "Completed", "Mode", "Benar", "NM", "NN", "Theta"]
    return _render_admin_page(
        "Session Overview",
        "Session Overview",
        _table(headers, table_rows),
    )
def _ai_form_body(
    key_configured: bool,
    stats: dict[str, Any],
    error: str | None = None,
    success: str | None = None,
    result: dict[str, Any] | None = None,
    basis_items: list[Item] | None = None,
    basis_item_id: str = "",
    target_level: str = "mudah",
    ai_model: str = settings.OPENROUTER_MODEL_QWEN,
) -> str:
    """Build the HTML body of the AI playground page.

    Args:
        key_configured: Shown as "Yes"/"No" next to the OpenRouter key label.
        stats: Mapping read for ``total_ai_items``.
        error: Optional message rendered in an error banner.
        success: Optional message rendered in a success banner.
        result: Optional generated-question preview. When present, a preview
            section is rendered together with either a save form or a notice
            that an item already occupies the slot.
        basis_items: Items listed in the basis table; when empty or missing,
            a "seed demo" call-out is shown as well.
        basis_item_id: Value pre-filled into the basis item input.
        target_level: Level pre-selected in the level drop-down.
        ai_model: Model pre-selected in the model drop-down.

    Returns:
        The page body as an HTML string.
    """
    error_html = ""
    if error:
        error_html = f'<div class="error">{escape(error)}</div>'
    success_html = ""
    if success:
        success_html = f'<div class="success">{escape(success)}</div>'

    option_tags = []
    for model, label in SUPPORTED_MODELS.items():
        selected_attr = "selected" if model == ai_model else ""
        option_tags.append(
            f'<option value="{escape(model)}" {selected_attr}>{escape(label)}</option>'
        )
    options_html = "".join(option_tags)

    listed_items = basis_items or []
    basis_table = _table(
        ["Item ID", "Tryout", "Slot", "Website", "Stem"],
        [
            [
                entry.id,
                entry.tryout_id,
                entry.slot,
                entry.website_id,
                _truncate(entry.stem, 90),
            ]
            for entry in listed_items
        ],
    )

    # Offer demo seeding only when there is nothing to generate from.
    seed_callout = ""
    if not listed_items:
        seed_callout = """
<div class="success">
No <code>sedang</code> basis items found yet. Seed one demo website, tryout, and basis item to test AI generation immediately.
</div>
<form method="post" action="/admin/ai-playground/seed-demo">
<button type="submit">Seed Demo Basis Item</button>
</form>
"""

    result_html = ""
    if result:
        options = result.get("options") or {}
        existing_id = result.get("existing_item_id")
        save_html = ""
        if existing_id:
            # The slot is already taken: explain instead of offering a save.
            save_html = f"""
<div class="error">
Slot {escape(str(result.get("slot", "")))} already has a <code>{escape(str(result.get("target_level", "")))}</code> item
for this tryout. Existing item ID: <strong>{escape(str(existing_id))}</strong>.
</div>
"""
        elif result.get("basis_item_id"):
            save_html = f"""
<form method="post" action="/admin/ai-playground/save">
<input type="hidden" name="basis_item_id" value="{escape(str(result.get("basis_item_id", "")))}">
<input type="hidden" name="tryout_id" value="{escape(str(result.get("tryout_id", "")))}">
<input type="hidden" name="website_id" value="{escape(str(result.get("website_id", "")))}">
<input type="hidden" name="slot" value="{escape(str(result.get("slot", "")))}">
<input type="hidden" name="target_level" value="{escape(str(result.get("target_level", "")))}">
<input type="hidden" name="ai_model" value="{escape(str(result.get("ai_model", "")))}">
<input type="hidden" name="stem" value="{escape(str(result.get("stem", "")))}">
<input type="hidden" name="options_json" value="{escape(json.dumps(options))}">
<input type="hidden" name="correct" value="{escape(str(result.get("correct", "")))}">
<input type="hidden" name="explanation" value="{escape(str(result.get("explanation", "")))}">
<button type="submit">Save Generated Item</button>
</form>
"""
        result_html = f"""
<h3>Preview Result</h3>
<p><strong>Model:</strong> {escape(result.get("ai_model", ""))}</p>
<p><strong>Basis Item:</strong> #{escape(str(result.get("basis_item_id", "")))} | <strong>Tryout:</strong> {escape(result.get("tryout_id", ""))} | <strong>Slot:</strong> {escape(str(result.get("slot", "")))}</p>
<p><strong>Stem:</strong><br>{escape(result.get("stem", ""))}</p>
<p><strong>Options:</strong></p>
{_table(["Key", "Text"], [[key, value] for key, value in options.items()])}
<p><strong>Correct:</strong> {escape(result.get("correct", ""))}</p>
<p><strong>Explanation:</strong><br>{escape(result.get("explanation", ""))}</p>
<div class="actions">{save_html}</div>
"""

    return f"""
<p class="muted">OpenRouter key configured: <strong>{"Yes" if key_configured else "No"}</strong></p>
<p class="muted">Total AI-generated items: <strong>{stats.get("total_ai_items", 0)}</strong></p>
{success_html}
{error_html}
{seed_callout}
<form method="post" action="/admin/ai-playground" autocomplete="off">
<label for="basis_item_id">Basis Item ID</label>
<input id="basis_item_id" name="basis_item_id" type="number" value="{escape(basis_item_id)}">
<label for="target_level">Target Level</label>
<select id="target_level" name="target_level" style="width:100%;box-sizing:border-box;border:1px solid #cbd5e1;border-radius:10px;padding:12px 14px;font-size:15px;">
<option value="mudah" {"selected" if target_level == "mudah" else ""}>mudah</option>
<option value="sulit" {"selected" if target_level == "sulit" else ""}>sulit</option>
</select>
<label for="ai_model">Model</label>
<select id="ai_model" name="ai_model" style="width:100%;box-sizing:border-box;border:1px solid #cbd5e1;border-radius:10px;padding:12px 14px;font-size:15px;">
{options_html}
</select>
<button type="submit">Generate Preview</button>
</form>
<h3 style="margin-top:24px">Available Sedang Basis Items</h3>
<p class="muted">The generator needs a <code>sedang</code> item. Use one of these IDs, or seed demo data if the table is empty.</p>
{basis_table}
{result_html}
"""
@router.get("/ai-playground", include_in_schema=False)
async def ai_playground_view(request: Request, db: AsyncSession = Depends(get_db)):
    """Render the AI playground form.

    An optional ``basis_item_id`` query parameter pre-fills the basis item
    input.
    """
    if not await _current_admin(request):
        return _login_redirect()

    ai_stats = await get_ai_stats(db)
    candidates = await _basis_items_for_playground(db)
    prefill = request.query_params.get("basis_item_id", "")

    page_body = _ai_form_body(
        bool(settings.OPENROUTER_API_KEY),
        ai_stats,
        basis_items=candidates,
        basis_item_id=str(prefill or ""),
    )
    return _render_admin_page("AI Playground", "AI Playground", page_body)
@router.post("/ai-playground/seed-demo", include_in_schema=False)
async def ai_playground_seed_demo(request: Request, db: AsyncSession = Depends(get_db)):
    """Ensure a demo basis item exists and show the playground pre-filled with it."""
    if not await _current_admin(request):
        return _login_redirect()

    seeded = await _find_or_create_demo_basis_item(db)
    # Stats and the basis list are read after seeding so they include it.
    ai_stats = await get_ai_stats(db)
    candidates = await _basis_items_for_playground(db)

    notice = f"Demo basis item is ready: item #{seeded.id}, tryout {seeded.tryout_id}, slot {seeded.slot}."
    page_body = _ai_form_body(
        bool(settings.OPENROUTER_API_KEY),
        ai_stats,
        success=notice,
        basis_items=candidates,
        basis_item_id=str(seeded.id),
    )
    return _render_admin_page("AI Playground", "AI Playground", page_body)
@router.post("/ai-playground", include_in_schema=False)
async def ai_playground_submit(
    request: Request,
    db: AsyncSession = Depends(get_db),
    basis_item_id: int = Form(...),
    target_level: str = Form(...),
    ai_model: str = Form(...),
):
    """Generate a question preview from a ``sedang`` basis item.

    Nothing is saved here: the generated question is only rendered as a
    preview, together with the ID of any item that already occupies the same
    tryout/website/slot/level.

    Args:
        request: Incoming request, used to resolve the current admin.
        db: Async database session.
        basis_item_id: ID of the item to generate from.
        target_level: Requested level; must be ``mudah`` or ``sulit``.
        ai_model: Model identifier; must pass ``validate_ai_model``.

    Returns:
        The ``_login_redirect()`` response when no admin is resolved,
        otherwise the playground page with an error or the preview.
    """
    admin = await _current_admin(request)
    if not admin:
        return _login_redirect()

    stats = await get_ai_stats(db)
    basis_items = await _basis_items_for_playground(db)
    key_configured = bool(settings.OPENROUTER_API_KEY)

    def _render(**extra: Any):
        # Every outcome re-renders the form with the submitted values kept.
        body = _ai_form_body(
            key_configured,
            stats,
            basis_items=basis_items,
            basis_item_id=str(basis_item_id),
            target_level=target_level,
            ai_model=ai_model,
            **extra,
        )
        return _render_admin_page("AI Playground", "AI Playground", body)

    if not key_configured:
        return _render(error="OPENROUTER_API_KEY is not configured in the environment.")
    if target_level not in {"mudah", "sulit"}:
        return _render(error="Target level must be mudah or sulit.")
    if not validate_ai_model(ai_model):
        return _render(error="Unsupported AI model.")

    result = await db.execute(select(Item).where(Item.id == basis_item_id))
    basis_item = result.scalar_one_or_none()
    if not basis_item:
        return _render(error=f"Basis item not found: {basis_item_id}")
    if basis_item.level != "sedang":
        return _render(error=f"Basis item must be sedang level, got: {basis_item.level}")

    generated = await generate_question(
        basis_item=basis_item,
        target_level=target_level,
        ai_model=ai_model,
    )
    if not generated:
        return _render(
            error="AI generation failed. Check OPENROUTER_API_KEY, model availability, and server logs."
        )

    # limit(1) + first() tolerates pre-existing duplicates in the slot, where
    # scalar_one_or_none() would raise MultipleResultsFound.
    existing_item_result = await db.execute(
        select(Item.id)
        .where(
            Item.tryout_id == basis_item.tryout_id,
            Item.website_id == basis_item.website_id,
            Item.slot == basis_item.slot,
            Item.level == target_level,
        )
        .limit(1)
    )
    existing_item_id = existing_item_result.scalars().first()

    return _render(
        result={
            "basis_item_id": basis_item.id,
            "tryout_id": basis_item.tryout_id,
            "website_id": basis_item.website_id,
            "slot": basis_item.slot,
            "target_level": target_level,
            "stem": generated.stem,
            "options": generated.options,
            "correct": generated.correct,
            "explanation": generated.explanation or "",
            "ai_model": ai_model,
            "existing_item_id": existing_item_id,
        }
    )
@router.post("/ai-playground/save", include_in_schema=False)
async def ai_playground_save(
    request: Request,
    db: AsyncSession = Depends(get_db),
    basis_item_id: int = Form(...),
    tryout_id: str = Form(...),
    website_id: int = Form(...),
    slot: int = Form(...),
    target_level: str = Form(...),
    ai_model: str = Form(...),
    stem: str = Form(...),
    options_json: str = Form(...),
    correct: str = Form(...),
    explanation: str = Form(""),
):
    """Persist a previously previewed AI-generated question as an item.

    Args:
        request: Incoming request, used to resolve the current admin.
        db: Async database session.
        basis_item_id: ID of the item the question was generated from.
        tryout_id: Tryout the new item belongs to.
        website_id: Website the new item belongs to.
        slot: Slot the new item occupies.
        target_level: Level of the new item; must be ``mudah`` or ``sulit``.
        ai_model: Model that produced the question.
        stem: Question text.
        options_json: JSON object mapping option keys to option text.
        correct: Key of the correct option.
        explanation: Optional explanation; empty is stored as ``None``.

    Returns:
        The ``_login_redirect()`` response when no admin is resolved,
        otherwise the playground page with an error or a success message.
    """
    admin = await _current_admin(request)
    if not admin:
        return _login_redirect()

    stats = await get_ai_stats(db)
    basis_items = await _basis_items_for_playground(db)

    def _render(page_stats: Any, page_basis_items: Any, **extra: Any):
        # Keep the submitted basis item, level and model on every outcome so
        # the admin does not have to re-enter them after an error.
        body = _ai_form_body(
            bool(settings.OPENROUTER_API_KEY),
            page_stats,
            basis_items=page_basis_items,
            basis_item_id=str(basis_item_id),
            target_level=target_level,
            ai_model=ai_model,
            **extra,
        )
        return _render_admin_page("AI Playground", "AI Playground", body)

    if target_level not in {"mudah", "sulit"}:
        return _render(
            stats,
            basis_items,
            error="Only mudah or sulit generated items can be saved from the playground.",
        )

    try:
        options = json.loads(options_json)
    except json.JSONDecodeError:
        options = None
    # Valid JSON that is not an object (list, string, number, null) is just
    # as unusable as malformed JSON; reject both with the same message.
    if not isinstance(options, dict):
        return _render(stats, basis_items, error="Generated options payload is invalid.")

    # limit(1) + first() tolerates pre-existing duplicates in the slot, where
    # scalar_one_or_none() would raise MultipleResultsFound.
    existing_result = await db.execute(
        select(Item.id)
        .where(
            Item.tryout_id == tryout_id,
            Item.website_id == website_id,
            Item.slot == slot,
            Item.level == target_level,
        )
        .limit(1)
    )
    existing_item_id = existing_result.scalars().first()
    if existing_item_id:
        return _render(
            stats,
            basis_items,
            error=f"Item already exists at tryout={tryout_id}, slot={slot}, level={target_level} (item #{existing_item_id}).",
        )

    # Imported locally, as in the original handler.
    from app.schemas.ai import GeneratedQuestion

    item_id = await save_ai_question(
        generated_data=GeneratedQuestion(
            stem=stem,
            options=options,
            correct=correct,
            explanation=explanation or None,
        ),
        tryout_id=tryout_id,
        website_id=website_id,
        basis_item_id=basis_item_id,
        slot=slot,
        level=target_level,
        ai_model=ai_model,
        db=db,
    )
    if not item_id:
        return _render(
            stats,
            basis_items,
            error="Failed to save generated item. Check server logs for the database error.",
        )

    await db.commit()
    updated_stats = await get_ai_stats(db)
    updated_basis_items = await _basis_items_for_playground(db)
    return _render(
        updated_stats,
        updated_basis_items,
        success=f"Generated item saved successfully as item #{item_id}.",
    )
@router.get("/tryout/list", include_in_schema=False)
@router.get("/item/list", include_in_schema=False)
@router.get("/user/list", include_in_schema=False)
@router.get("/session/list", include_in_schema=False)
@router.get("/tryoutstats/list", include_in_schema=False)
async def legacy_admin_paths(request: Request):
    """Answer the old list URLs with the dashboard redirect.

    Requests without a resolved admin get the login redirect instead.
    """
    if await _current_admin(request):
        return _dashboard_redirect()
    return _login_redirect()