Files
yellow-bank-soal/app/admin_web.py

2535 lines
96 KiB
Python

"""
Plain FastAPI admin UI backed by SQLAlchemy and Redis sessions.
This replaces the previous fastapi-admin runtime path, which depended on
Tortoise-oriented internals that do not match this project.
"""
import secrets
import uuid
from dataclasses import dataclass
from datetime import datetime, timezone
from html import escape
import json
from typing import Any
import aioredis
from fastapi import APIRouter, Depends, File, Form, Request, UploadFile
from sqlalchemy import func, select
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.asyncio import AsyncSession
from starlette.responses import HTMLResponse, RedirectResponse
from starlette.status import HTTP_303_SEE_OTHER, HTTP_401_UNAUTHORIZED
from app.core.config import get_settings
from app.database import get_db
from app.models import (
AIGenerationRun,
Item,
Session,
Tryout,
TryoutImportSnapshot,
TryoutSnapshotQuestion,
UserAnswer,
Website,
)
from app.services.ai_generation import (
SUPPORTED_MODELS,
create_generation_run,
generate_questions_batch,
get_ai_stats,
save_ai_question,
validate_ai_model,
)
from app.services.irt_calibration import get_calibration_status
from app.services.tryout_json_import import (
TryoutImportError,
import_tryout_json_snapshot,
preview_tryout_json_import,
)
# Application settings, loaded once when this module is imported.
settings = get_settings()
# Every route registered on this router lives under the /admin prefix.
router = APIRouter(prefix="/admin", tags=["admin-web"])
# Name of the cookie that carries the admin session token.
SESSION_COOKIE = "access_token"
# Redis key prefix; the key "<prefix><token>" stores the session's username.
SESSION_PREFIX = "admin:session:"
# Redis key prefix for cached import previews.
# NOTE(review): presumably suffixed with the preview token — usage is outside this view.
IMPORT_PREVIEW_PREFIX = "admin:import-preview:"
# Lifetime of a cached import preview, in seconds (15 minutes).
IMPORT_PREVIEW_TTL_SECONDS = 900
# Shared Redis client; created by configure_admin_web() and reset to None by
# shutdown_admin_web().
_admin_redis = None
@dataclass
class AdminPrincipal:
    """Identity of the admin resolved from a valid session cookie."""

    # Username stored in Redis for the session token.
    username: str
async def configure_admin_web() -> None:
    """Create the shared Redis client used for admin sessions.

    Does nothing when a client has already been created.

    Raises:
        RuntimeError: If ADMIN_USERNAME or ADMIN_PASSWORD is missing or empty.
    """
    global _admin_redis
    if _admin_redis is None:
        has_credentials = settings.ADMIN_USERNAME and settings.ADMIN_PASSWORD
        if not has_credentials:
            raise RuntimeError("ENABLE_ADMIN=true requires ADMIN_USERNAME and ADMIN_PASSWORD to be set.")
        client_options = {"encoding": "utf-8", "decode_responses": True}
        _admin_redis = aioredis.from_url(settings.REDIS_URL, **client_options)
async def shutdown_admin_web() -> None:
    """Close the shared Redis client, if one exists, and forget it."""
    global _admin_redis
    client = _admin_redis
    if client is not None:
        try:
            await client.close()
        finally:
            # Reset even when close() fails so a later configure call can reconnect.
            _admin_redis = None
async def _current_admin(request: Request) -> AdminPrincipal | None:
    """Resolve the logged-in admin from the session cookie.

    Returns None when Redis is not configured, the cookie is missing, or the
    token has no username stored in Redis.
    """
    if _admin_redis is None:
        return None
    session_token = request.cookies.get(SESSION_COOKIE)
    if session_token:
        stored_username = await _admin_redis.get(f"{SESSION_PREFIX}{session_token}")
        if stored_username:
            return AdminPrincipal(username=str(stored_username))
    return None
def _login_redirect() -> RedirectResponse:
    """Build a 303 redirect to the admin login page."""
    target = "/admin/login"
    return RedirectResponse(url=target, status_code=HTTP_303_SEE_OTHER)
def _dashboard_redirect() -> RedirectResponse:
    """Build a 303 redirect to the admin dashboard."""
    target = "/admin/dashboard"
    return RedirectResponse(url=target, status_code=HTTP_303_SEE_OTHER)
def _render_auth_page(
    request: Request,
    title: str,
    subtitle: str,
    body: str,
    status_code: int = 200,
) -> HTMLResponse:
    """Wrap ``body`` in the standalone layout used by the auth pages.

    ``title`` and ``subtitle`` are HTML-escaped. ``body`` is inserted as raw
    HTML after its ``__REMEMBER_ME_CHECKED__`` placeholder is replaced with
    ``checked`` when the ``remember_me`` cookie equals ``on``, or with an
    empty string otherwise.
    """
    remember_cookie = request.cookies.get("remember_me")
    checked_attr = "checked" if remember_cookie == "on" else ""
    safe_title = escape(title)
    safe_subtitle = escape(subtitle)
    panel_body = body.replace("__REMEMBER_ME_CHECKED__", checked_attr)
    page = f"""<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>{safe_title}</title>
<style>
body {{ margin: 0; min-height: 100vh; display: grid; place-items: center; background: linear-gradient(135deg, #f8fafc, #e2e8f0); font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", sans-serif; color: #0f172a; }}
.panel {{ width: min(420px, calc(100vw - 32px)); background: rgba(255,255,255,0.96); border-radius: 18px; box-shadow: 0 18px 60px rgba(15, 23, 42, 0.14); padding: 28px; }}
h1 {{ margin: 0 0 8px; font-size: 28px; }}
p {{ margin: 0 0 20px; color: #475569; }}
label {{ display: block; font-size: 14px; font-weight: 600; margin: 14px 0 8px; }}
input {{ width: 100%; box-sizing: border-box; border: 1px solid #cbd5e1; border-radius: 10px; padding: 12px 14px; font-size: 15px; }}
.row {{ display: flex; align-items: center; gap: 10px; margin-top: 16px; color: #334155; font-size: 14px; }}
.row input {{ width: auto; }}
button {{ width: 100%; margin-top: 18px; border: 0; border-radius: 10px; padding: 12px 14px; background: #0f172a; color: #fff; font-size: 15px; font-weight: 600; cursor: pointer; }}
.error {{ margin: 0 0 16px; padding: 12px 14px; border-radius: 10px; background: #fef2f2; color: #991b1b; border: 1px solid #fecaca; }}
.muted {{ color: #64748b; font-size: 13px; margin-top: 14px; }}
a {{ color: #0f172a; }}
</style>
</head>
<body>
<main class="panel">
<h1>{safe_title}</h1>
<p>{safe_subtitle}</p>
{panel_body}
</main>
</body>
</html>"""
    return HTMLResponse(page, status_code=status_code)
def _render_admin_page(title: str, page_title: str, body: str) -> HTMLResponse:
    """Wrap ``body`` in the admin layout with the sidebar navigation.

    ``title`` (the document title) and ``page_title`` (the card heading) are
    HTML-escaped; ``body`` is inserted as raw HTML, so callers must escape any
    untrusted values themselves.
    """
    safe_title = escape(title)
    safe_heading = escape(page_title)
    page = f"""<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>{safe_title}</title>
<style>
body {{ margin: 0; font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", sans-serif; background: #f5f7fb; color: #162033; }}
.layout {{ display: grid; grid-template-columns: 240px 1fr; min-height: 100vh; }}
.sidebar {{ background: #0f172a; color: #e2e8f0; padding: 24px 16px; }}
.sidebar h1 {{ font-size: 18px; margin: 0 0 24px; }}
.sidebar a {{ display: block; color: #cbd5e1; text-decoration: none; padding: 10px 12px; border-radius: 8px; margin-bottom: 8px; }}
.sidebar a:hover {{ background: #1e293b; color: #fff; }}
.content {{ padding: 32px; }}
.card {{ background: #fff; border-radius: 14px; padding: 24px; box-shadow: 0 8px 30px rgba(15, 23, 42, 0.08); }}
.grid {{ display: grid; grid-template-columns: repeat(auto-fit, minmax(180px, 1fr)); gap: 16px; margin-top: 20px; }}
.stat {{ padding: 18px; border: 1px solid #e2e8f0; border-radius: 12px; background: #f8fafc; }}
.stat strong {{ display: block; font-size: 26px; margin-top: 6px; }}
table {{ width: 100%; border-collapse: collapse; margin-top: 16px; background: #fff; border-radius: 12px; overflow: hidden; }}
th, td {{ padding: 12px 14px; border-bottom: 1px solid #e5e7eb; text-align: left; vertical-align: top; font-size: 14px; }}
th {{ background: #f8fafc; font-weight: 600; }}
input, select, textarea {{ width: 100%; box-sizing: border-box; border: 1px solid #cbd5e1; border-radius: 10px; padding: 12px 14px; font-size: 15px; }}
label {{ display: block; font-size: 14px; font-weight: 600; margin: 14px 0 8px; }}
button {{ border: 0; border-radius: 10px; padding: 12px 14px; background: #0f172a; color: #fff; font-size: 15px; font-weight: 600; cursor: pointer; }}
.actions {{ display: flex; gap: 12px; flex-wrap: wrap; margin-top: 18px; }}
.error {{ margin: 0 0 16px; padding: 12px 14px; border-radius: 10px; background: #fef2f2; color: #991b1b; border: 1px solid #fecaca; }}
.success {{ margin: 0 0 16px; padding: 12px 14px; border-radius: 10px; background: #ecfdf5; color: #166534; border: 1px solid #86efac; }}
.muted {{ color: #64748b; font-size: 14px; }}
</style>
</head>
<body>
<div class="layout">
<aside class="sidebar">
<h1>IRT Bank Soal Admin</h1>
<a href="/admin/dashboard">Dashboard</a>
<a href="/admin/websites">Websites</a>
<a href="/admin/tryout-import">Tryout Import</a>
<a href="/admin/basis-items">Basis Items</a>
<a href="/admin/calibration-status">Calibration Status</a>
<a href="/admin/item-statistics">Item Statistics</a>
<a href="/admin/session-overview">Session Overview</a>
<a href="/admin/ai-playground">AI Playground</a>
<a href="/admin/password">Password Info</a>
<a href="/admin/logout">Logout</a>
</aside>
<main class="content">
<div class="card">
<h2>{safe_heading}</h2>
{body}
</div>
</main>
</div>
</body>
</html>"""
    return HTMLResponse(page)
