"""
Plain FastAPI admin UI backed by SQLAlchemy and Redis sessions.
This replaces the previous fastapi-admin runtime path, which depended on
Tortoise-oriented internals that do not match this project.
"""
import json
import re
import secrets
import uuid
from dataclasses import dataclass
from datetime import datetime, timezone
from html import escape, unescape
from typing import Any
import aioredis
from fastapi import APIRouter, Depends, File, Form, HTTPException, Request, UploadFile
from sqlalchemy import Integer, func, or_, select
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from starlette.responses import HTMLResponse, RedirectResponse
from starlette.status import (
HTTP_303_SEE_OTHER,
HTTP_401_UNAUTHORIZED,
HTTP_429_TOO_MANY_REQUESTS,
)
from app.admin_web_icons import EMOJI_TO_ICON, NAV_ICONS_SVG
from app.core.config import get_settings
from app.database import get_db
from app.models import (
AIGenerationRun,
Item,
Session,
Tryout,
TryoutImportSnapshot,
TryoutSnapshotQuestion,
UserAnswer,
Website,
)
from app.services.ai_generation import (
create_generation_run,
generate_questions_batch,
get_ai_stats,
save_ai_question,
)
from app.services.irt_calibration import get_calibration_status
from app.services.tryout_json_import import (
TryoutImportError,
import_tryout_json_snapshot,
preview_tryout_json_import,
)
# Application settings, resolved once when this module is imported.
settings = get_settings()
# Every route registered on this router is served under the /admin prefix.
router = APIRouter(prefix="/admin", tags=["admin-web"])
# Cookie holding the opaque session token that _current_admin looks up in Redis.
SESSION_COOKIE = "access_token"
# Cookie from which _render_auth_page reuses an existing CSRF token.
CSRF_COOKIE = "admin_csrf_token"
# Redis key prefix for session entries; the value stored under the key is the
# admin username.
SESSION_PREFIX = "admin:session:"
# NOTE(review): presumably the Redis key prefix and lifetime (in seconds) of a
# cached import preview; their use is not visible in this part of the file.
IMPORT_PREVIEW_PREFIX = "admin:import-preview:"
IMPORT_PREVIEW_TTL_SECONDS = 900
# Failed logins are counted per client host under this prefix; the login form
# is refused once the counter reaches the maximum, and the counter is given
# the window below as its expiry when it is first created.
LOGIN_RATE_LIMIT_PREFIX = "admin:login:attempts:"
LOGIN_RATE_LIMIT_MAX_ATTEMPTS = 10
LOGIN_RATE_LIMIT_WINDOW_SECONDS = 300
# Shared Redis client; assigned by configure_admin_web and reset to None by
# shutdown_admin_web.
_admin_redis = None
@dataclass
class AdminPrincipal:
    """Identity of the admin resolved from a valid session cookie."""

    # Username stored in Redis under the session key.
    username: str
async def configure_admin_web() -> None:
    """Create the shared Redis client used by the admin UI.

    Does nothing when a client is already configured.

    Raises:
        RuntimeError: if ADMIN_USERNAME or ADMIN_PASSWORD is not set.
    """
    global _admin_redis
    if _admin_redis is not None:
        return

    has_credentials = bool(settings.ADMIN_USERNAME) and bool(settings.ADMIN_PASSWORD)
    if not has_credentials:
        raise RuntimeError(
            "ENABLE_ADMIN=true requires ADMIN_USERNAME and ADMIN_PASSWORD to be set."
        )

    # decode_responses makes the client return str values instead of bytes.
    client_options = {"encoding": "utf-8", "decode_responses": True}
    _admin_redis = aioredis.from_url(settings.REDIS_URL, **client_options)
async def shutdown_admin_web() -> None:
    """Close the shared Redis client, if any, and forget it.

    The module-level reference is cleared even when closing raises.
    """
    global _admin_redis
    client = _admin_redis
    if client is not None:
        try:
            await client.close()
        finally:
            _admin_redis = None
async def _current_admin(request: Request) -> AdminPrincipal | None:
    """Resolve the admin behind the request's session cookie.

    Returns None when Redis is not configured, the cookie is absent or empty,
    or no username is stored for the token.
    """
    if _admin_redis is None:
        return None

    session_token = request.cookies.get(SESSION_COOKIE)
    if session_token:
        stored_username = await _admin_redis.get(f"{SESSION_PREFIX}{session_token}")
        if stored_username:
            return AdminPrincipal(username=str(stored_username))
    return None
def _login_redirect() -> RedirectResponse:
    """Build a 303 redirect to the admin login page."""
    return RedirectResponse(status_code=HTTP_303_SEE_OTHER, url="/admin/login")
def _dashboard_redirect() -> RedirectResponse:
    """Build a 303 redirect to the admin dashboard."""
    return RedirectResponse(status_code=HTTP_303_SEE_OTHER, url="/admin/dashboard")
# ============================================================
# ADMIN NAVIGATION - Human-friendly labels
# ============================================================
# Structure: (Label, URL, Child URL prefixes)
# Organized by workflow: Dashboard > Questions > Exams > Reports > Settings
# An entry counts as active when the request path equals its URL, equals one
# of its child prefixes, or lies below a child prefix (see _is_admin_nav_active).
ADMIN_NAV_ITEMS = (
# Main navigation groups
("Dashboard", "/admin/dashboard", ("/admin/dashboard",)),
# Questions section
(
"Questions",
"/admin/questions",
(
"/admin/questions",
"/admin/templates",
"/admin/question-quality",
),
),
(
"Import Questions",
"/admin/tryout-import",
(
"/admin/tryout-import",
"/admin/snapshot-questions",
),
),
# Exams section
(
"Exams",
"/admin/exams",
(
"/admin/exams",
"/admin/student-attempts",
"/admin/normalization",
),
),
# Reports section
(
"Reports",
"/admin/reports",
(
"/admin/reports",
"/admin/item-statistics",
"/admin/calibration-status",
"/admin/session-overview",
),
),
# Settings section
(
"Settings",
"/admin/settings",
(
"/admin/settings",
"/admin/websites",
"/admin/hierarchy",
"/admin/password",
),
),
# Logout (special - no active state)
("Logout", "/admin/logout", ("/admin/logout",)),
)
# URL mapping for backwards compatibility (old URLs -> new URLs)
# _admin_nav_links applies this mapping (by string prefix) to the request path
# before it works out which navigation entry is active.
# NOTE(review): "/admin/ai-generation" is neither a URL nor a child prefix of
# any ADMIN_NAV_ITEMS entry, so pages under "/admin/basis-items" highlight no
# entry. Because the remap runs first, the Reports child prefixes
# "/admin/item-statistics", "/admin/calibration-status" and
# "/admin/session-overview" can never match either; the latter two end up
# highlighting Questions and Exams. Confirm this is intended.
LEGACY_URL_MAP = {
"/admin/basis-items": "/admin/ai-generation",
"/admin/calibration-status": "/admin/question-quality",
"/admin/item-statistics": "/admin/reports",
"/admin/session-overview": "/admin/exams",
}
# Navigation section icons (using SVG for consistent professional look)
# Looked up by navigation label in _admin_nav_links.
NAV_ICONS = NAV_ICONS_SVG
def _replace_emojis_with_icons(html: str) -> str:
    """Return html with every emoji listed in EMOJI_TO_ICON replaced by its icon markup."""
    rendered = html
    for glyph, svg_markup in EMOJI_TO_ICON.items():
        # Skip emojis that do not occur so no replacement pass is made for them.
        if glyph not in rendered:
            continue
        rendered = rendered.replace(glyph, f'{svg_markup} ')
    return rendered
