Files
yellow-bank-soal/app/admin_web.py
2026-06-07 00:38:12 +07:00

3096 lines
120 KiB
Python

"""
Plain FastAPI admin UI backed by SQLAlchemy and Redis sessions.
This replaces the previous fastapi-admin runtime path, which depended on
Tortoise-oriented internals that do not match this project.
"""
import secrets
import uuid
from dataclasses import dataclass
from datetime import datetime, timezone
from html import escape, unescape
import json
import re
from typing import Any
import aioredis
from fastapi import APIRouter, Depends, File, Form, Request, UploadFile
from sqlalchemy import func, select
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.asyncio import AsyncSession
from starlette.responses import HTMLResponse, RedirectResponse
from starlette.status import HTTP_303_SEE_OTHER, HTTP_401_UNAUTHORIZED, HTTP_429_TOO_MANY_REQUESTS
from app.core.config import get_settings
from app.database import get_db
from app.models import (
AIGenerationRun,
Item,
Session,
Tryout,
TryoutImportSnapshot,
TryoutSnapshotQuestion,
UserAnswer,
Website,
)
from app.services.ai_generation import (
create_generation_run,
generate_questions_batch,
get_ai_stats,
save_ai_question,
)
from app.services.irt_calibration import get_calibration_status
from app.services.tryout_json_import import (
TryoutImportError,
import_tryout_json_snapshot,
preview_tryout_json_import,
)
settings = get_settings()
router = APIRouter(prefix="/admin", tags=["admin-web"])
SESSION_COOKIE = "access_token"
CSRF_COOKIE = "admin_csrf_token"
SESSION_PREFIX = "admin:session:"
IMPORT_PREVIEW_PREFIX = "admin:import-preview:"
IMPORT_PREVIEW_TTL_SECONDS = 900
LOGIN_RATE_LIMIT_PREFIX = "admin:login:attempts:"
LOGIN_RATE_LIMIT_MAX_ATTEMPTS = 10
LOGIN_RATE_LIMIT_WINDOW_SECONDS = 300
_admin_redis = None
@dataclass
class AdminPrincipal:
username: str
async def configure_admin_web() -> None:
global _admin_redis
if _admin_redis is not None:
return
if not settings.ADMIN_USERNAME or not settings.ADMIN_PASSWORD:
raise RuntimeError("ENABLE_ADMIN=true requires ADMIN_USERNAME and ADMIN_PASSWORD to be set.")
_admin_redis = aioredis.from_url(
settings.REDIS_URL,
encoding="utf-8",
decode_responses=True,
)
async def shutdown_admin_web() -> None:
global _admin_redis
if _admin_redis is None:
return
try:
await _admin_redis.close()
finally:
_admin_redis = None
async def _current_admin(request: Request) -> AdminPrincipal | None:
if _admin_redis is None:
return None
token = request.cookies.get(SESSION_COOKIE)
if not token:
return None
username = await _admin_redis.get(f"{SESSION_PREFIX}{token}")
if not username:
return None
return AdminPrincipal(username=str(username))
def _login_redirect() -> RedirectResponse:
return RedirectResponse(url="/admin/login", status_code=HTTP_303_SEE_OTHER)
def _dashboard_redirect() -> RedirectResponse:
return RedirectResponse(url="/admin/dashboard", status_code=HTTP_303_SEE_OTHER)
def _render_auth_page(
request: Request,
title: str,
subtitle: str,
body: str,
status_code: int = 200,
) -> HTMLResponse:
remember_me_checked = "checked" if request.cookies.get("remember_me") == "on" else ""
html = f"""<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>{escape(title)}</title>
<style>
body {{ margin: 0; min-height: 100vh; display: grid; place-items: center; background: linear-gradient(135deg, #f8fafc, #e2e8f0); font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", sans-serif; color: #0f172a; }}
.panel {{ width: min(420px, calc(100vw - 32px)); background: rgba(255,255,255,0.96); border-radius: 18px; box-shadow: 0 18px 60px rgba(15, 23, 42, 0.14); padding: 28px; }}
h1 {{ margin: 0 0 8px; font-size: 28px; }}
p {{ margin: 0 0 20px; color: #475569; }}
label {{ display: block; font-size: 14px; font-weight: 600; margin: 14px 0 8px; }}
input {{ width: 100%; box-sizing: border-box; border: 1px solid #cbd5e1; border-radius: 10px; padding: 12px 14px; font-size: 15px; }}
.row {{ display: flex; align-items: center; gap: 10px; margin-top: 16px; color: #334155; font-size: 14px; }}
.row input {{ width: auto; }}
button {{ width: 100%; margin-top: 18px; border: 0; border-radius: 10px; padding: 12px 14px; background: #0f172a; color: #fff; font-size: 15px; font-weight: 600; cursor: pointer; }}
.error {{ margin: 0 0 16px; padding: 12px 14px; border-radius: 10px; background: #fef2f2; color: #991b1b; border: 1px solid #fecaca; }}
.muted {{ color: #64748b; font-size: 13px; margin-top: 14px; }}
a {{ color: #0f172a; }}
</style>
</head>
<body>
<main class="panel">
<h1>{escape(title)}</h1>
<p>{escape(subtitle)}</p>
{body.replace("__REMEMBER_ME_CHECKED__", remember_me_checked)}
</main>
</body>
</html>"""
csrf_token = request.cookies.get(CSRF_COOKIE) or secrets.token_urlsafe(24)
csrf_input = f'<input type="hidden" name="csrf_token" value="{escape(csrf_token)}">'
html = re.sub(
r'(<form[^>]*method="post"[^>]*>)',
r"\1" + csrf_input,
html,
flags=re.IGNORECASE,
)
response = HTMLResponse(html, status_code=status_code)
response.set_cookie(
CSRF_COOKIE,
csrf_token,
path="/admin",
httponly=False,
secure=settings.ENVIRONMENT == "production",
samesite="lax",
)
return response
def _render_admin_page(request: Request, title: str, page_title: str, body: str) -> HTMLResponse:
html = f"""<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>{escape(title)}</title>
<style>
body {{ margin: 0; font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", sans-serif; background: #f5f7fb; color: #162033; }}
.layout {{ display: grid; grid-template-columns: 240px 1fr; min-height: 100vh; }}
.sidebar {{ background: #0f172a; color: #e2e8f0; padding: 24px 16px; }}
.sidebar h1 {{ font-size: 18px; margin: 0 0 24px; }}
.sidebar a {{ display: block; color: #cbd5e1; text-decoration: none; padding: 10px 12px; border-radius: 8px; margin-bottom: 8px; }}
.sidebar a:hover {{ background: #1e293b; color: #fff; }}
.content {{ padding: 32px; }}
.card {{ background: #fff; border-radius: 14px; padding: 24px; box-shadow: 0 8px 30px rgba(15, 23, 42, 0.08); }}
.grid {{ display: grid; grid-template-columns: repeat(auto-fit, minmax(180px, 1fr)); gap: 16px; margin-top: 20px; }}
.stat {{ padding: 18px; border: 1px solid #e2e8f0; border-radius: 12px; background: #f8fafc; }}
.stat strong {{ display: block; font-size: 26px; margin-top: 6px; }}
table {{ width: 100%; border-collapse: collapse; margin-top: 16px; background: #fff; border-radius: 12px; overflow: hidden; }}
th, td {{ padding: 12px 14px; border-bottom: 1px solid #e5e7eb; text-align: left; vertical-align: top; font-size: 14px; }}
th {{ background: #f8fafc; font-weight: 600; }}
input, select, textarea {{ width: 100%; box-sizing: border-box; border: 1px solid #cbd5e1; border-radius: 10px; padding: 12px 14px; font-size: 15px; }}
label {{ display: block; font-size: 14px; font-weight: 600; margin: 14px 0 8px; }}
button {{ border: 0; border-radius: 10px; padding: 12px 14px; background: #0f172a; color: #fff; font-size: 15px; font-weight: 600; cursor: pointer; }}
.actions {{ display: flex; gap: 12px; flex-wrap: wrap; margin-top: 18px; }}
.row {{ display: flex; align-items: center; gap: 10px; margin-top: 12px; color: #334155; font-size: 14px; }}
.row input {{ width: auto; }}
.error {{ margin: 0 0 16px; padding: 12px 14px; border-radius: 10px; background: #fef2f2; color: #991b1b; border: 1px solid #fecaca; }}
.success {{ margin: 0 0 16px; padding: 12px 14px; border-radius: 10px; background: #ecfdf5; color: #166534; border: 1px solid #86efac; }}
.muted {{ color: #64748b; font-size: 14px; }}
.tabs {{ display: flex; gap: 8px; flex-wrap: wrap; margin: 18px 0 18px; border-bottom: 1px solid #e2e8f0; }}
.tabs a {{ display: inline-flex; align-items: center; min-height: 38px; padding: 0 14px; color: #475569; text-decoration: none; border: 1px solid transparent; border-bottom: 0; border-radius: 8px 8px 0 0; font-weight: 700; font-size: 14px; }}
.tabs a.active {{ background: #fff; border-color: #e2e8f0; color: #0f172a; box-shadow: 0 -1px 0 #fff inset; }}
.compact-strip {{ display: grid; grid-template-columns: repeat(auto-fit, minmax(150px, 1fr)); gap: 10px; margin: 14px 0; }}
.compact-stat {{ border: 1px solid #e2e8f0; border-radius: 8px; background: #f8fafc; padding: 12px 14px; }}
.compact-stat span {{ display: block; color: #64748b; font-size: 12px; font-weight: 700; text-transform: uppercase; }}
.compact-stat strong {{ display: block; margin-top: 4px; color: #0f172a; font-size: 20px; line-height: 1.1; }}
.field-grid {{ display: grid; grid-template-columns: repeat(2, minmax(180px, 1fr)); gap: 12px 16px; align-items: end; }}
.field-grid .wide {{ grid-column: 1 / -1; }}
.tab-panel {{ margin-top: 8px; }}
.toolbar {{ display: flex; align-items: end; gap: 12px; flex-wrap: wrap; margin: 12px 0 16px; }}
.toolbar label {{ min-width: 150px; margin-top: 0; }}
.toolbar input, .toolbar select {{ min-width: 150px; }}
.table-wrap {{ width: 100%; overflow-x: auto; }}
.table-wrap table {{ min-width: 860px; }}
.status-pill {{ display: inline-flex; align-items: center; min-height: 22px; padding: 0 8px; border-radius: 999px; background: #e2e8f0; color: #334155; font-size: 12px; font-weight: 700; }}
.status-approved, .status-active {{ background: #dcfce7; color: #166534; }}
.status-rejected, .status-archived {{ background: #fee2e2; color: #991b1b; }}
.status-draft {{ background: #e0f2fe; color: #075985; }}
.status-stale {{ background: #fef3c7; color: #92400e; }}
.button-link {{ display: inline-block; padding: 9px 12px; border-radius: 8px; background: #0f172a; color: #fff; text-decoration: none; font-size: 13px; font-weight: 700; }}
.secondary-link {{ display: inline-block; padding: 10px 12px; border-radius: 8px; background: #e2e8f0; color: #0f172a; text-decoration: none; font-size: 14px; font-weight: 700; }}
.question-block {{ margin: 16px 0; padding: 16px; border: 1px solid #e2e8f0; border-radius: 8px; background: #f8fafc; }}
.question-block h3 {{ margin-top: 0; }}
.option-key {{ width: 56px; font-weight: 800; color: #0f172a; }}
.correct-option td {{ background: #ecfdf5; color: #166534; font-weight: 700; }}
@media (max-width: 860px) {{
.layout {{ grid-template-columns: 1fr; }}
.sidebar {{ position: static; }}
.content {{ padding: 18px; }}
.field-grid {{ grid-template-columns: 1fr; }}
}}
</style>
</head>
<body>
<div class="layout">
<aside class="sidebar">
<h1>IRT Bank Soal Admin</h1>
<a href="/admin/dashboard">Dashboard</a>
<a href="/admin/websites">Websites</a>
<a href="/admin/tryout-import">Tryout Import</a>
<a href="/admin/basis-items">Basis Items</a>
<a href="/admin/calibration-status">Calibration Status</a>
<a href="/admin/item-statistics">Item Statistics</a>
<a href="/admin/session-overview">Session Overview</a>
<a href="/admin/ai-playground">AI Playground</a>
<a href="/admin/password">Password Info</a>
<a href="/admin/logout">Logout</a>
</aside>
<main class="content">
<div class="card">
<h2>{escape(page_title)}</h2>
{body}
</div>
</main>
</div>
</body>
</html>"""
csrf_token = request.cookies.get(CSRF_COOKIE) or secrets.token_urlsafe(24)
csrf_input = f'<input type="hidden" name="csrf_token" value="{escape(csrf_token)}">'
html = re.sub(
r'(<form[^>]*method="post"[^>]*>)',
r"\1" + csrf_input,
html,
flags=re.IGNORECASE,
)
response = HTMLResponse(html)
response.set_cookie(
CSRF_COOKIE,
csrf_token,
path="/admin",
httponly=False,
secure=settings.ENVIRONMENT == "production",
samesite="lax",
)
return response
def _verify_csrf(request: Request, csrf_token: str | None) -> None:
cookie_token = request.cookies.get(CSRF_COOKIE)
if not cookie_token or not csrf_token:
raise HTTPException(status_code=403, detail="CSRF validation failed")
if not secrets.compare_digest(cookie_token, csrf_token):
raise HTTPException(status_code=403, detail="CSRF validation failed")
async def _enforce_csrf(request: Request) -> None:
form = await request.form()
_verify_csrf(request, form.get("csrf_token"))
async def _csrf_route_guard(request: Request) -> None:
if request.method.upper() != "POST":
return
await _enforce_csrf(request)
router.dependencies.append(Depends(_csrf_route_guard))
def _table(headers: list[str], rows: list[list[Any]]) -> str:
head = "".join(f"<th>{escape(str(header))}</th>" for header in headers)
body_rows = []
for row in rows:
cols = "".join(f"<td>{escape(str(value))}</td>" for value in row)
body_rows.append(f"<tr>{cols}</tr>")
body = "".join(body_rows) or f"<tr><td colspan=\"{len(headers)}\">No data</td></tr>"
return f"<table><thead><tr>{head}</tr></thead><tbody>{body}</tbody></table>"
def _truncate(text: str | None, max_length: int = 120) -> str:
if not text:
return ""
if len(text) <= max_length:
return text
return f"{text[: max_length - 3]}..."
def _html_to_text(value: str | None) -> str:
if not value:
return ""
text = re.sub(r"<[^>]+>", " ", value)
text = unescape(text)
text = re.sub(r"\s+", " ", text).strip()
return text
def _websites_form_body(
websites: list[Website],
error: str | None = None,
success: str | None = None,
site_name: str = "",
site_url: str = "",
) -> str:
error_html = f'<div class="error">{escape(error)}</div>' if error else ""
success_html = f'<div class="success">{escape(success)}</div>' if success else ""
body_rows = []
for website in websites:
actions = f"""
<div class="actions" style="margin-top:0">
<a href="/admin/websites/{website.id}/edit" style="display:inline-block;padding:8px 12px;border-radius:8px;background:#0f172a;color:#fff;text-decoration:none;">Edit</a>
<form method="post" action="/admin/websites/{website.id}/delete" onsubmit="return confirm('Delete website {escape(website.site_name)} and all related tryouts, items, sessions, and snapshots?');" style="margin:0">
<button type="submit" style="background:#991b1b;">Delete</button>
</form>
</div>
"""
body_rows.append(
"<tr>"
f"<td>{website.id}</td>"
f"<td>{escape(website.site_name)}</td>"
f"<td>{escape(website.site_url)}</td>"
f"<td>{actions}</td>"
"</tr>"
)
if body_rows:
websites_table = (
"<table><thead><tr><th>ID</th><th>Name</th><th>URL</th><th>Actions</th></tr></thead><tbody>"
+ "".join(body_rows)
+ "</tbody></table>"
)
else:
websites_table = _table(["ID", "Name", "URL", "Actions"], [])
return f"""
<p class="muted">Register websites here so imports and tryout references can be tied to a known source site.</p>
{success_html}
{error_html}
<form method="post" action="/admin/websites" autocomplete="off">
<label for="site_name">Website Name</label>
<input id="site_name" name="site_name" type="text" value="{escape(site_name)}" placeholder="Sejoli Demo Site">
<label for="site_url">Website URL</label>
<input id="site_url" name="site_url" type="url" value="{escape(site_url)}" placeholder="https://example.com">
<button type="submit">Add Website</button>
</form>
<h3 style="margin-top:24px">Registered Websites</h3>
<p class="muted">Use the website ID when importing read-only tryout snapshots.</p>
{websites_table}
"""
def _website_edit_form_body(
website: Website,
error: str | None = None,
success: str | None = None,
site_name: str | None = None,
site_url: str | None = None,
) -> str:
error_html = f'<div class="error">{escape(error)}</div>' if error else ""
success_html = f'<div class="success">{escape(success)}</div>' if success else ""
display_name = website.site_name if site_name is None else site_name
display_url = website.site_url if site_url is None else site_url
return f"""
<p class="muted">Website ID: <strong>{website.id}</strong></p>
{success_html}
{error_html}
<form method="post" action="/admin/websites/{website.id}/edit" autocomplete="off">
<label for="site_name">Website Name</label>
<input id="site_name" name="site_name" type="text" value="{escape(display_name)}">
<label for="site_url">Website URL</label>
<input id="site_url" name="site_url" type="url" value="{escape(display_url)}">
<div class="actions">
<button type="submit">Save Changes</button>
<a href="/admin/websites" style="display:inline-block;padding:12px 14px;border-radius:10px;background:#e2e8f0;color:#0f172a;text-decoration:none;font-size:15px;font-weight:600;">Back</a>
</div>
</form>
"""
def _tryout_import_form_body(
websites: list[Website],
recent_snapshots: list[TryoutImportSnapshot],
error: str | None = None,
success: str | None = None,
selected_website_id: int | None = None,
preview: dict[str, Any] | None = None,
preview_token: str | None = None,
upload_filename: str = "",
) -> str:
error_html = f'<div class="error">{escape(error)}</div>' if error else ""
success_html = f'<div class="success">{escape(success)}</div>' if success else ""
website_options = ['<option value="">Select website</option>']
for website in websites:
selected = "selected" if selected_website_id == website.id else ""
website_options.append(
f'<option value="{website.id}" {selected}>{escape(website.site_name)} (#{website.id})</option>'
)
website_map = {website.id: website.site_name for website in websites}
snapshot_rows = []
for snapshot in recent_snapshots:
snapshot_rows.append(
"<tr>"
f"<td>{snapshot.id}</td>"
f"<td>{escape(website_map.get(snapshot.website_id, 'Unknown'))} (#{snapshot.website_id})</td>"
f"<td>{escape(snapshot.source_tryout_id)}</td>"
f"<td>{escape(snapshot.title)}</td>"
f"<td>{snapshot.question_count}</td>"
f"<td>{escape(str(snapshot.created_at))}</td>"
f"<td><a href=\"/admin/snapshot-questions?snapshot_id={snapshot.id}\" "
"style=\"display:inline-block;padding:8px 12px;border-radius:8px;background:#0f172a;color:#fff;text-decoration:none;\">Browse</a></td>"
"</tr>"
)
snapshots_table = (
"<table><thead><tr><th>Snapshot ID</th><th>Website</th><th>Tryout ID</th><th>Title</th><th>Questions</th><th>Imported At</th><th>Actions</th></tr></thead><tbody>"
+ ("".join(snapshot_rows) if snapshot_rows else "<tr><td colspan=\"7\">No data</td></tr>")
+ "</tbody></table>"
)
preview_html = ""
if preview:
totals = preview.get("totals") or {}
tryout_rows = []
for tryout in preview.get("tryouts") or []:
diff = tryout.get("question_diff") or {}
warnings = "; ".join(tryout.get("warnings") or []) or "-"
tryout_rows.append(
[
tryout.get("source_tryout_id"),
tryout.get("title"),
diff.get("total_questions", 0),
diff.get("new_questions", 0),
diff.get("updated_questions", 0),
diff.get("unchanged_questions", 0),
diff.get("removed_questions", 0),
warnings,
]
)
import_form = ""
if preview_token and selected_website_id:
import_form = f"""
<form method="post" action="/admin/tryout-import" autocomplete="off">
<input type="hidden" name="website_id" value="{selected_website_id}">
<input type="hidden" name="preview_token" value="{escape(preview_token)}">
<button type="submit">Import Snapshot</button>
</form>
"""
preview_html = f"""
<h3 style="margin-top:24px">Preview Summary</h3>
<p class="muted">File: <strong>{escape(upload_filename or "uploaded JSON")}</strong></p>
<div class="grid">
<div class="stat">Tryouts<strong>{preview.get("tryout_count", 0)}</strong></div>
<div class="stat">New Questions<strong>{totals.get("new_questions", 0)}</strong></div>
<div class="stat">Updated Questions<strong>{totals.get("updated_questions", 0)}</strong></div>
<div class="stat">Removed Questions<strong>{totals.get("removed_questions", 0)}</strong></div>
</div>
{_table(
["Tryout ID", "Title", "Total", "New", "Updated", "Unchanged", "Removed", "Warnings"],
tryout_rows,
)}
<div class="actions">{import_form}</div>
"""
return f"""
<p class="muted">Import Sejoli tryout JSON as read-only snapshot reference data. This does not create live item-bank questions.</p>
<p class="muted">Use this when the source tryout changes upstream. Re-import updates matching source question IDs, inserts new ones, and marks missing ones inactive.</p>
{success_html}
{error_html}
<form method="post" action="/admin/tryout-import/preview" enctype="multipart/form-data" autocomplete="off">
<label for="website_id">Website</label>
<select id="website_id" name="website_id">{''.join(website_options)}</select>
<label for="file">Tryout Export JSON</label>
<input id="file" name="file" type="file" accept=".json,application/json">
<button type="submit">Preview Import</button>
</form>
{preview_html}
<h3 style="margin-top:24px">Recent Snapshots</h3>
<p class="muted">These are archived imports stored in PostgreSQL for traceability.</p>
{snapshots_table}
"""
def _snapshot_slot_map(snapshot: TryoutImportSnapshot) -> dict[str, int]:
slot_map: dict[str, int] = {}
questions = (snapshot.raw_payload or {}).get("questions") or []
for index, question in enumerate(questions, start=1):
source_question_id = str((question or {}).get("id") or "").strip()
if source_question_id:
slot_map[source_question_id] = index
return slot_map
def _snapshot_options_to_item_options(raw_options: list[dict[str, Any]] | list[Any]) -> dict[str, str]:
item_options: dict[str, str] = {}
for option in raw_options or []:
if not isinstance(option, dict):
continue
increment = str(option.get("increment") or "").strip().upper()
text = str(option.get("text") or option.get("label") or "").strip()
if increment and text:
item_options[increment] = text
return item_options
def _snapshot_questions_body(
snapshot: TryoutImportSnapshot,
questions: list[TryoutSnapshotQuestion],
promoted_items_by_slot: dict[int, Item],
error: str | None = None,
success: str | None = None,
) -> str:
error_html = f'<div class="error">{escape(error)}</div>' if error else ""
success_html = f'<div class="success">{escape(success)}</div>' if success else ""
slot_map = _snapshot_slot_map(snapshot)
rows = []
for question in questions:
slot = slot_map.get(question.source_question_id, 0)
promoted_item = promoted_items_by_slot.get(slot)
if promoted_item:
select_html = ""
action_html = (
f'Item #{promoted_item.id} already exists. '
f'<a href="/admin/ai-playground?basis_item_id={promoted_item.id}">Open in AI Playground</a>'
)
else:
select_html = f'<input type="checkbox" name="snapshot_question_ids" value="{question.id}">'
action_html = "Ready to promote"
rows.append(
"<tr>"
f"<td>{select_html}</td>"
f"<td>{slot or '-'}</td>"
f"<td>{escape(question.source_question_id)}</td>"
f"<td>{escape(question.correct_answer)}</td>"
f"<td>{question.option_count}</td>"
f"<td>{'Yes' if question.is_active else 'No'}</td>"
f"<td>{escape(_truncate(question.question_title or question.question_html, 100))}</td>"
f"<td>{action_html}</td>"
"</tr>"
)
questions_table = (
"<form method=\"post\" action=\"/admin/snapshot-questions/promote-bulk\">"
f"<input type=\"hidden\" name=\"snapshot_id\" value=\"{snapshot.id}\">"
"<div class=\"actions\" style=\"margin:16px 0\">"
"<button type=\"submit\">Promote Selected as Basis Items</button>"
"</div>"
"<table><thead><tr><th><input type=\"checkbox\" onclick=\"document.querySelectorAll('input[name=&quot;snapshot_question_ids&quot;]').forEach(el => el.checked = this.checked)\"></th><th>Slot</th><th>Source Question ID</th><th>Correct</th><th>Options</th><th>Active</th><th>Stem</th><th>Action</th></tr></thead><tbody>"
+ ("".join(rows) if rows else "<tr><td colspan=\"8\">No data</td></tr>")
+ "</tbody></table>"
"</form>"
)
return f"""
<p class="muted">Snapshot ID: <strong>{snapshot.id}</strong> | Website: <strong>{snapshot.website_id}</strong> | Tryout: <strong>{escape(snapshot.source_tryout_id)}</strong></p>
<p class="muted">Promote selected snapshot questions into the live <code>items</code> table as <code>sedang</code> basis items for AI generation.</p>
{success_html}
{error_html}
{questions_table}
<p style="margin-top:20px"><a href="/admin/tryout-import">Back to Tryout Import</a></p>
"""
async def _basis_items_for_playground(db: AsyncSession, limit: int = 20) -> list[Item]:
result = await db.execute(
select(Item)
.where(Item.level == "sedang")
.order_by(Item.created_at.desc(), Item.id.desc())
.limit(limit)
)
return list(result.scalars().all())
async def _recent_generation_runs(db: AsyncSession, limit: int = 20) -> list[AIGenerationRun]:
result = await db.execute(
select(AIGenerationRun).order_by(AIGenerationRun.id.desc()).limit(limit)
)
return list(result.scalars().all())
async def _recent_generated_variants(
db: AsyncSession,
limit: int = 100,
basis_item_id: int | None = None,
status_filter: str | None = None,
level_filter: str | None = None,
run_id_filter: int | None = None,
) -> list[Item]:
stmt = select(Item).where(Item.generated_by == "ai")
if basis_item_id is not None:
stmt = stmt.where(Item.basis_item_id == basis_item_id)
if status_filter:
stmt = stmt.where(Item.variant_status == status_filter)
if level_filter:
stmt = stmt.where(Item.level == level_filter)
if run_id_filter is not None:
stmt = stmt.where(Item.generation_run_id == run_id_filter)
result = await db.execute(
stmt.order_by(Item.created_at.desc(), Item.id.desc()).limit(limit)
)
return list(result.scalars().all())
async def _usage_metrics_for_items(
db: AsyncSession,
item_ids: list[int],
) -> dict[int, dict[str, float]]:
if not item_ids:
return {}
result = await db.execute(
select(
UserAnswer.item_id,
func.count(UserAnswer.id).label("impressions"),
func.count(func.distinct(UserAnswer.wp_user_id)).label("unique_users"),
)
.where(UserAnswer.item_id.in_(item_ids))
.group_by(UserAnswer.item_id)
)
metrics: dict[int, dict[str, float]] = {}
for item_id, impressions, unique_users in result.all():
impressions_i = int(impressions or 0)
unique_users_i = int(unique_users or 0)
frequency = (impressions_i / unique_users_i) if unique_users_i else 0.0
metrics[int(item_id)] = {
"impressions": float(impressions_i),
"unique_users": float(unique_users_i),
"frequency": float(frequency),
}
return metrics
async def _family_usage_stats(
db: AsyncSession,
basis_item: Item,
variants: list[Item],
) -> tuple[dict[int, dict[str, float]], dict[str, float]]:
family_item_ids = [basis_item.id] + [item.id for item in variants]
usage_metrics = await _usage_metrics_for_items(db, family_item_ids)
family_impressions = int(sum(metric["impressions"] for metric in usage_metrics.values()))
family_unique_users = int(
await db.scalar(
select(func.count(func.distinct(UserAnswer.wp_user_id))).where(
UserAnswer.item_id.in_(family_item_ids)
)
)
or 0
)
family_frequency = (family_impressions / family_unique_users) if family_unique_users else 0.0
return usage_metrics, {
"impressions": float(family_impressions),
"unique_users": float(family_unique_users),
"frequency": float(family_frequency),
}
def _basis_items_list_body(items: list[Item]) -> str:
rows = []
for item in items:
rows.append(
"<tr>"
f"<td>{item.id}</td>"
f"<td>{escape(item.tryout_id)}</td>"
f"<td>{item.slot}</td>"
f"<td>{item.website_id}</td>"
f"<td>{escape(_truncate(item.stem, 120))}</td>"
f"<td>{item.source_snapshot_question_id or '-'}</td>"
f"<td><a href=\"/admin/basis-items/{item.id}\" style=\"display:inline-block;padding:8px 12px;border-radius:8px;background:#0f172a;color:#fff;text-decoration:none;\">Open Workspace</a></td>"
"</tr>"
)
table = (
"<table><thead><tr><th>Item ID</th><th>Tryout</th><th>Slot</th><th>Website</th><th>Stem</th><th>Source Snapshot QID</th><th>Actions</th></tr></thead><tbody>"
+ ("".join(rows) if rows else "<tr><td colspan=\"7\">No basis items found.</td></tr>")
+ "</tbody></table>"
)
return f"""
<p class="muted">Basis items are canonical parent questions (<code>sedang</code>, non-AI). Open a workspace to generate and review AI child variants.</p>
{table}
"""
def _basis_item_workspace_body(
basis_item: Item,
runs: list[AIGenerationRun],
variants: list[Item],
usage_by_item: dict[int, dict[str, float]],
family_stats: dict[str, float],
filters: dict[str, str],
error: str | None = None,
success: str | None = None,
target_level: str = "mudah",
ai_model: str = settings.OPENROUTER_MODEL_LLAMA,
generation_count: str = "1",
operator_notes: str = "",
include_note_for_admin: bool = True,
include_note_in_prompt: bool = False,
) -> str:
error_html = f'<div class="error">{escape(error)}</div>' if error else ""
success_html = f'<div class="success">{escape(success)}</div>' if success else ""
status_filter = filters.get("status", "")
level_filter = filters.get("level", "")
min_frequency_filter = filters.get("min_frequency", "")
run_id_filter = filters.get("run_id", "")
run_rows = [
[
run.id,
run.target_level,
run.requested_count,
run.model,
run.created_by,
str(run.created_at),
]
for run in runs
]
runs_table = _table(
["Run ID", "Target", "Requested", "Model", "Created By", "Created At"],
run_rows,
)
variant_rows = []
for item in variants:
usage = usage_by_item.get(item.id, {"impressions": 0.0, "unique_users": 0.0, "frequency": 0.0})
options = item.options if isinstance(item.options, dict) else {}
options_rows = "".join(
f"<tr><td style=\"padding:6px 8px;border-bottom:1px solid #e5e7eb;\"><strong>{escape(str(key))}</strong></td>"
f"<td style=\"padding:6px 8px;border-bottom:1px solid #e5e7eb;\">{escape(str(value))}</td></tr>"
for key, value in options.items()
) or "<tr><td colspan=\"2\" style=\"padding:6px 8px;\">No options</td></tr>"
review_html = (
"<details style=\"margin-top:8px;\">"
"<summary style=\"cursor:pointer;color:#0f172a;font-weight:600;\">Review full content</summary>"
f"<div style=\"margin-top:8px;padding:10px;border:1px solid #e2e8f0;border-radius:8px;background:#f8fafc;\">"
f"<p style=\"margin:0 0 8px;\"><strong>Full Stem</strong><br>{escape(_html_to_text(item.stem))}</p>"
"<table style=\"margin:0 0 8px;width:100%;border-collapse:collapse;\">"
"<thead><tr><th style=\"text-align:left;padding:6px 8px;background:#eef2ff;\">Option</th><th style=\"text-align:left;padding:6px 8px;background:#eef2ff;\">Text</th></tr></thead>"
f"<tbody>{options_rows}</tbody>"
"</table>"
f"<p style=\"margin:0 0 6px;\"><strong>Correct Answer:</strong> {escape(item.correct_answer or '-')}</p>"
f"<p style=\"margin:0;\"><strong>Explanation:</strong> {escape(_html_to_text(item.explanation) or '-')}</p>"
"</div>"
"</details>"
)
variant_rows.append(
"<tr>"
f"<td><input type=\"checkbox\" name=\"item_ids\" value=\"{item.id}\"></td>"
f"<td>{item.id}</td>"
f"<td>{item.generation_run_id or '-'}</td>"
f"<td>{escape(item.level)}</td>"
f"<td>{escape(item.variant_status)}</td>"
f"<td>{escape(item.ai_model or '-')}</td>"
f"<td>{int(usage['impressions'])}</td>"
f"<td>{int(usage['unique_users'])}</td>"
f"<td>{usage['frequency']:.2f}</td>"
f"<td>{escape(_truncate(_html_to_text(item.stem), 130))}{review_html}</td>"
f"<td>{escape(str(item.created_at))}</td>"
"</tr>"
)
variants_table = (
f"<form method=\"post\" action=\"/admin/basis-items/{basis_item.id}/review-bulk\">"
"<div class=\"actions\" style=\"margin:16px 0\">"
"<select name=\"action\" style=\"max-width:260px\">"
"<option value=\"approved\">Approve selected</option>"
"<option value=\"rejected\">Reject selected</option>"
"<option value=\"archived\">Archive selected</option>"
"<option value=\"stale\">Mark stale</option>"
"<option value=\"active\">Activate selected</option>"
"</select>"
"<button type=\"submit\">Apply</button>"
"</div>"
"<table><thead><tr><th><input type=\"checkbox\" onclick=\"document.querySelectorAll('input[name=&quot;item_ids&quot;]').forEach(el => el.checked = this.checked)\"></th><th>Item ID</th><th>Run ID</th><th>Level</th><th>Status</th><th>Model</th><th>Impressions</th><th>Unique Users</th><th>Frequency</th><th>Stem</th><th>Created At</th></tr></thead><tbody>"
+ ("".join(variant_rows) if variant_rows else "<tr><td colspan=\"11\">No generated variants yet for this parent.</td></tr>")
+ "</tbody></table></form>"
)
return f"""
{success_html}
{error_html}
<section style="border:1px solid #e2e8f0;border-radius:12px;padding:16px;background:#f8fafc;">
<h3 style="margin:0 0 10px;">Parent Summary</h3>
<p class="muted" style="margin:0 0 8px;">
Parent Item: <strong>#{basis_item.id}</strong> |
Tryout: <strong>{escape(basis_item.tryout_id)}</strong> |
Slot: <strong>{basis_item.slot}</strong> |
Website: <strong>{basis_item.website_id}</strong> |
Source Snapshot QID: <strong>{basis_item.source_snapshot_question_id or '-'}</strong>
</p>
<p class="muted" style="margin:0 0 8px;">
Family Usage: impressions=<strong>{int(family_stats.get("impressions", 0.0))}</strong>,
unique users=<strong>{int(family_stats.get("unique_users", 0.0))}</strong>,
frequency=<strong>{family_stats.get("frequency", 0.0):.2f}</strong>
</p>
<p class="muted" style="margin:0;"><strong>Stem:</strong> {escape(_truncate(_html_to_text(basis_item.stem), 260))}</p>
</section>
<section style="margin-top:16px;border:1px solid #e2e8f0;border-radius:12px;padding:16px;background:#fff;">
<h3 style="margin:0 0 8px;">Generate Variants</h3>
<p class="muted" style="margin:0 0 12px;">Create new AI child variants for this parent.</p>
<form method="post" action="/admin/basis-items/{basis_item.id}/generate" autocomplete="off">
<label for="target_level">Target Level</label>
<select id="target_level" name="target_level">
<option value="mudah" {"selected" if target_level == "mudah" else ""}>mudah</option>
<option value="sulit" {"selected" if target_level == "sulit" else ""}>sulit</option>
</select>
<label for="ai_model">Model</label>
<input id="ai_model" name="ai_model" type="text" value="{escape(settings.OPENROUTER_MODEL_LLAMA)}" readonly>
<label for="generation_count">Generate Count</label>
<input id="generation_count" name="generation_count" type="number" min="1" max="50" value="{escape(generation_count)}">
<p class="muted">Recommended: 1-3 per run. Larger runs increase overlap and review burden.</p>
<label for="operator_notes">Operator Notes (optional)</label>
<textarea id="operator_notes" name="operator_notes" rows="3">{escape(operator_notes)}</textarea>
<label class="row"><input type="checkbox" name="include_note_for_admin" {"checked" if include_note_for_admin else ""}> Save note for admin team (visible in run history)</label>
<label class="row"><input type="checkbox" name="include_note_in_prompt" {"checked" if include_note_in_prompt else ""}> Include note in AI prompt payload</label>
<p class="muted" style="margin-top:6px;">Example note: <code>Use clinical language, avoid negatives, keep stem under 40 words.</code></p>
<button type="submit">Generate Variants</button>
</form>
</section>
<section style="margin-top:16px;border:1px solid #e2e8f0;border-radius:12px;padding:16px;background:#fff;">
<h3 style="margin:0 0 8px;">Filter Variants</h3>
<p class="muted" style="margin:0 0 12px;">Filter child variants shown in the review table below.</p>
<form method="get" action="/admin/basis-items/{basis_item.id}" autocomplete="off">
<div style="display:grid;grid-template-columns:repeat(auto-fit,minmax(190px,1fr));gap:12px;align-items:end;">
<div>
<label for="status" style="margin:0 0 6px;">Status</label>
<select id="status" name="status">
<option value="" {"selected" if status_filter == "" else ""}>All</option>
<option value="draft" {"selected" if status_filter == "draft" else ""}>draft</option>
<option value="approved" {"selected" if status_filter == "approved" else ""}>approved</option>
<option value="active" {"selected" if status_filter == "active" else ""}>active</option>
<option value="rejected" {"selected" if status_filter == "rejected" else ""}>rejected</option>
<option value="archived" {"selected" if status_filter == "archived" else ""}>archived</option>
<option value="stale" {"selected" if status_filter == "stale" else ""}>stale</option>
</select>
</div>
<div>
<label for="level" style="margin:0 0 6px;">Level</label>
<select id="level" name="level">
<option value="" {"selected" if level_filter == "" else ""}>All</option>
<option value="mudah" {"selected" if level_filter == "mudah" else ""}>mudah</option>
<option value="sulit" {"selected" if level_filter == "sulit" else ""}>sulit</option>
</select>
</div>
<div>
<label for="run_id" style="margin:0 0 6px;">Run ID</label>
<input id="run_id" name="run_id" type="number" min="1" value="{escape(run_id_filter)}">
</div>
<div>
<label for="min_frequency" style="margin:0 0 6px;">Min Frequency</label>
<input id="min_frequency" name="min_frequency" type="number" min="0" step="0.1" value="{escape(min_frequency_filter)}">
</div>
<div style="display:flex;gap:10px;align-items:center;flex-wrap:wrap;">
<button type="submit">Apply</button>
<a href="/admin/basis-items/{basis_item.id}" style="display:inline-block;padding:12px 14px;border-radius:10px;background:#e2e8f0;color:#0f172a;text-decoration:none;font-size:15px;font-weight:600;">Reset</a>
</div>
</div>
</form>
</section>
<section style="margin-top:16px;border:1px solid #e2e8f0;border-radius:12px;padding:16px;background:#fff;">
<h3 style="margin:0 0 8px;">Child Variants for This Parent</h3>
<p class="muted">Filtered variants shown: <strong>{len(variants)}</strong></p>
{variants_table}
</section>
<section style="margin-top:16px;border:1px solid #e2e8f0;border-radius:12px;padding:16px;background:#fff;">
<h3 style="margin:0 0 12px;">Generation Runs for This Parent</h3>
<p class="muted" style="margin:0 0 10px;">Run history is reference/audit data and is intentionally separated from variant review workflow.</p>
{runs_table}
</section>
<p style="margin-top:20px"><a href="/admin/basis-items">Back to Basis Items</a></p>
"""
async def _find_or_create_demo_basis_item(db: AsyncSession) -> Item:
result = await db.execute(
select(Item)
.where(
Item.level == "sedang",
Item.generated_by == "manual",
Item.tryout_id == "demo-tryout",
)
.order_by(Item.id.asc())
.limit(1)
)
existing_item = result.scalar_one_or_none()
if existing_item:
return existing_item
website_result = await db.execute(
select(Website).where(Website.site_url == "https://demo.local").limit(1)
)
website = website_result.scalar_one_or_none()
if website is None:
website = Website(site_url="https://demo.local", site_name="Demo Website")
db.add(website)
await db.flush()
tryout_result = await db.execute(
select(Tryout)
.where(Tryout.website_id == website.id, Tryout.tryout_id == "demo-tryout")
.limit(1)
)
tryout = tryout_result.scalar_one_or_none()
if tryout is None:
tryout = Tryout(
website_id=website.id,
tryout_id="demo-tryout",
name="Demo AI Playground Tryout",
description="Seed data for the AI playground.",
scoring_mode="ctt",
selection_mode="fixed",
normalization_mode="static",
ai_generation_enabled=True,
)
db.add(tryout)
await db.flush()
item = Item(
tryout_id=tryout.tryout_id,
website_id=website.id,
slot=1,
level="sedang",
stem="Sebuah toko memberi diskon 20% untuk sebuah tas. Jika harga setelah diskon adalah Rp240.000, berapakah harga tas sebelum diskon?",
options={
"A": "Rp260.000",
"B": "Rp300.000",
"C": "Rp320.000",
"D": "Rp360.000",
},
correct_answer="B",
explanation="Harga setelah diskon 20% berarti 80% dari harga awal. Jadi harga awal = 240.000 / 0,8 = 300.000.",
generated_by="manual",
calibrated=False,
calibration_sample_size=0,
)
db.add(item)
await db.flush()
await db.commit()
await db.refresh(item)
return item
async def _load_websites(db: AsyncSession) -> list[Website]:
result = await db.execute(select(Website).order_by(Website.id.asc()))
return list(result.scalars().all())
async def _recent_snapshots(db: AsyncSession, limit: int = 20) -> list[TryoutImportSnapshot]:
result = await db.execute(
select(TryoutImportSnapshot).order_by(TryoutImportSnapshot.id.desc()).limit(limit)
)
return list(result.scalars().all())
async def _ensure_operational_tryout(snapshot: TryoutImportSnapshot, db: AsyncSession) -> Tryout:
result = await db.execute(
select(Tryout).where(
Tryout.website_id == snapshot.website_id,
Tryout.tryout_id == snapshot.source_tryout_id,
)
)
tryout = result.scalar_one_or_none()
if tryout:
return tryout
tryout = Tryout(
website_id=snapshot.website_id,
tryout_id=snapshot.source_tryout_id,
name=snapshot.title,
description=f"Operational tryout basis created from imported snapshot #{snapshot.id}.",
scoring_mode="ctt",
selection_mode="fixed",
normalization_mode="static",
ai_generation_enabled=True,
)
db.add(tryout)
await db.flush()
return tryout
async def _load_snapshot_question_context(
snapshot: TryoutImportSnapshot,
db: AsyncSession,
) -> tuple[list[TryoutSnapshotQuestion], dict[int, Item], dict[str, int]]:
question_result = await db.execute(
select(TryoutSnapshotQuestion)
.where(
TryoutSnapshotQuestion.website_id == snapshot.website_id,
TryoutSnapshotQuestion.source_tryout_id == snapshot.source_tryout_id,
)
.order_by(TryoutSnapshotQuestion.source_question_id.asc())
)
questions = list(question_result.scalars().all())
item_result = await db.execute(
select(Item).where(
Item.website_id == snapshot.website_id,
Item.tryout_id == snapshot.source_tryout_id,
Item.level == "sedang",
)
)
promoted_items_by_slot = {item.slot: item for item in item_result.scalars().all()}
slot_map = _snapshot_slot_map(snapshot)
questions.sort(key=lambda row: (slot_map.get(row.source_question_id, 10**9), row.source_question_id))
return questions, promoted_items_by_slot, slot_map
async def _promote_snapshot_question_to_item(
snapshot: TryoutImportSnapshot,
question: TryoutSnapshotQuestion,
db: AsyncSession,
) -> tuple[Item | None, str]:
if (
question.website_id != snapshot.website_id
or question.source_tryout_id != snapshot.source_tryout_id
):
return None, "mismatch"
slot_map = _snapshot_slot_map(snapshot)
slot = slot_map.get(question.source_question_id)
if not slot:
max_slot = (
await db.scalar(
select(func.max(Item.slot)).where(
Item.website_id == snapshot.website_id,
Item.tryout_id == snapshot.source_tryout_id,
Item.level == "sedang",
)
)
or 0
)
slot = max_slot + 1
options = _snapshot_options_to_item_options(question.raw_options)
if not options:
return None, "missing_options"
await _ensure_operational_tryout(snapshot, db)
existing_item_result = await db.execute(
select(Item).where(
Item.website_id == snapshot.website_id,
Item.tryout_id == snapshot.source_tryout_id,
Item.slot == slot,
Item.level == "sedang",
)
)
existing_item = existing_item_result.scalar_one_or_none()
if existing_item is not None:
return existing_item, "existing"
item = Item(
tryout_id=snapshot.source_tryout_id,
website_id=snapshot.website_id,
slot=slot,
level="sedang",
stem=question.question_html,
options=options,
correct_answer=question.correct_answer,
explanation=question.explanation_html,
generated_by="manual",
source_snapshot_question_id=question.id,
variant_status="active",
calibrated=False,
calibration_sample_size=0,
)
db.add(item)
await db.flush()
return item, "created"
@router.get("", include_in_schema=False)
@router.get("/", include_in_schema=False)
async def admin_root(request: Request):
admin = await _current_admin(request)
if admin:
return _dashboard_redirect()
return _login_redirect()
@router.get("/login", include_in_schema=False)
async def login_view(request: Request):
admin = await _current_admin(request)
if admin:
return _dashboard_redirect()
body = """
<form method="post" action="/admin/login" autocomplete="off">
<label for="username">Username</label>
<input id="username" name="username" type="text" autocomplete="username">
<label for="password">Password</label>
<input id="password" name="password" type="password" autocomplete="current-password">
<label class="row"><input type="checkbox" name="remember_me" __REMEMBER_ME_CHECKED__> Remember me</label>
<button type="submit">Sign in</button>
</form>
<p class="muted">Direct environment-backed admin access.</p>
"""
return _render_auth_page(
request,
"Admin Login",
"Use the configured admin credentials to access the dashboard.",
body,
)
@router.post("/login", include_in_schema=False)
async def login_submit(
request: Request,
username: str = Form(...),
password: str = Form(...),
remember_me: str | None = Form(None),
):
if _admin_redis is None:
body = """
<div class="error">Admin backend is temporarily unavailable. Please try again.</div>
<form method="post" action="/admin/login" autocomplete="off">
<label for="username">Username</label>
<input id="username" name="username" type="text" autocomplete="username">
<label for="password">Password</label>
<input id="password" name="password" type="password" autocomplete="current-password">
<label class="row"><input type="checkbox" name="remember_me" __REMEMBER_ME_CHECKED__> Remember me</label>
<button type="submit">Sign in</button>
</form>
"""
return _render_auth_page(
request,
"Admin Login",
"Use the configured admin credentials to access the dashboard.",
body,
status_code=503,
)
client_ip = request.client.host if request.client else "unknown"
rate_limit_key = f"{LOGIN_RATE_LIMIT_PREFIX}{client_ip}"
attempts_raw = await _admin_redis.get(rate_limit_key)
attempts = int(attempts_raw) if attempts_raw else 0
if attempts >= LOGIN_RATE_LIMIT_MAX_ATTEMPTS:
body = """
<div class="error">Too many login attempts. Please wait a few minutes and try again.</div>
<form method="post" action="/admin/login" autocomplete="off">
<label for="username">Username</label>
<input id="username" name="username" type="text" autocomplete="username">
<label for="password">Password</label>
<input id="password" name="password" type="password" autocomplete="current-password">
<label class="row"><input type="checkbox" name="remember_me" __REMEMBER_ME_CHECKED__> Remember me</label>
<button type="submit">Sign in</button>
</form>
"""
return _render_auth_page(
request,
"Admin Login",
"Use the configured admin credentials to access the dashboard.",
body,
status_code=HTTP_429_TOO_MANY_REQUESTS,
)
if not (
secrets.compare_digest(username, settings.ADMIN_USERNAME)
and secrets.compare_digest(password, settings.ADMIN_PASSWORD)
):
attempts = await _admin_redis.incr(rate_limit_key)
if attempts == 1:
await _admin_redis.expire(rate_limit_key, LOGIN_RATE_LIMIT_WINDOW_SECONDS)
body = f"""
<div class="error">Invalid username or password.</div>
<form method="post" action="/admin/login" autocomplete="off">
<label for="username">Username</label>
<input id="username" name="username" type="text" autocomplete="username" value="{escape(username)}">
<label for="password">Password</label>
<input id="password" name="password" type="password" autocomplete="current-password">
<label class="row"><input type="checkbox" name="remember_me" __REMEMBER_ME_CHECKED__> Remember me</label>
<button type="submit">Sign in</button>
</form>
"""
return _render_auth_page(
request,
"Admin Login",
"Use the configured admin credentials to access the dashboard.",
body,
status_code=HTTP_401_UNAUTHORIZED,
)
await _admin_redis.delete(rate_limit_key)
expire = settings.ADMIN_SESSION_EXPIRE_SECONDS
response = _dashboard_redirect()
secure_cookie = settings.ENVIRONMENT == "production"
if remember_me == "on":
expire = max(expire, 3600 * 24 * 30)
response.set_cookie(
"remember_me",
"on",
expires=expire,
path="/admin",
secure=secure_cookie,
samesite="lax",
)
else:
response.delete_cookie("remember_me", path="/admin")
token = uuid.uuid4().hex
response.set_cookie(
SESSION_COOKIE,
token,
expires=expire,
path="/admin",
httponly=True,
secure=secure_cookie,
samesite="lax",
)
await _admin_redis.set(f"{SESSION_PREFIX}{token}", settings.ADMIN_USERNAME, ex=expire)
return response
@router.get("/logout", include_in_schema=False)
async def logout(request: Request):
token = request.cookies.get(SESSION_COOKIE)
if token and _admin_redis is not None:
await _admin_redis.delete(f"{SESSION_PREFIX}{token}")
response = _login_redirect()
response.delete_cookie(SESSION_COOKIE, path="/admin")
response.delete_cookie("remember_me", path="/admin")
return response
@router.get("/password", include_in_schema=False)
async def password_view(request: Request):
admin = await _current_admin(request)
if not admin:
return _login_redirect()
body = f"""
<p class="muted">Signed in as <strong>{escape(admin.username)}</strong>.</p>
<p>Password changes are disabled in the UI for this deployment.</p>
<p>Update <code>ADMIN_PASSWORD</code> in the server environment, then restart the app.</p>
<p>Session expiry is currently set to <strong>{settings.ADMIN_SESSION_EXPIRE_SECONDS}</strong> seconds.</p>
<p><a href="/admin/dashboard">Back to dashboard</a></p>
"""
return _render_auth_page(
request,
"Password Management",
"Runtime password rotation is intentionally disabled.",
body,
)
@router.post("/password", include_in_schema=False)
async def password_submit(
request: Request,
old_password: str = Form(...),
new_password: str = Form(...),
re_new_password: str = Form(...),
):
_ = (old_password, new_password, re_new_password)
admin = await _current_admin(request)
if not admin:
return _login_redirect()
body = """
<div class="error">Password rotation via UI is disabled.</div>
<p>Update <code>ADMIN_PASSWORD</code> in the server environment, then restart the app.</p>
<p><a href="/admin/dashboard">Back to dashboard</a></p>
"""
return _render_auth_page(
request,
"Password Management",
"Runtime password rotation is intentionally disabled.",
body,
status_code=400,
)
@router.get("/dashboard", include_in_schema=False)
async def dashboard_view(request: Request, db: AsyncSession = Depends(get_db)):
admin = await _current_admin(request)
if not admin:
return _login_redirect()
tryouts = await db.scalar(select(func.count()).select_from(Tryout)) or 0
items = await db.scalar(select(func.count()).select_from(Item)) or 0
sessions = await db.scalar(select(func.count()).select_from(Session)) or 0
completed_sessions = (
await db.scalar(select(func.count()).select_from(Session).where(Session.is_completed.is_(True)))
or 0
)
body = f"""
<p class="muted">Signed in as <strong>{escape(admin.username)}</strong>.</p>
<div class="grid">
<div class="stat">Tryouts<strong>{tryouts}</strong></div>
<div class="stat">Items<strong>{items}</strong></div>
<div class="stat">Sessions<strong>{sessions}</strong></div>
<div class="stat">Completed Sessions<strong>{completed_sessions}</strong></div>
</div>
<p style="margin-top:20px"><a href="/admin/ai-playground">Open AI Playground</a></p>
"""
return _render_admin_page(request, "IRT Bank Soal Admin", "Dashboard", body)
@router.get("/websites", include_in_schema=False)
async def websites_view(request: Request, db: AsyncSession = Depends(get_db)):
admin = await _current_admin(request)
if not admin:
return _login_redirect()
result = await db.execute(select(Website).order_by(Website.id.asc()))
websites = list(result.scalars().all())
body = _websites_form_body(websites)
return _render_admin_page(request, "Websites", "Websites", body)
@router.post("/websites", include_in_schema=False)
async def websites_submit(
request: Request,
db: AsyncSession = Depends(get_db),
site_name: str = Form(...),
site_url: str = Form(...),
):
admin = await _current_admin(request)
if not admin:
return _login_redirect()
normalized_name = site_name.strip()
normalized_url = site_url.strip().rstrip("/")
if not normalized_name:
result = await db.execute(select(Website).order_by(Website.id.asc()))
websites = list(result.scalars().all())
body = _websites_form_body(
websites,
error="Website name is required.",
site_name=site_name,
site_url=site_url,
)
return _render_admin_page(request, "Websites", "Websites", body)
if not normalized_url.startswith(("http://", "https://")):
result = await db.execute(select(Website).order_by(Website.id.asc()))
websites = list(result.scalars().all())
body = _websites_form_body(
websites,
error="Website URL must start with http:// or https://.",
site_name=site_name,
site_url=site_url,
)
return _render_admin_page(request, "Websites", "Websites", body)
website = Website(site_name=normalized_name, site_url=normalized_url)
db.add(website)
try:
await db.commit()
except IntegrityError:
await db.rollback()
result = await db.execute(select(Website).order_by(Website.id.asc()))
websites = list(result.scalars().all())
body = _websites_form_body(
websites,
error="Website URL already exists.",
site_name=site_name,
site_url=site_url,
)
return _render_admin_page(request, "Websites", "Websites", body)
result = await db.execute(select(Website).order_by(Website.id.asc()))
websites = list(result.scalars().all())
body = _websites_form_body(
websites,
success=f"Website added successfully with ID {website.id}.",
)
return _render_admin_page(request, "Websites", "Websites", body)
@router.get("/websites/{website_id}/edit", include_in_schema=False)
async def website_edit_view(
website_id: int,
request: Request,
db: AsyncSession = Depends(get_db),
):
admin = await _current_admin(request)
if not admin:
return _login_redirect()
website = await db.get(Website, website_id)
if website is None:
result = await db.execute(select(Website).order_by(Website.id.asc()))
websites = list(result.scalars().all())
body = _websites_form_body(websites, error=f"Website not found: {website_id}")
return _render_admin_page(request, "Websites", "Websites", body)
body = _website_edit_form_body(website)
return _render_admin_page(request, "Edit Website", "Edit Website", body)
@router.post("/websites/{website_id}/edit", include_in_schema=False)
async def website_edit_submit(
website_id: int,
request: Request,
db: AsyncSession = Depends(get_db),
site_name: str = Form(...),
site_url: str = Form(...),
):
admin = await _current_admin(request)
if not admin:
return _login_redirect()
website = await db.get(Website, website_id)
if website is None:
result = await db.execute(select(Website).order_by(Website.id.asc()))
websites = list(result.scalars().all())
body = _websites_form_body(websites, error=f"Website not found: {website_id}")
return _render_admin_page(request, "Websites", "Websites", body)
normalized_name = site_name.strip()
normalized_url = site_url.strip().rstrip("/")
if not normalized_name:
body = _website_edit_form_body(
website,
error="Website name is required.",
site_name=site_name,
site_url=site_url,
)
return _render_admin_page(request, "Edit Website", "Edit Website", body)
if not normalized_url.startswith(("http://", "https://")):
body = _website_edit_form_body(
website,
error="Website URL must start with http:// or https://.",
site_name=site_name,
site_url=site_url,
)
return _render_admin_page(request, "Edit Website", "Edit Website", body)
website.site_name = normalized_name
website.site_url = normalized_url
try:
await db.commit()
except IntegrityError:
await db.rollback()
body = _website_edit_form_body(
website,
error="Website URL already exists.",
site_name=site_name,
site_url=site_url,
)
return _render_admin_page(request, "Edit Website", "Edit Website", body)
await db.refresh(website)
body = _website_edit_form_body(
website,
success=f"Website #{website.id} updated successfully.",
)
return _render_admin_page(request, "Edit Website", "Edit Website", body)
@router.post("/websites/{website_id}/delete", include_in_schema=False)
async def website_delete_submit(
website_id: int,
request: Request,
db: AsyncSession = Depends(get_db),
):
admin = await _current_admin(request)
if not admin:
return _login_redirect()
website = await db.get(Website, website_id)
if website is None:
result = await db.execute(select(Website).order_by(Website.id.asc()))
websites = list(result.scalars().all())
body = _websites_form_body(websites, error=f"Website not found: {website_id}")
return _render_admin_page(request, "Websites", "Websites", body)
deleted_label = f"{website.site_name} ({website.site_url})"
await db.delete(website)
await db.commit()
result = await db.execute(select(Website).order_by(Website.id.asc()))
websites = list(result.scalars().all())
body = _websites_form_body(
websites,
success=f"Website deleted successfully: {deleted_label}",
)
return _render_admin_page(request, "Websites", "Websites", body)
@router.get("/tryout-import", include_in_schema=False)
async def tryout_import_view(request: Request, db: AsyncSession = Depends(get_db)):
admin = await _current_admin(request)
if not admin:
return _login_redirect()
websites = await _load_websites(db)
snapshots = await _recent_snapshots(db)
body = _tryout_import_form_body(websites, snapshots)
return _render_admin_page(request, "Tryout Import", "Tryout Import", body)
@router.post("/tryout-import/preview", include_in_schema=False)
async def tryout_import_preview(
request: Request,
db: AsyncSession = Depends(get_db),
website_id: int = Form(...),
file: UploadFile = File(...),
):
admin = await _current_admin(request)
if not admin:
return _login_redirect()
websites = await _load_websites(db)
snapshots = await _recent_snapshots(db)
if not file.filename or not file.filename.lower().endswith(".json"):
body = _tryout_import_form_body(
websites,
snapshots,
error="File must be .json format.",
selected_website_id=website_id,
)
return _render_admin_page(request, "Tryout Import", "Tryout Import", body)
try:
payload_bytes = await file.read()
payload_text = payload_bytes.decode("utf-8")
payload = json.loads(payload_text)
except UnicodeDecodeError:
body = _tryout_import_form_body(
websites,
snapshots,
error="File must be UTF-8 encoded JSON.",
selected_website_id=website_id,
)
return _render_admin_page(request, "Tryout Import", "Tryout Import", body)
except json.JSONDecodeError as exc:
body = _tryout_import_form_body(
websites,
snapshots,
error=f"Invalid JSON file: {exc}",
selected_website_id=website_id,
)
return _render_admin_page(request, "Tryout Import", "Tryout Import", body)
try:
preview = await preview_tryout_json_import(payload, website_id, db)
except TryoutImportError as exc:
body = _tryout_import_form_body(
websites,
snapshots,
error=str(exc),
selected_website_id=website_id,
)
return _render_admin_page(request, "Tryout Import", "Tryout Import", body)
preview_token = uuid.uuid4().hex
await _admin_redis.set(
f"{IMPORT_PREVIEW_PREFIX}{preview_token}",
payload_text,
ex=IMPORT_PREVIEW_TTL_SECONDS,
)
body = _tryout_import_form_body(
websites,
snapshots,
selected_website_id=website_id,
preview=preview,
preview_token=preview_token,
upload_filename=file.filename or "",
)
return _render_admin_page(request, "Tryout Import", "Tryout Import", body)
@router.post("/tryout-import", include_in_schema=False)
async def tryout_import_submit(
request: Request,
db: AsyncSession = Depends(get_db),
website_id: int = Form(...),
preview_token: str = Form(...),
):
admin = await _current_admin(request)
if not admin:
return _login_redirect()
websites = await _load_websites(db)
snapshots = await _recent_snapshots(db)
payload_text = await _admin_redis.get(f"{IMPORT_PREVIEW_PREFIX}{preview_token}")
if not payload_text:
body = _tryout_import_form_body(
websites,
snapshots,
error="Preview token expired. Upload the JSON again and preview before importing.",
selected_website_id=website_id,
)
return _render_admin_page(request, "Tryout Import", "Tryout Import", body)
try:
payload = json.loads(payload_text)
result = await import_tryout_json_snapshot(payload, website_id, db)
await db.commit()
except TryoutImportError as exc:
await db.rollback()
body = _tryout_import_form_body(
websites,
snapshots,
error=str(exc),
selected_website_id=website_id,
)
return _render_admin_page(request, "Tryout Import", "Tryout Import", body)
except Exception:
await db.rollback()
raise
finally:
await _admin_redis.delete(f"{IMPORT_PREVIEW_PREFIX}{preview_token}")
updated_snapshots = await _recent_snapshots(db)
imported_tryouts = result.get("imported_tryouts") or []
imported_count = sum((row.get("question_count") or 0) for row in imported_tryouts)
body = _tryout_import_form_body(
websites,
updated_snapshots,
success=(
f"Imported {len(imported_tryouts)} tryout snapshot(s) and archived {imported_count} source question reference row(s)."
),
selected_website_id=website_id,
)
return _render_admin_page(request, "Tryout Import", "Tryout Import", body)
@router.get("/snapshot-questions", include_in_schema=False)
async def snapshot_questions_view(
request: Request,
snapshot_id: int,
db: AsyncSession = Depends(get_db),
):
admin = await _current_admin(request)
if not admin:
return _login_redirect()
snapshot = await db.get(TryoutImportSnapshot, snapshot_id)
if snapshot is None:
websites = await _load_websites(db)
snapshots = await _recent_snapshots(db)
body = _tryout_import_form_body(
websites,
snapshots,
error=f"Snapshot not found: {snapshot_id}",
)
return _render_admin_page(request, "Tryout Import", "Tryout Import", body)
questions, promoted_items_by_slot, _ = await _load_snapshot_question_context(snapshot, db)
body = _snapshot_questions_body(snapshot, questions, promoted_items_by_slot)
return _render_admin_page(request, "Snapshot Questions", "Snapshot Questions", body)
@router.post("/snapshot-questions/promote-bulk", include_in_schema=False)
async def snapshot_question_promote_bulk(
request: Request,
snapshot_id: int = Form(...),
snapshot_question_ids: list[int] | None = Form(None),
db: AsyncSession = Depends(get_db),
):
admin = await _current_admin(request)
if not admin:
return _login_redirect()
snapshot = await db.get(TryoutImportSnapshot, snapshot_id)
if snapshot is None:
websites = await _load_websites(db)
snapshots = await _recent_snapshots(db)
body = _tryout_import_form_body(
websites,
snapshots,
error=f"Snapshot not found: {snapshot_id}",
)
return _render_admin_page(request, "Tryout Import", "Tryout Import", body)
if not snapshot_question_ids:
questions, promoted_items_by_slot, _ = await _load_snapshot_question_context(snapshot, db)
body = _snapshot_questions_body(
snapshot,
questions,
promoted_items_by_slot,
error="Select at least one snapshot question to promote.",
)
return _render_admin_page(request, "Snapshot Questions", "Snapshot Questions", body)
question_result = await db.execute(
select(TryoutSnapshotQuestion).where(
TryoutSnapshotQuestion.id.in_(snapshot_question_ids)
)
)
selected_questions = list(question_result.scalars().all())
created_items: list[Item] = []
existing_items: list[Item] = []
missing_option_count = 0
mismatch_count = 0
for question in selected_questions:
item, status = await _promote_snapshot_question_to_item(snapshot, question, db)
if status == "created" and item is not None:
created_items.append(item)
elif status == "existing" and item is not None:
existing_items.append(item)
elif status == "missing_options":
missing_option_count += 1
elif status == "mismatch":
mismatch_count += 1
await db.commit()
questions, promoted_items_by_slot, _ = await _load_snapshot_question_context(snapshot, db)
success_parts = []
if created_items:
success_parts.append(f"created {len(created_items)} item(s)")
if existing_items:
success_parts.append(f"reused {len(existing_items)} existing item(s)")
if missing_option_count:
success_parts.append(f"skipped {missing_option_count} question(s) with missing option text")
if mismatch_count:
success_parts.append(f"skipped {mismatch_count} mismatched question(s)")
success_message = "Bulk promote finished: " + ", ".join(success_parts) + "."
if created_items:
success_message += f" Latest basis item ID: {created_items[-1].id}."
body = _snapshot_questions_body(snapshot, questions, promoted_items_by_slot, success=success_message)
return _render_admin_page(request, "Snapshot Questions", "Snapshot Questions", body)
@router.get("/calibration-status", include_in_schema=False)
async def calibration_status_view(request: Request, db: AsyncSession = Depends(get_db)):
admin = await _current_admin(request)
if not admin:
return _login_redirect()
result = await db.execute(select(Tryout.tryout_id, Tryout.name, Tryout.website_id).order_by(Tryout.id))
tryouts = result.all()
rows = []
for tryout_id, name, website_id in tryouts:
status = await get_calibration_status(tryout_id, website_id, db)
rows.append(
[
tryout_id,
name,
status["total_items"],
status["calibrated_items"],
f'{status["calibration_percentage"]:.2f}%',
"Yes" if status["ready_for_irt"] else "No",
]
)
body = _table(
["Tryout ID", "Name", "Total Items", "Calibrated", "Calibration %", "Ready for IRT"],
rows,
)
return _render_admin_page(request, "Calibration Status", "Calibration Status", body)
@router.get("/item-statistics", include_in_schema=False)
async def item_statistics_view(request: Request, db: AsyncSession = Depends(get_db)):
admin = await _current_admin(request)
if not admin:
return _login_redirect()
result = await db.execute(select(Item.level).distinct())
levels = result.scalars().all()
rows = []
for level in levels:
item_result = await db.execute(select(Item).where(Item.level == level).order_by(Item.slot).limit(10))
items = item_result.scalars().all()
total_responses = sum(item.calibration_sample_size or 0 for item in items)
calibrated_count = sum(1 for item in items if item.calibrated)
calibration_percentage = (calibrated_count / len(items) * 100) if items else 0
avg_correctness = sum(item.ctt_p or 0 for item in items) / len(items) if items else 0
rows.append(
[
level,
len(items),
calibrated_count,
f"{calibration_percentage:.2f}%",
total_responses,
f"{avg_correctness:.4f}",
]
)
body = _table(
["Level", "Total Items", "Calibrated", "Calibration %", "Responses", "Avg Correctness"],
rows,
)
return _render_admin_page(request, "Item Statistics", "Item Statistics", body)
@router.get("/session-overview", include_in_schema=False)
async def session_overview_view(request: Request, db: AsyncSession = Depends(get_db)):
admin = await _current_admin(request)
if not admin:
return _login_redirect()
result = await db.execute(select(Session).order_by(Session.created_at.desc()).limit(50))
sessions = result.scalars().all()
rows = [
[
session.session_id,
session.wp_user_id,
session.tryout_id,
"Yes" if session.is_completed else "No",
session.scoring_mode_used,
session.total_benar,
session.NM,
session.NN,
session.theta,
]
for session in sessions
]
body = _table(
["Session ID", "WP User", "Tryout", "Completed", "Mode", "Benar", "NM", "NN", "Theta"],
rows,
)
return _render_admin_page(request, "Session Overview", "Session Overview", body)
@router.get("/basis-items", include_in_schema=False)
async def basis_items_view(request: Request, db: AsyncSession = Depends(get_db)):
admin = await _current_admin(request)
if not admin:
return _login_redirect()
result = await db.execute(
select(Item)
.where(Item.level == "sedang", Item.generated_by != "ai")
.order_by(Item.updated_at.desc(), Item.id.desc())
.limit(200)
)
basis_items = list(result.scalars().all())
body = _basis_items_list_body(basis_items)
return _render_admin_page(request, "Basis Items", "Basis Items", body)
@router.get("/basis-items/{basis_item_id}", include_in_schema=False)
async def basis_item_workspace_view(
basis_item_id: int,
request: Request,
db: AsyncSession = Depends(get_db),
):
admin = await _current_admin(request)
if not admin:
return _login_redirect()
status_filter = (request.query_params.get("status") or "").strip()
level_filter = (request.query_params.get("level") or "").strip()
run_id_filter = (request.query_params.get("run_id") or "").strip()
min_frequency_filter = (request.query_params.get("min_frequency") or "").strip()
filters = {
"status": status_filter,
"level": level_filter,
"run_id": run_id_filter,
"min_frequency": min_frequency_filter,
}
basis_item = await db.get(Item, basis_item_id)
if basis_item is None or basis_item.generated_by == "ai" or basis_item.level != "sedang":
result = await db.execute(
select(Item)
.where(Item.level == "sedang", Item.generated_by != "ai")
.order_by(Item.updated_at.desc(), Item.id.desc())
.limit(200)
)
body = _basis_items_list_body(list(result.scalars().all()))
return _render_admin_page(request, "Basis Items", "Basis Items", body)
run_result = await db.execute(
select(AIGenerationRun)
.where(AIGenerationRun.basis_item_id == basis_item.id)
.order_by(AIGenerationRun.id.desc())
.limit(50)
)
runs = list(run_result.scalars().all())
variant_result = await db.execute(
select(Item).where(Item.generated_by == "ai", Item.basis_item_id == basis_item.id)
.order_by(Item.created_at.desc(), Item.id.desc())
.limit(300)
)
variants_all = list(variant_result.scalars().all())
variants = variants_all
if status_filter:
variants = [item for item in variants if item.variant_status == status_filter]
if level_filter in {"mudah", "sulit"}:
variants = [item for item in variants if item.level == level_filter]
if run_id_filter.isdigit():
rid = int(run_id_filter)
variants = [item for item in variants if item.generation_run_id == rid]
usage_metrics, family_stats = await _family_usage_stats(db, basis_item, variants)
if min_frequency_filter:
try:
min_freq = float(min_frequency_filter)
variants = [
item
for item in variants
if usage_metrics.get(item.id, {}).get("frequency", 0.0) >= min_freq
]
except ValueError:
pass
body = _basis_item_workspace_body(
basis_item,
runs,
variants,
usage_metrics,
family_stats,
filters,
)
return _render_admin_page(request,
f"Basis Item #{basis_item.id}",
f"Basis Item Workspace #{basis_item.id}",
body,
)
@router.post("/basis-items/{basis_item_id}/generate", include_in_schema=False)
async def basis_item_generate_submit(
basis_item_id: int,
request: Request,
db: AsyncSession = Depends(get_db),
target_level: str = Form(...),
ai_model: str = Form(""),
generation_count: int = Form(1),
operator_notes: str = Form(""),
include_note_for_admin: str | None = Form(None),
include_note_in_prompt: str | None = Form(None),
):
admin = await _current_admin(request)
if not admin:
return _login_redirect()
filters = {"status": "", "level": "", "run_id": "", "min_frequency": ""}
basis_item = await db.get(Item, basis_item_id)
if basis_item is None or basis_item.generated_by == "ai" or basis_item.level != "sedang":
return RedirectResponse(url="/admin/basis-items", status_code=HTTP_303_SEE_OTHER)
# Llama-only policy for production quality consistency.
ai_model = settings.OPENROUTER_MODEL_LLAMA
note_for_admin = include_note_for_admin == "on"
note_in_prompt = include_note_in_prompt == "on"
if not settings.OPENROUTER_API_KEY:
run_result = await db.execute(
select(AIGenerationRun)
.where(AIGenerationRun.basis_item_id == basis_item.id)
.order_by(AIGenerationRun.id.desc())
.limit(50)
)
variant_result = await db.execute(
select(Item)
.where(Item.generated_by == "ai", Item.basis_item_id == basis_item.id)
.order_by(Item.created_at.desc(), Item.id.desc())
.limit(300)
)
runs = list(run_result.scalars().all())
variants = list(variant_result.scalars().all())
usage_metrics, family_stats = await _family_usage_stats(db, basis_item, variants)
body = _basis_item_workspace_body(
basis_item,
runs,
variants,
usage_metrics,
family_stats,
filters,
error="OPENROUTER_API_KEY is not configured.",
target_level=target_level,
ai_model=ai_model,
generation_count=str(generation_count),
operator_notes=operator_notes,
include_note_for_admin=note_for_admin,
include_note_in_prompt=note_in_prompt,
)
return _render_admin_page(request,
f"Basis Item #{basis_item.id}",
f"Basis Item Workspace #{basis_item.id}",
body,
)
if target_level not in {"mudah", "sulit"}:
return RedirectResponse(url=f"/admin/basis-items/{basis_item.id}", status_code=HTTP_303_SEE_OTHER)
if generation_count < 1 or generation_count > 50:
return RedirectResponse(url=f"/admin/basis-items/{basis_item.id}", status_code=HTTP_303_SEE_OTHER)
run_id = await create_generation_run(
basis_item_id=basis_item.id,
source_snapshot_question_id=basis_item.source_snapshot_question_id,
target_level=target_level,
requested_count=generation_count,
model=ai_model,
created_by=admin.username,
operator_notes=(operator_notes.strip() or None) if note_for_admin else None,
db=db,
)
generated = await generate_questions_batch(
basis_item=basis_item,
target_level=target_level,
ai_model=ai_model,
count=generation_count,
operator_notes=operator_notes if note_in_prompt else None,
)
from app.schemas.ai import GeneratedQuestion
saved = 0
for generated_question in generated:
item_id = await save_ai_question(
generated_data=GeneratedQuestion(
stem=generated_question.stem,
options=generated_question.options,
correct=generated_question.correct,
explanation=generated_question.explanation or None,
),
tryout_id=basis_item.tryout_id,
website_id=basis_item.website_id,
basis_item_id=basis_item.id,
slot=basis_item.slot,
level=target_level,
ai_model=ai_model,
generation_run_id=run_id,
source_snapshot_question_id=basis_item.source_snapshot_question_id,
variant_status="draft",
db=db,
)
if item_id:
saved += 1
await db.commit()
run_result = await db.execute(
select(AIGenerationRun)
.where(AIGenerationRun.basis_item_id == basis_item.id)
.order_by(AIGenerationRun.id.desc())
.limit(50)
)
variant_result = await db.execute(
select(Item)
.where(Item.generated_by == "ai", Item.basis_item_id == basis_item.id)
.order_by(Item.created_at.desc(), Item.id.desc())
.limit(300)
)
runs = list(run_result.scalars().all())
variants = list(variant_result.scalars().all())
usage_metrics, family_stats = await _family_usage_stats(db, basis_item, variants)
status_message = (
f"Run #{run_id} failed to produce savable variants. "
f"Requested={generation_count}, Generated={len(generated)}, Saved={saved}. "
"Check model output/credentials and server logs."
if saved == 0
else f"Run #{run_id} finished. Requested={generation_count}, Generated={len(generated)}, Saved={saved}."
)
body = _basis_item_workspace_body(
basis_item,
runs,
variants,
usage_metrics,
family_stats,
filters,
error=status_message if saved == 0 else None,
success=status_message if saved > 0 else None,
target_level=target_level,
ai_model=ai_model,
generation_count=str(generation_count),
include_note_for_admin=note_for_admin,
include_note_in_prompt=note_in_prompt,
)
return _render_admin_page(request,
f"Basis Item #{basis_item.id}",
f"Basis Item Workspace #{basis_item.id}",
body,
)
@router.post("/basis-items/{basis_item_id}/review-bulk", include_in_schema=False)
async def basis_item_review_bulk(
basis_item_id: int,
request: Request,
db: AsyncSession = Depends(get_db),
item_ids: list[int] = Form([]),
action: str = Form(...),
):
filters = {"status": "", "level": "", "run_id": "", "min_frequency": ""}
admin = await _current_admin(request)
if not admin:
return _login_redirect()
basis_item = await db.get(Item, basis_item_id)
if basis_item is None:
return RedirectResponse(url="/admin/basis-items", status_code=HTTP_303_SEE_OTHER)
valid_actions = {"approved", "rejected", "archived", "stale", "active"}
if action in valid_actions and item_ids:
result = await db.execute(
select(Item).where(
Item.id.in_(item_ids),
Item.generated_by == "ai",
Item.basis_item_id == basis_item.id,
)
)
items = list(result.scalars().all())
reviewed_at = datetime.now(timezone.utc)
for item in items:
item.variant_status = action
item.reviewed_by = admin.username
item.reviewed_at = reviewed_at
await db.commit()
run_result = await db.execute(
select(AIGenerationRun)
.where(AIGenerationRun.basis_item_id == basis_item.id)
.order_by(AIGenerationRun.id.desc())
.limit(50)
)
variant_result = await db.execute(
select(Item)
.where(Item.generated_by == "ai", Item.basis_item_id == basis_item.id)
.order_by(Item.created_at.desc(), Item.id.desc())
.limit(300)
)
runs = list(run_result.scalars().all())
variants = list(variant_result.scalars().all())
usage_metrics, family_stats = await _family_usage_stats(db, basis_item, variants)
body = _basis_item_workspace_body(
basis_item,
runs,
variants,
usage_metrics,
family_stats,
filters,
success=f"Applied status '{action}' to selected variants.",
)
return _render_admin_page(request,
f"Basis Item #{basis_item.id}",
f"Basis Item Workspace #{basis_item.id}",
body,
)
AI_PLAYGROUND_TABS = (
("generate", "Generate"),
("review", "Review Queue"),
("runs", "Runs"),
("basis", "Basis Items"),
)
AI_VARIANT_STATUSES = ("draft", "approved", "active", "rejected", "archived", "stale")
AI_VARIANT_LEVELS = ("mudah", "sulit")
def _selected_option(value: str, selected_value: str) -> str:
return "selected" if value == selected_value else ""
def _ai_tab_nav(active_tab: str) -> str:
links = []
for tab, label in AI_PLAYGROUND_TABS:
active_class = "active" if tab == active_tab else ""
aria = ' aria-current="page"' if tab == active_tab else ""
links.append(
f'<a class="{active_class}" href="/admin/ai-playground?tab={tab}"{aria}>{escape(label)}</a>'
)
return f'<nav class="tabs" aria-label="AI Playground sections">{"".join(links)}</nav>'
def _status_pill(status: str | None) -> str:
value = status or "unknown"
css_value = re.sub(r"[^a-z0-9_-]+", "-", value.lower())
return f'<span class="status-pill status-{escape(css_value)}">{escape(value)}</span>'
def _ai_status_strip(
key_configured: bool,
stats: dict[str, Any],
generation_runs: list[AIGenerationRun],
generation_summary: dict[str, Any] | None = None,
) -> str:
latest_run = "-"
latest_saved = "-"
if generation_summary:
latest_run = str(generation_summary.get("run_id", "-"))
latest_saved = str(len(generation_summary.get("saved_item_ids") or []))
elif generation_runs:
latest_run = str(generation_runs[0].id)
return f"""
<div class="compact-strip">
<div class="compact-stat"><span>OpenRouter</span><strong>{"Yes" if key_configured else "No"}</strong></div>
<div class="compact-stat"><span>AI Items</span><strong>{stats.get("total_ai_items", 0)}</strong></div>
<div class="compact-stat"><span>Latest Run</span><strong>{escape(latest_run)}</strong></div>
<div class="compact-stat"><span>Saved</span><strong>{escape(latest_saved)}</strong></div>
</div>
"""
def _ai_generation_summary(generation_summary: dict[str, Any] | None) -> str:
if not generation_summary:
return ""
saved_item_ids = generation_summary.get("saved_item_ids") or []
return f"""
<div class="compact-strip">
<div class="compact-stat"><span>Run ID</span><strong>{generation_summary.get("run_id", "-")}</strong></div>
<div class="compact-stat"><span>Requested</span><strong>{generation_summary.get("requested_count", 0)}</strong></div>
<div class="compact-stat"><span>Generated</span><strong>{generation_summary.get("generated_count", 0)}</strong></div>
<div class="compact-stat"><span>Saved</span><strong>{len(saved_item_ids)}</strong></div>
</div>
"""
def _ai_generate_tab(
basis_items: list[Item],
generation_summary: dict[str, Any] | None,
basis_item_id: str,
target_level: str,
ai_model: str,
generation_count: str,
operator_notes: str,
include_note_for_admin: bool,
include_note_in_prompt: bool,
) -> str:
seed_callout = ""
if not basis_items:
seed_callout = """
<div class="success">No <code>sedang</code> basis items found yet.</div>
<form method="post" action="/admin/ai-playground/seed-demo">
<button type="submit">Seed Demo Basis Item</button>
</form>
"""
selected_basis_id = str(basis_item_id or "")
basis_options = ['<option value="">Select a sedang basis item</option>']
for item in basis_items:
item_id = str(item.id)
selected = _selected_option(item_id, selected_basis_id)
stem_preview = _truncate(_html_to_text(item.stem), 82)
basis_options.append(
f'<option value="{item.id}" {selected}>#{item.id} | Tryout {escape(str(item.tryout_id))} | Slot {item.slot} | {escape(stem_preview)}</option>'
)
return f"""
<section class="tab-panel">
{seed_callout}
{_ai_generation_summary(generation_summary)}
<form method="post" action="/admin/ai-playground?tab=generate" autocomplete="off">
<div class="field-grid">
<div>
<label for="basis_item_id">Basis Item</label>
<select id="basis_item_id" name="basis_item_id" required>
{"".join(basis_options)}
</select>
</div>
<div>
<label for="target_level">Target Level</label>
<select id="target_level" name="target_level">
<option value="mudah" {_selected_option("mudah", target_level)}>mudah</option>
<option value="sulit" {_selected_option("sulit", target_level)}>sulit</option>
</select>
</div>
<div>
<label for="generation_count">Generate Count</label>
<input id="generation_count" name="generation_count" type="number" min="1" max="50" value="{escape(generation_count)}">
</div>
<div>
<label for="ai_model">Model</label>
<input id="ai_model" name="ai_model" type="text" value="{escape(ai_model or settings.OPENROUTER_MODEL_LLAMA)}" readonly style="background:#f8fafc;">
</div>
<div class="wide">
<label for="operator_notes">Optional Notes</label>
<textarea id="operator_notes" name="operator_notes" rows="3" placeholder="Optional generation note for this run">{escape(operator_notes)}</textarea>
</div>
</div>
<label class="row"><input type="checkbox" name="include_note_for_admin" {"checked" if include_note_for_admin else ""}> Save note for admin team</label>
<label class="row"><input type="checkbox" name="include_note_in_prompt" {"checked" if include_note_in_prompt else ""}> Include note in AI prompt payload</label>
<div class="actions">
<button type="submit">Generate Run</button>
<a class="secondary-link" href="/admin/ai-playground?tab=basis">Find Basis Item</a>
</div>
</form>
</section>
"""
def _ai_basis_tab(basis_items: list[Item]) -> str:
rows = []
for item in basis_items:
stem_preview = _truncate(_html_to_text(item.stem), 140)
rows.append(
"<tr>"
f"<td>{item.id}</td>"
f"<td>{escape(str(item.tryout_id))}</td>"
f"<td>{item.slot}</td>"
f"<td>{item.website_id}</td>"
f"<td>{escape(stem_preview)}</td>"
f"<td><a class=\"button-link\" href=\"/admin/ai-playground?tab=generate&basis_item_id={item.id}\">Use</a></td>"
"</tr>"
)
table = (
"<div class=\"table-wrap\"><table><thead><tr><th>Item ID</th><th>Tryout</th><th>Slot</th><th>Website</th><th>Stem</th><th>Action</th></tr></thead><tbody>"
+ ("".join(rows) if rows else "<tr><td colspan=\"6\">No sedang basis items found.</td></tr>")
+ "</tbody></table></div>"
)
return f"""
<section class="tab-panel">
<div class="actions" style="margin-top:0">
<form method="post" action="/admin/ai-playground/seed-demo">
<button type="submit">Seed Demo Basis Item</button>
</form>
</div>
{table}
</section>
"""
def _ai_runs_tab(
generation_runs: list[AIGenerationRun],
generation_summary: dict[str, Any] | None,
) -> str:
rows = []
for run in generation_runs:
rows.append(
"<tr>"
f"<td>{run.id}</td>"
f"<td>{run.basis_item_id}</td>"
f"<td>{escape(run.target_level)}</td>"
f"<td>{run.requested_count}</td>"
f"<td>{escape(_truncate(run.model, 54))}</td>"
f"<td>{escape(run.created_by)}</td>"
f"<td>{escape(str(run.created_at))}</td>"
f"<td><a class=\"secondary-link\" href=\"/admin/ai-playground?tab=review&run_id={run.id}\">Review</a></td>"
"</tr>"
)
table = (
"<div class=\"table-wrap\"><table><thead><tr><th>Run ID</th><th>Basis Item</th><th>Target</th><th>Requested</th><th>Model</th><th>Created By</th><th>Created At</th><th>Action</th></tr></thead><tbody>"
+ ("".join(rows) if rows else "<tr><td colspan=\"8\">No generation runs yet.</td></tr>")
+ "</tbody></table></div>"
)
return f"""
<section class="tab-panel">
{_ai_generation_summary(generation_summary)}
{table}
</section>
"""
def _ai_review_tab(
generated_variants: list[Item],
status_filter: str,
level_filter: str,
run_id_filter: str,
) -> str:
status_options = ['<option value="">All statuses</option>']
for status in AI_VARIANT_STATUSES:
status_options.append(
f'<option value="{status}" {_selected_option(status, status_filter)}>{status}</option>'
)
level_options = ['<option value="">All levels</option>']
for level in AI_VARIANT_LEVELS:
level_options.append(
f'<option value="{level}" {_selected_option(level, level_filter)}>{level}</option>'
)
variant_rows = []
for item in generated_variants:
stem_preview = _truncate(_html_to_text(item.stem), 120)
variant_rows.append(
"<tr>"
f"<td><input type=\"checkbox\" name=\"item_ids\" value=\"{item.id}\"></td>"
f"<td>{item.id}</td>"
f"<td>{item.generation_run_id or '-'}</td>"
f"<td>{item.basis_item_id or '-'}</td>"
f"<td>{escape(item.level)}</td>"
f"<td>{_status_pill(item.variant_status)}</td>"
f"<td>{escape(_truncate(item.ai_model or '-', 42))}</td>"
f"<td>{escape(stem_preview)}</td>"
f"<td>{escape(str(item.created_at))}</td>"
f"<td><a class=\"secondary-link\" href=\"/admin/ai-playground/variants/{item.id}\">View</a></td>"
"</tr>"
)
variant_table_rows = (
"".join(variant_rows)
if variant_rows
else '<tr><td colspan="10">No AI-generated variants match this view.</td></tr>'
)
return f"""
<section class="tab-panel">
<form class="toolbar" method="get" action="/admin/ai-playground">
<input type="hidden" name="tab" value="review">
<label>Status
<select name="status">{"".join(status_options)}</select>
</label>
<label>Level
<select name="level">{"".join(level_options)}</select>
</label>
<label>Run ID
<input name="run_id" type="number" min="1" value="{escape(run_id_filter)}">
</label>
<button type="submit">Filter</button>
<a class="secondary-link" href="/admin/ai-playground?tab=review">Clear</a>
</form>
<form method="post" action="/admin/ai-playground/review-bulk?tab=review">
<div class="actions" style="margin:16px 0">
<select name="action" style="max-width:260px">
<option value="approved">Approve selected</option>
<option value="rejected">Reject selected</option>
<option value="archived">Archive selected</option>
<option value="stale">Mark stale</option>
<option value="active">Activate selected</option>
</select>
<button type="submit">Apply</button>
</div>
<div class="table-wrap">
<table>
<thead>
<tr><th><input type="checkbox" onclick="document.querySelectorAll('input[name=&quot;item_ids&quot;]').forEach(el => el.checked = this.checked)"></th><th>Item ID</th><th>Run ID</th><th>Basis</th><th>Level</th><th>Status</th><th>Model</th><th>Stem</th><th>Created At</th><th>Action</th></tr>
</thead>
<tbody>
{variant_table_rows}
</tbody>
</table>
</div>
</form>
</section>
"""
def _ai_form_body(
key_configured: bool,
stats: dict[str, Any],
error: str | None = None,
success: str | None = None,
generation_summary: dict[str, Any] | None = None,
basis_items: list[Item] | None = None,
generation_runs: list[AIGenerationRun] | None = None,
generated_variants: list[Item] | None = None,
basis_item_id: str = "",
target_level: str = "mudah",
ai_model: str = settings.OPENROUTER_MODEL_LLAMA,
generation_count: str = "1",
operator_notes: str = "",
include_note_for_admin: bool = True,
include_note_in_prompt: bool = False,
active_tab: str = "generate",
variant_status_filter: str = "",
variant_level_filter: str = "",
variant_run_id_filter: str = "",
) -> str:
error_html = f'<div class="error">{escape(error)}</div>' if error else ""
success_html = f'<div class="success">{escape(success)}</div>' if success else ""
basis_items = basis_items or []
generation_runs = generation_runs or []
generated_variants = generated_variants or []
if active_tab not in {tab for tab, _ in AI_PLAYGROUND_TABS}:
active_tab = "generate"
tab_html = {
"generate": _ai_generate_tab(
basis_items,
generation_summary,
basis_item_id,
target_level,
ai_model,
generation_count,
operator_notes,
include_note_for_admin,
include_note_in_prompt,
),
"review": _ai_review_tab(
generated_variants,
variant_status_filter,
variant_level_filter,
variant_run_id_filter,
),
"runs": _ai_runs_tab(generation_runs, generation_summary),
"basis": _ai_basis_tab(basis_items),
}[active_tab]
return f"""
{_ai_status_strip(key_configured, stats, generation_runs, generation_summary)}
{success_html}
{error_html}
{_ai_tab_nav(active_tab)}
{tab_html}
"""
def _options_table(options: Any, correct_answer: str | None) -> str:
normalized_correct = str(correct_answer or "").strip().upper()
rows = []
if isinstance(options, dict):
options_by_key = {str(key).strip().upper(): value for key, value in options.items()}
option_keys = [key for key in ("A", "B", "C", "D") if key in options_by_key]
option_keys.extend(sorted(key for key in options_by_key.keys() if key not in option_keys))
for key in option_keys:
value = options_by_key.get(key)
row_class = ' class="correct-option"' if str(key).upper() == normalized_correct else ""
rows.append(
f"<tr{row_class}>"
f'<td class="option-key">{escape(str(key).upper())}</td>'
f"<td>{escape(_html_to_text(str(value)))}</td>"
"</tr>"
)
else:
rows.append(
f'<tr><td colspan="2">{escape(_html_to_text(str(options or "")))}</td></tr>'
)
return (
'<div class="table-wrap"><table><thead><tr><th>Option</th><th>Text</th></tr></thead><tbody>'
+ ("".join(rows) if rows else '<tr><td colspan="2">No options stored.</td></tr>')
+ "</tbody></table></div>"
)
def _ai_variant_detail_body(variant: Item, basis_item: Item | None) -> str:
explanation = _html_to_text(variant.explanation) if variant.explanation else "-"
basis_preview = "-"
if basis_item is not None:
basis_preview = (
f"#{basis_item.id} | Tryout {escape(str(basis_item.tryout_id))} | "
f"Slot {basis_item.slot} | {escape(_truncate(_html_to_text(basis_item.stem), 160))}"
)
review_url = "/admin/ai-playground?tab=review"
if variant.generation_run_id:
review_url = f"{review_url}&run_id={variant.generation_run_id}"
return f"""
<div class="compact-strip">
<div class="compact-stat"><span>Item</span><strong>{variant.id}</strong></div>
<div class="compact-stat"><span>Run</span><strong>{variant.generation_run_id or "-"}</strong></div>
<div class="compact-stat"><span>Level</span><strong>{escape(variant.level)}</strong></div>
<div class="compact-stat"><span>Status</span><strong>{escape(variant.variant_status)}</strong></div>
</div>
<div class="question-block">
<h3>Question</h3>
<p>{escape(_html_to_text(variant.stem))}</p>
</div>
<h3>Answer Options</h3>
{_options_table(variant.options, variant.correct_answer)}
<div class="question-block">
<h3>Correct Answer</h3>
<p><strong>{escape(variant.correct_answer)}</strong></p>
<h3>Pembahasan</h3>
<p>{escape(explanation)}</p>
</div>
<div class="question-block">
<h3>Generation Context</h3>
<p class="muted">Basis item: <strong>{basis_preview}</strong></p>
<p class="muted">Model: <strong>{escape(variant.ai_model or "-")}</strong></p>
<p class="muted">Created at: <strong>{escape(str(variant.created_at))}</strong></p>
</div>
<form method="post" action="/admin/ai-playground/review-bulk?tab=review">
<input type="hidden" name="item_ids" value="{variant.id}">
<div class="actions">
<select name="action" style="max-width:260px">
<option value="approved">Approve this item</option>
<option value="rejected">Reject this item</option>
<option value="archived">Archive this item</option>
<option value="stale">Mark stale</option>
<option value="active">Activate this item</option>
</select>
<button type="submit">Apply</button>
<a class="secondary-link" href="{review_url}">Back to Review Queue</a>
</div>
</form>
"""
@router.get("/ai-playground", include_in_schema=False)
async def ai_playground_view(request: Request, db: AsyncSession = Depends(get_db)):
admin = await _current_admin(request)
if not admin:
return _login_redirect()
stats = await get_ai_stats(db)
basis_items = await _basis_items_for_playground(db)
basis_item_id = request.query_params.get("basis_item_id", "")
active_tab = request.query_params.get("tab", "generate")
if active_tab not in {tab for tab, _ in AI_PLAYGROUND_TABS}:
active_tab = "generate"
status_filter = request.query_params.get("status", "")
if status_filter not in AI_VARIANT_STATUSES:
status_filter = ""
level_filter = request.query_params.get("level", "")
if level_filter not in AI_VARIANT_LEVELS:
level_filter = ""
run_id_filter = request.query_params.get("run_id", "").strip()
run_id_filter_int = int(run_id_filter) if run_id_filter.isdigit() else None
generation_runs = await _recent_generation_runs(db)
selected_basis_item_id: int | None = None
if basis_item_id and str(basis_item_id).isdigit():
selected_basis_item_id = int(str(basis_item_id))
generated_variants = await _recent_generated_variants(
db,
basis_item_id=selected_basis_item_id,
status_filter=status_filter or None,
level_filter=level_filter or None,
run_id_filter=run_id_filter_int,
)
body = _ai_form_body(
bool(settings.OPENROUTER_API_KEY),
stats,
basis_items=basis_items,
generation_runs=generation_runs,
generated_variants=generated_variants,
basis_item_id=str(basis_item_id or ""),
active_tab=active_tab,
variant_status_filter=status_filter,
variant_level_filter=level_filter,
variant_run_id_filter=run_id_filter if run_id_filter_int is not None else "",
)
return _render_admin_page(request, "AI Playground", "AI Playground", body)
@router.get("/ai-playground/variants/{item_id}", include_in_schema=False)
async def ai_playground_variant_detail(
item_id: int,
request: Request,
db: AsyncSession = Depends(get_db),
):
admin = await _current_admin(request)
if not admin:
return _login_redirect()
result = await db.execute(
select(Item).where(Item.id == item_id, Item.generated_by == "ai")
)
variant = result.scalar_one_or_none()
if variant is None:
body = """
<div class="error">Generated variant was not found.</div>
<a class="secondary-link" href="/admin/ai-playground?tab=review">Back to Review Queue</a>
"""
return _render_admin_page(request, "Generated Variant", "Generated Variant", body)
basis_item = None
if variant.basis_item_id:
basis_item = await db.get(Item, variant.basis_item_id)
body = _ai_variant_detail_body(variant, basis_item)
return _render_admin_page(
request,
f"Generated Variant #{variant.id}",
f"Generated Variant #{variant.id}",
body,
)
@router.post("/ai-playground/seed-demo", include_in_schema=False)
async def ai_playground_seed_demo(request: Request, db: AsyncSession = Depends(get_db)):
admin = await _current_admin(request)
if not admin:
return _login_redirect()
demo_item = await _find_or_create_demo_basis_item(db)
stats = await get_ai_stats(db)
basis_items = await _basis_items_for_playground(db)
generation_runs = await _recent_generation_runs(db)
generated_variants = await _recent_generated_variants(db)
body = _ai_form_body(
bool(settings.OPENROUTER_API_KEY),
stats,
success=f"Demo basis item is ready: item #{demo_item.id}, tryout {demo_item.tryout_id}, slot {demo_item.slot}.",
basis_items=basis_items,
generation_runs=generation_runs,
generated_variants=generated_variants,
basis_item_id=str(demo_item.id),
)
return _render_admin_page(request, "AI Playground", "AI Playground", body)
@router.post("/ai-playground", include_in_schema=False)
async def ai_playground_submit(
request: Request,
db: AsyncSession = Depends(get_db),
basis_item_id: int = Form(...),
target_level: str = Form(...),
ai_model: str = Form(""),
generation_count: int = Form(1),
operator_notes: str = Form(""),
include_note_for_admin: str | None = Form(None),
include_note_in_prompt: str | None = Form(None),
):
admin = await _current_admin(request)
if not admin:
return _login_redirect()
stats = await get_ai_stats(db)
basis_items = await _basis_items_for_playground(db)
generation_runs = await _recent_generation_runs(db)
generated_variants = await _recent_generated_variants(db)
# Llama-only policy for production quality consistency.
ai_model = settings.OPENROUTER_MODEL_LLAMA
note_for_admin = include_note_for_admin == "on"
note_in_prompt = include_note_in_prompt == "on"
if not settings.OPENROUTER_API_KEY:
body = _ai_form_body(
False,
stats,
error="OPENROUTER_API_KEY is not configured in the environment.",
basis_items=basis_items,
generation_runs=generation_runs,
generated_variants=generated_variants,
basis_item_id=str(basis_item_id),
target_level=target_level,
ai_model=ai_model,
generation_count=str(generation_count),
operator_notes=operator_notes,
include_note_for_admin=note_for_admin,
include_note_in_prompt=note_in_prompt,
)
return _render_admin_page(request, "AI Playground", "AI Playground", body)
if target_level not in {"mudah", "sulit"}:
body = _ai_form_body(
True,
stats,
error="Target level must be mudah or sulit.",
basis_items=basis_items,
generation_runs=generation_runs,
generated_variants=generated_variants,
basis_item_id=str(basis_item_id),
target_level=target_level,
ai_model=ai_model,
generation_count=str(generation_count),
operator_notes=operator_notes,
include_note_for_admin=note_for_admin,
include_note_in_prompt=note_in_prompt,
)
return _render_admin_page(request, "AI Playground", "AI Playground", body)
result = await db.execute(select(Item).where(Item.id == basis_item_id))
basis_item = result.scalar_one_or_none()
if not basis_item:
body = _ai_form_body(
True,
stats,
error=f"Basis item not found: {basis_item_id}",
basis_items=basis_items,
generation_runs=generation_runs,
generated_variants=generated_variants,
basis_item_id=str(basis_item_id),
target_level=target_level,
ai_model=ai_model,
generation_count=str(generation_count),
operator_notes=operator_notes,
include_note_for_admin=note_for_admin,
include_note_in_prompt=note_in_prompt,
)
return _render_admin_page(request, "AI Playground", "AI Playground", body)
if basis_item.level != "sedang":
body = _ai_form_body(
True,
stats,
error=f"Basis item must be sedang level, got: {basis_item.level}",
basis_items=basis_items,
generation_runs=generation_runs,
generated_variants=generated_variants,
basis_item_id=str(basis_item_id),
target_level=target_level,
ai_model=ai_model,
generation_count=str(generation_count),
operator_notes=operator_notes,
include_note_for_admin=note_for_admin,
include_note_in_prompt=note_in_prompt,
)
return _render_admin_page(request, "AI Playground", "AI Playground", body)
if generation_count < 1 or generation_count > 50:
body = _ai_form_body(
True,
stats,
error="Generate count must be between 1 and 50.",
basis_items=basis_items,
generation_runs=generation_runs,
generated_variants=generated_variants,
basis_item_id=str(basis_item_id),
target_level=target_level,
ai_model=ai_model,
generation_count=str(generation_count),
operator_notes=operator_notes,
include_note_for_admin=note_for_admin,
include_note_in_prompt=note_in_prompt,
)
return _render_admin_page(request, "AI Playground", "AI Playground", body)
run_id = await create_generation_run(
basis_item_id=basis_item.id,
source_snapshot_question_id=basis_item.source_snapshot_question_id,
target_level=target_level,
requested_count=generation_count,
model=ai_model,
created_by=admin.username,
operator_notes=(operator_notes.strip() or None) if note_for_admin else None,
db=db,
)
generated = await generate_questions_batch(
basis_item=basis_item,
target_level=target_level,
ai_model=ai_model,
count=generation_count,
operator_notes=operator_notes if note_in_prompt else None,
)
saved_item_ids: list[int] = []
from app.schemas.ai import GeneratedQuestion
for generated_question in generated:
item_id = await save_ai_question(
generated_data=GeneratedQuestion(
stem=generated_question.stem,
options=generated_question.options,
correct=generated_question.correct,
explanation=generated_question.explanation or None,
),
tryout_id=basis_item.tryout_id,
website_id=basis_item.website_id,
basis_item_id=basis_item.id,
slot=basis_item.slot,
level=target_level,
ai_model=ai_model,
generation_run_id=run_id,
source_snapshot_question_id=basis_item.source_snapshot_question_id,
variant_status="draft",
db=db,
)
if item_id:
saved_item_ids.append(item_id)
await db.commit()
updated_stats = await get_ai_stats(db)
updated_basis_items = await _basis_items_for_playground(db)
updated_runs = await _recent_generation_runs(db)
updated_variants = await _recent_generated_variants(db)
if not saved_item_ids:
body = _ai_form_body(
True,
updated_stats,
error="Generation run completed but no items were saved. Check model output and logs.",
basis_items=updated_basis_items,
generation_runs=updated_runs,
generated_variants=updated_variants,
basis_item_id=str(basis_item_id),
target_level=target_level,
ai_model=ai_model,
generation_count=str(generation_count),
operator_notes=operator_notes,
include_note_for_admin=note_for_admin,
include_note_in_prompt=note_in_prompt,
)
return _render_admin_page(request, "AI Playground", "AI Playground", body)
body = _ai_form_body(
True,
updated_stats,
success=f"Generation run #{run_id} completed. Saved {len(saved_item_ids)} item(s).",
generation_summary={
"run_id": run_id,
"requested_count": generation_count,
"generated_count": len(generated),
"saved_item_ids": saved_item_ids,
},
basis_items=updated_basis_items,
generation_runs=updated_runs,
generated_variants=updated_variants,
basis_item_id=str(basis_item_id),
target_level=target_level,
ai_model=ai_model,
generation_count=str(generation_count),
operator_notes=operator_notes,
include_note_for_admin=note_for_admin,
include_note_in_prompt=note_in_prompt,
)
return _render_admin_page(request, "AI Playground", "AI Playground", body)
@router.post("/ai-playground/save", include_in_schema=False)
async def ai_playground_save(
request: Request,
db: AsyncSession = Depends(get_db),
basis_item_id: int = Form(...),
tryout_id: str = Form(...),
website_id: int = Form(...),
slot: int = Form(...),
target_level: str = Form(...),
ai_model: str = Form(...),
stem: str = Form(...),
options_json: str = Form(...),
correct: str = Form(...),
explanation: str = Form(""),
):
admin = await _current_admin(request)
if not admin:
return _login_redirect()
stats = await get_ai_stats(db)
basis_items = await _basis_items_for_playground(db)
if target_level not in {"mudah", "sulit"}:
body = _ai_form_body(
bool(settings.OPENROUTER_API_KEY),
stats,
error="Only mudah or sulit generated items can be saved from the playground.",
basis_items=basis_items,
)
return _render_admin_page(request, "AI Playground", "AI Playground", body)
try:
options = json.loads(options_json)
except json.JSONDecodeError:
body = _ai_form_body(
bool(settings.OPENROUTER_API_KEY),
stats,
error="Generated options payload is invalid.",
basis_items=basis_items,
)
return _render_admin_page(request, "AI Playground", "AI Playground", body)
from app.schemas.ai import GeneratedQuestion
item_id = await save_ai_question(
generated_data=GeneratedQuestion(
stem=stem,
options=options,
correct=correct,
explanation=explanation or None,
),
tryout_id=tryout_id,
website_id=website_id,
basis_item_id=basis_item_id,
slot=slot,
level=target_level,
ai_model=ai_model,
variant_status="draft",
db=db,
)
if not item_id:
body = _ai_form_body(
bool(settings.OPENROUTER_API_KEY),
stats,
error="Failed to save generated item. Check server logs for the database error.",
basis_items=basis_items,
)
return _render_admin_page(request, "AI Playground", "AI Playground", body)
await db.commit()
updated_stats = await get_ai_stats(db)
updated_basis_items = await _basis_items_for_playground(db)
body = _ai_form_body(
bool(settings.OPENROUTER_API_KEY),
updated_stats,
success=f"Generated item saved successfully as item #{item_id}.",
basis_items=updated_basis_items,
basis_item_id=str(basis_item_id),
target_level=target_level,
ai_model=ai_model,
)
return _render_admin_page(request, "AI Playground", "AI Playground", body)
@router.post("/ai-playground/review-bulk", include_in_schema=False)
async def ai_playground_review_bulk(
request: Request,
db: AsyncSession = Depends(get_db),
item_ids: list[int] = Form([]),
action: str = Form(...),
):
admin = await _current_admin(request)
if not admin:
return _login_redirect()
valid_actions = {"approved", "rejected", "archived", "stale", "active"}
stats = await get_ai_stats(db)
basis_items = await _basis_items_for_playground(db)
generation_runs = await _recent_generation_runs(db)
generated_variants = await _recent_generated_variants(db)
if action not in valid_actions:
body = _ai_form_body(
bool(settings.OPENROUTER_API_KEY),
stats,
error="Invalid review action.",
basis_items=basis_items,
generation_runs=generation_runs,
generated_variants=generated_variants,
active_tab="review",
)
return _render_admin_page(request, "AI Playground", "AI Playground", body)
if not item_ids:
body = _ai_form_body(
bool(settings.OPENROUTER_API_KEY),
stats,
error="Select at least one generated item.",
basis_items=basis_items,
generation_runs=generation_runs,
generated_variants=generated_variants,
active_tab="review",
)
return _render_admin_page(request, "AI Playground", "AI Playground", body)
result = await db.execute(
select(Item).where(Item.id.in_(item_ids), Item.generated_by == "ai")
)
items = list(result.scalars().all())
if not items:
body = _ai_form_body(
bool(settings.OPENROUTER_API_KEY),
stats,
error="No matching AI-generated items were found for the selected IDs.",
basis_items=basis_items,
generation_runs=generation_runs,
generated_variants=generated_variants,
active_tab="review",
)
return _render_admin_page(request, "AI Playground", "AI Playground", body)
reviewed_at = datetime.now(timezone.utc)
for item in items:
item.variant_status = action
item.reviewed_by = admin.username
item.reviewed_at = reviewed_at
await db.commit()
updated_stats = await get_ai_stats(db)
updated_basis_items = await _basis_items_for_playground(db)
updated_runs = await _recent_generation_runs(db)
updated_variants = await _recent_generated_variants(db)
body = _ai_form_body(
bool(settings.OPENROUTER_API_KEY),
updated_stats,
success=f"Updated {len(items)} item(s) to status '{action}'.",
basis_items=updated_basis_items,
generation_runs=updated_runs,
generated_variants=updated_variants,
active_tab="review",
)
return _render_admin_page(request, "AI Playground", "AI Playground", body)
@router.get("/tryout/list", include_in_schema=False)
@router.get("/item/list", include_in_schema=False)
@router.get("/user/list", include_in_schema=False)
@router.get("/session/list", include_in_schema=False)
@router.get("/tryoutstats/list", include_in_schema=False)
async def legacy_admin_paths(request: Request):
admin = await _current_admin(request)
if not admin:
return _login_redirect()
return _dashboard_redirect()