def _table(headers: list[str], rows: list[list[Any]]) -> str:
head = "".join(f"<th>{escape(str(header))}</th>" for header in headers)
body_rows = []
for row in rows:
cols = "".join(f"<td>{escape(str(value))}</td>" for value in row)
body_rows.append(f"<tr>{cols}</tr>")
body = "".join(body_rows) or f"<tr><td colspan=\"{len(headers)}\">No data</td></tr>"
return f"<table><thead><tr>{head}</tr></thead><tbody>{body}</tbody></table>"
def _truncate(text: str | None, max_length: int = 120) -> str:
if not text:
return ""
if len(text) <= max_length:
return text
return f"{text[: max_length - 3]}..."
def _websites_form_body(
websites: list[Website],
error: str | None = None,
success: str | None = None,
site_name: str = "",
site_url: str = "",
) -> str:
error_html = f'<div class="error">{escape(error)}</div>' if error else ""
success_html = f'<div class="success">{escape(success)}</div>' if success else ""
body_rows = []
for website in websites:
actions = f"""
<div class="actions" style="margin-top:0">
<a href="/admin/websites/{website.id}/edit" style="display:inline-block;padding:8px 12px;border-radius:8px;background:#0f172a;color:#fff;text-decoration:none;">Edit</a>
<form method="post" action="/admin/websites/{website.id}/delete" onsubmit="return confirm('Delete website {escape(website.site_name)} and all related tryouts, items, sessions, and snapshots?');" style="margin:0">
<button type="submit" style="background:#991b1b;">Delete</button>
</form>
</div>
"""
body_rows.append(
"<tr>"
f"<td>{website.id}</td>"
f"<td>{escape(website.site_name)}</td>"
f"<td>{escape(website.site_url)}</td>"
f"<td>{actions}</td>"
"</tr>"
)
if body_rows:
websites_table = (
"<table><thead><tr><th>ID</th><th>Name</th><th>URL</th><th>Actions</th></tr></thead><tbody>"
+ "".join(body_rows)
+ "</tbody></table>"
)
else:
websites_table = _table(["ID", "Name", "URL", "Actions"], [])
return f"""
<p class="muted">Register websites here so imports and tryout references can be tied to a known source site.</p>
{success_html}
{error_html}
<form method="post" action="/admin/websites" autocomplete="off">
<label for="site_name">Website Name</label>
<input id="site_name" name="site_name" type="text" value="{escape(site_name)}" placeholder="Sejoli Demo Site">
<label for="site_url">Website URL</label>
<input id="site_url" name="site_url" type="url" value="{escape(site_url)}" placeholder="https://example.com">
<button type="submit">Add Website</button>
</form>
<h3 style="margin-top:24px">Registered Websites</h3>
<p class="muted">Use the website ID when importing read-only tryout snapshots.</p>
{websites_table}
"""
def _website_edit_form_body(
website: Website,
error: str | None = None,
success: str | None = None,
site_name: str | None = None,
site_url: str | None = None,
) -> str:
error_html = f'<div class="error">{escape(error)}</div>' if error else ""
success_html = f'<div class="success">{escape(success)}</div>' if success else ""
display_name = website.site_name if site_name is None else site_name
display_url = website.site_url if site_url is None else site_url
return f"""
<p class="muted">Website ID: <strong>{website.id}</strong></p>
{success_html}
{error_html}
<form method="post" action="/admin/websites/{website.id}/edit" autocomplete="off">
<label for="site_name">Website Name</label>
<input id="site_name" name="site_name" type="text" value="{escape(display_name)}">
<label for="site_url">Website URL</label>
<input id="site_url" name="site_url" type="url" value="{escape(display_url)}">
<div class="actions">
<button type="submit">Save Changes</button>
<a href="/admin/websites" style="display:inline-block;padding:12px 14px;border-radius:10px;background:#e2e8f0;color:#0f172a;text-decoration:none;font-size:15px;font-weight:600;">Back</a>
</div>
</form>
"""
def _tryout_import_form_body(
websites: list[Website],
recent_snapshots: list[TryoutImportSnapshot],
error: str | None = None,
success: str | None = None,
selected_website_id: int | None = None,
preview: dict[str, Any] | None = None,
preview_token: str | None = None,
upload_filename: str = "",
) -> str:
error_html = f'<div class="error">{escape(error)}</div>' if error else ""
success_html = f'<div class="success">{escape(success)}</div>' if success else ""
website_options = ['<option value="">Select website</option>']
for website in websites:
selected = "selected" if selected_website_id == website.id else ""
website_options.append(
f'<option value="{website.id}" {selected}>{escape(website.site_name)} (#{website.id})</option>'
)
website_map = {website.id: website.site_name for website in websites}
snapshot_rows = []
for snapshot in recent_snapshots:
snapshot_rows.append(
"<tr>"
f"<td>{snapshot.id}</td>"
f"<td>{escape(website_map.get(snapshot.website_id, 'Unknown'))} (#{snapshot.website_id})</td>"
f"<td>{escape(snapshot.source_tryout_id)}</td>"
f"<td>{escape(snapshot.title)}</td>"
f"<td>{snapshot.question_count}</td>"
f"<td>{escape(str(snapshot.created_at))}</td>"
f"<td><a href=\"/admin/snapshot-questions?snapshot_id={snapshot.id}\" "
"style=\"display:inline-block;padding:8px 12px;border-radius:8px;background:#0f172a;color:#fff;text-decoration:none;\">Browse</a></td>"
"</tr>"
)
snapshots_table = (
"<table><thead><tr><th>Snapshot ID</th><th>Website</th><th>Tryout ID</th><th>Title</th><th>Questions</th><th>Imported At</th><th>Actions</th></tr></thead><tbody>"
+ ("".join(snapshot_rows) if snapshot_rows else "<tr><td colspan=\"7\">No data</td></tr>")
+ "</tbody></table>"
)
preview_html = ""
if preview:
totals = preview.get("totals") or {}
tryout_rows = []
for tryout in preview.get("tryouts") or []:
diff = tryout.get("question_diff") or {}
warnings = "; ".join(tryout.get("warnings") or []) or "-"
tryout_rows.append(
[
tryout.get("source_tryout_id"),
tryout.get("title"),
diff.get("total_questions", 0),
diff.get("new_questions", 0),
diff.get("updated_questions", 0),
diff.get("unchanged_questions", 0),
diff.get("removed_questions", 0),
warnings,
]
)
import_form = ""
if preview_token and selected_website_id:
import_form = f"""
<form method="post" action="/admin/tryout-import" autocomplete="off">
<input type="hidden" name="website_id" value="{selected_website_id}">
<input type="hidden" name="preview_token" value="{escape(preview_token)}">
<button type="submit">Import Snapshot</button>
</form>
"""
preview_html = f"""
<h3 style="margin-top:24px">Preview Summary</h3>
<p class="muted">File: <strong>{escape(upload_filename or "uploaded JSON")}</strong></p>
<div class="grid">
<div class="stat">Tryouts<strong>{preview.get("tryout_count", 0)}</strong></div>
<div class="stat">New Questions<strong>{totals.get("new_questions", 0)}</strong></div>
<div class="stat">Updated Questions<strong>{totals.get("updated_questions", 0)}</strong></div>
<div class="stat">Removed Questions<strong>{totals.get("removed_questions", 0)}</strong></div>
</div>
{_table(
["Tryout ID", "Title", "Total", "New", "Updated", "Unchanged", "Removed", "Warnings"],
tryout_rows,
)}
<div class="actions">{import_form}</div>
"""
return f"""
<p class="muted">Import Sejoli tryout JSON as read-only snapshot reference data. This does not create live item-bank questions.</p>
<p class="muted">Use this when the source tryout changes upstream. Re-import updates matching source question IDs, inserts new ones, and marks missing ones inactive.</p>
{success_html}
{error_html}
<form method="post" action="/admin/tryout-import/preview" enctype="multipart/form-data" autocomplete="off">
<label for="website_id">Website</label>
<select id="website_id" name="website_id">{''.join(website_options)}</select>
<label for="file">Tryout Export JSON</label>
<input id="file" name="file" type="file" accept=".json,application/json">
<button type="submit">Preview Import</button>
</form>
{preview_html}
<h3 style="margin-top:24px">Recent Snapshots</h3>
<p class="muted">These are archived imports stored in PostgreSQL for traceability.</p>
{snapshots_table}
"""
def _snapshot_slot_map(snapshot: TryoutImportSnapshot) -> dict[str, int]:
slot_map: dict[str, int] = {}
questions = (snapshot.raw_payload or {}).get("questions") or []
for index, question in enumerate(questions, start=1):
source_question_id = str((question or {}).get("id") or "").strip()
if source_question_id:
slot_map[source_question_id] = index
return slot_map
def _snapshot_options_to_item_options(raw_options: list[dict[str, Any]] | list[Any]) -> dict[str, str]:
item_options: dict[str, str] = {}
for option in raw_options or []:
if not isinstance(option, dict):
continue
increment = str(option.get("increment") or "").strip().upper()
text = str(option.get("text") or option.get("label") or "").strip()
if increment and text:
item_options[increment] = text
return item_options
def _snapshot_questions_body(
    snapshot: TryoutImportSnapshot,
    questions: list[TryoutSnapshotQuestion],
    promoted_items_by_slot: dict[int, Item],
    error: str | None = None,
    success: str | None = None,
) -> str:
    """Render the snapshot-question browser with its bulk "promote" form.

    A question whose slot already has an entry in ``promoted_items_by_slot``
    shows a link to that item instead of a selection checkbox.
    """
    success_html = f'<div class="success">{escape(success)}</div>' if success else ""
    error_html = f'<div class="error">{escape(error)}</div>' if error else ""
    slot_by_question_id = _snapshot_slot_map(snapshot)
    table_rows = []
    for row in questions:
        # Questions absent from the raw payload get slot 0, shown as "-".
        slot = slot_by_question_id.get(row.source_question_id, 0)
        existing_item = promoted_items_by_slot.get(slot)
        if not existing_item:
            checkbox_html = f'<input type="checkbox" name="snapshot_question_ids" value="{row.id}">'
            status_html = "Ready to promote"
        else:
            checkbox_html = ""
            status_html = (
                f'Item #{existing_item.id} already exists. '
                f'<a href="/admin/ai-playground?basis_item_id={existing_item.id}">Open in AI Playground</a>'
            )
        stem_preview = escape(_truncate(row.question_title or row.question_html, 100))
        cells = [
            f"<td>{checkbox_html}</td>",
            f"<td>{slot or '-'}</td>",
            f"<td>{escape(row.source_question_id)}</td>",
            f"<td>{escape(row.correct_answer)}</td>",
            f"<td>{row.option_count}</td>",
            f"<td>{'Yes' if row.is_active else 'No'}</td>",
            f"<td>{stem_preview}</td>",
            f"<td>{status_html}</td>",
        ]
        table_rows.append("<tr>" + "".join(cells) + "</tr>")
    tbody = "".join(table_rows) if table_rows else "<tr><td colspan=\"8\">No data</td></tr>"
    questions_table = (
        "<form method=\"post\" action=\"/admin/snapshot-questions/promote-bulk\">"
        f"<input type=\"hidden\" name=\"snapshot_id\" value=\"{snapshot.id}\">"
        "<div class=\"actions\" style=\"margin:16px 0\">"
        "<button type=\"submit\">Promote Selected as Basis Items</button>"
        "</div>"
        "<table><thead><tr><th><input type=\"checkbox\" onclick=\"document.querySelectorAll('input[name=&quot;snapshot_question_ids&quot;]').forEach(el => el.checked = this.checked)\"></th><th>Slot</th><th>Source Question ID</th><th>Correct</th><th>Options</th><th>Active</th><th>Stem</th><th>Action</th></tr></thead><tbody>"
        + tbody
        + "</tbody></table>"
        "</form>"
    )
    return f"""
<p class="muted">Snapshot ID: <strong>{snapshot.id}</strong> | Website: <strong>{snapshot.website_id}</strong> | Tryout: <strong>{escape(snapshot.source_tryout_id)}</strong></p>
<p class="muted">Promote selected snapshot questions into the live <code>items</code> table as <code>sedang</code> basis items for AI generation.</p>
{success_html}
{error_html}
{questions_table}
<p style="margin-top:20px"><a href="/admin/tryout-import">Back to Tryout Import</a></p>
"""
async def _basis_items_for_playground(db: AsyncSession, limit: int = 20) -> list[Item]:
    """Return up to ``limit`` items whose level is ``sedang``, newest first."""
    stmt = select(Item).where(Item.level == "sedang")
    stmt = stmt.order_by(Item.created_at.desc(), Item.id.desc())
    stmt = stmt.limit(limit)
    rows = await db.execute(stmt)
    return list(rows.scalars().all())
async def _recent_generation_runs(db: AsyncSession, limit: int = 20) -> list[AIGenerationRun]:
    """Return up to ``limit`` generation runs, highest ID first."""
    stmt = select(AIGenerationRun).order_by(AIGenerationRun.id.desc()).limit(limit)
    rows = await db.execute(stmt)
    return list(rows.scalars().all())
async def _recent_generated_variants(
    db: AsyncSession,
    limit: int = 100,
    basis_item_id: int | None = None,
) -> list[Item]:
    """Return up to ``limit`` AI-generated items, newest first.

    When ``basis_item_id`` is given, only variants of that parent are returned.
    """
    conditions = [Item.generated_by == "ai"]
    if basis_item_id is not None:
        conditions.append(Item.basis_item_id == basis_item_id)
    stmt = select(Item)
    for condition in conditions:
        stmt = stmt.where(condition)
    stmt = stmt.order_by(Item.created_at.desc(), Item.id.desc()).limit(limit)
    rows = await db.execute(stmt)
    return list(rows.scalars().all())
async def _usage_metrics_for_items(
    db: AsyncSession,
    item_ids: list[int],
) -> dict[int, dict[str, float]]:
    """Aggregate answer counts per item.

    Returns a mapping of item ID to ``impressions`` (answer rows),
    ``unique_users`` (distinct ``wp_user_id`` values) and ``frequency``
    (impressions per unique user, 0.0 when there are no users). Items with no
    answers are absent from the result.
    """
    if not item_ids:
        return {}
    impressions_col = func.count(UserAnswer.id).label("impressions")
    unique_users_col = func.count(func.distinct(UserAnswer.wp_user_id)).label("unique_users")
    stmt = (
        select(UserAnswer.item_id, impressions_col, unique_users_col)
        .where(UserAnswer.item_id.in_(item_ids))
        .group_by(UserAnswer.item_id)
    )
    rows = await db.execute(stmt)
    per_item: dict[int, dict[str, float]] = {}
    for item_id, raw_impressions, raw_users in rows.all():
        shown = int(raw_impressions or 0)
        users = int(raw_users or 0)
        per_item[int(item_id)] = {
            "impressions": float(shown),
            "unique_users": float(users),
            "frequency": float(shown / users) if users else 0.0,
        }
    return per_item
async def _family_usage_stats(
    db: AsyncSession,
    basis_item: Item,
    variants: list[Item],
) -> tuple[dict[int, dict[str, float]], dict[str, float]]:
    """Compute usage for a parent item and its variants.

    Returns ``(per_item_metrics, family_totals)``. Family unique users are
    counted with a separate distinct query rather than summed per item, so a
    user who answered several family items is counted once.
    """
    family_ids = [basis_item.id, *(variant.id for variant in variants)]
    per_item = await _usage_metrics_for_items(db, family_ids)
    total_impressions = int(sum(entry["impressions"] for entry in per_item.values()))
    distinct_users_stmt = select(func.count(func.distinct(UserAnswer.wp_user_id))).where(
        UserAnswer.item_id.in_(family_ids)
    )
    total_users = int(await db.scalar(distinct_users_stmt) or 0)
    if total_users:
        family_frequency = total_impressions / total_users
    else:
        family_frequency = 0.0
    family_totals = {
        "impressions": float(total_impressions),
        "unique_users": float(total_users),
        "frequency": float(family_frequency),
    }
    return per_item, family_totals