def _is_admin_nav_active(
current_path: str,
nav_path: str,
child_prefixes: tuple[str, ...],
) -> bool:
if current_path == nav_path:
return True
for prefix in child_prefixes:
if current_path == prefix or current_path.startswith(f"{prefix}/"):
return True
return False
def _admin_nav_links(request: Request) -> str:
"""Render human-friendly navigation links with icons.

Builds one fragment per ADMIN_NAV_ITEMS entry for the path of the current
request and returns the fragments joined by a newline followed by a space.
"""
current_path = request.url.path
# Map a legacy URL onto its replacement path so that the entry owning the new
# URL is the one marked active. No HTTP redirect is issued here.
# NOTE(review): this is a plain string-prefix test, so a path such as
# "/admin/basis-items-extra" is remapped as well.
for legacy_url, new_url in LEGACY_URL_MAP.items():
if current_path.startswith(legacy_url):
current_path = new_url
break
links = []
for label, path, child_prefixes in ADMIN_NAV_ITEMS:
# Special handling for Logout
# (it gets no icon and is never marked active)
if label == "Logout":
links.append(f'{escape(label)} ')
continue
active = _is_admin_nav_active(current_path, path, child_prefixes)
icon = NAV_ICONS.get(label, "")
label_html = f"{icon} {escape(label)}" if icon else escape(label)
# NOTE(review): class_attr and aria are computed but are not interpolated into
# the fragment appended below as it appears here - verify the link markup.
class_attr = ' class="active"' if active else ""
aria = ' aria-current="page"' if active else ""
links.append(f'{label_html} ')
return "\n ".join(links)
def _render_auth_page(
request: Request,
title: str,
subtitle: str,
body: str,
status_code: int = 200,
) -> HTMLResponse:
remember_me_checked = (
"checked" if request.cookies.get("remember_me") == "on" else ""
)
html = f"""
{escape(title)}
?
{escape(title)}
{escape(subtitle)}
{body.replace("__REMEMBER_ME_CHECKED__", remember_me_checked)}
"""
csrf_token = request.cookies.get(CSRF_COOKIE) or secrets.token_urlsafe(24)
csrf_input = f' '
html = re.sub(
r'(
Website Name
Website URL
Add Website
Registered Websites
Use the website ID when importing read-only tryout snapshots.
{websites_table}
"""
def _website_edit_form_body(
website: Website,
error: str | None = None,
success: str | None = None,
site_name: str | None = None,
site_url: str | None = None,
) -> str:
"""Build the body fragment of the website edit page.

error and success, when given, are HTML-escaped before being embedded.
site_name and site_url replace the stored website values whenever they are
not None, so an empty string is kept as an override.
"""
error_html = f'{escape(error)}
' if error else ""
success_html = f'{escape(success)}
' if success else ""
display_name = website.site_name if site_name is None else site_name
display_url = website.site_url if site_url is None else site_url
# NOTE(review): display_name and display_url are not interpolated into the
# template below as it appears here - verify the form inputs are intact.
return f"""
Website ID: {website.id}
{success_html}
{error_html}
Website Name
Website URL
"""
def _tryout_import_form_body(
websites: list[Website],
recent_snapshots: list[TryoutImportSnapshot],
error: str | None = None,
success: str | None = None,
selected_website_id: int | None = None,
preview: dict[str, Any] | None = None,
preview_token: str | None = None,
upload_filename: str = "",
) -> str:
"""Build the body fragment of the tryout JSON import page.

The fragment holds the website selector, the upload form, an optional preview
summary and a table of recent snapshots. error, success, website names,
snapshot titles and the upload filename are HTML-escaped before embedding.
The preview section is rendered only when preview is truthy, and the import
form inside it only when both preview_token and selected_website_id are truthy.
"""
error_html = f'{escape(error)}
' if error else ""
success_html = f'{escape(success)}
' if success else ""
# The first option is the "Select website" placeholder.
website_options = ['Select website ']
for website in websites:
# NOTE(review): selected is computed but not interpolated into the option
# fragment below as it appears here - verify the option markup.
selected = "selected" if selected_website_id == website.id else ""
website_options.append(
f'{escape(website.site_name)} (#{website.id}) '
)
# Lets each snapshot row show the website name; unknown IDs render as 'Unknown'.
website_map = {website.id: website.site_name for website in websites}
snapshot_rows = []
for snapshot in recent_snapshots:
snapshot_rows.append(
""
f"{snapshot.id} "
f"{escape(website_map.get(snapshot.website_id, 'Unknown'))} (#{snapshot.website_id}) "
f"{escape(snapshot.source_tryout_id)} "
f"{escape(snapshot.title)} "
f"{snapshot.question_count} "
f"{escape(str(snapshot.created_at))} "
f'Browse '
" "
)
snapshots_table = (
"Snapshot ID Website Tryout ID Title Questions Imported At Actions "
+ (
"".join(snapshot_rows)
if snapshot_rows
else 'No data '
)
+ "
"
)
preview_html = ""
if preview:
# Missing or null "totals", "tryouts", "question_diff" and "warnings" entries
# fall back to empty containers, and missing counters to 0.
totals = preview.get("totals") or {}
tryout_rows = []
for tryout in preview.get("tryouts") or []:
diff = tryout.get("question_diff") or {}
warnings = "; ".join(tryout.get("warnings") or []) or "-"
tryout_rows.append(
[
tryout.get("source_tryout_id"),
tryout.get("title"),
diff.get("total_questions", 0),
diff.get("new_questions", 0),
diff.get("updated_questions", 0),
diff.get("unchanged_questions", 0),
diff.get("removed_questions", 0),
warnings,
]
)
import_form = ""
if preview_token and selected_website_id:
import_form = f"""
Import Snapshot
"""
preview_html = f"""
Preview Summary
File: {
escape(upload_filename or "uploaded JSON")
}
Tryouts{
preview.get("tryout_count", 0)
}
New Questions{
totals.get("new_questions", 0)
}
Updated Questions{
totals.get("updated_questions", 0)
}
Removed Questions{
totals.get("removed_questions", 0)
}
{
_table(
[
"Tryout ID",
"Title",
"Total",
"New",
"Updated",
"Unchanged",
"Removed",
"Warnings",
],
tryout_rows,
)
}
{import_form}
"""
return f"""
Import Sejoli tryout JSON as read-only snapshot reference data. This does not create live item-bank questions.
Use this when the source tryout changes upstream. Re-import updates matching source question IDs, inserts new ones, and marks missing ones inactive.
{success_html}
{error_html}
Website
{"".join(website_options)}
Tryout Export JSON
Preview Import
{preview_html}
Recent Snapshots
These are archived imports stored in PostgreSQL for traceability.
{snapshots_table}
"""
def _snapshot_slot_map(snapshot: TryoutImportSnapshot) -> dict[str, int]:
slot_map: dict[str, int] = {}
questions = (snapshot.raw_payload or {}).get("questions") or []
for index, question in enumerate(questions, start=1):
source_question_id = str((question or {}).get("id") or "").strip()
if source_question_id:
slot_map[source_question_id] = index
return slot_map
def _snapshot_options_to_item_options(
raw_options: list[dict[str, Any]] | list[Any],
) -> dict[str, str]:
item_options: dict[str, str] = {}
for option in raw_options or []:
if not isinstance(option, dict):
continue
increment = str(option.get("increment") or "").strip().upper()
text = str(option.get("text") or option.get("label") or "").strip()
if increment and text:
item_options[increment] = text
return item_options
def _snapshot_questions_body(
snapshot: TryoutImportSnapshot,
questions: list[TryoutSnapshotQuestion],
promoted_items_by_slot: dict[int, Item],
error: str | None = None,
success: str | None = None,
) -> str:
"""Build the body fragment listing a snapshot's questions for promotion.

Each question is matched to its slot through _snapshot_slot_map. A question
whose slot already has an entry in promoted_items_by_slot is shown as already
existing; every other question is shown as ready to promote.
"""
error_html = f'{escape(error)}
' if error else ""
success_html = f'{escape(success)}
' if success else ""
slot_map = _snapshot_slot_map(snapshot)
rows = []
for question in questions:
# Questions absent from the payload get slot 0, which is rendered as "-".
slot = slot_map.get(question.source_question_id, 0)
promoted_item = promoted_items_by_slot.get(slot)
if promoted_item:
select_html = ""
action_html = (
f"Item #{promoted_item.id} already exists. "
f'Open in Variant Generator '
)
else:
select_html = f' '
action_html = "Ready to promote"
rows.append(
""
f"{select_html} "
f"{slot or '-'} "
f"{escape(question.source_question_id)} "
f"{escape(question.correct_answer)} "
f"{question.option_count} "
f"{'Yes' if question.is_active else 'No'} "
f"{escape(_truncate(question.question_title or question.question_html, 100))} "
f"{action_html} "
" "
)
questions_table = (
''
f' '
''
'Promote Selected as Basis Items '
"
"
'"
" "
)
return f"""
Snapshot ID: {snapshot.id} | Website: {snapshot.website_id} | Tryout: {escape(snapshot.source_tryout_id)}
Promote selected snapshot questions into the live items table as original basis items (Medium difficulty) for AI generation.
{success_html}
{error_html}
{questions_table}
Back to Tryout Import
"""
async def _recent_generation_runs(
    db: AsyncSession, limit: int = 20
) -> list[AIGenerationRun]:
    """Return up to limit generation runs, highest ID first."""
    newest_first = select(AIGenerationRun).order_by(AIGenerationRun.id.desc())
    rows = await db.execute(newest_first.limit(limit))
    return list(rows.scalars().all())
async def _recent_generated_variants(
    db: AsyncSession,
    limit: int = 100,
    basis_item_id: int | None = None,
    status_filter: str | None = None,
    level_filter: str | None = None,
    run_id_filter: int | None = None,
) -> list[Item]:
    """Return AI-generated items, newest first, narrowed by the given filters.

    A filter is applied only when supplied: the ID filters when they are not
    None, the string filters when they are non-empty. At most limit rows are
    returned, ordered by creation time and then ID, both descending.
    """
    criteria = [Item.generated_by == "ai"]
    if basis_item_id is not None:
        criteria.append(Item.basis_item_id == basis_item_id)
    if status_filter:
        criteria.append(Item.variant_status == status_filter)
    if level_filter:
        criteria.append(Item.level == level_filter)
    if run_id_filter is not None:
        criteria.append(Item.generation_run_id == run_id_filter)

    # Criteria passed together to where() are combined with AND.
    query = (
        select(Item)
        .where(*criteria)
        .order_by(Item.created_at.desc(), Item.id.desc())
        .limit(limit)
    )
    rows = await db.execute(query)
    return list(rows.scalars().all())
async def _load_hierarchy_context(db: AsyncSession) -> dict[str, list[Any]]:
    """Load every record set needed by the hierarchy view.

    Returns a dict with the keys "websites", "snapshots", "questions",
    "basis_items" (non-AI items of level "sedang"), "variants" (AI-generated
    items) and "runs", each holding a list ordered as in the queries below.
    """
    queries = {
        "websites": select(Website).order_by(Website.id.asc()),
        "snapshots": select(TryoutImportSnapshot).order_by(
            TryoutImportSnapshot.website_id.asc(),
            TryoutImportSnapshot.source_tryout_id.asc(),
            TryoutImportSnapshot.id.desc(),
        ),
        "questions": select(TryoutSnapshotQuestion).order_by(
            TryoutSnapshotQuestion.website_id.asc(),
            TryoutSnapshotQuestion.source_tryout_id.asc(),
            TryoutSnapshotQuestion.source_question_id.asc(),
        ),
        "basis_items": (
            select(Item)
            .where(Item.generated_by != "ai", Item.level == "sedang")
            .order_by(
                Item.website_id.asc(),
                Item.tryout_id.asc(),
                Item.slot.asc(),
                Item.id.asc(),
            )
        ),
        "variants": (
            select(Item)
            .where(Item.generated_by == "ai")
            .order_by(Item.website_id.asc(), Item.basis_item_id.asc(), Item.id.desc())
        ),
        "runs": select(AIGenerationRun).order_by(
            AIGenerationRun.basis_item_id.asc(),
            AIGenerationRun.id.desc(),
        ),
    }

    # The queries run one after another, in the order they are declared above.
    context: dict[str, list[Any]] = {}
    for name, query in queries.items():
        rows = await db.execute(query)
        context[name] = list(rows.scalars().all())
    return context
def _append_grouped(grouped: dict[Any, list[Any]], key: Any, value: Any) -> None:
grouped.setdefault(key, []).append(value)
def _variant_status_counts_html(variants: list[Item]) -> str:
    """Render one status pill with its count per variant status, sorted by status.

    Returns the fixed 'No variants ' text when the list is empty.
    """
    if not variants:
        return 'No variants '

    tally: dict[str, int] = {}
    for entry in variants:
        status_name = entry.variant_status
        tally[status_name] = tally.get(status_name, 0) + 1

    fragments = []
    for status_name, occurrences in sorted(tally.items()):
        fragments.append(f"{_status_pill(status_name)} {occurrences} ")
    return " ".join(fragments)
def _hierarchy_flow_strip() -> str:
"""Render the six-step flow strip shown at the top of the hierarchy view.

Each step is a (number, title, description) triple; title and description are
HTML-escaped when rendered.
"""
steps = (
("1", "Website", "Owner/source site"),
("2", "Snapshot", "Imported tryout export"),
("3", "Source Question", "Read-only imported question"),
("4", "Basis Item", "Promoted original parent"),
("5", "Run", "AI generation request"),
("6", "Variant", "Generated child question"),
)
return (
''
+ "".join(
f'
{step} {escape(title)} {escape(copy)}
'
for step, title, copy in steps
)
+ "
"
)
def _hierarchy_attention_html(
snapshots_without_basis: list[TryoutImportSnapshot],
basis_without_variants: list[Item],
variants_without_basis: list[Item],
basis_missing_source: list[Item],
) -> str:
"""Render the "needs attention" panel of the hierarchy view.

One row is produced for every non-empty argument list, reporting how many
records fall into that gap category. When every list is empty, a fixed
"no gaps" message is returned instead.
"""
rows = []
if snapshots_without_basis:
# The title of the first affected snapshot is shown as an example.
rows.append(
f'Snapshot {len(snapshots_without_basis)} snapshots have no promoted basis items yet. '
f'(e.g., {escape(snapshots_without_basis[0].title)}) '
)
if basis_without_variants:
rows.append(
f'Basis Item {len(basis_without_variants)} promoted basis items have no generated variants yet. '
f'Go to Basis Items to select an item for generation '
)
if variants_without_basis:
rows.append(
f'Variant {len(variants_without_basis)} variants are orphaned (not linked to an existing basis item). '
)
if basis_missing_source:
rows.append(
f'Basis Item {len(basis_missing_source)} basis items are missing a source snapshot question reference. '
)
if not rows:
return """
No hierarchy gaps detected in the current data.
"""
# NOTE(review): rows is not interpolated into the template below as it appears
# here - verify the panel markup is intact.
return f"""
"""
def _basis_hierarchy_item_html(
basis_item: Item,
source_question: TryoutSnapshotQuestion | None,
variants: list[Item],
runs: list[AIGenerationRun],
) -> str:
"""Render one basis-item row of a snapshot's table in the hierarchy view.

The row shows the item's slot, a 120-character plain-text preview of its stem
and the per-status variant counts.
"""
# The first element of runs is taken as the latest run.
# NOTE(review): this relies on the caller passing runs newest first - confirm.
latest_run = runs[0] if runs else None
source_label = "-"
if source_question is not None:
source_label = f"{escape(source_question.source_question_id)}"
run_html = "-"
if latest_run is not None:
run_html = f"Batch #{latest_run.id} ({escape(latest_run.target_level)})"
stem_preview = escape(_truncate(_html_to_text(basis_item.stem), 120))
variant_counts = (
_variant_status_counts_html(variants)
if variants
else '0 variants '
)
# NOTE(review): source_label, run_html and target_tab are computed but not
# interpolated into the template below as it appears here - verify the markup.
target_tab = "review" if variants else "generate"
return f"""
{basis_item.slot}
{stem_preview}
{variant_counts}
Workspace
"""
def _hierarchy_view_body(context: dict[str, list[Any]]) -> str:
"""Build the body fragment of the read-only hierarchy view.

context must provide the keys "websites", "snapshots", "questions",
"basis_items", "variants" and "runs" (the shape returned by
_load_hierarchy_context). The records are grouped in memory, hierarchy gaps
are collected for the attention panel, and one section is rendered per
website with its snapshots and their promoted basis items.
"""
websites: list[Website] = context["websites"]
snapshots: list[TryoutImportSnapshot] = context["snapshots"]
questions: list[TryoutSnapshotQuestion] = context["questions"]
basis_items: list[Item] = context["basis_items"]
variants: list[Item] = context["variants"]
runs: list[AIGenerationRun] = context["runs"]
# Lookup tables; list order within each group follows the input order.
snapshots_by_website: dict[int, list[TryoutImportSnapshot]] = {}
questions_by_website: dict[int, list[TryoutSnapshotQuestion]] = {}
questions_by_snapshot: dict[int, list[TryoutSnapshotQuestion]] = {}
questions_by_id = {question.id: question for question in questions}
basis_by_website: dict[int, list[Item]] = {}
basis_by_source_question: dict[int, list[Item]] = {}
variants_by_website: dict[int, list[Item]] = {}
variants_by_basis: dict[int, list[Item]] = {}
runs_by_basis: dict[int, list[AIGenerationRun]] = {}
basis_by_id = {item.id: item for item in basis_items}
for snapshot in snapshots:
_append_grouped(snapshots_by_website, snapshot.website_id, snapshot)
for question in questions:
_append_grouped(questions_by_website, question.website_id, question)
# A question is attached only to the snapshot named by latest_snapshot_id.
if question.latest_snapshot_id is not None:
_append_grouped(
questions_by_snapshot, question.latest_snapshot_id, question
)
for item in basis_items:
_append_grouped(basis_by_website, item.website_id, item)
if item.source_snapshot_question_id is not None:
_append_grouped(
basis_by_source_question, item.source_snapshot_question_id, item
)
for variant in variants:
_append_grouped(variants_by_website, variant.website_id, variant)
if variant.basis_item_id is not None:
_append_grouped(variants_by_basis, variant.basis_item_id, variant)
for run in runs:
_append_grouped(runs_by_basis, run.basis_item_id, run)
# Gap detection feeding the attention panel.
# A snapshot has no basis when none of its questions is the source of a basis item.
snapshots_without_basis = []
for snapshot in snapshots:
snapshot_question_ids = {
question.id for question in questions_by_snapshot.get(snapshot.id, [])
}
linked_basis = [
item
for question_id in snapshot_question_ids
for item in basis_by_source_question.get(question_id, [])
]
if not linked_basis:
snapshots_without_basis.append(snapshot)
basis_without_variants = [
item for item in basis_items if not variants_by_basis.get(item.id)
]
# Orphans: no basis reference, or one pointing outside the loaded basis items.
variants_without_basis = [
item
for item in variants
if item.basis_item_id is None or item.basis_item_id not in basis_by_id
]
basis_missing_source = [
item for item in basis_items if item.source_snapshot_question_id is None
]
website_sections = []
for website in websites:
website_snapshots = snapshots_by_website.get(website.id, [])
website_questions = questions_by_website.get(website.id, [])
website_basis = basis_by_website.get(website.id, [])
website_variants = variants_by_website.get(website.id, [])
# Runs are attributed to a website through its basis items.
website_runs = [
run for item in website_basis for run in runs_by_basis.get(item.id, [])
]
snapshot_groups = []
for snapshot in website_snapshots:
snapshot_questions = questions_by_snapshot.get(snapshot.id, [])
snapshot_question_ids = {question.id for question in snapshot_questions}
# Basis items promoted from this snapshot's questions, ordered by slot then ID.
snapshot_basis = sorted(
[
item
for question_id in snapshot_question_ids
for item in basis_by_source_question.get(question_id, [])
],
key=lambda item: (item.slot, item.id),
)
if snapshot_basis:
basis_html = (
"""
Slot
Stem Preview
Variants
Action
"""
+ "".join(
_basis_hierarchy_item_html(
item,
questions_by_id.get(item.source_snapshot_question_id),
variants_by_basis.get(item.id, []),
runs_by_basis.get(item.id, []),
)
for item in snapshot_basis
)
+ """
"""
)
else:
basis_html = 'No promoted basis items for this snapshot yet.
'
snapshot_groups.append(
f"""
Snapshot {escape(snapshot.title)}
Tryout: {escape(snapshot.source_tryout_id)} | Snapshot #{snapshot.id} | Imported: {escape(str(snapshot.created_at))}
Questions in export: {snapshot.question_count} | Current source rows: {len(snapshot_questions)} | Promoted basis items: {len(snapshot_basis)}
{basis_html}
"""
)
website_sections.append(
f"""
Website {escape(website.site_name)} | #{website.id}
{escape(website.site_url)}
Snapshots {len(website_snapshots)}
Source Questions {len(website_questions)}
Basis Items {len(website_basis)}
AI Runs {len(website_runs)}
Variants {len(website_variants)}
{"".join(snapshot_groups) if snapshot_groups else 'No tryout imports have been recorded for this website yet.
'}
"""
)
if not website_sections:
website_sections.append(
'No websites have been registered yet.
'
)
return f"""
This read-only view shows how source tryout data becomes reviewable AI-generated question variants.
{_hierarchy_flow_strip()}
{_hierarchy_attention_html(snapshots_without_basis, basis_without_variants, variants_without_basis, basis_missing_source)}
{"".join(website_sections)}
"""
async def _usage_metrics_for_items(
    db: AsyncSession,
    item_ids: list[int],
) -> dict[int, dict[str, float]]:
    """Compute answer-usage metrics for each of the given items.

    Returns a dict keyed by item ID. Each value holds "impressions" (number of
    answers), "unique_users" (distinct wp_user_id values) and "frequency"
    (impressions per unique user, 0.0 when there are none), all as floats.
    Items with no answers are absent. An empty item_ids yields {} with no query.
    """
    if not item_ids:
        return {}

    impressions_col = func.count(UserAnswer.id).label("impressions")
    unique_users_col = func.count(func.distinct(UserAnswer.wp_user_id)).label(
        "unique_users"
    )
    query = (
        select(UserAnswer.item_id, impressions_col, unique_users_col)
        .where(UserAnswer.item_id.in_(item_ids))
        .group_by(UserAnswer.item_id)
    )
    rows = await db.execute(query)

    metrics: dict[int, dict[str, float]] = {}
    for row_item_id, raw_impressions, raw_unique_users in rows.all():
        impressions = int(raw_impressions or 0)
        unique_users = int(raw_unique_users or 0)
        # Guard the division: frequency is 0.0 when nobody answered.
        per_user = (impressions / unique_users) if unique_users else 0.0
        metrics[int(row_item_id)] = {
            "impressions": float(impressions),
            "unique_users": float(unique_users),
            "frequency": float(per_user),
        }
    return metrics
async def _family_usage_stats(
    db: AsyncSession,
    basis_item: Item,
    variants: list[Item],
) -> tuple[dict[int, dict[str, float]], dict[str, float]]:
    """Compute usage metrics for a basis item together with its variants.

    Returns a pair: the per-item metrics from _usage_metrics_for_items, and
    family-wide totals with the keys "impressions", "unique_users" and
    "frequency". Family unique users are counted with their own distinct
    query, so a user who answered several family items is counted once.
    """
    family_item_ids = [basis_item.id, *(member.id for member in variants)]
    usage_metrics = await _usage_metrics_for_items(db, family_item_ids)

    family_impressions = int(
        sum(entry["impressions"] for entry in usage_metrics.values())
    )
    distinct_users_query = select(
        func.count(func.distinct(UserAnswer.wp_user_id))
    ).where(UserAnswer.item_id.in_(family_item_ids))
    family_unique_users = int(await db.scalar(distinct_users_query) or 0)

    if family_unique_users:
        family_frequency = family_impressions / family_unique_users
    else:
        family_frequency = 0.0

    family_totals = {
        "impressions": float(family_impressions),
        "unique_users": float(family_unique_users),
        "frequency": float(family_frequency),
    }
    return usage_metrics, family_totals
def _basis_items_list_body(items: list[Item]) -> str:
"""Build the body fragment listing basis items with a link to each workspace.

Tryout IDs and stems are HTML-escaped, and each stem is truncated to 120
characters. A missing source snapshot question ID is shown as "-". An empty
list produces a "No basis items found." row.
"""
rows = []
for item in items:
rows.append(
""
f"{item.id} "
f"{escape(item.tryout_id)} "
f"{item.slot} "
f"{item.website_id} "
f"{escape(_truncate(item.stem, 120))} "
f"{item.source_snapshot_question_id or '-'} "
f'Open Workspace '
" "
)
table = (
"Item ID Tryout Slot Website Stem Source Snapshot QID Actions "
+ (
"".join(rows)
if rows
else 'No basis items found. '
)
+ "
"
)
return f"""
Basis items are original parent questions (Medium difficulty, non-AI). Open a workspace to generate and review AI child variants.
{table}
"""
def _basis_item_workspace_body(
basis_item: Item,
runs: list[AIGenerationRun],
variants: list[Item],
usage_by_item: dict[int, dict[str, float]],
family_stats: dict[str, float],
filters: dict[str, str],
error: str | None = None,
success: str | None = None,
target_level: str = "mudah",
ai_model: str = settings.OPENROUTER_MODEL_LLAMA,
generation_count: str = "1",
operator_notes: str = "",
include_note_for_admin: bool = True,
include_note_in_prompt: bool = False,
) -> str:
"""Build the body fragment of a basis item's workspace page.

The fragment shows a summary of the parent item with family usage totals, the
table of child variants with per-item usage and an expandable full-content
review, and the generation run history.

The ai_model default is read from settings once, when the function is defined.
NOTE(review): the filter values and the generation form parameters
(target_level, ai_model, generation_count, operator_notes and the two
include_note flags) are not referenced in the template as it appears here -
verify the form markup is intact.
"""
error_html = f'{escape(error)}
' if error else ""
success_html = f'{escape(success)}
' if success else ""
status_filter = filters.get("status", "")
level_filter = filters.get("level", "")
min_frequency_filter = filters.get("min_frequency", "")
run_id_filter = filters.get("run_id", "")
# One table row per generation run, rendered through the _table helper.
run_rows = [
[
run.id,
run.target_level,
run.requested_count,
run.model,
run.created_by,
str(run.created_at),
]
for run in runs
]
runs_table = _table(
["Run ID", "Target", "Requested", "Model", "Created By", "Created At"],
run_rows,
)
variant_rows = []
for item in variants:
# Variants without recorded answers fall back to zeroed metrics.
usage = usage_by_item.get(
item.id, {"impressions": 0.0, "unique_users": 0.0, "frequency": 0.0}
)
# Anything other than a dict of options is treated as having no options.
options = item.options if isinstance(item.options, dict) else {}
options_rows = (
"".join(
f'{escape(str(key))} '
f'{escape(str(value))} '
for key, value in options.items()
)
or 'No options '
)
review_html = (
''
'Review full content '
f''
f'
Full Stem {escape(_html_to_text(item.stem))}
'
'
'
'Option Text '
f"{options_rows} "
"
"
f'
Correct Answer: {escape(item.correct_answer or "-")}
'
f'
Explanation: {escape(_html_to_text(item.explanation) or "-")}
'
"
"
" "
)
variant_rows.append(
""
f' '
f"{item.id} "
f"{item.generation_run_id or '-'} "
f"{escape(item.level)} "
f"{escape(item.variant_status)} "
f"{escape(item.ai_model or '-')} "
f"{int(usage['impressions'])} "
f"{int(usage['unique_users'])} "
f"{usage['frequency']:.2f} "
f"{escape(_truncate(_html_to_text(item.stem), 130))}{review_html} "
f"{escape(str(item.created_at))} "
" "
)
variants_table = (
f''
''
''
'Approve selected '
'Reject selected '
'Archive selected '
'Mark stale '
'Activate selected '
" "
'Apply '
"
"
' "
)
return f"""
{success_html}
{error_html}
Parent Summary
Parent Item: #{basis_item.id} |
Tryout: {escape(basis_item.tryout_id)} |
Slot: {basis_item.slot} |
Website: {basis_item.website_id} |
Source Snapshot QID: {basis_item.source_snapshot_question_id or "-"}
Family Usage: impressions={int(family_stats.get("impressions", 0.0))} ,
unique users={int(family_stats.get("unique_users", 0.0))} ,
frequency={family_stats.get("frequency", 0.0):.2f}
Stem: {escape(_truncate(_html_to_text(basis_item.stem), 260))}
Filter Variants
Filter child variants shown in the review table below.
Child Variants for This Parent
Filtered variants shown: {len(variants)}
{variants_table}
Generation Runs for This Parent
Run history is reference/audit data and is intentionally separated from variant review workflow.
{runs_table}
Back to Basis Items
"""
async def _find_or_create_demo_basis_item(db: AsyncSession) -> Item:
    """Return the demo basis item, creating it and its parents when missing.

    An existing manual, level-"sedang" item of tryout "demo-tryout" is returned
    as is (lowest ID first). Otherwise the demo website and demo tryout are
    looked up or created, a seed item is inserted in slot 1, and the
    transaction is committed before the refreshed item is returned.
    """
    demo_tryout_id = "demo-tryout"
    demo_site_url = "https://demo.local"

    item_lookup = (
        select(Item)
        .where(
            Item.level == "sedang",
            Item.generated_by == "manual",
            Item.tryout_id == demo_tryout_id,
        )
        .order_by(Item.id.asc())
        .limit(1)
    )
    existing_item = (await db.execute(item_lookup)).scalar_one_or_none()
    if existing_item:
        return existing_item

    website_lookup = select(Website).where(Website.site_url == demo_site_url).limit(1)
    website = (await db.execute(website_lookup)).scalar_one_or_none()
    if website is None:
        website = Website(site_url=demo_site_url, site_name="Demo Website")
        db.add(website)
        # Flush so the new website has an ID for the queries below.
        await db.flush()

    tryout_lookup = (
        select(Tryout)
        .where(Tryout.website_id == website.id, Tryout.tryout_id == demo_tryout_id)
        .limit(1)
    )
    tryout = (await db.execute(tryout_lookup)).scalar_one_or_none()
    if tryout is None:
        tryout = Tryout(
            website_id=website.id,
            tryout_id=demo_tryout_id,
            name="Demo Variant Generator Tryout",
            description="Seed data for the AI playground.",
            scoring_mode="ctt",
            selection_mode="fixed",
            normalization_mode="static",
            ai_generation_enabled=True,
        )
        db.add(tryout)
        await db.flush()

    seed_options = {
        "A": "Rp260.000",
        "B": "Rp300.000",
        "C": "Rp320.000",
        "D": "Rp360.000",
    }
    item = Item(
        tryout_id=tryout.tryout_id,
        website_id=website.id,
        slot=1,
        level="sedang",
        stem="Sebuah toko memberi diskon 20% untuk sebuah tas. Jika harga setelah diskon adalah Rp240.000, berapakah harga tas sebelum diskon?",
        options=seed_options,
        correct_answer="B",
        explanation="Harga setelah diskon 20% berarti 80% dari harga awal. Jadi harga awal = 240.000 / 0,8 = 300.000.",
        generated_by="manual",
        calibrated=False,
        calibration_sample_size=0,
    )
    db.add(item)
    await db.flush()
    await db.commit()
    await db.refresh(item)
    return item
async def _load_websites(db: AsyncSession) -> list[Website]:
    """Return every website, ordered by ascending ID."""
    by_id = select(Website).order_by(Website.id.asc())
    rows = await db.execute(by_id)
    return list(rows.scalars().all())
async def _recent_snapshots(
    db: AsyncSession, limit: int = 20
) -> list[TryoutImportSnapshot]:
    """Return up to limit import snapshots, highest ID first."""
    newest_first = select(TryoutImportSnapshot).order_by(
        TryoutImportSnapshot.id.desc()
    )
    rows = await db.execute(newest_first.limit(limit))
    return list(rows.scalars().all())
async def _ensure_operational_tryout(
    snapshot: TryoutImportSnapshot, db: AsyncSession
) -> Tryout:
    """Return the tryout matching a snapshot, creating it when absent.

    The tryout is matched on the snapshot's website ID and source tryout ID.
    A newly created tryout is added and flushed but not committed.
    """
    lookup = select(Tryout).where(
        Tryout.website_id == snapshot.website_id,
        Tryout.tryout_id == snapshot.source_tryout_id,
    )
    found = (await db.execute(lookup)).scalar_one_or_none()
    if found:
        return found

    created = Tryout(
        website_id=snapshot.website_id,
        tryout_id=snapshot.source_tryout_id,
        name=snapshot.title,
        description=f"Operational tryout basis created from imported snapshot #{snapshot.id}.",
        scoring_mode="ctt",
        selection_mode="fixed",
        normalization_mode="static",
        ai_generation_enabled=True,
    )
    db.add(created)
    await db.flush()
    return created
async def _load_snapshot_question_context(
    snapshot: TryoutImportSnapshot,
    db: AsyncSession,
) -> tuple[list[TryoutSnapshotQuestion], dict[int, Item], dict[str, int]]:
    """Load what the snapshot question browser needs for one snapshot.

    Returns a triple: the snapshot's source questions ordered by payload slot
    (questions without a slot last, ties broken by source question ID), the
    level-"sedang" items of the same website and tryout keyed by slot, and the
    source-question-ID-to-slot map.
    """
    question_query = (
        select(TryoutSnapshotQuestion)
        .where(
            TryoutSnapshotQuestion.website_id == snapshot.website_id,
            TryoutSnapshotQuestion.source_tryout_id == snapshot.source_tryout_id,
        )
        .order_by(TryoutSnapshotQuestion.source_question_id.asc())
    )
    question_rows = await db.execute(question_query)
    questions = list(question_rows.scalars().all())

    item_query = select(Item).where(
        Item.website_id == snapshot.website_id,
        Item.tryout_id == snapshot.source_tryout_id,
        Item.level == "sedang",
    )
    item_rows = await db.execute(item_query)
    promoted_items_by_slot = {}
    for promoted in item_rows.scalars().all():
        promoted_items_by_slot[promoted.slot] = promoted

    slot_map = _snapshot_slot_map(snapshot)
    # Questions missing from the payload sort after every real slot.
    unslotted_rank = 10**9

    def _slot_then_id(row):
        return (
            slot_map.get(row.source_question_id, unslotted_rank),
            row.source_question_id,
        )

    questions.sort(key=_slot_then_id)
    return questions, promoted_items_by_slot, slot_map
async def _promote_snapshot_question_to_item(
    snapshot: TryoutImportSnapshot,
    question: TryoutSnapshotQuestion,
    db: AsyncSession,
) -> tuple[Item | None, str]:
    """Promote a snapshot question into a live level-"sedang" basis item.

    Returns an (item, outcome) pair where outcome is one of:
        "mismatch"        - the question belongs to another website or tryout
        "missing_options" - no usable options could be derived
        "existing"        - an item already occupies the target slot
        "created"         - a new item was added and flushed (not committed)
    The item is None for "mismatch" and "missing_options".
    """
    same_source = (
        question.website_id == snapshot.website_id
        and question.source_tryout_id == snapshot.source_tryout_id
    )
    if not same_source:
        return None, "mismatch"

    slot = _snapshot_slot_map(snapshot).get(question.source_question_id)
    if not slot:
        # The question is not in the payload: take the slot after the highest used one.
        highest_slot_query = select(func.max(Item.slot)).where(
            Item.website_id == snapshot.website_id,
            Item.tryout_id == snapshot.source_tryout_id,
            Item.level == "sedang",
        )
        highest_slot = await db.scalar(highest_slot_query) or 0
        slot = highest_slot + 1

    options = _snapshot_options_to_item_options(question.raw_options)
    if not options:
        return None, "missing_options"

    await _ensure_operational_tryout(snapshot, db)

    occupant_query = select(Item).where(
        Item.website_id == snapshot.website_id,
        Item.tryout_id == snapshot.source_tryout_id,
        Item.slot == slot,
        Item.level == "sedang",
    )
    occupant = (await db.execute(occupant_query)).scalar_one_or_none()
    if occupant is not None:
        return occupant, "existing"

    promoted = Item(
        tryout_id=snapshot.source_tryout_id,
        website_id=snapshot.website_id,
        slot=slot,
        level="sedang",
        stem=question.question_html,
        options=options,
        correct_answer=question.correct_answer,
        explanation=question.explanation_html,
        generated_by="manual",
        source_snapshot_question_id=question.id,
        variant_status="active",
        calibrated=False,
        calibration_sample_size=0,
    )
    db.add(promoted)
    await db.flush()
    return promoted, "created"
@router.get("", include_in_schema=False)
@router.get("/", include_in_schema=False)
async def admin_root(request: Request):
    """Send signed-in admins to the dashboard and everyone else to the login page."""
    signed_in = await _current_admin(request)
    return _dashboard_redirect() if signed_in else _login_redirect()
@router.get("/login", include_in_schema=False)
async def login_view(request: Request):
"""Serve the admin login page, or redirect to the dashboard when a session already exists."""
admin = await _current_admin(request)
if admin:
return _dashboard_redirect()
# Login form fragment; _render_auth_page wraps it in the shared page layout.
body = """
Username
Password
Remember me
Sign in
Direct environment-backed admin access.
"""
return _render_auth_page(
request,
"Admin Login",
"Use the configured admin credentials to access the dashboard.",
body,
)
@router.post("/login", include_in_schema=False)
async def login_submit(
    request: Request,
    username: str = Form(...),
    password: str = Form(...),
    remember_me: str | None = Form(None),
):
    """Validate submitted credentials and start a Redis-backed admin session.

    Responds with 503 when the Redis client is not configured, with 429 once
    the per-client counter has reached ``LOGIN_RATE_LIMIT_MAX_ATTEMPTS``, and
    with 401 on a credential mismatch (which also bumps the counter). On
    success the counter is cleared, a session token is stored in Redis and
    set as a cookie, and the client is redirected to the dashboard.
    """
    if _admin_redis is None:
        body = """
Admin backend is temporarily unavailable. Please try again.
Username
Password
Remember me
Sign in
"""
        return _render_auth_page(
            request,
            "Admin Login",
            "Use the configured admin credentials to access the dashboard.",
            body,
            status_code=503,
        )
    # Failed attempts are counted per client address.
    client_ip = request.client.host if request.client else "unknown"
    rate_limit_key = f"{LOGIN_RATE_LIMIT_PREFIX}{client_ip}"
    attempts_raw = await _admin_redis.get(rate_limit_key)
    attempts = int(attempts_raw) if attempts_raw else 0
    if attempts >= LOGIN_RATE_LIMIT_MAX_ATTEMPTS:
        body = """
Too many login attempts. Please wait a few minutes and try again.
Username
Password
Remember me
Sign in
"""
        return _render_auth_page(
            request,
            "Admin Login",
            "Use the configured admin credentials to access the dashboard.",
            body,
            status_code=HTTP_429_TOO_MANY_REQUESTS,
        )
    # compare_digest() raises TypeError for str arguments that contain
    # non-ASCII characters, so compare UTF-8 bytes instead. Both comparisons
    # are evaluated unconditionally so a wrong username does not skip the
    # password comparison.
    username_ok = secrets.compare_digest(
        username.encode("utf-8"), settings.ADMIN_USERNAME.encode("utf-8")
    )
    password_ok = secrets.compare_digest(
        password.encode("utf-8"), settings.ADMIN_PASSWORD.encode("utf-8")
    )
    if not (username_ok and password_ok):
        attempts = await _admin_redis.incr(rate_limit_key)
        if attempts == 1:
            # First failure in this window: start the expiry clock.
            await _admin_redis.expire(rate_limit_key, LOGIN_RATE_LIMIT_WINDOW_SECONDS)
        body = f"""
Invalid username or password.
Username
Password
Remember me
Sign in
"""
        return _render_auth_page(
            request,
            "Admin Login",
            "Use the configured admin credentials to access the dashboard.",
            body,
            status_code=HTTP_401_UNAUTHORIZED,
        )
    await _admin_redis.delete(rate_limit_key)
    expire = settings.ADMIN_SESSION_EXPIRE_SECONDS
    response = _dashboard_redirect()
    secure_cookie = settings.ENVIRONMENT == "production"
    if remember_me == "on":
        # "Remember me" extends the session to at least 30 days.
        expire = max(expire, 3600 * 24 * 30)
        response.set_cookie(
            "remember_me",
            "on",
            expires=expire,
            path="/admin",
            secure=secure_cookie,
            samesite="lax",
        )
    else:
        response.delete_cookie("remember_me", path="/admin")
    token = uuid.uuid4().hex
    response.set_cookie(
        SESSION_COOKIE,
        token,
        expires=expire,
        path="/admin",
        httponly=True,
        secure=secure_cookie,
        samesite="lax",
    )
    # The Redis entry shares the cookie lifetime.
    await _admin_redis.set(
        f"{SESSION_PREFIX}{token}", settings.ADMIN_USERNAME, ex=expire
    )
    return response
@router.get("/logout", include_in_schema=False)
async def logout(request: Request):
    """Drop the Redis session, if any, and clear the admin cookies."""
    session_token = request.cookies.get(SESSION_COOKIE)
    if _admin_redis is not None and session_token:
        await _admin_redis.delete(f"{SESSION_PREFIX}{session_token}")
    redirect = _login_redirect()
    for cookie_name in (SESSION_COOKIE, "remember_me"):
        redirect.delete_cookie(cookie_name, path="/admin")
    return redirect
@router.get("/password", include_in_schema=False)
async def password_view(request: Request):
    """Show the read-only page explaining that password rotation is disabled."""
    principal = await _current_admin(request)
    if not principal:
        return _login_redirect()
    signed_in_as = escape(principal.username)
    session_ttl = settings.ADMIN_SESSION_EXPIRE_SECONDS
    notice = f"""
Signed in as {signed_in_as} .
Password changes are disabled in the UI for this deployment.
Update ADMIN_PASSWORD in the server environment, then restart the app.
Session expiry is currently set to {session_ttl} seconds.
Back to dashboard
"""
    return _render_auth_page(
        request,
        "Password Management",
        "Runtime password rotation is intentionally disabled.",
        notice,
    )
@router.post("/password", include_in_schema=False)
async def password_submit(
    request: Request,
    old_password: str = Form(...),
    new_password: str = Form(...),
    re_new_password: str = Form(...),
):
    """Reject password-change submissions with a 400 explanation page."""
    # The form fields are required by the signature but deliberately ignored.
    del old_password, new_password, re_new_password
    if not await _current_admin(request):
        return _login_redirect()
    notice = """
Password rotation via UI is disabled.
Update ADMIN_PASSWORD in the server environment, then restart the app.
Back to dashboard
"""
    return _render_auth_page(
        request,
        "Password Management",
        "Runtime password rotation is intentionally disabled.",
        notice,
        status_code=400,
    )
@router.get("/dashboard", include_in_schema=False)
async def dashboard_view(request: Request, db: AsyncSession = Depends(get_db)):
    """Render the admin dashboard: counts, alerts and a recent-activity feed.

    Unauthenticated requests are redirected to the login page.
    """
    # NOTE(review): several string literals below are split across physical
    # lines and the templates carry no markup in this copy of the file; the
    # HTML appears to have been stripped. Confirm against the canonical source.
    admin = await _current_admin(request)
    if not admin:
        return _login_redirect()
    # Get basic counts
    # NOTE(review): tryouts_count and items_count are not referenced by any
    # visible line below; presumably they are used by stripped markup.
    tryouts_count = await db.scalar(select(func.count()).select_from(Tryout)) or 0
    items_count = await db.scalar(select(func.count()).select_from(Item)) or 0
    sessions_count = await db.scalar(select(func.count()).select_from(Session)) or 0
    completed_count = (
        await db.scalar(
            select(func.count())
            .select_from(Session)
            .where(Session.is_completed.is_(True))
        )
        or 0
    )
    # Get websites count
    websites_count = await db.scalar(select(func.count()).select_from(Website)) or 0
    # Calculate completion rate (guarded against division by zero)
    completion_rate = (
        (completed_count / sessions_count * 100) if sessions_count > 0 else 0
    )
    # Get AI stats; any failure falls back to zeros so the page still renders
    try:
        ai_stats = await get_ai_stats(db)
        pending_review = ai_stats.get("pending_review", 0)
        total_generated = ai_stats.get("total_generated", 0)
    except Exception:
        pending_review = 0
        total_generated = 0
    # Get calibration stats; a failed query is treated as "nothing uncalibrated"
    try:
        uncalibrated_result = await db.execute(
            select(func.count().label("count"))
            .select_from(Item)
            .where(Item.calibrated.is_(False))
        )
        uncalibrated_count = uncalibrated_result.scalar() or 0
    except Exception:
        uncalibrated_count = 0
    # Get recent sessions for activity feed (five most recently ended, completed only)
    recent_sessions = await db.execute(
        select(Session)
        .where(Session.is_completed.is_(True))
        .order_by(Session.end_time.desc())
        .limit(5)
    )
    recent_sessions_list = list(recent_sessions.scalars().all())
    # Get recent AI runs (three highest ids)
    recent_runs = await db.execute(
        select(AIGenerationRun).order_by(AIGenerationRun.id.desc()).limit(3)
    )
    recent_runs_list = list(recent_runs.scalars().all())
    # Build activity feed
    activity_items = []
    # Add recent session activity; sessions without an end_time are skipped
    for session in recent_sessions_list:
        if session.end_time:
            time_str = _format_relative_time(session.end_time)
            activity_items.append(
                f"đ¤ {escape(session.wp_user_id)} completed "
                f'{escape(session.tryout_id)} '
                f"({time_str})"
            )
    # Add recent AI activity; runs without a created_at are skipped
    for run in recent_runs_list:
        if run.created_at:
            time_str = _format_relative_time(run.created_at)
            completed = len(run.generated_items) if run.generated_items else 0
            activity_items.append(
                f" đ¤ AI generated {completed}/{run.requested_count} "
                f'view results '
                f"({time_str})"
            )
    activity_html = ""
    if activity_items:
        # Sessions were appended first, so they win the five visible slots.
        activity_html = f'{" ".join(activity_items[:5])} '
    else:
        activity_html = 'No recent activity
'
    # Build alerts
    alerts = []
    if uncalibrated_count > 0:
        alerts.append(
            f''
            f"â ī¸ {uncalibrated_count} questions need calibration "
            f"(need more student answers to calculate difficulty)"
            f"
"
        )
    if pending_review > 0:
        alerts.append(
            f''
            f"đ
{pending_review} AI-generated questions pending your review "
            f'
Review now '
            f"
"
        )
    # This tip is also shown when get_ai_stats() failed above, because the
    # fallback sets total_generated to 0.
    if total_generated == 0:
        alerts.append(
            ''
            "đĄ Tip: Start by importing questions or creating question templates "
            "to enable AI generation"
            "
"
        )
    alerts_html = "".join(alerts) if alerts else ""
    # Build greeting based on time of day (naive datetime.now(), i.e. server-local clock)
    current_hour = datetime.now().hour
    if current_hour < 12:
        greeting = "Good Morning"
    elif current_hour < 17:
        greeting = "Good Afternoon"
    else:
        greeting = "Good Evening"
    body = f"""
{greeting}, {escape(admin.username)}! đ
Here's what's happening with your exam system today.
{alerts_html}
đ System Overview
đĨ
{completed_count}
Completed Tests
{completion_rate:.0f}% completion rate
đ
{websites_count}
Websites
đ Quick Actions
đ Recent Activity
{activity_html}
"""
    return _render_admin_page(request, "IRT Bank Soal Admin", "Dashboard", body)
def _format_relative_time(dt: datetime) -> str:
"""Format datetime as relative time string."""
if dt is None:
return "Unknown"
now = datetime.now(timezone.utc)
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
diff = now - dt
seconds = diff.total_seconds()
if seconds < 60:
return "just now"
elif seconds < 3600:
minutes = int(seconds / 60)
return f"{minutes} minute{'s' if minutes > 1 else ''} ago"
elif seconds < 86400:
hours = int(seconds / 3600)
return f"{hours} hour{'s' if hours > 1 else ''} ago"
else:
days = int(seconds / 86400)
return f"{days} day{'s' if days > 1 else ''} ago"
# ============================================================
# NEW HUMAN-FRIENDLY ROUTES
# ============================================================
@router.get("/questions", include_in_schema=False)
async def questions_view(
request: Request,
db: AsyncSession = Depends(get_db),
q: str = "",
difficulty: str = "",
status: str = "",
website_id: int | None = None,
tryout_id: str = "",
page: int = 1,
):
"""Questions bank - list all questions with working filters and pagination."""
admin = await _current_admin(request)
if not admin:
return _login_redirect()
# Build query with filters
query = select(Item)
count_query = select(func.count()).select_from(Item)
# Search filter (search in stem)
if q:
search_filter = or_(
Item.stem.ilike(f"%{q}%"),
Item.tryout_id.ilike(f"%{q}%"),
)
query = query.where(search_filter)
count_query = count_query.where(search_filter)
# Website filter
if website_id:
query = query.where(Item.website_id == website_id)
count_query = count_query.where(Item.website_id == website_id)
# Tryout filter
if tryout_id:
query = query.where(Item.tryout_id == tryout_id)
count_query = count_query.where(Item.tryout_id == tryout_id)
# Get total count before pagination
total_result = await db.execute(count_query)
total_items = total_result.scalar() or 0
# Calculate pagination
per_page = 25
total_pages = max(1, (total_items + per_page - 1) // per_page)
page = max(1, min(page, total_pages))
offset = (page - 1) * per_page
# Get paginated items
result = await db.execute(
query.order_by(Item.website_id.asc(), Item.tryout_id.asc(), Item.slot.asc())
.offset(offset)
.limit(per_page)
)
items = list(result.scalars().all())
# Get websites for filter dropdown
websites_result = await db.execute(select(Website).order_by(Website.site_name))
websites = list(websites_result.scalars().all())
# Build question rows
question_rows = []
for item in items:
# Calculate human-readable difficulty
p_value = item.ctt_p
if p_value is None:
difficulty_label = "Unknown"
difficulty_class = "difficulty-unknown"
elif p_value > 0.70:
difficulty_label = "Easy"
difficulty_class = "difficulty-easy"
elif p_value >= 0.30:
difficulty_label = "Medium"
difficulty_class = "difficulty-medium"
else:
difficulty_label = "Hard"
difficulty_class = "difficulty-hard"
# Truncate stem for preview
stem_preview = escape(_truncate(_html_to_text(item.stem or ""), 100))
question_rows.append(f"""
#{item.id}
{stem_preview}
{difficulty_label}
|
Used {item.calibration_sample_size or 0}x
|
Slot {item.slot}
{escape(item.level or "-")}
{"â
Calibrated" if item.calibrated else "âŗ Needs Data"}
View
""")
# Build pagination HTML
pagination_html = ""
if total_pages > 1:
page_links = []
for p in range(max(1, page - 2), min(total_pages + 1, page + 3)):
active_class = "active" if p == page else ""
page_links.append(
f'{p} '
)
pagination_html = f"""
"""
# Filter selects
difficulty_selected = {
"easy": 'value="easy" selected',
"medium": 'value="medium" selected',
"hard": 'value="hard" selected',
}.get(difficulty.lower(), "")
status_selected = {
"calibrated": 'value="calibrated" selected',
"uncalibrated": 'value="uncalibrated" selected',
}.get(status.lower(), "")
# Build website options
website_options = ['All Websites ']
for site in websites:
selected = "selected" if website_id == site.id else ""
website_options.append(
f'{escape(site.site_name)} '
)
table_html = (
'"
)
body = f"""
Manage your question bank. Click any question to see details and options.
All Difficulties
Easy (p > 0.70)
Medium (0.30 - 0.70)
Hard (p < 0.30)
All Status
Calibrated â
Needs Calibration âŗ
{"".join(website_options)}
Filter
Clear
{total_items} questions total
{table_html}
{pagination_html}
"""
return _render_admin_page(request, "Questions", "đ Question Bank", body)
@router.get("/questions/{item_id}", include_in_schema=False)
async def question_detail_view(
item_id: int,
request: Request,
db: AsyncSession = Depends(get_db),
):
"""Question detail view - shows full question with all options and statistics."""
admin = await _current_admin(request)
if not admin:
return _login_redirect()
# Get the item
item = await db.get(Item, item_id)
if not item:
body = """
â ī¸
Question not found
The question you're looking for doesn't exist or has been deleted.
â Back to Questions
"""
return _render_admin_page(
request, "Question Not Found", "Question Not Found", body
)
# Get tryout info
tryout_result = await db.execute(
select(Tryout).where(
Tryout.tryout_id == item.tryout_id,
Tryout.website_id == item.website_id,
)
)
tryout = tryout_result.scalar_one_or_none()
# Get website info
website_result = await db.execute(
select(Website).where(Website.id == item.website_id)
)
website = website_result.scalar_one_or_none()
# Calculate difficulty
p_value = item.ctt_p
if p_value is None:
difficulty_label = "Unknown"
difficulty_class = "difficulty-unknown"
difficulty_explanation = "Not enough data yet to determine difficulty."
elif p_value > 0.70:
difficulty_label = "Easy"
difficulty_class = "difficulty-easy"
difficulty_explanation = (
f"{p_value:.1%} of students answered correctly. This is an easy question."
)
elif p_value >= 0.30:
difficulty_label = "Medium"
difficulty_class = "difficulty-medium"
difficulty_explanation = f"{p_value:.1%} of students answered correctly. This is a medium difficulty question."
else:
difficulty_label = "Hard"
difficulty_class = "difficulty-hard"
difficulty_explanation = f"{p_value:.1%} of students answered correctly. This is a difficult question."
# Parse options from JSON
options = item.options or {}
# Build options HTML
options_html = ""
correct_key = item.correct_answer or ""
for key in sorted(options.keys()):
is_correct = key.upper() == correct_key.upper()
row_class = "correct-option" if is_correct else ""
check_mark = " â
" if is_correct else ""
options_html += f'{key}{check_mark} {str(options[key])} '
# Build stats cards
stats_html = f"""
Difficulty
{difficulty_label}
{p_value if p_value else "N/A"}
Calibration Status
{"â
Calibrated" if item.calibrated else "âŗ Needs Data"}
Sample Size
{item.calibration_sample_size or 0}
responses
IRT Difficulty (b)
{f"{item.irt_b:.2f}" if item.irt_b else "N/A"}
"""
# Context info
context_html = f"""
đ Context
Website
{escape(website.site_name if website else f"ID: {item.website_id}")}
Exam
{escape(tryout.name if tryout else item.tryout_id)}
Slot
{item.slot}
Level
{escape(item.level or "Not specified")}
Item ID
#{item.id}
Created
{escape(str(item.created_at)[:10] if item.created_at else "Unknown")}
"""
# Difficulty explanation
difficulty_info = f"""
đĄ
About Difficulty
{difficulty_explanation}
"""
body = f"""
â Back to Questions
{difficulty_info}
đ Question
{item.stem or "No question text"}
đ Answer Options
Key Answer Text
{options_html if options_html else 'No options available '}
đ Statistics
{stats_html}
{context_html}
âšī¸ What is Calibration?
A question becomes "calibrated" after many students (100+) have answered it. Once calibrated, the system can accurately measure student ability and provide adaptive testing.
The IRT parameters (difficulty, discrimination, guessing) are calculated from student response patterns.
"""
return _render_admin_page(
request, f"Question #{item_id}", "đ Question Details", body
)
@router.get("/question-quality", include_in_schema=False)
async def question_quality_view(request: Request, db: AsyncSession = Depends(get_db)):
    """Question Quality - shows calibration status with human-friendly explanations.

    Aggregates, per exam, the number of items and how many are calibrated,
    then renders overall totals and one row per exam. Unauthenticated
    requests are redirected to the login page.
    """
    # NOTE(review): a status literal below is split across physical lines and
    # the templates carry no markup in this copy of the file; kept verbatim.
    admin = await _current_admin(request)
    if not admin:
        return _login_redirect()
    # Get calibration stats by tryout. The inner join means exams without any
    # items produce no row. Casting the boolean to Integer lets SUM count the
    # calibrated items.
    # NOTE(review): grouping uses tryout_id and name only, not website_id, so
    # same-named exams with the same id on different websites share one row.
    result = await db.execute(
        select(
            Tryout.tryout_id,
            Tryout.name,
            func.count(Item.id).label("total_items"),
            func.sum(func.cast(Item.calibrated, Integer)).label("calibrated_items"),
        )
        .join(
            Item,
            (Tryout.tryout_id == Item.tryout_id)
            & (Tryout.website_id == Item.website_id),
        )
        .group_by(Tryout.tryout_id, Tryout.name)
        .order_by(Tryout.name)
    )
    tryout_stats = list(result.all())
    # Calculate totals (aggregates may be None, hence the "or 0")
    total_items = sum(s.total_items or 0 for s in tryout_stats)
    total_calibrated = sum(s.calibrated_items or 0 for s in tryout_stats)
    overall_percentage = (
        (total_calibrated / total_items * 100) if total_items > 0 else 0
    )
    # Build tryout rows
    tryout_rows = []
    for stat in tryout_stats:
        total = stat.total_items or 0
        calibrated = stat.calibrated_items or 0
        percentage = (calibrated / total * 100) if total > 0 else 0
        # Status thresholds: >= 90% ready, >= 50% partial, otherwise needs data
        if percentage >= 90:
            status = 'â
Ready '
        elif percentage >= 50:
            status = 'â ī¸ Partial '
        else:
            status = 'â Needs Data '
        # Calculate bar width
        # NOTE(review): bar_width and overall_percentage are not referenced by
        # any visible line; presumably they are used by stripped markup.
        bar_width = min(100, percentage)
        tryout_rows.append(f"""
{escape(stat.name or stat.tryout_id)}
{total}
{calibrated}
{status}
""")
    body = f"""
đ What is Question Quality?
Questions become "calibrated" after many students answer them. Well-calibrated questions give accurate student scores.
How it works: When 100+ students answer a question, we can calculate its true difficulty (p-value) and use it for adaptive testing.
{total_calibrated} of {total_items} questions calibrated
đ By Exam
Exam Name
Total Questions
Calibrated
Progress
Status
{"".join(tryout_rows) if tryout_rows else 'No exams with questions yet. '}
"""
    return _render_admin_page(request, "Question Quality", "đ Question Quality", body)
@router.get("/exams", include_in_schema=False)
async def exams_view(request: Request, db: AsyncSession = Depends(get_db)):
"""Exams overview - list all exams with human-friendly display and visual cards."""
admin = await _current_admin(request)
if not admin:
return _login_redirect()
# Get all tryouts with stats
result = await db.execute(
select(Tryout)
.options(selectinload(Tryout.stats))
.order_by(Tryout.created_at.desc())
)
tryouts = list(result.scalars().all())
# Get summary stats
total_tryouts = len(tryouts)
total_participants = sum(
s.stats.participant_count if s.stats else 0 for s in tryouts
)
total_items_result = await db.execute(select(func.count(Item.id)))
total_items = total_items_result.scalar() or 0
# Build exam cards
exam_cards = []
for tryout in tryouts:
stats = tryout.stats
participant_count = stats.participant_count if stats else 0
avg_nm = stats.rataan if stats else None
std_nm = stats.std if stats else None
min_nm = stats.minimum if stats else None
max_nm = stats.maximum if stats else None
# Get item count
items_result = await db.execute(
select(func.count(Item.id)).where(
Item.tryout_id == tryout.tryout_id, Item.website_id == tryout.website_id
)
)
item_count = items_result.scalar() or 0
# Get calibrated items count
calibrated_result = await db.execute(
select(func.count(Item.id)).where(
Item.tryout_id == tryout.tryout_id,
Item.website_id == tryout.website_id,
Item.calibrated == True,
)
)
calibrated_count = calibrated_result.scalar() or 0
# Scoring mode badge with colors
mode_colors = {
"ctt": ("CTT", "background: #dbeafe; color: #1e40af;", "đ"),
"irt": ("IRT", "background: #fce7f3; color: #9d174d;", "đ"),
"hybrid": ("Hybrid", "background: #fef3c7; color: #92400e;", "đ"),
}
mode_info = mode_colors.get(
tryout.scoring_mode, (tryout.scoring_mode.upper(), "", "đ")
)
mode_badge = f'{mode_info[2]} {mode_info[0]} '
# Calibration progress
calibration_pct = (calibrated_count / item_count * 100) if item_count > 0 else 0
calibration_color = (
"#10b981"
if calibration_pct >= 90
else "#f59e0b"
if calibration_pct >= 50
else "#ef4444"
)
exam_cards.append(f"""
ID: {escape(tryout.tryout_id)}
đ
{item_count}
Questions
đĨ
{participant_count}
Students
đ
{"N/A" if avg_nm is None else f"{avg_nm:.0f}"}
Avg Score
Score Range:
{"N/A" if min_nm is None else f"{min_nm:.0f}"} - {"N/A" if max_nm is None else f"{max_nm:.0f}"}
Std Dev:
{"N/A" if std_nm is None else f"{std_nm:.1f}"}
""")
# Summary cards
summary_html = f"""
đ
{total_tryouts}
Total Exams
đĨ
{total_participants}
Total Students
đ
{total_items}
Total Questions
"""
body = f"""
View and manage your exams. Each exam shows student statistics and calibration progress.
{summary_html}
All Exams
{"".join(exam_cards) if exam_cards else '
'}
"""
return _render_admin_page(request, "Exams", "đ Exams", body)
@router.get("/reports", include_in_schema=False)
async def reports_view(request: Request, db: AsyncSession = Depends(get_db)):
    """Reports dashboard - entry page for reports, with quick stats.

    Shows the total item count, the calibrated item count with its
    percentage, and the total session count. Unauthenticated requests are
    redirected to the login page.
    """
    admin = await _current_admin(request)
    if not admin:
        return _login_redirect()
    # Get quick stats for the overview
    items_result = await db.execute(select(func.count(Item.id)))
    total_items = items_result.scalar() or 0
    # .is_(True) instead of "== True": matches the other handlers in this
    # file and avoids the comparison-to-True lint.
    calibrated_result = await db.execute(
        select(func.count(Item.id)).where(Item.calibrated.is_(True))
    )
    calibrated_items = calibrated_result.scalar() or 0
    sessions_result = await db.execute(select(func.count(Session.id)))
    total_sessions = sessions_result.scalar() or 0
    # Guard against division by zero on an empty item bank.
    calibration_pct = (calibrated_items / total_items * 100) if total_items > 0 else 0
    body = f"""
Access detailed analysis reports for your exams, questions, and students.
đ
{total_items}
Total Questions
â
{calibrated_items}
Calibrated ({calibration_pct:.0f}%)
đ
{total_sessions}
Student Sessions
Analysis Reports
Quick Actions
"""
    return _render_admin_page(request, "Reports", "đ Reports", body)
@router.get("/settings", include_in_schema=False)
async def settings_view(request: Request, db: AsyncSession = Depends(get_db)):
    """Settings dashboard - links to configuration pages plus system info."""
    if not await _current_admin(request):
        return _login_redirect()
    session_ttl = settings.ADMIN_SESSION_EXPIRE_SECONDS
    page_body = f"""
Configuration
Account
System Information
đ
Session Timeout
{session_ttl}s
"""
    return _render_admin_page(request, "Settings", "âī¸ Settings", page_body)
# ============================================================
# LEGACY ROUTES (backward compatibility)
# ============================================================
@router.get("/hierarchy", include_in_schema=False)
async def hierarchy_view(request: Request, db: AsyncSession = Depends(get_db)):
    """Render the data hierarchy page for a signed-in admin."""
    if not await _current_admin(request):
        return _login_redirect()
    page_body = _hierarchy_view_body(await _load_hierarchy_context(db))
    return _render_admin_page(request, "Data Hierarchy", "Data Hierarchy", page_body)
@router.get("/websites", include_in_schema=False)
async def websites_view(request: Request, db: AsyncSession = Depends(get_db)):
    """List all websites (ordered by id) together with the add-website form."""
    if not await _current_admin(request):
        return _login_redirect()
    listing = await db.execute(select(Website).order_by(Website.id.asc()))
    page_body = _websites_form_body(list(listing.scalars().all()))
    return _render_admin_page(request, "Websites", "Websites", page_body)
@router.post("/websites", include_in_schema=False)
async def websites_submit(
    request: Request,
    db: AsyncSession = Depends(get_db),
    site_name: str = Form(...),
    site_url: str = Form(...),
):
    """Create a website from the submitted form and re-render the list page.

    The name is stripped and must be non-empty; the URL is stripped, loses
    any trailing slashes and must start with http:// or https://. A failed
    commit due to an integrity violation is reported as a duplicate URL.
    Error pages echo the raw submitted values back into the form.
    """
    if not await _current_admin(request):
        return _login_redirect()

    async def _all_websites():
        # Fresh listing for whichever page ends up being rendered.
        rows = await db.execute(select(Website).order_by(Website.id.asc()))
        return list(rows.scalars().all())

    def _page(page_body):
        return _render_admin_page(request, "Websites", "Websites", page_body)

    async def _error_page(message):
        return _page(
            _websites_form_body(
                await _all_websites(),
                error=message,
                site_name=site_name,
                site_url=site_url,
            )
        )

    clean_name = site_name.strip()
    clean_url = site_url.strip().rstrip("/")
    if not clean_name:
        return await _error_page("Website name is required.")
    if not clean_url.startswith(("http://", "https://")):
        return await _error_page("Website URL must start with http:// or https://.")
    website = Website(site_name=clean_name, site_url=clean_url)
    db.add(website)
    try:
        await db.commit()
    except IntegrityError:
        await db.rollback()
        return await _error_page("Website URL already exists.")
    # The listing is loaded before website.id is read for the message.
    current = await _all_websites()
    return _page(
        _websites_form_body(
            current,
            success=f"Website added successfully with ID {website.id}.",
        )
    )
@router.get("/websites/{website_id}/edit", include_in_schema=False)
async def website_edit_view(
    website_id: int,
    request: Request,
    db: AsyncSession = Depends(get_db),
):
    """Show the edit form for one website, or the list page if it is missing."""
    if not await _current_admin(request):
        return _login_redirect()
    website = await db.get(Website, website_id)
    if website is not None:
        return _render_admin_page(
            request, "Edit Website", "Edit Website", _website_edit_form_body(website)
        )
    listing = await db.execute(select(Website).order_by(Website.id.asc()))
    page_body = _websites_form_body(
        list(listing.scalars().all()), error=f"Website not found: {website_id}"
    )
    return _render_admin_page(request, "Websites", "Websites", page_body)
@router.post("/websites/{website_id}/edit", include_in_schema=False)
async def website_edit_submit(
    website_id: int,
    request: Request,
    db: AsyncSession = Depends(get_db),
    site_name: str = Form(...),
    site_url: str = Form(...),
):
    """Apply an edit to an existing website and re-render the edit form.

    Validation matches website creation: a non-empty stripped name and a URL
    starting with http:// or https:// (trailing slashes removed). An unknown
    ``website_id`` renders the list page with an error. A failed commit due
    to an integrity violation is reported as a duplicate URL.
    """
    admin = await _current_admin(request)
    if not admin:
        return _login_redirect()
    website = await db.get(Website, website_id)
    if website is None:
        result = await db.execute(select(Website).order_by(Website.id.asc()))
        websites = list(result.scalars().all())
        body = _websites_form_body(websites, error=f"Website not found: {website_id}")
        return _render_admin_page(request, "Websites", "Websites", body)
    normalized_name = site_name.strip()
    normalized_url = site_url.strip().rstrip("/")
    if not normalized_name:
        body = _website_edit_form_body(
            website,
            error="Website name is required.",
            site_name=site_name,
            site_url=site_url,
        )
        return _render_admin_page(request, "Edit Website", "Edit Website", body)
    if not normalized_url.startswith(("http://", "https://")):
        body = _website_edit_form_body(
            website,
            error="Website URL must start with http:// or https://.",
            site_name=site_name,
            site_url=site_url,
        )
        return _render_admin_page(request, "Edit Website", "Edit Website", body)
    website.site_name = normalized_name
    website.site_url = normalized_url
    try:
        await db.commit()
    except IntegrityError:
        await db.rollback()
        # rollback() expires the instance; reload it explicitly so the
        # synchronous form renderer does not trigger implicit IO on the
        # AsyncSession when it reads the website's attributes.
        await db.refresh(website)
        body = _website_edit_form_body(
            website,
            error="Website URL already exists.",
            site_name=site_name,
            site_url=site_url,
        )
        return _render_admin_page(request, "Edit Website", "Edit Website", body)
    await db.refresh(website)
    body = _website_edit_form_body(
        website,
        success=f"Website #{website.id} updated successfully.",
    )
    return _render_admin_page(request, "Edit Website", "Edit Website", body)
@router.post("/websites/{website_id}/delete", include_in_schema=False)
async def website_delete_submit(
    website_id: int,
    request: Request,
    db: AsyncSession = Depends(get_db),
):
    """Delete a website and re-render the list page with the outcome.

    An unknown ``website_id`` renders the list with an error. If the commit
    fails with an integrity violation, the transaction is rolled back and
    the list is rendered with an error instead of raising.
    """
    admin = await _current_admin(request)
    if not admin:
        return _login_redirect()
    website = await db.get(Website, website_id)
    if website is None:
        result = await db.execute(select(Website).order_by(Website.id.asc()))
        websites = list(result.scalars().all())
        body = _websites_form_body(websites, error=f"Website not found: {website_id}")
        return _render_admin_page(request, "Websites", "Websites", body)
    # Capture the label up front; the instance is unusable after the delete.
    deleted_label = f"{website.site_name} ({website.site_url})"
    await db.delete(website)
    try:
        await db.commit()
    except IntegrityError:
        # Mirrors the create/edit handlers, which also report integrity
        # failures on the page rather than letting them propagate.
        await db.rollback()
        result = await db.execute(select(Website).order_by(Website.id.asc()))
        websites = list(result.scalars().all())
        body = _websites_form_body(
            websites,
            error=(
                f"Website could not be deleted because other records still "
                f"reference it: {deleted_label}"
            ),
        )
        return _render_admin_page(request, "Websites", "Websites", body)
    result = await db.execute(select(Website).order_by(Website.id.asc()))
    websites = list(result.scalars().all())
    body = _websites_form_body(
        websites,
        success=f"Website deleted successfully: {deleted_label}",
    )
    return _render_admin_page(request, "Websites", "Websites", body)
@router.get("/tryout-import", include_in_schema=False)
async def tryout_import_view(request: Request, db: AsyncSession = Depends(get_db)):
    """Render the tryout import form with websites and recent snapshots."""
    if not await _current_admin(request):
        return _login_redirect()
    site_rows = await _load_websites(db)
    snapshot_rows = await _recent_snapshots(db)
    page_body = _tryout_import_form_body(site_rows, snapshot_rows)
    return _render_admin_page(request, "Tryout Import", "Tryout Import", page_body)
@router.post("/tryout-import/preview", include_in_schema=False)
async def tryout_import_preview(
    request: Request,
    db: AsyncSession = Depends(get_db),
    website_id: int = Form(...),
    file: UploadFile = File(...),
):
    """Validate an uploaded tryout JSON file and render an import preview.

    The upload must have a ``.json`` filename, be UTF-8 encoded and parse as
    JSON. On success the raw text is cached in Redis under a fresh preview
    token for ``IMPORT_PREVIEW_TTL_SECONDS`` so the import step can reuse it.
    Every failure re-renders the form with an error message.
    """
    if not await _current_admin(request):
        return _login_redirect()
    site_rows = await _load_websites(db)
    snapshot_rows = await _recent_snapshots(db)

    def _page(**form_state):
        # Every outcome renders the same form, differing only in state.
        page_body = _tryout_import_form_body(
            site_rows,
            snapshot_rows,
            selected_website_id=website_id,
            **form_state,
        )
        return _render_admin_page(request, "Tryout Import", "Tryout Import", page_body)

    if not (file.filename and file.filename.lower().endswith(".json")):
        return _page(error="File must be .json format.")
    try:
        raw_bytes = await file.read()
        payload_text = raw_bytes.decode("utf-8")
        payload = json.loads(payload_text)
    except UnicodeDecodeError:
        return _page(error="File must be UTF-8 encoded JSON.")
    except json.JSONDecodeError as exc:
        return _page(error=f"Invalid JSON file: {exc}")
    try:
        preview = await preview_tryout_json_import(payload, website_id, db)
    except TryoutImportError as exc:
        return _page(error=str(exc))
    preview_token = uuid.uuid4().hex
    await _admin_redis.set(
        f"{IMPORT_PREVIEW_PREFIX}{preview_token}",
        payload_text,
        ex=IMPORT_PREVIEW_TTL_SECONDS,
    )
    return _page(
        preview=preview,
        preview_token=preview_token,
        upload_filename=file.filename or "",
    )
@router.post("/tryout-import", include_in_schema=False)
async def tryout_import_submit(
    request: Request,
    db: AsyncSession = Depends(get_db),
    website_id: int = Form(...),
    preview_token: str = Form(...),
):
    """Import the previously previewed tryout JSON identified by the token.

    The cached payload is read from Redis, imported and committed. The
    cached entry is deleted whether the import succeeds or fails. A
    ``TryoutImportError`` is rolled back and shown on the form; any other
    exception is rolled back and re-raised.
    """
    if not await _current_admin(request):
        return _login_redirect()
    site_rows = await _load_websites(db)
    snapshot_rows = await _recent_snapshots(db)
    cache_key = f"{IMPORT_PREVIEW_PREFIX}{preview_token}"

    def _page(snapshot_list, **form_state):
        page_body = _tryout_import_form_body(
            site_rows,
            snapshot_list,
            selected_website_id=website_id,
            **form_state,
        )
        return _render_admin_page(request, "Tryout Import", "Tryout Import", page_body)

    payload_text = await _admin_redis.get(cache_key)
    if not payload_text:
        return _page(
            snapshot_rows,
            error="Preview token expired. Upload the JSON again and preview before importing.",
        )
    try:
        payload = json.loads(payload_text)
        result = await import_tryout_json_snapshot(payload, website_id, db)
        await db.commit()
    except TryoutImportError as exc:
        await db.rollback()
        return _page(snapshot_rows, error=str(exc))
    except Exception:
        await db.rollback()
        raise
    finally:
        # The preview token is single-use regardless of the outcome.
        await _admin_redis.delete(cache_key)
    refreshed_snapshots = await _recent_snapshots(db)
    imported_tryouts = result.get("imported_tryouts") or []
    question_total = 0
    for row in imported_tryouts:
        question_total += row.get("question_count") or 0
    return _page(
        refreshed_snapshots,
        success=(
            f"Imported {len(imported_tryouts)} tryout snapshot(s) and archived {question_total} source question reference row(s)."
        ),
    )
@router.get("/snapshot-questions", include_in_schema=False)
async def snapshot_questions_view(
    request: Request,
    snapshot_id: int,
    db: AsyncSession = Depends(get_db),
):
    """List the questions of one import snapshot.

    An unknown ``snapshot_id`` falls back to the tryout import page with an
    error message.
    """
    if not await _current_admin(request):
        return _login_redirect()
    snapshot = await db.get(TryoutImportSnapshot, snapshot_id)
    if snapshot is not None:
        context = await _load_snapshot_question_context(snapshot, db)
        questions, promoted_items_by_slot, _ = context
        page_body = _snapshot_questions_body(
            snapshot, questions, promoted_items_by_slot
        )
        return _render_admin_page(
            request, "Snapshot Questions", "Snapshot Questions", page_body
        )
    site_rows = await _load_websites(db)
    snapshot_rows = await _recent_snapshots(db)
    page_body = _tryout_import_form_body(
        site_rows,
        snapshot_rows,
        error=f"Snapshot not found: {snapshot_id}",
    )
    return _render_admin_page(request, "Tryout Import", "Tryout Import", page_body)
@router.post("/snapshot-questions/promote-bulk", include_in_schema=False)
async def snapshot_question_promote_bulk(
    request: Request,
    snapshot_id: int = Form(...),
    snapshot_question_ids: list[int] | None = Form(None),
    db: AsyncSession = Depends(get_db),
):
    """Promote the selected snapshot questions to items in one request.

    Each selected question is passed to ``_promote_snapshot_question_to_item``
    and the returned status is tallied (created / existing / missing_options /
    mismatch). The snapshot question page is re-rendered with a summary.

    Error cases rendered as pages (no exception is raised):
    - unknown snapshot id -> Tryout Import page with an error;
    - nothing selected, or none of the selected ids exist -> snapshot
      question page with an error.
    """
    admin = await _current_admin(request)
    if not admin:
        return _login_redirect()

    snapshot = await db.get(TryoutImportSnapshot, snapshot_id)
    if snapshot is None:
        websites = await _load_websites(db)
        snapshots = await _recent_snapshots(db)
        body = _tryout_import_form_body(
            websites,
            snapshots,
            error=f"Snapshot not found: {snapshot_id}",
        )
        return _render_admin_page(request, "Tryout Import", "Tryout Import", body)

    selected_questions: list[TryoutSnapshotQuestion] = []
    if snapshot_question_ids:
        question_result = await db.execute(
            select(TryoutSnapshotQuestion).where(
                TryoutSnapshotQuestion.id.in_(snapshot_question_ids)
            )
        )
        selected_questions = list(question_result.scalars().all())

    if not selected_questions:
        # Either nothing was ticked or the submitted ids match no rows; in both
        # cases there is nothing to promote and nothing to commit.
        questions, promoted_items_by_slot, _ = await _load_snapshot_question_context(
            snapshot, db
        )
        body = _snapshot_questions_body(
            snapshot,
            questions,
            promoted_items_by_slot,
            error=(
                "Select at least one snapshot question to promote."
                if not snapshot_question_ids
                else "None of the selected snapshot questions could be found."
            ),
        )
        return _render_admin_page(
            request, "Snapshot Questions", "Snapshot Questions", body
        )

    created_items: list[Item] = []
    existing_items: list[Item] = []
    missing_option_count = 0
    mismatch_count = 0
    for question in selected_questions:
        item, status = await _promote_snapshot_question_to_item(snapshot, question, db)
        if status == "created" and item is not None:
            created_items.append(item)
        elif status == "existing" and item is not None:
            existing_items.append(item)
        elif status == "missing_options":
            missing_option_count += 1
        elif status == "mismatch":
            mismatch_count += 1
    await db.commit()

    questions, promoted_items_by_slot, _ = await _load_snapshot_question_context(
        snapshot, db
    )

    success_parts = []
    if created_items:
        success_parts.append(f"created {len(created_items)} item(s)")
    if existing_items:
        success_parts.append(f"reused {len(existing_items)} existing item(s)")
    if missing_option_count:
        success_parts.append(
            f"skipped {missing_option_count} question(s) with missing option text"
        )
    if mismatch_count:
        success_parts.append(f"skipped {mismatch_count} mismatched question(s)")

    # Avoid the dangling "Bulk promote finished: ." when no status was tallied.
    summary = ", ".join(success_parts) if success_parts else "no questions were promoted"
    success_message = f"Bulk promote finished: {summary}."
    if created_items:
        success_message += f" Latest basis item ID: {created_items[-1].id}."

    body = _snapshot_questions_body(
        snapshot, questions, promoted_items_by_slot, success=success_message
    )
    return _render_admin_page(request, "Snapshot Questions", "Snapshot Questions", body)
@router.get("/calibration-status", include_in_schema=False)
async def calibration_status_view(request: Request, db: AsyncSession = Depends(get_db)):
    """Render a table with the calibration status of every tryout.

    One ``get_calibration_status`` call is issued per tryout, in ``Tryout.id``
    order. Requests without an admin session are redirected to the login page.
    """
    if not await _current_admin(request):
        return _login_redirect()

    tryout_query = select(Tryout.tryout_id, Tryout.name, Tryout.website_id).order_by(
        Tryout.id
    )
    tryout_rows = (await db.execute(tryout_query)).all()

    headers = [
        "Tryout ID",
        "Name",
        "Total Items",
        "Calibrated",
        "Calibration %",
        "Ready for IRT",
    ]
    table_rows = []
    for tryout_id, name, website_id in tryout_rows:
        calibration = await get_calibration_status(tryout_id, website_id, db)
        table_rows.append(
            [
                tryout_id,
                name,
                calibration["total_items"],
                calibration["calibrated_items"],
                f"{calibration['calibration_percentage']:.2f}%",
                "Yes" if calibration["ready_for_irt"] else "No",
            ]
        )

    return _render_admin_page(
        request,
        "Calibration Status",
        "Calibration Status",
        _table(headers, table_rows),
    )
@router.get("/item-statistics", include_in_schema=False)
async def item_statistics_view(request: Request, db: AsyncSession = Depends(get_db)):
    """Render per-level item statistics.

    For every distinct ``Item.level`` the first 10 items (ordered by slot) are
    loaded, and the counts, calibration percentage, response total and mean
    ``ctt_p`` are computed over that sample.
    """
    if not await _current_admin(request):
        return _login_redirect()

    levels = (await db.execute(select(Item.level).distinct())).scalars().all()

    table_rows = []
    for level in levels:
        sample_query = (
            select(Item).where(Item.level == level).order_by(Item.slot).limit(10)
        )
        sample = (await db.execute(sample_query)).scalars().all()
        sample_size = len(sample)

        total_responses = sum(entry.calibration_sample_size or 0 for entry in sample)
        calibrated_count = sum(1 for entry in sample if entry.calibrated)
        if sample:
            calibration_percentage = calibrated_count / sample_size * 100
            avg_correctness = sum(entry.ctt_p or 0 for entry in sample) / sample_size
        else:
            calibration_percentage = 0
            avg_correctness = 0

        table_rows.append(
            [
                level,
                sample_size,
                calibrated_count,
                f"{calibration_percentage:.2f}%",
                total_responses,
                f"{avg_correctness:.4f}",
            ]
        )

    headers = [
        "Level",
        "Total Items",
        "Calibrated",
        "Calibration %",
        "Responses",
        "Avg Correctness",
    ]
    return _render_admin_page(
        request, "Item Statistics", "Item Statistics", _table(headers, table_rows)
    )
@router.get("/session-overview", include_in_schema=False)
async def session_overview_view(request: Request, db: AsyncSession = Depends(get_db)):
    """Render a table of the 50 most recently created sessions."""
    if not await _current_admin(request):
        return _login_redirect()

    latest_query = select(Session).order_by(Session.created_at.desc()).limit(50)
    latest_sessions = (await db.execute(latest_query)).scalars().all()

    table_rows = []
    for record in latest_sessions:
        table_rows.append(
            [
                record.session_id,
                record.wp_user_id,
                record.tryout_id,
                "Yes" if record.is_completed else "No",
                record.scoring_mode_used,
                record.total_benar,
                record.NM,
                record.NN,
                record.theta,
            ]
        )

    headers = [
        "Session ID",
        "WP User",
        "Tryout",
        "Completed",
        "Mode",
        "Benar",
        "NM",
        "NN",
        "Theta",
    ]
    return _render_admin_page(
        request, "Session Overview", "Session Overview", _table(headers, table_rows)
    )
@router.get("/basis-items", include_in_schema=False)
async def basis_items_view(request: Request, db: AsyncSession = Depends(get_db)):
    """List up to 200 basis items: level "sedang" and not AI-generated.

    Items are ordered by most recent update, then by descending id.
    """
    if not await _current_admin(request):
        return _login_redirect()

    basis_query = (
        select(Item)
        .where(Item.level == "sedang", Item.generated_by != "ai")
        .order_by(Item.updated_at.desc(), Item.id.desc())
        .limit(200)
    )
    rows = await db.execute(basis_query)
    page_body = _basis_items_list_body(list(rows.scalars().all()))
    return _render_admin_page(request, "Basis Items", "Basis Items", page_body)
@router.get("/basis-items/{basis_item_id}", include_in_schema=False)
async def basis_item_workspace_view(
    basis_item_id: int,
    request: Request,
    db: AsyncSession = Depends(get_db),
):
    """Render the workspace of one basis item with its runs and AI variants.

    Query parameters ``status``, ``level``, ``run_id`` and ``min_frequency``
    narrow the variant list. If the id does not refer to a basis item
    (missing, AI-generated, or level other than "sedang"), the basis item
    list is rendered instead.
    """
    if not await _current_admin(request):
        return _login_redirect()

    query = request.query_params
    filters = {
        name: (query.get(name) or "").strip()
        for name in ("status", "level", "run_id", "min_frequency")
    }

    basis_item = await db.get(Item, basis_item_id)
    is_basis_item = (
        basis_item is not None
        and basis_item.generated_by != "ai"
        and basis_item.level == "sedang"
    )
    if not is_basis_item:
        listing = await db.execute(
            select(Item)
            .where(Item.level == "sedang", Item.generated_by != "ai")
            .order_by(Item.updated_at.desc(), Item.id.desc())
            .limit(200)
        )
        list_body = _basis_items_list_body(list(listing.scalars().all()))
        return _render_admin_page(request, "Basis Items", "Basis Items", list_body)

    run_rows = await db.execute(
        select(AIGenerationRun)
        .where(AIGenerationRun.basis_item_id == basis_item.id)
        .order_by(AIGenerationRun.id.desc())
        .limit(50)
    )
    runs = list(run_rows.scalars().all())

    variant_rows = await db.execute(
        select(Item)
        .where(Item.generated_by == "ai", Item.basis_item_id == basis_item.id)
        .order_by(Item.created_at.desc(), Item.id.desc())
        .limit(300)
    )
    variants = list(variant_rows.scalars().all())

    wanted_status = filters["status"]
    if wanted_status:
        variants = [v for v in variants if v.variant_status == wanted_status]
    wanted_level = filters["level"]
    if wanted_level in {"mudah", "sulit"}:
        variants = [v for v in variants if v.level == wanted_level]
    if filters["run_id"].isdigit():
        wanted_run = int(filters["run_id"])
        variants = [v for v in variants if v.generation_run_id == wanted_run]

    # Usage stats are computed on the list as filtered so far; the frequency
    # filter below is applied afterwards because it needs those metrics.
    usage_metrics, family_stats = await _family_usage_stats(db, basis_item, variants)

    raw_min_frequency = filters["min_frequency"]
    if raw_min_frequency:
        try:
            threshold = float(raw_min_frequency)
            variants = [
                v
                for v in variants
                if usage_metrics.get(v.id, {}).get("frequency", 0.0) >= threshold
            ]
        except ValueError:
            # A non-numeric threshold is ignored rather than reported.
            pass

    workspace_body = _basis_item_workspace_body(
        basis_item,
        runs,
        variants,
        usage_metrics,
        family_stats,
        filters,
    )
    return _render_admin_page(
        request,
        f"Basis Item #{basis_item.id}",
        f"Basis Item Workspace #{basis_item.id}",
        workspace_body,
    )
# Handle the "generate variants" form of the basis item workspace.
#
# Flow: authenticate -> validate the basis item -> create a generation run ->
# ask the AI service for a batch of questions -> save each as a "draft"
# variant -> commit -> re-render the workspace with a status message.
# Invalid basis items, target levels or counts result in a redirect rather
# than an error page.
@router.post("/basis-items/{basis_item_id}/generate", include_in_schema=False)
async def basis_item_generate_submit(
basis_item_id: int,
request: Request,
db: AsyncSession = Depends(get_db),
target_level: str = Form(...),
ai_model: str = Form(""),
generation_count: int = Form(1),
operator_notes: str = Form(""),
include_note_for_admin: str | None = Form(None),
include_note_in_prompt: str | None = Form(None),
):
admin = await _current_admin(request)
if not admin:
return _login_redirect()
# The re-rendered workspace always starts with empty variant filters.
filters = {"status": "", "level": "", "run_id": "", "min_frequency": ""}
basis_item = await db.get(Item, basis_item_id)
# Only non-AI items of level "sedang" are accepted as a basis.
if (
basis_item is None
or basis_item.generated_by == "ai"
or basis_item.level != "sedang"
):
return RedirectResponse(
url="/admin/basis-items", status_code=HTTP_303_SEE_OTHER
)
# Llama-only policy for production quality consistency.
# The submitted ai_model form value is therefore discarded here.
ai_model = settings.OPENROUTER_MODEL_LLAMA
# The two flags are compared against the literal "on"; any other value is False.
note_for_admin = include_note_for_admin == "on"
note_in_prompt = include_note_in_prompt == "on"
# Without an API key nothing is generated: re-render the workspace with an
# error and echo the submitted form values back.
if not settings.OPENROUTER_API_KEY:
run_result = await db.execute(
select(AIGenerationRun)
.where(AIGenerationRun.basis_item_id == basis_item.id)
.order_by(AIGenerationRun.id.desc())
.limit(50)
)
variant_result = await db.execute(
select(Item)
.where(Item.generated_by == "ai", Item.basis_item_id == basis_item.id)
.order_by(Item.created_at.desc(), Item.id.desc())
.limit(300)
)
runs = list(run_result.scalars().all())
variants = list(variant_result.scalars().all())
usage_metrics, family_stats = await _family_usage_stats(
db, basis_item, variants
)
body = _basis_item_workspace_body(
basis_item,
runs,
variants,
usage_metrics,
family_stats,
filters,
error="OPENROUTER_API_KEY is not configured.",
target_level=target_level,
ai_model=ai_model,
generation_count=str(generation_count),
operator_notes=operator_notes,
include_note_for_admin=note_for_admin,
include_note_in_prompt=note_in_prompt,
)
return _render_admin_page(
request,
f"Basis Item #{basis_item.id}",
f"Basis Item Workspace #{basis_item.id}",
body,
)
# NOTE(review): invalid target levels and out-of-range counts redirect back
# to the workspace without any message — confirm the silent redirect is intended.
if target_level not in {"mudah", "sulit"}:
return RedirectResponse(
url=f"/admin/basis-items/{basis_item.id}", status_code=HTTP_303_SEE_OTHER
)
if generation_count < 1 or generation_count > 50:
return RedirectResponse(
url=f"/admin/basis-items/{basis_item.id}", status_code=HTTP_303_SEE_OTHER
)
# The run stores the operator notes only when "note for admin" is ticked;
# blank notes are stored as None.
run_id = await create_generation_run(
basis_item_id=basis_item.id,
source_snapshot_question_id=basis_item.source_snapshot_question_id,
target_level=target_level,
requested_count=generation_count,
model=ai_model,
created_by=admin.username,
operator_notes=(operator_notes.strip() or None) if note_for_admin else None,
db=db,
)
# The notes reach the generator only when "note in prompt" is ticked.
generated = await generate_questions_batch(
basis_item=basis_item,
target_level=target_level,
ai_model=ai_model,
count=generation_count,
operator_notes=operator_notes if note_in_prompt else None,
)
from app.schemas.ai import GeneratedQuestion
# Count how many generated questions were stored; save_ai_question
# returning a falsy id is treated as "not saved".
saved = 0
for generated_question in generated:
item_id = await save_ai_question(
generated_data=GeneratedQuestion(
stem=generated_question.stem,
options=generated_question.options,
correct=generated_question.correct,
explanation=generated_question.explanation or None,
),
tryout_id=basis_item.tryout_id,
website_id=basis_item.website_id,
basis_item_id=basis_item.id,
slot=basis_item.slot,
level=target_level,
ai_model=ai_model,
generation_run_id=run_id,
source_snapshot_question_id=basis_item.source_snapshot_question_id,
variant_status="draft",
db=db,
)
if item_id:
saved += 1
await db.commit()
# Reload runs and variants so the page reflects the rows just committed.
run_result = await db.execute(
select(AIGenerationRun)
.where(AIGenerationRun.basis_item_id == basis_item.id)
.order_by(AIGenerationRun.id.desc())
.limit(50)
)
variant_result = await db.execute(
select(Item)
.where(Item.generated_by == "ai", Item.basis_item_id == basis_item.id)
.order_by(Item.created_at.desc(), Item.id.desc())
.limit(300)
)
runs = list(run_result.scalars().all())
variants = list(variant_result.scalars().all())
usage_metrics, family_stats = await _family_usage_stats(db, basis_item, variants)
# A run that saved nothing is reported as an error, anything else as success.
status_message = (
f"Run #{run_id} failed to produce savable variants. "
f"Requested={generation_count}, Generated={len(generated)}, Saved={saved}. "
"Check model output/credentials and server logs."
if saved == 0
else f"Run #{run_id} finished. Requested={generation_count}, Generated={len(generated)}, Saved={saved}."
)
# NOTE(review): unlike the missing-API-key branch, operator_notes is not
# passed back to the form here — confirm that clearing the notes is intended.
body = _basis_item_workspace_body(
basis_item,
runs,
variants,
usage_metrics,
family_stats,
filters,
error=status_message if saved == 0 else None,
success=status_message if saved > 0 else None,
target_level=target_level,
ai_model=ai_model,
generation_count=str(generation_count),
include_note_for_admin=note_for_admin,
include_note_in_prompt=note_in_prompt,
)
return _render_admin_page(
request,
f"Basis Item #{basis_item.id}",
f"Basis Item Workspace #{basis_item.id}",
body,
)
@router.post("/basis-items/{basis_item_id}/review-bulk", include_in_schema=False)
async def basis_item_review_bulk(
    basis_item_id: int,
    request: Request,
    db: AsyncSession = Depends(get_db),
    item_ids: list[int] = Form([]),
    action: str = Form(...),
):
    """Apply one review status to several AI variants of a basis item.

    Only items that are AI-generated and belong to the basis item in the URL
    are updated; each gets the new ``variant_status`` plus reviewer name and
    timestamp. The workspace is re-rendered afterwards with a success message
    when something was updated, or with an error message when the action is
    not recognised, nothing was selected, or no selected id matched.
    """
    filters = {"status": "", "level": "", "run_id": "", "min_frequency": ""}
    admin = await _current_admin(request)
    if not admin:
        return _login_redirect()

    basis_item = await db.get(Item, basis_item_id)
    if basis_item is None:
        return RedirectResponse(
            url="/admin/basis-items", status_code=HTTP_303_SEE_OTHER
        )

    valid_actions = {"approved", "rejected", "archived", "stale", "active"}
    error_message: str | None = None
    success_message: str | None = None

    if action not in valid_actions:
        error_message = f"Unknown review action: {action}"
    elif not item_ids:
        error_message = "Select at least one variant to review."
    else:
        result = await db.execute(
            select(Item).where(
                Item.id.in_(item_ids),
                Item.generated_by == "ai",
                Item.basis_item_id == basis_item.id,
            )
        )
        items = list(result.scalars().all())
        if items:
            reviewed_at = datetime.now(timezone.utc)
            for item in items:
                item.variant_status = action
                item.reviewed_by = admin.username
                item.reviewed_at = reviewed_at
            await db.commit()
            success_message = f"Applied status '{action}' to selected variants."
        else:
            # Ids were submitted but none belongs to this basis item.
            error_message = "None of the selected variants belong to this basis item."

    run_result = await db.execute(
        select(AIGenerationRun)
        .where(AIGenerationRun.basis_item_id == basis_item.id)
        .order_by(AIGenerationRun.id.desc())
        .limit(50)
    )
    variant_result = await db.execute(
        select(Item)
        .where(Item.generated_by == "ai", Item.basis_item_id == basis_item.id)
        .order_by(Item.created_at.desc(), Item.id.desc())
        .limit(300)
    )
    runs = list(run_result.scalars().all())
    variants = list(variant_result.scalars().all())
    usage_metrics, family_stats = await _family_usage_stats(db, basis_item, variants)

    body = _basis_item_workspace_body(
        basis_item,
        runs,
        variants,
        usage_metrics,
        family_stats,
        filters,
        error=error_message,
        success=success_message,
    )
    return _render_admin_page(
        request,
        f"Basis Item #{basis_item.id}",
        f"Basis Item Workspace #{basis_item.id}",
        body,
    )
# Tabs of the per-question AI workflow page, as (tab key, visible label) pairs.
# The tab key is what _ai_form_body validates `active_tab` against.
AI_PLAYGROUND_TABS = (
("generate", "Generate"),
("review", "Review Queue"),
("runs", "Batches"),
)
# Variant statuses offered by the review-queue status filter.
AI_VARIANT_STATUSES = ("draft", "approved", "active", "rejected", "archived", "stale")
# Difficulty levels offered by the review-queue level filter.
AI_VARIANT_LEVELS = ("mudah", "sulit")
def _selected_option(value: str, selected_value: str) -> str:
return "selected" if value == selected_value else ""
# Build the tab navigation for the AI workflow page: one escaped label per
# AI_PLAYGROUND_TABS entry, concatenated into a single string.
# NOTE(review): item_id, active_class and aria are computed but not
# interpolated in the string literals visible here; the link markup that
# presumably used them is not present — confirm against the template source.
def _ai_tab_nav(item_id: int, active_tab: str) -> str:
links = []
for tab, label in AI_PLAYGROUND_TABS:
active_class = "active" if tab == active_tab else ""
aria = ' aria-current="page"' if tab == active_tab else ""
links.append(
f'{escape(label)} '
)
return f'{"".join(links)} '
# Render a variant status as escaped text; a missing status is shown as "unknown".
# css_value is the lowercased status with every run of characters outside
# [a-z0-9_-] collapsed to "-".
# NOTE(review): css_value is not interpolated in the literal visible here —
# presumably it was a CSS class in markup that is not present; confirm.
def _status_pill(status: str | None) -> str:
value = status or "unknown"
css_value = re.sub(r"[^a-z0-9_-]+", "-", value.lower())
return (
f'{escape(value)} '
)
# Render the summary strip at the top of the AI workflow page: whether the
# OpenRouter key is configured, the total AI item count from `stats`, the
# latest batch id and the number of items saved by it.
#
# When `generation_summary` is given, batch id and saved count come from it;
# otherwise the batch id falls back to the first entry of `generation_runs`
# and the saved count stays "-".
def _ai_status_strip(
key_configured: bool,
stats: dict[str, Any],
generation_runs: list[AIGenerationRun],
generation_summary: dict[str, Any] | None = None,
) -> str:
latest_run = "-"
latest_saved = "-"
if generation_summary:
latest_run = str(generation_summary.get("run_id", "-"))
latest_saved = str(len(generation_summary.get("saved_item_ids") or []))
elif generation_runs:
# Callers are expected to pass runs newest-first — TODO confirm ordering.
latest_run = str(generation_runs[0].id)
return f"""
OpenRouter {"Yes" if key_configured else "No"}
AI Items {stats.get("total_ai_items", 0)}
Latest Batch {escape(latest_run)}
Saved {escape(latest_saved)}
"""
# Render the result summary of one generation batch (batch id, requested,
# generated and saved counts). Returns "" when there is no summary.
# NOTE(review): the summary values are interpolated without escape(); they
# appear to be server-generated ids/counts — confirm they never carry user text.
def _ai_generation_summary(generation_summary: dict[str, Any] | None) -> str:
if not generation_summary:
return ""
saved_item_ids = generation_summary.get("saved_item_ids") or []
return f"""
Batch ID {generation_summary.get("run_id", "-")}
Requested {generation_summary.get("requested_count", 0)}
Generated {generation_summary.get("generated_count", 0)}
Saved {len(saved_item_ids)}
"""
# Render the "Generate" tab: the latest batch summary followed by a context
# block describing the basis item (tryout, slot, id and its stem as plain,
# escaped text).
# NOTE(review): target_level, ai_model, generation_count, operator_notes and
# the two include_* flags are accepted but not used in the text visible here —
# presumably they pre-filled form markup that is not present; confirm.
def _ai_generate_tab(
item: Item,
generation_summary: dict[str, Any] | None,
target_level: str,
ai_model: str,
generation_count: str,
operator_notes: str,
include_note_for_admin: bool,
include_note_in_prompt: bool,
) -> str:
# The stem is converted from HTML to text first and then escaped for output.
full_stem = escape(_html_to_text(item.stem))
basis_selection_html = f"""
Basis Item Context
Tryout: {escape(str(item.tryout_id))} | Slot: {item.slot} | ID: #{item.id}
"{full_stem}"
"""
return f"""
{_ai_generation_summary(generation_summary)}
{basis_selection_html}
"""
# Render the "Batches" tab: the latest batch summary followed by a table with
# one row per generation run (id, basis item, target level, requested count,
# truncated model name, creator, creation time, review action), or a
# placeholder row when there are no runs.
# NOTE(review): the string literal that closes `table` is split across two
# lines below, which is not valid Python as shown — the closing markup seems
# to have been lost; restore it from the template source.
# NOTE(review): `item` is not used in the text visible here — confirm.
def _ai_runs_tab(
item: Item,
generation_runs: list[AIGenerationRun],
generation_summary: dict[str, Any] | None,
) -> str:
rows = []
for run in generation_runs:
rows.append(
""
f"{run.id} "
f"{run.basis_item_id} "
f"{escape(run.target_level)} "
f"{run.requested_count} "
f"{escape(_truncate(run.model, 54))} "
f"{escape(run.created_by)} "
f"{escape(str(run.created_at))} "
f'Review '
" "
)
table = (
'Batch ID Basis Item Target Requested Model Created By Created At Action '
+ (
"".join(rows)
if rows
else 'No generation batches yet. '
)
+ "
"
)
return f"""
{_ai_generation_summary(generation_summary)}
{table}
"""
# Render the "Review Queue" tab: status and level filter options plus one
# table row per generated variant (id, batch, basis item, level, status pill,
# truncated model name, 120-character stem preview, creation time).
# NOTE(review): the returned literal is empty as shown and status_options,
# level_options, variant_table_rows and the three *_filter arguments are not
# interpolated anywhere visible — the template markup appears to be missing.
def _ai_review_tab(
item: Item,
generated_variants: list[Item],
status_filter: str,
level_filter: str,
run_id_filter: str,
) -> str:
status_options = ['All statuses ']
for status in AI_VARIANT_STATUSES:
status_options.append(
f'{status} '
)
level_options = ['All levels ']
for level in AI_VARIANT_LEVELS:
level_options.append(
f'{level} '
)
variant_rows = []
# NOTE(review): the loop variable rebinds the `item` parameter, so after the
# loop `item` refers to the last variant rather than the basis item — verify
# nothing below the loop relies on the basis item.
for item in generated_variants:
stem_preview = _truncate(_html_to_text(item.stem), 120)
variant_rows.append(
""
f' '
f"{item.id} "
f"{item.generation_run_id or '-'} "
f"{item.basis_item_id or '-'} "
f"{escape(item.level)} "
f"{_status_pill(item.variant_status)} "
f"{escape(_truncate(item.ai_model or '-', 42))} "
f"{escape(stem_preview)} "
f"{escape(str(item.created_at))} "
f'View '
" "
)
variant_table_rows = (
"".join(variant_rows)
if variant_rows
else 'No AI-generated variants match this view. '
)
return f"""
"""
# Assemble the body of the AI workflow page for one item: status strip,
# optional success and error messages (both escaped), tab navigation and the
# content of the active tab.
#
# An `active_tab` that is not one of the AI_PLAYGROUND_TABS keys falls back
# to "generate". `generation_runs` and `generated_variants` default to empty
# lists when None.
# NOTE(review): the error_html / success_html literals are split across two
# lines below, which is not valid Python as shown — the wrapping markup seems
# to have been lost; restore it from the template source.
def _ai_form_body(
key_configured: bool,
stats: dict[str, Any],
item: Item,
error: str | None = None,
success: str | None = None,
generation_summary: dict[str, Any] | None = None,
generation_runs: list[AIGenerationRun] | None = None,
generated_variants: list[Item] | None = None,
target_level: str = "mudah",
ai_model: str = settings.OPENROUTER_MODEL_LLAMA,
generation_count: str = "1",
operator_notes: str = "",
include_note_for_admin: bool = True,
include_note_in_prompt: bool = False,
active_tab: str = "generate",
variant_status_filter: str = "",
variant_level_filter: str = "",
variant_run_id_filter: str = "",
) -> str:
error_html = f'{escape(error)}
' if error else ""
success_html = f'{escape(success)}
' if success else ""
generation_runs = generation_runs or []
generated_variants = generated_variants or []
if active_tab not in {tab for tab, _ in AI_PLAYGROUND_TABS}:
active_tab = "generate"
# All three tab bodies are built eagerly; only the active one is kept.
tab_html = {
"generate": _ai_generate_tab(
item,
generation_summary,
target_level,
ai_model,
generation_count,
operator_notes,
include_note_for_admin,
include_note_in_prompt,
),
"review": _ai_review_tab(
item,
generated_variants,
variant_status_filter,
variant_level_filter,
variant_run_id_filter,
),
"runs": _ai_runs_tab(item, generation_runs, generation_summary),
}[active_tab]
return f"""
{_ai_status_strip(key_configured, stats, generation_runs, generation_summary)}
{success_html}
{error_html}
{_ai_tab_nav(item.id, active_tab)}
{tab_html}
"""
# Render the answer options of an item as a table.
#
# When `options` is a dict, keys are normalised to stripped upper-case
# strings; rows are emitted for A, B, C, D first (those that exist), then any
# remaining keys in sorted order. Any other value is rendered as a single
# row containing its text. Option text is converted from HTML to plain text
# and escaped.
# NOTE(review): row_class marks the row whose key equals the normalised
# correct answer, but it is not interpolated in the literals visible here —
# presumably the row markup is missing; confirm.
# NOTE(review): the final string literal is split across two lines below,
# which is not valid Python as shown — restore the closing markup.
def _options_table(options: Any, correct_answer: str | None) -> str:
normalized_correct = str(correct_answer or "").strip().upper()
rows = []
if isinstance(options, dict):
options_by_key = {
str(key).strip().upper(): value for key, value in options.items()
}
option_keys = [key for key in ("A", "B", "C", "D") if key in options_by_key]
option_keys.extend(
sorted(key for key in options_by_key.keys() if key not in option_keys)
)
for key in option_keys:
value = options_by_key.get(key)
row_class = (
' class="correct-option"'
if str(key).upper() == normalized_correct
else ""
)
rows.append(
f""
f'{escape(str(key).upper())} '
f"{escape(_html_to_text(str(value)))} "
" "
)
else:
rows.append(
f'{escape(_html_to_text(str(options or "")))} '
)
return (
'Option Text '
+ (
"".join(rows)
if rows
else 'No options stored. '
)
+ "
"
)
def _ai_variant_detail_body(variant: Item, basis_item: Item | None) -> str:
    """Render the detail page body for one AI-generated variant.

    Shows the variant's identifiers, question text, answer options, correct
    answer, explanation and generation context. All item text is converted
    from HTML to plain text and escaped. A missing status or correct answer
    is shown as "-" instead of raising inside ``escape``.

    Args:
        variant: The generated item to display.
        basis_item: The item the variant was derived from, or None when it
            is unknown; a "-" placeholder is shown in that case.
    """
    explanation = _html_to_text(variant.explanation) if variant.explanation else "-"

    basis_preview = "-"
    if basis_item is not None:
        basis_preview = (
            f"#{basis_item.id} | Tryout {escape(str(basis_item.tryout_id))} | "
            f"Slot {basis_item.slot} | {escape(_truncate(_html_to_text(basis_item.stem), 160))}"
        )

    # Only the per-question workflow URL carries a query string, so the
    # run_id filter may only be appended with "&" in that branch. Appending it
    # to "/admin/basis-items" would produce a malformed URL.
    # NOTE(review): review_url is not interpolated in the template text visible
    # here — confirm the "Back to Review Queue" link markup uses it.
    if variant.basis_item_id:
        review_url = f"/admin/questions/{variant.basis_item_id}/generate?tab=review"
        if variant.generation_run_id:
            review_url = f"{review_url}&run_id={variant.generation_run_id}"
    else:
        review_url = "/admin/basis-items"

    # html.escape() raises on None, so substitute a placeholder first.
    status_text = escape(variant.variant_status or "-")
    correct_text = escape(variant.correct_answer or "-")

    return f"""
Item {variant.id}
Batch {variant.generation_run_id or "-"}
Level {escape(variant.level)}
Status {status_text}
Question
{escape(_html_to_text(variant.stem))}
Answer Options
{_options_table(variant.options, variant.correct_answer)}
Correct Answer
{correct_text}
Pembahasan
{escape(explanation)}
Generation Context
Basis item: {basis_preview}
Model: {escape(variant.ai_model or "-")}
Created at: {escape(str(variant.created_at))}
Approve this item
Reject this item
Archive this item
Mark stale
Activate this item
Apply
Back to Review Queue
"""
@router.get("/questions/{item_id}/generate")
async def question_generate_view(
    request: Request,
    item_id: int,
    tab: str = "generate",
    status: str = "",
    level: str = "",
    run_id: str = "",
    db: AsyncSession = Depends(get_db),
):
    """Render the AI workflow page (generate / review / batches) for one item.

    Query parameters:
        tab: Which tab to show; unknown values fall back to "generate" in
            ``_ai_form_body``.
        status, level, run_id: Optional filters for the variant list.
            ``run_id`` is applied only when it is all digits.
        error, success: Optional messages set by the POST handlers that
            redirect back to this page; they are read from the query string
            and shown escaped by ``_ai_form_body``.

    Requests without an admin session are redirected to the login page and
    unknown item ids to ``/admin/questions``.
    """
    admin = await _current_admin(request)
    if not admin:
        return _login_redirect()

    result = await db.execute(select(Item).where(Item.id == item_id))
    item = result.scalar_one_or_none()
    if not item:
        return RedirectResponse(url="/admin/questions", status_code=HTTP_303_SEE_OTHER)

    stats = await get_ai_stats(db)

    # Fetch runs and variants specific to this item.
    runs_result = await db.execute(
        select(AIGenerationRun)
        .where(AIGenerationRun.basis_item_id == item.id)
        .order_by(AIGenerationRun.created_at.desc())
    )
    generation_runs = list(runs_result.scalars().all())

    stmt = select(Item).where(
        Item.basis_item_id == item.id,
        Item.variant_status.is_not(None),
    )
    if status:
        stmt = stmt.where(Item.variant_status == status)
    if level:
        stmt = stmt.where(Item.level == level)
    if run_id and run_id.isdigit():
        stmt = stmt.where(Item.generation_run_id == int(run_id))
    stmt = stmt.order_by(Item.created_at.desc())
    variants_result = await db.execute(stmt)
    generated_variants = list(variants_result.scalars().all())

    # The submit and bulk-review handlers report their outcome through these
    # query parameters; without reading them the messages would be dropped.
    error_message = (request.query_params.get("error") or "").strip() or None
    success_message = (request.query_params.get("success") or "").strip() or None

    body = _ai_form_body(
        key_configured=bool(settings.OPENROUTER_API_KEY),
        stats=stats,
        item=item,
        error=error_message,
        success=success_message,
        generation_runs=generation_runs,
        generated_variants=generated_variants,
        target_level="mudah",
        ai_model=settings.OPENROUTER_MODEL_LLAMA,
        generation_count="1",
        operator_notes="",
        include_note_for_admin=True,
        include_note_in_prompt=False,
        active_tab=tab,
        variant_status_filter=status,
        variant_level_filter=level,
        variant_run_id_filter=run_id,
    )
    return _render_admin_page(
        request, f"AI Workflow: #{item.id}", f"AI Workflow for #{item.id}", body
    )
@router.post("/questions/{item_id}/generate")
async def question_generate_submit(
    request: Request,
    item_id: int,
    db: AsyncSession = Depends(get_db),
    target_level: str = Form("mudah"),
    ai_model: str = Form(settings.OPENROUTER_MODEL_LLAMA),
    generation_count: str = Form("1"),
    operator_notes: str = Form(""),
    include_note_for_admin: bool = Form(True),
    include_note_in_prompt: bool = Form(False),
):
    """Generate AI variants for one item and store them as drafts.

    On success the browser is redirected to the review tab filtered by the
    new run. Failures (missing API key, generator exception) redirect back
    to the workflow page with a URL-encoded ``error`` query parameter.

    ``generation_count`` is parsed leniently: non-numeric input counts as 1,
    and the value is clamped to 1..50, the same range the basis item
    workspace endpoint accepts.
    """
    from urllib.parse import urlencode

    admin = await _current_admin(request)
    if not admin:
        return _login_redirect()

    result = await db.execute(select(Item).where(Item.id == item_id))
    item = result.scalar_one_or_none()
    if not item:
        return RedirectResponse(url="/admin/questions", status_code=HTTP_303_SEE_OTHER)

    workflow_url = f"/admin/questions/{item.id}/generate"

    if not settings.OPENROUTER_API_KEY:
        return RedirectResponse(
            url=f"{workflow_url}?{urlencode({'error': 'API key missing'})}",
            status_code=HTTP_303_SEE_OTHER,
        )

    count = int(generation_count) if generation_count.isdigit() else 1
    # Keep a single form post from requesting zero or an unbounded number of
    # paid generations.
    count = max(1, min(count, 50))

    from app.services.ai_playground_generator import generate_variants_for_item

    try:
        run_id, generated = await generate_variants_for_item(
            db=db,
            item=item,
            target_level=target_level,
            ai_model=ai_model,
            num_variants=count,
            operator_notes=operator_notes,
            include_note_for_admin=include_note_for_admin,
            include_note_in_prompt=include_note_in_prompt,
        )
    except Exception as exc:
        # Boundary handler: report the failure to the operator. The message is
        # URL-encoded because exception text can contain spaces, "&", "#" or
        # newlines that would corrupt the Location header.
        return RedirectResponse(
            url=f"{workflow_url}?{urlencode({'error': str(exc)})}",
            status_code=HTTP_303_SEE_OTHER,
        )

    from app.schemas.ai import GeneratedQuestion
    from app.services.ai_playground_generator import save_ai_question

    for generated_question in generated:
        await save_ai_question(
            generated_data=GeneratedQuestion(
                stem=generated_question.stem,
                options=generated_question.options,
                correct=generated_question.correct,
                explanation=generated_question.explanation or None,
            ),
            tryout_id=item.tryout_id,
            website_id=item.website_id,
            basis_item_id=item.id,
            slot=item.slot,
            level=target_level,
            ai_model=ai_model,
            generation_run_id=run_id,
            source_snapshot_question_id=item.source_snapshot_question_id,
            variant_status="draft",
            db=db,
        )
    await db.commit()

    return RedirectResponse(
        url=f"{workflow_url}?tab=review&run_id={run_id}",
        status_code=HTTP_303_SEE_OTHER,
    )
@router.get("/questions/{item_id}/generate/variants/{variant_id}")
async def ai_playground_variant_detail(
    item_id: int,
    request: Request,
    db: AsyncSession = Depends(get_db),
):
    """Render the detail page of one AI-generated variant.

    The variant is identified by the ``variant_id`` path segment, and
    ``item_id`` identifies the question whose workflow page linked here.
    A missing, non-numeric or non-AI variant id renders a "not found" page
    rather than raising.
    """
    admin = await _current_admin(request)
    if not admin:
        return _login_redirect()

    # The route declares {variant_id}; read it from the matched path so the
    # lookup targets the variant instead of the parent question id. The
    # handler signature is left unchanged.
    try:
        variant_id = int(request.path_params.get("variant_id"))
    except (TypeError, ValueError):
        variant_id = None

    variant = None
    if variant_id is not None:
        result = await db.execute(
            select(Item).where(Item.id == variant_id, Item.generated_by == "ai")
        )
        variant = result.scalar_one_or_none()

    if variant is None:
        body = """
Generated variant was not found.
Back to Review Queue
"""
        return _render_admin_page(
            request, "Generated Variant", "Generated Variant", body
        )

    basis_item = None
    if variant.basis_item_id:
        basis_item = await db.get(Item, variant.basis_item_id)

    body = _ai_variant_detail_body(variant, basis_item)
    return _render_admin_page(
        request,
        f"Generated Variant #{variant.id}",
        f"Generated Variant #{variant.id}",
        body,
    )
@router.post("/questions/{item_id}/generate/review-bulk")
async def question_generate_review_bulk(
    request: Request,
    item_id: int,
    db: AsyncSession = Depends(get_db),
    item_ids: list[int] = Form([]),
    action: str = Form(...),
    tab: str = "review",
):
    """Apply one review status to several variants of a question.

    Only items whose ``basis_item_id`` is the question in the URL are
    updated, so a crafted form cannot change the status of unrelated items.
    The outcome is reported by redirecting to the workflow page with a
    URL-encoded ``error`` or ``success`` query parameter.
    """
    from urllib.parse import urlencode

    admin = await _current_admin(request)
    if not admin:
        return _login_redirect()

    def _back(**message: str) -> RedirectResponse:
        # Redirect to the workflow page, encoding the tab and message safely.
        query = urlencode({"tab": tab, **message})
        return RedirectResponse(
            url=f"/admin/questions/{item_id}/generate?{query}",
            status_code=HTTP_303_SEE_OTHER,
        )

    valid_actions = {"approved", "rejected", "archived", "stale", "active"}
    if action not in valid_actions:
        return _back(error="Invalid action")
    if not item_ids:
        return _back(error="No items selected")

    result = await db.execute(
        select(Item).where(
            Item.id.in_(item_ids),
            Item.basis_item_id == item_id,
        )
    )
    variants = list(result.scalars().all())

    now = datetime.now(timezone.utc)
    for variant in variants:
        variant.variant_status = action
        variant.reviewed_by = admin.username
        variant.reviewed_at = now
        variant.updated_at = now
    await db.commit()

    return _back(
        success=f"Successfully applied {action} to {len(variants)} variants."
    )
@router.get("/tryout/list", include_in_schema=False)
@router.get("/item/list", include_in_schema=False)
@router.get("/user/list", include_in_schema=False)
@router.get("/session/list", include_in_schema=False)
@router.get("/tryoutstats/list", include_in_schema=False)
async def legacy_admin_paths(request: Request):
    """Send requests for the old list URLs to the dashboard.

    Requests without an admin session are sent to the login page instead.
    """
    if await _current_admin(request):
        return _dashboard_redirect()
    return _login_redirect()