def _basis_items_list_body(items: list[Item]) -> str:
    """Render the basis-item listing with a link to each item's workspace."""
    rendered_rows = [
        "<tr>"
        f"<td>{entry.id}</td>"
        f"<td>{escape(entry.tryout_id)}</td>"
        f"<td>{entry.slot}</td>"
        f"<td>{entry.website_id}</td>"
        f"<td>{escape(_truncate(entry.stem, 120))}</td>"
        f"<td>{entry.source_snapshot_question_id or '-'}</td>"
        f'<td><a href="/admin/basis-items/{entry.id}" style="display:inline-block;padding:8px 12px;border-radius:8px;background:#0f172a;color:#fff;text-decoration:none;">Open Workspace</a></td>'
        "</tr>"
        for entry in items
    ]
    tbody = "".join(rendered_rows) or '<tr><td colspan="7">No basis items found.</td></tr>'
    listing = (
        "<table><thead><tr><th>Item ID</th><th>Tryout</th><th>Slot</th><th>Website</th><th>Stem</th><th>Source Snapshot QID</th><th>Actions</th></tr></thead><tbody>"
        + tbody
        + "</tbody></table>"
    )
    return f"""
<p class="muted">Basis items are canonical parent questions (<code>sedang</code>, non-AI). Open a workspace to generate and review AI child variants.</p>
{listing}
"""
def _basis_item_workspace_body(
    basis_item: Item,
    runs: list[AIGenerationRun],
    variants: list[Item],
    usage_by_item: dict[int, dict[str, float]],
    family_stats: dict[str, float],
    filters: dict[str, str],
    error: str | None = None,
    success: str | None = None,
    target_level: str = "mudah",
    ai_model: str = settings.OPENROUTER_MODEL_QWEN,
    generation_count: str = "1",
    operator_notes: str = "",
) -> str:
    """Render the workspace page body for one parent (basis) item.

    The fragment contains, in order: optional banners, the parent summary
    with family usage, the "generate variants" form, the variant filter form,
    the generation-run table, and the bulk-review table of child variants.

    Args:
        basis_item: Parent item the workspace belongs to.
        runs: Generation runs listed in the runs table.
        variants: Child variants listed in the review table.
        usage_by_item: Per-item usage metrics keyed by item ID; items without
            an entry are shown with zeros.
        family_stats: Aggregated ``impressions`` / ``unique_users`` /
            ``frequency``; missing keys render as zero.
        filters: Current filter values; the keys read are ``status``,
            ``level``, ``min_frequency`` and ``run_id``.
        error: Optional error banner text (HTML-escaped).
        success: Optional success banner text (HTML-escaped).
        target_level: Pre-selected target level in the generate form.
        ai_model: Pre-selected model key in the generate form.
        generation_count: Pre-filled generate count (kept as a string so the
            submitted value can be echoed back).
        operator_notes: Pre-filled operator notes.

    Returns:
        An HTML fragment for the page body.
    """
    error_html = f'<div class="error">{escape(error)}</div>' if error else ""
    success_html = f'<div class="success">{escape(success)}</div>' if success else ""
    # One <option> per supported model; the key is the value, the label is shown.
    model_options = "".join(
        f'<option value="{escape(model)}" {"selected" if model == ai_model else ""}>{escape(label)}</option>'
        for model, label in SUPPORTED_MODELS.items()
    )
    # Missing filter keys fall back to "", which selects the "All" option.
    status_filter = filters.get("status", "")
    level_filter = filters.get("level", "")
    min_frequency_filter = filters.get("min_frequency", "")
    run_id_filter = filters.get("run_id", "")
    run_rows = [
        [
            run.id,
            run.target_level,
            run.requested_count,
            run.model,
            run.created_by,
            str(run.created_at),
        ]
        for run in runs
    ]
    # _table() escapes every cell, so run values are passed through unescaped.
    runs_table = _table(
        ["Run ID", "Target", "Requested", "Model", "Created By", "Created At"],
        run_rows,
    )
    variant_rows = []
    for item in variants:
        # Variants that have never been answered have no usage entry.
        usage = usage_by_item.get(item.id, {"impressions": 0.0, "unique_users": 0.0, "frequency": 0.0})
        variant_rows.append(
            "<tr>"
            f"<td><input type=\"checkbox\" name=\"item_ids\" value=\"{item.id}\"></td>"
            f"<td>{item.id}</td>"
            f"<td>{item.generation_run_id or '-'}</td>"
            f"<td>{escape(item.level)}</td>"
            f"<td>{escape(item.variant_status)}</td>"
            f"<td>{escape(item.ai_model or '-')}</td>"
            f"<td>{int(usage['impressions'])}</td>"
            f"<td>{int(usage['unique_users'])}</td>"
            f"<td>{usage['frequency']:.2f}</td>"
            f"<td>{escape(_truncate(item.stem, 130))}</td>"
            f"<td>{escape(str(item.created_at))}</td>"
            "</tr>"
        )
    # Built by hand (not via _table) because the first column holds raw
    # checkbox HTML and the whole table is wrapped in the bulk-review form.
    variants_table = (
        f"<form method=\"post\" action=\"/admin/basis-items/{basis_item.id}/review-bulk\">"
        "<div class=\"actions\" style=\"margin:16px 0\">"
        "<select name=\"action\" style=\"max-width:260px\">"
        "<option value=\"approved\">Approve selected</option>"
        "<option value=\"rejected\">Reject selected</option>"
        "<option value=\"archived\">Archive selected</option>"
        "<option value=\"stale\">Mark stale</option>"
        "<option value=\"active\">Activate selected</option>"
        "</select>"
        "<button type=\"submit\">Apply</button>"
        "</div>"
        "<table><thead><tr><th><input type=\"checkbox\" onclick=\"document.querySelectorAll('input[name=&quot;item_ids&quot;]').forEach(el => el.checked = this.checked)\"></th><th>Item ID</th><th>Run ID</th><th>Level</th><th>Status</th><th>Model</th><th>Impressions</th><th>Unique Users</th><th>Frequency</th><th>Stem</th><th>Created At</th></tr></thead><tbody>"
        + ("".join(variant_rows) if variant_rows else "<tr><td colspan=\"11\">No generated variants yet for this parent.</td></tr>")
        + "</tbody></table></form>"
    )
    return f"""
{success_html}
{error_html}
<section style="border:1px solid #e2e8f0;border-radius:12px;padding:16px;background:#f8fafc;">
<h3 style="margin:0 0 10px;">Parent Summary</h3>
<p class="muted" style="margin:0 0 8px;">
Parent Item: <strong>#{basis_item.id}</strong> |
Tryout: <strong>{escape(basis_item.tryout_id)}</strong> |
Slot: <strong>{basis_item.slot}</strong> |
Website: <strong>{basis_item.website_id}</strong> |
Source Snapshot QID: <strong>{basis_item.source_snapshot_question_id or '-'}</strong>
</p>
<p class="muted" style="margin:0 0 8px;">
Family Usage: impressions=<strong>{int(family_stats.get("impressions", 0.0))}</strong>,
unique users=<strong>{int(family_stats.get("unique_users", 0.0))}</strong>,
frequency=<strong>{family_stats.get("frequency", 0.0):.2f}</strong>
</p>
<p class="muted" style="margin:0;"><strong>Stem:</strong> {escape(_truncate(basis_item.stem, 260))}</p>
</section>
<section style="margin-top:16px;border:1px solid #e2e8f0;border-radius:12px;padding:16px;background:#fff;">
<h3 style="margin:0 0 8px;">Generate Variants</h3>
<p class="muted" style="margin:0 0 12px;">Create new AI child variants for this parent.</p>
<form method="post" action="/admin/basis-items/{basis_item.id}/generate" autocomplete="off">
<label for="target_level">Target Level</label>
<select id="target_level" name="target_level">
<option value="mudah" {"selected" if target_level == "mudah" else ""}>mudah</option>
<option value="sulit" {"selected" if target_level == "sulit" else ""}>sulit</option>
</select>
<label for="ai_model">Model</label>
<select id="ai_model" name="ai_model">{model_options}</select>
<label for="generation_count">Generate Count</label>
<input id="generation_count" name="generation_count" type="number" min="1" max="50" value="{escape(generation_count)}">
<p class="muted">Recommended: 1-3 per run. Larger runs increase overlap and review burden.</p>
<label for="operator_notes">Operator Notes (optional)</label>
<textarea id="operator_notes" name="operator_notes" rows="3">{escape(operator_notes)}</textarea>
<button type="submit">Generate Variants</button>
</form>
</section>
<section style="margin-top:16px;border:1px solid #e2e8f0;border-radius:12px;padding:16px;background:#fff;">
<h3 style="margin:0 0 8px;">Filter Variants</h3>
<p class="muted" style="margin:0 0 12px;">Filter child variants shown in the review table below.</p>
<form method="get" action="/admin/basis-items/{basis_item.id}" autocomplete="off">
<label for="status">Status</label>
<select id="status" name="status">
<option value="" {"selected" if status_filter == "" else ""}>All</option>
<option value="draft" {"selected" if status_filter == "draft" else ""}>draft</option>
<option value="approved" {"selected" if status_filter == "approved" else ""}>approved</option>
<option value="active" {"selected" if status_filter == "active" else ""}>active</option>
<option value="rejected" {"selected" if status_filter == "rejected" else ""}>rejected</option>
<option value="archived" {"selected" if status_filter == "archived" else ""}>archived</option>
<option value="stale" {"selected" if status_filter == "stale" else ""}>stale</option>
</select>
<label for="level">Level</label>
<select id="level" name="level">
<option value="" {"selected" if level_filter == "" else ""}>All</option>
<option value="mudah" {"selected" if level_filter == "mudah" else ""}>mudah</option>
<option value="sulit" {"selected" if level_filter == "sulit" else ""}>sulit</option>
</select>
<label for="run_id">Generation Run ID</label>
<input id="run_id" name="run_id" type="number" min="1" value="{escape(run_id_filter)}">
<label for="min_frequency">Min Frequency</label>
<input id="min_frequency" name="min_frequency" type="number" min="0" step="0.1" value="{escape(min_frequency_filter)}">
<div class="actions">
<button type="submit">Apply Filters</button>
<a href="/admin/basis-items/{basis_item.id}" style="display:inline-block;padding:12px 14px;border-radius:10px;background:#e2e8f0;color:#0f172a;text-decoration:none;font-size:15px;font-weight:600;">Reset</a>
</div>
</form>
</section>
<section style="margin-top:16px;border:1px solid #e2e8f0;border-radius:12px;padding:16px;background:#fff;">
<h3 style="margin:0 0 12px;">Generation Runs for This Parent</h3>
{runs_table}
</section>
<section style="margin-top:16px;border:1px solid #e2e8f0;border-radius:12px;padding:16px;background:#fff;">
<h3 style="margin:0 0 8px;">Child Variants for This Parent</h3>
<p class="muted">Filtered variants shown: <strong>{len(variants)}</strong></p>
{variants_table}
</section>
<p style="margin-top:20px"><a href="/admin/basis-items">Back to Basis Items</a></p>
"""
async def _find_or_create_demo_basis_item(db: AsyncSession) -> Item:
    """Return the demo basis item, creating it (and its parents) if needed.

    Looks for the lowest-ID manual ``sedang`` item in the ``demo-tryout``
    tryout. When none exists, the demo website and tryout are reused or
    created, a sample item is inserted, and the transaction is committed.
    """
    demo_item_stmt = (
        select(Item)
        .where(
            Item.level == "sedang",
            Item.generated_by == "manual",
            Item.tryout_id == "demo-tryout",
        )
        .order_by(Item.id.asc())
        .limit(1)
    )
    found_item = (await db.execute(demo_item_stmt)).scalar_one_or_none()
    if found_item:
        return found_item

    site_stmt = select(Website).where(Website.site_url == "https://demo.local").limit(1)
    demo_site = (await db.execute(site_stmt)).scalar_one_or_none()
    if demo_site is None:
        demo_site = Website(site_url="https://demo.local", site_name="Demo Website")
        db.add(demo_site)
        # Flush so the new website has an ID for the rows below.
        await db.flush()

    tryout_stmt = (
        select(Tryout)
        .where(Tryout.website_id == demo_site.id, Tryout.tryout_id == "demo-tryout")
        .limit(1)
    )
    demo_tryout = (await db.execute(tryout_stmt)).scalar_one_or_none()
    if demo_tryout is None:
        demo_tryout = Tryout(
            website_id=demo_site.id,
            tryout_id="demo-tryout",
            name="Demo AI Playground Tryout",
            description="Seed data for the AI playground.",
            scoring_mode="ctt",
            selection_mode="fixed",
            normalization_mode="static",
            ai_generation_enabled=True,
        )
        db.add(demo_tryout)
        await db.flush()

    answer_options = {
        "A": "Rp260.000",
        "B": "Rp300.000",
        "C": "Rp320.000",
        "D": "Rp360.000",
    }
    demo_item = Item(
        tryout_id=demo_tryout.tryout_id,
        website_id=demo_site.id,
        slot=1,
        level="sedang",
        stem="Sebuah toko memberi diskon 20% untuk sebuah tas. Jika harga setelah diskon adalah Rp240.000, berapakah harga tas sebelum diskon?",
        options=answer_options,
        correct_answer="B",
        explanation="Harga setelah diskon 20% berarti 80% dari harga awal. Jadi harga awal = 240.000 / 0,8 = 300.000.",
        generated_by="manual",
        calibrated=False,
        calibration_sample_size=0,
    )
    db.add(demo_item)
    await db.flush()
    await db.commit()
    # Reload the row after the commit before handing it back.
    await db.refresh(demo_item)
    return demo_item
async def _load_websites(db: AsyncSession) -> list[Website]:
    """Return every registered website, ordered by ascending ID."""
    stmt = select(Website).order_by(Website.id.asc())
    rows = await db.execute(stmt)
    return list(rows.scalars().all())
async def _recent_snapshots(db: AsyncSession, limit: int = 20) -> list[TryoutImportSnapshot]:
    """Return up to ``limit`` import snapshots, highest ID first."""
    stmt = select(TryoutImportSnapshot).order_by(TryoutImportSnapshot.id.desc()).limit(limit)
    rows = await db.execute(stmt)
    return list(rows.scalars().all())
async def _ensure_operational_tryout(snapshot: TryoutImportSnapshot, db: AsyncSession) -> Tryout:
    """Return the ``Tryout`` matching the snapshot's website and source tryout id.

    When no such row exists, a new one is built from the snapshot's title,
    added to the session and flushed (not committed) before being returned.
    """
    lookup = select(Tryout).where(
        Tryout.website_id == snapshot.website_id,
        Tryout.tryout_id == snapshot.source_tryout_id,
    )
    found = (await db.execute(lookup)).scalar_one_or_none()
    if found:
        return found

    created = Tryout(
        website_id=snapshot.website_id,
        tryout_id=snapshot.source_tryout_id,
        name=snapshot.title,
        description=f"Operational tryout basis created from imported snapshot #{snapshot.id}.",
        scoring_mode="ctt",
        selection_mode="fixed",
        normalization_mode="static",
        ai_generation_enabled=True,
    )
    db.add(created)
    await db.flush()
    return created
async def _load_snapshot_question_context(
    snapshot: TryoutImportSnapshot,
    db: AsyncSession,
) -> tuple[list[TryoutSnapshotQuestion], dict[int, Item], dict[str, int]]:
    """Collect what the snapshot-questions page needs for one snapshot.

    Returns a 3-tuple of:
      * the snapshot questions sharing the snapshot's website and source
        tryout id, ordered by mapped slot and then by source question id
        (questions without a mapped slot sort last),
      * the ``sedang`` items of that website/tryout keyed by ``slot``,
      * the slot map produced by ``_snapshot_slot_map``.
    """
    question_stmt = (
        select(TryoutSnapshotQuestion)
        .where(
            TryoutSnapshotQuestion.website_id == snapshot.website_id,
            TryoutSnapshotQuestion.source_tryout_id == snapshot.source_tryout_id,
        )
        .order_by(TryoutSnapshotQuestion.source_question_id.asc())
    )
    question_rows = await db.execute(question_stmt)
    questions = list(question_rows.scalars().all())

    item_stmt = select(Item).where(
        Item.website_id == snapshot.website_id,
        Item.tryout_id == snapshot.source_tryout_id,
        Item.level == "sedang",
    )
    item_rows = await db.execute(item_stmt)
    promoted_items_by_slot = {}
    for promoted in item_rows.scalars().all():
        promoted_items_by_slot[promoted.slot] = promoted

    slot_map = _snapshot_slot_map(snapshot)

    def _display_order(row):
        # Unmapped questions get a huge sentinel slot so they land at the end.
        return slot_map.get(row.source_question_id, 10**9), row.source_question_id

    questions.sort(key=_display_order)
    return questions, promoted_items_by_slot, slot_map
async def _promote_snapshot_question_to_item(
    snapshot: TryoutImportSnapshot,
    question: TryoutSnapshotQuestion,
    db: AsyncSession,
) -> tuple[Item | None, str]:
    """Turn one snapshot question into a ``sedang`` basis ``Item``.

    Returns ``(item, status)`` where ``status`` is one of:
      * ``"mismatch"`` - the question's website or source tryout id differs
        from the snapshot's (item is ``None``),
      * ``"missing_options"`` - no usable options could be derived from the
        question's raw options (item is ``None``),
      * ``"existing"`` - a ``sedang`` item already occupies the target slot,
      * ``"created"`` - a new item was added and flushed (not committed).
    """
    same_website = question.website_id == snapshot.website_id
    same_tryout = question.source_tryout_id == snapshot.source_tryout_id
    if not (same_website and same_tryout):
        return None, "mismatch"

    slot = _snapshot_slot_map(snapshot).get(question.source_question_id)
    if not slot:
        # No mapped slot: place the question after the highest slot in use.
        highest_slot_stmt = select(func.max(Item.slot)).where(
            Item.website_id == snapshot.website_id,
            Item.tryout_id == snapshot.source_tryout_id,
            Item.level == "sedang",
        )
        highest_slot = await db.scalar(highest_slot_stmt)
        slot = (highest_slot or 0) + 1

    options = _snapshot_options_to_item_options(question.raw_options)
    if not options:
        return None, "missing_options"

    await _ensure_operational_tryout(snapshot, db)

    occupant_stmt = select(Item).where(
        Item.website_id == snapshot.website_id,
        Item.tryout_id == snapshot.source_tryout_id,
        Item.slot == slot,
        Item.level == "sedang",
    )
    occupant = (await db.execute(occupant_stmt)).scalar_one_or_none()
    if occupant is not None:
        return occupant, "existing"

    promoted = Item(
        tryout_id=snapshot.source_tryout_id,
        website_id=snapshot.website_id,
        slot=slot,
        level="sedang",
        stem=question.question_html,
        options=options,
        correct_answer=question.correct_answer,
        explanation=question.explanation_html,
        generated_by="manual",
        source_snapshot_question_id=question.id,
        variant_status="active",
        calibrated=False,
        calibration_sample_size=0,
    )
    db.add(promoted)
    await db.flush()
    return promoted, "created"
@router.get("", include_in_schema=False)
@router.get("/", include_in_schema=False)
async def admin_root(request: Request):
    """Redirect to the dashboard when an admin session exists, otherwise to the login page."""
    signed_in = await _current_admin(request)
    return _dashboard_redirect() if signed_in else _login_redirect()
@router.get("/login", include_in_schema=False)
async def login_view(request: Request):
    """Render the login form; an already signed-in admin is redirected to the dashboard."""
    if await _current_admin(request):
        return _dashboard_redirect()

    page_title = "Admin Login"
    page_intro = "Use the configured admin credentials to access the dashboard."
    form_html = """
<form method="post" action="/admin/login" autocomplete="off">
<label for="username">Username</label>
<input id="username" name="username" type="text" autocomplete="username">
<label for="password">Password</label>
<input id="password" name="password" type="password" autocomplete="current-password">
<label class="row"><input type="checkbox" name="remember_me" __REMEMBER_ME_CHECKED__> Remember me</label>
<button type="submit">Sign in</button>
</form>
<p class="muted">Direct environment-backed admin access.</p>
"""
    return _render_auth_page(request, page_title, page_intro, form_html)
@router.post("/login", include_in_schema=False)
async def login_submit(
    request: Request,
    username: str = Form(...),
    password: str = Form(...),
    remember_me: str | None = Form(None),
):
    """Check the submitted credentials and start an admin session.

    On a mismatch the login form is re-rendered with HTTP 401 and the
    submitted username pre-filled (HTML-escaped). On success a random session
    token is stored in Redis under ``SESSION_PREFIX`` with the session expiry
    and sent back as the ``SESSION_COOKIE`` cookie on a redirect to the
    dashboard. Ticking "remember me" raises the expiry to at least 30 days.
    """
    # compare_digest() only accepts ASCII-only ``str`` and raises TypeError
    # otherwise, so compare UTF-8 bytes: a non-ASCII username/password must be
    # rejected with 401, not crash the request. Both comparisons are evaluated
    # up front so a wrong username does not skip the password check.
    username_ok = secrets.compare_digest(
        username.encode("utf-8"),
        settings.ADMIN_USERNAME.encode("utf-8"),
    )
    password_ok = secrets.compare_digest(
        password.encode("utf-8"),
        settings.ADMIN_PASSWORD.encode("utf-8"),
    )
    if not (username_ok and password_ok):
        body = f"""
<div class="error">Invalid username or password.</div>
<form method="post" action="/admin/login" autocomplete="off">
<label for="username">Username</label>
<input id="username" name="username" type="text" autocomplete="username" value="{escape(username)}">
<label for="password">Password</label>
<input id="password" name="password" type="password" autocomplete="current-password">
<label class="row"><input type="checkbox" name="remember_me" __REMEMBER_ME_CHECKED__> Remember me</label>
<button type="submit">Sign in</button>
</form>
"""
        return _render_auth_page(
            request,
            "Admin Login",
            "Use the configured admin credentials to access the dashboard.",
            body,
            status_code=HTTP_401_UNAUTHORIZED,
        )

    expire = settings.ADMIN_SESSION_EXPIRE_SECONDS
    response = _dashboard_redirect()
    if remember_me == "on":
        # "Remember me" extends the session to at least 30 days.
        expire = max(expire, 3600 * 24 * 30)
        response.set_cookie("remember_me", "on", expires=expire, path="/admin")
    else:
        response.delete_cookie("remember_me", path="/admin")

    token = uuid.uuid4().hex
    response.set_cookie(
        SESSION_COOKIE,
        token,
        expires=expire,
        path="/admin",
        httponly=True,
        samesite="lax",
    )
    # The Redis key expires together with the cookie.
    await _admin_redis.set(f"{SESSION_PREFIX}{token}", settings.ADMIN_USERNAME, ex=expire)
    return response
@router.get("/logout", include_in_schema=False)
async def logout(request: Request):
    """Drop the Redis session (if any), clear the admin cookies and redirect to login."""
    session_token = request.cookies.get(SESSION_COOKIE)
    redis_ready = _admin_redis is not None
    if session_token and redis_ready:
        await _admin_redis.delete(f"{SESSION_PREFIX}{session_token}")

    response = _login_redirect()
    for cookie_name in (SESSION_COOKIE, "remember_me"):
        response.delete_cookie(cookie_name, path="/admin")
    return response
@router.get("/password", include_in_schema=False)
async def password_view(request: Request):
    """Show the informational password page; anonymous visitors are sent to login."""
    admin = await _current_admin(request)
    if not admin:
        return _login_redirect()

    who = escape(admin.username)
    session_seconds = settings.ADMIN_SESSION_EXPIRE_SECONDS
    body = f"""
<p class="muted">Signed in as <strong>{who}</strong>.</p>
<p>Password changes are disabled in the UI for this deployment.</p>
<p>Update <code>ADMIN_PASSWORD</code> in the server environment, then restart the app.</p>
<p>Session expiry is currently set to <strong>{session_seconds}</strong> seconds.</p>
<p><a href="/admin/dashboard">Back to dashboard</a></p>
"""
    return _render_auth_page(
        request,
        "Password Management",
        "Runtime password rotation is intentionally disabled.",
        body,
    )
@router.post("/password", include_in_schema=False)
async def password_submit(
    request: Request,
    old_password: str = Form(...),
    new_password: str = Form(...),
    re_new_password: str = Form(...),
):
    """Reject password-change submissions with HTTP 400.

    The three form fields are accepted but never used; the page only explains
    that the password is changed through the server environment.
    """
    # The submitted values are intentionally discarded.
    del old_password, new_password, re_new_password

    if not await _current_admin(request):
        return _login_redirect()

    notice = """
<div class="error">Password rotation via UI is disabled.</div>
<p>Update <code>ADMIN_PASSWORD</code> in the server environment, then restart the app.</p>
<p><a href="/admin/dashboard">Back to dashboard</a></p>
"""
    return _render_auth_page(
        request,
        "Password Management",
        "Runtime password rotation is intentionally disabled.",
        notice,
        status_code=400,
    )
@router.get("/dashboard", include_in_schema=False)
async def dashboard_view(request: Request, db: AsyncSession = Depends(get_db)):
    """Render the dashboard with row counts for tryouts, items, sessions and completed sessions."""
    admin = await _current_admin(request)
    if not admin:
        return _login_redirect()

    async def count_rows(model, *conditions):
        """Count rows of ``model`` matching ``conditions``; a NULL result becomes 0."""
        stmt = select(func.count()).select_from(model)
        if conditions:
            stmt = stmt.where(*conditions)
        return await db.scalar(stmt) or 0

    tryout_total = await count_rows(Tryout)
    item_total = await count_rows(Item)
    session_total = await count_rows(Session)
    completed_total = await count_rows(Session, Session.is_completed.is_(True))

    body = f"""
<p class="muted">Signed in as <strong>{escape(admin.username)}</strong>.</p>
<div class="grid">
<div class="stat">Tryouts<strong>{tryout_total}</strong></div>
<div class="stat">Items<strong>{item_total}</strong></div>
<div class="stat">Sessions<strong>{session_total}</strong></div>
<div class="stat">Completed Sessions<strong>{completed_total}</strong></div>
</div>
<p style="margin-top:20px"><a href="/admin/ai-playground">Open AI Playground</a></p>
"""
    return _render_admin_page("IRT Bank Soal Admin", "Dashboard", body)
@router.get("/websites", include_in_schema=False)
async def websites_view(request: Request, db: AsyncSession = Depends(get_db)):
    """List all websites (ascending id) together with the add-website form."""
    if not await _current_admin(request):
        return _login_redirect()
    # _load_websites runs the same "all websites ordered by id" query.
    all_websites = await _load_websites(db)
    return _render_admin_page("Websites", "Websites", _websites_form_body(all_websites))
@router.post("/websites", include_in_schema=False)
async def websites_submit(
    request: Request,
    db: AsyncSession = Depends(get_db),
    site_name: str = Form(...),
    site_url: str = Form(...),
):
    """Create a website from the submitted form and re-render the websites page.

    The name is stripped; the URL is stripped and loses trailing slashes. An
    empty name, a URL not starting with ``http://`` / ``https://``, or an
    ``IntegrityError`` on commit each re-render the form with an error and
    the values exactly as submitted.
    """
    if not await _current_admin(request):
        return _login_redirect()

    async def reject(message: str):
        """Re-render the list with ``message`` as the error, echoing the raw input."""
        current = await _load_websites(db)
        page = _websites_form_body(
            current,
            error=message,
            site_name=site_name,
            site_url=site_url,
        )
        return _render_admin_page("Websites", "Websites", page)

    clean_name = site_name.strip()
    clean_url = site_url.strip().rstrip("/")

    if not clean_name:
        return await reject("Website name is required.")
    if not clean_url.startswith(("http://", "https://")):
        return await reject("Website URL must start with http:// or https://.")

    website = Website(site_name=clean_name, site_url=clean_url)
    db.add(website)
    try:
        await db.commit()
    except IntegrityError:
        await db.rollback()
        return await reject("Website URL already exists.")

    # Load the list before reading website.id for the message, keeping the
    # original statement order.
    websites = await _load_websites(db)
    body = _websites_form_body(
        websites,
        success=f"Website added successfully with ID {website.id}.",
    )
    return _render_admin_page("Websites", "Websites", body)
@router.get("/websites/{website_id}/edit", include_in_schema=False)
async def website_edit_view(
    website_id: int,
    request: Request,
    db: AsyncSession = Depends(get_db),
):
    """Render the edit form for one website, or the list page with an error if the id is unknown."""
    if not await _current_admin(request):
        return _login_redirect()

    website = await db.get(Website, website_id)
    if website is not None:
        return _render_admin_page("Edit Website", "Edit Website", _website_edit_form_body(website))

    # Unknown id: fall back to the list page with an error banner.
    everything = await _load_websites(db)
    fallback = _websites_form_body(everything, error=f"Website not found: {website_id}")
    return _render_admin_page("Websites", "Websites", fallback)
@router.post("/websites/{website_id}/edit", include_in_schema=False)
async def website_edit_submit(
    website_id: int,
    request: Request,
    db: AsyncSession = Depends(get_db),
    site_name: str = Form(...),
    site_url: str = Form(...),
):
    """Apply edits to a website and re-render the edit form with the outcome.

    The name is stripped; the URL is stripped and loses trailing slashes. An
    unknown id renders the websites list with an error. An empty name, a URL
    not starting with ``http://`` / ``https://``, or an ``IntegrityError`` on
    commit re-render the edit form with an error and the values as submitted.
    """
    admin = await _current_admin(request)
    if not admin:
        return _login_redirect()

    website = await db.get(Website, website_id)
    if website is None:
        websites = await _load_websites(db)
        body = _websites_form_body(websites, error=f"Website not found: {website_id}")
        return _render_admin_page("Websites", "Websites", body)

    def render_form(**feedback):
        """Render the edit page for ``website`` with the given banner/field overrides."""
        page = _website_edit_form_body(website, **feedback)
        return _render_admin_page("Edit Website", "Edit Website", page)

    normalized_name = site_name.strip()
    normalized_url = site_url.strip().rstrip("/")
    if not normalized_name:
        return render_form(
            error="Website name is required.",
            site_name=site_name,
            site_url=site_url,
        )
    if not normalized_url.startswith(("http://", "https://")):
        return render_form(
            error="Website URL must start with http:// or https://.",
            site_name=site_name,
            site_url=site_url,
        )

    website.site_name = normalized_name
    website.site_url = normalized_url
    try:
        await db.commit()
    except IntegrityError:
        await db.rollback()
        # rollback() expires every attribute of ``website``. AsyncSession
        # cannot lazy-load them when the form reads the object, so reload
        # explicitly, as the success path below already does.
        await db.refresh(website)
        return render_form(
            error="Website URL already exists.",
            site_name=site_name,
            site_url=site_url,
        )

    await db.refresh(website)
    return render_form(success=f"Website #{website.id} updated successfully.")
@router.post("/websites/{website_id}/delete", include_in_schema=False)
async def website_delete_submit(
    website_id: int,
    request: Request,
    db: AsyncSession = Depends(get_db),
):
    """Delete a website and re-render the websites list with the outcome.

    An unknown id renders the list with an error. If the commit raises
    ``IntegrityError`` the transaction is rolled back and the list is
    rendered with an error instead of letting the exception propagate,
    matching the create/edit handlers.
    """
    admin = await _current_admin(request)
    if not admin:
        return _login_redirect()

    async def render_list(**feedback):
        """Render the websites page with a success or error banner."""
        websites = await _load_websites(db)
        return _render_admin_page("Websites", "Websites", _websites_form_body(websites, **feedback))

    website = await db.get(Website, website_id)
    if website is None:
        return await render_list(error=f"Website not found: {website_id}")

    # Capture the label now: the instance is unusable after delete/rollback.
    deleted_label = f"{website.site_name} ({website.site_url})"
    await db.delete(website)
    try:
        await db.commit()
    except IntegrityError:
        await db.rollback()
        return await render_list(
            error=f"Website cannot be deleted while other records still reference it: {deleted_label}",
        )
    return await render_list(success=f"Website deleted successfully: {deleted_label}")
@router.get("/tryout-import", include_in_schema=False)
async def tryout_import_view(request: Request, db: AsyncSession = Depends(get_db)):
    """Render the tryout import page with the website list and the recent snapshots."""
    if not await _current_admin(request):
        return _login_redirect()
    site_choices = await _load_websites(db)
    latest_snapshots = await _recent_snapshots(db)
    page = _tryout_import_form_body(site_choices, latest_snapshots)
    return _render_admin_page("Tryout Import", "Tryout Import", page)
@router.post("/tryout-import/preview", include_in_schema=False)
async def tryout_import_preview(
    request: Request,
    db: AsyncSession = Depends(get_db),
    website_id: int = Form(...),
    file: UploadFile = File(...),
):
    """Validate an uploaded tryout JSON file and show an import preview.

    The upload must have a ``.json`` filename, be UTF-8 text (a leading BOM
    is tolerated) and parse as JSON; it is then passed to
    ``preview_tryout_json_import``. On success the decoded text is cached in
    Redis under a fresh preview token for ``IMPORT_PREVIEW_TTL_SECONDS``.
    Any failure re-renders the import form with an error message.
    """
    admin = await _current_admin(request)
    if not admin:
        return _login_redirect()
    websites = await _load_websites(db)
    snapshots = await _recent_snapshots(db)

    def render_form(**extra):
        """Render the import page for the selected website with extra form state."""
        body = _tryout_import_form_body(
            websites,
            snapshots,
            selected_website_id=website_id,
            **extra,
        )
        return _render_admin_page("Tryout Import", "Tryout Import", body)

    if not file.filename or not file.filename.lower().endswith(".json"):
        return render_form(error="File must be .json format.")

    payload_bytes = await file.read()
    try:
        # "utf-8-sig" decodes plain UTF-8 and also strips a leading BOM, which
        # json.loads() would otherwise reject as invalid JSON.
        payload_text = payload_bytes.decode("utf-8-sig")
        payload = json.loads(payload_text)
    except UnicodeDecodeError:
        return render_form(error="File must be UTF-8 encoded JSON.")
    except json.JSONDecodeError as exc:
        return render_form(error=f"Invalid JSON file: {exc}")

    try:
        preview = await preview_tryout_json_import(payload, website_id, db)
    except TryoutImportError as exc:
        return render_form(error=str(exc))

    # Cache the decoded text so the confirm step imports exactly what was previewed.
    preview_token = uuid.uuid4().hex
    await _admin_redis.set(
        f"{IMPORT_PREVIEW_PREFIX}{preview_token}",
        payload_text,
        ex=IMPORT_PREVIEW_TTL_SECONDS,
    )
    return render_form(
        preview=preview,
        preview_token=preview_token,
        upload_filename=file.filename or "",
    )
@router.post("/tryout-import", include_in_schema=False)
async def tryout_import_submit(
    request: Request,
    db: AsyncSession = Depends(get_db),
    website_id: int = Form(...),
    preview_token: str = Form(...),
):
    """Import the payload cached by the preview step.

    A missing or expired token re-renders the form with an error. A
    ``TryoutImportError`` rolls back and shows its message; any other
    exception rolls back and propagates. Once an import has been attempted
    the cached payload is deleted from Redis regardless of the outcome.
    """
    if not await _current_admin(request):
        return _login_redirect()

    websites = await _load_websites(db)
    snapshots = await _recent_snapshots(db)
    cache_key = f"{IMPORT_PREVIEW_PREFIX}{preview_token}"
    payload_text = await _admin_redis.get(cache_key)

    def render_failure(message: str):
        """Render the import page with ``message`` as the error banner."""
        failure_body = _tryout_import_form_body(
            websites,
            snapshots,
            error=message,
            selected_website_id=website_id,
        )
        return _render_admin_page("Tryout Import", "Tryout Import", failure_body)

    if not payload_text:
        return render_failure(
            "Preview token expired. Upload the JSON again and preview before importing."
        )

    try:
        payload = json.loads(payload_text)
        result = await import_tryout_json_snapshot(payload, website_id, db)
        await db.commit()
    except TryoutImportError as exc:
        await db.rollback()
        return render_failure(str(exc))
    except Exception:
        await db.rollback()
        raise
    finally:
        # The token is single-use, whether the import succeeded or not.
        await _admin_redis.delete(cache_key)

    updated_snapshots = await _recent_snapshots(db)
    imported_tryouts = result.get("imported_tryouts") or []
    imported_count = 0
    for entry in imported_tryouts:
        imported_count += entry.get("question_count") or 0

    body = _tryout_import_form_body(
        websites,
        updated_snapshots,
        success=(
            f"Imported {len(imported_tryouts)} tryout snapshot(s) and archived {imported_count} source question reference row(s)."
        ),
        selected_website_id=website_id,
    )
    return _render_admin_page("Tryout Import", "Tryout Import", body)
@router.get("/snapshot-questions", include_in_schema=False)
async def snapshot_questions_view(
    request: Request,
    snapshot_id: int,
    db: AsyncSession = Depends(get_db),
):
    """List the questions of one import snapshot; an unknown id shows the import page with an error."""
    if not await _current_admin(request):
        return _login_redirect()

    snapshot = await db.get(TryoutImportSnapshot, snapshot_id)
    if snapshot is not None:
        context = await _load_snapshot_question_context(snapshot, db)
        questions, promoted_items_by_slot = context[0], context[1]
        page = _snapshot_questions_body(snapshot, questions, promoted_items_by_slot)
        return _render_admin_page("Snapshot Questions", "Snapshot Questions", page)

    websites = await _load_websites(db)
    snapshots = await _recent_snapshots(db)
    page = _tryout_import_form_body(
        websites,
        snapshots,
        error=f"Snapshot not found: {snapshot_id}",
    )
    return _render_admin_page("Tryout Import", "Tryout Import", page)
@router.post("/snapshot-questions/promote-bulk", include_in_schema=False)
async def snapshot_question_promote_bulk(
    request: Request,
    snapshot_id: int = Form(...),
    snapshot_question_ids: list[int] | None = Form(None),
    db: AsyncSession = Depends(get_db),
):
    """Promote the selected snapshot questions to basis items and report the tally.

    An unknown snapshot renders the import page with an error; an empty
    selection re-renders the questions page with an error. Otherwise every
    selected question found in the database goes through
    ``_promote_snapshot_question_to_item`` and the created / reused /
    skipped counts are summarised. If none of the selected ids matched a
    question, an error is shown rather than an empty success message.
    """
    admin = await _current_admin(request)
    if not admin:
        return _login_redirect()

    snapshot = await db.get(TryoutImportSnapshot, snapshot_id)
    if snapshot is None:
        websites = await _load_websites(db)
        snapshots = await _recent_snapshots(db)
        body = _tryout_import_form_body(
            websites,
            snapshots,
            error=f"Snapshot not found: {snapshot_id}",
        )
        return _render_admin_page("Tryout Import", "Tryout Import", body)

    async def render_questions(**feedback):
        """Reload the question context and render the page with a banner."""
        questions, promoted_items_by_slot, _ = await _load_snapshot_question_context(snapshot, db)
        page = _snapshot_questions_body(snapshot, questions, promoted_items_by_slot, **feedback)
        return _render_admin_page("Snapshot Questions", "Snapshot Questions", page)

    if not snapshot_question_ids:
        return await render_questions(error="Select at least one snapshot question to promote.")

    question_result = await db.execute(
        select(TryoutSnapshotQuestion).where(
            TryoutSnapshotQuestion.id.in_(snapshot_question_ids)
        )
    )
    selected_questions = list(question_result.scalars().all())

    created_items: list[Item] = []
    existing_items: list[Item] = []
    missing_option_count = 0
    mismatch_count = 0
    for question in selected_questions:
        item, status = await _promote_snapshot_question_to_item(snapshot, question, db)
        if status == "created" and item is not None:
            created_items.append(item)
        elif status == "existing" and item is not None:
            existing_items.append(item)
        elif status == "missing_options":
            missing_option_count += 1
        elif status == "mismatch":
            mismatch_count += 1
    await db.commit()

    success_parts = []
    if created_items:
        success_parts.append(f"created {len(created_items)} item(s)")
    if existing_items:
        success_parts.append(f"reused {len(existing_items)} existing item(s)")
    if missing_option_count:
        success_parts.append(f"skipped {missing_option_count} question(s) with missing option text")
    if mismatch_count:
        success_parts.append(f"skipped {mismatch_count} mismatched question(s)")

    if not success_parts:
        # Nothing was processed (for example, none of the submitted ids
        # exist). Previously this rendered "Bulk promote finished: ." as a
        # success.
        return await render_questions(
            error="None of the selected snapshot questions could be found.",
        )

    success_message = "Bulk promote finished: " + ", ".join(success_parts) + "."
    if created_items:
        success_message += f" Latest basis item ID: {created_items[-1].id}."
    return await render_questions(success=success_message)
@router.get("/calibration-status", include_in_schema=False)
async def calibration_status_view(request: Request, db: AsyncSession = Depends(get_db)):
    """Render one table row per tryout using the values returned by ``get_calibration_status``."""
    if not await _current_admin(request):
        return _login_redirect()

    tryout_stmt = select(Tryout.tryout_id, Tryout.name, Tryout.website_id).order_by(Tryout.id)
    tryout_rows = (await db.execute(tryout_stmt)).all()

    table_rows = []
    for tryout_id, name, website_id in tryout_rows:
        # One status lookup per tryout.
        info = await get_calibration_status(tryout_id, website_id, db)
        ready_label = "Yes" if info["ready_for_irt"] else "No"
        percent_label = f'{info["calibration_percentage"]:.2f}%'
        table_rows.append(
            [
                tryout_id,
                name,
                info["total_items"],
                info["calibrated_items"],
                percent_label,
                ready_label,
            ]
        )

    headers = ["Tryout ID", "Name", "Total Items", "Calibrated", "Calibration %", "Ready for IRT"]
    return _render_admin_page("Calibration Status", "Calibration Status", _table(headers, table_rows))
@router.get("/item-statistics", include_in_schema=False)
async def item_statistics_view(request: Request, db: AsyncSession = Depends(get_db)):
    """Render per-level item statistics.

    For every distinct ``Item.level`` the first 10 items (ordered by slot)
    are loaded and summarised: item count, calibrated count and percentage,
    summed calibration sample size and mean ``ctt_p`` (missing values count
    as 0).
    """
    if not await _current_admin(request):
        return _login_redirect()

    level_rows = await db.execute(select(Item.level).distinct())
    levels = level_rows.scalars().all()

    table_rows = []
    for level in levels:
        # NOTE(review): the limit(10) means "Total Items" never exceeds 10 and
        # the other figures describe only that sample - confirm this is intended.
        sample_stmt = select(Item).where(Item.level == level).order_by(Item.slot).limit(10)
        sample = (await db.execute(sample_stmt)).scalars().all()
        sample_size = len(sample)

        total_responses = sum(entry.calibration_sample_size or 0 for entry in sample)
        calibrated_count = sum(1 for entry in sample if entry.calibrated)
        if sample:
            calibration_percentage = calibrated_count / sample_size * 100
            avg_correctness = sum(entry.ctt_p or 0 for entry in sample) / sample_size
        else:
            calibration_percentage = 0
            avg_correctness = 0

        table_rows.append(
            [
                level,
                sample_size,
                calibrated_count,
                f"{calibration_percentage:.2f}%",
                total_responses,
                f"{avg_correctness:.4f}",
            ]
        )

    headers = ["Level", "Total Items", "Calibrated", "Calibration %", "Responses", "Avg Correctness"]
    return _render_admin_page("Item Statistics", "Item Statistics", _table(headers, table_rows))
@router.get("/session-overview", include_in_schema=False)
async def session_overview_view(request: Request, db: AsyncSession = Depends(get_db)):
    """Render a table of the 50 most recently created sessions."""
    if not await _current_admin(request):
        return _login_redirect()

    newest_stmt = select(Session).order_by(Session.created_at.desc()).limit(50)
    recent = (await db.execute(newest_stmt)).scalars().all()

    table_rows = []
    for entry in recent:
        completed_label = "Yes" if entry.is_completed else "No"
        table_rows.append(
            [
                entry.session_id,
                entry.wp_user_id,
                entry.tryout_id,
                completed_label,
                entry.scoring_mode_used,
                entry.total_benar,
                entry.NM,
                entry.NN,
                entry.theta,
            ]
        )

    headers = ["Session ID", "WP User", "Tryout", "Completed", "Mode", "Benar", "NM", "NN", "Theta"]
    return _render_admin_page("Session Overview", "Session Overview", _table(headers, table_rows))
@router.get("/basis-items", include_in_schema=False)
async def basis_items_view(request: Request, db: AsyncSession = Depends(get_db)):
    """List up to 200 non-AI ``sedang`` items, most recently updated first."""
    if not await _current_admin(request):
        return _login_redirect()

    is_basis = (Item.level == "sedang", Item.generated_by != "ai")
    stmt = (
        select(Item)
        .where(*is_basis)
        .order_by(Item.updated_at.desc(), Item.id.desc())
        .limit(200)
    )
    rows = await db.execute(stmt)
    page = _basis_items_list_body(list(rows.scalars().all()))
    return _render_admin_page("Basis Items", "Basis Items", page)
@router.get("/basis-items/{basis_item_id}", include_in_schema=False)
async def basis_item_workspace_view(
    basis_item_id: int,
    request: Request,
    db: AsyncSession = Depends(get_db),
):
    """Render the workspace of one basis item with its generation runs and AI variants.

    Optional query parameters narrow the variant list: ``status`` (exact
    ``variant_status``), ``level`` (``mudah`` or ``sulit``), ``run_id``
    (decimal generation run id) and ``min_frequency`` (minimum usage
    frequency). Filter values that cannot be parsed are ignored. If the id
    does not refer to a non-AI ``sedang`` item, the basis item list is
    rendered instead.
    """
    admin = await _current_admin(request)
    if not admin:
        return _login_redirect()

    status_filter = (request.query_params.get("status") or "").strip()
    level_filter = (request.query_params.get("level") or "").strip()
    run_id_filter = (request.query_params.get("run_id") or "").strip()
    min_frequency_filter = (request.query_params.get("min_frequency") or "").strip()
    filters = {
        "status": status_filter,
        "level": level_filter,
        "run_id": run_id_filter,
        "min_frequency": min_frequency_filter,
    }

    basis_item = await db.get(Item, basis_item_id)
    if basis_item is None or basis_item.generated_by == "ai" or basis_item.level != "sedang":
        result = await db.execute(
            select(Item)
            .where(Item.level == "sedang", Item.generated_by != "ai")
            .order_by(Item.updated_at.desc(), Item.id.desc())
            .limit(200)
        )
        body = _basis_items_list_body(list(result.scalars().all()))
        return _render_admin_page("Basis Items", "Basis Items", body)

    run_result = await db.execute(
        select(AIGenerationRun)
        .where(AIGenerationRun.basis_item_id == basis_item.id)
        .order_by(AIGenerationRun.id.desc())
        .limit(50)
    )
    runs = list(run_result.scalars().all())

    variant_result = await db.execute(
        select(Item)
        .where(Item.generated_by == "ai", Item.basis_item_id == basis_item.id)
        .order_by(Item.created_at.desc(), Item.id.desc())
        .limit(300)
    )
    variants = list(variant_result.scalars().all())

    if status_filter:
        variants = [item for item in variants if item.variant_status == status_filter]
    if level_filter in {"mudah", "sulit"}:
        variants = [item for item in variants if item.level == level_filter]

    # str.isdigit() also accepts characters such as "²" that int() rejects;
    # isdecimal() matches what int() can parse. The except covers inputs int()
    # still refuses (e.g. beyond the interpreter's digit limit).
    run_id = None
    if run_id_filter.isdecimal():
        try:
            run_id = int(run_id_filter)
        except ValueError:
            run_id = None
    if run_id is not None:
        variants = [item for item in variants if item.generation_run_id == run_id]

    # Usage stats are computed before the frequency filter, which needs them.
    usage_metrics, family_stats = await _family_usage_stats(db, basis_item, variants)

    if min_frequency_filter:
        try:
            min_freq = float(min_frequency_filter)
            variants = [
                item
                for item in variants
                if usage_metrics.get(item.id, {}).get("frequency", 0.0) >= min_freq
            ]
        except ValueError:
            # Not a number: leave the list unfiltered.
            pass

    body = _basis_item_workspace_body(
        basis_item,
        runs,
        variants,
        usage_metrics,
        family_stats,
        filters,
    )
    return _render_admin_page(
        f"Basis Item #{basis_item.id}",
        f"Basis Item Workspace #{basis_item.id}",
        body,
    )
@router.post("/basis-items/{basis_item_id}/generate", include_in_schema=False)
async def basis_item_generate_submit(
    basis_item_id: int,
    request: Request,
    db: AsyncSession = Depends(get_db),
    target_level: str = Form(...),
    ai_model: str = Form(...),
    generation_count: int = Form(1),
    operator_notes: str = Form(""),
):
    """Generate AI variants for a basis item and re-render its workspace.

    Redirects to the basis item list when the id is not a non-AI ``sedang``
    item. Renders an error when ``OPENROUTER_API_KEY`` is unset. Redirects
    back to the workspace when the target level is not ``mudah``/``sulit``,
    the model fails ``validate_ai_model`` or the count is outside 1..50.
    Otherwise a generation run is created, the generated questions are saved
    as ``draft`` variants and the transaction is committed.
    """
    admin = await _current_admin(request)
    if not admin:
        return _login_redirect()

    filters = {"status": "", "level": "", "run_id": "", "min_frequency": ""}
    basis_item = await db.get(Item, basis_item_id)
    if basis_item is None or basis_item.generated_by == "ai" or basis_item.level != "sedang":
        return RedirectResponse(url="/admin/basis-items", status_code=HTTP_303_SEE_OTHER)

    async def render_workspace(**form_state):
        """Reload runs, variants and usage stats, then render the workspace page."""
        run_rows = await db.execute(
            select(AIGenerationRun)
            .where(AIGenerationRun.basis_item_id == basis_item.id)
            .order_by(AIGenerationRun.id.desc())
            .limit(50)
        )
        variant_rows = await db.execute(
            select(Item)
            .where(Item.generated_by == "ai", Item.basis_item_id == basis_item.id)
            .order_by(Item.created_at.desc(), Item.id.desc())
            .limit(300)
        )
        runs = list(run_rows.scalars().all())
        variants = list(variant_rows.scalars().all())
        usage_metrics, family_stats = await _family_usage_stats(db, basis_item, variants)
        page = _basis_item_workspace_body(
            basis_item,
            runs,
            variants,
            usage_metrics,
            family_stats,
            filters,
            **form_state,
        )
        return _render_admin_page(
            f"Basis Item #{basis_item.id}",
            f"Basis Item Workspace #{basis_item.id}",
            page,
        )

    if not settings.OPENROUTER_API_KEY:
        return await render_workspace(
            error="OPENROUTER_API_KEY is not configured.",
            target_level=target_level,
            ai_model=ai_model,
            generation_count=str(generation_count),
            operator_notes=operator_notes,
        )

    # Checks run in the original order: level, then model, then count.
    invalid_request = (
        target_level not in {"mudah", "sulit"}
        or not validate_ai_model(ai_model)
        or not 1 <= generation_count <= 50
    )
    if invalid_request:
        return RedirectResponse(
            url=f"/admin/basis-items/{basis_item.id}",
            status_code=HTTP_303_SEE_OTHER,
        )

    run_id = await create_generation_run(
        basis_item_id=basis_item.id,
        source_snapshot_question_id=basis_item.source_snapshot_question_id,
        target_level=target_level,
        requested_count=generation_count,
        model=ai_model,
        created_by=admin.username,
        operator_notes=operator_notes.strip() or None,
        db=db,
    )
    generated = await generate_questions_batch(
        basis_item=basis_item,
        target_level=target_level,
        ai_model=ai_model,
        count=generation_count,
    )

    from app.schemas.ai import GeneratedQuestion

    saved = 0
    for candidate in generated:
        question_payload = GeneratedQuestion(
            stem=candidate.stem,
            options=candidate.options,
            correct=candidate.correct,
            explanation=candidate.explanation or None,
        )
        new_item_id = await save_ai_question(
            generated_data=question_payload,
            tryout_id=basis_item.tryout_id,
            website_id=basis_item.website_id,
            basis_item_id=basis_item.id,
            slot=basis_item.slot,
            level=target_level,
            ai_model=ai_model,
            generation_run_id=run_id,
            source_snapshot_question_id=basis_item.source_snapshot_question_id,
            variant_status="draft",
            db=db,
        )
        # Only a truthy id counts as saved.
        if new_item_id:
            saved += 1
    await db.commit()

    return await render_workspace(
        success=f"Run #{run_id} finished. Saved {saved} variant(s).",
        target_level=target_level,
        ai_model=ai_model,
        generation_count=str(generation_count),
    )
@router.post("/basis-items/{basis_item_id}/review-bulk", include_in_schema=False)
async def basis_item_review_bulk(
    basis_item_id: int,
    request: Request,
    db: AsyncSession = Depends(get_db),
    item_ids: list[int] = Form([]),
    action: str = Form(...),
):
    """Set a review status on the selected AI variants of a basis item.

    ``action`` must be one of ``approved``, ``rejected``, ``archived``,
    ``stale`` or ``active``. Only AI items whose ``basis_item_id`` matches
    are updated; each gets the new ``variant_status`` plus the reviewer's
    username and a UTC timestamp. The workspace is re-rendered with a success
    banner only when at least one variant was updated; otherwise an error
    explains why nothing changed.
    """
    filters = {"status": "", "level": "", "run_id": "", "min_frequency": ""}
    admin = await _current_admin(request)
    if not admin:
        return _login_redirect()

    basis_item = await db.get(Item, basis_item_id)
    if basis_item is None:
        return RedirectResponse(url="/admin/basis-items", status_code=HTTP_303_SEE_OTHER)

    valid_actions = {"approved", "rejected", "archived", "stale", "active"}
    # Exactly one banner is chosen. Previously the success banner was shown
    # even when the action was unknown or no variant was updated.
    if action not in valid_actions:
        feedback = {"error": "Unsupported review action."}
    elif not item_ids:
        feedback = {"error": "Select at least one variant to review."}
    else:
        result = await db.execute(
            select(Item).where(
                Item.id.in_(item_ids),
                Item.generated_by == "ai",
                Item.basis_item_id == basis_item.id,
            )
        )
        items = list(result.scalars().all())
        if items:
            reviewed_at = datetime.now(timezone.utc)
            for item in items:
                item.variant_status = action
                item.reviewed_by = admin.username
                item.reviewed_at = reviewed_at
            await db.commit()
            feedback = {"success": f"Applied status '{action}' to selected variants."}
        else:
            feedback = {"error": "None of the selected variants belong to this basis item."}

    run_result = await db.execute(
        select(AIGenerationRun)
        .where(AIGenerationRun.basis_item_id == basis_item.id)
        .order_by(AIGenerationRun.id.desc())
        .limit(50)
    )
    variant_result = await db.execute(
        select(Item)
        .where(Item.generated_by == "ai", Item.basis_item_id == basis_item.id)
        .order_by(Item.created_at.desc(), Item.id.desc())
        .limit(300)
    )
    runs = list(run_result.scalars().all())
    variants = list(variant_result.scalars().all())
    usage_metrics, family_stats = await _family_usage_stats(db, basis_item, variants)
    body = _basis_item_workspace_body(
        basis_item,
        runs,
        variants,
        usage_metrics,
        family_stats,
        filters,
        **feedback,
    )
    return _render_admin_page(
        f"Basis Item #{basis_item.id}",
        f"Basis Item Workspace #{basis_item.id}",
        body,
    )
def _ai_form_body(
    key_configured: bool,
    stats: dict[str, Any],
    error: str | None = None,
    success: str | None = None,
    generation_summary: dict[str, Any] | None = None,
    basis_items: list[Item] | None = None,
    generation_runs: list[AIGenerationRun] | None = None,
    generated_variants: list[Item] | None = None,
    basis_item_id: str = "",
    target_level: str = "mudah",
    ai_model: str = settings.OPENROUTER_MODEL_QWEN,
    generation_count: str = "1",
    operator_notes: str = "",
) -> str:
    """Return the HTML body of the AI Playground page.

    The page consists of optional success/error banners, a seed-demo callout
    (only when ``basis_items`` is empty), the generation form pre-filled from
    the ``basis_item_id`` / ``target_level`` / ``ai_model`` /
    ``generation_count`` / ``operator_notes`` arguments, a table of basis
    items, an optional summary of the latest run, a table of recent runs, and
    a bulk-review form listing generated variants.

    ``None`` for any of the list arguments is treated as an empty list.
    """
    # Banners: message text is HTML-escaped before being embedded.
    error_html = ""
    if error:
        error_html = f'<div class="error">{escape(error)}</div>'
    success_html = ""
    if success:
        success_html = f'<div class="success">{escape(success)}</div>'

    # Model <option> tags; the one matching ``ai_model`` is pre-selected.
    option_tags = []
    for model, label in SUPPORTED_MODELS.items():
        selected_attr = "selected" if model == ai_model else ""
        option_tags.append(
            f'<option value="{escape(model)}" {selected_attr}>{escape(label)}</option>'
        )
    options_html = "".join(option_tags)

    basis_list = basis_items or []
    run_list = generation_runs or []
    variant_list = generated_variants or []

    basis_table = _table(
        ["Item ID", "Tryout", "Slot", "Website", "Stem"],
        [
            [entry.id, entry.tryout_id, entry.slot, entry.website_id, _truncate(entry.stem, 90)]
            for entry in basis_list
        ],
    )

    # NOTE: template lines below are kept flush-left on purpose so the emitted
    # markup stays byte-identical.
    seed_callout = ""
    if not basis_list:
        seed_callout = """
<div class="success">
No <code>sedang</code> basis items found yet. Seed one demo website, tryout, and basis item to test AI generation immediately.
</div>
<form method="post" action="/admin/ai-playground/seed-demo">
<button type="submit">Seed Demo Basis Item</button>
</form>
"""

    summary_html = ""
    if generation_summary:
        saved_count = len(generation_summary.get("saved_item_ids") or [])
        run_label = generation_summary.get("run_id", "-")
        requested_total = generation_summary.get("requested_count", 0)
        generated_total = generation_summary.get("generated_count", 0)
        summary_html = f"""
<h3 style="margin-top:24px">Latest Generation Run</h3>
<div class="grid">
<div class="stat">Run ID<strong>{run_label}</strong></div>
<div class="stat">Requested<strong>{requested_total}</strong></div>
<div class="stat">Generated<strong>{generated_total}</strong></div>
<div class="stat">Saved<strong>{saved_count}</strong></div>
</div>
<p class="muted">Each saved output starts as <code>draft</code>. Review per item below to approve, reject, archive, stale, or activate.</p>
"""

    runs_table = _table(
        ["Run ID", "Basis Item", "Target", "Requested", "Model", "Created By", "Created At"],
        [
            [
                entry.id,
                entry.basis_item_id,
                entry.target_level,
                entry.requested_count,
                entry.model,
                entry.created_by,
                str(entry.created_at),
            ]
            for entry in run_list
        ],
    )

    # One <tr> per generated variant; the checkbox feeds the bulk-review form.
    variant_rows = [
        "".join(
            (
                "<tr>",
                f'<td><input type="checkbox" name="item_ids" value="{variant.id}"></td>',
                f"<td>{variant.id}</td>",
                f"<td>{variant.generation_run_id or '-'}</td>",
                f"<td>{variant.basis_item_id or '-'}</td>",
                f"<td>{escape(variant.level)}</td>",
                f"<td>{escape(variant.variant_status)}</td>",
                f"<td>{escape(variant.ai_model or '-')}</td>",
                f"<td>{escape(_truncate(variant.stem, 100))}</td>",
                f"<td>{escape(str(variant.created_at))}</td>",
                "</tr>",
            )
        )
        for variant in variant_list
    ]
    if variant_rows:
        table_rows = "".join(variant_rows)
    else:
        table_rows = '<tr><td colspan="9">No AI-generated variants yet.</td></tr>'
    variants_table = "".join(
        (
            '<form method="post" action="/admin/ai-playground/review-bulk">',
            '<div class="actions" style="margin:16px 0">',
            '<select name="action" style="max-width:260px">',
            '<option value="approved">Approve selected</option>',
            '<option value="rejected">Reject selected</option>',
            '<option value="archived">Archive selected</option>',
            '<option value="stale">Mark stale</option>',
            '<option value="active">Activate selected</option>',
            "</select>",
            '<button type="submit">Apply</button>',
            "</div>",
            "<table><thead><tr><th><input type=\"checkbox\" onclick=\"document.querySelectorAll('input[name=&quot;item_ids&quot;]').forEach(el => el.checked = this.checked)\"></th><th>Item ID</th><th>Run ID</th><th>Basis</th><th>Level</th><th>Status</th><th>Model</th><th>Stem</th><th>Created At</th></tr></thead><tbody>",
            table_rows,
            "</tbody></table></form>",
        )
    )

    key_label = "Yes" if key_configured else "No"
    total_ai_items = stats.get("total_ai_items", 0)
    mudah_selected = "selected" if target_level == "mudah" else ""
    sulit_selected = "selected" if target_level == "sulit" else ""
    return f"""
<p class="muted">OpenRouter key configured: <strong>{key_label}</strong></p>
<p class="muted">Total AI-generated items: <strong>{total_ai_items}</strong></p>
<p class="muted">Hybrid workflow: one run can generate one or many variants; each item remains independently reviewable.</p>
{success_html}
{error_html}
{seed_callout}
<form method="post" action="/admin/ai-playground" autocomplete="off">
<label for="basis_item_id">Basis Item ID</label>
<input id="basis_item_id" name="basis_item_id" type="number" value="{escape(basis_item_id)}">
<label for="target_level">Target Level</label>
<select id="target_level" name="target_level" style="width:100%;box-sizing:border-box;border:1px solid #cbd5e1;border-radius:10px;padding:12px 14px;font-size:15px;">
<option value="mudah" {mudah_selected}>mudah</option>
<option value="sulit" {sulit_selected}>sulit</option>
</select>
<label for="ai_model">Model</label>
<select id="ai_model" name="ai_model" style="width:100%;box-sizing:border-box;border:1px solid #cbd5e1;border-radius:10px;padding:12px 14px;font-size:15px;">
{options_html}
</select>
<label for="generation_count">Generate Count</label>
<input id="generation_count" name="generation_count" type="number" min="1" max="50" value="{escape(generation_count)}">
<p class="muted">Recommended: 1-3 variants per run. Larger runs can increase overlap and review burden. Backend safety cap: 50.</p>
<label for="operator_notes">Optional Notes (style hints)</label>
<textarea id="operator_notes" name="operator_notes" rows="3" placeholder="Optional generation note for this run">{escape(operator_notes)}</textarea>
<button type="submit">Generate Run</button>
</form>
<h3 style="margin-top:24px">Available Sedang Basis Items</h3>
<p class="muted">The generator needs a <code>sedang</code> item. Use one of these IDs, or seed demo data if the table is empty.</p>
{basis_table}
{summary_html}
<h3 style="margin-top:24px">Recent Generation Runs</h3>
{runs_table}
<h3 style="margin-top:24px">Generated Variants (Review Queue)</h3>
<p class="muted">Use bulk review actions to move items from draft to approved/active, or reject/archive/stale.</p>
{variants_table}
"""
@router.get("/ai-playground", include_in_schema=False)
async def ai_playground_view(request: Request, db: AsyncSession = Depends(get_db)):
    """Render the AI Playground page.

    Redirects to the login page when no admin session is present. An optional
    ``basis_item_id`` query parameter is echoed into the form and, when it is a
    plain decimal number, passed to ``_recent_generated_variants`` as the
    ``basis_item_id`` argument; otherwise ``None`` is passed.
    """
    admin = await _current_admin(request)
    if not admin:
        return _login_redirect()
    stats = await get_ai_stats(db)
    basis_items = await _basis_items_for_playground(db)
    basis_item_id = str(request.query_params.get("basis_item_id", "") or "")
    generation_runs = await _recent_generation_runs(db)
    selected_basis_item_id: int | None = None
    # ``isdecimal`` rather than ``isdigit``: ``isdigit`` also accepts characters
    # such as "²" that ``int()`` rejects, which would raise ValueError here.
    # The empty string is not decimal, so no separate emptiness check is needed.
    if basis_item_id.isdecimal():
        selected_basis_item_id = int(basis_item_id)
    generated_variants = await _recent_generated_variants(
        db,
        basis_item_id=selected_basis_item_id,
    )
    body = _ai_form_body(
        bool(settings.OPENROUTER_API_KEY),
        stats,
        basis_items=basis_items,
        generation_runs=generation_runs,
        generated_variants=generated_variants,
        basis_item_id=basis_item_id,
    )
    return _render_admin_page("AI Playground", "AI Playground", body)
@router.post("/ai-playground/seed-demo", include_in_schema=False)
async def ai_playground_seed_demo(request: Request, db: AsyncSession = Depends(get_db)):
    """Obtain a demo basis item and re-render the AI Playground with it selected.

    Redirects to the login page when no admin session is present. The item
    comes from ``_find_or_create_demo_basis_item``; its ID is pre-filled in the
    generation form and announced in a success banner.
    """
    if not await _current_admin(request):
        return _login_redirect()

    seeded_item = await _find_or_create_demo_basis_item(db)
    # Page data is loaded after seeding so the new item can appear in it.
    ai_stats = await get_ai_stats(db)
    playground_items = await _basis_items_for_playground(db)
    recent_runs = await _recent_generation_runs(db)
    recent_variants = await _recent_generated_variants(db)

    ready_message = (
        f"Demo basis item is ready: item #{seeded_item.id}, "
        f"tryout {seeded_item.tryout_id}, slot {seeded_item.slot}."
    )
    page_body = _ai_form_body(
        bool(settings.OPENROUTER_API_KEY),
        ai_stats,
        success=ready_message,
        basis_items=playground_items,
        generation_runs=recent_runs,
        generated_variants=recent_variants,
        basis_item_id=str(seeded_item.id),
    )
    return _render_admin_page("AI Playground", "AI Playground", page_body)
@router.post("/ai-playground", include_in_schema=False)
async def ai_playground_submit(
    request: Request,
    db: AsyncSession = Depends(get_db),
    basis_item_id: int = Form(...),
    target_level: str = Form(...),
    ai_model: str = Form(...),
    generation_count: int = Form(1),
    operator_notes: str = Form(""),
):
    """Run one AI generation batch from the playground form.

    Redirects to the login page when no admin session is present. Otherwise
    the submission is validated in this order, and the first failure re-renders
    the form with an error banner and the submitted values preserved:

    1. ``OPENROUTER_API_KEY`` is configured,
    2. ``target_level`` is ``mudah`` or ``sulit``,
    3. ``ai_model`` passes ``validate_ai_model``,
    4. the basis item exists,
    5. the basis item's level is ``sedang``,
    6. ``generation_count`` is between 1 and 50.

    On success a generation run is created, the batch is generated, each
    generated question is saved with ``variant_status="draft"``, the session is
    committed, and the page is re-rendered with a run summary (or an error
    banner when nothing was saved).
    """
    admin = await _current_admin(request)
    if not admin:
        return _login_redirect()

    async def _respond(
        *,
        error: str | None = None,
        success: str | None = None,
        generation_summary: dict[str, Any] | None = None,
    ):
        # Single place that renders the page. Data is loaded at render time,
        # so error paths query once and the success path only queries after
        # the commit instead of loading everything twice.
        stats = await get_ai_stats(db)
        basis_items = await _basis_items_for_playground(db)
        generation_runs = await _recent_generation_runs(db)
        generated_variants = await _recent_generated_variants(db)
        body = _ai_form_body(
            bool(settings.OPENROUTER_API_KEY),
            stats,
            error=error,
            success=success,
            generation_summary=generation_summary,
            basis_items=basis_items,
            generation_runs=generation_runs,
            generated_variants=generated_variants,
            basis_item_id=str(basis_item_id),
            target_level=target_level,
            ai_model=ai_model,
            generation_count=str(generation_count),
            operator_notes=operator_notes,
        )
        return _render_admin_page("AI Playground", "AI Playground", body)

    if not settings.OPENROUTER_API_KEY:
        return await _respond(error="OPENROUTER_API_KEY is not configured in the environment.")
    if target_level not in {"mudah", "sulit"}:
        return await _respond(error="Target level must be mudah or sulit.")
    if not validate_ai_model(ai_model):
        return await _respond(error="Unsupported AI model.")

    result = await db.execute(select(Item).where(Item.id == basis_item_id))
    basis_item = result.scalar_one_or_none()
    if not basis_item:
        return await _respond(error=f"Basis item not found: {basis_item_id}")
    if basis_item.level != "sedang":
        return await _respond(
            error=f"Basis item must be sedang level, got: {basis_item.level}"
        )
    if not 1 <= generation_count <= 50:
        return await _respond(error="Generate count must be between 1 and 50.")

    run_id = await create_generation_run(
        basis_item_id=basis_item.id,
        source_snapshot_question_id=basis_item.source_snapshot_question_id,
        target_level=target_level,
        requested_count=generation_count,
        model=ai_model,
        created_by=admin.username,
        operator_notes=operator_notes.strip() or None,
        db=db,
    )
    generated = await generate_questions_batch(
        basis_item=basis_item,
        target_level=target_level,
        ai_model=ai_model,
        count=generation_count,
    )

    # Imported here, as in the original handler, rather than at module level.
    from app.schemas.ai import GeneratedQuestion

    saved_item_ids: list[int] = []
    for generated_question in generated:
        item_id = await save_ai_question(
            generated_data=GeneratedQuestion(
                stem=generated_question.stem,
                options=generated_question.options,
                correct=generated_question.correct,
                explanation=generated_question.explanation or None,
            ),
            tryout_id=basis_item.tryout_id,
            website_id=basis_item.website_id,
            basis_item_id=basis_item.id,
            slot=basis_item.slot,
            level=target_level,
            ai_model=ai_model,
            generation_run_id=run_id,
            source_snapshot_question_id=basis_item.source_snapshot_question_id,
            variant_status="draft",
            db=db,
        )
        # A falsy ID means the save did not produce an item; it is not counted.
        if item_id:
            saved_item_ids.append(item_id)
    await db.commit()

    if not saved_item_ids:
        return await _respond(
            error="Generation run completed but no items were saved. Check model output and logs."
        )
    return await _respond(
        success=f"Generation run #{run_id} completed. Saved {len(saved_item_ids)} item(s).",
        generation_summary={
            "run_id": run_id,
            "requested_count": generation_count,
            "generated_count": len(generated),
            "saved_item_ids": saved_item_ids,
        },
    )
@router.post("/ai-playground/save", include_in_schema=False)
async def ai_playground_save(
    request: Request,
    db: AsyncSession = Depends(get_db),
    basis_item_id: int = Form(...),
    tryout_id: str = Form(...),
    website_id: int = Form(...),
    slot: int = Form(...),
    target_level: str = Form(...),
    ai_model: str = Form(...),
    stem: str = Form(...),
    options_json: str = Form(...),
    correct: str = Form(...),
    explanation: str = Form(""),
):
    """Save a single generated question posted from the playground as a draft.

    Redirects to the login page when no admin session is present. The form is
    rejected with an error banner when ``target_level`` is not ``mudah`` or
    ``sulit``, when ``options_json`` is not valid JSON, when the question
    payload cannot be built, or when ``save_ai_question`` returns a falsy ID.
    On success the session is committed and the page is re-rendered with a
    success banner.

    Every response renders the full playground page, including the recent-runs
    and review-queue tables, so the page looks the same as after any other
    playground action.
    """
    admin = await _current_admin(request)
    if not admin:
        return _login_redirect()

    async def _respond(*, error: str | None = None, success: str | None = None):
        # Load page data at render time so the success path reflects the
        # newly saved item without querying twice.
        stats = await get_ai_stats(db)
        basis_items = await _basis_items_for_playground(db)
        generation_runs = await _recent_generation_runs(db)
        generated_variants = await _recent_generated_variants(db)
        body = _ai_form_body(
            bool(settings.OPENROUTER_API_KEY),
            stats,
            error=error,
            success=success,
            basis_items=basis_items,
            generation_runs=generation_runs,
            generated_variants=generated_variants,
            basis_item_id=str(basis_item_id),
            target_level=target_level,
            ai_model=ai_model,
        )
        return _render_admin_page("AI Playground", "AI Playground", body)

    if target_level not in {"mudah", "sulit"}:
        return await _respond(
            error="Only mudah or sulit generated items can be saved from the playground."
        )
    try:
        options = json.loads(options_json)
    except json.JSONDecodeError:
        return await _respond(error="Generated options payload is invalid.")

    # Imported here, as in the original handler, rather than at module level.
    from app.schemas.ai import GeneratedQuestion

    # Valid JSON can still have the wrong shape (e.g. a bare string or number).
    # NOTE(review): presumably GeneratedQuestion is a pydantic model whose
    # validation error derives from ValueError - confirm against app.schemas.ai.
    try:
        generated_data = GeneratedQuestion(
            stem=stem,
            options=options,
            correct=correct,
            explanation=explanation or None,
        )
    except (TypeError, ValueError):
        return await _respond(error="Generated question payload is invalid.")

    item_id = await save_ai_question(
        generated_data=generated_data,
        tryout_id=tryout_id,
        website_id=website_id,
        basis_item_id=basis_item_id,
        slot=slot,
        level=target_level,
        ai_model=ai_model,
        variant_status="draft",
        db=db,
    )
    if not item_id:
        return await _respond(
            error="Failed to save generated item. Check server logs for the database error."
        )
    await db.commit()
    return await _respond(success=f"Generated item saved successfully as item #{item_id}.")
@router.post("/ai-playground/review-bulk", include_in_schema=False)
async def ai_playground_review_bulk(
    request: Request,
    db: AsyncSession = Depends(get_db),
    item_ids: list[int] = Form([]),
    action: str = Form(...),
):
    """Apply one review status to the selected AI-generated items.

    Redirects to the login page when no admin session is present. The request
    is rejected with an error banner when ``action`` is not one of the allowed
    statuses, when no item was selected, or when none of the selected IDs
    refers to an AI-generated item. Otherwise each matching item gets
    ``variant_status``, ``reviewed_by`` and ``reviewed_at`` set, the session is
    committed, and the page is re-rendered with a success banner.
    """
    admin = await _current_admin(request)
    if not admin:
        return _login_redirect()

    async def _respond(*, error: str | None = None, success: str | None = None):
        # Load page data at render time: error paths query once, and the
        # success path only queries after the commit so it shows new statuses.
        stats = await get_ai_stats(db)
        basis_items = await _basis_items_for_playground(db)
        generation_runs = await _recent_generation_runs(db)
        generated_variants = await _recent_generated_variants(db)
        body = _ai_form_body(
            bool(settings.OPENROUTER_API_KEY),
            stats,
            error=error,
            success=success,
            basis_items=basis_items,
            generation_runs=generation_runs,
            generated_variants=generated_variants,
        )
        return _render_admin_page("AI Playground", "AI Playground", body)

    valid_actions = {"approved", "rejected", "archived", "stale", "active"}
    if action not in valid_actions:
        return await _respond(error="Invalid review action.")
    if not item_ids:
        return await _respond(error="Select at least one generated item.")

    # Only AI-generated items may be reviewed here; other IDs are ignored.
    result = await db.execute(
        select(Item).where(Item.id.in_(item_ids), Item.generated_by == "ai")
    )
    items = list(result.scalars().all())
    if not items:
        return await _respond(
            error="No matching AI-generated items were found for the selected IDs."
        )

    # One timestamp for the whole batch so all items share the same review time.
    reviewed_at = datetime.now(timezone.utc)
    for item in items:
        item.variant_status = action
        item.reviewed_by = admin.username
        item.reviewed_at = reviewed_at
    await db.commit()
    return await _respond(success=f"Updated {len(items)} item(s) to status '{action}'.")
@router.get("/tryout/list", include_in_schema=False)
@router.get("/item/list", include_in_schema=False)
@router.get("/user/list", include_in_schema=False)
@router.get("/session/list", include_in_schema=False)
@router.get("/tryoutstats/list", include_in_schema=False)
async def legacy_admin_paths(request: Request):
    """Handle the legacy ``*/list`` admin URLs registered above.

    Returns the login redirect when no admin session is present; otherwise
    returns the result of ``_dashboard_redirect()``.
    """
    if await _current_admin(request):
        return _dashboard_redirect()
    return _login_redirect()